[feature] status refetch support (#1690)

* revamp http client to not limit requests, instead use sender worker

Signed-off-by: kim <grufwub@gmail.com>

* remove separate sender worker pool, spawn 2*GOMAXPROCS batch senders each time, no need for transport cache sweeping

Signed-off-by: kim <grufwub@gmail.com>

* improve batch senders to keep popping recipients until remote URL found

Signed-off-by: kim <grufwub@gmail.com>

* fix recipient looping issue

Signed-off-by: kim <grufwub@gmail.com>

* move request id ctx key to gtscontext, finish filling out more code comments, add basic support for not logging client IP

Signed-off-by: kim <grufwub@gmail.com>

* first draft of status refetching logic

Signed-off-by: kim <grufwub@gmail.com>

* fix testrig to use new federation alloc func signature

Signed-off-by: kim <grufwub@gmail.com>

* fix log format directive

Signed-off-by: kim <grufwub@gmail.com>

* add status fetched_at migration

Signed-off-by: kim <grufwub@gmail.com>

* remove unused / unchecked for error types

Signed-off-by: kim <grufwub@gmail.com>

* add back the used type...

Signed-off-by: kim <grufwub@gmail.com>

* add separate internal getStatus() function for derefThread() that doesn't recurse

Signed-off-by: kim <grufwub@gmail.com>

* improved mention and media attachment error handling

Signed-off-by: kim <grufwub@gmail.com>

* fix log and error format directives

Signed-off-by: kim <grufwub@gmail.com>

* update account deref to match status deref changes

Signed-off-by: kim <grufwub@gmail.com>

* very small code formatting change to make things clearer

Signed-off-by: kim <grufwub@gmail.com>

* add more code comments

Signed-off-by: kim <grufwub@gmail.com>

* improved code commenting

Signed-off-by: kim <grufwub@gmail.com>

* only check for required further derefs if needed

Signed-off-by: kim <grufwub@gmail.com>

* improved cache invalidation

Signed-off-by: kim <grufwub@gmail.com>

* tweak cache restarting to use a (very small) backoff

Signed-off-by: kim <grufwub@gmail.com>

* small readability changes and fixes

Signed-off-by: kim <grufwub@gmail.com>

* fix account sync issues

Signed-off-by: kim <grufwub@gmail.com>

* fix merge conflicts + update account enrichment to accept already-passed accountable

Signed-off-by: kim <grufwub@gmail.com>

* remove secondary function declaration

Signed-off-by: kim <grufwub@gmail.com>

* normalise dereferencer get status / account behaviour, fix remaining tests

Signed-off-by: kim <grufwub@gmail.com>

* fix remaining rebase conflicts, finish commenting code

Signed-off-by: kim <grufwub@gmail.com>

* appease the linter

Signed-off-by: kim <grufwub@gmail.com>

* add source file header

Signed-off-by: kim <grufwub@gmail.com>

* update to use TIMESTAMPTZ column type instead of just TIMESTAMP

Signed-off-by: kim <grufwub@gmail.com>

* don't pass in 'updated_at' to UpdateEmoji()

Signed-off-by: kim <grufwub@gmail.com>

* use new ap.Resolve{Account,Status}able() functions

Signed-off-by: kim <grufwub@gmail.com>

* remove the somewhat confusing rescoping of the same variable names

Signed-off-by: kim <grufwub@gmail.com>

* update migration file name, improved database delete error returns

Signed-off-by: kim <grufwub@gmail.com>

* formatting

Signed-off-by: kim <grufwub@gmail.com>

* improved multi-delete database functions to minimise DB calls

Signed-off-by: kim <grufwub@gmail.com>

* remove unused type

Signed-off-by: kim <grufwub@gmail.com>

* fix delete statements

Signed-off-by: kim <grufwub@gmail.com>

---------

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2023-05-12 10:15:54 +01:00 committed by GitHub
commit 6c9d8e78eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 1552 additions and 1118 deletions

View file

@ -28,16 +28,17 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// Get processes the given request for account information.
func (p *Processor) Get(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Account, gtserror.WithCode) {
targetAccount, err := p.state.DB.GetAccountByID(ctx, targetAccountID)
if err != nil {
if err == db.ErrNoEntries {
if errors.Is(err, db.ErrNoEntries) {
return nil, gtserror.NewErrorNotFound(errors.New("account not found"))
}
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err))
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))
}
return p.getFor(ctx, requestingAccount, targetAccount)
@ -47,10 +48,10 @@ func (p *Processor) Get(ctx context.Context, requestingAccount *gtsmodel.Account
func (p *Processor) GetLocalByUsername(ctx context.Context, requestingAccount *gtsmodel.Account, username string) (*apimodel.Account, gtserror.WithCode) {
targetAccount, err := p.state.DB.GetAccountByUsernameDomain(ctx, username, "")
if err != nil {
if err == db.ErrNoEntries {
if errors.Is(err, db.ErrNoEntries) {
return nil, gtserror.NewErrorNotFound(errors.New("account not found"))
}
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err))
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))
}
return p.getFor(ctx, requestingAccount, targetAccount)
@ -60,48 +61,50 @@ func (p *Processor) GetLocalByUsername(ctx context.Context, requestingAccount *g
func (p *Processor) GetCustomCSSForUsername(ctx context.Context, username string) (string, gtserror.WithCode) {
customCSS, err := p.state.DB.GetAccountCustomCSSByUsername(ctx, username)
if err != nil {
if err == db.ErrNoEntries {
if errors.Is(err, db.ErrNoEntries) {
return "", gtserror.NewErrorNotFound(errors.New("account not found"))
}
return "", gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err))
return "", gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))
}
return customCSS, nil
}
func (p *Processor) getFor(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccount *gtsmodel.Account) (*apimodel.Account, gtserror.WithCode) {
var blocked bool
var err error
if requestingAccount != nil {
blocked, err = p.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, targetAccount.ID)
blocked, err := p.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, targetAccount.ID)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error checking account block: %s", err))
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error checking account block: %w", err))
}
if blocked {
apiAccount, err := p.tc.AccountToAPIAccountBlocked(ctx, targetAccount)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %w", err))
}
return apiAccount, nil
}
}
if targetAccount.Domain != "" {
targetAccountURI, err := url.Parse(targetAccount.URI)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error parsing url %s: %w", targetAccount.URI, err))
}
// Perform a last-minute fetch of target account to ensure remote account header / avatar is cached.
latest, _, err := p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestingAccount.Username, targetAccountURI)
if err != nil {
log.Errorf(ctx, "error fetching latest target account: %v", err)
} else {
// Use latest account model.
targetAccount = latest
}
}
var apiAccount *apimodel.Account
if blocked {
apiAccount, err = p.tc.AccountToAPIAccountBlocked(ctx, targetAccount)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %s", err))
}
return apiAccount, nil
}
// last-minute check to make sure we have remote account header/avi cached
if targetAccount.Domain != "" {
targetAccountURI, err := url.Parse(targetAccount.URI)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error parsing url %s: %s", targetAccount.URI, err))
}
a, err := p.federator.GetAccountByURI(
gtscontext.SetFastFail(ctx), requestingAccount.Username, targetAccountURI,
)
if err == nil {
targetAccount = a
}
}
if requestingAccount != nil && targetAccount.ID == requestingAccount.ID {
apiAccount, err = p.tc.AccountToAPIAccountSensitive(ctx, targetAccount)
@ -109,7 +112,7 @@ func (p *Processor) getFor(ctx context.Context, requestingAccount *gtsmodel.Acco
apiAccount, err = p.tc.AccountToAPIAccountPublic(ctx, targetAccount)
}
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %s", err))
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %w", err))
}
return apiAccount, nil

View file

@ -385,7 +385,7 @@ func (p *Processor) emojiUpdateDisable(ctx context.Context, emoji *gtsmodel.Emoj
emojiDisabled := true
emoji.Disabled = &emojiDisabled
updatedEmoji, err := p.state.DB.UpdateEmoji(ctx, emoji, "updated_at", "disabled")
updatedEmoji, err := p.state.DB.UpdateEmoji(ctx, emoji, "disabled")
if err != nil {
err = fmt.Errorf("emojiUpdateDisable: error updating emoji %s: %s", emoji.ID, err)
return nil, gtserror.NewErrorInternalError(err)
@ -434,7 +434,7 @@ func (p *Processor) emojiUpdateModify(ctx context.Context, emoji *gtsmodel.Emoji
if !updateImage {
// only updating fields, we only need
// to do a database update for this
columns := []string{"updated_at"}
var columns []string
if updateCategoryID {
emoji.CategoryID = updatedCategoryID

View file

@ -40,7 +40,7 @@ func (p *Processor) authenticate(ctx context.Context, requestedUsername string)
return
}
if requestingAccount, err = p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI); err != nil {
if requestingAccount, _, err = p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI); err != nil {
errWithCode = gtserror.NewErrorUnauthorized(err)
return
}

View file

@ -55,9 +55,7 @@ func (p *Processor) UserGet(ctx context.Context, requestedUsername string, reque
// if we're not already handshaking/dereferencing a remote account, dereference it now
if !p.federator.Handshaking(requestedUsername, requestingAccountURI) {
requestingAccount, err := p.federator.GetAccountByURI(
gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI,
)
requestingAccount, _, err := p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI)
if err != nil {
return nil, gtserror.NewErrorUnauthorized(err)
}

View file

@ -110,17 +110,30 @@ func (p *Processor) ProcessFromFederator(ctx context.Context, federatorMsg messa
func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
// check for either an IRI that we still need to dereference, OR an already dereferenced
// and converted status pinned to the message.
var status *gtsmodel.Status
var (
status *gtsmodel.Status
err error
)
if federatorMsg.GTSModel != nil {
// there's a gts model already pinned to the message, it should be a status
var ok bool
// there's a gts model already pinned to the message, it should be a status
if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok {
return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status")
}
var err error
status, err = p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, status, true)
// Since this was a create originating AP object
// statusable may have been set on message (no problem if not).
statusable, _ := federatorMsg.APObjectModel.(ap.Statusable)
// Call refresh on status to deref if necessary etc.
status, _, err = p.federator.RefreshStatus(ctx,
federatorMsg.ReceivingAccount.Username,
status,
statusable,
false,
)
if err != nil {
return err
}
@ -129,38 +142,29 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa
if federatorMsg.APIri == nil {
return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference")
}
var err error
status, _, err = p.federator.GetStatus(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri, false, false)
status, _, err = p.federator.GetStatusByURI(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri)
if err != nil {
return err
}
}
// make sure the account is pinned
if status.Account == nil {
a, err := p.state.DB.GetAccountByID(ctx, status.AccountID)
if err != nil {
return err
}
status.Account = a
}
// Get the remote account to make sure the avi and header are cached.
if status.Account.Domain != "" {
remoteAccountID, err := url.Parse(status.Account.URI)
if status.Account == nil || status.Account.IsRemote() {
// Either no account attached yet, or a remote account.
// Both situations we need to parse account URI to fetch it.
remoteAccURI, err := url.Parse(status.AccountURI)
if err != nil {
return err
}
a, err := p.federator.GetAccountByURI(ctx,
// Ensure that account for this status has been deref'd.
status.Account, _, err = p.federator.GetAccountByURI(ctx,
federatorMsg.ReceivingAccount.Username,
remoteAccountID,
remoteAccURI,
)
if err != nil {
return err
}
status.Account = a
}
if err := p.timelineAndNotifyStatus(ctx, status); err != nil {
@ -193,7 +197,7 @@ func (p *Processor) processCreateFaveFromFederator(ctx context.Context, federato
return err
}
a, err := p.federator.GetAccountByURI(ctx,
a, _, err := p.federator.GetAccountByURI(ctx,
federatorMsg.ReceivingAccount.Username,
remoteAccountID,
)
@ -234,7 +238,7 @@ func (p *Processor) processCreateFollowRequestFromFederator(ctx context.Context,
return err
}
a, err := p.federator.GetAccountByURI(ctx,
a, _, err := p.federator.GetAccountByURI(ctx,
federatorMsg.ReceivingAccount.Username,
remoteAccountID,
)
@ -294,7 +298,7 @@ func (p *Processor) processCreateAnnounceFromFederator(ctx context.Context, fede
return err
}
a, err := p.federator.GetAccountByURI(ctx,
a, _, err := p.federator.GetAccountByURI(ctx,
federatorMsg.ReceivingAccount.Username,
remoteAccountID,
)
@ -376,11 +380,12 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder
}
// Call RefreshAccount to fetch up-to-date bio, avatar, header, etc.
updatedAccount, err := p.federator.RefreshAccount(
updatedAccount, _, err := p.federator.RefreshAccount(
ctx,
federatorMsg.ReceivingAccount.Username,
incomingAccountable,
incomingAccount,
incomingAccountable,
true,
)
if err != nil {
return fmt.Errorf("error enriching updated account from federator: %s", err)

View file

@ -142,15 +142,10 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {
suite.NoError(err)
// 2. a notification should exist for the mention
where := []db.Where{
{
Key: "status_id",
Value: replyingStatus.ID,
},
}
notif := &gtsmodel.Notification{}
err = suite.db.GetWhere(context.Background(), where, notif)
var notif gtsmodel.Notification
err = suite.db.GetWhere(context.Background(), []db.Where{
{Key: "status_id", Value: replyingStatus.ID},
}, &notif)
suite.NoError(err)
suite.Equal(gtsmodel.NotificationMention, notif.NotificationType)
suite.Equal(replyingStatus.InReplyToAccountID, notif.TargetAccountID)

View file

@ -131,7 +131,7 @@ func NewProcessor(
processor.fedi = fedi.New(state, tc, federator, filter)
processor.media = media.New(state, tc, mediaManager, federator.TransportController())
processor.report = report.New(state, tc)
processor.status = status.New(state, tc, filter, parseMentionFunc)
processor.status = status.New(state, federator, tc, filter, parseMentionFunc)
processor.stream = stream.New(state, oauthServer)
processor.user = user.New(state, emailSender)

View file

@ -100,6 +100,8 @@ func (suite *ProcessingStandardTestSuite) SetupTest() {
suite.state.Storage = suite.storage
suite.typeconverter = testrig.NewTestTypeConverter(suite.db)
suite.httpClient = testrig.NewMockHTTPClient(nil, "../../testrig/media")
suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople()
suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses()
suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient)
suite.mediaManager = testrig.NewTestMediaManager(&suite.state)

View file

@ -226,17 +226,8 @@ func (p *Processor) SearchGet(ctx context.Context, authed *oauth.Auth, search *a
}
func (p *Processor) searchStatusByURI(ctx context.Context, authed *oauth.Auth, uri *url.URL) (*gtsmodel.Status, error) {
status, statusable, err := p.federator.GetStatus(gtscontext.SetFastFail(ctx), authed.Account.Username, uri, true, true)
if err != nil {
return nil, err
}
if !*status.Local && statusable != nil {
// Attempt to dereference the status thread while we are here
p.federator.DereferenceThread(gtscontext.SetFastFail(ctx), authed.Account.Username, uri, status, statusable)
}
return status, nil
status, _, err := p.federator.GetStatusByURI(gtscontext.SetFastFail(ctx), authed.Account.Username, uri)
return status, err
}
func (p *Processor) searchAccountByURI(ctx context.Context, authed *oauth.Auth, uri *url.URL, resolve bool) (*gtsmodel.Account, error) {
@ -267,11 +258,12 @@ func (p *Processor) searchAccountByURI(ctx context.Context, authed *oauth.Auth,
return account, nil
}
return p.federator.GetAccountByURI(
account, _, err := p.federator.GetAccountByURI(
gtscontext.SetFastFail(ctx),
authed.Account.Username,
uri,
)
return account, err
}
func (p *Processor) searchAccountByUsernameDomain(ctx context.Context, authed *oauth.Auth, username string, domain string, resolve bool) (*gtsmodel.Account, error) {
@ -294,9 +286,10 @@ func (p *Processor) searchAccountByUsernameDomain(ctx context.Context, authed *o
return account, nil
}
return p.federator.GetAccountByUsernameDomain(
account, _, err := p.federator.GetAccountByUsernameDomain(
gtscontext.SetFastFail(ctx),
authed.Account.Username,
username, domain,
)
return account, err
}

View file

@ -43,6 +43,16 @@ func (p *Processor) getVisibleStatus(ctx context.Context, requestingAccount *gts
return nil, gtserror.NewErrorNotFound(err)
}
if requestingAccount != nil {
// Ensure the status is up-to-date.
p.federator.RefreshStatusAsync(ctx,
requestingAccount.Username,
targetStatus,
nil,
false,
)
}
visible, err := p.filter.StatusVisible(ctx, requestingAccount, targetStatus)
if err != nil {
err = fmt.Errorf("getVisibleStatus: error seeing if status %s is visible: %w", targetStatus.ID, err)

View file

@ -18,6 +18,7 @@
package status
import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/text"
@ -27,6 +28,7 @@ import (
type Processor struct {
state *state.State
federator federation.Federator
tc typeutils.TypeConverter
filter *visibility.Filter
formatter text.Formatter
@ -34,9 +36,10 @@ type Processor struct {
}
// New returns a new status processor.
func New(state *state.State, tc typeutils.TypeConverter, filter *visibility.Filter, parseMention gtsmodel.ParseMentionFunc) Processor {
func New(state *state.State, federator federation.Federator, tc typeutils.TypeConverter, filter *visibility.Filter, parseMention gtsmodel.ParseMentionFunc) Processor {
return Processor{
state: state,
federator: federator,
tc: tc,
filter: filter,
formatter: text.NewFormatter(state.DB),

View file

@ -88,7 +88,7 @@ func (suite *StatusStandardTestSuite) SetupTest() {
suite.federator = testrig.NewTestFederator(&suite.state, suite.tc, suite.mediaManager)
filter := visibility.NewFilter(&suite.state)
suite.status = status.New(&suite.state, suite.typeConverter, filter, processing.GetParseMentionFunc(suite.db, suite.federator))
suite.status = status.New(&suite.state, suite.federator, suite.typeConverter, filter, processing.GetParseMentionFunc(suite.db, suite.federator))
testrig.StandardDBSetup(suite.db, suite.testAccounts)
testrig.StandardStorageSetup(suite.storage, "../../../testrig/media")

View file

@ -57,7 +57,7 @@ func GetParseMentionFunc(dbConn db.DB, federator federation.Federator) gtsmodel.
requestingUsername = originAccount.Username
}
remoteAccount, err := federator.GetAccountByUsernameDomain(
remoteAccount, _, err := federator.GetAccountByUsernameDomain(
gtscontext.SetFastFail(ctx),
requestingUsername,
username,