diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index b43ed25a2..6bc27a7c4 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -351,11 +351,6 @@ var Start action.GTSAction = func(ctx context.Context) error { intFilter, ) - // Preload our local user's streaming timeline caches. - if err := process.Timeline().Preload(ctx); err != nil { - return fmt.Errorf("error preloading timelines: %w", err) - } - // Schedule background cleaning tasks. if err := cleaner.ScheduleJobs(); err != nil { return fmt.Errorf("error scheduling cleaner jobs: %w", err) diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index bf8c8f5df..0cf7e6deb 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -19,8 +19,8 @@ package timeline import ( "context" - "maps" "slices" + "sync" "sync/atomic" "codeberg.org/gruf/go-structr" @@ -68,181 +68,6 @@ type StatusMeta struct { loaded *gtsmodel.Status } -// StatusTimelines is a concurrency safe map of StatusTimeline{} -// objects, optimizing *very heavily* for reads over writes. -type StatusTimelines struct { - ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS - cap int -} - -// Init stores the given argument(s) such that any created StatusTimeline{} -// objects by MustGet() will initialize them with the given arguments. -func (t *StatusTimelines) Init(cap int) { t.cap = cap } - -// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one. -func (t *StatusTimelines) MustGet(key string) *StatusTimeline { - var tt *StatusTimeline - - for { - // Load current ptr. - cur := t.ptr.Load() - - // Get timeline map to work on. - var m map[string]*StatusTimeline - - if cur != nil { - // Look for existing - // timeline in cache. - tt = (*cur)[key] - if tt != nil { - return tt - } - - // Get clone of current - // before modifications. - m = maps.Clone(*cur) - } else { - // Allocate new timeline map for below. - m = make(map[string]*StatusTimeline) - } - - if tt == nil { - // Allocate new timeline. - tt = new(StatusTimeline) - tt.Init(t.cap) - } - - // Store timeline - // in new map. - m[key] = tt - - // Attempt to update the map ptr. - if !t.ptr.CompareAndSwap(cur, &m) { - - // We failed the - // CAS, reloop. - continue - } - - // Successfully inserted - // new timeline model. - return tt - } -} - -// Delete will delete the stored StatusTimeline{} under key, if any. -func (t *StatusTimelines) Delete(key string) { - for { - // Load current ptr. - cur := t.ptr.Load() - - // Check for empty map / not in map. - if cur == nil || (*cur)[key] == nil { - return - } - - // Get clone of current - // before modifications. - m := maps.Clone(*cur) - - // Delete ID. - delete(m, key) - - // Attempt to update the map ptr. - if !t.ptr.CompareAndSwap(cur, &m) { - - // We failed the - // CAS, reloop. - continue - } - - // Successfully - // deleted ID. - return - } -} - -// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.RemoveByStatusIDs(statusIDs...) - } - } -} - -// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.RemoveByAccountIDs(accountIDs...) - } - } -} - -// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.UnprepareByStatusIDs(statusIDs...) - } - } -} - -// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.UnprepareByAccountIDs(accountIDs...) - } - } -} - -// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key. -func (t *StatusTimelines) Unprepare(key string) { - if p := t.ptr.Load(); p != nil { - if tt := (*p)[key]; tt != nil { - tt.UnprepareAll() - } - } -} - -// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) UnprepareAll() { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.UnprepareAll() - } - } -} - -// Trim calls Trim() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) Trim() { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.Trim() - } - } -} - -// Clear attempts to call Clear() for StatusTimeline{} under key. -func (t *StatusTimelines) Clear(key string) { - if p := t.ptr.Load(); p != nil { - if tt := (*p)[key]; tt != nil { - tt.Clear() - } - } -} - -// ClearAll calls Clear() for each of the stored StatusTimeline{}s. -func (t *StatusTimelines) ClearAll() { - if p := t.ptr.Load(); p != nil { - for _, tt := range *p { - tt.Clear() - } - } -} - // StatusTimeline provides a concurrency-safe timeline // cache of status information. Internally only StatusMeta{} // objects are stored, and the statuses themselves are loaded @@ -255,6 +80,9 @@ type StatusTimeline struct { // primary-keyed by ID, with extra indices below. cache structr.Timeline[*StatusMeta, string] + // ... + preload atomic.Pointer[any] + // fast-access cache indices. idx_ID *structr.Index //nolint:revive idx_AccountID *structr.Index //nolint:revive @@ -320,6 +148,76 @@ func (t *StatusTimeline) Init(cap int) { t.max = cap } +func (t *StatusTimeline) startPreload( + ctx context.Context, + old *any, // old 'preload' ptr + loadPage func(page *paging.Page) ([]*gtsmodel.Status, error), + filter func(*gtsmodel.Status) (bool, error), +) ( + started bool, + err error, +) { + // Optimistically setup a + // new waitgroup to set as + // the preload waiter. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Done() + + // Wrap waitgroup in + // 'any' for pointer. + new := any(&wg) + + // Attempt CAS operation to claim preload start. + started = t.preload.CompareAndSwap(old, &new) + if !started { + return + } + + // Begin the preload. + _, err = t.Preload(ctx, + loadPage, + filter, + ) + return +} + +func (t *StatusTimeline) checkPreload( + ctx context.Context, + loadPage func(page *paging.Page) ([]*gtsmodel.Status, error), + filter func(*gtsmodel.Status) (bool, error), +) error { + for { + // Get preload state. + p := t.preload.Load() + + if p == nil || *p == false { + // Timeline needs preloading, start this process. + ok, err := t.startPreload(ctx, p, loadPage, filter) + + if !ok { + // Failed to acquire start, + // other thread beat us to it. + continue + } + + // Return + // result. + return err + } + + // Check for a preload currently in progress. + if wg, _ := (*p).(*sync.WaitGroup); wg != nil { + wg.Wait() + continue + } + + // Anything else means + // timeline is ready. + return nil + } +} + // Preload ... func (t *StatusTimeline) Preload( ctx context.Context, @@ -336,6 +234,10 @@ func (t *StatusTimeline) Preload( panic("nil load page func") } + // Clear timeline + // before preload. + t.cache.Clear() + // Our starting, page at the top // of the possible timeline. page := new(paging.Page) @@ -407,6 +309,12 @@ func (t *StatusTimeline) Preload( } } + // Mark timeline as preloaded. + old := t.preload.Swap(new(any)) + if old != nil && *old != false { + log.Errorf(ctx, "BUG: invalid timeline preload state: %#v", *old) + } + return n, nil } @@ -416,6 +324,8 @@ func (t *StatusTimeline) Preload( // database models before eventual return to the user. The // returned strings are the lo, hi ID paging values, used // for generation of next, prev page links in the response. + +// Load ... func (t *StatusTimeline) Load( ctx context.Context, page *paging.Page, @@ -438,6 +348,14 @@ func (t *StatusTimeline) Load( string, // hi error, ) { + // Ensure timeline is loaded. + if err := t.checkPreload(ctx, + loadPage, + filter, + ); err != nil { + return nil, "", "", err + } + // Get paging details. lo := page.Min.Value hi := page.Max.Value @@ -867,8 +785,15 @@ func (t *StatusTimeline) UnprepareAll() { // trim from the bottom-up to prioritize streamed inserts. func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) } -// Clear will remove all cached entries from underlying timeline. -func (t *StatusTimeline) Clear() { t.cache.Trim(0, structr.Desc) } +// Clear will mark the entire timeline as requiring preload, +// which will trigger a clear and reload of the entire thing. +func (t *StatusTimeline) Clear() { + t.preload.Store(func() *any { + var b bool + a := any(b) + return &a + }()) +} // prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{} // models, and use given function to return prepared frontend API models. diff --git a/internal/cache/timeline/status_map.go b/internal/cache/timeline/status_map.go new file mode 100644 index 000000000..e402883af --- /dev/null +++ b/internal/cache/timeline/status_map.go @@ -0,0 +1,198 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package timeline + +import ( + "maps" + "sync/atomic" +) + +// StatusTimelines is a concurrency safe map of StatusTimeline{} +// objects, optimizing *very heavily* for reads over writes. +type StatusTimelines struct { + ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS + cap int +} + +// Init stores the given argument(s) such that any created StatusTimeline{} +// objects by MustGet() will initialize them with the given arguments. +func (t *StatusTimelines) Init(cap int) { t.cap = cap } + +// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one. +func (t *StatusTimelines) MustGet(key string) *StatusTimeline { + var tt *StatusTimeline + + for { + // Load current ptr. + cur := t.ptr.Load() + + // Get timeline map to work on. + var m map[string]*StatusTimeline + + if cur != nil { + // Look for existing + // timeline in cache. + tt = (*cur)[key] + if tt != nil { + return tt + } + + // Get clone of current + // before modifications. + m = maps.Clone(*cur) + } else { + // Allocate new timeline map for below. + m = make(map[string]*StatusTimeline) + } + + if tt == nil { + // Allocate new timeline. + tt = new(StatusTimeline) + tt.Init(t.cap) + } + + // Store timeline + // in new map. + m[key] = tt + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully inserted + // new timeline model. + return tt + } +} + +// Delete will delete the stored StatusTimeline{} under key, if any. +func (t *StatusTimelines) Delete(key string) { + for { + // Load current ptr. + cur := t.ptr.Load() + + // Check for empty map / not in map. + if cur == nil || (*cur)[key] == nil { + return + } + + // Get clone of current + // before modifications. + m := maps.Clone(*cur) + + // Delete ID. + delete(m, key) + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully + // deleted ID. + return + } +} + +// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByStatusIDs(statusIDs...) + } + } +} + +// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByAccountIDs(accountIDs...) + } + } +} + +// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByStatusIDs(statusIDs...) + } + } +} + +// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByAccountIDs(accountIDs...) + } + } +} + +// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key. +func (t *StatusTimelines) Unprepare(key string) { + if p := t.ptr.Load(); p != nil { + if tt := (*p)[key]; tt != nil { + tt.UnprepareAll() + } + } +} + +// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareAll() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareAll() + } + } +} + +// Trim calls Trim() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) Trim() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Trim() + } + } +} + +// Clear attempts to call Clear() for StatusTimeline{} under key. +func (t *StatusTimelines) Clear(key string) { + if p := t.ptr.Load(); p != nil { + if tt := (*p)[key]; tt != nil { + tt.Clear() + } + } +} + +// ClearAll calls Clear() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) ClearAll() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Clear() + } + } +} diff --git a/internal/cache/timeline/status_test.go b/internal/cache/timeline/status_test.go index 7e07bfc9c..99a2054ba 100644 --- a/internal/cache/timeline/status_test.go +++ b/internal/cache/timeline/status_test.go @@ -1,3 +1,20 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package timeline import ( diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index ed2bf7b90..fa21329d4 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -29,7 +29,6 @@ import ( statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" @@ -62,44 +61,6 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili } } -// Preload ... -func (p *Processor) Preload(ctx context.Context) error { - - // Get all of our local user accounts. - users, err := p.state.DB.GetAllUsers(ctx) - if err != nil { - return gtserror.Newf("error getting users: %w", err) - } - - for _, user := range users { - // Get associated account. - account := user.Account - - // Preload this user account's home timeline cache. - if err := p.preloadHomeTimeline(ctx, account); err != nil { - return gtserror.Newf("error preloading home timeline: %w", err) - } - - // Get all lists owned by this user account. - lists, err := p.state.DB.GetListsByAccountID( - gtscontext.SetBarebones(ctx), - account.ID, - ) - if err != nil { - return gtserror.Newf("error getting account %s lists: %w", account.ID, err) - } - - for _, list := range lists { - // Preload each of this user account's list timeline caches. - if err := p.preloadListTimeline(ctx, account, list); err != nil { - return gtserror.Newf("error preloading list timeline: %w", err) - } - } - } - - return nil -} - func (p *Processor) getStatusTimeline( ctx context.Context, requester *gtsmodel.Account,