From b04b4f85161e8f8b7b7ce19b6cd9d67ebe08b21f Mon Sep 17 00:00:00 2001 From: kim Date: Tue, 8 Apr 2025 15:19:36 +0100 Subject: [PATCH] share a bunch of the database load code in timeline cache, don't clear timelines on relationship change --- internal/cache/timeline/status.go | 217 ++++++++---------- internal/processing/timeline/timeline.go | 3 - internal/processing/workers/fromclientapi.go | 38 +-- internal/processing/workers/fromfediapi.go | 40 ++-- .../processing/workers/surfacetimeline.go | 22 +- 5 files changed, 148 insertions(+), 172 deletions(-) diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index fe5120aec..bf8c8f5df 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -387,7 +387,8 @@ func (t *StatusTimeline) Preload( // but still easily manageable memory-wise. recentBoosts := make(map[string]int, t.cut) - // Iterate the entire timeline cache and mark repeat boosts. + // Iterate timeline ascending (i.e. oldest -> newest), marking + // entry IDs and marking down if boosts have been seen recently. for idx, value := range t.cache.RangeUnsafe(structr.Asc) { // Store current ID in map. @@ -426,9 +427,7 @@ func (t *StatusTimeline) Load( // to load status models of already cached entries in the timeline. loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), - // filter can be used to perform filtering of returned - // statuses BEFORE insert into cache. i.e. this will effect - // what actually gets stored in the timeline cache. + // filter can be used to perform filtering of returned statuses. filter func(each *gtsmodel.Status) (delete bool, err error), // prepareAPI should prepare internal status model to frontend API model. @@ -439,13 +438,6 @@ func (t *StatusTimeline) Load( string, // hi error, ) { - switch { - case page == nil: - panic("nil page") - case loadPage == nil: - panic("nil load page func") - } - // Get paging details. lo := page.Min.Value hi := page.Max.Value @@ -470,12 +462,6 @@ func (t *StatusTimeline) Load( dir, ) - // TODO: in time, we should think about (dynamically?) preloading - // the timelines, and any page requests outside of the cached - // range go straight to the database. otherwise there's peculiarities - // that may arise due to concurrent new and old range inserts, also - // requests for old page ranges are almost always going to be one-off. - // We now reset the lo,hi values to // represent the lowest and highest // index values of loaded statuses. @@ -486,8 +472,7 @@ func (t *StatusTimeline) Load( // response values. lo, hi = "", "" - // Preallocate a slice of up-to-limit API models. - apiStatuses := make([]*apimodel.Status, 0, limit) + var apiStatuses []*apimodel.Status if len(metas) > 0 { // Before we can do any filtering, we need @@ -504,6 +489,9 @@ func (t *StatusTimeline) Load( // Update paging parameters used for next database query. nextPageParams(nextPg, metas[len(metas)-1].ID, order) + // Allocate slice of expected required API models. + apiStatuses = make([]*apimodel.Status, 0, len(metas)) + // Prepare frontend API models for // the cached statuses. For now this // also does its own extra filtering. @@ -515,74 +503,23 @@ func (t *StatusTimeline) Load( ) } - // Check if we need to call - // through to the database. + // If no cached timeline statuses + // were found for page, we need to + // call through to the database. if len(apiStatuses) == 0 { + var err error - // Load a little more than - // limit to reduce db calls. 
- nextPg.Limit += 10 - - // Perform maximum of 5 load - // attempts fetching statuses. - for i := 0; i < 5; i++ { - - // Load next timeline statuses. - statuses, err := loadPage(nextPg) - if err != nil { - return nil, "", "", gtserror.Newf("error loading timeline: %w", err) - } - - // No more statuses from - // load function = at end. - if len(statuses) == 0 { - break - } - - if hi == "" { - // Set hi returned paging - // value if not already set. - hi = statuses[0].ID - } - - // Update nextPg cursor parameter for next database query. - nextPageParams(nextPg, statuses[len(statuses)-1].ID, order) - - // Perform any filtering on newly loaded statuses. - statuses, err = doStatusFilter(statuses, filter) - if err != nil { - return nil, "", "", gtserror.Newf("error filtering statuses: %w", err) - } - - // After filtering no more - // statuses remain, retry. - if len(statuses) == 0 { - continue - } - - // Convert to our cache type, - // these will get inserted into - // the cache in prepare() below. - metas := toStatusMeta(nil, statuses) - - // Prepare frontend API models for - // the loaded statuses. For now this - // also does its own extra filtering. - apiStatuses = prepareStatuses(ctx, - metas, - prepareAPI, - apiStatuses, - limit, - ) - - // If we have anything, return - // here. Even if below limit. - if len(apiStatuses) > 0 { - - // Set returned lo status paging value. - lo = apiStatuses[len(apiStatuses)-1].ID - break - } + // Pass through to main timeline db load function. + apiStatuses, lo, hi, err = loadStatusTimeline(ctx, + nextPg, + metas, + apiStatuses, + loadPage, + filter, + prepareAPI, + ) + if err != nil { + return nil, "", "", err } } @@ -608,7 +545,6 @@ func LoadStatusTimeline( ctx context.Context, page *paging.Page, loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), - loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), filter func(each *gtsmodel.Status) (delete bool, err error), prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), ) ( @@ -617,38 +553,83 @@ func LoadStatusTimeline( string, // hi error, ) { - switch { - case page == nil: - panic("nil page") - case loadPage == nil: - panic("nil load page func") - } - - // Get paging details. - lo := page.Min.Value - hi := page.Max.Value - limit := page.Limit - order := page.Order() - // Use a copy of current page so // we can repeatedly update it. nextPg := new(paging.Page) *nextPg = *page - nextPg.Min.Value = lo - nextPg.Max.Value = hi - // We now reset the lo,hi values to - // represent the lowest and highest - // index values of loaded statuses. - lo, hi = "", "" + // Pass through to main timeline db load function. + apiStatuses, lo, hi, err := loadStatusTimeline(ctx, + nextPg, + nil, + nil, + loadPage, + filter, + prepareAPI, + ) + if err != nil { + return nil, "", "", err + } - // Preallocate a slice of up-to-limit API models. - apiStatuses := make([]*apimodel.Status, 0, limit) + if page.Order().Ascending() { + // The caller always expects the statuses + // to be returned in DESC order, but we + // build the status slice in paging order. + // If paging ASC, we need to reverse the + // returned statuses and paging values. + slices.Reverse(apiStatuses) + lo, hi = hi, lo + } + + return apiStatuses, lo, hi, nil +} + +// loadStatusTimeline encapsulates most of the main +// timeline-load-from-database logic, allowing both +// the temporary LoadStatusTimeline() function AND +// the main StatusTimeline{}.Load() function to share +// as much logic as possible. 
+// +// TODO: it may be worth moving this into StatusTimeline{}.Load() +// once the temporary function above has been removed. Or it may +// still be worth keeping *some* database logic separate. +func loadStatusTimeline( + ctx context.Context, + nextPg *paging.Page, + metas []*StatusMeta, + apiStatuses []*apimodel.Status, + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + filter func(each *gtsmodel.Status) (delete bool, err error), + prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), +) ( + []*apimodel.Status, + string, // lo + string, // hi + error, +) { + if loadPage == nil { + panic("nil load page func") + } + + // Lowest and highest ID + // vals of loaded statuses. + var lo, hi string + + // Extract paging params. + order := nextPg.Order() + limit := nextPg.Limit // Load a little more than // limit to reduce db calls. nextPg.Limit += 10 + // Ensure we have a slice of meta objects to + // use in later preparation of the API models. + metas = xslices.GrowJust(metas[:0], nextPg.Limit) + + // Ensure we have a slice of required frontend API models. + apiStatuses = xslices.GrowJust(apiStatuses[:0], nextPg.Limit) + // Perform maximum of 5 load // attempts fetching statuses. for i := 0; i < 5; i++ { @@ -689,7 +670,7 @@ func LoadStatusTimeline( // Convert to our cache type, // these will get inserted into // the cache in prepare() below. - metas := toStatusMeta(nil, statuses) + metas = toStatusMeta(metas[:0], statuses) // Prepare frontend API models for // the loaded statuses. For now this @@ -711,22 +692,12 @@ func LoadStatusTimeline( } } - if order.Ascending() { - // The caller always expects the statuses - // to be returned in DESC order, but we - // build the status slice in paging order. - // If paging ASC, we need to reverse the - // returned statuses and paging values. - slices.Reverse(apiStatuses) - lo, hi = hi, lo - } - return apiStatuses, lo, hi, nil } -// InsertOne allows you to insert a single status into the timeline, with optional prepared API model, -// the return value indicates whether the passed status has been boosted recently on the timeline. -func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (repeatBoost bool) { +// InsertOne allows you to insert a single status into the timeline, with optional prepared API model. +// The return value indicates whether status should be skipped from streams, e.g. if already boosted recently. +func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (skip bool) { if status.BoostOfID != "" { // Check through top $repeatBoostDepth number of timeline items. for i, value := range t.cache.RangeUnsafe(structr.Desc) { @@ -737,7 +708,7 @@ func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.S // If inserted status has already been boosted, or original was posted // within last $repeatBoostDepth, we indicate it as a repeated boost. 
if value.ID == status.BoostOfID || value.BoostOfID == status.BoostOfID { - repeatBoost = true + skip = true break } } @@ -749,7 +720,7 @@ func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.S AccountID: status.AccountID, BoostOfID: status.BoostOfID, BoostOfAccountID: status.BoostOfAccountID, - repeatBoost: repeatBoost, + repeatBoost: skip, loaded: nil, prepared: prepared, }) > t.max { diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index d176118b4..ed2bf7b90 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -199,9 +199,6 @@ func (p *Processor) getStatusTimeline( apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx, page, loadPage, - func(ids []string) ([]*gtsmodel.Status, error) { - return p.state.DB.GetStatusesByIDs(ctx, ids) - }, filter, prepare, ) diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 8c72859e1..661fea866 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -683,13 +683,19 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP } if block.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.AccountID, + block.TargetAccountID, + ) } if block.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. - p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.TargetAccountID, + block.AccountID, + ) } // TODO: same with notifications? @@ -851,13 +857,19 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI } if follow.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID) + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.AccountID, + follow.TargetAccountID, + ) } if follow.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. - p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID) + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.TargetAccountID, + follow.AccountID, + ) } if err := p.federate.UndoFollow(ctx, follow); err != nil { @@ -873,16 +885,6 @@ func (p *clientAPI) UndoBlock(ctx context.Context, cMsg *messages.FromClientAPI) return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) } - if block.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) - } - - if block.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. 
- p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) - } - if err := p.federate.UndoBlock(ctx, block); err != nil { log.Errorf(ctx, "error federating block undo: %v", err) } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index 6aafe159e..3e0f0ba59 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -715,13 +715,19 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e } if block.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.AccountID, + block.TargetAccountID, + ) } if block.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. - p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.TargetAccountID, + block.AccountID, + ) } // Remove any follows that existed between blocker + blockee. @@ -1202,33 +1208,31 @@ func (p *fediAPI) UndoFollow(ctx context.Context, fMsg *messages.FromFediAPI) er } if follow.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID) + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.AccountID, + follow.TargetAccountID, + ) } if follow.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. - p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID) + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.TargetAccountID, + follow.AccountID, + ) } return nil } func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error { - block, ok := fMsg.GTSModel.(*gtsmodel.Block) + _, ok := fMsg.GTSModel.(*gtsmodel.Block) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) } - if block.Account.IsLocal() { - // Perform timeline invalidation for block origin account. - p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) - } - - if block.TargetAccount.IsLocal() { - // Perform timeline invalidation for block target account. - p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) - } + // TODO: any required changes return nil } diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index a0762822d..0f2e80d0f 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -826,21 +826,23 @@ func (s *Surface) removeTimelineEntriesByAccount(accountID string) { s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID) } -// invalidateTimelinesForAccount invalidates all timeline caches stored for given account ID. -func (s *Surface) invalidateTimelinesForAccount(ctx context.Context, accountID string) { - - // There's a lot of visibility changes to caclculate for any - // relationship change, so just clear all account's timelines. 
- s.State.Caches.Timelines.Home.Clear(accountID) +func (s *Surface) removeRelationshipFromTimelines(ctx context.Context, timelineAccountID string, targetAccountID string) { + // Remove all statuses by target account + // from given account's home timeline. + s.State.Caches.Timelines.Home. + MustGet(timelineAccountID). + RemoveByAccountIDs(targetAccountID) // Get the IDs of all the lists owned by the given account ID. - listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, accountID) + listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, timelineAccountID) if err != nil { - log.Errorf(ctx, "error getting lists for account %s: %v", accountID, err) + log.Errorf(ctx, "error getting lists for account %s: %v", timelineAccountID, err) } - // Clear list timelines of account. for _, listID := range listIDs { - s.State.Caches.Timelines.List.Clear(listID) + // Remove all statuses by target account + // from given account's list timelines. + s.State.Caches.Timelines.List.MustGet(listID). + RemoveByAccountIDs(targetAccountID) } }
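
For reference, below is a minimal standalone sketch of the behaviour change carried by removeRelationshipFromTimelines() above: on a block or follow undo, each affected local account now only drops the other account's statuses from its cached home and list timelines, rather than clearing those caches outright. The status and timeline types here are simplified stand-ins and not the real structr-backed timeline caches; only the Clear / RemoveByAccountIDs method names mirror calls that appear in the patch, the implementations are illustrative only.

package main

import "fmt"

type status struct {
	ID        string
	AccountID string
}

// timeline is a toy stand-in for a per-account cached timeline.
type timeline struct {
	entries []status
}

// Clear drops every cached entry (the old behaviour on relationship change).
func (t *timeline) Clear() { t.entries = nil }

// RemoveByAccountIDs drops only entries authored by the given
// accounts, keeping the rest of the cache warm (the new behaviour).
func (t *timeline) RemoveByAccountIDs(accountIDs ...string) {
	drop := make(map[string]struct{}, len(accountIDs))
	for _, id := range accountIDs {
		drop[id] = struct{}{}
	}
	kept := t.entries[:0]
	for _, s := range t.entries {
		if _, ok := drop[s.AccountID]; !ok {
			kept = append(kept, s)
		}
	}
	t.entries = kept
}

func main() {
	entries := []status{
		{ID: "01", AccountID: "alice"},
		{ID: "02", AccountID: "bob"},
		{ID: "03", AccountID: "alice"},
		{ID: "04", AccountID: "carol"},
	}

	// Old behaviour: any relationship change
	// wiped the whole cached timeline.
	old := &timeline{entries: append([]status(nil), entries...)}
	old.Clear()

	// New behaviour: alice blocks bob, so only bob's
	// statuses are dropped from alice's timeline.
	tl := &timeline{entries: entries}
	tl.RemoveByAccountIDs("bob")

	fmt.Printf("old: %d cached, new: %d cached\n", len(old.entries), len(tl.entries))
	// old: 0 cached, new: 3 cached
}

Keeping the untouched entries cached avoids re-preloading every timeline on each relationship change, which is the "don't clear timelines on relationship change" half of the commit subject.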