diff --git a/internal/cache/cache.go b/internal/cache/cache.go index df2a9e49c..a63c35ae9 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -215,6 +215,7 @@ func (c *Caches) Sweep(threshold float64) { c.DB.UserMuteIDs.Trim(threshold) c.Timelines.Home.Trim(threshold) c.Timelines.List.Trim(threshold) + c.Timelines.Public.Trim(threshold) c.Visibility.Trim(threshold) } diff --git a/internal/cache/timeline.go b/internal/cache/timeline.go index 4f385cafd..14c91bcb9 100644 --- a/internal/cache/timeline.go +++ b/internal/cache/timeline.go @@ -18,20 +18,17 @@ package cache import ( - "codeberg.org/gruf/go-structr" - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" ) type TimelineCaches struct { // Home ... - Home TimelinesCache[*gtsmodel.Status] + Home timeline.StatusTimelines // List ... - List TimelinesCache[*gtsmodel.Status] + List timeline.StatusTimelines // Public ... Public timeline.StatusTimeline @@ -42,16 +39,7 @@ func (c *Caches) initHomeTimelines() { log.Infof(nil, "cache size = %d", cap) - c.Timelines.Home.Init(structr.TimelineConfig[*gtsmodel.Status, string]{ - PKey: "StatusID", - Indices: []structr.IndexConfig{ - {Fields: "StatusID"}, - {Fields: "AccountID"}, - {Fields: "BoostOfStatusID"}, - {Fields: "BoostOfAccountID"}, - }, - Copy: copyStatus, - }, cap) + c.Timelines.Home.Init(cap) } func (c *Caches) initListTimelines() { @@ -59,16 +47,7 @@ func (c *Caches) initListTimelines() { log.Infof(nil, "cache size = %d", cap) - c.Timelines.List.Init(structr.TimelineConfig[*gtsmodel.Status, string]{ - PKey: "StatusID", - Indices: []structr.IndexConfig{ - {Fields: "StatusID"}, - {Fields: "AccountID"}, - {Fields: "BoostOfStatusID"}, - {Fields: "BoostOfAccountID"}, - }, - Copy: copyStatus, - }, cap) + c.Timelines.List.Init(cap) } func (c *Caches) initPublicTimeline() { @@ -78,54 +57,3 @@ func (c *Caches) initPublicTimeline() { c.Timelines.Public.Init(cap) } - -type TimelineStatus struct { - - // ID ... - ID string - - // AccountID ... - AccountID string - - // BoostOfID ... - BoostOfID string - - // BoostOfAccountID ... - BoostOfAccountID string - - // Local ... - Local bool - - // Loaded is a temporary field that may be - // set for a newly loaded timeline status - // so that statuses don't need to be loaded - // from the database twice in succession. - // - // i.e. this will only be set if the status - // was newly inserted into the timeline cache. - // for existing cache items this will be nil. - Loaded *gtsmodel.Status - - // Prepared contains prepared frontend API - // model for the referenced status. This may - // or may-not be nil depending on whether the - // status has been "unprepared" since the last - // call to "prepare" the frontend model. - Prepared *apimodel.Status -} - -func (s *TimelineStatus) Copy() *TimelineStatus { - var prepared *apimodel.Status - if s.Prepared != nil { - prepared = new(apimodel.Status) - *prepared = *s.Prepared - } - return &TimelineStatus{ - ID: s.ID, - AccountID: s.AccountID, - BoostOfID: s.BoostOfID, - BoostOfAccountID: s.BoostOfAccountID, - Loaded: nil, // NEVER set - Prepared: prepared, - } -} diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go index 29e185642..f75f00154 100644 --- a/internal/cache/timeline/status.go +++ b/internal/cache/timeline/status.go @@ -19,7 +19,9 @@ package timeline import ( "context" + "maps" "slices" + "sync/atomic" "codeberg.org/gruf/go-structr" @@ -30,23 +32,14 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/paging" ) -// StatusMeta ... +// StatusMeta contains minimum viable metadata +// about a Status in order to cache a timeline. type StatusMeta struct { - - // ID ... - ID string - - // AccountID ... - AccountID string - - // BoostOfID ... - BoostOfID string - - // BoostOfAccountID ... + ID string + AccountID string + BoostOfID string BoostOfAccountID string - - // Local ... - Local bool + Local bool // prepared contains prepared frontend API // model for the referenced status. This may @@ -66,6 +59,176 @@ type StatusMeta struct { loaded *gtsmodel.Status } +// StatusTimelines ... +type StatusTimelines struct { + ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS + cap int +} + +// Init ... +func (t *StatusTimelines) Init(cap int) { t.cap = cap } + +// MustGet ... +func (t *StatusTimelines) MustGet(key string) *StatusTimeline { + var tt *StatusTimeline + + for { + // Load current ptr. + cur := t.ptr.Load() + + // Get timeline map to work on. + var m map[string]*StatusTimeline + + if cur != nil { + // Look for existing + // timeline in cache. + tt = (*cur)[key] + if tt != nil { + return tt + } + + // Get clone of current + // before modifications. + m = maps.Clone(*cur) + } else { + // Allocate new timeline map for below. + m = make(map[string]*StatusTimeline) + } + + if tt == nil { + // Allocate new timeline. + tt = new(StatusTimeline) + tt.Init(t.cap) + } + + // Store timeline + // in new map. + m[key] = tt + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully inserted + // new timeline model. + return tt + } +} + +// Delete ... +func (t *StatusTimelines) Delete(key string) { + for { + // Load current ptr. + cur := t.ptr.Load() + + // Check for empty map / not in map. + if cur == nil || (*cur)[key] == nil { + return + } + + // Get clone of current + // before modifications. + m := maps.Clone(*cur) + + // Delete ID. + delete(m, key) + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully + // deleted ID. + return + } +} + +// Insert ... +func (t *StatusTimelines) Insert(statuses ...*gtsmodel.Status) { + meta := toStatusMeta(statuses) + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.cache.Insert(meta...) + } + } +} + +// InsertInto ... +func (t *StatusTimelines) InsertInto(key string, statuses ...*gtsmodel.Status) { + t.MustGet(key).Insert(statuses...) +} + +// RemoveByStatusIDs ... +func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByStatusIDs(statusIDs...) + } + } +} + +// RemoveByAccountIDs ... +func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByAccountIDs(accountIDs...) + } + } +} + +// UnprepareByStatusIDs ... +func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByStatusIDs(statusIDs...) + } + } +} + +// UnprepareByAccountIDs ... +func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByAccountIDs(accountIDs...) + } + } +} + +// Trim ... +func (t *StatusTimelines) Trim(threshold float64) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Trim(threshold) + } + } +} + +// Clear ... +func (t *StatusTimelines) Clear(key string) { + if p := t.ptr.Load(); p != nil { + if tt := (*p)[key]; tt != nil { + tt.Clear() + } + } +} + +// ClearAll ... +func (t *StatusTimelines) ClearAll() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Clear() + } + } +} + // StatusTimeline ... type StatusTimeline struct { @@ -242,11 +405,22 @@ func (t *StatusTimeline) Load( return apiStatuses, nil } +// Insert ... +func (t *StatusTimeline) Insert(statuses ...*gtsmodel.Status) { + t.cache.Insert(toStatusMeta(statuses)...) +} + // RemoveByStatusID removes all cached timeline entries pertaining to // status ID, including those that may be a boost of the given status. func (t *StatusTimeline) RemoveByStatusIDs(statusIDs ...string) { keys := make([]structr.Key, len(statusIDs)) + // Nil check indices outside loops. + if t.idx_ID == nil || + t.idx_BoostOfID == nil { + panic("indices are nil") + } + // Convert statusIDs to index keys. for i, id := range statusIDs { keys[i] = t.idx_ID.Key(id) @@ -269,6 +443,12 @@ func (t *StatusTimeline) RemoveByStatusIDs(statusIDs ...string) { func (t *StatusTimeline) RemoveByAccountIDs(accountIDs ...string) { keys := make([]structr.Key, len(accountIDs)) + // Nil check indices outside loops. + if t.idx_AccountID == nil || + t.idx_BoostOfAccountID == nil { + panic("indices are nil") + } + // Convert accountIDs to index keys. for i, id := range accountIDs { keys[i] = t.idx_AccountID.Key(id) @@ -291,27 +471,31 @@ func (t *StatusTimeline) RemoveByAccountIDs(accountIDs ...string) { func (t *StatusTimeline) UnprepareByStatusIDs(statusIDs ...string) { keys := make([]structr.Key, len(statusIDs)) + // Nil check indices outside loops. + if t.idx_ID == nil || + t.idx_BoostOfID == nil { + panic("indices are nil") + } + // Convert statusIDs to index keys. for i, id := range statusIDs { keys[i] = t.idx_ID.Key(id) } - // TODO: replace below with for-range-function loop when Go1.23. - t.cache.RangeKeys(t.idx_ID, keys...)(func(meta *StatusMeta) bool { + // Unprepare all statuses stored under StatusMeta.ID. + for meta := range t.cache.RangeKeys(t.idx_ID, keys...) { meta.prepared = nil - return true - }) + } // Convert statusIDs to index keys. for i, id := range statusIDs { keys[i] = t.idx_BoostOfID.Key(id) } - // TODO: replace below with for-range-function loop when Go1.23. - t.cache.RangeKeys(t.idx_BoostOfID, keys...)(func(meta *StatusMeta) bool { + // Unprepare all statuses stored under StatusMeta.BoostOfID. + for meta := range t.cache.RangeKeys(t.idx_BoostOfID, keys...) { meta.prepared = nil - return true - }) + } } // UnprepareByAccountIDs removes cached frontend API models for all cached @@ -319,27 +503,36 @@ func (t *StatusTimeline) UnprepareByStatusIDs(statusIDs ...string) { func (t *StatusTimeline) UnprepareByAccountIDs(accountIDs ...string) { keys := make([]structr.Key, len(accountIDs)) + // Nil check indices outside loops. + if t.idx_AccountID == nil || + t.idx_BoostOfAccountID == nil { + panic("indices are nil") + } + // Convert accountIDs to index keys. for i, id := range accountIDs { keys[i] = t.idx_AccountID.Key(id) } - // TODO: replace below with for-range-function loop when Go1.23. - t.cache.RangeKeys(t.idx_AccountID, keys...)(func(meta *StatusMeta) bool { + // Unprepare all statuses stored under StatusMeta.AccountID. + for meta := range t.cache.RangeKeys(t.idx_AccountID, keys...) { meta.prepared = nil - return true - }) + } // Convert accountIDs to index keys. for i, id := range accountIDs { keys[i] = t.idx_BoostOfAccountID.Key(id) } - // TODO: replace below with for-range-function loop when Go1.23. - t.cache.RangeKeys(t.idx_BoostOfAccountID, keys...)(func(meta *StatusMeta) bool { + // Unprepare all statuses stored under StatusMeta.BoostOfAccountID. + for meta := range t.cache.RangeKeys(t.idx_BoostOfAccountID, keys...) { meta.prepared = nil - return true - }) + } +} + +// Trim ... +func (t *StatusTimeline) Trim(threshold float64) { + panic("TODO") } // Clear will remove all cached entries from timeline. diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go index 1754fbf9b..34d7cb8db 100644 --- a/internal/cache/wrappers.go +++ b/internal/cache/wrappers.go @@ -18,13 +18,10 @@ package cache import ( - "maps" "slices" - "sync/atomic" "codeberg.org/gruf/go-cache/v3/simple" "codeberg.org/gruf/go-structr" - "github.com/superseriousbusiness/gotosocial/internal/paging" ) // SliceCache wraps a simple.Cache to provide simple loader-callback @@ -193,218 +190,3 @@ func (c *StructCache[T]) InvalidateIDs(index string, ids []string) { // Pass to main invalidate func. c.Cache.Invalidate(i, keys...) } - -type TimelineCache[T any] struct { - structr.Timeline[T, string] - index map[string]*structr.Index - maxSz int -} - -func (t *TimelineCache[T]) Init(config structr.TimelineConfig[T, string], maxSz int) { - t.index = make(map[string]*structr.Index, len(config.Indices)) - t.Timeline = structr.Timeline[T, string]{} - t.Timeline.Init(config) - for _, cfg := range config.Indices { - t.index[cfg.Fields] = t.Timeline.Index(cfg.Fields) - } - t.maxSz = maxSz -} - -func toDirection(order paging.Order) structr.Direction { - switch order { - case paging.OrderAscending: - return structr.Asc - case paging.OrderDescending: - return structr.Desc - default: - panic("invalid order") - } -} - -func (t *TimelineCache[T]) Select(page *paging.Page) []T { - min, max := page.Min.Value, page.Max.Value - lim, dir := page.Limit, toDirection(page.Order()) - return t.Timeline.Select(min, max, lim, dir) -} - -func (t *TimelineCache[T]) Invalidate(index string, keyParts ...any) { - i := t.index[index] - t.Timeline.Invalidate(i, i.Key(keyParts...)) -} - -func (t *TimelineCache[T]) Trim(perc float64) { - t.Timeline.Trim(perc, t.maxSz, structr.Asc) -} - -func (t *TimelineCache[T]) InvalidateIDs(index string, ids []string) { - i := t.index[index] - if i == nil { - // we only perform this check here as - // we're going to use the index before - // passing it to cache in main .Load(). - panic("missing index for cache type") - } - - // Generate cache keys for ID types. - keys := make([]structr.Key, len(ids)) - for x, id := range ids { - keys[x] = i.Key(id) - } - - // Pass to main invalidate func. - t.Timeline.Invalidate(i, keys...) -} - -// TimelinesCache provides a cache of TimelineCache{} -// objects, keyed by string and concurrency safe, optimized -// almost entirely for reads. On each creation of a new key -// in the cache, the entire internal map will be cloned, BUT -// all reads are only a single atomic operation, no mutex locks! -type TimelinesCache[T any] struct { - cfg structr.TimelineConfig[T, string] - ptr atomic.Pointer[map[string]*TimelineCache[T]] // ronly except by CAS - max int -} - -// Init ... -func (t *TimelinesCache[T]) Init(config structr.TimelineConfig[T, string], max int) { - // Create new test timeline to validate. - (&TimelineCache[T]{}).Init(config, max) - - // Invalidate - // timeline maps. - t.ptr.Store(nil) - - // Set config. - t.cfg = config - t.max = max -} - -// Get fetches a timeline with given ID from cache, creating it if required. -func (t *TimelinesCache[T]) Get(id string) *TimelineCache[T] { - var tt *TimelineCache[T] - - for { - // Load current ptr. - cur := t.ptr.Load() - - // Get timeline map to work on. - var m map[string]*TimelineCache[T] - - if cur != nil { - // Look for existing - // timeline in cache. - tt = (*cur)[id] - if tt != nil { - return tt - } - - // Get clone of current - // before modifications. - m = maps.Clone(*cur) - } else { - // Allocate new timeline map for below. - m = make(map[string]*TimelineCache[T]) - } - - if tt == nil { - // Allocate new timeline. - tt = new(TimelineCache[T]) - tt.Init(t.cfg, t.max) - } - - // Store timeline - // in new map. - m[id] = tt - - // Attempt to update the map ptr. - if !t.ptr.CompareAndSwap(cur, &m) { - - // We failed the - // CAS, reloop. - continue - } - - // Successfully inserted - // new timeline model. - return tt - } -} - -// Delete removes timeline with ID from cache. -func (t *TimelinesCache[T]) Delete(id string) { - for { - // Load current ptr. - cur := t.ptr.Load() - - // Check for empty map / not in map. - if cur == nil || (*cur)[id] == nil { - return - } - - // Get clone of current - // before modifications. - m := maps.Clone(*cur) - - // Delete ID. - delete(m, id) - - // Attempt to update the map ptr. - if !t.ptr.CompareAndSwap(cur, &m) { - - // We failed the - // CAS, reloop. - continue - } - - // Successfully - // deleted ID. - return - } -} - -func (t *TimelinesCache[T]) Insert(values ...T) { - if p := t.ptr.Load(); p != nil { - for _, timeline := range *p { - timeline.Insert(values...) - } - } -} - -func (t *TimelinesCache[T]) InsertInto(id string, values ...T) { - t.Get(id).Insert(values...) -} - -func (t *TimelinesCache[T]) Invalidate(index string, keyParts ...any) { - if p := t.ptr.Load(); p != nil { - for _, timeline := range *p { - timeline.Invalidate(index, keyParts...) - } - } -} - -func (t *TimelinesCache[T]) InvalidateFrom(id string, index string, keyParts ...any) { - t.Get(id).Invalidate(index, keyParts...) -} - -func (t *TimelinesCache[T]) InvalidateIDs(index string, ids []string) { - if p := t.ptr.Load(); p != nil { - for _, timeline := range *p { - timeline.InvalidateIDs(index, ids) - } - } -} - -func (t *TimelinesCache[T]) InvalidateIDsFrom(id string, index string, ids []string) { - t.Get(id).InvalidateIDs(index, ids) -} - -func (t *TimelinesCache[T]) Trim(perc float64) { - if p := t.ptr.Load(); p != nil { - for _, timeline := range *p { - timeline.Trim(perc) - } - } -} - -func (t *TimelinesCache[T]) Clear(id string) { t.Get(id).Clear() } diff --git a/internal/db/bundb/list.go b/internal/db/bundb/list.go index f81c59c42..d35a3a562 100644 --- a/internal/db/bundb/list.go +++ b/internal/db/bundb/list.go @@ -176,7 +176,6 @@ func (l *listDB) UpdateList(ctx context.Context, list *gtsmodel.List, columns .. return err } - // Invalidate this entire list's timeline. if err := l.state.Timelines.List.RemoveTimeline(ctx, list.ID); err != nil { log.Errorf(ctx, "error invalidating list timeline: %q", err) } diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index b285265ff..0cf15b1c7 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -139,7 +139,7 @@ func (p *Processor) getStatusTimeline( func (p *Processor) getTimeline( ctx context.Context, requester *gtsmodel.Account, - timeline *cache.TimelineCache[*gtsmodel.Status], + timeline *timeline.StatusTimeline, page *paging.Page, pgPath string, // timeline page path pgQuery url.Values, // timeline query parameters