From 64564496f1877405b0d437c056df99ed4fe91cea Mon Sep 17 00:00:00 2001 From: kim Date: Tue, 25 Mar 2025 12:12:09 +0000 Subject: [PATCH] fix up more tests, fix missing required changes, etc --- internal/cache/timeline/status.go | 82 +++++++++++++++---- internal/cache/timeline/timeline.go | 9 +- internal/db/bundb/timeline.go | 25 +++--- internal/processing/timeline/list.go | 2 +- internal/processing/timeline/public_test.go | 11 +-- internal/processing/timeline/timeline.go | 13 ++- internal/processing/workers/fromclientapi.go | 27 +++--- internal/processing/workers/fromfediapi.go | 28 +++---- .../processing/workers/surfacetimeline.go | 9 +- internal/processing/workers/util.go | 8 +- 10 files changed, 130 insertions(+), 84 deletions(-) diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index a53ae43ba..3cfafc60f 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -340,8 +340,8 @@ func (t *StatusTimeline) Load( } // Get paging details. - min := page.Min.Value - max := page.Max.Value + lo := page.Min.Value + hi := page.Max.Value lim := page.Limit ord := page.Order() dir := toDirection(ord) @@ -350,19 +350,21 @@ func (t *StatusTimeline) Load( // metadata entries from the timeline // cache, up to given limit. metas := t.cache.Select( - util.PtrIf(min), - util.PtrIf(max), + util.PtrIf(lo), + util.PtrIf(hi), util.PtrIf(lim), dir, ) - // Set the starting lo / hi ID paging - // values. We continually update these - // for further timeline selections and - // for returning final next / prev pgs. - lo, hi := min, max - if len(metas) > 0 { + // We ALWAYS return and work on + // statuses in DESC order, but the + // timeline cache returns statuses + // in the *requested* order. + if dir == structr.Asc { + slices.Reverse(metas) + } + // Update paging values // based on returned data. lo, hi = nextPageParams( @@ -466,19 +468,65 @@ func (t *StatusTimeline) Load( } } - // Using meta and funcs, prepare frontend API models. - apiStatuses, err := t.prepare(ctx, metas, prepareAPI) - if err != nil { - return nil, "", "", gtserror.Newf("error preparing api statuses: %w", err) + // Reset the lo, hi paging parameters, + // so we can set the final return vals. + lo, hi = "", "" + + // Returned frontend API models. + var apiStatuses []*apimodel.Status + if len(metas) > 0 { + var err error + + // Using meta and funcs, prepare frontend API models. + apiStatuses, err = t.prepare(ctx, metas, prepareAPI) + if err != nil { + return nil, "", "", gtserror.Newf("error preparing api statuses: %w", err) + } + + // Get lo / hi from meta. + lo = metas[len(metas)-1].ID + hi = metas[0].ID } - // Even if we don't return them, insert - // the excess (post-filtered) into cache. - t.cache.Insert(filtered...) + if len(filtered) > 0 { + // Even if we don't return them, insert + // the excess (post-filtered) into cache. + t.cache.Insert(filtered...) + + // Check filtered values for lo / hi values. + lo = minIf(lo, filtered[len(filtered)-1].ID) + hi = maxIf(hi, filtered[0].ID) + } return apiStatuses, lo, hi, nil } +func minIf(id1, id2 string) string { + switch { + case id1 == "": + return id2 + case id2 == "": + return id1 + case id1 < id2: + return id1 + default: + return id2 + } +} + +func maxIf(id1, id2 string) string { + switch { + case id1 == "": + return id2 + case id2 == "": + return id1 + case id1 > id2: + return id1 + default: + return id2 + } +} + // InsertOne allows you to insert a single status into the timeline, with optional prepared API model. func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) { t.cache.Insert(&StatusMeta{ diff --git a/internal/cache/timeline/timeline.go b/internal/cache/timeline/timeline.go index 2a80d9099..0165fba71 100644 --- a/internal/cache/timeline/timeline.go +++ b/internal/cache/timeline/timeline.go @@ -41,13 +41,10 @@ func nextPageParams( } // toDirection converts page order to timeline direction. -func toDirection(o paging.Order) structr.Direction { - switch o { - case paging.OrderAscending: +func toDirection(order paging.Order) structr.Direction { + if order.Ascending() { return structr.Asc - case paging.OrderDescending: + } else /* i.e. descending */ { return structr.Desc - default: - return false } } diff --git a/internal/db/bundb/timeline.go b/internal/db/bundb/timeline.go index 8bdcb3b61..f5d7a6a12 100644 --- a/internal/db/bundb/timeline.go +++ b/internal/db/bundb/timeline.go @@ -157,7 +157,7 @@ func (t *timelineDB) GetLocalTimeline(ctx context.Context, page *paging.Page) ([ func(q *bun.SelectQuery) (*bun.SelectQuery, error) { // Local only. - q = q.Where("? = ?", bun.Ident("status.local"), true) + q = q.Where("? = ?", bun.Ident("local"), true) // Public only. q = q.Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityPublic) @@ -263,10 +263,10 @@ func (t *timelineDB) GetListTimeline(ctx context.Context, listID string, page *p // Select target account // IDs from list follows. subQ := t.db.NewSelect(). - TableExpr("? AS ?", bun.Ident("follows"), bun.Ident("follow")). - Column("follow.target_account_id"). - Where("? IN (?)", bun.Ident("follow.id"), bun.In(followIDs)) - q = q.Where("? IN (?)", bun.Ident("status.account_id"), subQ) + Table("follows"). + Column("follows.target_account_id"). + Where("? IN (?)", bun.Ident("follows.id"), bun.In(followIDs)) + q = q.Where("? IN (?)", bun.Ident("statuses.account_id"), subQ) // Only include statuses that aren't pending approval. q = q.Where("NOT ? = ?", bun.Ident("pending_approval"), true) @@ -291,14 +291,14 @@ func (t *timelineDB) GetTagTimeline(ctx context.Context, tagID string, page *pag q = q.Join( "INNER JOIN ? ON ? = ?", bun.Ident("status_to_tags"), - bun.Ident("status.id"), bun.Ident("status_to_tags.status_id"), + bun.Ident("statuses.id"), bun.Ident("status_to_tags.status_id"), ) // This tag only. q = q.Where("? = ?", bun.Ident("status_to_tags.tag_id"), tagID) // Public only. - q = q.Where("? = ?", bun.Ident("status.visibility"), gtsmodel.VisibilityPublic) + q = q.Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityPublic) return q, nil }, @@ -349,11 +349,10 @@ func loadStatusTimelinePage( return nil, err } - // Set ordering. - switch order { - case paging.OrderAscending: + // Set query ordering. + if order.Ascending() { q = q.OrderExpr("? ASC", bun.Ident("id")) - case paging.OrderDescending: + } else /* i.e. descending */ { q = q.OrderExpr("? DESC", bun.Ident("id")) } @@ -368,8 +367,8 @@ func loadStatusTimelinePage( // The order we return from the database and // timeline caches differs depending on ordering, - // but the caller always expected DESCENDING. - if page.GetOrder() == paging.OrderAscending { + // but the caller always expects DESCENDING. + if order.Ascending() { slices.Reverse(statusIDs) } diff --git a/internal/processing/timeline/list.go b/internal/processing/timeline/list.go index 3a26b0d42..15322d7f9 100644 --- a/internal/processing/timeline/list.go +++ b/internal/processing/timeline/list.go @@ -50,7 +50,7 @@ func (p *Processor) ListTimelineGet( } // Check exists. - if list != nil { + if list == nil { const text = "list not found" return nil, gtserror.NewErrorNotFound( errors.New(text), diff --git a/internal/processing/timeline/public_test.go b/internal/processing/timeline/public_test.go index b3ff87951..1451d1ac9 100644 --- a/internal/processing/timeline/public_test.go +++ b/internal/processing/timeline/public_test.go @@ -93,9 +93,9 @@ func (suite *PublicTestSuite) TestPublicTimelineGetNotEmpty() { // some other statuses were filtered out. suite.NoError(errWithCode) suite.Len(resp.Items, 1) - suite.Equal(`; rel="next", ; rel="prev"`, resp.LinkHeader) - suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&max_id=01F8MHCP5P2NWYQ416SBA0XSEV&local=false`, resp.NextLink) - suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5&local=false`, resp.PrevLink) + suite.Equal(`; rel="next", ; rel="prev"`, resp.LinkHeader) + suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&local=false&max_id=01F8MHCP5P2NWYQ416SBA0XSEV`, resp.NextLink) + suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&local=false&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5`, resp.PrevLink) } // A timeline containing a status hidden due to filtering should return other statuses with no error. @@ -153,8 +153,9 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { if !filteredStatusFound { suite.FailNow("precondition failed: status we would filter isn't present in unfiltered timeline") } - // The public timeline has no prepared status cache and doesn't need to be pruned, - // as in the home timeline version of this test. + + // Clear the timeline to drop all cached statuses. + suite.state.Caches.Timelines.Public.Clear() // Create a filter to hide one status on the timeline. if err := suite.db.PutFilter(ctx, filter); err != nil { diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index deedcb76d..89fc877e2 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -66,8 +66,8 @@ func (p *Processor) getStatusTimeline( requester *gtsmodel.Account, timeline *timeline.StatusTimeline, page *paging.Page, - pgPath string, // timeline page path - pgQuery url.Values, // timeline query parameters + pagePath string, + pageQuery url.Values, filterCtx statusfilter.FilterContext, loadPage func(*paging.Page) (statuses []*gtsmodel.Status, err error), preFilter func(*gtsmodel.Status) (bool, error), @@ -153,12 +153,17 @@ func (p *Processor) getStatusTimeline( return nil, gtserror.WrapWithCode(http.StatusInternalServerError, err) } + // Check for empty response. + if len(apiStatuses) == 0 { + return paging.EmptyResponse(), nil + } + // Package returned API statuses as pageable response. return paging.PackageResponse(paging.ResponseParams{ Items: xslices.ToAny(apiStatuses), - Path: pgPath, + Path: pagePath, Next: page.Next(lo, hi), Prev: page.Prev(lo, hi), - Query: pgQuery, + Query: pageQuery, }), nil } diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index bfead91c6..26c8f8543 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -371,7 +371,7 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -413,7 +413,7 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien } // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + p.surface.invalidateStatusFromTimelines(vote.Poll.StatusID) return nil } @@ -565,7 +565,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + p.surface.invalidateStatusFromTimelines(fave.StatusID) return nil } @@ -671,7 +671,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -722,7 +722,7 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA } // Status representation has changed, invalidate from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -875,7 +875,7 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI) // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + p.surface.invalidateStatusFromTimelines(statusFave.StatusID) return nil } @@ -895,9 +895,8 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error updating account stats: %v", err) } - if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { - log.Errorf(ctx, "error removing timelined status: %v", err) - } + // Delete the boost wrapper status from timelines. + p.surface.deleteStatusFromTimelines(ctx, status.ID) if err := p.federate.UndoAnnounce(ctx, status); err != nil { log.Errorf(ctx, "error federating announce undo: %v", err) @@ -905,7 +904,7 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + p.surface.invalidateStatusFromTimelines(status.BoostOfID) return nil } @@ -968,7 +967,7 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -1154,7 +1153,7 @@ func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID) + p.surface.invalidateStatusFromTimelines(req.Like.StatusID) return nil } @@ -1187,7 +1186,7 @@ func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAP // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID) + p.surface.invalidateStatusFromTimelines(reply.InReplyToID) return nil } @@ -1225,7 +1224,7 @@ func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClien // Interaction counts changed on the original status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index 6d3a17675..b3866a1c4 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -346,7 +346,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // Interaction counts changed on the replied status; uncache the // prepared version from all timelines. The status dereferencer // functions will ensure necessary ancestors exist before this point. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -393,7 +393,7 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } // Interaction counts changed, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -428,7 +428,7 @@ func (p *fediAPI) UpdatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } // Interaction counts changed, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -573,7 +573,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + p.surface.invalidateStatusFromTimelines(fave.StatusID) return nil } @@ -690,7 +690,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // Interaction counts changed on the original status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -835,7 +835,7 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e // Interaction counts changed on the replied-to status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) return nil } @@ -884,11 +884,11 @@ func (p *fediAPI) AcceptRemoteStatus(ctx context.Context, fMsg *messages.FromFed // Interaction counts changed on the interacted status; // uncache the prepared version from all timelines. if status.InReplyToID != "" { - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } if status.BoostOfID != "" { - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + p.surface.invalidateStatusFromTimelines(status.BoostOfID) } return nil @@ -917,7 +917,7 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -968,7 +968,7 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) } // Status representation was refetched, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -1027,7 +1027,7 @@ func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg *messages.FromFediAPI) if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -1192,13 +1192,11 @@ func (p *fediAPI) UndoAnnounce( } // Remove the boost wrapper from all timelines. - if err := p.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil { - log.Errorf(ctx, "error removing timelined boost: %v", err) - } + p.surface.deleteStatusFromTimelines(ctx, boost.ID) // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index 54eb75577..09e26fec0 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -531,18 +531,21 @@ func (s *Surface) tagFollowersForStatus( // deleteStatusFromTimelines completely removes the given status from all timelines. // It will also stream deletion of the status to all open streams. -func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error { +func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) { + s.State.Caches.Timelines.Public.RemoveByStatusIDs(statusID) + s.State.Caches.Timelines.Local.RemoveByStatusIDs(statusID) s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID) s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID) s.Stream.Delete(ctx, statusID) - return nil } // invalidateStatusFromTimelines does cache invalidation on the given status by // unpreparing it from all timelines, forcing it to be prepared again (with updated // stats, boost counts, etc) next time it's fetched by the timeline owner. This goes // both for the status itself, and for any boosts of the status. -func (s *Surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) { +func (s *Surface) invalidateStatusFromTimelines(statusID string) { + s.State.Caches.Timelines.Public.UnprepareByStatusIDs(statusID) + s.State.Caches.Timelines.Local.UnprepareByStatusIDs(statusID) s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID) s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID) } diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index b358dc951..d844ab762 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -172,15 +172,11 @@ func (u *utils) wipeStatus( } // Remove the boost from any and all timelines. - if err := u.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil { - errs.Appendf("error deleting boost from timelines: %w", err) - } + u.surface.deleteStatusFromTimelines(ctx, boost.ID) } // Delete the status itself from any and all timelines. - if err := u.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { - errs.Appendf("error deleting status from timelines: %w", err) - } + u.surface.deleteStatusFromTimelines(ctx, status.ID) // Delete this status from any conversations it's part of. if err := u.state.DB.DeleteStatusFromConversations(ctx, status.ID); err != nil {