From b3c9bfde182440ff4b2d5ae705f0a5b475cdc05e Mon Sep 17 00:00:00 2001 From: kim Date: Tue, 8 Apr 2025 14:16:08 +0100 Subject: [PATCH] start adding preloading support --- cmd/gotosocial/action/server/server.go | 5 + internal/cache/timeline/status.go | 198 ++++++++++++++--------- internal/cache/timeline/status_test.go | 6 + internal/cache/timeline/timeline.go | 9 ++ internal/processing/timeline/home.go | 33 ++++ internal/processing/timeline/list.go | 34 ++++ internal/processing/timeline/timeline.go | 49 +++++- 7 files changed, 255 insertions(+), 79 deletions(-) diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 6bc27a7c4..b43ed25a2 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -351,6 +351,11 @@ var Start action.GTSAction = func(ctx context.Context) error { intFilter, ) + // Preload our local user's streaming timeline caches. + if err := process.Timeline().Preload(ctx); err != nil { + return fmt.Errorf("error preloading timelines: %w", err) + } + // Schedule background cleaning tasks. if err := cleaner.ScheduleJobs(); err != nil { return fmt.Errorf("error scheduling cleaner jobs: %w", err) diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index a497ce5ed..9660f1303 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -318,6 +318,72 @@ func (t *StatusTimeline) Init(cap int) { t.max = cap } +// Preload ... +func (t *StatusTimeline) Preload( + ctx context.Context, + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // filter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + filter func(each *gtsmodel.Status) (delete bool, err error), +) (int, error) { + if loadPage == nil { + panic("nil load page func") + } + + // Our starting, page at the top + // of the possible timeline. + page := new(paging.Page) + order := paging.OrderDescending + page.Max.Order = order + page.Max.Value = plus24hULID() + page.Min.Order = order + page.Min.Value = "" + page.Limit = 100 + + // Prepare a slice for gathering status meta. + metas := make([]*StatusMeta, 0, page.Limit) + + var n int + for n < t.cut { + // Load page of timeline statuses. + statuses, err := loadPage(page) + if err != nil { + return n, gtserror.Newf("error loading statuses: %w", err) + } + + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } + + // Update our next page cursor from statuses. + page.Max.Value = statuses[len(statuses)-1].ID + + // Perform any filtering on newly loaded statuses. + statuses, err = doStatusFilter(statuses, filter) + if err != nil { + return n, gtserror.Newf("error filtering statuses: %w", err) + } + + // After filtering no more + // statuses remain, retry. + if len(statuses) == 0 { + continue + } + + // Convert statuses to meta and insert. + metas = toStatusMeta(metas[:0], statuses) + n = t.cache.Insert(metas...) + } + + return n, nil +} + // Load will load timeline statuses according to given // page, using provided callbacks to load extra data when // necessary, and perform fine-grained filtering loaded @@ -424,12 +490,9 @@ func (t *StatusTimeline) Load( ) } - // Track all newly loaded status entries - // after filtering for insert into cache. - var justLoaded []*StatusMeta - - // Check whether loaded enough from cache. - if need := limit - len(apiStatuses); need > 0 { + // Check if we need to call + // through to the database. + if len(apiStatuses) == 0 { // Load a little more than // limit to reduce db calls. @@ -475,10 +538,7 @@ func (t *StatusTimeline) Load( // Convert to our cache type, // these will get inserted into // the cache in prepare() below. - metas := toStatusMeta(statuses) - - // Append to newly loaded for later insert. - justLoaded = append(justLoaded, metas...) + metas := toStatusMeta(nil, statuses) // Prepare frontend API models for // the loaded statuses. For now this @@ -511,12 +571,6 @@ func (t *StatusTimeline) Load( lo, hi = hi, lo } - if len(justLoaded) > 0 { - // Even if not returning them, insert - // the excess (filtered) into cache. - _ = t.cache.Insert(justLoaded...) - } - return apiStatuses, lo, hi, nil } @@ -566,73 +620,69 @@ func LoadStatusTimeline( // Preallocate a slice of up-to-limit API models. apiStatuses := make([]*apimodel.Status, 0, limit) - // Check whether loaded enough from cache. - if need := limit - len(apiStatuses); need > 0 { + // Load a little more than + // limit to reduce db calls. + nextPg.Limit += 10 - // Load a little more than - // limit to reduce db calls. - nextPg.Limit += 10 + // Perform maximum of 5 load + // attempts fetching statuses. + for i := 0; i < 5; i++ { - // Perform maximum of 5 load - // attempts fetching statuses. - for i := 0; i < 5; i++ { + // Load next timeline statuses. + statuses, err := loadPage(nextPg) + if err != nil { + return nil, "", "", gtserror.Newf("error loading timeline: %w", err) + } - // Load next timeline statuses. - statuses, err := loadPage(nextPg) - if err != nil { - return nil, "", "", gtserror.Newf("error loading timeline: %w", err) - } + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } - // No more statuses from - // load function = at end. - if len(statuses) == 0 { - break - } + if hi == "" { + // Set hi returned paging + // value if not already set. + hi = statuses[0].ID + } - if hi == "" { - // Set hi returned paging - // value if not already set. - hi = statuses[0].ID - } + // Update nextPg cursor parameter for next database query. + nextPageParams(nextPg, statuses[len(statuses)-1].ID, order) - // Update nextPg cursor parameter for next database query. - nextPageParams(nextPg, statuses[len(statuses)-1].ID, order) + // Perform any filtering on newly loaded statuses. + statuses, err = doStatusFilter(statuses, filter) + if err != nil { + return nil, "", "", gtserror.Newf("error filtering statuses: %w", err) + } - // Perform any filtering on newly loaded statuses. - statuses, err = doStatusFilter(statuses, filter) - if err != nil { - return nil, "", "", gtserror.Newf("error filtering statuses: %w", err) - } + // After filtering no more + // statuses remain, retry. + if len(statuses) == 0 { + continue + } - // After filtering no more - // statuses remain, retry. - if len(statuses) == 0 { - continue - } + // Convert to our cache type, + // these will get inserted into + // the cache in prepare() below. + metas := toStatusMeta(nil, statuses) - // Convert to our cache type, - // these will get inserted into - // the cache in prepare() below. - metas := toStatusMeta(statuses) + // Prepare frontend API models for + // the loaded statuses. For now this + // also does its own extra filtering. + apiStatuses = prepareStatuses(ctx, + metas, + prepareAPI, + apiStatuses, + limit, + ) - // Prepare frontend API models for - // the loaded statuses. For now this - // also does its own extra filtering. - apiStatuses = prepareStatuses(ctx, - metas, - prepareAPI, - apiStatuses, - limit, - ) + // If we have anything, return + // here. Even if below limit. + if len(apiStatuses) > 0 { - // If we have anything, return - // here. Even if below limit. - if len(apiStatuses) > 0 { - - // Set returned lo status paging value. - lo = apiStatuses[len(apiStatuses)-1].ID - break - } + // Set returned lo status paging value. + lo = apiStatuses[len(apiStatuses)-1].ID + break } } @@ -922,8 +972,8 @@ func loadStatuses( // toStatusMeta converts a slice of database model statuses // into our cache wrapper type, a slice of []StatusMeta{}. -func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta { - return xslices.Gather(nil, statuses, func(s *gtsmodel.Status) *StatusMeta { +func toStatusMeta(in []*StatusMeta, statuses []*gtsmodel.Status) []*StatusMeta { + return xslices.Gather(in, statuses, func(s *gtsmodel.Status) *StatusMeta { return &StatusMeta{ ID: s.ID, AccountID: s.AccountID, diff --git a/internal/cache/timeline/status_test.go b/internal/cache/timeline/status_test.go index b01813ae8..7e07bfc9c 100644 --- a/internal/cache/timeline/status_test.go +++ b/internal/cache/timeline/status_test.go @@ -273,6 +273,12 @@ func TestStatusTimelineTrim(t *testing.T) { assert.NotEqual(t, minID2, minStatus(&tt).ID) assert.False(t, containsStatusID(&tt, minID1)) assert.False(t, containsStatusID(&tt, minID2)) + + // Trim at desired length + // should cause no change. + before := tt.cache.Len() + tt.Trim() + assert.Equal(t, before, tt.cache.Len()) } // containsStatusID returns whether timeline contains a status with ID. diff --git a/internal/cache/timeline/timeline.go b/internal/cache/timeline/timeline.go index 655c0657a..dfb383fcb 100644 --- a/internal/cache/timeline/timeline.go +++ b/internal/cache/timeline/timeline.go @@ -18,10 +18,19 @@ package timeline import ( + "time" + "codeberg.org/gruf/go-structr" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/paging" ) +// plus24hULID returns a ULID for now+24h. +func plus24hULID() string { + t := time.Now().Add(24 * time.Hour) + return id.NewULIDFromTime(t) +} + // nextPageParams gets the next set of paging // parameters to use based on the current set, // and the next set of lo / hi values. This will diff --git a/internal/processing/timeline/home.go b/internal/processing/timeline/home.go index eefb45a2a..997998360 100644 --- a/internal/processing/timeline/home.go +++ b/internal/processing/timeline/home.go @@ -25,6 +25,7 @@ import ( statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/paging" ) @@ -90,3 +91,35 @@ func (p *Processor) HomeTimelineGet( }, ) } + +// preloadHomeTimeline will ensure that the timeline +// cache for home owned by given account is preloaded. +func (p *Processor) preloadHomeTimeline( + ctx context.Context, + account *gtsmodel.Account, +) error { + + // Get (and so, create) home timeline cache for account ID. + timeline := p.state.Caches.Timelines.Home.MustGet(account.ID) + + // Preload timeline with funcs. + n, err := timeline.Preload(ctx, + + // Database load function. + func(page *paging.Page) ([]*gtsmodel.Status, error) { + return p.state.DB.GetHomeTimeline(ctx, account.ID, page) + }, + + // Status filtering function. + func(status *gtsmodel.Status) (bool, error) { + ok, err := p.visFilter.StatusHomeTimelineable(ctx, account, status) + return !ok, err + }, + ) + if err != nil { + return gtserror.Newf("error preloading home timeline %s: %w", account.ID, err) + } + + log.Infof(ctx, "%s: preloaded %d", account.Username, n) + return nil +} diff --git a/internal/processing/timeline/list.go b/internal/processing/timeline/list.go index 04166226f..41c637be1 100644 --- a/internal/processing/timeline/list.go +++ b/internal/processing/timeline/list.go @@ -27,6 +27,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/paging" ) @@ -103,3 +104,36 @@ func (p *Processor) ListTimelineGet( }, ) } + +// preloadListTimeline will ensure that the timeline +// cache for list owned by given account is preloaded. +func (p *Processor) preloadListTimeline( + ctx context.Context, + account *gtsmodel.Account, + list *gtsmodel.List, +) error { + + // Get (and so, create) list timeline cache for list ID. + timeline := p.state.Caches.Timelines.List.MustGet(list.ID) + + // Preload timeline with funcs. + n, err := timeline.Preload(ctx, + + // Database load function. + func(page *paging.Page) ([]*gtsmodel.Status, error) { + return p.state.DB.GetListTimeline(ctx, list.ID, page) + }, + + // Status filtering function. + func(status *gtsmodel.Status) (bool, error) { + ok, err := p.visFilter.StatusHomeTimelineable(ctx, account, status) + return !ok, err + }, + ) + if err != nil { + return gtserror.Newf("error preloading list timeline %s: %w", list.ID, err) + } + + log.Infof(ctx, "%s[%q]: preloaded %d", account.Username, list.Title, n) + return nil +} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 3876f1563..d176118b4 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -24,11 +24,12 @@ import ( "net/url" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" + timelinepkg "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" @@ -61,10 +62,48 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili } } +// Preload ... +func (p *Processor) Preload(ctx context.Context) error { + + // Get all of our local user accounts. + users, err := p.state.DB.GetAllUsers(ctx) + if err != nil { + return gtserror.Newf("error getting users: %w", err) + } + + for _, user := range users { + // Get associated account. + account := user.Account + + // Preload this user account's home timeline cache. + if err := p.preloadHomeTimeline(ctx, account); err != nil { + return gtserror.Newf("error preloading home timeline: %w", err) + } + + // Get all lists owned by this user account. + lists, err := p.state.DB.GetListsByAccountID( + gtscontext.SetBarebones(ctx), + account.ID, + ) + if err != nil { + return gtserror.Newf("error getting account %s lists: %w", account.ID, err) + } + + for _, list := range lists { + // Preload each of this user account's list timeline caches. + if err := p.preloadListTimeline(ctx, account, list); err != nil { + return gtserror.Newf("error preloading list timeline: %w", err) + } + } + } + + return nil +} + func (p *Processor) getStatusTimeline( ctx context.Context, requester *gtsmodel.Account, - cache *timeline.StatusTimeline, + timeline *timelinepkg.StatusTimeline, page *paging.Page, pagePath string, pageQuery url.Values, @@ -128,10 +167,10 @@ func (p *Processor) getStatusTimeline( return apiStatus, nil } - if cache != nil { + if timeline != nil { // Load status page via timeline cache, also // getting lo, hi values for next, prev pages. - apiStatuses, lo, hi, err = cache.Load(ctx, + apiStatuses, lo, hi, err = timeline.Load(ctx, // Status page // to load. @@ -157,7 +196,7 @@ func (p *Processor) getStatusTimeline( } else { // Load status page without a receiving timeline cache. // TODO: remove this code path when all support caching. - apiStatuses, lo, hi, err = timeline.LoadStatusTimeline(ctx, + apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx, page, loadPage, func(ids []string) ([]*gtsmodel.Status, error) {