diff --git a/internal/cache/timeline/preload.go b/internal/cache/timeline/preload.go new file mode 100644 index 000000000..71c8c7c69 --- /dev/null +++ b/internal/cache/timeline/preload.go @@ -0,0 +1,129 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package timeline + +import ( + "sync" + "sync/atomic" + + "github.com/superseriousbusiness/gotosocial/internal/log" +) + +// preloader provides a means of synchronising the +// initial fill, or "preload", of a timeline cache. +// it has 4 possible states in the atomic pointer: +// - preloading = &(interface{}(*sync.WaitGroup)) +// - preloaded = &(interface{}(nil)) +// - needs preload = &(interface{}(false)) +// - brand-new = nil (functionally same as 'needs preload') +type preloader struct{ p atomic.Pointer[any] } + +// Check will concurrency-safely check the preload +// state, and if needed call the provided function. +// if a preload is in progress, it will wait until complete. +func (p *preloader) Check(preload func()) { + for { + // Get state ptr. + ptr := p.p.Load() + + if ptr == nil || *ptr == false { + // Needs preloading, start it. + ok := p.start(ptr, preload) + + if !ok { + // Failed to acquire start, + // other thread beat us to it. + continue + } + + // Success! + return + } + + // Check for a preload currently in progress. + if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil { + wg.Wait() + continue + } + + // Anything else + // means success. + return + } +} + +// start attempts to start the given preload function, by +// performing a CAS operation with 'old'. return is success. +func (p *preloader) start(old *any, preload func()) bool { + + // Optimistically setup a + // new waitgroup to set as + // the preload waiter. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Done() + + // Wrap waitgroup in + // 'any' for pointer. + new := any(&wg) + + // Attempt CAS operation to claim start. + started := p.p.CompareAndSwap(old, &new) + if !started { + return false + } + + // Start. + preload() + return true +} + +// done marks state as preloaded, +// i.e. no more preload required. +func (p *preloader) done() { + old := p.p.Swap(new(any)) + if old == nil { // was brand-new + return + } + switch t := (*old).(type) { + case *sync.WaitGroup: // was preloading + default: + log.Errorf(nil, "BUG: invalid preloader state: %#v", t) + } +} + +// clear will clear the state, marking a "preload" as required. +// i.e. next call to Check() will call provided preload func. +func (p *preloader) clear() { + b := false + a := any(b) + for { + old := p.p.Swap(&a) + if old == nil { // was brand-new + return + } + switch t := (*old).(type) { + case nil: // was preloaded + return + case bool: // was cleared + return + case *sync.WaitGroup: // was preloading + t.Wait() + } + } +} diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index 25889369d..d76204599 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -20,8 +20,6 @@ package timeline import ( "context" "slices" - "sync" - "sync/atomic" "codeberg.org/gruf/go-structr" @@ -68,22 +66,44 @@ type StatusMeta struct { loaded *gtsmodel.Status } -// StatusTimeline provides a concurrency-safe timeline -// cache of status information. Internally only StatusMeta{} -// objects are stored, and the statuses themselves are loaded -// as-needed, caching prepared frontend representations where -// possible. This is largely wrapping code for our own codebase -// to be able to smoothly interact with structr.Timeline{}. - -// ... +// StatusTimeline provides a concurrency-safe sliding-window +// cache of the freshest statuses in a timeline. Internally, +// only StatusMeta{} objects themselves are stored, loading +// the actual statuses when necessary, but caching prepared +// frontend API models where possible. +// +// Notes on design: +// +// Previously, and initially when designing this newer type, +// we had status timeline caches that would dynamically fill +// themselves with statuses on call to Load() with statuses +// at *any* location in the timeline, while simultaneously +// accepting new input of statuses from the background workers. +// This unfortunately can lead to situations where posts need +// to be fetched from the database, but the cache isn't aware +// they exist and instead returns an incomplete selection. +// This problem is best outlined by the follow simple example: +// +// "what if my timeline cache contains posts 0-to-6 and 8-to-12, +// and i make a request for posts between 4-and-10 with no limit, +// how is it to know that it's missing post 7?" +// +// The solution is to unfortunately remove a lot of the caching +// of "older areas" of the timeline, and instead just have it +// be a sliding window of the freshest posts of that timeline. +// It gets preloaded initially on start / first-call, and kept +// up-to-date with new posts by streamed inserts from background +// workers. Any requests for posts outside this we know therefore +// must hit the database, (which we then *don't* cache). type StatusTimeline struct { // underlying timeline cache of *StatusMeta{}, // primary-keyed by ID, with extra indices below. cache structr.Timeline[*StatusMeta, string] - // ... - preload atomic.Pointer[any] + // preloader synchronizes preload + // state of the timeline cache. + preloader preloader // fast-access cache indices. idx_ID *structr.Index //nolint:revive @@ -150,79 +170,41 @@ func (t *StatusTimeline) Init(cap int) { t.max = cap } -func (t *StatusTimeline) startPreload( - ctx context.Context, - old *any, // old 'preload' ptr - loadPage func(page *paging.Page) ([]*gtsmodel.Status, error), - filter func(*gtsmodel.Status) (bool, error), +// Preload will fill with StatusTimeline{} cache with +// the latest sliding window of status metadata for the +// timeline type returned by database 'loadPage' function. +// +// This function is concurrency-safe and repeated calls to +// it when already preloaded will be no-ops. To trigger a +// preload as being required, call .Clear(). +func (t *StatusTimeline) Preload( + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // filter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + filter func(each *gtsmodel.Status) (delete bool, err error), ) ( - started bool, + n int, err error, ) { - // Optimistically setup a - // new waitgroup to set as - // the preload waiter. - var wg sync.WaitGroup - wg.Add(1) - defer wg.Done() + t.preloader.Check(func() { + n, err = t.preload(loadPage, filter) + if err != nil { + return + } - // Wrap waitgroup in - // 'any' for pointer. - new := any(&wg) - - // Attempt CAS operation to claim preload start. - started = t.preload.CompareAndSwap(old, &new) - if !started { - return - } - - // Begin the preload. - _, err = t.Preload(ctx, - loadPage, - filter, - ) + // Mark preloaded. + t.preloader.done() + }) return } -func (t *StatusTimeline) checkPreload( - ctx context.Context, - loadPage func(page *paging.Page) ([]*gtsmodel.Status, error), - filter func(*gtsmodel.Status) (bool, error), -) error { - for { - // Get preload state. - p := t.preload.Load() - - if p == nil || *p == false { - // Timeline needs preloading, start this process. - ok, err := t.startPreload(ctx, p, loadPage, filter) - - if !ok { - // Failed to acquire start, - // other thread beat us to it. - continue - } - - // Return - // result. - return err - } - - // Check for a preload currently in progress. - if wg, _ := (*p).(*sync.WaitGroup); wg != nil { - wg.Wait() - continue - } - - // Anything else means - // timeline is ready. - return nil - } -} - -// Preload ... -func (t *StatusTimeline) Preload( - ctx context.Context, +// preload contains the core logic of +// Preload(), without t.preloader checks. +func (t *StatusTimeline) preload( // loadPage should load the timeline of given page for cache hydration. loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), @@ -311,27 +293,15 @@ func (t *StatusTimeline) Preload( } } - // Mark timeline as preloaded. - old := t.preload.Swap(new(any)) - if old != nil { - switch t := (*old).(type) { - case *sync.WaitGroup: - default: - log.Errorf(ctx, "BUG: invalid timeline preload state: %#v", t) - } - } - return n, nil } -// Load will load timeline statuses according to given -// page, using provided callbacks to load extra data when -// necessary, and perform fine-grained filtering loaded -// database models before eventual return to the user. The -// returned strings are the lo, hi ID paging values, used -// for generation of next, prev page links in the response. - -// Load ... +// Load will load given page of timeline statuses. First it +// will prioritize fetching statuses from the sliding window +// that is the timeline cache of latest statuses, else it will +// fall back to loading from the database using callback funcs. +// The returned string values are the low / high status ID +// paging values, used in calculating next / prev page links. func (t *StatusTimeline) Load( ctx context.Context, page *paging.Page, @@ -354,13 +324,7 @@ func (t *StatusTimeline) Load( string, // hi error, ) { - // Ensure timeline is loaded. - if err := t.checkPreload(ctx, - loadPage, - filter, - ); err != nil { - return nil, "", "", err - } + var err error // Get paging details. lo := page.Min.Value @@ -376,62 +340,73 @@ func (t *StatusTimeline) Load( nextPg.Min.Value = lo nextPg.Max.Value = hi - // First we attempt to load status - // metadata entries from the timeline - // cache, up to given limit. - metas := t.cache.Select( - util.PtrIf(lo), - util.PtrIf(hi), - util.PtrIf(limit), - dir, - ) - - // We now reset the lo,hi values to - // represent the lowest and highest - // index values of loaded statuses. - // - // We continually update these while - // building up statuses to return, for - // caller to build next / prev page - // response values. - lo, hi = "", "" + // Interstitial meta objects. + var metas []*StatusMeta + // Returned frontend API statuses. var apiStatuses []*apimodel.Status - if len(metas) > 0 { - // Before we can do any filtering, we need - // to load status models for cached entries. - err := loadStatuses(metas, loadIDs) + // TODO: we can remove this nil + // check when we've updated all + // our timeline endpoints to have + // streamed timeline caches. + if t != nil { + + // Ensure timeline has been preloaded. + _, err = t.Preload(loadPage, filter) if err != nil { - return nil, "", "", gtserror.Newf("error loading statuses: %w", err) + return nil, "", "", err } - // Set initial lo, hi values. - lo = metas[len(metas)-1].ID - hi = metas[0].ID - - // Update paging parameters used for next database query. - nextPageParams(nextPg, metas[len(metas)-1].ID, order) - - // Allocate slice of expected required API models. - apiStatuses = make([]*apimodel.Status, 0, len(metas)) - - // Prepare frontend API models for - // the cached statuses. For now this - // also does its own extra filtering. - apiStatuses = prepareStatuses(ctx, - metas, - prepareAPI, - apiStatuses, - limit, + // First we attempt to load status + // metadata entries from the timeline + // cache, up to given limit. + metas = t.cache.Select( + util.PtrIf(lo), + util.PtrIf(hi), + util.PtrIf(limit), + dir, ) + + // Reset lo, hi values from being + // used as input arguments, ready + // to be updated for return. + lo, hi = "", "" + + if len(metas) > 0 { + // Before we can do any filtering, we need + // to load status models for cached entries. + err = loadStatuses(metas, loadIDs) + if err != nil { + return nil, "", "", gtserror.Newf("error loading statuses: %w", err) + } + + // Set initial lo, hi values. + lo = metas[len(metas)-1].ID + hi = metas[0].ID + + // Update paging parameters used for next database query. + nextPageParams(nextPg, metas[len(metas)-1].ID, order) + + // Allocate slice of expected required API models. + apiStatuses = make([]*apimodel.Status, 0, len(metas)) + + // Prepare frontend API models for + // the cached statuses. For now this + // also does its own extra filtering. + apiStatuses = prepareStatuses(ctx, + metas, + prepareAPI, + apiStatuses, + limit, + ) + } } // If no cached timeline statuses // were found for page, we need to // call through to the database. if len(apiStatuses) == 0 { - var err error // Pass through to main timeline db load function. apiStatuses, lo, hi, err = loadStatusTimeline(ctx, @@ -460,63 +435,13 @@ func (t *StatusTimeline) Load( return apiStatuses, lo, hi, nil } -// LoadStatusTimeline is a function that may be used to load a timeline -// page in a functionally similar way to StatusTimeline{}.Load(), but without -// actually having access to a StatusTimeline{}. For example, for timelines that -// we want to share code, but without yet implementing a cache for them. Note this -// function may be removed in the future when un-needed. -func LoadStatusTimeline( - ctx context.Context, - page *paging.Page, - loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), - filter func(each *gtsmodel.Status) (delete bool, err error), - prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), -) ( - []*apimodel.Status, - string, // lo - string, // hi - error, -) { - // Use a copy of current page so - // we can repeatedly update it. - nextPg := new(paging.Page) - *nextPg = *page - - // Pass through to main timeline db load function. - apiStatuses, lo, hi, err := loadStatusTimeline(ctx, - nextPg, - nil, - nil, - loadPage, - filter, - prepareAPI, - ) - if err != nil { - return nil, "", "", err - } - - if page.Order().Ascending() { - // The caller always expects the statuses - // to be returned in DESC order, but we - // build the status slice in paging order. - // If paging ASC, we need to reverse the - // returned statuses and paging values. - slices.Reverse(apiStatuses) - lo, hi = hi, lo - } - - return apiStatuses, lo, hi, nil -} - -// loadStatusTimeline encapsulates most of the main -// timeline-load-from-database logic, allowing both -// the temporary LoadStatusTimeline() function AND -// the main StatusTimeline{}.Load() function to share -// as much logic as possible. +// loadStatusTimeline encapsulates the logic of iteratively +// attempting to load a status timeline page from the database, +// that is in the form of given callback functions. these will +// then be prepared to frontend API models for return. // -// TODO: it may be worth moving this into StatusTimeline{}.Load() -// once the temporary function above has been removed. Or it may -// still be worth keeping *some* database logic separate. +// in time it may make sense to move this logic +// into the StatusTimeline{}.Load() function. func loadStatusTimeline( ctx context.Context, nextPg *paging.Page, @@ -793,13 +718,7 @@ func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) } // Clear will mark the entire timeline as requiring preload, // which will trigger a clear and reload of the entire thing. -func (t *StatusTimeline) Clear() { - t.preload.Store(func() *any { - var b bool - a := any(b) - return &a - }()) -} +func (t *StatusTimeline) Clear() { t.preloader.clear() } // prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{} // models, and use given function to return prepared frontend API models. diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index fa21329d4..1d22e2d5a 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -128,42 +128,31 @@ func (p *Processor) getStatusTimeline( return apiStatus, nil } - if timeline != nil { - // Load status page via timeline cache, also - // getting lo, hi values for next, prev pages. - apiStatuses, lo, hi, err = timeline.Load(ctx, + // Load status page via timeline cache, also + // getting lo, hi values for next, prev pages. + apiStatuses, lo, hi, err = timeline.Load(ctx, - // Status page - // to load. - page, + // Status page + // to load. + page, - // Caller provided database - // status page loading function. - loadPage, + // Caller provided database + // status page loading function. + loadPage, - // Status load function for cached timeline entries. - func(ids []string) ([]*gtsmodel.Status, error) { - return p.state.DB.GetStatusesByIDs(ctx, ids) - }, + // Status load function for cached timeline entries. + func(ids []string) ([]*gtsmodel.Status, error) { + return p.state.DB.GetStatusesByIDs(ctx, ids) + }, - // Filtering function, - // i.e. filter before caching. - filter, + // Filtering function, + // i.e. filter before caching. + filter, - // Frontend API model - // preparation function. - prepare, - ) - } else { - // Load status page without a receiving timeline cache. - // TODO: remove this code path when all support caching. - apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx, - page, - loadPage, - filter, - prepare, - ) - } + // Frontend API model + // preparation function. + prepare, + ) if err != nil { err := gtserror.Newf("error loading timeline: %w", err)