simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions

This commit is contained in:
kim 2025-04-02 17:25:33 +01:00
commit 53817c23fd
12 changed files with 311 additions and 279 deletions

View file

@ -338,15 +338,10 @@ func (t *StatusTimeline) Load(
// to load status models of already cached entries in the timeline. // to load status models of already cached entries in the timeline.
loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error),
// preFilter can be used to perform filtering of returned // filter can be used to perform filtering of returned
// statuses BEFORE insert into cache. i.e. this will effect // statuses BEFORE insert into cache. i.e. this will effect
// what actually gets stored in the timeline cache. // what actually gets stored in the timeline cache.
preFilter func(each *gtsmodel.Status) (delete bool, err error), filter func(each *gtsmodel.Status) (delete bool, err error),
// postFilter can be used to perform filtering of returned
// statuses AFTER insert into cache. i.e. this will not effect
// what actually gets stored in the timeline cache.
postFilter func(each *gtsmodel.Status) (delete bool, err error),
// prepareAPI should prepare internal status model to frontend API model. // prepareAPI should prepare internal status model to frontend API model.
prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
@ -392,15 +387,20 @@ func (t *StatusTimeline) Load(
dir, dir,
) )
if len(metas) > 0 { // We now reset the lo,hi values to
// We ALWAYS return and work on // represent the lowest and highest
// statuses in DESC order, but the // index values of loaded statuses.
// timeline cache returns statuses //
// in the *requested* order. // We continually update these while
if dir == structr.Asc { // building up statuses to return, for
slices.Reverse(metas) // caller to build next / prev page
} // response values.
lo, hi = "", ""
// Preallocate a slice of up-to-limit API models.
apiStatuses := make([]*apimodel.Status, 0, limit)
if len(metas) > 0 {
// Before we can do any filtering, we need // Before we can do any filtering, we need
// to load status models for cached entries. // to load status models for cached entries.
err := loadStatuses(metas, loadIDs) err := loadStatuses(metas, loadIDs)
@ -408,42 +408,38 @@ func (t *StatusTimeline) Load(
return nil, "", "", gtserror.Newf("error loading statuses: %w", err) return nil, "", "", gtserror.Newf("error loading statuses: %w", err)
} }
// Update paging values // Set initial lo, hi values.
// based on returned data. hi = metas[len(metas)-1].ID
nextPageParams(nextPg, lo = metas[0].ID
metas[len(metas)-1].ID,
metas[0].ID, // Update paging parameters used for next database query.
order, nextPageParams(nextPg, metas[len(metas)-1].ID, order)
// Prepare frontend API models for
// the cached statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas,
prepareAPI,
apiStatuses,
limit,
) )
// Before any further loading,
// store current lo, hi values
// as possible lo, hi returns.
lo = metas[len(metas)-1].ID
hi = metas[0].ID
// Drop all entries we failed to load statuses for.
metas = slices.DeleteFunc(metas, (*StatusMeta).isNotLoaded)
// Perform post-filtering on cached status entries.
metas, err = doStatusPostFilter(metas, postFilter)
if err != nil {
return nil, "", "", gtserror.Newf("error post-filtering statuses: %w", err)
}
} }
// Track all newly loaded status entries // Track all newly loaded status entries
// AFTER 'preFilter', but before 'postFilter', // after filtering for insert into cache.
// to later insert into timeline cache.
var justLoaded []*StatusMeta var justLoaded []*StatusMeta
// Check whether loaded enough from cache. // Check whether loaded enough from cache.
if need := limit - len(metas); need > 0 { if need := limit - len(apiStatuses); need > 0 {
// Perform a maximum of 5 // Load a little more than
// load attempts fetching // limit to reduce db calls.
// statuses to reach limit. nextPg.Limit += 10
for i := 0; i < 5; i++ {
// Perform maximum of 10 load
// attempts fetching statuses.
for i := 0; i < 10; i++ {
// Load next timeline statuses. // Load next timeline statuses.
statuses, err := loadPage(nextPg) statuses, err := loadPage(nextPg)
@ -457,18 +453,19 @@ func (t *StatusTimeline) Load(
break break
} }
// Update paging values if lo == "" {
// based on returned data. // Set min returned paging
nextPageParams(nextPg, // value if not already set.
statuses[len(statuses)-1].ID, lo = statuses[0].ID
statuses[0].ID, }
order,
)
// Perform any pre-filtering on newly loaded statuses. // Update nextPg cursor parameter for next database query.
statuses, err = doStatusPreFilter(statuses, preFilter) nextPageParams(nextPg, statuses[len(statuses)-1].ID, order)
// Perform any filtering on newly loaded statuses.
statuses, err = doStatusFilter(statuses, filter)
if err != nil { if err != nil {
return nil, "", "", gtserror.Newf("error pre-filtering statuses: %w", err) return nil, "", "", gtserror.Newf("error filtering statuses: %w", err)
} }
// After filtering no more // After filtering no more
@ -477,71 +474,46 @@ func (t *StatusTimeline) Load(
continue continue
} }
// On each iteration, since statuses
// returned will always be in DESC order,
// iteratively update the lo paging value
// that we return for next / prev pages.
lo = statuses[len(statuses)-1].ID
// Convert to our cache type, // Convert to our cache type,
// these will get inserted into // these will get inserted into
// the cache in prepare() below. // the cache in prepare() below.
uncached := toStatusMeta(statuses) metas := toStatusMeta(statuses)
// Before any filtering append to newly loaded. // Append to newly loaded for later insert.
justLoaded = append(justLoaded, uncached...) justLoaded = append(justLoaded, metas...)
// Perform any post-filtering on loaded timeline entries. // Prepare frontend API models for
filtered, err := doStatusPostFilter(uncached, postFilter) // the loaded statuses. For now this
if err != nil { // also does its own extra filtering.
return nil, "", "", gtserror.Newf("error post-filtering statuses: %w", err) apiStatuses = prepareStatuses(ctx,
} metas,
prepareAPI,
apiStatuses,
limit,
)
// Append newly filtered meta entries. // If we have anything, return
metas = append(metas, filtered...) // here. Even if below limit.
if len(apiStatuses) > 0 {
// Check if we reached // Set returned hi status paging value.
// requested page limit. hi = apiStatuses[len(apiStatuses)-1].ID
if len(metas) >= limit {
break break
} }
} }
} }
// Prepare frontend API models. if order.Ascending() {
var apiStatuses []*apimodel.Status // The caller always expects the statuses
if len(metas) > 0 { // to be returned in DESC order, but we
switch { // build the status slice in paging order.
case len(metas) <= limit: // If paging ASC, we need to reverse the
// We have under // returned statuses and paging values.
// expected limit. slices.Reverse(apiStatuses)
lo, hi = hi, lo
case order.Ascending():
// Ascending order was requested
// and we have more than limit, so
// trim extra metadata from end.
metas = metas[:limit]
default: /* i.e. descending */
// Descending order was requested
// and we have more than limit, so
// trim extra metadata from start.
metas = metas[len(metas)-limit:]
}
// Using meta and funcs, prepare frontend API models.
apiStatuses = prepareStatuses(ctx, metas, prepareAPI)
} }
if len(justLoaded) > 0 { if len(justLoaded) > 0 {
if hi == "" {
// Check whether a hi value was set
// from an initial load of cached entries,
// if not we set the returned hi paging
// value from first in loaded statuses.
hi = justLoaded[0].ID
}
// Even if we don't return them, insert // Even if we don't return them, insert
// the excess (post-filtered) into cache. // the excess (post-filtered) into cache.
t.cache.Insert(justLoaded...) t.cache.Insert(justLoaded...)
@ -729,6 +701,8 @@ func prepareStatuses(
ctx context.Context, ctx context.Context,
meta []*StatusMeta, meta []*StatusMeta,
prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error), prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error),
apiStatuses []*apimodel.Status,
limit int,
) []*apimodel.Status { ) []*apimodel.Status {
switch { //nolint:gocritic switch { //nolint:gocritic
case prepareAPI == nil: case prepareAPI == nil:
@ -737,9 +711,14 @@ func prepareStatuses(
// Iterate the given StatusMeta objects for pre-prepared // Iterate the given StatusMeta objects for pre-prepared
// frontend models, otherwise attempting to prepare them. // frontend models, otherwise attempting to prepare them.
apiStatuses := make([]*apimodel.Status, 0, len(meta))
for _, meta := range meta { for _, meta := range meta {
// Check if we have prepared enough
// API statuses for caller to return.
if len(apiStatuses) >= limit {
break
}
if meta.loaded == nil { if meta.loaded == nil {
// We failed loading this // We failed loading this
// status, skip preparing. // status, skip preparing.
@ -758,10 +737,6 @@ func prepareStatuses(
} }
if meta.prepared != nil { if meta.prepared != nil {
// TODO: we won't need nil check when mutes
// / filters are moved to appropriate funcs.
//
// Add the prepared API model to return slice.
apiStatuses = append(apiStatuses, meta.prepared) apiStatuses = append(apiStatuses, meta.prepared)
} }
} }
@ -823,9 +798,9 @@ func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta {
}) })
} }
// doStatusPreFilter performs given filter function on provided statuses, // doStatusFilter performs given filter function on provided statuses,
// returning early if an error is returned. returns filtered statuses. // returning early if an error is returned. returns filtered statuses.
func doStatusPreFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) { func doStatusFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) {
// Check for provided // Check for provided
// filter function. // filter function.
@ -855,37 +830,3 @@ func doStatusPreFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status
return statuses, nil return statuses, nil
} }
// doStatusPostFilter performs given filter function on provided status meta,
// expecting that embedded status is already loaded, returning filtered status
// meta, as well as those *filtered out*. returns early if error is returned.
func doStatusPostFilter(metas []*StatusMeta, filter func(*gtsmodel.Status) (bool, error)) ([]*StatusMeta, error) {
// Check for provided
// filter function.
if filter == nil {
return metas, nil
}
// Iterate through input metas.
for i := 0; i < len(metas); {
meta := metas[i]
// Pass through filter func.
ok, err := filter(meta.loaded)
if err != nil {
return nil, err
}
if ok {
// Delete meta entry from input slice.
metas = slices.Delete(metas, i, i+1)
continue
}
// Iter.
i++
}
return metas, nil
}

View file

@ -30,13 +30,13 @@ import (
// updated while maintaining the boundary value. // updated while maintaining the boundary value.
func nextPageParams( func nextPageParams(
page *paging.Page, page *paging.Page,
nextLo, nextHi string, lastIdx string,
order paging.Order, order paging.Order,
) { ) {
if order.Ascending() { if order.Ascending() {
page.Min.Value = nextLo page.Min.Value = lastIdx
} else /* i.e. descending */ { //nolint:revive } else /* i.e. descending */ { //nolint:revive
page.Max.Value = nextHi page.Max.Value = lastIdx
} }
} }

View file

@ -365,13 +365,6 @@ func loadStatusTimelinePage(
return nil, err return nil, err
} }
// The order we return from the database and
// timeline caches differs depending on ordering,
// but the caller always expects DESCENDING.
if order.Ascending() {
slices.Reverse(statusIDs)
}
// Fetch statuses from DB / cache with given IDs. // Fetch statuses from DB / cache with given IDs.
return state.DB.GetStatusesByIDs(ctx, statusIDs) return state.DB.GetStatusesByIDs(ctx, statusIDs)
} }

View file

@ -136,10 +136,7 @@ func (f *federatingDB) undoFollow(
// Convert AS Follow to barebones *gtsmodel.Follow, // Convert AS Follow to barebones *gtsmodel.Follow,
// retrieving origin + target accts from the db. // retrieving origin + target accts from the db.
follow, err := f.converter.ASFollowToFollow( follow, err := f.converter.ASFollowToFollow(ctx, asFollow)
gtscontext.SetBarebones(ctx),
asFollow,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) { if err != nil && !errors.Is(err, db.ErrNoEntries) {
err := gtserror.Newf("error converting AS Follow to follow: %w", err) err := gtserror.Newf("error converting AS Follow to follow: %w", err)
return err return err
@ -152,6 +149,11 @@ func (f *federatingDB) undoFollow(
return nil return nil
} }
// Lock on the Follow URI
// as we may be updating it.
unlock := f.state.FedLocks.Lock(follow.URI)
defer unlock()
// Ensure addressee is follow target. // Ensure addressee is follow target.
if follow.TargetAccountID != receivingAcct.ID { if follow.TargetAccountID != receivingAcct.ID {
const text = "receivingAcct was not Follow target" const text = "receivingAcct was not Follow target"
@ -178,7 +180,16 @@ func (f *federatingDB) undoFollow(
return err return err
} }
log.Debug(ctx, "Follow undone") // Send the deleted follow through to
// the fedi worker to process side effects.
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityUndo,
GTSModel: follow,
Receiving: receivingAcct,
Requesting: requestingAcct,
})
return nil return nil
} }
@ -269,7 +280,16 @@ func (f *federatingDB) undoLike(
return err return err
} }
log.Debug(ctx, "Like undone") // Send the deleted block through to
// the fedi worker to process side effects.
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityLike,
APActivityType: ap.ActivityUndo,
GTSModel: fave,
Receiving: receivingAcct,
Requesting: requestingAcct,
})
return nil return nil
} }
@ -298,10 +318,7 @@ func (f *federatingDB) undoBlock(
// Convert AS Block to barebones *gtsmodel.Block, // Convert AS Block to barebones *gtsmodel.Block,
// retrieving origin + target accts from the DB. // retrieving origin + target accts from the DB.
block, err := f.converter.ASBlockToBlock( block, err := f.converter.ASBlockToBlock(ctx, asBlock)
gtscontext.SetBarebones(ctx),
asBlock,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) { if err != nil && !errors.Is(err, db.ErrNoEntries) {
err := gtserror.Newf("error converting AS Block to block: %w", err) err := gtserror.Newf("error converting AS Block to block: %w", err)
return err return err
@ -333,7 +350,16 @@ func (f *federatingDB) undoBlock(
return err return err
} }
log.Debug(ctx, "Block undone") // Send the deleted block through to
// the fedi worker to process side effects.
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityBlock,
APActivityType: ap.ActivityUndo,
GTSModel: block,
Receiving: receivingAcct,
Requesting: requestingAcct,
})
return nil return nil
} }

View file

@ -38,22 +38,6 @@ func (p *Processor) HomeTimelineGet(
*apimodel.PageableResponse, *apimodel.PageableResponse,
gtserror.WithCode, gtserror.WithCode,
) { ) {
var pageQuery url.Values
var postFilter func(*gtsmodel.Status) (bool, error)
if local {
// Set local = true query.
pageQuery = localOnlyTrue
// Remove any non-local statuses if local-only requested.
postFilter = func(s *gtsmodel.Status) (bool, error) {
return !*s.Local, nil
}
} else {
// Set local = false query.
pageQuery = localOnlyFalse
}
return p.getStatusTimeline(ctx, return p.getStatusTimeline(ctx,
// Auth'd // Auth'd
@ -74,7 +58,19 @@ func (p *Processor) HomeTimelineGet(
// page query flag, (this map // page query flag, (this map
// later gets copied before // later gets copied before
// any further usage). // any further usage).
pageQuery, func() url.Values {
var pageQuery url.Values
if local {
// Set local = true query.
pageQuery = localOnlyTrue
} else {
// Set local = false query.
pageQuery = localOnlyFalse
}
return pageQuery
}(),
// Status filter context. // Status filter context.
statusfilter.FilterContextHome, statusfilter.FilterContextHome,
@ -92,9 +88,5 @@ func (p *Processor) HomeTimelineGet(
ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s) ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s)
return !ok, err return !ok, err
}, },
// Post-filtering function,
// i.e. filter after caching.
postFilter,
) )
} }

View file

@ -93,7 +93,7 @@ func (p *Processor) ListTimelineGet(
return p.state.DB.GetListTimeline(ctx, listID, pg) return p.state.DB.GetListTimeline(ctx, listID, pg)
}, },
// Pre-filtering function, // Filtering function,
// i.e. filter before caching. // i.e. filter before caching.
func(s *gtsmodel.Status) (bool, error) { func(s *gtsmodel.Status) (bool, error) {
@ -101,9 +101,5 @@ func (p *Processor) ListTimelineGet(
ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s) ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s)
return !ok, err return !ok, err
}, },
// Post-filtering function,
// i.e. filter after caching.
nil,
) )
} }

View file

@ -89,10 +89,6 @@ func (p *Processor) publicTimelineGet(
ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s)
return !ok, err return !ok, err
}, },
// Post-filtering function,
// i.e. filter after caching.
nil,
) )
} }
@ -134,7 +130,7 @@ func (p *Processor) localTimelineGet(
return p.state.DB.GetLocalTimeline(ctx, pg) return p.state.DB.GetLocalTimeline(ctx, pg)
}, },
// Pre-filtering function, // Filtering function,
// i.e. filter before caching. // i.e. filter before caching.
func(s *gtsmodel.Status) (bool, error) { func(s *gtsmodel.Status) (bool, error) {
@ -142,9 +138,5 @@ func (p *Processor) localTimelineGet(
ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s)
return !ok, err return !ok, err
}, },
// Post-filtering function,
// i.e. filter after caching.
nil,
) )
} }

View file

@ -70,8 +70,7 @@ func (p *Processor) getStatusTimeline(
pageQuery url.Values, pageQuery url.Values,
filterCtx statusfilter.FilterContext, filterCtx statusfilter.FilterContext,
loadPage func(*paging.Page) (statuses []*gtsmodel.Status, err error), loadPage func(*paging.Page) (statuses []*gtsmodel.Status, err error),
preFilter func(*gtsmodel.Status) (bool, error), filter func(*gtsmodel.Status) (bool, error),
postFilter func(*gtsmodel.Status) (bool, error),
) ( ) (
*apimodel.PageableResponse, *apimodel.PageableResponse,
gtserror.WithCode, gtserror.WithCode,
@ -128,13 +127,9 @@ func (p *Processor) getStatusTimeline(
return p.state.DB.GetStatusesByIDs(ctx, ids) return p.state.DB.GetStatusesByIDs(ctx, ids)
}, },
// Pre-filtering function, // Filtering function,
// i.e. filter before caching. // i.e. filter before caching.
preFilter, filter,
// Post-filtering function,
// i.e. filter after caching.
postFilter,
// Frontend API model preparation function. // Frontend API model preparation function.
func(status *gtsmodel.Status) (*apimodel.Status, error) { func(status *gtsmodel.Status) (*apimodel.Status, error) {

View file

@ -682,8 +682,15 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP
return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
} }
// Perform any necessary timeline invalidation. if block.Account.IsLocal() {
p.surface.invalidateTimelinesForBlock(ctx, block) // Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
// TODO: same with notifications? // TODO: same with notifications?
// TODO: same with bookmarks? // TODO: same with bookmarks?
@ -843,6 +850,16 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI
log.Errorf(ctx, "error updating account stats: %v", err) log.Errorf(ctx, "error updating account stats: %v", err)
} }
if follow.Account.IsLocal() {
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID)
}
if follow.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID)
}
if err := p.federate.UndoFollow(ctx, follow); err != nil { if err := p.federate.UndoFollow(ctx, follow); err != nil {
log.Errorf(ctx, "error federating follow undo: %v", err) log.Errorf(ctx, "error federating follow undo: %v", err)
} }
@ -856,6 +873,16 @@ func (p *clientAPI) UndoBlock(ctx context.Context, cMsg *messages.FromClientAPI)
return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
} }
if block.Account.IsLocal() {
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
if err := p.federate.UndoBlock(ctx, block); err != nil { if err := p.federate.UndoBlock(ctx, block); err != nil {
log.Errorf(ctx, "error federating block undo: %v", err) log.Errorf(ctx, "error federating block undo: %v", err)
} }
@ -1010,6 +1037,25 @@ func (p *clientAPI) DeleteAccountOrUser(ctx context.Context, cMsg *messages.From
p.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID) p.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID)
p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI)
// Remove any entries authored by account from timelines.
p.surface.removeTimelineEntriesByAccount(account.ID)
// Remove any of their cached timelines.
p.state.Caches.Timelines.Public.Delete(account.ID)
p.state.Caches.Timelines.Home.Delete(account.ID)
p.state.Caches.Timelines.Local.Delete(account.ID)
// Get the IDs of all the lists owned by the given account ID.
listIDs, err := p.state.DB.GetListIDsByAccountID(ctx, account.ID)
if err != nil {
log.Errorf(ctx, "error getting lists for account %s: %v", account.ID, err)
}
// Remove list timelines of account.
for _, listID := range listIDs {
p.state.Caches.Timelines.List.Delete(listID)
}
if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil { if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil {
log.Errorf(ctx, "error federating account delete: %v", err) log.Errorf(ctx, "error federating account delete: %v", err)
} }

View file

@ -197,9 +197,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg *messages.FromF
// UNDO SOMETHING // UNDO SOMETHING
case ap.ActivityUndo: case ap.ActivityUndo:
switch fMsg.APObjectType {
// UNDO FOLLOW
case ap.ActivityFollow:
return p.fediAPI.UndoFollow(ctx, fMsg)
// UNDO BLOCK
case ap.ActivityBlock:
return p.fediAPI.UndoBlock(ctx, fMsg)
// UNDO ANNOUNCE // UNDO ANNOUNCE
if fMsg.APObjectType == ap.ActivityAnnounce { case ap.ActivityAnnounce:
return p.fediAPI.UndoAnnounce(ctx, fMsg) return p.fediAPI.UndoAnnounce(ctx, fMsg)
// UNDO LIKE
case ap.ActivityLike:
return p.fediAPI.UndoFave(ctx, fMsg)
} }
} }
@ -701,8 +714,15 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e
return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
} }
// Perform any necessary timeline invalidation. if block.Account.IsLocal() {
p.surface.invalidateTimelinesForBlock(ctx, block) // Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
// Remove any follows that existed between blocker + blockee. // Remove any follows that existed between blocker + blockee.
// (note this handles removing any necessary list entries). // (note this handles removing any necessary list entries).
@ -1054,6 +1074,9 @@ func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg *messages.FromFediAPI)
p.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID) p.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID)
p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI)
// Remove any entries authored by account from timelines.
p.surface.removeTimelineEntriesByAccount(account.ID)
// First perform the actual account deletion. // First perform the actual account deletion.
if err := p.account.Delete(ctx, account, account.ID); err != nil { if err := p.account.Delete(ctx, account, account.ID); err != nil {
log.Errorf(ctx, "error deleting account: %v", err) log.Errorf(ctx, "error deleting account: %v", err)
@ -1172,6 +1195,44 @@ func (p *fediAPI) RejectAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
return nil return nil
} }
func (p *fediAPI) UndoFollow(ctx context.Context, fMsg *messages.FromFediAPI) error {
follow, ok := fMsg.GTSModel.(*gtsmodel.Follow)
if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.Follow", fMsg.GTSModel)
}
if follow.Account.IsLocal() {
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, follow.AccountID)
}
if follow.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, follow.TargetAccountID)
}
return nil
}
func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error {
block, ok := fMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
}
if block.Account.IsLocal() {
// Perform timeline invalidation for block origin account.
p.surface.invalidateTimelinesForAccount(ctx, block.AccountID)
}
if block.TargetAccount.IsLocal() {
// Perform timeline invalidation for block target account.
p.surface.invalidateTimelinesForAccount(ctx, block.TargetAccountID)
}
return nil
}
func (p *fediAPI) UndoAnnounce( func (p *fediAPI) UndoAnnounce(
ctx context.Context, ctx context.Context,
fMsg *messages.FromFediAPI, fMsg *messages.FromFediAPI,
@ -1200,3 +1261,16 @@ func (p *fediAPI) UndoAnnounce(
return nil return nil
} }
func (p *fediAPI) UndoFave(ctx context.Context, fMsg *messages.FromFediAPI) error {
statusFave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave)
if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel)
}
// Interaction counts changed on the faved status;
// uncache the prepared version from all timelines.
p.surface.invalidateStatusFromTimelines(statusFave.StatusID)
return nil
}

View file

@ -524,27 +524,6 @@ func (s *Surface) tagFollowersForStatus(
return visibleTagFollowerAccounts, errs.Combine() return visibleTagFollowerAccounts, errs.Combine()
} }
// deleteStatusFromTimelines completely removes the given status from all timelines.
// It will also stream deletion of the status to all open streams.
func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) {
s.State.Caches.Timelines.Public.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.Local.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID)
s.Stream.Delete(ctx, statusID)
}
// invalidateStatusFromTimelines does cache invalidation on the given status by
// unpreparing it from all timelines, forcing it to be prepared again (with updated
// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
// both for the status itself, and for any boosts of the status.
func (s *Surface) invalidateStatusFromTimelines(statusID string) {
s.State.Caches.Timelines.Public.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.Local.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID)
}
// timelineStatusUpdate looks up HOME and LIST timelines of accounts // timelineStatusUpdate looks up HOME and LIST timelines of accounts
// that follow the the status author or tags and pushes edit messages into any // that follow the the status author or tags and pushes edit messages into any
// active streams. // active streams.
@ -822,56 +801,52 @@ func (s *Surface) timelineStatusUpdateForTagFollowers(
return errs.Combine() return errs.Combine()
} }
// invalidateTimelinesForBlock ... // deleteStatusFromTimelines completely removes the given status from all timelines.
func (s *Surface) invalidateTimelinesForBlock(ctx context.Context, block *gtsmodel.Block) { // It will also stream deletion of the status to all open streams.
func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) {
s.State.Caches.Timelines.Public.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.Local.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID)
s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID)
s.Stream.Delete(ctx, statusID)
}
// Check if origin is local account, // invalidateStatusFromTimelines does cache invalidation on the given status by
// i.e. has status timeline caches. // unpreparing it from all timelines, forcing it to be prepared again (with updated
if block.Account.IsLocal() { // stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
// both for the status itself, and for any boosts of the status.
func (s *Surface) invalidateStatusFromTimelines(statusID string) {
s.State.Caches.Timelines.Public.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.Local.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID)
s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID)
}
// Remove target's statuses // removeTimelineEntriesByAccount removes all cached timeline entries authored by account ID.
// from origin's home timeline. func (s *Surface) removeTimelineEntriesByAccount(accountID string) {
s.State.Caches.Timelines.Home. s.State.Caches.Timelines.Public.RemoveByAccountIDs(accountID)
MustGet(block.AccountID). s.State.Caches.Timelines.Home.RemoveByAccountIDs(accountID)
RemoveByAccountIDs(block.TargetAccountID) s.State.Caches.Timelines.Local.RemoveByAccountIDs(accountID)
s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID)
}
// Get the IDs of any lists created by origin account. // invalidateTimelinesForAccount invalidates all timeline caches stored for given account ID.
listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, block.AccountID) func (s *Surface) invalidateTimelinesForAccount(ctx context.Context, accountID string) {
if err != nil {
log.Errorf(ctx, "error getting account's list IDs for %s: %v", block.URI, err)
}
// Remove target's statuses from // There's a lot of visibility changes to caclculate for any
// any of origin's list timelines. // relationship change, so just clear all account's timelines.
for _, listID := range listIDs { s.State.Caches.Timelines.Public.Clear(accountID)
s.State.Caches.Timelines.List. s.State.Caches.Timelines.Home.Clear(accountID)
MustGet(listID). s.State.Caches.Timelines.Local.Clear(accountID)
RemoveByAccountIDs(block.TargetAccountID)
} // Get the IDs of all the lists owned by the given account ID.
listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, accountID)
if err != nil {
log.Errorf(ctx, "error getting lists for account %s: %v", accountID, err)
} }
// Check if target is local account, // Clear list timelines of account.
// i.e. has status timeline caches. for _, listID := range listIDs {
if block.TargetAccount.IsLocal() { s.State.Caches.Timelines.List.Clear(listID)
// Remove origin's statuses
// from target's home timeline.
s.State.Caches.Timelines.Home.
MustGet(block.TargetAccountID).
RemoveByAccountIDs(block.AccountID)
// Get the IDs of any lists created by target account.
listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, block.TargetAccountID)
if err != nil {
log.Errorf(ctx, "error getting target account's list IDs for %s: %v", block.URI, err)
}
// Remove origin's statuses from
// any of target's list timelines.
for _, listID := range listIDs {
s.State.Caches.Timelines.List.
MustGet(listID).
RemoveByAccountIDs(block.AccountID)
}
} }
} }

View file

@ -512,7 +512,9 @@ func (c *Converter) ASFollowToFollow(ctx context.Context, followable ap.Followab
follow := &gtsmodel.Follow{ follow := &gtsmodel.Follow{
URI: uri, URI: uri,
Account: origin,
AccountID: origin.ID, AccountID: origin.ID,
TargetAccount: target,
TargetAccountID: target.ID, TargetAccountID: target.ID,
} }