From 227d6edc3e8edac78e96d4224258066ed73f7b31 Mon Sep 17 00:00:00 2001 From: kim Date: Thu, 3 Apr 2025 13:51:47 +0100 Subject: [PATCH] remove local / public caches (is out of scope for this work), share more timeline code --- internal/cache/cache.go | 4 - internal/cache/db.go | 1 + internal/cache/timeline.go | 26 ---- internal/cache/timeline/status.go | 140 +++++++++++++++++ internal/processing/timeline/public.go | 8 +- internal/processing/timeline/public_test.go | 3 - internal/processing/timeline/tag.go | 141 ++++++------------ internal/processing/timeline/timeline.go | 103 ++++++++----- internal/processing/workers/fromclientapi.go | 2 - .../processing/workers/surfacetimeline.go | 8 - 10 files changed, 256 insertions(+), 180 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 5a9f015d7..563930ef5 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -98,7 +98,6 @@ func (c *Caches) Init() { c.initListIDs() c.initListedIDs() c.initListTimelines() - c.initLocalTimeline() c.initMarker() c.initMedia() c.initMention() @@ -107,7 +106,6 @@ func (c *Caches) Init() { c.initPoll() c.initPollVote() c.initPollVoteIDs() - c.initPublicTimeline() c.initReport() c.initSinBinStatus() c.initStatus() @@ -216,8 +214,6 @@ func (c *Caches) Sweep(threshold float64) { c.DB.UserMuteIDs.Trim(threshold) c.Timelines.Home.Trim(threshold) c.Timelines.List.Trim(threshold) - c.Timelines.Public.Trim(threshold) - c.Timelines.Local.Trim(threshold) c.Visibility.Trim(threshold) } diff --git a/internal/cache/db.go b/internal/cache/db.go index c5404c8ed..146cf8dfb 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -150,6 +150,7 @@ type DBCaches struct { Domains atomic.Pointer[int] Statuses atomic.Pointer[int] Users atomic.Pointer[int] + UserIDs atomic.Pointer[[]string] } // InteractionRequest provides access to the gtsmodel InteractionRequest database cache. diff --git a/internal/cache/timeline.go b/internal/cache/timeline.go index fffeb36f8..058c34e21 100644 --- a/internal/cache/timeline.go +++ b/internal/cache/timeline.go @@ -29,12 +29,6 @@ type TimelineCaches struct { // List ... List timeline.StatusTimelines - - // Public ... - Public timeline.StatusTimelines - - // Local ... - Local timeline.StatusTimelines } func (c *Caches) initHomeTimelines() { @@ -56,23 +50,3 @@ func (c *Caches) initListTimelines() { c.Timelines.List.Init(cap) } - -func (c *Caches) initPublicTimeline() { - // Global cache so - // allow larger. - cap := 800 - - log.Infof(nil, "cache size = %d", cap) - - c.Timelines.Public.Init(cap) -} - -func (c *Caches) initLocalTimeline() { - // Global cache so - // allow larger. - cap := 800 - - log.Infof(nil, "cache size = %d", cap) - - c.Timelines.Local.Init(cap) -} diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index 96846ee2e..062a94278 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -522,6 +522,146 @@ func (t *StatusTimeline) Load( return apiStatuses, lo, hi, nil } +// LoadStatusTimeline is a function that may be used to load a timeline +// page in a functionally similar way to StatusTimeline{}.Load(), but without +// actually having access to a StatusTimeline{}. For example, for timelines that +// we want to share code, but without yet implementing a cache for them. Note this +// function may be removed in the future when un-needed. +func LoadStatusTimeline( + ctx context.Context, + page *paging.Page, + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // loadIDs should load status models with given IDs, this is used + // to load status models of already cached entries in the timeline. + loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), + + // filter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + filter func(each *gtsmodel.Status) (delete bool, err error), + + // prepareAPI should prepare internal status model to frontend API model. + prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), +) ( + []*apimodel.Status, + string, // lo + string, // hi + error, +) { + switch { + case page == nil: + panic("nil page") + case loadPage == nil: + panic("nil load page func") + } + + // Get paging details. + lo := page.Min.Value + hi := page.Max.Value + limit := page.Limit + order := page.Order() + + // Use a copy of current page so + // we can repeatedly update it. + nextPg := new(paging.Page) + *nextPg = *page + nextPg.Min.Value = lo + nextPg.Max.Value = hi + + // We now reset the lo,hi values to + // represent the lowest and highest + // index values of loaded statuses. + lo, hi = "", "" + + // Preallocate a slice of up-to-limit API models. + apiStatuses := make([]*apimodel.Status, 0, limit) + + // Check whether loaded enough from cache. + if need := limit - len(apiStatuses); need > 0 { + + // Load a little more than + // limit to reduce db calls. + nextPg.Limit += 10 + + // Perform maximum of 10 load + // attempts fetching statuses. + for i := 0; i < 10; i++ { + + // Load next timeline statuses. + statuses, err := loadPage(nextPg) + if err != nil { + return nil, "", "", gtserror.Newf("error loading timeline: %w", err) + } + + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } + + if hi == "" { + // Set hi returned paging + // value if not already set. + hi = statuses[0].ID + } + + // Update nextPg cursor parameter for next database query. + nextPageParams(nextPg, statuses[len(statuses)-1].ID, order) + + // Perform any filtering on newly loaded statuses. + statuses, err = doStatusFilter(statuses, filter) + if err != nil { + return nil, "", "", gtserror.Newf("error filtering statuses: %w", err) + } + + // After filtering no more + // statuses remain, retry. + if len(statuses) == 0 { + continue + } + + // Convert to our cache type, + // these will get inserted into + // the cache in prepare() below. + metas := toStatusMeta(statuses) + + // Prepare frontend API models for + // the loaded statuses. For now this + // also does its own extra filtering. + apiStatuses = prepareStatuses(ctx, + metas, + prepareAPI, + apiStatuses, + limit, + ) + + // If we have anything, return + // here. Even if below limit. + if len(apiStatuses) > 0 { + + // Set returned lo status paging value. + lo = apiStatuses[len(apiStatuses)-1].ID + break + } + } + } + + if order.Ascending() { + // The caller always expects the statuses + // to be returned in DESC order, but we + // build the status slice in paging order. + // If paging ASC, we need to reverse the + // returned statuses and paging values. + slices.Reverse(apiStatuses) + lo, hi = hi, lo + } + + return apiStatuses, lo, hi, nil +} + // InsertOne allows you to insert a single status into the timeline, with optional prepared API model. func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) { t.cache.Insert(&StatusMeta{ diff --git a/internal/processing/timeline/public.go b/internal/processing/timeline/public.go index 07ab37418..d1c65fc3d 100644 --- a/internal/processing/timeline/public.go +++ b/internal/processing/timeline/public.go @@ -57,8 +57,8 @@ func (p *Processor) publicTimelineGet( // account. requester, - // Keyed-by-account-ID, public timeline cache. - p.state.Caches.Timelines.Public.MustGet(requester.ID), + // No cache. + nil, // Current // page. @@ -106,8 +106,8 @@ func (p *Processor) localTimelineGet( // account. requester, - // Keyed-by-account-ID, local timeline cache. - p.state.Caches.Timelines.Local.MustGet(requester.ID), + // No cache. + nil, // Current // page. diff --git a/internal/processing/timeline/public_test.go b/internal/processing/timeline/public_test.go index e69016efc..b5017af71 100644 --- a/internal/processing/timeline/public_test.go +++ b/internal/processing/timeline/public_test.go @@ -154,9 +154,6 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { suite.FailNow("precondition failed: status we would filter isn't present in unfiltered timeline") } - // Clear the timeline to drop all cached statuses. - suite.state.Caches.Timelines.Public.ClearAll() - // Create a filter to hide one status on the timeline. if err := suite.db.PutFilter(ctx, filter); err != nil { suite.FailNow(err.Error()) diff --git a/internal/processing/timeline/tag.go b/internal/processing/timeline/tag.go index 579d1cea9..995c12285 100644 --- a/internal/processing/timeline/tag.go +++ b/internal/processing/timeline/tag.go @@ -20,20 +20,15 @@ package timeline import ( "context" "errors" - "fmt" - "slices" + "net/http" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/text" - "github.com/superseriousbusiness/gotosocial/internal/util" ) // TagTimelineGet gets a pageable timeline for the given @@ -42,49 +37,69 @@ import ( // to requestingAcct before returning it. func (p *Processor) TagTimelineGet( ctx context.Context, - requestingAcct *gtsmodel.Account, + requester *gtsmodel.Account, tagName string, maxID string, sinceID string, minID string, limit int, ) (*apimodel.PageableResponse, gtserror.WithCode) { + + // Fetch the requested tag with name. tag, errWithCode := p.getTag(ctx, tagName) if errWithCode != nil { return nil, errWithCode } + // Check for a useable returned tag for endpoint. if tag == nil || !*tag.Useable || !*tag.Listable { + // Obey mastodon API by returning 404 for this. - err := fmt.Errorf("tag was not found, or not useable/listable on this instance") - return nil, gtserror.NewErrorNotFound(err, err.Error()) + const text = "tag was not found, or not useable/listable on this instance" + return nil, gtserror.NewWithCode(http.StatusNotFound, text) } - page := paging.Page{ - Min: paging.EitherMinID(minID, sinceID), - Max: paging.MaxID(maxID), - Limit: limit, - } + // Fetch status timeline for tag. + return p.getStatusTimeline(ctx, - statuses, err := p.state.DB.GetTagTimeline(ctx, tag.ID, &page) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + // Auth'd + // account. + requester, - if page.Order().Ascending() { - // Returned statuses always - // need to be in DESC order. - slices.Reverse(statuses) - } + // No cache. + nil, - return p.packageTagResponse( - ctx, - requestingAcct, - statuses, - limit, - // Use API URL for tag. + // Current + // page. + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, + + // Tag timeline name's endpoint. "/api/v1/timelines/tag/"+tagName, + + // No page + // query. + nil, + + // Status filter context. + statusfilter.FilterContextPublic, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetTagTimeline(ctx, tag.ID, pg) + }, + + // Filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) (bool, error) { + + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s) + return !ok, err + }, ) } @@ -106,69 +121,3 @@ func (p *Processor) getTag(ctx context.Context, tagName string) (*gtsmodel.Tag, return tag, nil } - -func (p *Processor) packageTagResponse( - ctx context.Context, - requestingAcct *gtsmodel.Account, - statuses []*gtsmodel.Status, - limit int, - requestPath string, -) (*apimodel.PageableResponse, gtserror.WithCode) { - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - var ( - items = make([]interface{}, 0, count) - - // Set next + prev values before filtering and API - // converting, so caller can still page properly. - nextMaxIDValue = statuses[count-1].ID - prevMinIDValue = statuses[0].ID - ) - - filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAcct.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAcct.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - - mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAcct.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAcct.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) - - for _, s := range statuses { - timelineable, err := p.visFilter.StatusTagTimelineable(ctx, requestingAcct, s) - if err != nil { - log.Errorf(ctx, "error checking status visibility: %v", err) - continue - } - - if !timelineable { - continue - } - - apiStatus, err := p.converter.StatusToAPIStatus(ctx, s, requestingAcct, statusfilter.FilterContextPublic, filters, compiledMutes) - if errors.Is(err, statusfilter.ErrHideStatus) { - continue - } - if err != nil { - log.Errorf(ctx, "error converting to api status: %v", err) - continue - } - - items = append(items, apiStatus) - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: requestPath, - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) -} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 5f360943b..3e674b29c 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -64,7 +64,7 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili func (p *Processor) getStatusTimeline( ctx context.Context, requester *gtsmodel.Account, - timeline *timeline.StatusTimeline, + cache *timeline.StatusTimeline, page *paging.Page, pagePath string, pageQuery url.Values, @@ -75,13 +75,11 @@ func (p *Processor) getStatusTimeline( *apimodel.PageableResponse, gtserror.WithCode, ) { - var ( - filters []*gtsmodel.Filter - mutes *usermute.CompiledUserMuteList - ) + var err error + var filters []*gtsmodel.Filter + var mutes *usermute.CompiledUserMuteList if requester != nil { - var err error // Fetch all filters relevant for requesting account. filters, err = p.state.DB.GetFiltersForAccountID(ctx, @@ -110,42 +108,73 @@ func (p *Processor) getStatusTimeline( // input paging cursor. id.ValidatePage(page) - // Load status page via timeline cache, also - // getting lo, hi values for next, prev pages. - apiStatuses, lo, hi, err := timeline.Load(ctx, + // Returned models and page params. + var apiStatuses []*apimodel.Status + var lo, hi string - // Status page - // to load. - page, + if cache != nil { + // Load status page via timeline cache, also + // getting lo, hi values for next, prev pages. + apiStatuses, lo, hi, err = cache.Load(ctx, - // Caller provided database - // status page loading function. - loadPage, + // Status page + // to load. + page, - // Status load function for cached timeline entries. - func(ids []string) ([]*gtsmodel.Status, error) { - return p.state.DB.GetStatusesByIDs(ctx, ids) - }, + // Caller provided database + // status page loading function. + loadPage, - // Filtering function, - // i.e. filter before caching. - filter, + // Status load function for cached timeline entries. + func(ids []string) ([]*gtsmodel.Status, error) { + return p.state.DB.GetStatusesByIDs(ctx, ids) + }, + + // Filtering function, + // i.e. filter before caching. + filter, + + // Frontend API model preparation function. + func(status *gtsmodel.Status) (*apimodel.Status, error) { + apiStatus, err := p.converter.StatusToAPIStatus(ctx, + status, + requester, + filterCtx, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + return nil, err + } + return apiStatus, nil + }, + ) + } else { + // Load status page without a receiving timeline cache. + // TODO: remove this code path when all support caching. + apiStatuses, lo, hi, err = timeline.LoadStatusTimeline(ctx, + page, + loadPage, + func(ids []string) ([]*gtsmodel.Status, error) { + return p.state.DB.GetStatusesByIDs(ctx, ids) + }, + filter, + func(status *gtsmodel.Status) (*apimodel.Status, error) { + apiStatus, err := p.converter.StatusToAPIStatus(ctx, + status, + requester, + filterCtx, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + return nil, err + } + return apiStatus, nil + }, + ) + } - // Frontend API model preparation function. - func(status *gtsmodel.Status) (*apimodel.Status, error) { - apiStatus, err := p.converter.StatusToAPIStatus(ctx, - status, - requester, - filterCtx, - filters, - mutes, - ) - if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { - return nil, err - } - return apiStatus, nil - }, - ) if err != nil { err := gtserror.Newf("error loading timeline: %w", err) return nil, gtserror.WrapWithCode(http.StatusInternalServerError, err) diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 9f375a026..8c72859e1 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -1041,9 +1041,7 @@ func (p *clientAPI) DeleteAccountOrUser(ctx context.Context, cMsg *messages.From p.surface.removeTimelineEntriesByAccount(account.ID) // Remove any of their cached timelines. - p.state.Caches.Timelines.Public.Delete(account.ID) p.state.Caches.Timelines.Home.Delete(account.ID) - p.state.Caches.Timelines.Local.Delete(account.ID) // Get the IDs of all the lists owned by the given account ID. listIDs, err := p.state.DB.GetListIDsByAccountID(ctx, account.ID) diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index 351b0abc6..14a94a757 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -804,8 +804,6 @@ func (s *Surface) timelineStatusUpdateForTagFollowers( // deleteStatusFromTimelines completely removes the given status from all timelines. // It will also stream deletion of the status to all open streams. func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) { - s.State.Caches.Timelines.Public.RemoveByStatusIDs(statusID) - s.State.Caches.Timelines.Local.RemoveByStatusIDs(statusID) s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID) s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID) s.Stream.Delete(ctx, statusID) @@ -816,17 +814,13 @@ func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string // stats, boost counts, etc) next time it's fetched by the timeline owner. This goes // both for the status itself, and for any boosts of the status. func (s *Surface) invalidateStatusFromTimelines(statusID string) { - s.State.Caches.Timelines.Public.UnprepareByStatusIDs(statusID) - s.State.Caches.Timelines.Local.UnprepareByStatusIDs(statusID) s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID) s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID) } // removeTimelineEntriesByAccount removes all cached timeline entries authored by account ID. func (s *Surface) removeTimelineEntriesByAccount(accountID string) { - s.State.Caches.Timelines.Public.RemoveByAccountIDs(accountID) s.State.Caches.Timelines.Home.RemoveByAccountIDs(accountID) - s.State.Caches.Timelines.Local.RemoveByAccountIDs(accountID) s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID) } @@ -835,9 +829,7 @@ func (s *Surface) invalidateTimelinesForAccount(ctx context.Context, accountID s // There's a lot of visibility changes to caclculate for any // relationship change, so just clear all account's timelines. - s.State.Caches.Timelines.Public.Clear(accountID) s.State.Caches.Timelines.Home.Clear(accountID) - s.State.Caches.Timelines.Local.Clear(accountID) // Get the IDs of all the lists owned by the given account ID. listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, accountID)