[bugfix/chore] Refactor timeline code (#1656)

* start poking timelines

* OK yes we're refactoring, but it's nothing like the last time so don't worry

* more fiddling

* update tests, simplify Get

* thanks linter, you're the best, mwah mwah kisses

* do a bit more tidying up

* start buggering about with the prepare function

* fix little oopsie

* start merging lists into 1

* ik heb een heel zwaar leven
nee nee echt waar

* hey it works we did it reddit

* regenerate swagger docs

* tidy up a wee bit

* adjust paging

* fix little error, remove unused functions
This commit is contained in:
tobi 2023-04-06 13:43:13 +02:00 committed by GitHub
commit 3510454768
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 1319 additions and 1365 deletions

View file

@ -20,14 +20,18 @@ package timeline
import (
"context"
"fmt"
"strings"
"sync"
"time"
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
const (
pruneLengthIndexed = 400
pruneLengthPrepared = 50
)
// Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines.
//
// By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed
@ -41,38 +45,37 @@ import (
// Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization.
// Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served.
type Manager interface {
// Ingest takes one item and indexes it into the timeline for the given account ID.
//
// It should already be established before calling this function that the item actually belongs in the timeline!
//
// The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where
// the item is a boosted status, but a boost of the original status or the status itself already exists recently in the timeline.
Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error)
// IngestAndPrepare takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving.
// IngestOne takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving.
// This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (eg., a new status is created).
//
// It should already be established before calling this function that the item actually belongs in the timeline!
//
// The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where
// a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline.
IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error)
IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error)
// GetTimeline returns limit n amount of prepared entries from the timeline of the given account ID, in descending chronological order.
// If maxID is provided, it will return prepared entries from that maxID onwards, inclusive.
GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)
// GetIndexedLength returns the amount of items that have been *indexed* for the given account ID.
GetIndexedLength(ctx context.Context, timelineAccountID string) int
// GetIndexedLength returns the amount of items that have been indexed for the given account ID.
GetIndexedLength(ctx context.Context, accountID string) int
// GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account.
GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error)
// PrepareXFromTop prepares limit n amount of items, based on their indexed representations, from the top of the index.
PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error
// Will be an empty string if nothing is (yet) indexed.
GetOldestIndexedID(ctx context.Context, accountID string) string
// Remove removes one item from the timeline of the given timelineAccountID
Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error)
Remove(ctx context.Context, accountID string, itemID string) (int, error)
// WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines
WipeItemFromAllTimelines(ctx context.Context, itemID string) error
// WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines.
WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error
// Start starts hourly cleanup jobs for this timeline manager.
Start() error
// Stop stops the timeline manager (currently a stub, doesn't do anything).
Stop() error
}
@ -97,31 +100,41 @@ type manager struct {
}
func (m *manager) Start() error {
// range through all timelines in the sync map once per hour + prune as necessary
// Start a background goroutine which iterates
// through all stored timelines once per hour,
// and cleans up old entries if that timeline
// hasn't been accessed in the last hour.
go func() {
for now := range time.NewTicker(1 * time.Hour).C {
m.accountTimelines.Range(func(key any, value any) bool {
timelineAccountID, ok := key.(string)
// Define the range function inside here,
// so that we can use the 'now' returned
// by the ticker, instead of having to call
// time.Now() multiple times.
//
// Unless it panics, this function always
// returns 'true', to continue the Range
// call through the sync.Map.
f := func(_ any, v any) bool {
timeline, ok := v.(Timeline)
if !ok {
panic("couldn't parse timeline manager sync map key as string, this should never happen so panic")
log.Panic(nil, "couldn't parse timeline manager sync map value as Timeline, this should never happen so panic")
}
t, ok := value.(Timeline)
if !ok {
panic("couldn't parse timeline manager sync map value as Timeline, this should never happen so panic")
if now.Sub(timeline.LastGot()) < 1*time.Hour {
// Timeline has been fetched in the
// last hour, move on to the next one.
return true
}
anHourAgo := now.Add(-1 * time.Hour)
if lastGot := t.LastGot(); lastGot.Before(anHourAgo) {
amountPruned := t.Prune(defaultDesiredPreparedItemsLength, defaultDesiredIndexedItemsLength)
log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"amountPruned", amountPruned},
}...).Info("pruned indexed and prepared items from timeline")
if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 {
log.WithField("accountID", timeline.AccountID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned)
}
return true
})
}
// Execute the function for each timeline.
m.accountTimelines.Range(f)
}
}()
@ -132,146 +145,69 @@ func (m *manager) Stop() error {
return nil
}
func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) {
l := log.WithContext(ctx).
WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", item.GetID()},
}...)
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return false, err
}
l.Trace("ingesting item")
return t.IndexOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID())
func (m *manager) IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) {
return m.getOrCreateTimeline(ctx, accountID).IndexAndPrepareOne(
ctx,
item.GetID(),
item.GetBoostOfID(),
item.GetAccountID(),
item.GetBoostOfAccountID(),
)
}
func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) {
l := log.WithContext(ctx).
WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", item.GetID()},
}...)
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return false, err
}
l.Trace("ingesting item")
return t.IndexAndPrepareOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID())
func (m *manager) Remove(ctx context.Context, accountID string, itemID string) (int, error) {
return m.getOrCreateTimeline(ctx, accountID).Remove(ctx, itemID)
}
func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) {
l := log.WithContext(ctx).
WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", itemID},
}...)
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return 0, err
}
l.Trace("removing item")
return t.Remove(ctx, itemID)
func (m *manager) GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
return m.getOrCreateTimeline(ctx, accountID).Get(ctx, limit, maxID, sinceID, minID, true)
}
func (m *manager) GetTimeline(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
l := log.WithContext(ctx).
WithFields(kv.Fields{{"timelineAccountID", timelineAccountID}}...)
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return nil, err
}
items, err := t.Get(ctx, limit, maxID, sinceID, minID, true)
if err != nil {
l.Errorf("error getting statuses: %s", err)
}
return items, nil
func (m *manager) GetIndexedLength(ctx context.Context, accountID string) int {
return m.getOrCreateTimeline(ctx, accountID).Len()
}
func (m *manager) GetIndexedLength(ctx context.Context, timelineAccountID string) int {
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return 0
}
return t.ItemIndexLength(ctx)
}
func (m *manager) GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) {
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return "", err
}
return t.OldestIndexedItemID(ctx)
}
func (m *manager) PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error {
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return err
}
return t.PrepareFromTop(ctx, limit)
func (m *manager) GetOldestIndexedID(ctx context.Context, accountID string) string {
return m.getOrCreateTimeline(ctx, accountID).OldestIndexedItemID()
}
func (m *manager) WipeItemFromAllTimelines(ctx context.Context, statusID string) error {
errors := []string{}
m.accountTimelines.Range(func(k interface{}, i interface{}) bool {
t, ok := i.(Timeline)
if !ok {
panic("couldn't parse entry as Timeline, this should never happen so panic")
errors := gtserror.MultiError{}
m.accountTimelines.Range(func(_ any, v any) bool {
if _, err := v.(Timeline).Remove(ctx, statusID); err != nil {
errors.Append(err)
}
if _, err := t.Remove(ctx, statusID); err != nil {
errors = append(errors, err.Error())
}
return true
return true // always continue range
})
var err error
if len(errors) > 0 {
err = fmt.Errorf("one or more errors removing status %s from all timelines: %s", statusID, strings.Join(errors, ";"))
return fmt.Errorf("WipeItemFromAllTimelines: one or more errors wiping status %s: %w", statusID, errors.Combine())
}
return err
return nil
}
func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error {
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
return err
}
_, err = t.RemoveAllBy(ctx, accountID)
_, err := m.getOrCreateTimeline(ctx, timelineAccountID).RemoveAllByOrBoosting(ctx, accountID)
return err
}
func (m *manager) getOrCreateTimeline(ctx context.Context, timelineAccountID string) (Timeline, error) {
var t Timeline
i, ok := m.accountTimelines.Load(timelineAccountID)
if !ok {
var err error
t, err = NewTimeline(ctx, timelineAccountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction)
if err != nil {
return nil, err
}
m.accountTimelines.Store(timelineAccountID, t)
} else {
t, ok = i.(Timeline)
if !ok {
panic("couldn't parse entry as Timeline, this should never happen so panic")
}
// getOrCreateTimeline returns a timeline for the given
// accountID. If a timeline does not yet exist in the
// manager's sync.Map, it will be created and stored.
func (m *manager) getOrCreateTimeline(ctx context.Context, accountID string) Timeline {
i, ok := m.accountTimelines.Load(accountID)
if ok {
// Timeline already existed in sync.Map.
return i.(Timeline) //nolint:forcetypeassert
}
return t, nil
// Timeline did not yet exist in sync.Map.
// Create + store it.
timeline := NewTimeline(ctx, accountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction)
m.accountTimelines.Store(accountID, timeline)
return timeline
}