much improved status timeline code comments

This commit is contained in:
kim 2025-04-08 18:03:32 +01:00
commit 00d8a1f8ac
3 changed files with 277 additions and 240 deletions

129
internal/cache/timeline/preload.go vendored Normal file
View file

@ -0,0 +1,129 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package timeline
import (
"sync"
"sync/atomic"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// preloader provides a means of synchronising the
// initial fill, or "preload", of a timeline cache.
// it has 4 possible states in the atomic pointer:
//   - preloading    = &(interface{}(*sync.WaitGroup))
//   - preloaded     = &(interface{}(nil))
//   - needs preload = &(interface{}(false))
//   - brand-new     = nil (functionally same as 'needs preload')
type preloader struct{ p atomic.Pointer[any] }
// Check will concurrency-safely check the preload
// state, and if needed call the provided function.
// if a preload is in progress, it will wait until complete.
func (p *preloader) Check(preload func()) {
	for {
		// Get state ptr.
		ptr := p.p.Load()

		// A nil pointer (brand-new) or a stored 'false'
		// (cleared) both mean a preload is required.
		if ptr == nil || *ptr == false {
			// Needs preloading, start it.
			ok := p.start(ptr, preload)

			if !ok {
				// Failed to acquire start,
				// other thread beat us to it.
				continue
			}

			// Success!
			return
		}

		// Check for a preload currently in progress,
		// waiting on its group before re-checking state.
		if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil {
			wg.Wait()
			continue
		}

		// Anything else (i.e. the stored
		// nil 'any' of the "preloaded"
		// state) means success.
		return
	}
}
// start attempts to start the given preload function, by
// performing a CAS operation with 'old'. return is success.
//
// On success the preload function has been run to completion
// and any concurrent waiters on the waitgroup were released.
func (p *preloader) start(old *any, preload func()) bool {
	// Optimistically setup a
	// new waitgroup to set as
	// the preload waiter. it is
	// released on return, after
	// preload() has completed.
	var wg sync.WaitGroup
	wg.Add(1)
	defer wg.Done()

	// Wrap waitgroup in 'any' for
	// pointer storage. (note: named
	// to avoid shadowing builtin new).
	state := any(&wg)

	// Attempt CAS operation to claim start;
	// failure means another thread beat us.
	if !p.p.CompareAndSwap(old, &state) {
		return false
	}

	// Start.
	preload()
	return true
}
// done marks state as preloaded,
// i.e. no more preload required.
func (p *preloader) done() {
	// Swap in the "preloaded" marker,
	// i.e. a pointer to a nil 'any'.
	prev := p.p.Swap(new(any))

	// nil pointer = brand-new,
	// nothing needs checking.
	if prev == nil {
		return
	}

	// The only other valid prior state
	// is "preloading" (a held waitgroup);
	// anything else indicates a bug.
	if _, ok := (*prev).(*sync.WaitGroup); !ok {
		log.Errorf(nil, "BUG: invalid preloader state: %#v", *prev)
	}
}
// clear will clear the state, marking a "preload" as required.
// i.e. next call to Check() will call provided preload func.
func (p *preloader) clear() {
	// The "needs preload" marker
	// is a stored boolean 'false'.
	cleared := any(false)
	for {
		// Swap the cleared marker into place,
		// inspecting whatever state preceded it.
		prev := p.p.Swap(&cleared)

		if prev == nil {
			// was brand-new.
			return
		}

		switch v := (*prev).(type) {
		// was preloaded, or
		// was already cleared.
		case nil, bool:
			return

		// was mid-preload: wait for it
		// to finish, then clear again.
		case *sync.WaitGroup:
			v.Wait()
		}
	}
}

View file

@ -20,8 +20,6 @@ package timeline
import (
"context"
"slices"
"sync"
"sync/atomic"
"codeberg.org/gruf/go-structr"
@ -68,22 +66,44 @@ type StatusMeta struct {
loaded *gtsmodel.Status
}
// StatusTimeline provides a concurrency-safe timeline
// cache of status information. Internally only StatusMeta{}
// objects are stored, and the statuses themselves are loaded
// as-needed, caching prepared frontend representations where
// possible. This is largely wrapping code for our own codebase
// to be able to smoothly interact with structr.Timeline{}.
// ...
// StatusTimeline provides a concurrency-safe sliding-window
// cache of the freshest statuses in a timeline. Internally,
// only StatusMeta{} objects themselves are stored, loading
// the actual statuses when necessary, but caching prepared
// frontend API models where possible.
//
// Notes on design:
//
// Previously, and initially when designing this newer type,
// we had status timeline caches that would dynamically fill
// themselves with statuses on call to Load() with statuses
// at *any* location in the timeline, while simultaneously
// accepting new input of statuses from the background workers.
// This unfortunately can lead to situations where posts need
// to be fetched from the database, but the cache isn't aware
// they exist and instead returns an incomplete selection.
// This problem is best outlined by the following simple example:
//
// "what if my timeline cache contains posts 0-to-6 and 8-to-12,
// and i make a request for posts between 4-and-10 with no limit,
// how is it to know that it's missing post 7?"
//
// The solution is to unfortunately remove a lot of the caching
// of "older areas" of the timeline, and instead just have it
// be a sliding window of the freshest posts of that timeline.
// It gets preloaded initially on start / first-call, and kept
// up-to-date with new posts by streamed inserts from background
// workers. Any requests for posts outside this we know therefore
// must hit the database, (which we then *don't* cache).
type StatusTimeline struct {
// underlying timeline cache of *StatusMeta{},
// primary-keyed by ID, with extra indices below.
cache structr.Timeline[*StatusMeta, string]
// ...
preload atomic.Pointer[any]
// preloader synchronizes preload
// state of the timeline cache.
preloader preloader
// fast-access cache indices.
idx_ID *structr.Index //nolint:revive
@ -150,79 +170,41 @@ func (t *StatusTimeline) Init(cap int) {
t.max = cap
}
func (t *StatusTimeline) startPreload(
ctx context.Context,
old *any, // old 'preload' ptr
loadPage func(page *paging.Page) ([]*gtsmodel.Status, error),
filter func(*gtsmodel.Status) (bool, error),
// Preload will fill the StatusTimeline{} cache with
// the latest sliding window of status metadata for the
// timeline type returned by database 'loadPage' function.
//
// This function is concurrency-safe and repeated calls to
// it when already preloaded will be no-ops. To trigger a
// preload as being required, call .Clear().
func (t *StatusTimeline) Preload(
// loadPage should load the timeline of given page for cache hydration.
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
// filter can be used to perform filtering of returned
// statuses BEFORE insert into cache. i.e. this will affect
// what actually gets stored in the timeline cache.
filter func(each *gtsmodel.Status) (delete bool, err error),
) (
started bool,
n int,
err error,
) {
// Optimistically setup a
// new waitgroup to set as
// the preload waiter.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Done()
t.preloader.Check(func() {
n, err = t.preload(loadPage, filter)
if err != nil {
return
}
// Wrap waitgroup in
// 'any' for pointer.
new := any(&wg)
// Attempt CAS operation to claim preload start.
started = t.preload.CompareAndSwap(old, &new)
if !started {
return
}
// Begin the preload.
_, err = t.Preload(ctx,
loadPage,
filter,
)
// Mark preloaded.
t.preloader.done()
})
return
}
func (t *StatusTimeline) checkPreload(
ctx context.Context,
loadPage func(page *paging.Page) ([]*gtsmodel.Status, error),
filter func(*gtsmodel.Status) (bool, error),
) error {
for {
// Get preload state.
p := t.preload.Load()
if p == nil || *p == false {
// Timeline needs preloading, start this process.
ok, err := t.startPreload(ctx, p, loadPage, filter)
if !ok {
// Failed to acquire start,
// other thread beat us to it.
continue
}
// Return
// result.
return err
}
// Check for a preload currently in progress.
if wg, _ := (*p).(*sync.WaitGroup); wg != nil {
wg.Wait()
continue
}
// Anything else means
// timeline is ready.
return nil
}
}
// Preload ...
func (t *StatusTimeline) Preload(
ctx context.Context,
// preload contains the core logic of
// Preload(), without t.preloader checks.
func (t *StatusTimeline) preload(
// loadPage should load the timeline of given page for cache hydration.
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
@ -311,27 +293,15 @@ func (t *StatusTimeline) Preload(
}
}
// Mark timeline as preloaded.
old := t.preload.Swap(new(any))
if old != nil {
switch t := (*old).(type) {
case *sync.WaitGroup:
default:
log.Errorf(ctx, "BUG: invalid timeline preload state: %#v", t)
}
}
return n, nil
}
// Load will load timeline statuses according to given
// page, using provided callbacks to load extra data when
// necessary, and perform fine-grained filtering loaded
// database models before eventual return to the user. The
// returned strings are the lo, hi ID paging values, used
// for generation of next, prev page links in the response.
// Load ...
// Load will load given page of timeline statuses. First it
// will prioritize fetching statuses from the sliding window
// that is the timeline cache of latest statuses, else it will
// fall back to loading from the database using callback funcs.
// The returned string values are the low / high status ID
// paging values, used in calculating next / prev page links.
func (t *StatusTimeline) Load(
ctx context.Context,
page *paging.Page,
@ -354,13 +324,7 @@ func (t *StatusTimeline) Load(
string, // hi
error,
) {
// Ensure timeline is loaded.
if err := t.checkPreload(ctx,
loadPage,
filter,
); err != nil {
return nil, "", "", err
}
var err error
// Get paging details.
lo := page.Min.Value
@ -376,62 +340,73 @@ func (t *StatusTimeline) Load(
nextPg.Min.Value = lo
nextPg.Max.Value = hi
// First we attempt to load status
// metadata entries from the timeline
// cache, up to given limit.
metas := t.cache.Select(
util.PtrIf(lo),
util.PtrIf(hi),
util.PtrIf(limit),
dir,
)
// We now reset the lo,hi values to
// represent the lowest and highest
// index values of loaded statuses.
//
// We continually update these while
// building up statuses to return, for
// caller to build next / prev page
// response values.
lo, hi = "", ""
// Interstitial meta objects.
var metas []*StatusMeta
// Returned frontend API statuses.
var apiStatuses []*apimodel.Status
if len(metas) > 0 {
// Before we can do any filtering, we need
// to load status models for cached entries.
err := loadStatuses(metas, loadIDs)
// TODO: we can remove this nil
// check when we've updated all
// our timeline endpoints to have
// streamed timeline caches.
if t != nil {
// Ensure timeline has been preloaded.
_, err = t.Preload(loadPage, filter)
if err != nil {
return nil, "", "", gtserror.Newf("error loading statuses: %w", err)
return nil, "", "", err
}
// Set initial lo, hi values.
lo = metas[len(metas)-1].ID
hi = metas[0].ID
// Update paging parameters used for next database query.
nextPageParams(nextPg, metas[len(metas)-1].ID, order)
// Allocate slice of expected required API models.
apiStatuses = make([]*apimodel.Status, 0, len(metas))
// Prepare frontend API models for
// the cached statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas,
prepareAPI,
apiStatuses,
limit,
// First we attempt to load status
// metadata entries from the timeline
// cache, up to given limit.
metas = t.cache.Select(
util.PtrIf(lo),
util.PtrIf(hi),
util.PtrIf(limit),
dir,
)
// Reset lo, hi values from being
// used as input arguments, ready
// to be updated for return.
lo, hi = "", ""
if len(metas) > 0 {
// Before we can do any filtering, we need
// to load status models for cached entries.
err = loadStatuses(metas, loadIDs)
if err != nil {
return nil, "", "", gtserror.Newf("error loading statuses: %w", err)
}
// Set initial lo, hi values.
lo = metas[len(metas)-1].ID
hi = metas[0].ID
// Update paging parameters used for next database query.
nextPageParams(nextPg, metas[len(metas)-1].ID, order)
// Allocate slice of expected required API models.
apiStatuses = make([]*apimodel.Status, 0, len(metas))
// Prepare frontend API models for
// the cached statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas,
prepareAPI,
apiStatuses,
limit,
)
}
}
// If no cached timeline statuses
// were found for page, we need to
// call through to the database.
if len(apiStatuses) == 0 {
var err error
// Pass through to main timeline db load function.
apiStatuses, lo, hi, err = loadStatusTimeline(ctx,
@ -460,63 +435,13 @@ func (t *StatusTimeline) Load(
return apiStatuses, lo, hi, nil
}
// LoadStatusTimeline is a function that may be used to load a timeline
// page in a functionally similar way to StatusTimeline{}.Load(), but without
// actually having access to a StatusTimeline{}. For example, for timelines that
// we want to share code, but without yet implementing a cache for them. Note this
// function may be removed in the future when un-needed.
func LoadStatusTimeline(
ctx context.Context,
page *paging.Page,
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
filter func(each *gtsmodel.Status) (delete bool, err error),
prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
) (
[]*apimodel.Status,
string, // lo
string, // hi
error,
) {
// Use a copy of current page so
// we can repeatedly update it.
nextPg := new(paging.Page)
*nextPg = *page
// Pass through to main timeline db load function.
apiStatuses, lo, hi, err := loadStatusTimeline(ctx,
nextPg,
nil,
nil,
loadPage,
filter,
prepareAPI,
)
if err != nil {
return nil, "", "", err
}
if page.Order().Ascending() {
// The caller always expects the statuses
// to be returned in DESC order, but we
// build the status slice in paging order.
// If paging ASC, we need to reverse the
// returned statuses and paging values.
slices.Reverse(apiStatuses)
lo, hi = hi, lo
}
return apiStatuses, lo, hi, nil
}
// loadStatusTimeline encapsulates most of the main
// timeline-load-from-database logic, allowing both
// the temporary LoadStatusTimeline() function AND
// the main StatusTimeline{}.Load() function to share
// as much logic as possible.
// loadStatusTimeline encapsulates the logic of iteratively
// attempting to load a status timeline page from the database,
// that is in the form of given callback functions. these will
// then be prepared to frontend API models for return.
//
// TODO: it may be worth moving this into StatusTimeline{}.Load()
// once the temporary function above has been removed. Or it may
// still be worth keeping *some* database logic separate.
// in time it may make sense to move this logic
// into the StatusTimeline{}.Load() function.
func loadStatusTimeline(
ctx context.Context,
nextPg *paging.Page,
@ -793,13 +718,7 @@ func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) }
// Clear will mark the entire timeline as requiring preload,
// which will trigger a clear and reload of the entire thing.
func (t *StatusTimeline) Clear() {
t.preload.Store(func() *any {
var b bool
a := any(b)
return &a
}())
}
func (t *StatusTimeline) Clear() { t.preloader.clear() }
// prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{}
// models, and use given function to return prepared frontend API models.

View file

@ -128,42 +128,31 @@ func (p *Processor) getStatusTimeline(
return apiStatus, nil
}
if timeline != nil {
// Load status page via timeline cache, also
// getting lo, hi values for next, prev pages.
apiStatuses, lo, hi, err = timeline.Load(ctx,
// Load status page via timeline cache, also
// getting lo, hi values for next, prev pages.
apiStatuses, lo, hi, err = timeline.Load(ctx,
// Status page
// to load.
page,
// Status page
// to load.
page,
// Caller provided database
// status page loading function.
loadPage,
// Caller provided database
// status page loading function.
loadPage,
// Status load function for cached timeline entries.
func(ids []string) ([]*gtsmodel.Status, error) {
return p.state.DB.GetStatusesByIDs(ctx, ids)
},
// Status load function for cached timeline entries.
func(ids []string) ([]*gtsmodel.Status, error) {
return p.state.DB.GetStatusesByIDs(ctx, ids)
},
// Filtering function,
// i.e. filter before caching.
filter,
// Filtering function,
// i.e. filter before caching.
filter,
// Frontend API model
// preparation function.
prepare,
)
} else {
// Load status page without a receiving timeline cache.
// TODO: remove this code path when all support caching.
apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx,
page,
loadPage,
filter,
prepare,
)
}
// Frontend API model
// preparation function.
prepare,
)
if err != nil {
err := gtserror.Newf("error loading timeline: %w", err)