add logic to allow dynamic clear / preloading of timelines

This commit is contained in:
kim 2025-04-08 16:28:33 +01:00
commit 83f6685437
5 changed files with 318 additions and 222 deletions

View file

@ -351,11 +351,6 @@ var Start action.GTSAction = func(ctx context.Context) error {
intFilter, intFilter,
) )
// Preload our local user's streaming timeline caches.
if err := process.Timeline().Preload(ctx); err != nil {
return fmt.Errorf("error preloading timelines: %w", err)
}
// Schedule background cleaning tasks. // Schedule background cleaning tasks.
if err := cleaner.ScheduleJobs(); err != nil { if err := cleaner.ScheduleJobs(); err != nil {
return fmt.Errorf("error scheduling cleaner jobs: %w", err) return fmt.Errorf("error scheduling cleaner jobs: %w", err)

View file

@ -19,8 +19,8 @@ package timeline
import ( import (
"context" "context"
"maps"
"slices" "slices"
"sync"
"sync/atomic" "sync/atomic"
"codeberg.org/gruf/go-structr" "codeberg.org/gruf/go-structr"
@ -68,181 +68,6 @@ type StatusMeta struct {
loaded *gtsmodel.Status loaded *gtsmodel.Status
} }
// StatusTimelines is a concurrency safe map of StatusTimeline{}
// objects, optimizing *very heavily* for reads over writes.
type StatusTimelines struct {
ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS
cap int
}
// Init stores the given argument(s) such that any created StatusTimeline{}
// objects by MustGet() will initialize them with the given arguments.
func (t *StatusTimelines) Init(cap int) { t.cap = cap }
// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one.
func (t *StatusTimelines) MustGet(key string) *StatusTimeline {
var tt *StatusTimeline
for {
// Load current ptr.
cur := t.ptr.Load()
// Get timeline map to work on.
var m map[string]*StatusTimeline
if cur != nil {
// Look for existing
// timeline in cache.
tt = (*cur)[key]
if tt != nil {
return tt
}
// Get clone of current
// before modifications.
m = maps.Clone(*cur)
} else {
// Allocate new timeline map for below.
m = make(map[string]*StatusTimeline)
}
if tt == nil {
// Allocate new timeline.
tt = new(StatusTimeline)
tt.Init(t.cap)
}
// Store timeline
// in new map.
m[key] = tt
// Attempt to update the map ptr.
if !t.ptr.CompareAndSwap(cur, &m) {
// We failed the
// CAS, reloop.
continue
}
// Successfully inserted
// new timeline model.
return tt
}
}
// Delete will delete the stored StatusTimeline{} under key, if any.
func (t *StatusTimelines) Delete(key string) {
for {
// Load current ptr.
cur := t.ptr.Load()
// Check for empty map / not in map.
if cur == nil || (*cur)[key] == nil {
return
}
// Get clone of current
// before modifications.
m := maps.Clone(*cur)
// Delete ID.
delete(m, key)
// Attempt to update the map ptr.
if !t.ptr.CompareAndSwap(cur, &m) {
// We failed the
// CAS, reloop.
continue
}
// Successfully
// deleted ID.
return
}
}
// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.RemoveByStatusIDs(statusIDs...)
}
}
}
// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.RemoveByAccountIDs(accountIDs...)
}
}
}
// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareByStatusIDs(statusIDs...)
}
}
}
// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareByAccountIDs(accountIDs...)
}
}
}
// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key.
func (t *StatusTimelines) Unprepare(key string) {
if p := t.ptr.Load(); p != nil {
if tt := (*p)[key]; tt != nil {
tt.UnprepareAll()
}
}
}
// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareAll() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareAll()
}
}
}
// Trim calls Trim() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) Trim() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.Trim()
}
}
}
// Clear attempts to call Clear() for StatusTimeline{} under key.
func (t *StatusTimelines) Clear(key string) {
if p := t.ptr.Load(); p != nil {
if tt := (*p)[key]; tt != nil {
tt.Clear()
}
}
}
// ClearAll calls Clear() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) ClearAll() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.Clear()
}
}
}
// StatusTimeline provides a concurrency-safe timeline // StatusTimeline provides a concurrency-safe timeline
// cache of status information. Internally only StatusMeta{} // cache of status information. Internally only StatusMeta{}
// objects are stored, and the statuses themselves are loaded // objects are stored, and the statuses themselves are loaded
@ -255,6 +80,9 @@ type StatusTimeline struct {
// primary-keyed by ID, with extra indices below. // primary-keyed by ID, with extra indices below.
cache structr.Timeline[*StatusMeta, string] cache structr.Timeline[*StatusMeta, string]
// ...
preload atomic.Pointer[any]
// fast-access cache indices. // fast-access cache indices.
idx_ID *structr.Index //nolint:revive idx_ID *structr.Index //nolint:revive
idx_AccountID *structr.Index //nolint:revive idx_AccountID *structr.Index //nolint:revive
@ -320,6 +148,76 @@ func (t *StatusTimeline) Init(cap int) {
t.max = cap t.max = cap
} }
func (t *StatusTimeline) startPreload(
ctx context.Context,
old *any, // old 'preload' ptr
loadPage func(page *paging.Page) ([]*gtsmodel.Status, error),
filter func(*gtsmodel.Status) (bool, error),
) (
started bool,
err error,
) {
// Optimistically setup a
// new waitgroup to set as
// the preload waiter.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Done()
// Wrap waitgroup in
// 'any' for pointer.
new := any(&wg)
// Attempt CAS operation to claim preload start.
started = t.preload.CompareAndSwap(old, &new)
if !started {
return
}
// Begin the preload.
_, err = t.Preload(ctx,
loadPage,
filter,
)
return
}
func (t *StatusTimeline) checkPreload(
ctx context.Context,
loadPage func(page *paging.Page) ([]*gtsmodel.Status, error),
filter func(*gtsmodel.Status) (bool, error),
) error {
for {
// Get preload state.
p := t.preload.Load()
if p == nil || *p == false {
// Timeline needs preloading, start this process.
ok, err := t.startPreload(ctx, p, loadPage, filter)
if !ok {
// Failed to acquire start,
// other thread beat us to it.
continue
}
// Return
// result.
return err
}
// Check for a preload currently in progress.
if wg, _ := (*p).(*sync.WaitGroup); wg != nil {
wg.Wait()
continue
}
// Anything else means
// timeline is ready.
return nil
}
}
// Preload ... // Preload ...
func (t *StatusTimeline) Preload( func (t *StatusTimeline) Preload(
ctx context.Context, ctx context.Context,
@ -336,6 +234,10 @@ func (t *StatusTimeline) Preload(
panic("nil load page func") panic("nil load page func")
} }
// Clear timeline
// before preload.
t.cache.Clear()
// Our starting, page at the top // Our starting, page at the top
// of the possible timeline. // of the possible timeline.
page := new(paging.Page) page := new(paging.Page)
@ -407,6 +309,12 @@ func (t *StatusTimeline) Preload(
} }
} }
// Mark timeline as preloaded.
old := t.preload.Swap(new(any))
if old != nil && *old != false {
log.Errorf(ctx, "BUG: invalid timeline preload state: %#v", *old)
}
return n, nil return n, nil
} }
@ -416,6 +324,8 @@ func (t *StatusTimeline) Preload(
// database models before eventual return to the user. The // database models before eventual return to the user. The
// returned strings are the lo, hi ID paging values, used // returned strings are the lo, hi ID paging values, used
// for generation of next, prev page links in the response. // for generation of next, prev page links in the response.
// Load ...
func (t *StatusTimeline) Load( func (t *StatusTimeline) Load(
ctx context.Context, ctx context.Context,
page *paging.Page, page *paging.Page,
@ -438,6 +348,14 @@ func (t *StatusTimeline) Load(
string, // hi string, // hi
error, error,
) { ) {
// Ensure timeline is loaded.
if err := t.checkPreload(ctx,
loadPage,
filter,
); err != nil {
return nil, "", "", err
}
// Get paging details. // Get paging details.
lo := page.Min.Value lo := page.Min.Value
hi := page.Max.Value hi := page.Max.Value
@ -867,8 +785,15 @@ func (t *StatusTimeline) UnprepareAll() {
// trim from the bottom-up to prioritize streamed inserts. // trim from the bottom-up to prioritize streamed inserts.
func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) } func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) }
// Clear will remove all cached entries from underlying timeline. // Clear will mark the entire timeline as requiring preload,
func (t *StatusTimeline) Clear() { t.cache.Trim(0, structr.Desc) } // which will trigger a clear and reload of the entire thing.
func (t *StatusTimeline) Clear() {
t.preload.Store(func() *any {
var b bool
a := any(b)
return &a
}())
}
// prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{} // prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{}
// models, and use given function to return prepared frontend API models. // models, and use given function to return prepared frontend API models.

198
internal/cache/timeline/status_map.go vendored Normal file
View file

@ -0,0 +1,198 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package timeline
import (
"maps"
"sync/atomic"
)
// StatusTimelines is a concurrency safe map of StatusTimeline{}
// objects, optimizing *very heavily* for reads over writes.
type StatusTimelines struct {
ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS
cap int
}
// Init stores the given argument(s) such that any created StatusTimeline{}
// objects by MustGet() will initialize them with the given arguments.
func (t *StatusTimelines) Init(cap int) { t.cap = cap }
// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one.
func (t *StatusTimelines) MustGet(key string) *StatusTimeline {
var tt *StatusTimeline
for {
// Load current ptr.
cur := t.ptr.Load()
// Get timeline map to work on.
var m map[string]*StatusTimeline
if cur != nil {
// Look for existing
// timeline in cache.
tt = (*cur)[key]
if tt != nil {
return tt
}
// Get clone of current
// before modifications.
m = maps.Clone(*cur)
} else {
// Allocate new timeline map for below.
m = make(map[string]*StatusTimeline)
}
if tt == nil {
// Allocate new timeline.
tt = new(StatusTimeline)
tt.Init(t.cap)
}
// Store timeline
// in new map.
m[key] = tt
// Attempt to update the map ptr.
if !t.ptr.CompareAndSwap(cur, &m) {
// We failed the
// CAS, reloop.
continue
}
// Successfully inserted
// new timeline model.
return tt
}
}
// Delete will delete the stored StatusTimeline{} under key, if any.
func (t *StatusTimelines) Delete(key string) {
for {
// Load current ptr.
cur := t.ptr.Load()
// Check for empty map / not in map.
if cur == nil || (*cur)[key] == nil {
return
}
// Get clone of current
// before modifications.
m := maps.Clone(*cur)
// Delete ID.
delete(m, key)
// Attempt to update the map ptr.
if !t.ptr.CompareAndSwap(cur, &m) {
// We failed the
// CAS, reloop.
continue
}
// Successfully
// deleted ID.
return
}
}
// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.RemoveByStatusIDs(statusIDs...)
}
}
}
// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.RemoveByAccountIDs(accountIDs...)
}
}
}
// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareByStatusIDs(statusIDs...)
}
}
}
// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareByAccountIDs(accountIDs...)
}
}
}
// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key.
func (t *StatusTimelines) Unprepare(key string) {
if p := t.ptr.Load(); p != nil {
if tt := (*p)[key]; tt != nil {
tt.UnprepareAll()
}
}
}
// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) UnprepareAll() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareAll()
}
}
}
// Trim calls Trim() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) Trim() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.Trim()
}
}
}
// Clear attempts to call Clear() for StatusTimeline{} under key.
func (t *StatusTimelines) Clear(key string) {
if p := t.ptr.Load(); p != nil {
if tt := (*p)[key]; tt != nil {
tt.Clear()
}
}
}
// ClearAll calls Clear() for each of the stored StatusTimeline{}s.
func (t *StatusTimelines) ClearAll() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.Clear()
}
}
}

View file

@ -1,3 +1,20 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package timeline package timeline
import ( import (

View file

@ -29,7 +29,6 @@ import (
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/id"
@ -62,44 +61,6 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili
} }
} }
// Preload ...
func (p *Processor) Preload(ctx context.Context) error {
// Get all of our local user accounts.
users, err := p.state.DB.GetAllUsers(ctx)
if err != nil {
return gtserror.Newf("error getting users: %w", err)
}
for _, user := range users {
// Get associated account.
account := user.Account
// Preload this user account's home timeline cache.
if err := p.preloadHomeTimeline(ctx, account); err != nil {
return gtserror.Newf("error preloading home timeline: %w", err)
}
// Get all lists owned by this user account.
lists, err := p.state.DB.GetListsByAccountID(
gtscontext.SetBarebones(ctx),
account.ID,
)
if err != nil {
return gtserror.Newf("error getting account %s lists: %w", account.ID, err)
}
for _, list := range lists {
// Preload each of this user account's list timeline caches.
if err := p.preloadListTimeline(ctx, account, list); err != nil {
return gtserror.Newf("error preloading list timeline: %w", err)
}
}
}
return nil
}
func (p *Processor) getStatusTimeline( func (p *Processor) getStatusTimeline(
ctx context.Context, ctx context.Context,
requester *gtsmodel.Account, requester *gtsmodel.Account,