mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-11-12 16:37:29 -06:00
more work integration new timeline code
This commit is contained in:
parent
49d9a008d9
commit
771fbe2d5e
14 changed files with 419 additions and 606 deletions
301
internal/cache/timeline/status.go
vendored
301
internal/cache/timeline/status.go
vendored
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/util/xslices"
|
||||
)
|
||||
|
||||
// StatusMeta contains minimum viable metadata
|
||||
|
|
@ -59,6 +60,12 @@ type StatusMeta struct {
|
|||
loaded *gtsmodel.Status
|
||||
}
|
||||
|
||||
// isLoaded is a small utility func that can fill
|
||||
// the slices.DeleteFunc() signature requirements.
|
||||
func (m *StatusMeta) isLoaded() bool {
|
||||
return m.loaded == nil
|
||||
}
|
||||
|
||||
// StatusTimelines ...
|
||||
type StatusTimelines struct {
|
||||
ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS
|
||||
|
|
@ -266,7 +273,7 @@ func (t *StatusTimeline) Init(cap int) {
|
|||
AccountID: s.AccountID,
|
||||
BoostOfID: s.BoostOfID,
|
||||
BoostOfAccountID: s.BoostOfAccountID,
|
||||
loaded: nil, // NEVER copied
|
||||
loaded: nil, // NEVER stored
|
||||
prepared: prepared,
|
||||
}
|
||||
},
|
||||
|
|
@ -285,25 +292,28 @@ func (t *StatusTimeline) Load(
|
|||
page *paging.Page,
|
||||
|
||||
// loadPage should load the timeline of given page for cache hydration.
|
||||
loadPage func(page *paging.Page) ([]*gtsmodel.Status, error),
|
||||
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
|
||||
|
||||
// loadIDs should load status models with given IDs.
|
||||
loadIDs func([]string) ([]*gtsmodel.Status, error),
|
||||
// loadIDs should load status models with given IDs, this is used
|
||||
// to load status models of already cached entries in the timeline.
|
||||
loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error),
|
||||
|
||||
// preFilter can be used to perform filtering of returned
|
||||
// statuses BEFORE insert into cache. i.e. this will effect
|
||||
// what actually gets stored in the timeline cache.
|
||||
preFilter func(*gtsmodel.Status) (bool, error),
|
||||
preFilter func(each *gtsmodel.Status) (delete bool, err error),
|
||||
|
||||
// postFilterFn can be used to perform filtering of returned
|
||||
// statuses AFTER insert into cache. i.e. this will not effect
|
||||
// what actually gets stored in the timeline cache.
|
||||
postFilter func(*StatusMeta) bool,
|
||||
postFilter func(each *gtsmodel.Status) (delete bool, err error),
|
||||
|
||||
// prepareAPI should prepare internal status model to frontend API model.
|
||||
prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error),
|
||||
prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
|
||||
) (
|
||||
[]*apimodel.Status,
|
||||
string, // lo
|
||||
string, // hi
|
||||
error,
|
||||
) {
|
||||
switch {
|
||||
|
|
@ -320,18 +330,54 @@ func (t *StatusTimeline) Load(
|
|||
ord := page.Order()
|
||||
dir := toDirection(ord)
|
||||
|
||||
// Load cached timeline entries for page.
|
||||
meta := t.cache.Select(min, max, lim, dir)
|
||||
// First we attempt to load status metadata
|
||||
// entries from the timeline cache, up to lim.
|
||||
metas := t.cache.Select(min, max, lim, dir)
|
||||
|
||||
// Perform any timeline post-filtering.
|
||||
meta = doPostFilter(meta, postFilter)
|
||||
// Set the starting lo / hi ID paging
|
||||
// values. We continually update these
|
||||
// for further timeline selections and
|
||||
// for returning final next / prev pgs.
|
||||
lo, hi := min, max
|
||||
|
||||
// ...
|
||||
if need := len(meta) - lim; need > 0 {
|
||||
if len(metas) > 0 {
|
||||
// Update paging values
|
||||
// based on returned data.
|
||||
lo, hi = nextPageParams(
|
||||
lo, hi,
|
||||
metas[len(metas)-1].ID,
|
||||
metas[0].ID,
|
||||
ord,
|
||||
)
|
||||
|
||||
// Set first page
|
||||
// query to load.
|
||||
nextPg := page
|
||||
// Before we can do any filtering, we need
|
||||
// to load status models for cached entries.
|
||||
err := loadStatuses(ctx, metas, loadIDs)
|
||||
if err != nil {
|
||||
return nil, "", "", gtserror.Newf("error loading statuses: %w", err)
|
||||
}
|
||||
|
||||
// Drop all entries we failed to load statuses for.
|
||||
metas = slices.DeleteFunc(metas, (*StatusMeta).isLoaded)
|
||||
|
||||
// Perform any post-filtering on cached status entries.
|
||||
metas, _, err = doStatusPostFilter(metas, postFilter)
|
||||
if err != nil {
|
||||
return nil, "", "", gtserror.Newf("error post-filtering statuses: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var filtered []*StatusMeta
|
||||
|
||||
// Check whether loaded enough from cache.
|
||||
if need := len(metas) - lim; need > 0 {
|
||||
|
||||
// Use a copy of current page so
|
||||
// we can repeatedly update it.
|
||||
nextPg := new(paging.Page)
|
||||
*nextPg = *page
|
||||
nextPg.Min.Value = lo
|
||||
nextPg.Max.Value = hi
|
||||
|
||||
// Perform a maximum of 5
|
||||
// load attempts fetching
|
||||
|
|
@ -341,7 +387,7 @@ func (t *StatusTimeline) Load(
|
|||
// Load next timeline statuses.
|
||||
statuses, err := loadPage(nextPg)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("error loading timeline: %w", err)
|
||||
return nil, "", "", gtserror.Newf("error loading timeline: %w", err)
|
||||
}
|
||||
|
||||
// No more statuses from
|
||||
|
|
@ -350,59 +396,65 @@ func (t *StatusTimeline) Load(
|
|||
break
|
||||
}
|
||||
|
||||
// Get the lowest and highest
|
||||
// ID values, used for next pg.
|
||||
// Done BEFORE status filtering.
|
||||
lo := statuses[len(statuses)-1].ID
|
||||
hi := statuses[0].ID
|
||||
// Update paging values
|
||||
// based on returned data.
|
||||
lo, hi = nextPageParams(
|
||||
lo, hi,
|
||||
statuses[len(statuses)-1].ID,
|
||||
statuses[0].ID,
|
||||
ord,
|
||||
)
|
||||
|
||||
// Perform any status timeline pre-filtering.
|
||||
statuses, err = doPreFilter(statuses, preFilter)
|
||||
// Update paging params.
|
||||
nextPg.Min.Value = lo
|
||||
nextPg.Max.Value = hi
|
||||
|
||||
// Perform any pre-filtering on newly loaded statuses.
|
||||
statuses, err = doStatusPreFilter(statuses, preFilter)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("error pre-filtering timeline: %w", err)
|
||||
return nil, "", "", gtserror.Newf("error pre-filtering statuses: %w", err)
|
||||
}
|
||||
|
||||
// After filtering no more
|
||||
// statuses remain, retry.
|
||||
if len(statuses) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert to our cache type,
|
||||
// these will get inserted into
|
||||
// the cache in prepare() below.
|
||||
m := toStatusMeta(statuses)
|
||||
uncached := toStatusMeta(statuses)
|
||||
|
||||
// Perform any post-filtering.
|
||||
// and append to main meta slice.
|
||||
m = slices.DeleteFunc(m, postFilter)
|
||||
meta = append(meta, m...)
|
||||
// Perform any post-filtering on recently loaded timeline entries.
|
||||
newMetas, newFiltered, err := doStatusPostFilter(uncached, postFilter)
|
||||
if err != nil {
|
||||
return nil, "", "", gtserror.Newf("error post-filtering statuses: %w", err)
|
||||
}
|
||||
|
||||
// Append the meta to their relevant slices.
|
||||
filtered = append(filtered, newFiltered...)
|
||||
metas = append(metas, newMetas...)
|
||||
|
||||
// Check if we reached
|
||||
// requested page limit.
|
||||
if len(meta) >= lim {
|
||||
if len(metas) >= lim {
|
||||
break
|
||||
}
|
||||
|
||||
// Set next paging value.
|
||||
nextPg = nextPg.Next(lo, hi)
|
||||
}
|
||||
}
|
||||
|
||||
// Using meta and given funcs, prepare frontend API models.
|
||||
apiStatuses, err := t.prepare(ctx, meta, loadIDs, prepareAPI)
|
||||
// Using meta and funcs, prepare frontend API models.
|
||||
apiStatuses, err := t.prepare(ctx, metas, prepareAPI)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("error preparing api statuses: %w", err)
|
||||
return nil, "", "", gtserror.Newf("error preparing api statuses: %w", err)
|
||||
}
|
||||
|
||||
// Ensure the returned statuses are ALWAYS in descending order.
|
||||
slices.SortFunc(apiStatuses, func(s1, s2 *apimodel.Status) int {
|
||||
const k = +1
|
||||
switch {
|
||||
case s1.ID > s2.ID:
|
||||
return +k
|
||||
case s1.ID < s2.ID:
|
||||
return -k
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
})
|
||||
// Even if we don't return them, insert
|
||||
// the excess (post-filtered) into cache.
|
||||
t.cache.Insert(filtered...)
|
||||
|
||||
return apiStatuses, nil
|
||||
return apiStatuses, lo, hi, nil
|
||||
}
|
||||
|
||||
// Insert ...
|
||||
|
|
@ -543,15 +595,12 @@ func (t *StatusTimeline) Clear() { t.cache.Clear() }
|
|||
func (t *StatusTimeline) prepare(
|
||||
ctx context.Context,
|
||||
meta []*StatusMeta,
|
||||
loadIDs func([]string) ([]*gtsmodel.Status, error),
|
||||
prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error),
|
||||
) (
|
||||
[]*apimodel.Status,
|
||||
error,
|
||||
) {
|
||||
switch {
|
||||
case loadIDs == nil:
|
||||
panic("nil load fn")
|
||||
case prepareAPI == nil:
|
||||
panic("nil prepare fn")
|
||||
}
|
||||
|
|
@ -569,39 +618,16 @@ func (t *StatusTimeline) prepare(
|
|||
|
||||
// If there were no unprepared
|
||||
// StatusMeta objects, then we
|
||||
// gathered everything we need!
|
||||
// gathered everything we can!
|
||||
if len(unprepared) == 0 {
|
||||
return apiStatuses, nil
|
||||
}
|
||||
|
||||
// Of the StatusMeta objects missing a prepared
|
||||
// frontend model, find those without a recently
|
||||
// fetched database model and store their IDs,
|
||||
// as well mapping them for faster update below.
|
||||
toLoadIDs := make([]string, len(unprepared))
|
||||
loadedMap := make(map[string]*StatusMeta, len(unprepared))
|
||||
for i, meta := range unprepared {
|
||||
if meta.loaded == nil {
|
||||
toLoadIDs[i] = meta.ID
|
||||
loadedMap[meta.ID] = meta
|
||||
}
|
||||
}
|
||||
|
||||
// Load statuses with given IDs.
|
||||
loaded, err := loadIDs(toLoadIDs)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("error loading statuses: %w", err)
|
||||
}
|
||||
|
||||
// Update returned StatusMeta objects
|
||||
// with newly loaded statuses by IDs.
|
||||
for i := range loaded {
|
||||
status := loaded[i]
|
||||
meta := loadedMap[status.ID]
|
||||
meta.loaded = status
|
||||
}
|
||||
|
||||
// By this point all status objects should
|
||||
// be fully populated with loaded models,
|
||||
// since they are required for filtering.
|
||||
for i := 0; i < len(unprepared); {
|
||||
|
||||
// Get meta at index.
|
||||
meta := unprepared[i]
|
||||
|
||||
|
|
@ -632,28 +658,61 @@ func (t *StatusTimeline) prepare(
|
|||
return apiStatuses, nil
|
||||
}
|
||||
|
||||
// loadStatuses ...
|
||||
func loadStatuses(
|
||||
ctx context.Context,
|
||||
metas []*StatusMeta,
|
||||
loadIDs func([]string) ([]*gtsmodel.Status, error),
|
||||
) error {
|
||||
// ...
|
||||
toLoadIDs := make([]string, len(metas))
|
||||
loadedMap := make(map[string]*StatusMeta, len(metas))
|
||||
for i, meta := range metas {
|
||||
if meta.loaded == nil {
|
||||
toLoadIDs[i] = meta.ID
|
||||
loadedMap[meta.ID] = meta
|
||||
}
|
||||
}
|
||||
|
||||
// Load statuses with given IDs.
|
||||
loaded, err := loadIDs(toLoadIDs)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error loading statuses: %w", err)
|
||||
}
|
||||
|
||||
// Update returned StatusMeta objects
|
||||
// with newly loaded statuses by IDs.
|
||||
for i := range loaded {
|
||||
status := loaded[i]
|
||||
meta := loadedMap[status.ID]
|
||||
meta.loaded = status
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// toStatusMeta converts a slice of database model statuses
|
||||
// into our cache wrapper type, a slice of []StatusMeta{}.
|
||||
func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta {
|
||||
meta := make([]*StatusMeta, len(statuses))
|
||||
for i := range statuses {
|
||||
status := statuses[i]
|
||||
meta[i] = &StatusMeta{
|
||||
ID: status.ID,
|
||||
AccountID: status.AccountID,
|
||||
BoostOfID: status.BoostOfID,
|
||||
BoostOfAccountID: status.BoostOfAccountID,
|
||||
Local: *status.Local,
|
||||
loaded: status,
|
||||
return xslices.Gather(nil, statuses, func(s *gtsmodel.Status) *StatusMeta {
|
||||
return &StatusMeta{
|
||||
ID: s.ID,
|
||||
AccountID: s.AccountID,
|
||||
BoostOfID: s.BoostOfID,
|
||||
BoostOfAccountID: s.BoostOfAccountID,
|
||||
Local: *s.Local,
|
||||
loaded: s,
|
||||
prepared: nil,
|
||||
}
|
||||
}
|
||||
return meta
|
||||
})
|
||||
}
|
||||
|
||||
// doPreFilter acts similarly to slices.DeleteFunc but it accepts function with error return, or nil, returning early if so.
|
||||
func doPreFilter(statuses []*gtsmodel.Status, preFilter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) {
|
||||
if preFilter == nil {
|
||||
// ...
|
||||
func doStatusPreFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) {
|
||||
|
||||
// Check for provided
|
||||
// filter function.
|
||||
if filter == nil {
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
|
|
@ -662,7 +721,7 @@ func doPreFilter(statuses []*gtsmodel.Status, preFilter func(*gtsmodel.Status) (
|
|||
status := statuses[i]
|
||||
|
||||
// Pass through filter func.
|
||||
ok, err := preFilter(status)
|
||||
ok, err := filter(status)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -680,22 +739,38 @@ func doPreFilter(statuses []*gtsmodel.Status, preFilter func(*gtsmodel.Status) (
|
|||
return statuses, nil
|
||||
}
|
||||
|
||||
// doPostFilter acts similarly to slices.DeleteFunc but it handles case of a nil function.
|
||||
func doPostFilter(statuses []*StatusMeta, postFilter func(*StatusMeta) bool) []*StatusMeta {
|
||||
if postFilter == nil {
|
||||
return statuses
|
||||
}
|
||||
return slices.DeleteFunc(statuses, postFilter)
|
||||
}
|
||||
// ...
|
||||
func doStatusPostFilter(metas []*StatusMeta, filter func(*gtsmodel.Status) (bool, error)) ([]*StatusMeta, []*StatusMeta, error) {
|
||||
|
||||
// toDirection converts page order to timeline direction.
|
||||
func toDirection(o paging.Order) structr.Direction {
|
||||
switch o {
|
||||
case paging.OrderAscending:
|
||||
return structr.Asc
|
||||
case paging.OrderDescending:
|
||||
return structr.Desc
|
||||
default:
|
||||
return false
|
||||
// Check for provided
|
||||
// filter function.
|
||||
if filter == nil {
|
||||
return metas, nil, nil
|
||||
}
|
||||
|
||||
// Prepare a slice to store filtered statuses.
|
||||
filtered := make([]*StatusMeta, 0, len(metas))
|
||||
|
||||
// Iterate through input metas.
|
||||
for i := 0; i < len(metas); {
|
||||
meta := metas[i]
|
||||
|
||||
// Pass through filter func.
|
||||
ok, err := filter(meta.loaded)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if ok {
|
||||
// Delete meta and add to filtered.
|
||||
metas = slices.Delete(metas, i, i+1)
|
||||
filtered = append(filtered, meta)
|
||||
continue
|
||||
}
|
||||
|
||||
// Iter.
|
||||
i++
|
||||
}
|
||||
|
||||
return metas, filtered, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue