[feature] Rework timeline code to make it useful for more than just statuses (#373)

* add preparable and timelineable interfaces

* initialize timeline manager within the processor

* generic renaming

* move status-specific timeline logic into the processor

* refactor timeline to make it useful for more than statuses
This commit is contained in:
tobi 2022-02-05 12:47:38 +01:00 committed by GitHub
commit 1b36e85840
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 801 additions and 566 deletions

View file

@ -23,173 +23,166 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (t *timeline) IndexBefore(ctx context.Context, statusID string, include bool, amount int) error {
func (t *timeline) IndexBefore(ctx context.Context, itemID string, amount int) error {
l := logrus.WithFields(logrus.Fields{
"func": "IndexBefore",
"amount": amount,
})
// lazily initialize index if it hasn't been done already
if t.postIndex.data == nil {
t.postIndex.data = &list.List{}
t.postIndex.data.Init()
if t.itemIndex.data == nil {
t.itemIndex.data = &list.List{}
t.itemIndex.data.Init()
}
filtered := []*gtsmodel.Status{}
offsetStatus := statusID
toIndex := []Timelineable{}
offsetID := itemID
if include {
// if we have the status with given statusID in the database, include it in the results set as well
s := &gtsmodel.Status{}
if err := t.db.GetByID(ctx, statusID, s); err == nil {
filtered = append(filtered, s)
}
}
i := 0
l.Trace("entering grabloop")
grabloop:
for ; len(filtered) < amount && i < 5; i++ { // try the grabloop 5 times only
statuses, err := t.db.GetHomeTimeline(ctx, t.accountID, "", "", offsetStatus, amount, false)
for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only
// first grab items using the caller-provided grab function
l.Trace("grabbing...")
items, stop, err := t.grabFunction(ctx, t.accountID, "", "", offsetID, amount)
if err != nil {
if err == db.ErrNoEntries {
break grabloop // we just don't have enough statuses left in the db so index what we've got and then bail
}
return fmt.Errorf("IndexBefore: error getting statuses from db: %s", err)
return err
}
if stop {
break grabloop
}
for _, s := range statuses {
timelineable, err := t.filter.StatusHometimelineable(ctx, s, t.account)
l.Trace("filtering...")
// now filter each item using the caller-provided filter function
for _, item := range items {
shouldIndex, err := t.filterFunction(ctx, t.accountID, item)
if err != nil {
continue
return err
}
if timelineable {
filtered = append(filtered, s)
if shouldIndex {
toIndex = append(toIndex, item)
}
offsetStatus = s.ID
offsetID = item.GetID()
}
}
l.Trace("left grabloop")
for _, s := range filtered {
if _, err := t.IndexOne(ctx, s.CreatedAt, s.ID, s.BoostOfID, s.AccountID, s.BoostOfAccountID); err != nil {
return fmt.Errorf("IndexBefore: error indexing status with id %s: %s", s.ID, err)
// index the items we got
for _, s := range toIndex {
if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil {
return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err)
}
}
return nil
}
func (t *timeline) IndexBehind(ctx context.Context, statusID string, include bool, amount int) error {
func (t *timeline) IndexBehind(ctx context.Context, itemID string, amount int) error {
l := logrus.WithFields(logrus.Fields{
"func": "IndexBehind",
"include": include,
"amount": amount,
"func": "IndexBehind",
"amount": amount,
})
// lazily initialize index if it hasn't been done already
if t.postIndex.data == nil {
t.postIndex.data = &list.List{}
t.postIndex.data.Init()
if t.itemIndex.data == nil {
t.itemIndex.data = &list.List{}
t.itemIndex.data.Init()
}
// If we're already indexedBehind given statusID by the required amount, we can return nil.
// First find position of statusID (or as near as possible).
// If we're already indexedBehind given itemID by the required amount, we can return nil.
// First find position of itemID (or as near as possible).
var position int
positionLoop:
for e := t.postIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*postIndexEntry)
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*itemIndexEntry)
if !ok {
return errors.New("IndexBehind: could not parse e as a postIndexEntry")
return errors.New("IndexBehind: could not parse e as an itemIndexEntry")
}
if entry.statusID <= statusID {
if entry.itemID <= itemID {
// we've found it
break positionLoop
}
position++
}
// now check if the length of indexed posts exceeds the amount of posts required (position of statusID, plus amount of posts requested after that)
if t.postIndex.data.Len() > position+amount {
// now check if the length of indexed items exceeds the amount of items required (position of itemID, plus amount of posts requested after that)
if t.itemIndex.data.Len() > position+amount {
// we have enough indexed behind already to satisfy amount, so don't need to make db calls
l.Trace("returning nil since we already have enough posts indexed")
l.Trace("returning nil since we already have enough items indexed")
return nil
}
filtered := []*gtsmodel.Status{}
offsetStatus := statusID
toIndex := []Timelineable{}
offsetID := itemID
if include {
// if we have the status with given statusID in the database, include it in the results set as well
s := &gtsmodel.Status{}
if err := t.db.GetByID(ctx, statusID, s); err == nil {
filtered = append(filtered, s)
}
}
i := 0
l.Trace("entering grabloop")
grabloop:
for ; len(filtered) < amount && i < 5; i++ { // try the grabloop 5 times only
l.Tracef("entering grabloop; i is %d; len(filtered) is %d", i, len(filtered))
statuses, err := t.db.GetHomeTimeline(ctx, t.accountID, offsetStatus, "", "", amount, false)
for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only
// first grab items using the caller-provided grab function
l.Trace("grabbing...")
items, stop, err := t.grabFunction(ctx, t.accountID, offsetID, "", "", amount)
if err != nil {
if err == db.ErrNoEntries {
break grabloop // we just don't have enough statuses left in the db so index what we've got and then bail
}
return fmt.Errorf("IndexBehind: error getting statuses from db: %s", err)
return err
}
if stop {
break grabloop
}
l.Tracef("got %d statuses", len(statuses))
for _, s := range statuses {
timelineable, err := t.filter.StatusHometimelineable(ctx, s, t.account)
l.Trace("filtering...")
// now filter each item using the caller-provided filter function
for _, item := range items {
shouldIndex, err := t.filterFunction(ctx, t.accountID, item)
if err != nil {
l.Tracef("status was not hometimelineable: %s", err)
continue
return err
}
if timelineable {
filtered = append(filtered, s)
if shouldIndex {
toIndex = append(toIndex, item)
}
offsetStatus = s.ID
offsetID = item.GetID()
}
}
l.Trace("left grabloop")
for _, s := range filtered {
if _, err := t.IndexOne(ctx, s.CreatedAt, s.ID, s.BoostOfID, s.AccountID, s.BoostOfAccountID); err != nil {
return fmt.Errorf("IndexBehind: error indexing status with id %s: %s", s.ID, err)
// index the items we got
for _, s := range toIndex {
if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil {
return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err)
}
}
l.Trace("exiting function")
return nil
}
func (t *timeline) IndexOne(ctx context.Context, statusCreatedAt time.Time, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) {
func (t *timeline) IndexOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) {
t.Lock()
defer t.Unlock()
postIndexEntry := &postIndexEntry{
statusID: statusID,
postIndexEntry := &itemIndexEntry{
itemID: itemID,
boostOfID: boostOfID,
accountID: accountID,
boostOfAccountID: boostOfAccountID,
}
return t.postIndex.insertIndexed(postIndexEntry)
return t.itemIndex.insertIndexed(ctx, postIndexEntry)
}
func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusCreatedAt time.Time, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) {
func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) {
t.Lock()
defer t.Unlock()
postIndexEntry := &postIndexEntry{
statusID: statusID,
postIndexEntry := &itemIndexEntry{
itemID: statusID,
boostOfID: boostOfID,
accountID: accountID,
boostOfAccountID: boostOfAccountID,
}
inserted, err := t.postIndex.insertIndexed(postIndexEntry)
inserted, err := t.itemIndex.insertIndexed(ctx, postIndexEntry)
if err != nil {
return inserted, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err)
}
@ -203,32 +196,32 @@ func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusCreatedAt time.
return inserted, nil
}
func (t *timeline) OldestIndexedPostID(ctx context.Context) (string, error) {
func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) {
var id string
if t.postIndex == nil || t.postIndex.data == nil || t.postIndex.data.Back() == nil {
if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Back() == nil {
// return an empty string if postindex hasn't been initialized yet
return id, nil
}
e := t.postIndex.data.Back()
entry, ok := e.Value.(*postIndexEntry)
e := t.itemIndex.data.Back()
entry, ok := e.Value.(*itemIndexEntry)
if !ok {
return id, errors.New("OldestIndexedPostID: could not parse e as a postIndexEntry")
return id, errors.New("OldestIndexedItemID: could not parse e as itemIndexEntry")
}
return entry.statusID, nil
return entry.itemID, nil
}
func (t *timeline) NewestIndexedPostID(ctx context.Context) (string, error) {
func (t *timeline) NewestIndexedItemID(ctx context.Context) (string, error) {
var id string
if t.postIndex == nil || t.postIndex.data == nil || t.postIndex.data.Front() == nil {
if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Front() == nil {
// return an empty string if postindex hasn't been initialized yet
return id, nil
}
e := t.postIndex.data.Front()
entry, ok := e.Value.(*postIndexEntry)
e := t.itemIndex.data.Front()
entry, ok := e.Value.(*itemIndexEntry)
if !ok {
return id, errors.New("NewestIndexedPostID: could not parse e as a postIndexEntry")
return id, errors.New("NewestIndexedItemID: could not parse e as itemIndexEntry")
}
return entry.statusID, nil
return entry.itemID, nil
}