[feature] Add List functionality (#1802)

* start working on lists

* further list work

* test list db functions nicely

* more work on lists

* peepoopeepoo

* poke

* start list timeline func

* we're getting there lads

* couldn't be me working on stuff... could it?

* hook up handlers

* fiddling

* weeee

* woah

* screaming, pissing

* fix streaming being a whiny baby

* lint, small test fix, swagger

* tidying up, testing

* fucked! by the linter

* move timelines to state like a boss

* add timeline start to tests using state

* invalidate lists
This commit is contained in:
tobi 2023-05-25 10:37:38 +02:00 committed by GitHub
commit f5c004d67d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
123 changed files with 5654 additions and 970 deletions

View file

@ -32,10 +32,10 @@ const (
pruneLengthPrepared = 50
)
// Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines.
// Manager abstracts functions for creating multiple timelines, and adding, removing, and fetching entries from those timelines.
//
// By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed
// belongs in the timeline of the given account ID.
// belongs in the given timeline.
//
// The manager makes a distinction between *indexed* items and *prepared* items.
//
@ -45,33 +45,36 @@ const (
// Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization.
// Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served.
type Manager interface {
// IngestOne takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving.
// IngestOne takes one timelineable and indexes it into the given timeline, and then immediately prepares it for serving.
// This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (eg., a new status is created).
//
// It should already be established before calling this function that the item actually belongs in the timeline!
//
// The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where
// a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline.
IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error)
IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error)
// GetTimeline returns limit n amount of prepared entries from the timeline of the given account ID, in descending chronological order.
GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)
// GetTimeline returns limit n amount of prepared entries from the given timeline, in descending chronological order.
GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)
// GetIndexedLength returns the amount of items that have been indexed for the given account ID.
GetIndexedLength(ctx context.Context, accountID string) int
GetIndexedLength(ctx context.Context, timelineID string) int
// GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account.
// GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given timeline.
// Will be an empty string if nothing is (yet) indexed.
GetOldestIndexedID(ctx context.Context, accountID string) string
GetOldestIndexedID(ctx context.Context, timelineID string) string
// Remove removes one item from the timeline of the given timelineAccountID
Remove(ctx context.Context, accountID string, itemID string) (int, error)
// Remove removes one item from the given timeline.
Remove(ctx context.Context, timelineID string, itemID string) (int, error)
// RemoveTimeline completely removes one timeline.
RemoveTimeline(ctx context.Context, timelineID string) error
// WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines
WipeItemFromAllTimelines(ctx context.Context, itemID string) error
// WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines.
WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error
// WipeStatusesFromAccountID removes all items by the given accountID from the given timeline.
WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error
// Start starts hourly cleanup jobs for this timeline manager.
Start() error
@ -83,7 +86,7 @@ type Manager interface {
// NewManager returns a new timeline manager.
func NewManager(grabFunction GrabFunction, filterFunction FilterFunction, prepareFunction PrepareFunction, skipInsertFunction SkipInsertFunction) Manager {
return &manager{
accountTimelines: sync.Map{},
timelines: sync.Map{},
grabFunction: grabFunction,
filterFunction: filterFunction,
prepareFunction: prepareFunction,
@ -92,7 +95,7 @@ func NewManager(grabFunction GrabFunction, filterFunction FilterFunction, prepar
}
type manager struct {
accountTimelines sync.Map
timelines sync.Map
grabFunction GrabFunction
filterFunction FilterFunction
prepareFunction PrepareFunction
@ -127,14 +130,14 @@ func (m *manager) Start() error {
}
if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 {
log.WithField("accountID", timeline.AccountID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned)
log.WithField("accountID", timeline.TimelineID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned)
}
return true
}
// Execute the function for each timeline.
m.accountTimelines.Range(f)
m.timelines.Range(f)
}
}()
@ -145,8 +148,8 @@ func (m *manager) Stop() error {
return nil
}
func (m *manager) IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) {
return m.getOrCreateTimeline(ctx, accountID).IndexAndPrepareOne(
func (m *manager) IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error) {
return m.getOrCreateTimeline(ctx, timelineID).IndexAndPrepareOne(
ctx,
item.GetID(),
item.GetBoostOfID(),
@ -155,27 +158,32 @@ func (m *manager) IngestOne(ctx context.Context, accountID string, item Timeline
)
}
func (m *manager) Remove(ctx context.Context, accountID string, itemID string) (int, error) {
return m.getOrCreateTimeline(ctx, accountID).Remove(ctx, itemID)
func (m *manager) Remove(ctx context.Context, timelineID string, itemID string) (int, error) {
return m.getOrCreateTimeline(ctx, timelineID).Remove(ctx, itemID)
}
func (m *manager) GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
return m.getOrCreateTimeline(ctx, accountID).Get(ctx, limit, maxID, sinceID, minID, true)
func (m *manager) RemoveTimeline(ctx context.Context, timelineID string) error {
m.timelines.Delete(timelineID)
return nil
}
func (m *manager) GetIndexedLength(ctx context.Context, accountID string) int {
return m.getOrCreateTimeline(ctx, accountID).Len()
func (m *manager) GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
return m.getOrCreateTimeline(ctx, timelineID).Get(ctx, limit, maxID, sinceID, minID, true)
}
func (m *manager) GetOldestIndexedID(ctx context.Context, accountID string) string {
return m.getOrCreateTimeline(ctx, accountID).OldestIndexedItemID()
func (m *manager) GetIndexedLength(ctx context.Context, timelineID string) int {
return m.getOrCreateTimeline(ctx, timelineID).Len()
}
func (m *manager) WipeItemFromAllTimelines(ctx context.Context, statusID string) error {
func (m *manager) GetOldestIndexedID(ctx context.Context, timelineID string) string {
return m.getOrCreateTimeline(ctx, timelineID).OldestIndexedItemID()
}
func (m *manager) WipeItemFromAllTimelines(ctx context.Context, itemID string) error {
errors := gtserror.MultiError{}
m.accountTimelines.Range(func(_ any, v any) bool {
if _, err := v.(Timeline).Remove(ctx, statusID); err != nil {
m.timelines.Range(func(_ any, v any) bool {
if _, err := v.(Timeline).Remove(ctx, itemID); err != nil {
errors.Append(err)
}
@ -183,22 +191,21 @@ func (m *manager) WipeItemFromAllTimelines(ctx context.Context, statusID string)
})
if len(errors) > 0 {
return fmt.Errorf("WipeItemFromAllTimelines: one or more errors wiping status %s: %w", statusID, errors.Combine())
return fmt.Errorf("WipeItemFromAllTimelines: one or more errors wiping status %s: %w", itemID, errors.Combine())
}
return nil
}
func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error {
_, err := m.getOrCreateTimeline(ctx, timelineAccountID).RemoveAllByOrBoosting(ctx, accountID)
func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error {
_, err := m.getOrCreateTimeline(ctx, timelineID).RemoveAllByOrBoosting(ctx, accountID)
return err
}
// getOrCreateTimeline returns a timeline for the given
// accountID. If a timeline does not yet exist in the
// manager's sync.Map, it will be created and stored.
func (m *manager) getOrCreateTimeline(ctx context.Context, accountID string) Timeline {
i, ok := m.accountTimelines.Load(accountID)
// getOrCreateTimeline returns a timeline with the given id,
// creating a new timeline with that id if necessary.
func (m *manager) getOrCreateTimeline(ctx context.Context, timelineID string) Timeline {
i, ok := m.timelines.Load(timelineID)
if ok {
// Timeline already existed in sync.Map.
return i.(Timeline) //nolint:forcetypeassert
@ -206,8 +213,8 @@ func (m *manager) getOrCreateTimeline(ctx context.Context, accountID string) Tim
// Timeline did not yet exist in sync.Map.
// Create + store it.
timeline := NewTimeline(ctx, accountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction)
m.accountTimelines.Store(accountID, timeline)
timeline := NewTimeline(ctx, timelineID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction)
m.timelines.Store(timelineID, timeline)
return timeline
}