share a bunch of the database load code in timeline cache, don't clear timelines on relationship change

This commit is contained in:
kim 2025-04-08 15:19:36 +01:00
commit b04b4f8516
5 changed files with 148 additions and 172 deletions

View file

@ -387,7 +387,8 @@ func (t *StatusTimeline) Preload(
// but still easily manageable memory-wise. // but still easily manageable memory-wise.
recentBoosts := make(map[string]int, t.cut) recentBoosts := make(map[string]int, t.cut)
// Iterate the entire timeline cache and mark repeat boosts. // Iterate timeline ascending (i.e. oldest -> newest), marking
// entry IDs and marking down if boosts have been seen recently.
for idx, value := range t.cache.RangeUnsafe(structr.Asc) { for idx, value := range t.cache.RangeUnsafe(structr.Asc) {
// Store current ID in map. // Store current ID in map.
@ -426,9 +427,7 @@ func (t *StatusTimeline) Load(
// to load status models of already cached entries in the timeline. // to load status models of already cached entries in the timeline.
loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error),
// filter can be used to perform filtering of returned // filter can be used to perform filtering of returned statuses.
// statuses BEFORE insert into cache. i.e. this will affect
// what actually gets stored in the timeline cache.
filter func(each *gtsmodel.Status) (delete bool, err error), filter func(each *gtsmodel.Status) (delete bool, err error),
// prepareAPI should prepare internal status model to frontend API model. // prepareAPI should prepare internal status model to frontend API model.
@ -439,13 +438,6 @@ func (t *StatusTimeline) Load(
string, // hi string, // hi
error, error,
) { ) {
switch {
case page == nil:
panic("nil page")
case loadPage == nil:
panic("nil load page func")
}
// Get paging details. // Get paging details.
lo := page.Min.Value lo := page.Min.Value
hi := page.Max.Value hi := page.Max.Value
@ -470,12 +462,6 @@ func (t *StatusTimeline) Load(
dir, dir,
) )
// TODO: in time, we should think about (dynamically?) preloading
// the timelines, and any page requests outside of the cached
// range go straight to the database. otherwise there's peculiarities
// that may arise due to concurrent new and old range inserts, also
// requests for old page ranges are almost always going to be one-off.
// We now reset the lo,hi values to // We now reset the lo,hi values to
// represent the lowest and highest // represent the lowest and highest
// index values of loaded statuses. // index values of loaded statuses.
@ -486,8 +472,7 @@ func (t *StatusTimeline) Load(
// response values. // response values.
lo, hi = "", "" lo, hi = "", ""
// Preallocate a slice of up-to-limit API models. var apiStatuses []*apimodel.Status
apiStatuses := make([]*apimodel.Status, 0, limit)
if len(metas) > 0 { if len(metas) > 0 {
// Before we can do any filtering, we need // Before we can do any filtering, we need
@ -504,6 +489,9 @@ func (t *StatusTimeline) Load(
// Update paging parameters used for next database query. // Update paging parameters used for next database query.
nextPageParams(nextPg, metas[len(metas)-1].ID, order) nextPageParams(nextPg, metas[len(metas)-1].ID, order)
// Allocate slice of expected required API models.
apiStatuses = make([]*apimodel.Status, 0, len(metas))
// Prepare frontend API models for // Prepare frontend API models for
// the cached statuses. For now this // the cached statuses. For now this
// also does its own extra filtering. // also does its own extra filtering.
@ -515,74 +503,23 @@ func (t *StatusTimeline) Load(
) )
} }
// Check if we need to call // If no cached timeline statuses
// through to the database. // were found for page, we need to
// call through to the database.
if len(apiStatuses) == 0 { if len(apiStatuses) == 0 {
var err error
// Load a little more than // Pass through to main timeline db load function.
// limit to reduce db calls. apiStatuses, lo, hi, err = loadStatusTimeline(ctx,
nextPg.Limit += 10 nextPg,
// Perform maximum of 5 load
// attempts fetching statuses.
for i := 0; i < 5; i++ {
// Load next timeline statuses.
statuses, err := loadPage(nextPg)
if err != nil {
return nil, "", "", gtserror.Newf("error loading timeline: %w", err)
}
// No more statuses from
// load function = at end.
if len(statuses) == 0 {
break
}
if hi == "" {
// Set hi returned paging
// value if not already set.
hi = statuses[0].ID
}
// Update nextPg cursor parameter for next database query.
nextPageParams(nextPg, statuses[len(statuses)-1].ID, order)
// Perform any filtering on newly loaded statuses.
statuses, err = doStatusFilter(statuses, filter)
if err != nil {
return nil, "", "", gtserror.Newf("error filtering statuses: %w", err)
}
// After filtering no more
// statuses remain, retry.
if len(statuses) == 0 {
continue
}
// Convert to our cache type,
// these will get inserted into
// the cache in prepare() below.
metas := toStatusMeta(nil, statuses)
// Prepare frontend API models for
// the loaded statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas, metas,
prepareAPI,
apiStatuses, apiStatuses,
limit, loadPage,
filter,
prepareAPI,
) )
if err != nil {
// If we have anything, return return nil, "", "", err
// here. Even if below limit.
if len(apiStatuses) > 0 {
// Set returned lo status paging value.
lo = apiStatuses[len(apiStatuses)-1].ID
break
}
} }
} }
@ -608,7 +545,6 @@ func LoadStatusTimeline(
ctx context.Context, ctx context.Context,
page *paging.Page, page *paging.Page,
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error),
filter func(each *gtsmodel.Status) (delete bool, err error), filter func(each *gtsmodel.Status) (delete bool, err error),
prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
) ( ) (
@ -617,38 +553,83 @@ func LoadStatusTimeline(
string, // hi string, // hi
error, error,
) { ) {
switch {
case page == nil:
panic("nil page")
case loadPage == nil:
panic("nil load page func")
}
// Get paging details.
lo := page.Min.Value
hi := page.Max.Value
limit := page.Limit
order := page.Order()
// Use a copy of current page so // Use a copy of current page so
// we can repeatedly update it. // we can repeatedly update it.
nextPg := new(paging.Page) nextPg := new(paging.Page)
*nextPg = *page *nextPg = *page
nextPg.Min.Value = lo
nextPg.Max.Value = hi
// We now reset the lo,hi values to // Pass through to main timeline db load function.
// represent the lowest and highest apiStatuses, lo, hi, err := loadStatusTimeline(ctx,
// index values of loaded statuses. nextPg,
lo, hi = "", "" nil,
nil,
loadPage,
filter,
prepareAPI,
)
if err != nil {
return nil, "", "", err
}
// Preallocate a slice of up-to-limit API models. if page.Order().Ascending() {
apiStatuses := make([]*apimodel.Status, 0, limit) // The caller always expects the statuses
// to be returned in DESC order, but we
// build the status slice in paging order.
// If paging ASC, we need to reverse the
// returned statuses and paging values.
slices.Reverse(apiStatuses)
lo, hi = hi, lo
}
return apiStatuses, lo, hi, nil
}
// loadStatusTimeline encapsulates most of the main
// timeline-load-from-database logic, allowing both
// the temporary LoadStatusTimeline() function AND
// the main StatusTimeline{}.Load() function to share
// as much logic as possible.
//
// TODO: it may be worth moving this into StatusTimeline{}.Load()
// once the temporary function above has been removed. Or it may
// still be worth keeping *some* database logic separate.
func loadStatusTimeline(
ctx context.Context,
nextPg *paging.Page,
metas []*StatusMeta,
apiStatuses []*apimodel.Status,
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
filter func(each *gtsmodel.Status) (delete bool, err error),
prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
) (
[]*apimodel.Status,
string, // lo
string, // hi
error,
) {
if loadPage == nil {
panic("nil load page func")
}
// Lowest and highest ID
// vals of loaded statuses.
var lo, hi string
// Extract paging params.
order := nextPg.Order()
limit := nextPg.Limit
// Load a little more than // Load a little more than
// limit to reduce db calls. // limit to reduce db calls.
nextPg.Limit += 10 nextPg.Limit += 10
// Ensure we have a slice of meta objects to
// use in later preparation of the API models.
metas = xslices.GrowJust(metas[:0], nextPg.Limit)
// Ensure we have a slice of required frontend API models.
apiStatuses = xslices.GrowJust(apiStatuses[:0], nextPg.Limit)
// Perform maximum of 5 load // Perform maximum of 5 load
// attempts fetching statuses. // attempts fetching statuses.
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
@ -689,7 +670,7 @@ func LoadStatusTimeline(
// Convert to our cache type, // Convert to our cache type,
// these will get inserted into // these will get inserted into
// the cache in prepare() below. // the cache in prepare() below.
metas := toStatusMeta(nil, statuses) metas = toStatusMeta(metas[:0], statuses)
// Prepare frontend API models for // Prepare frontend API models for
// the loaded statuses. For now this // the loaded statuses. For now this
@ -711,22 +692,12 @@ func LoadStatusTimeline(
} }
} }
if order.Ascending() {
// The caller always expects the statuses
// to be returned in DESC order, but we
// build the status slice in paging order.
// If paging ASC, we need to reverse the
// returned statuses and paging values.
slices.Reverse(apiStatuses)
lo, hi = hi, lo
}
return apiStatuses, lo, hi, nil return apiStatuses, lo, hi, nil
} }
// InsertOne allows you to insert a single status into the timeline, with optional prepared API model, // InsertOne allows you to insert a single status into the timeline, with optional prepared API model.
// the return value indicates whether the passed status has been boosted recently on the timeline. // The return value indicates whether status should be skipped from streams, e.g. if already boosted recently.
func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (repeatBoost bool) { func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (skip bool) {
if status.BoostOfID != "" { if status.BoostOfID != "" {
// Check through top $repeatBoostDepth number of timeline items. // Check through top $repeatBoostDepth number of timeline items.
for i, value := range t.cache.RangeUnsafe(structr.Desc) { for i, value := range t.cache.RangeUnsafe(structr.Desc) {
@ -737,7 +708,7 @@ func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.S
// If inserted status has already been boosted, or original was posted // If inserted status has already been boosted, or original was posted
// within last $repeatBoostDepth, we indicate it as a repeated boost. // within last $repeatBoostDepth, we indicate it as a repeated boost.
if value.ID == status.BoostOfID || value.BoostOfID == status.BoostOfID { if value.ID == status.BoostOfID || value.BoostOfID == status.BoostOfID {
repeatBoost = true skip = true
break break
} }
} }
@ -749,7 +720,7 @@ func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.S
AccountID: status.AccountID, AccountID: status.AccountID,
BoostOfID: status.BoostOfID, BoostOfID: status.BoostOfID,
BoostOfAccountID: status.BoostOfAccountID, BoostOfAccountID: status.BoostOfAccountID,
repeatBoost: repeatBoost, repeatBoost: skip,
loaded: nil, loaded: nil,
prepared: prepared, prepared: prepared,
}) > t.max { }) > t.max {

View file

@ -199,9 +199,6 @@ func (p *Processor) getStatusTimeline(
apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx, apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx,
page, page,
loadPage, loadPage,
func(ids []string) ([]*gtsmodel.Status, error) {
return p.state.DB.GetStatusesByIDs(ctx, ids)
},
filter, filter,
prepare, prepare,
) )

View file

@ -683,13 +683,19 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP
} }
if block.Account.IsLocal() { if block.Account.IsLocal() {
// Perform timeline invalidation for block origin account. // Remove posts by target from origin's timelines.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) p.surface.removeRelationshipFromTimelines(ctx,
block.AccountID,
block.TargetAccountID,
)
} }
if block.TargetAccount.IsLocal() { if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account. // Remove posts by origin from target's timelines.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) p.surface.removeRelationshipFromTimelines(ctx,
block.TargetAccountID,
block.AccountID,
)
} }
// TODO: same with notifications? // TODO: same with notifications?
@ -851,13 +857,19 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI
} }
if follow.Account.IsLocal() { if follow.Account.IsLocal() {
// Perform timeline invalidation for block origin account. // Remove posts by target from origin's timelines.
p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID) p.surface.removeRelationshipFromTimelines(ctx,
follow.AccountID,
follow.TargetAccountID,
)
} }
if follow.TargetAccount.IsLocal() { if follow.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account. // Remove posts by origin from target's timelines.
p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID) p.surface.removeRelationshipFromTimelines(ctx,
follow.TargetAccountID,
follow.AccountID,
)
} }
if err := p.federate.UndoFollow(ctx, follow); err != nil { if err := p.federate.UndoFollow(ctx, follow); err != nil {
@ -873,16 +885,6 @@ func (p *clientAPI) UndoBlock(ctx context.Context, cMsg *messages.FromClientAPI)
return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
} }
if block.Account.IsLocal() {
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
if err := p.federate.UndoBlock(ctx, block); err != nil { if err := p.federate.UndoBlock(ctx, block); err != nil {
log.Errorf(ctx, "error federating block undo: %v", err) log.Errorf(ctx, "error federating block undo: %v", err)
} }

View file

@ -715,13 +715,19 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e
} }
if block.Account.IsLocal() { if block.Account.IsLocal() {
// Perform timeline invalidation for block origin account. // Remove posts by target from origin's timelines.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID) p.surface.removeRelationshipFromTimelines(ctx,
block.AccountID,
block.TargetAccountID,
)
} }
if block.TargetAccount.IsLocal() { if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account. // Remove posts by origin from target's timelines.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID) p.surface.removeRelationshipFromTimelines(ctx,
block.TargetAccountID,
block.AccountID,
)
} }
// Remove any follows that existed between blocker + blockee. // Remove any follows that existed between blocker + blockee.
@ -1202,33 +1208,31 @@ func (p *fediAPI) UndoFollow(ctx context.Context, fMsg *messages.FromFediAPI) er
} }
if follow.Account.IsLocal() { if follow.Account.IsLocal() {
// Perform timeline invalidation for block origin account. // Remove posts by target from origin's timelines.
p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID) p.surface.removeRelationshipFromTimelines(ctx,
follow.AccountID,
follow.TargetAccountID,
)
} }
if follow.TargetAccount.IsLocal() { if follow.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account. // Remove posts by origin from target's timelines.
p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID) p.surface.removeRelationshipFromTimelines(ctx,
follow.TargetAccountID,
follow.AccountID,
)
} }
return nil return nil
} }
func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error { func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error {
block, ok := fMsg.GTSModel.(*gtsmodel.Block) _, ok := fMsg.GTSModel.(*gtsmodel.Block)
if !ok { if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
} }
if block.Account.IsLocal() { // TODO: any required changes
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
return nil return nil
} }

View file

@ -826,21 +826,23 @@ func (s *Surface) removeTimelineEntriesByAccount(accountID string) {
s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID) s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID)
} }
// invalidateTimelinesForAccount invalidates all timeline caches stored for given account ID. func (s *Surface) removeRelationshipFromTimelines(ctx context.Context, timelineAccountID string, targetAccountID string) {
func (s *Surface) invalidateTimelinesForAccount(ctx context.Context, accountID string) { // Remove all statuses by target account
// from given account's home timeline.
// There's a lot of visibility changes to calculate for any s.State.Caches.Timelines.Home.
// relationship change, so just clear all account's timelines. MustGet(timelineAccountID).
s.State.Caches.Timelines.Home.Clear(accountID) RemoveByAccountIDs(targetAccountID)
// Get the IDs of all the lists owned by the given account ID. // Get the IDs of all the lists owned by the given account ID.
listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, accountID) listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, timelineAccountID)
if err != nil { if err != nil {
log.Errorf(ctx, "error getting lists for account %s: %v", accountID, err) log.Errorf(ctx, "error getting lists for account %s: %v", timelineAccountID, err)
} }
// Clear list timelines of account.
for _, listID := range listIDs { for _, listID := range listIDs {
s.State.Caches.Timelines.List.Clear(listID) // Remove all statuses by target account
// from given account's list timelines.
s.State.Caches.Timelines.List.MustGet(listID).
RemoveByAccountIDs(targetAccountID)
} }
} }