[feature/performance] Store account stats in separate table (#2831)

* [feature/performance] Store account stats in separate table, get stats from remote

* test account stats

* add some missing increment / decrement calls

* change stats function signatures

* rejig logging a bit

* use lock when updating stats
This commit is contained in:
tobi 2024-04-16 13:10:13 +02:00 committed by GitHub
commit 3cceed11b2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 1285 additions and 450 deletions

View file

@ -695,7 +695,7 @@ func (d *Dereferencer) enrichAccount(
representation of the target account, derived from
a combination of webfinger lookups and dereferencing.
Further fetching beyond this point is for peripheral
things like account avatar, header, emojis.
things like account avatar, header, emojis, stats.
*/
// Ensure internal db ID is
@ -718,6 +718,11 @@ func (d *Dereferencer) enrichAccount(
log.Errorf(ctx, "error fetching remote emojis for account %s: %v", uri, err)
}
// Fetch followers/following count for this account.
if err := d.fetchRemoteAccountStats(ctx, latestAcc, requestUser); err != nil {
log.Errorf(ctx, "error fetching remote stats for account %s: %v", uri, err)
}
if account.IsNew() {
// Prefer published/created time from
// apubAcc, fall back to FetchedAt value.
@ -1036,6 +1041,113 @@ func (d *Dereferencer) fetchRemoteAccountEmojis(ctx context.Context, targetAccou
return changed, nil
}
func (d *Dereferencer) fetchRemoteAccountStats(ctx context.Context, account *gtsmodel.Account, requestUser string) error {
// Ensure we have a stats model for this account.
if account.Stats == nil {
if err := d.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
}
// We want to update stats by getting remote
// followers/following/statuses counts for
// this account.
//
// If we fail getting any particular stat,
// it will just fall back to counting local.
// Followers first.
if count, err := d.countCollection(
ctx,
account.FollowersURI,
requestUser,
); err != nil {
// Log this but don't bail.
log.Warnf(ctx,
"couldn't count followers for @%s@%s: %v",
account.Username, account.Domain, err,
)
} else if count > 0 {
// Positive integer is useful!
account.Stats.FollowersCount = &count
}
// Now following.
if count, err := d.countCollection(
ctx,
account.FollowingURI,
requestUser,
); err != nil {
// Log this but don't bail.
log.Warnf(ctx,
"couldn't count following for @%s@%s: %v",
account.Username, account.Domain, err,
)
} else if count > 0 {
// Positive integer is useful!
account.Stats.FollowingCount = &count
}
// Now statuses count.
if count, err := d.countCollection(
ctx,
account.OutboxURI,
requestUser,
); err != nil {
// Log this but don't bail.
log.Warnf(ctx,
"couldn't count statuses for @%s@%s: %v",
account.Username, account.Domain, err,
)
} else if count > 0 {
// Positive integer is useful!
account.Stats.StatusesCount = &count
}
// Update stats now.
if err := d.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"followers_count",
"following_count",
"statuses_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
// countCollection parses the given uriStr,
// dereferences the result as a collection
// type, and returns total items as 0, or
// a positive integer, or -1 if total items
// cannot be counted.
//
// Error will be returned for invalid non-empty
// URIs or dereferencing isses.
func (d *Dereferencer) countCollection(
ctx context.Context,
uriStr string,
requestUser string,
) (int, error) {
if uriStr == "" {
return -1, nil
}
uri, err := url.Parse(uriStr)
if err != nil {
return -1, err
}
collect, err := d.dereferenceCollection(ctx, requestUser, uri)
if err != nil {
return -1, err
}
return collect.TotalItems(), nil
}
// dereferenceAccountFeatured dereferences an account's featuredCollectionURI (if not empty). For each discovered status, this status will
// be dereferenced (if necessary) and marked as pinned (if necessary). Then, old pins will be removed if they're not included in new pins.
func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUser string, account *gtsmodel.Account) error {

View file

@ -40,7 +40,7 @@ func (d *Dereferencer) dereferenceCollection(ctx context.Context, username strin
rsp, err := transport.Dereference(ctx, pageIRI)
if err != nil {
return nil, gtserror.Newf("error deferencing %s: %w", pageIRI.String(), err)
return nil, gtserror.Newf("error dereferencing %s: %w", pageIRI.String(), err)
}
collect, err := ap.ResolveCollection(ctx, rsp.Body)

View file

@ -89,11 +89,13 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
// Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
}
@ -136,11 +138,13 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
// Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
continue

View file

@ -82,10 +82,11 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
// This is a new boost. Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityCreate,
GTSModel: boost,
ReceivingAccount: receivingAcct,
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityCreate,
GTSModel: boost,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
return nil

View file

@ -131,10 +131,11 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
}
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityBlock,
APActivityType: ap.ActivityCreate,
GTSModel: block,
ReceivingAccount: receiving,
APObjectType: ap.ActivityBlock,
APActivityType: ap.ActivityCreate,
GTSModel: block,
ReceivingAccount: receiving,
RequestingAccount: requestingAccount,
})
return nil
@ -307,7 +308,8 @@ func (f *federatingDB) createPollOptionables(
PollID: inReplyTo.PollID,
Poll: inReplyTo.Poll,
},
ReceivingAccount: receiver,
ReceivingAccount: receiver,
RequestingAccount: requester,
})
return nil
@ -376,12 +378,13 @@ func (f *federatingDB) createStatusable(
// Pass the statusable URI (APIri) into the processor
// worker and do the rest of the processing asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: ap.GetJSONLDId(statusable),
APObjectModel: nil,
GTSModel: nil,
ReceivingAccount: receiver,
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: ap.GetJSONLDId(statusable),
APObjectModel: nil,
GTSModel: nil,
ReceivingAccount: receiver,
RequestingAccount: requester,
})
return nil
}
@ -389,12 +392,13 @@ func (f *federatingDB) createStatusable(
// Do the rest of the processing asynchronously. The processor
// will handle inserting/updating + further dereferencing the status.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: nil,
GTSModel: nil,
APObjectModel: statusable,
ReceivingAccount: receiver,
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: nil,
GTSModel: nil,
APObjectModel: statusable,
ReceivingAccount: receiver,
RequestingAccount: requester,
})
return nil
@ -436,10 +440,11 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re
}
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityCreate,
GTSModel: followRequest,
ReceivingAccount: receivingAccount,
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityCreate,
GTSModel: followRequest,
ReceivingAccount: receivingAccount,
RequestingAccount: requestingAccount,
})
return nil
@ -480,10 +485,11 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece
}
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityLike,
APActivityType: ap.ActivityCreate,
GTSModel: fave,
ReceivingAccount: receivingAccount,
APObjectType: ap.ActivityLike,
APActivityType: ap.ActivityCreate,
GTSModel: fave,
ReceivingAccount: receivingAccount,
RequestingAccount: requestingAccount,
})
return nil
@ -531,10 +537,11 @@ func (f *federatingDB) activityFlag(ctx context.Context, asType vocab.Type, rece
}
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFlag,
APActivityType: ap.ActivityCreate,
GTSModel: report,
ReceivingAccount: receivingAccount,
APObjectType: ap.ActivityFlag,
APActivityType: ap.ActivityCreate,
GTSModel: report,
ReceivingAccount: receivingAccount,
RequestingAccount: requestingAccount,
})
return nil

View file

@ -63,10 +63,11 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID {
l.Debugf("deleting account: %s", a.ID)
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityDelete,
GTSModel: a,
ReceivingAccount: receivingAcct,
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityDelete,
GTSModel: a,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
}

View file

@ -99,11 +99,12 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts
// updating of eg., avatar/header, emojis, etc. The actual db
// inserts/updates will take place there.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityUpdate,
GTSModel: requestingAcct,
APObjectModel: accountable,
ReceivingAccount: receivingAcct,
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityUpdate,
GTSModel: requestingAcct,
APObjectModel: accountable,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
return nil
@ -155,11 +156,12 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm
// Queue an UPDATE NOTE activity to our fedi API worker,
// this will handle necessary database insertions, etc.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityUpdate,
GTSModel: status, // original status
APObjectModel: (ap.Statusable)(statusable),
ReceivingAccount: receivingAcct,
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityUpdate,
GTSModel: status, // original status
APObjectModel: (ap.Statusable)(statusable),
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
})
return nil