mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 18:22:24 -05:00
start adding preloading support
This commit is contained in:
parent
6210ea33ee
commit
b3c9bfde18
7 changed files with 255 additions and 79 deletions
|
|
@ -351,6 +351,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
||||||
intFilter,
|
intFilter,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Preload our local user's streaming timeline caches.
|
||||||
|
if err := process.Timeline().Preload(ctx); err != nil {
|
||||||
|
return fmt.Errorf("error preloading timelines: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Schedule background cleaning tasks.
|
// Schedule background cleaning tasks.
|
||||||
if err := cleaner.ScheduleJobs(); err != nil {
|
if err := cleaner.ScheduleJobs(); err != nil {
|
||||||
return fmt.Errorf("error scheduling cleaner jobs: %w", err)
|
return fmt.Errorf("error scheduling cleaner jobs: %w", err)
|
||||||
|
|
|
||||||
96
internal/cache/timeline/status.go
vendored
96
internal/cache/timeline/status.go
vendored
|
|
@ -318,6 +318,72 @@ func (t *StatusTimeline) Init(cap int) {
|
||||||
t.max = cap
|
t.max = cap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preload ...
|
||||||
|
func (t *StatusTimeline) Preload(
|
||||||
|
ctx context.Context,
|
||||||
|
|
||||||
|
// loadPage should load the timeline of given page for cache hydration.
|
||||||
|
loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
|
||||||
|
|
||||||
|
// filter can be used to perform filtering of returned
|
||||||
|
// statuses BEFORE insert into cache. i.e. this will effect
|
||||||
|
// what actually gets stored in the timeline cache.
|
||||||
|
filter func(each *gtsmodel.Status) (delete bool, err error),
|
||||||
|
) (int, error) {
|
||||||
|
if loadPage == nil {
|
||||||
|
panic("nil load page func")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Our starting, page at the top
|
||||||
|
// of the possible timeline.
|
||||||
|
page := new(paging.Page)
|
||||||
|
order := paging.OrderDescending
|
||||||
|
page.Max.Order = order
|
||||||
|
page.Max.Value = plus24hULID()
|
||||||
|
page.Min.Order = order
|
||||||
|
page.Min.Value = ""
|
||||||
|
page.Limit = 100
|
||||||
|
|
||||||
|
// Prepare a slice for gathering status meta.
|
||||||
|
metas := make([]*StatusMeta, 0, page.Limit)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
for n < t.cut {
|
||||||
|
// Load page of timeline statuses.
|
||||||
|
statuses, err := loadPage(page)
|
||||||
|
if err != nil {
|
||||||
|
return n, gtserror.Newf("error loading statuses: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// No more statuses from
|
||||||
|
// load function = at end.
|
||||||
|
if len(statuses) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update our next page cursor from statuses.
|
||||||
|
page.Max.Value = statuses[len(statuses)-1].ID
|
||||||
|
|
||||||
|
// Perform any filtering on newly loaded statuses.
|
||||||
|
statuses, err = doStatusFilter(statuses, filter)
|
||||||
|
if err != nil {
|
||||||
|
return n, gtserror.Newf("error filtering statuses: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After filtering no more
|
||||||
|
// statuses remain, retry.
|
||||||
|
if len(statuses) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert statuses to meta and insert.
|
||||||
|
metas = toStatusMeta(metas[:0], statuses)
|
||||||
|
n = t.cache.Insert(metas...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Load will load timeline statuses according to given
|
// Load will load timeline statuses according to given
|
||||||
// page, using provided callbacks to load extra data when
|
// page, using provided callbacks to load extra data when
|
||||||
// necessary, and perform fine-grained filtering loaded
|
// necessary, and perform fine-grained filtering loaded
|
||||||
|
|
@ -424,12 +490,9 @@ func (t *StatusTimeline) Load(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track all newly loaded status entries
|
// Check if we need to call
|
||||||
// after filtering for insert into cache.
|
// through to the database.
|
||||||
var justLoaded []*StatusMeta
|
if len(apiStatuses) == 0 {
|
||||||
|
|
||||||
// Check whether loaded enough from cache.
|
|
||||||
if need := limit - len(apiStatuses); need > 0 {
|
|
||||||
|
|
||||||
// Load a little more than
|
// Load a little more than
|
||||||
// limit to reduce db calls.
|
// limit to reduce db calls.
|
||||||
|
|
@ -475,10 +538,7 @@ func (t *StatusTimeline) Load(
|
||||||
// Convert to our cache type,
|
// Convert to our cache type,
|
||||||
// these will get inserted into
|
// these will get inserted into
|
||||||
// the cache in prepare() below.
|
// the cache in prepare() below.
|
||||||
metas := toStatusMeta(statuses)
|
metas := toStatusMeta(nil, statuses)
|
||||||
|
|
||||||
// Append to newly loaded for later insert.
|
|
||||||
justLoaded = append(justLoaded, metas...)
|
|
||||||
|
|
||||||
// Prepare frontend API models for
|
// Prepare frontend API models for
|
||||||
// the loaded statuses. For now this
|
// the loaded statuses. For now this
|
||||||
|
|
@ -511,12 +571,6 @@ func (t *StatusTimeline) Load(
|
||||||
lo, hi = hi, lo
|
lo, hi = hi, lo
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(justLoaded) > 0 {
|
|
||||||
// Even if not returning them, insert
|
|
||||||
// the excess (filtered) into cache.
|
|
||||||
_ = t.cache.Insert(justLoaded...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return apiStatuses, lo, hi, nil
|
return apiStatuses, lo, hi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -566,9 +620,6 @@ func LoadStatusTimeline(
|
||||||
// Preallocate a slice of up-to-limit API models.
|
// Preallocate a slice of up-to-limit API models.
|
||||||
apiStatuses := make([]*apimodel.Status, 0, limit)
|
apiStatuses := make([]*apimodel.Status, 0, limit)
|
||||||
|
|
||||||
// Check whether loaded enough from cache.
|
|
||||||
if need := limit - len(apiStatuses); need > 0 {
|
|
||||||
|
|
||||||
// Load a little more than
|
// Load a little more than
|
||||||
// limit to reduce db calls.
|
// limit to reduce db calls.
|
||||||
nextPg.Limit += 10
|
nextPg.Limit += 10
|
||||||
|
|
@ -613,7 +664,7 @@ func LoadStatusTimeline(
|
||||||
// Convert to our cache type,
|
// Convert to our cache type,
|
||||||
// these will get inserted into
|
// these will get inserted into
|
||||||
// the cache in prepare() below.
|
// the cache in prepare() below.
|
||||||
metas := toStatusMeta(statuses)
|
metas := toStatusMeta(nil, statuses)
|
||||||
|
|
||||||
// Prepare frontend API models for
|
// Prepare frontend API models for
|
||||||
// the loaded statuses. For now this
|
// the loaded statuses. For now this
|
||||||
|
|
@ -634,7 +685,6 @@ func LoadStatusTimeline(
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if order.Ascending() {
|
if order.Ascending() {
|
||||||
// The caller always expects the statuses
|
// The caller always expects the statuses
|
||||||
|
|
@ -922,8 +972,8 @@ func loadStatuses(
|
||||||
|
|
||||||
// toStatusMeta converts a slice of database model statuses
|
// toStatusMeta converts a slice of database model statuses
|
||||||
// into our cache wrapper type, a slice of []StatusMeta{}.
|
// into our cache wrapper type, a slice of []StatusMeta{}.
|
||||||
func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta {
|
func toStatusMeta(in []*StatusMeta, statuses []*gtsmodel.Status) []*StatusMeta {
|
||||||
return xslices.Gather(nil, statuses, func(s *gtsmodel.Status) *StatusMeta {
|
return xslices.Gather(in, statuses, func(s *gtsmodel.Status) *StatusMeta {
|
||||||
return &StatusMeta{
|
return &StatusMeta{
|
||||||
ID: s.ID,
|
ID: s.ID,
|
||||||
AccountID: s.AccountID,
|
AccountID: s.AccountID,
|
||||||
|
|
|
||||||
6
internal/cache/timeline/status_test.go
vendored
6
internal/cache/timeline/status_test.go
vendored
|
|
@ -273,6 +273,12 @@ func TestStatusTimelineTrim(t *testing.T) {
|
||||||
assert.NotEqual(t, minID2, minStatus(&tt).ID)
|
assert.NotEqual(t, minID2, minStatus(&tt).ID)
|
||||||
assert.False(t, containsStatusID(&tt, minID1))
|
assert.False(t, containsStatusID(&tt, minID1))
|
||||||
assert.False(t, containsStatusID(&tt, minID2))
|
assert.False(t, containsStatusID(&tt, minID2))
|
||||||
|
|
||||||
|
// Trim at desired length
|
||||||
|
// should cause no change.
|
||||||
|
before := tt.cache.Len()
|
||||||
|
tt.Trim()
|
||||||
|
assert.Equal(t, before, tt.cache.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
// containsStatusID returns whether timeline contains a status with ID.
|
// containsStatusID returns whether timeline contains a status with ID.
|
||||||
|
|
|
||||||
9
internal/cache/timeline/timeline.go
vendored
9
internal/cache/timeline/timeline.go
vendored
|
|
@ -18,10 +18,19 @@
|
||||||
package timeline
|
package timeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-structr"
|
"codeberg.org/gruf/go-structr"
|
||||||
|
"github.com/superseriousbusiness/gotosocial/internal/id"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// plus24hULID returns a ULID for now+24h.
|
||||||
|
func plus24hULID() string {
|
||||||
|
t := time.Now().Add(24 * time.Hour)
|
||||||
|
return id.NewULIDFromTime(t)
|
||||||
|
}
|
||||||
|
|
||||||
// nextPageParams gets the next set of paging
|
// nextPageParams gets the next set of paging
|
||||||
// parameters to use based on the current set,
|
// parameters to use based on the current set,
|
||||||
// and the next set of lo / hi values. This will
|
// and the next set of lo / hi values. This will
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
|
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||||
|
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -90,3 +91,35 @@ func (p *Processor) HomeTimelineGet(
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preloadHomeTimeline will ensure that the timeline
|
||||||
|
// cache for home owned by given account is preloaded.
|
||||||
|
func (p *Processor) preloadHomeTimeline(
|
||||||
|
ctx context.Context,
|
||||||
|
account *gtsmodel.Account,
|
||||||
|
) error {
|
||||||
|
|
||||||
|
// Get (and so, create) home timeline cache for account ID.
|
||||||
|
timeline := p.state.Caches.Timelines.Home.MustGet(account.ID)
|
||||||
|
|
||||||
|
// Preload timeline with funcs.
|
||||||
|
n, err := timeline.Preload(ctx,
|
||||||
|
|
||||||
|
// Database load function.
|
||||||
|
func(page *paging.Page) ([]*gtsmodel.Status, error) {
|
||||||
|
return p.state.DB.GetHomeTimeline(ctx, account.ID, page)
|
||||||
|
},
|
||||||
|
|
||||||
|
// Status filtering function.
|
||||||
|
func(status *gtsmodel.Status) (bool, error) {
|
||||||
|
ok, err := p.visFilter.StatusHomeTimelineable(ctx, account, status)
|
||||||
|
return !ok, err
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return gtserror.Newf("error preloading home timeline %s: %w", account.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof(ctx, "%s: preloaded %d", account.Username, n)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||||
|
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
"github.com/superseriousbusiness/gotosocial/internal/paging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -103,3 +104,36 @@ func (p *Processor) ListTimelineGet(
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preloadListTimeline will ensure that the timeline
|
||||||
|
// cache for list owned by given account is preloaded.
|
||||||
|
func (p *Processor) preloadListTimeline(
|
||||||
|
ctx context.Context,
|
||||||
|
account *gtsmodel.Account,
|
||||||
|
list *gtsmodel.List,
|
||||||
|
) error {
|
||||||
|
|
||||||
|
// Get (and so, create) list timeline cache for list ID.
|
||||||
|
timeline := p.state.Caches.Timelines.List.MustGet(list.ID)
|
||||||
|
|
||||||
|
// Preload timeline with funcs.
|
||||||
|
n, err := timeline.Preload(ctx,
|
||||||
|
|
||||||
|
// Database load function.
|
||||||
|
func(page *paging.Page) ([]*gtsmodel.Status, error) {
|
||||||
|
return p.state.DB.GetListTimeline(ctx, list.ID, page)
|
||||||
|
},
|
||||||
|
|
||||||
|
// Status filtering function.
|
||||||
|
func(status *gtsmodel.Status) (bool, error) {
|
||||||
|
ok, err := p.visFilter.StatusHomeTimelineable(ctx, account, status)
|
||||||
|
return !ok, err
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return gtserror.Newf("error preloading list timeline %s: %w", list.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof(ctx, "%s[%q]: preloaded %d", account.Username, list.Title, n)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,11 +24,12 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
|
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
|
timelinepkg "github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/db"
|
"github.com/superseriousbusiness/gotosocial/internal/db"
|
||||||
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
|
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
|
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
|
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
|
||||||
|
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/id"
|
"github.com/superseriousbusiness/gotosocial/internal/id"
|
||||||
|
|
@ -61,10 +62,48 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preload ...
|
||||||
|
func (p *Processor) Preload(ctx context.Context) error {
|
||||||
|
|
||||||
|
// Get all of our local user accounts.
|
||||||
|
users, err := p.state.DB.GetAllUsers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return gtserror.Newf("error getting users: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, user := range users {
|
||||||
|
// Get associated account.
|
||||||
|
account := user.Account
|
||||||
|
|
||||||
|
// Preload this user account's home timeline cache.
|
||||||
|
if err := p.preloadHomeTimeline(ctx, account); err != nil {
|
||||||
|
return gtserror.Newf("error preloading home timeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all lists owned by this user account.
|
||||||
|
lists, err := p.state.DB.GetListsByAccountID(
|
||||||
|
gtscontext.SetBarebones(ctx),
|
||||||
|
account.ID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return gtserror.Newf("error getting account %s lists: %w", account.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, list := range lists {
|
||||||
|
// Preload each of this user account's list timeline caches.
|
||||||
|
if err := p.preloadListTimeline(ctx, account, list); err != nil {
|
||||||
|
return gtserror.Newf("error preloading list timeline: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Processor) getStatusTimeline(
|
func (p *Processor) getStatusTimeline(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
requester *gtsmodel.Account,
|
requester *gtsmodel.Account,
|
||||||
cache *timeline.StatusTimeline,
|
timeline *timelinepkg.StatusTimeline,
|
||||||
page *paging.Page,
|
page *paging.Page,
|
||||||
pagePath string,
|
pagePath string,
|
||||||
pageQuery url.Values,
|
pageQuery url.Values,
|
||||||
|
|
@ -128,10 +167,10 @@ func (p *Processor) getStatusTimeline(
|
||||||
return apiStatus, nil
|
return apiStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if cache != nil {
|
if timeline != nil {
|
||||||
// Load status page via timeline cache, also
|
// Load status page via timeline cache, also
|
||||||
// getting lo, hi values for next, prev pages.
|
// getting lo, hi values for next, prev pages.
|
||||||
apiStatuses, lo, hi, err = cache.Load(ctx,
|
apiStatuses, lo, hi, err = timeline.Load(ctx,
|
||||||
|
|
||||||
// Status page
|
// Status page
|
||||||
// to load.
|
// to load.
|
||||||
|
|
@ -157,7 +196,7 @@ func (p *Processor) getStatusTimeline(
|
||||||
} else {
|
} else {
|
||||||
// Load status page without a receiving timeline cache.
|
// Load status page without a receiving timeline cache.
|
||||||
// TODO: remove this code path when all support caching.
|
// TODO: remove this code path when all support caching.
|
||||||
apiStatuses, lo, hi, err = timeline.LoadStatusTimeline(ctx,
|
apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx,
|
||||||
page,
|
page,
|
||||||
loadPage,
|
loadPage,
|
||||||
func(ids []string) ([]*gtsmodel.Status, error) {
|
func(ids []string) ([]*gtsmodel.Status, error) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue