diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 4caf44cad..67e267f89 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -57,12 +57,10 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/observability" "github.com/superseriousbusiness/gotosocial/internal/oidc" "github.com/superseriousbusiness/gotosocial/internal/processing" - tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/router" "github.com/superseriousbusiness/gotosocial/internal/state" gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/internal/subscriptions" - "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/transport" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/web" @@ -323,26 +321,6 @@ var Start action.GTSAction = func(ctx context.Context) error { // Create a Web Push notification sender. webPushSender := webpush.NewSender(client, state, typeConverter) - // Initialize both home / list timelines. - state.Timelines.Home = timeline.NewManager( - tlprocessor.HomeTimelineGrab(state), - tlprocessor.HomeTimelineFilter(state, visFilter), - tlprocessor.HomeTimelineStatusPrepare(state, typeConverter), - tlprocessor.SkipInsert(), - ) - if err := state.Timelines.Home.Start(); err != nil { - return fmt.Errorf("error starting home timeline: %s", err) - } - state.Timelines.List = timeline.NewManager( - tlprocessor.ListTimelineGrab(state), - tlprocessor.ListTimelineFilter(state, visFilter), - tlprocessor.ListTimelineStatusPrepare(state, typeConverter), - tlprocessor.SkipInsert(), - ) - if err := state.Timelines.List.Start(); err != nil { - return fmt.Errorf("error starting list timeline: %s", err) - } - // Start the job scheduler // (this is required for cleaner). state.Workers.StartScheduler() diff --git a/internal/api/client/timelines/home.go b/internal/api/client/timelines/home.go index 8e957d498..4cb0ae8aa 100644 --- a/internal/api/client/timelines/home.go +++ b/internal/api/client/timelines/home.go @@ -23,6 +23,7 @@ import ( "github.com/gin-gonic/gin" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // HomeTimelineGETHandler swagger:operation GET /api/v1/timelines/home homeTimeline @@ -127,13 +128,17 @@ func (m *Module) HomeTimelineGETHandler(c *gin.Context) { return } - limit, errWithCode := apiutil.ParseLimit(c.Query(apiutil.LimitKey), 20, 40, 1) + local, errWithCode := apiutil.ParseLocal(c.Query(apiutil.LocalKey), false) if errWithCode != nil { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) return } - local, errWithCode := apiutil.ParseLocal(c.Query(apiutil.LocalKey), false) + page, errWithCode := paging.ParseIDPage(c, + 1, // min limit + 40, // max limit + 20, // default limit + ) if errWithCode != nil { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) return @@ -141,11 +146,8 @@ func (m *Module) HomeTimelineGETHandler(c *gin.Context) { resp, errWithCode := m.processor.Timeline().HomeTimelineGet( c.Request.Context(), - authed, - c.Query(apiutil.MaxIDKey), - c.Query(apiutil.SinceIDKey), - c.Query(apiutil.MinIDKey), - limit, + authed.Account, + page, local, ) if errWithCode != nil { diff --git a/internal/api/client/timelines/list.go b/internal/api/client/timelines/list.go index b02489d6c..2e89f16ea 100644 --- a/internal/api/client/timelines/list.go +++ b/internal/api/client/timelines/list.go @@ -23,6 +23,7 @@ import ( "github.com/gin-gonic/gin" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // ListTimelineGETHandler swagger:operation GET /api/v1/timelines/list/{id} listTimeline @@ -131,7 +132,11 @@ func (m *Module) ListTimelineGETHandler(c *gin.Context) { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) } - limit, errWithCode := apiutil.ParseLimit(c.Query(apiutil.LimitKey), 20, 40, 1) + page, errWithCode := paging.ParseIDPage(c, + 1, // min limit + 40, // max limit + 20, // default limit + ) if errWithCode != nil { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) return @@ -139,12 +144,9 @@ func (m *Module) ListTimelineGETHandler(c *gin.Context) { resp, errWithCode := m.processor.Timeline().ListTimelineGet( c.Request.Context(), - authed, + authed.Account, targetListID, - c.Query(apiutil.MaxIDKey), - c.Query(apiutil.SinceIDKey), - c.Query(apiutil.MinIDKey), - limit, + page, ) if errWithCode != nil { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) diff --git a/internal/api/client/timelines/public.go b/internal/api/client/timelines/public.go index d6df36f09..7a4a68b77 100644 --- a/internal/api/client/timelines/public.go +++ b/internal/api/client/timelines/public.go @@ -24,6 +24,7 @@ import ( apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // PublicTimelineGETHandler swagger:operation GET /api/v1/timelines/public publicTimeline @@ -141,7 +142,11 @@ func (m *Module) PublicTimelineGETHandler(c *gin.Context) { return } - limit, errWithCode := apiutil.ParseLimit(c.Query(apiutil.LimitKey), 20, 40, 1) + page, errWithCode := paging.ParseIDPage(c, + 1, // min limit + 40, // max limit + 20, // default limit + ) if errWithCode != nil { apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) return @@ -156,10 +161,7 @@ func (m *Module) PublicTimelineGETHandler(c *gin.Context) { resp, errWithCode := m.processor.Timeline().PublicTimelineGet( c.Request.Context(), authed.Account, - c.Query(apiutil.MaxIDKey), - c.Query(apiutil.SinceIDKey), - c.Query(apiutil.MinIDKey), - limit, + page, local, ) if errWithCode != nil { diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7844c03f8..df2a9e49c 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -46,6 +46,9 @@ type Caches struct { // `[status.ID][status.UpdatedAt.Unix()]` StatusesFilterableFields *ttl.Cache[string, []string] + // Timelines ... + Timelines TimelineCaches + // Visibility provides access to the item visibility // cache. (used by the visibility filter). Visibility VisibilityCache @@ -87,12 +90,14 @@ func (c *Caches) Init() { c.initFollowRequest() c.initFollowRequestIDs() c.initFollowingTagIDs() + c.initHomeTimelines() c.initInReplyToIDs() c.initInstance() c.initInteractionRequest() c.initList() c.initListIDs() c.initListedIDs() + c.initListTimelines() c.initMarker() c.initMedia() c.initMention() @@ -101,6 +106,7 @@ func (c *Caches) Init() { c.initPoll() c.initPollVote() c.initPollVoteIDs() + c.initPublicTimeline() c.initReport() c.initSinBinStatus() c.initStatus() @@ -109,6 +115,7 @@ func (c *Caches) Init() { c.initStatusEdit() c.initStatusFave() c.initStatusFaveIDs() + c.initStatusesFilterableFields() c.initTag() c.initThreadMute() c.initToken() @@ -120,7 +127,6 @@ func (c *Caches) Init() { c.initWebPushSubscription() c.initWebPushSubscriptionIDs() c.initVisibility() - c.initStatusesFilterableFields() } // Start will start any caches that require a background @@ -207,6 +213,8 @@ func (c *Caches) Sweep(threshold float64) { c.DB.User.Trim(threshold) c.DB.UserMute.Trim(threshold) c.DB.UserMuteIDs.Trim(threshold) + c.Timelines.Home.Trim(threshold) + c.Timelines.List.Trim(threshold) c.Visibility.Trim(threshold) } diff --git a/internal/cache/copy.go b/internal/cache/copy.go new file mode 100644 index 000000000..ef7c9c96d --- /dev/null +++ b/internal/cache/copy.go @@ -0,0 +1,42 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cache + +import "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + +func copyStatus(s1 *gtsmodel.Status) *gtsmodel.Status { + s2 := new(gtsmodel.Status) + *s2 = *s1 + + // Don't include ptr fields that + // will be populated separately. + // See internal/db/bundb/status.go. + s2.Account = nil + s2.InReplyTo = nil + s2.InReplyToAccount = nil + s2.BoostOf = nil + s2.BoostOfAccount = nil + s2.Poll = nil + s2.Attachments = nil + s2.Tags = nil + s2.Mentions = nil + s2.Emojis = nil + s2.CreatedWithApplication = nil + + return s2 +} diff --git a/internal/cache/db.go b/internal/cache/db.go index 82cd9ac5f..ff67b6cde 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -1345,7 +1345,7 @@ func (c *Caches) initStatus() { }, MaxSize: cap, IgnoreErr: ignoreErrors, - Copy: copyF, + Copy: copyStatus, Invalidate: c.OnInvalidateStatus, }) } diff --git a/internal/cache/timeline.go b/internal/cache/timeline.go new file mode 100644 index 000000000..4f385cafd --- /dev/null +++ b/internal/cache/timeline.go @@ -0,0 +1,131 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cache + +import ( + "codeberg.org/gruf/go-structr" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" +) + +type TimelineCaches struct { + + // Home ... + Home TimelinesCache[*gtsmodel.Status] + + // List ... + List TimelinesCache[*gtsmodel.Status] + + // Public ... + Public timeline.StatusTimeline +} + +func (c *Caches) initHomeTimelines() { + cap := 1000 + + log.Infof(nil, "cache size = %d", cap) + + c.Timelines.Home.Init(structr.TimelineConfig[*gtsmodel.Status, string]{ + PKey: "StatusID", + Indices: []structr.IndexConfig{ + {Fields: "StatusID"}, + {Fields: "AccountID"}, + {Fields: "BoostOfStatusID"}, + {Fields: "BoostOfAccountID"}, + }, + Copy: copyStatus, + }, cap) +} + +func (c *Caches) initListTimelines() { + cap := 1000 + + log.Infof(nil, "cache size = %d", cap) + + c.Timelines.List.Init(structr.TimelineConfig[*gtsmodel.Status, string]{ + PKey: "StatusID", + Indices: []structr.IndexConfig{ + {Fields: "StatusID"}, + {Fields: "AccountID"}, + {Fields: "BoostOfStatusID"}, + {Fields: "BoostOfAccountID"}, + }, + Copy: copyStatus, + }, cap) +} + +func (c *Caches) initPublicTimeline() { + cap := 1000 + + log.Infof(nil, "cache size = %d", cap) + + c.Timelines.Public.Init(cap) +} + +type TimelineStatus struct { + + // ID ... + ID string + + // AccountID ... + AccountID string + + // BoostOfID ... + BoostOfID string + + // BoostOfAccountID ... + BoostOfAccountID string + + // Local ... + Local bool + + // Loaded is a temporary field that may be + // set for a newly loaded timeline status + // so that statuses don't need to be loaded + // from the database twice in succession. + // + // i.e. this will only be set if the status + // was newly inserted into the timeline cache. + // for existing cache items this will be nil. + Loaded *gtsmodel.Status + + // Prepared contains prepared frontend API + // model for the referenced status. This may + // or may-not be nil depending on whether the + // status has been "unprepared" since the last + // call to "prepare" the frontend model. + Prepared *apimodel.Status +} + +func (s *TimelineStatus) Copy() *TimelineStatus { + var prepared *apimodel.Status + if s.Prepared != nil { + prepared = new(apimodel.Status) + *prepared = *s.Prepared + } + return &TimelineStatus{ + ID: s.ID, + AccountID: s.AccountID, + BoostOfID: s.BoostOfID, + BoostOfAccountID: s.BoostOfAccountID, + Loaded: nil, // NEVER set + Prepared: prepared, + } +} diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go new file mode 100644 index 000000000..29e185642 --- /dev/null +++ b/internal/cache/timeline/status.go @@ -0,0 +1,508 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package timeline + +import ( + "context" + "slices" + + "codeberg.org/gruf/go-structr" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" +) + +// StatusMeta ... +type StatusMeta struct { + + // ID ... + ID string + + // AccountID ... + AccountID string + + // BoostOfID ... + BoostOfID string + + // BoostOfAccountID ... + BoostOfAccountID string + + // Local ... + Local bool + + // prepared contains prepared frontend API + // model for the referenced status. This may + // or may-not be nil depending on whether the + // status has been "unprepared" since the last + // call to "prepare" the frontend model. + prepared *apimodel.Status + + // Loaded is a temporary field that may be + // set for a newly loaded timeline status + // so that statuses don't need to be loaded + // from the database twice in succession. + // + // i.e. this will only be set if the status + // was newly inserted into the timeline cache. + // for existing cache items this will be nil. + loaded *gtsmodel.Status +} + +// StatusTimeline ... +type StatusTimeline struct { + + // underlying cache of *StatusMeta{}, primary-keyed by ID string. + cache structr.Timeline[*StatusMeta, string] + + // fast-access cache indices. + idx_ID *structr.Index //nolint:revive + idx_AccountID *structr.Index //nolint:revive + idx_BoostOfID *structr.Index //nolint:revive + idx_BoostOfAccountID *structr.Index //nolint:revive +} + +// Init ... +func (t *StatusTimeline) Init(cap int) { + t.cache.Init(structr.TimelineConfig[*StatusMeta, string]{ + PKey: "ID", + + Indices: []structr.IndexConfig{ + // ID as primary key is inherently an index. + // {Fields: "ID"}, + {Fields: "AccountID", Multiple: true}, + {Fields: "BoostOfStatusID", Multiple: true}, + {Fields: "BoostOfAccountID", Multiple: true}, + }, + + Copy: func(s *StatusMeta) *StatusMeta { + var prepared *apimodel.Status + if s.prepared != nil { + prepared = new(apimodel.Status) + *prepared = *s.prepared + } + return &StatusMeta{ + ID: s.ID, + AccountID: s.AccountID, + BoostOfID: s.BoostOfID, + BoostOfAccountID: s.BoostOfAccountID, + loaded: nil, // NEVER copied + prepared: prepared, + } + }, + }) + + // Create a fast index lookup ptrs. + t.idx_ID = t.cache.Index("ID") + t.idx_AccountID = t.cache.Index("AccountID") + t.idx_BoostOfID = t.cache.Index("BoostOfID") + t.idx_BoostOfAccountID = t.cache.Index("BoostOfAccountID") +} + +// Load ... +func (t *StatusTimeline) Load( + ctx context.Context, + page *paging.Page, + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) ([]*gtsmodel.Status, error), + + // loadIDs should load status models with given IDs. + loadIDs func([]string) ([]*gtsmodel.Status, error), + + // preFilter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + preFilter func(*gtsmodel.Status) (bool, error), + + // postFilterFn can be used to perform filtering of returned + // statuses AFTER insert into cache. i.e. this will not effect + // what actually gets stored in the timeline cache. + postFilter func(*StatusMeta) bool, + + // prepareAPI should prepare internal status model to frontend API model. + prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error), +) ( + []*apimodel.Status, + error, +) { + switch { + case page == nil: + panic("nil page") + case loadPage == nil: + panic("nil load page func") + } + + // Get paging details. + min := page.Min.Value + max := page.Max.Value + lim := page.Limit + ord := page.Order() + dir := toDirection(ord) + + // Load cached timeline entries for page. + meta := t.cache.Select(min, max, lim, dir) + + // Perform any timeline post-filtering. + meta = doPostFilter(meta, postFilter) + + // ... + if need := len(meta) - lim; need > 0 { + + // Set first page + // query to load. + nextPg := page + + // Perform a maximum of 5 + // load attempts fetching + // statuses to reach limit. + for i := 0; i < 5; i++ { + + // Load next timeline statuses. + statuses, err := loadPage(nextPg) + if err != nil { + return nil, gtserror.Newf("error loading timeline: %w", err) + } + + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } + + // Get the lowest and highest + // ID values, used for next pg. + // Done BEFORE status filtering. + lo := statuses[len(statuses)-1].ID + hi := statuses[0].ID + + // Perform any status timeline pre-filtering. + statuses, err = doPreFilter(statuses, preFilter) + if err != nil { + return nil, gtserror.Newf("error pre-filtering timeline: %w", err) + } + + // Convert to our cache type, + // these will get inserted into + // the cache in prepare() below. + m := toStatusMeta(statuses) + + // Perform any post-filtering. + // and append to main meta slice. + m = slices.DeleteFunc(m, postFilter) + meta = append(meta, m...) + + // Check if we reached + // requested page limit. + if len(meta) >= lim { + break + } + + // Set next paging value. + nextPg = nextPg.Next(lo, hi) + } + } + + // Using meta and given funcs, prepare frontend API models. + apiStatuses, err := t.prepare(ctx, meta, loadIDs, prepareAPI) + if err != nil { + return nil, gtserror.Newf("error preparing api statuses: %w", err) + } + + // Ensure the returned statuses are ALWAYS in descending order. + slices.SortFunc(apiStatuses, func(s1, s2 *apimodel.Status) int { + const k = +1 + switch { + case s1.ID > s2.ID: + return +k + case s1.ID < s2.ID: + return -k + default: + return 0 + } + }) + + return apiStatuses, nil +} + +// RemoveByStatusID removes all cached timeline entries pertaining to +// status ID, including those that may be a boost of the given status. +func (t *StatusTimeline) RemoveByStatusIDs(statusIDs ...string) { + keys := make([]structr.Key, len(statusIDs)) + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_ID.Key(id) + } + + // Invalidate all cached entries with IDs. + t.cache.Invalidate(t.idx_ID, keys...) + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_BoostOfID.Key(id) + } + + // Invalidate all cached entries as boost of IDs. + t.cache.Invalidate(t.idx_BoostOfID, keys...) +} + +// RemoveByAccountID removes all cached timeline entries authored by +// account ID, including those that may be boosted by account ID. +func (t *StatusTimeline) RemoveByAccountIDs(accountIDs ...string) { + keys := make([]structr.Key, len(accountIDs)) + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_AccountID.Key(id) + } + + // Invalidate all cached entries as by IDs. + t.cache.Invalidate(t.idx_AccountID, keys...) + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_BoostOfAccountID.Key(id) + } + + // Invalidate all cached entries as boosted by IDs. + t.cache.Invalidate(t.idx_BoostOfAccountID, keys...) +} + +// UnprepareByStatusIDs removes cached frontend API models for all cached +// timeline entries pertaining to status ID, including boosts of given status. +func (t *StatusTimeline) UnprepareByStatusIDs(statusIDs ...string) { + keys := make([]structr.Key, len(statusIDs)) + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_ID.Key(id) + } + + // TODO: replace below with for-range-function loop when Go1.23. + t.cache.RangeKeys(t.idx_ID, keys...)(func(meta *StatusMeta) bool { + meta.prepared = nil + return true + }) + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_BoostOfID.Key(id) + } + + // TODO: replace below with for-range-function loop when Go1.23. + t.cache.RangeKeys(t.idx_BoostOfID, keys...)(func(meta *StatusMeta) bool { + meta.prepared = nil + return true + }) +} + +// UnprepareByAccountIDs removes cached frontend API models for all cached +// timeline entries authored by account ID, including boosts by account ID. +func (t *StatusTimeline) UnprepareByAccountIDs(accountIDs ...string) { + keys := make([]structr.Key, len(accountIDs)) + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_AccountID.Key(id) + } + + // TODO: replace below with for-range-function loop when Go1.23. + t.cache.RangeKeys(t.idx_AccountID, keys...)(func(meta *StatusMeta) bool { + meta.prepared = nil + return true + }) + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_BoostOfAccountID.Key(id) + } + + // TODO: replace below with for-range-function loop when Go1.23. + t.cache.RangeKeys(t.idx_BoostOfAccountID, keys...)(func(meta *StatusMeta) bool { + meta.prepared = nil + return true + }) +} + +// Clear will remove all cached entries from timeline. +func (t *StatusTimeline) Clear() { t.cache.Clear() } + +// prepare will take a slice of cached (or, freshly loaded!) StatusMeta{} +// models, and use given functions to return prepared frontend API models. +func (t *StatusTimeline) prepare( + ctx context.Context, + meta []*StatusMeta, + loadIDs func([]string) ([]*gtsmodel.Status, error), + prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error), +) ( + []*apimodel.Status, + error, +) { + switch { + case loadIDs == nil: + panic("nil load fn") + case prepareAPI == nil: + panic("nil prepare fn") + } + + // Iterate the given StatusMeta objects for pre-prepared frontend + // models, otherwise storing as unprepared for further processing. + apiStatuses := make([]*apimodel.Status, len(meta)) + unprepared := make([]*StatusMeta, 0, len(meta)) + for i, meta := range meta { + apiStatuses[i] = meta.prepared + if meta.prepared == nil { + unprepared = append(unprepared, meta) + } + } + + // If there were no unprepared + // StatusMeta objects, then we + // gathered everything we need! + if len(unprepared) == 0 { + return apiStatuses, nil + } + + // Of the StatusMeta objects missing a prepared + // frontend model, find those without a recently + // fetched database model and store their IDs, + // as well mapping them for faster update below. + toLoadIDs := make([]string, len(unprepared)) + loadedMap := make(map[string]*StatusMeta, len(unprepared)) + for i, meta := range unprepared { + if meta.loaded == nil { + toLoadIDs[i] = meta.ID + loadedMap[meta.ID] = meta + } + } + + // Load statuses with given IDs. + loaded, err := loadIDs(toLoadIDs) + if err != nil { + return nil, gtserror.Newf("error loading statuses: %w", err) + } + + // Update returned StatusMeta objects + // with newly loaded statuses by IDs. + for i := range loaded { + status := loaded[i] + meta := loadedMap[status.ID] + meta.loaded = status + } + + for i := 0; i < len(unprepared); { + // Get meta at index. + meta := unprepared[i] + + if meta.loaded == nil { + // We failed loading this + // status, skip preparing. + continue + } + + // Prepare the provided status to frontend. + apiStatus, err := prepareAPI(meta.loaded) + if err != nil { + log.Errorf(ctx, "error preparing status %s: %v", meta.loaded.URI, err) + continue + } + + if apiStatus != nil { + // TODO: we won't need nil check when mutes + // / filters are moved to appropriate funcs. + apiStatuses = append(apiStatuses, apiStatus) + } + } + + // Re-insert all (previously) unprepared + // status meta types into timeline cache. + t.cache.Insert(unprepared...) + + return apiStatuses, nil +} + +// toStatusMeta converts a slice of database model statuses +// into our cache wrapper type, a slice of []StatusMeta{}. +func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta { + meta := make([]*StatusMeta, len(statuses)) + for i := range statuses { + status := statuses[i] + meta[i] = &StatusMeta{ + ID: status.ID, + AccountID: status.AccountID, + BoostOfID: status.BoostOfID, + BoostOfAccountID: status.BoostOfAccountID, + Local: *status.Local, + loaded: status, + prepared: nil, + } + } + return meta +} + +// doPreFilter acts similarly to slices.DeleteFunc but it accepts function with error return, or nil, returning early if so. +func doPreFilter(statuses []*gtsmodel.Status, preFilter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) { + if preFilter == nil { + return statuses, nil + } + + // Iterate through input statuses. + for i := 0; i < len(statuses); { + status := statuses[i] + + // Pass through filter func. + ok, err := preFilter(status) + if err != nil { + return nil, err + } + + if ok { + // Delete this status from input slice. + statuses = slices.Delete(statuses, i, i+1) + continue + } + + // Iter. + i++ + } + + return statuses, nil +} + +// doPostFilter acts similarly to slices.DeleteFunc but it handles case of a nil function. +func doPostFilter(statuses []*StatusMeta, postFilter func(*StatusMeta) bool) []*StatusMeta { + if postFilter == nil { + return statuses + } + return slices.DeleteFunc(statuses, postFilter) +} + +// toDirection converts page order to timeline direction. +func toDirection(o paging.Order) structr.Direction { + switch o { + case paging.OrderAscending: + return structr.Asc + case paging.OrderDescending: + return structr.Desc + default: + return false + } +} diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go index 9cb4fca98..1754fbf9b 100644 --- a/internal/cache/wrappers.go +++ b/internal/cache/wrappers.go @@ -18,28 +18,31 @@ package cache import ( + "maps" "slices" + "sync/atomic" "codeberg.org/gruf/go-cache/v3/simple" "codeberg.org/gruf/go-structr" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // SliceCache wraps a simple.Cache to provide simple loader-callback // functions for fetching + caching slices of objects (e.g. IDs). type SliceCache[T any] struct { - cache simple.Cache[string, []T] + simple.Cache[string, []T] } // Init initializes the cache with given length + capacity. func (c *SliceCache[T]) Init(len, cap int) { - c.cache = simple.Cache[string, []T]{} - c.cache.Init(len, cap) + c.Cache = simple.Cache[string, []T]{} + c.Cache.Init(len, cap) } // Load will attempt to load an existing slice from cache for key, else calling load function and caching the result. func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) { // Look for cached values. - data, ok := c.cache.Get(key) + data, ok := c.Cache.Get(key) if !ok { var err error @@ -51,7 +54,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) } // Store the data. - c.cache.Set(key, data) + c.Cache.Set(key, data) } // Return data clone for safety. @@ -60,27 +63,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) // Invalidate: see simple.Cache{}.InvalidateAll(). func (c *SliceCache[T]) Invalidate(keys ...string) { - _ = c.cache.InvalidateAll(keys...) -} - -// Trim: see simple.Cache{}.Trim(). -func (c *SliceCache[T]) Trim(perc float64) { - c.cache.Trim(perc) -} - -// Clear: see simple.Cache{}.Clear(). -func (c *SliceCache[T]) Clear() { - c.cache.Clear() -} - -// Len: see simple.Cache{}.Len(). -func (c *SliceCache[T]) Len() int { - return c.cache.Len() -} - -// Cap: see simple.Cache{}.Cap(). -func (c *SliceCache[T]) Cap() int { - return c.cache.Cap() + _ = c.Cache.InvalidateAll(keys...) } // StructCache wraps a structr.Cache{} to simple index caching @@ -89,17 +72,17 @@ func (c *SliceCache[T]) Cap() int { // name under the main database caches struct which would reduce // time required to access cached values). type StructCache[StructType any] struct { - cache structr.Cache[StructType] + structr.Cache[StructType] index map[string]*structr.Index } // Init initializes the cache with given structr.CacheConfig{}. func (c *StructCache[T]) Init(config structr.CacheConfig[T]) { c.index = make(map[string]*structr.Index, len(config.Indices)) - c.cache = structr.Cache[T]{} - c.cache.Init(config) + c.Cache = structr.Cache[T]{} + c.Cache.Init(config) for _, cfg := range config.Indices { - c.index[cfg.Fields] = c.cache.Index(cfg.Fields) + c.index[cfg.Fields] = c.Cache.Index(cfg.Fields) } } @@ -107,26 +90,21 @@ func (c *StructCache[T]) Init(config structr.CacheConfig[T]) { // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) GetOne(index string, key ...any) (T, bool) { i := c.index[index] - return c.cache.GetOne(i, i.Key(key...)) + return c.Cache.GetOne(i, i.Key(key...)) } // Get calls structr.Cache{}.Get(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) Get(index string, keys ...[]any) []T { i := c.index[index] - return c.cache.Get(i, i.Keys(keys...)...) -} - -// Put: see structr.Cache{}.Put(). -func (c *StructCache[T]) Put(values ...T) { - c.cache.Put(values...) + return c.Cache.Get(i, i.Keys(keys...)...) } // LoadOne calls structr.Cache{}.LoadOne(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) LoadOne(index string, load func() (T, error), key ...any) (T, error) { i := c.index[index] - return c.cache.LoadOne(i, i.Key(key...), load) + return c.Cache.LoadOne(i, i.Key(key...), load) } // LoadIDs calls structr.Cache{}.Load(), using a cached structr.Index{} by 'index' name. Note: this also handles @@ -149,7 +127,7 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string) } // Pass loader callback with wrapper onto main cache load function. - return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { uncachedIDs := make([]string, len(uncached)) for i := range uncached { uncachedIDs[i] = uncached[i].Values()[0].(string) @@ -177,7 +155,7 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l } // Pass loader callback with wrapper onto main cache load function. - return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { uncachedIDs := make([]string, len(uncached)) for i := range uncached { uncachedIDs[i] = uncached[i].Values()[1].(string) @@ -186,16 +164,11 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l }) } -// Store: see structr.Cache{}.Store(). -func (c *StructCache[T]) Store(value T, store func() error) error { - return c.cache.Store(value, store) -} - // Invalidate calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) Invalidate(index string, key ...any) { i := c.index[index] - c.cache.Invalidate(i, i.Key(key...)) + c.Cache.Invalidate(i, i.Key(key...)) } // InvalidateIDs calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name. Note: this also @@ -218,25 +191,220 @@ func (c *StructCache[T]) InvalidateIDs(index string, ids []string) { } // Pass to main invalidate func. - c.cache.Invalidate(i, keys...) + c.Cache.Invalidate(i, keys...) } -// Trim: see structr.Cache{}.Trim(). -func (c *StructCache[T]) Trim(perc float64) { - c.cache.Trim(perc) +type TimelineCache[T any] struct { + structr.Timeline[T, string] + index map[string]*structr.Index + maxSz int } -// Clear: see structr.Cache{}.Clear(). -func (c *StructCache[T]) Clear() { - c.cache.Clear() +func (t *TimelineCache[T]) Init(config structr.TimelineConfig[T, string], maxSz int) { + t.index = make(map[string]*structr.Index, len(config.Indices)) + t.Timeline = structr.Timeline[T, string]{} + t.Timeline.Init(config) + for _, cfg := range config.Indices { + t.index[cfg.Fields] = t.Timeline.Index(cfg.Fields) + } + t.maxSz = maxSz } -// Len: see structr.Cache{}.Len(). -func (c *StructCache[T]) Len() int { - return c.cache.Len() +func toDirection(order paging.Order) structr.Direction { + switch order { + case paging.OrderAscending: + return structr.Asc + case paging.OrderDescending: + return structr.Desc + default: + panic("invalid order") + } } -// Cap: see structr.Cache{}.Cap(). -func (c *StructCache[T]) Cap() int { - return c.cache.Cap() +func (t *TimelineCache[T]) Select(page *paging.Page) []T { + min, max := page.Min.Value, page.Max.Value + lim, dir := page.Limit, toDirection(page.Order()) + return t.Timeline.Select(min, max, lim, dir) } + +func (t *TimelineCache[T]) Invalidate(index string, keyParts ...any) { + i := t.index[index] + t.Timeline.Invalidate(i, i.Key(keyParts...)) +} + +func (t *TimelineCache[T]) Trim(perc float64) { + t.Timeline.Trim(perc, t.maxSz, structr.Asc) +} + +func (t *TimelineCache[T]) InvalidateIDs(index string, ids []string) { + i := t.index[index] + if i == nil { + // we only perform this check here as + // we're going to use the index before + // passing it to cache in main .Load(). + panic("missing index for cache type") + } + + // Generate cache keys for ID types. + keys := make([]structr.Key, len(ids)) + for x, id := range ids { + keys[x] = i.Key(id) + } + + // Pass to main invalidate func. + t.Timeline.Invalidate(i, keys...) +} + +// TimelinesCache provides a cache of TimelineCache{} +// objects, keyed by string and concurrency safe, optimized +// almost entirely for reads. On each creation of a new key +// in the cache, the entire internal map will be cloned, BUT +// all reads are only a single atomic operation, no mutex locks! +type TimelinesCache[T any] struct { + cfg structr.TimelineConfig[T, string] + ptr atomic.Pointer[map[string]*TimelineCache[T]] // ronly except by CAS + max int +} + +// Init ... +func (t *TimelinesCache[T]) Init(config structr.TimelineConfig[T, string], max int) { + // Create new test timeline to validate. + (&TimelineCache[T]{}).Init(config, max) + + // Invalidate + // timeline maps. + t.ptr.Store(nil) + + // Set config. + t.cfg = config + t.max = max +} + +// Get fetches a timeline with given ID from cache, creating it if required. +func (t *TimelinesCache[T]) Get(id string) *TimelineCache[T] { + var tt *TimelineCache[T] + + for { + // Load current ptr. + cur := t.ptr.Load() + + // Get timeline map to work on. + var m map[string]*TimelineCache[T] + + if cur != nil { + // Look for existing + // timeline in cache. + tt = (*cur)[id] + if tt != nil { + return tt + } + + // Get clone of current + // before modifications. + m = maps.Clone(*cur) + } else { + // Allocate new timeline map for below. + m = make(map[string]*TimelineCache[T]) + } + + if tt == nil { + // Allocate new timeline. + tt = new(TimelineCache[T]) + tt.Init(t.cfg, t.max) + } + + // Store timeline + // in new map. + m[id] = tt + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully inserted + // new timeline model. + return tt + } +} + +// Delete removes timeline with ID from cache. +func (t *TimelinesCache[T]) Delete(id string) { + for { + // Load current ptr. + cur := t.ptr.Load() + + // Check for empty map / not in map. + if cur == nil || (*cur)[id] == nil { + return + } + + // Get clone of current + // before modifications. + m := maps.Clone(*cur) + + // Delete ID. + delete(m, id) + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully + // deleted ID. + return + } +} + +func (t *TimelinesCache[T]) Insert(values ...T) { + if p := t.ptr.Load(); p != nil { + for _, timeline := range *p { + timeline.Insert(values...) + } + } +} + +func (t *TimelinesCache[T]) InsertInto(id string, values ...T) { + t.Get(id).Insert(values...) +} + +func (t *TimelinesCache[T]) Invalidate(index string, keyParts ...any) { + if p := t.ptr.Load(); p != nil { + for _, timeline := range *p { + timeline.Invalidate(index, keyParts...) + } + } +} + +func (t *TimelinesCache[T]) InvalidateFrom(id string, index string, keyParts ...any) { + t.Get(id).Invalidate(index, keyParts...) +} + +func (t *TimelinesCache[T]) InvalidateIDs(index string, ids []string) { + if p := t.ptr.Load(); p != nil { + for _, timeline := range *p { + timeline.InvalidateIDs(index, ids) + } + } +} + +func (t *TimelinesCache[T]) InvalidateIDsFrom(id string, index string, ids []string) { + t.Get(id).InvalidateIDs(index, ids) +} + +func (t *TimelinesCache[T]) Trim(perc float64) { + if p := t.ptr.Load(); p != nil { + for _, timeline := range *p { + timeline.Trim(perc) + } + } +} + +func (t *TimelinesCache[T]) Clear(id string) { t.Get(id).Clear() } diff --git a/internal/db/bundb/timeline.go b/internal/db/bundb/timeline.go index 404cb6601..05db33d1a 100644 --- a/internal/db/bundb/timeline.go +++ b/internal/db/bundb/timeline.go @@ -20,15 +20,13 @@ package bundb import ( "context" "errors" - "fmt" "slices" - "time" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/uptrace/bun" ) @@ -38,161 +36,97 @@ type timelineDB struct { state *state.State } -func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) { - // Ensure reasonable - if limit < 0 { - limit = 0 - } +func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Status, error) { + return loadStatusTimelinePage(ctx, t.db, t.state, - // Make educated guess for slice size - var ( - statusIDs = make([]string, 0, limit) - frontToBack = true - ) + // Paging + // params. + page, - // As this is the home timeline, it should be - // populated by statuses from accounts followed - // by accountID, and posts from accountID itself. - // - // So, begin by seeing who accountID follows. - // It should be a little cheaper to do this in - // a separate query like this, rather than using - // a join, since followIDs are cached in memory. - follows, err := t.state.DB.GetAccountFollows( - gtscontext.SetBarebones(ctx), - accountID, - nil, // select all - ) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, gtserror.Newf("db error getting follows for account %s: %w", accountID, err) - } + // The actual meat of the home-timeline query, outside + // of any paging parameters that selects by followings. + func(q *bun.SelectQuery) (*bun.SelectQuery, error) { - // To take account of exclusive lists, get all of - // this account's lists, so we can filter out follows - // that are in contained in exclusive lists. - lists, err := t.state.DB.GetListsByAccountID(ctx, accountID) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, gtserror.Newf("db error getting lists for account %s: %w", accountID, err) - } - - // Index all follow IDs that fall in exclusive lists. - ignoreFollowIDs := make(map[string]struct{}) - for _, list := range lists { - if !*list.Exclusive { - // Not exclusive, - // we don't care. - continue - } - - // Fetch all follow IDs of the entries ccontained in this list. - listFollowIDs, err := t.state.DB.GetFollowIDsInList(ctx, list.ID, nil) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, gtserror.Newf("db error getting list entry follow ids: %w", err) - } - - // Exclusive list, index all its follow IDs. - for _, followID := range listFollowIDs { - ignoreFollowIDs[followID] = struct{}{} - } - } - - // Extract just the accountID from each follow, - // ignoring follows that are in exclusive lists. - targetAccountIDs := make([]string, 0, len(follows)+1) - for _, f := range follows { - _, ignore := ignoreFollowIDs[f.ID] - if !ignore { - targetAccountIDs = append( - targetAccountIDs, - f.TargetAccountID, + // As this is the home timeline, it should be + // populated by statuses from accounts followed + // by accountID, and posts from accountID itself. + // + // So, begin by seeing who accountID follows. + // It should be a little cheaper to do this in + // a separate query like this, rather than using + // a join, since followIDs are cached in memory. + follows, err := t.state.DB.GetAccountFollows( + gtscontext.SetBarebones(ctx), + accountID, + nil, // select all ) - } - } + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.Newf("db error getting follows for account %s: %w", accountID, err) + } - // Add accountID itself as a pseudo follow so that - // accountID can see its own posts in the timeline. - targetAccountIDs = append(targetAccountIDs, accountID) + // To take account of exclusive lists, get all of + // this account's lists, so we can filter out follows + // that are in contained in exclusive lists. + lists, err := t.state.DB.GetListsByAccountID(ctx, accountID) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.Newf("db error getting lists for account %s: %w", accountID, err) + } - // Now start building the database query. - q := t.db. - NewSelect(). - TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). - // Select only IDs from table - Column("status.id") + // Index all follow IDs that fall in exclusive lists. + ignoreFollowIDs := make(map[string]struct{}) + for _, list := range lists { + if !*list.Exclusive { + // Not exclusive, + // we don't care. + continue + } - if maxID == "" || maxID >= id.Highest { - const future = 24 * time.Hour + // Fetch all follow IDs of the entries ccontained in this list. + listFollowIDs, err := t.state.DB.GetFollowIDsInList(ctx, list.ID, nil) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.Newf("db error getting list entry follow ids: %w", err) + } - // don't return statuses more than 24hr in the future - maxID = id.NewULIDFromTime(time.Now().Add(future)) - } + // Exclusive list, index all its follow IDs. + for _, followID := range listFollowIDs { + ignoreFollowIDs[followID] = struct{}{} + } + } - // return only statuses LOWER (ie., older) than maxID - q = q.Where("? < ?", bun.Ident("status.id"), maxID) + // Extract just the accountID from each follow, + // ignoring follows that are in exclusive lists. + targetAccountIDs := make([]string, 0, len(follows)+1) + for _, f := range follows { + _, ignore := ignoreFollowIDs[f.ID] + if !ignore { + targetAccountIDs = append( + targetAccountIDs, + f.TargetAccountID, + ) + } + } - if sinceID != "" { - // return only statuses HIGHER (ie., newer) than sinceID - q = q.Where("? > ?", bun.Ident("status.id"), sinceID) - } + // Add accountID itself as a pseudo follow so that + // accountID can see its own posts in the timeline. + targetAccountIDs = append(targetAccountIDs, accountID) - if minID != "" { - // return only statuses HIGHER (ie., newer) than minID - q = q.Where("? > ?", bun.Ident("status.id"), minID) + // Select only statuses authored by + // accounts with IDs in the slice. + q = q.Where( + "? IN (?)", + bun.Ident("account_id"), + bun.In(targetAccountIDs), + ) - // page up - frontToBack = false - } + // Only include statuses that aren't pending approval. + q = q.Where("NOT ? = ?", bun.Ident("pending_approval"), true) - if local { - // return only statuses posted by local account havers - q = q.Where("? = ?", bun.Ident("status.local"), local) - } - - // Select only statuses authored by - // accounts with IDs in the slice. - q = q.Where( - "? IN (?)", - bun.Ident("status.account_id"), - bun.In(targetAccountIDs), + return q, nil + }, ) - - // Only include statuses that aren't pending approval. - q = q.Where("NOT ? = ?", bun.Ident("status.pending_approval"), true) - - if limit > 0 { - // limit amount of statuses returned - q = q.Limit(limit) - } - - if frontToBack { - // Page down. - q = q.Order("status.id DESC") - } else { - // Page up. - q = q.Order("status.id ASC") - } - - if err := q.Scan(ctx, &statusIDs); err != nil { - return nil, err - } - - if len(statusIDs) == 0 { - return nil, nil - } - - // If we're paging up, we still want statuses - // to be sorted by ID desc, so reverse ids slice. - // https://zchee.github.io/golang-wiki/SliceTricks/#reversing - if !frontToBack { - for l, r := 0, len(statusIDs)-1; l < r; l, r = l+1, r-1 { - statusIDs[l], statusIDs[r] = statusIDs[r], statusIDs[l] - } - } - - // Return status IDs loaded from cache + db. - return t.state.DB.GetStatusesByIDs(ctx, statusIDs) } +<<<<<<< HEAD func (t *timelineDB) GetPublicTimeline( ctx context.Context, maxID string, @@ -289,6 +223,28 @@ func (t *timelineDB) GetPublicTimeline( // Return status IDs loaded from cache + db. return t.state.DB.GetStatusesByIDs(ctx, statusIDs) +======= +func (t *timelineDB) GetPublicTimeline(ctx context.Context, page *paging.Page) ([]*gtsmodel.Status, error) { + return loadStatusTimelinePage(ctx, t.db, t.state, + + // Paging + // params. + page, + + func(q *bun.SelectQuery) (*bun.SelectQuery, error) { + // Public only. + q = q.Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityPublic) + + // Ignore boosts. + q = q.Where("? IS NULL", bun.Ident("boost_of_id")) + + // Only include statuses that aren't pending approval. + q = q.Where("NOT ? = ?", bun.Ident("pending_approval"), true) + + return q, nil + }, + ) +>>>>>>> 6f0abe7fb (start work rewriting timeline cache type) } func (t *timelineDB) getLocalTimeline( @@ -378,6 +334,7 @@ func (t *timelineDB) getLocalTimeline( // TODO optimize this query and the logic here, because it's slow as balls -- it takes like a literal second to return with a limit of 20! // It might be worth serving it through a timeline instead of raw DB queries, like we do for Home feeds. func (t *timelineDB) GetFavedTimeline(ctx context.Context, accountID string, maxID string, minID string, limit int) ([]*gtsmodel.Status, string, string, error) { + // Ensure reasonable if limit < 0 { limit = 0 @@ -442,205 +399,138 @@ func (t *timelineDB) GetFavedTimeline(ctx context.Context, accountID string, max return statuses, nextMaxID, prevMinID, nil } -func (t *timelineDB) GetListTimeline( +func (t *timelineDB) GetListTimeline(ctx context.Context, listID string, page *paging.Page) ([]*gtsmodel.Status, error) { + return loadStatusTimelinePage(ctx, t.db, t.state, + + // Paging + // params. + page, + + // The actual meat of the list-timeline query, outside + // of any paging parameters, it selects by list entries. + func(q *bun.SelectQuery) (*bun.SelectQuery, error) { + + // Fetch all follow IDs contained in list from DB. + followIDs, err := t.state.DB.GetFollowIDsInList( + ctx, listID, nil, + ) + if err != nil { + return nil, gtserror.Newf("error getting follows in list: %w", err) + } + + // Select target account + // IDs from list follows. + subQ := t.db.NewSelect(). + TableExpr("? AS ?", bun.Ident("follows"), bun.Ident("follow")). + Column("follow.target_account_id"). + Where("? IN (?)", bun.Ident("follow.id"), bun.In(followIDs)) + q = q.Where("? IN (?)", bun.Ident("status.account_id"), subQ) + + // Only include statuses that aren't pending approval. + q = q.Where("NOT ? = ?", bun.Ident("pending_approval"), true) + + return q, nil + }, + ) +} + +func (t *timelineDB) GetTagTimeline(ctx context.Context, tagID string, page *paging.Page) ([]*gtsmodel.Status, error) { + return loadStatusTimelinePage(ctx, t.db, t.state, + + // Paging + // params. + page, + + // The actual meat of the list-timeline query, outside of any + // paging params, selects by status tags with public visibility. + func(q *bun.SelectQuery) (*bun.SelectQuery, error) { + + // ... + q = q.Join( + "INNER JOIN ? ON ? = ?", + bun.Ident("status_to_tags"), + bun.Ident("status.id"), bun.Ident("status_to_tags.status_id"), + ) + + // This tag only. + q = q.Where("? = ?", bun.Ident("status_to_tags.tag_id"), tagID) + + // Public only. + q = q.Where("? = ?", bun.Ident("status.visibility"), gtsmodel.VisibilityPublic) + + return q, nil + }, + ) +} + +func loadStatusTimelinePage( ctx context.Context, - listID string, - maxID string, - sinceID string, - minID string, - limit int, -) ([]*gtsmodel.Status, error) { - // Ensure reasonable - if limit < 0 { - limit = 0 + db *bun.DB, + state *state.State, + page *paging.Page, + query func(*bun.SelectQuery) (*bun.SelectQuery, error), +) ( + []*gtsmodel.Status, + error, +) { + // Extract page params. + minID := page.Min.Value + maxID := page.Max.Value + limit := page.Limit + order := page.Order() + + // Pre-allocate slice of IDs as dest. + statusIDs := make([]string, 0, limit) + + // Now start building the database query. + // + // Select the following: + // - status ID + q := db.NewSelect(). + Table("statuses"). + Column("id") + + if maxID != "" { + // Set a maximum ID boundary if was given. + q = q.Where("? < ?", bun.Ident("id"), maxID) } - // Make educated guess for slice size - var ( - statusIDs = make([]string, 0, limit) - frontToBack = true - ) + if minID != "" { + // Set a minimum ID boundary if was given. + q = q.Where("? > ?", bun.Ident("id"), minID) + } - // Fetch all follow IDs contained in list from DB. - followIDs, err := t.state.DB.GetFollowIDsInList( - ctx, listID, nil, - ) + // Append caller + // query details. + q, err := query(q) if err != nil { - return nil, fmt.Errorf("error getting follows in list: %w", err) + return nil, err } - // If there's no list follows we can't - // possibly return anything for this list. - if len(followIDs) == 0 { - return make([]*gtsmodel.Status, 0), nil + // Set ordering. + switch order { + case paging.OrderAscending: + q = q.OrderExpr("? ASC", bun.Ident("id")) + case paging.OrderDescending: + q = q.OrderExpr("? DESC", bun.Ident("id")) } - // Select target account IDs from follows. - subQ := t.db. - NewSelect(). - TableExpr("? AS ?", bun.Ident("follows"), bun.Ident("follow")). - Column("follow.target_account_id"). - Where("? IN (?)", bun.Ident("follow.id"), bun.In(followIDs)) - - // Select only status IDs created - // by one of the followed accounts. - q := t.db. - NewSelect(). - TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). - // Select only IDs from table - Column("status.id"). - Where("? IN (?)", bun.Ident("status.account_id"), subQ) - - if maxID == "" || maxID >= id.Highest { - const future = 24 * time.Hour - - // don't return statuses more than 24hr in the future - maxID = id.NewULIDFromTime(time.Now().Add(future)) - } - - // return only statuses LOWER (ie., older) than maxID - q = q.Where("? < ?", bun.Ident("status.id"), maxID) - - if sinceID != "" { - // return only statuses HIGHER (ie., newer) than sinceID - q = q.Where("? > ?", bun.Ident("status.id"), sinceID) - } - - if minID != "" { - // return only statuses HIGHER (ie., newer) than minID - q = q.Where("? > ?", bun.Ident("status.id"), minID) - - // page up - frontToBack = false - } - - // Only include statuses that aren't pending approval. - q = q.Where("NOT ? = ?", bun.Ident("status.pending_approval"), true) - - if limit > 0 { - // limit amount of statuses returned - q = q.Limit(limit) - } - - if frontToBack { - // Page down. - q = q.Order("status.id DESC") - } else { - // Page up. - q = q.Order("status.id ASC") - } + // A limit should always + // be supplied for this. + q = q.Limit(limit) + // Finally, perform query into status ID slice. if err := q.Scan(ctx, &statusIDs); err != nil { return nil, err } - if len(statusIDs) == 0 { - return nil, nil + // The order we return from the database and + // timeline caches differs depending on ordering, + // but the caller always expected DESCENDING. + if page.GetOrder() == paging.OrderAscending { + slices.Reverse(statusIDs) } - // If we're paging up, we still want statuses - // to be sorted by ID desc, so reverse ids slice. - // https://zchee.github.io/golang-wiki/SliceTricks/#reversing - if !frontToBack { - for l, r := 0, len(statusIDs)-1; l < r; l, r = l+1, r-1 { - statusIDs[l], statusIDs[r] = statusIDs[r], statusIDs[l] - } - } - - // Return status IDs loaded from cache + db. - return t.state.DB.GetStatusesByIDs(ctx, statusIDs) -} - -func (t *timelineDB) GetTagTimeline( - ctx context.Context, - tagID string, - maxID string, - sinceID string, - minID string, - limit int, -) ([]*gtsmodel.Status, error) { - // Ensure reasonable - if limit < 0 { - limit = 0 - } - - // Make educated guess for slice size - var ( - statusIDs = make([]string, 0, limit) - frontToBack = true - ) - - q := t.db. - NewSelect(). - TableExpr("? AS ?", bun.Ident("status_to_tags"), bun.Ident("status_to_tag")). - Column("status_to_tag.status_id"). - // Join with statuses for filtering. - Join( - "INNER JOIN ? AS ? ON ? = ?", - bun.Ident("statuses"), bun.Ident("status"), - bun.Ident("status.id"), bun.Ident("status_to_tag.status_id"), - ). - // Public only. - Where("? = ?", bun.Ident("status.visibility"), gtsmodel.VisibilityPublic). - // This tag only. - Where("? = ?", bun.Ident("status_to_tag.tag_id"), tagID) - - if maxID == "" || maxID >= id.Highest { - const future = 24 * time.Hour - - // don't return statuses more than 24hr in the future - maxID = id.NewULIDFromTime(time.Now().Add(future)) - } - - // return only statuses LOWER (ie., older) than maxID - q = q.Where("? < ?", bun.Ident("status_to_tag.status_id"), maxID) - - if sinceID != "" { - // return only statuses HIGHER (ie., newer) than sinceID - q = q.Where("? > ?", bun.Ident("status_to_tag.status_id"), sinceID) - } - - if minID != "" { - // return only statuses HIGHER (ie., newer) than minID - q = q.Where("? > ?", bun.Ident("status_to_tag.status_id"), minID) - - // page up - frontToBack = false - } - - // Only include statuses that aren't pending approval. - q = q.Where("NOT ? = ?", bun.Ident("status.pending_approval"), true) - - if limit > 0 { - // limit amount of statuses returned - q = q.Limit(limit) - } - - if frontToBack { - // Page down. - q = q.Order("status_to_tag.status_id DESC") - } else { - // Page up. - q = q.Order("status_to_tag.status_id ASC") - } - - if err := q.Scan(ctx, &statusIDs); err != nil { - return nil, err - } - - if len(statusIDs) == 0 { - return nil, nil - } - - // If we're paging up, we still want statuses - // to be sorted by ID desc, so reverse ids slice. - // https://zchee.github.io/golang-wiki/SliceTricks/#reversing - if !frontToBack { - for l, r := 0, len(statusIDs)-1; l < r; l, r = l+1, r-1 { - statusIDs[l], statusIDs[r] = statusIDs[r], statusIDs[l] - } - } - - // Return status IDs loaded from cache + db. - return t.state.DB.GetStatusesByIDs(ctx, statusIDs) + // Fetch statuses from DB / cache with given IDs. + return state.DB.GetStatusesByIDs(ctx, statusIDs) } diff --git a/internal/db/timeline.go b/internal/db/timeline.go index 43ac655d0..68e494261 100644 --- a/internal/db/timeline.go +++ b/internal/db/timeline.go @@ -21,6 +21,7 @@ import ( "context" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // Timeline contains functionality for retrieving home/public/faved etc timelines for an account. @@ -28,13 +29,13 @@ type Timeline interface { // GetHomeTimeline returns a slice of statuses from accounts that are followed by the given account id. // // Statuses should be returned in descending order of when they were created (newest first). - GetHomeTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) + GetHomeTimeline(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Status, error) // GetPublicTimeline fetches the account's PUBLIC timeline -- ie., posts and replies that are public. // It will use the given filters and try to return as many statuses as possible up to the limit. // // Statuses should be returned in descending order of when they were created (newest first). - GetPublicTimeline(ctx context.Context, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) + GetPublicTimeline(ctx context.Context, page *paging.Page) ([]*gtsmodel.Status, error) // GetFavedTimeline fetches the account's FAVED timeline -- ie., posts and replies that the requesting account has faved. // It will use the given filters and try to return as many statuses as possible up to the limit. @@ -47,9 +48,9 @@ type Timeline interface { // GetListTimeline returns a slice of statuses from followed accounts collected within the list with the given listID. // Statuses should be returned in descending order of when they were created (newest first). - GetListTimeline(ctx context.Context, listID string, maxID string, sinceID string, minID string, limit int) ([]*gtsmodel.Status, error) + GetListTimeline(ctx context.Context, listID string, page *paging.Page) ([]*gtsmodel.Status, error) // GetTagTimeline returns a slice of public-visibility statuses that use the given tagID. // Statuses should be returned in descending order of when they were created (newest first). - GetTagTimeline(ctx context.Context, tagID string, maxID string, sinceID string, minID string, limit int) ([]*gtsmodel.Status, error) + GetTagTimeline(ctx context.Context, tagID string, page *paging.Page) ([]*gtsmodel.Status, error) } diff --git a/internal/paging/page.go b/internal/paging/page.go index 082012879..9df27fe68 100644 --- a/internal/paging/page.go +++ b/internal/paging/page.go @@ -64,10 +64,11 @@ func (p *Page) GetOrder() Order { if p == nil { return 0 } - return p.order() + return p.Order() } -func (p *Page) order() Order { +// Order is a small helper function to return page sort ordering. +func (p *Page) Order() Order { switch { case p.Min.Order != 0: return p.Min.Order @@ -78,6 +79,27 @@ func (p *Page) order() Order { } } +// GetBounds is a small helper function to return low and high page bound ptrs (checking for nil page). +func (p *Page) GetBounds() (lo *Boundary, hi *Boundary) { + if p == nil { + return nil, nil + } + return p.Bounds() +} + +// Bounds is a smaller helper function to return low and high page bound ptrs. +func (p *Page) Bounds() (lo *Boundary, hi *Boundary) { + switch p.Order() { + case OrderAscending: + lo = &p.Max + hi = &p.Min + case OrderDescending: + lo = &p.Min + hi = &p.Max + } + return +} + // Page will page the given slice of input according // to the receiving Page's minimum, maximum and limit. // NOTE: input slice MUST be sorted according to the order is @@ -90,7 +112,7 @@ func (p *Page) Page(in []string) []string { return in } - if p.order().Ascending() { + if p.Order().Ascending() { // Sort type is ascending, input // data is assumed to be ascending. @@ -150,7 +172,7 @@ func Page_PageFunc[WithID any](p *Page, in []WithID, get func(WithID) string) [] return in } - if p.order().Ascending() { + if p.Order().Ascending() { // Sort type is ascending, input // data is assumed to be ascending. diff --git a/internal/processing/timeline/home.go b/internal/processing/timeline/home.go index 38cf38405..0060a78a9 100644 --- a/internal/processing/timeline/home.go +++ b/internal/processing/timeline/home.go @@ -20,131 +20,108 @@ package timeline import ( "context" "errors" + "slices" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) -// HomeTimelineGrab returns a function that satisfies GrabFunction for home timelines. -func HomeTimelineGrab(state *state.State) timeline.GrabFunction { - return func(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { - statuses, err := state.DB.GetHomeTimeline(ctx, accountID, maxID, sinceID, minID, limit, false) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses from db: %w", err) - return nil, false, err - } +// HomeTimelineGet ... +func (p *Processor) HomeTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + page *paging.Page, + local bool, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { - count := len(statuses) - if count == 0 { - // We just don't have enough statuses - // left in the db so return stop = true. - return nil, true, nil - } + // Load timeline data. + return p.getTimeline(ctx, - items := make([]timeline.Timelineable, count) - for i, s := range statuses { - items[i] = s - } + // Auth'd + // account. + requester, - return items, false, nil - } -} + // Home timeline cache for authorized account. + p.state.Caches.Timelines.Home.Get(requester.ID), -// HomeTimelineFilter returns a function that satisfies FilterFunction for home timelines. -func HomeTimelineFilter(state *state.State, visFilter *visibility.Filter) timeline.FilterFunction { - return func(ctx context.Context, accountID string, item timeline.Timelineable) (shouldIndex bool, err error) { - status, ok := item.(*gtsmodel.Status) - if !ok { - err = gtserror.New("could not convert item to *gtsmodel.Status") - return false, err - } + // Current + // page. + page, - requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", accountID, err) - return false, err - } + // Home timeline endpoint. + "/api/v1/timelines/home", - timelineable, err := visFilter.StatusHomeTimelineable(ctx, requestingAccount, status) - if err != nil { - err = gtserror.Newf("error checking hometimelineability of status %s for account %s: %w", status.ID, accountID, err) - return false, err - } + // No page + // query. + nil, - return timelineable, nil - } -} + // Status filter context. + statusfilter.FilterContextHome, -// HomeTimelineStatusPrepare returns a function that satisfies PrepareFunction for home timelines. -func HomeTimelineStatusPrepare(state *state.State, converter *typeutils.Converter) timeline.PrepareFunction { - return func(ctx context.Context, accountID string, itemID string) (timeline.Preparable, error) { - status, err := state.DB.GetStatusByID(ctx, itemID) - if err != nil { - err = gtserror.Newf("error getting status with id %s: %w", itemID, err) - return nil, err - } + // Timeline cache load function, used to further hydrate cache where necessary. + func(page *paging.Page) (statuses []*gtsmodel.Status, next *paging.Page, err error) { - requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", accountID, err) - return nil, err - } + // Fetch requesting account's home timeline page. + statuses, err = p.state.DB.GetHomeTimeline(ctx, + requester.ID, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, nil, gtserror.Newf("error getting statuses: %w", err) + } - filters, err := state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAccount.ID, err) - return nil, err - } + if len(statuses) == 0 { + // No more to load. + return nil, nil, nil + } - mutes, err := state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAccount.ID, err) - return nil, err - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) + // Get the lowest and highest + // ID values, used for next pg. + lo := statuses[len(statuses)-1].ID + hi := statuses[0].ID - return converter.StatusToAPIStatus(ctx, status, requestingAccount, statusfilter.FilterContextHome, filters, compiledMutes) - } -} + // Set next paging value. + page = page.Next(lo, hi) -func (p *Processor) HomeTimelineGet(ctx context.Context, authed *apiutil.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { - statuses, err := p.state.Timelines.Home.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + for i := 0; i < len(statuses); { + // Get status at idx. + status := statuses[i] - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } + // Check whether status should be show on home timeline. + visible, err := p.visFilter.StatusHomeTimelineable(ctx, + requester, + status, + ) + if err != nil { + return nil, nil, gtserror.Newf("error checking visibility: %w", err) + } - var ( - items = make([]interface{}, count) - nextMaxIDValue = statuses[count-1].GetID() - prevMinIDValue = statuses[0].GetID() + if !visible { + // Status not visible to home timeline. + statuses = slices.Delete(statuses, i, i+1) + continue + } + + // Iter. + i++ + } + + return + }, + + // Per-request filtering function. + func(s *gtsmodel.Status) bool { + if local { + return !*s.Local + } + return false + }, ) - - for i := range statuses { - items[i] = statuses[i] - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/home", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) } diff --git a/internal/processing/timeline/home_test.go b/internal/processing/timeline/home_test.go index ea56418f6..0b9681744 100644 --- a/internal/processing/timeline/home_test.go +++ b/internal/processing/timeline/home_test.go @@ -24,12 +24,9 @@ import ( "github.com/stretchr/testify/suite" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" - tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -37,20 +34,6 @@ type HomeTestSuite struct { TimelineStandardTestSuite } -func (suite *HomeTestSuite) SetupTest() { - suite.TimelineStandardTestSuite.SetupTest() - - suite.state.Timelines.Home = timeline.NewManager( - tlprocessor.HomeTimelineGrab(&suite.state), - tlprocessor.HomeTimelineFilter(&suite.state, visibility.NewFilter(&suite.state)), - tlprocessor.HomeTimelineStatusPrepare(&suite.state, typeutils.NewConverter(&suite.state)), - tlprocessor.SkipInsert(), - ) - if err := suite.state.Timelines.Home.Start(); err != nil { - suite.FailNow(err.Error()) - } -} - func (suite *HomeTestSuite) TearDownTest() { if err := suite.state.Timelines.Home.Stop(); err != nil { suite.FailNow(err.Error()) @@ -97,11 +80,12 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { // Fetch the timeline to make sure the status we're going to filter is in that section of it. resp, errWithCode := suite.timeline.HomeTimelineGet( ctx, - authed, - maxID, - sinceID, - minID, - limit, + requester, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) suite.NoError(errWithCode) @@ -127,11 +111,12 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { // Fetch the timeline again with the filter in place. resp, errWithCode = suite.timeline.HomeTimelineGet( ctx, - authed, - maxID, - sinceID, - minID, - limit, + requester, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) diff --git a/internal/processing/timeline/list.go b/internal/processing/timeline/list.go index 147f87ab4..11eba87f2 100644 --- a/internal/processing/timeline/list.go +++ b/internal/processing/timeline/list.go @@ -20,157 +20,128 @@ package timeline import ( "context" "errors" + "slices" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) -// ListTimelineGrab returns a function that satisfies GrabFunction for list timelines. -func ListTimelineGrab(state *state.State) timeline.GrabFunction { - return func(ctx context.Context, listID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { - statuses, err := state.DB.GetListTimeline(ctx, listID, maxID, sinceID, minID, limit) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses from db: %w", err) - return nil, false, err - } - - count := len(statuses) - if count == 0 { - // We just don't have enough statuses - // left in the db so return stop = true. - return nil, true, nil - } - - items := make([]timeline.Timelineable, count) - for i, s := range statuses { - items[i] = s - } - - return items, false, nil - } -} - -// ListTimelineFilter returns a function that satisfies FilterFunction for list timelines. -func ListTimelineFilter(state *state.State, visFilter *visibility.Filter) timeline.FilterFunction { - return func(ctx context.Context, listID string, item timeline.Timelineable) (shouldIndex bool, err error) { - status, ok := item.(*gtsmodel.Status) - if !ok { - err = gtserror.New("could not convert item to *gtsmodel.Status") - return false, err - } - - list, err := state.DB.GetListByID(ctx, listID) - if err != nil { - err = gtserror.Newf("error getting list with id %s: %w", listID, err) - return false, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", list.AccountID, err) - return false, err - } - - timelineable, err := visFilter.StatusHomeTimelineable(ctx, requestingAccount, status) - if err != nil { - err = gtserror.Newf("error checking hometimelineability of status %s for account %s: %w", status.ID, list.AccountID, err) - return false, err - } - - return timelineable, nil - } -} - -// ListTimelineStatusPrepare returns a function that satisfies PrepareFunction for list timelines. -func ListTimelineStatusPrepare(state *state.State, converter *typeutils.Converter) timeline.PrepareFunction { - return func(ctx context.Context, listID string, itemID string) (timeline.Preparable, error) { - status, err := state.DB.GetStatusByID(ctx, itemID) - if err != nil { - err = gtserror.Newf("error getting status with id %s: %w", itemID, err) - return nil, err - } - - list, err := state.DB.GetListByID(ctx, listID) - if err != nil { - err = gtserror.Newf("error getting list with id %s: %w", listID, err) - return nil, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", list.AccountID, err) - return nil, err - } - - filters, err := state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAccount.ID, err) - return nil, err - } - - mutes, err := state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAccount.ID, err) - return nil, err - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) - - return converter.StatusToAPIStatus(ctx, status, requestingAccount, statusfilter.FilterContextHome, filters, compiledMutes) - } -} - -func (p *Processor) ListTimelineGet(ctx context.Context, authed *apiutil.Auth, listID string, maxID string, sinceID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { - // Ensure list exists + is owned by this account. - list, err := p.state.DB.GetListByID(ctx, listID) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - return nil, gtserror.NewErrorNotFound(err) - } +// ListTimelineGet ... +func (p *Processor) ListTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + listID string, + page *paging.Page, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + // Fetch the requested list with ID. + list, err := p.state.DB.GetListByID( + gtscontext.SetBarebones(ctx), + listID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { return nil, gtserror.NewErrorInternalError(err) } - if list.AccountID != authed.Account.ID { - err = gtserror.Newf("list with id %s does not belong to account %s", list.ID, authed.Account.ID) + // Check exists. + if list != nil { + const text = "list not found" + return nil, gtserror.NewErrorNotFound( + errors.New(text), + text, + ) + } + + // Check list owned by auth'd account. + if list.AccountID != requester.ID { + err := gtserror.New("list does not belong to account") return nil, gtserror.NewErrorNotFound(err) } - statuses, err := p.state.Timelines.List.GetTimeline(ctx, listID, maxID, sinceID, minID, limit, false) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + // Load timeline data. + return p.getTimeline(ctx, - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } + // Auth'd + // account. + requester, - var ( - items = make([]interface{}, count) - nextMaxIDValue = statuses[count-1].GetID() - prevMinIDValue = statuses[0].GetID() + // List timeline cache for list with ID. + p.state.Caches.Timelines.List.Get(listID), + + // Current + // page. + page, + + // List timeline endpoint. + "/api/v1/timelines/list/"+listID, + + // No page + // query. + nil, + + // Status filter context. + statusfilter.FilterContextHome, + + // Timeline cache load function, used to further hydrate cache where necessary. + func(page *paging.Page) (statuses []*gtsmodel.Status, next *paging.Page, err error) { + + // Fetch requesting account's list timeline page. + statuses, err = p.state.DB.GetListTimeline(ctx, + listID, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, nil, gtserror.Newf("error getting statuses: %w", err) + } + + if len(statuses) == 0 { + // No more to load. + return nil, nil, nil + } + + // Get the lowest and highest + // ID values, used for next pg. + lo := statuses[len(statuses)-1].ID + hi := statuses[0].ID + + // Set next paging value. + page = page.Next(lo, hi) + + for i := 0; i < len(statuses); { + // Get status at idx. + status := statuses[i] + + // Check whether status should be show on home timeline. + visible, err := p.visFilter.StatusHomeTimelineable(ctx, + requester, + status, + ) + if err != nil { + return nil, nil, gtserror.Newf("error checking visibility: %w", err) + } + + if !visible { + // Status not visible to home timeline. + statuses = slices.Delete(statuses, i, i+1) + continue + } + + // Iter. + i++ + } + + return + }, + + // No furthering + // filter function. + nil, ) - - for i := range statuses { - items[i] = statuses[i] - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/list/" + listID, - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) } diff --git a/internal/processing/timeline/public.go b/internal/processing/timeline/public.go index dc00688e3..91eaee743 100644 --- a/internal/processing/timeline/public.go +++ b/internal/processing/timeline/public.go @@ -20,151 +20,108 @@ package timeline import ( "context" "errors" + "net/url" + "slices" "strconv" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/log" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) +// PublicTimelineGet ... func (p *Processor) PublicTimelineGet( ctx context.Context, requester *gtsmodel.Account, - maxID string, - sinceID string, - minID string, - limit int, + page *paging.Page, local bool, -) (*apimodel.PageableResponse, gtserror.WithCode) { - const maxAttempts = 3 - var ( - nextMaxIDValue string - prevMinIDValue string - items = make([]any, 0, limit) - ) +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { - var filters []*gtsmodel.Filter - var compiledMutes *usermute.CompiledUserMuteList - if requester != nil { - var err error - filters, err = p.state.DB.GetFiltersForAccountID(ctx, requester.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requester.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } + // Load timeline data. + return p.getTimeline(ctx, - mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requester.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requester.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - compiledMutes = usermute.NewCompiledUserMuteList(mutes) - } + // Auth'd + // account. + requester, - // Try a few times to select appropriate public - // statuses from the db, paging up or down to - // reattempt if nothing suitable is found. -outer: - for attempts := 1; ; attempts++ { - // Select slightly more than the limit to try to avoid situations where - // we filter out all the entries, and have to make another db call. - // It's cheaper to select more in 1 query than it is to do multiple queries. - statuses, err := p.state.DB.GetPublicTimeline(ctx, maxID, sinceID, minID, limit+5, local) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + // Global public timeline cache. + &p.state.Caches.Timelines.Public, - count := len(statuses) - if count == 0 { - // Nothing relevant (left) in the db. - return util.EmptyPageableResponse(), nil - } + // Current + // page. + page, - // Page up from first status in slice - // (ie., one with the highest ID). - prevMinIDValue = statuses[0].ID + // Public timeline endpoint. + "/api/v1/timelines/public", - inner: - for _, s := range statuses { - // Push back the next page down ID to - // this status, regardless of whether - // we end up filtering it out or not. - nextMaxIDValue = s.ID + // Set local-only timeline page query flag. + url.Values{"local": {strconv.FormatBool(local)}}, - timelineable, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) - if err != nil { - log.Errorf(ctx, "error checking status visibility: %v", err) - continue inner + // Status filter context. + statusfilter.FilterContextPublic, + + // Timeline cache load function, used to further hydrate cache where necessary. + func(page *paging.Page) (statuses []*gtsmodel.Status, next *paging.Page, err error) { + + // Fetch the global public status timeline page. + statuses, err = p.state.DB.GetPublicTimeline(ctx, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, nil, gtserror.Newf("error getting statuses: %w", err) } - if !timelineable { - continue inner + if len(statuses) == 0 { + // No more to load. + return nil, nil, nil } - apiStatus, err := p.converter.StatusToAPIStatus(ctx, s, requester, statusfilter.FilterContextPublic, filters, compiledMutes) - if errors.Is(err, statusfilter.ErrHideStatus) { - continue - } - if err != nil { - log.Errorf(ctx, "error converting to api status: %v", err) - continue inner + // Get the lowest and highest + // ID values, used for next pg. + lo := statuses[len(statuses)-1].ID + hi := statuses[0].ID + + // Set next paging value. + page = page.Next(lo, hi) + + for i := 0; i < len(statuses); { + // Get status at idx. + status := statuses[i] + + // Check whether status should be show on public timeline. + visible, err := p.visFilter.StatusPublicTimelineable(ctx, + requester, + status, + ) + if err != nil { + return nil, nil, gtserror.Newf("error checking visibility: %w", err) + } + + if !visible { + // Status not visible to home timeline. + statuses = slices.Delete(statuses, i, i+1) + continue + } + + // Iter. + i++ } - // Looks good, add this. - items = append(items, apiStatus) - - // We called the db with a little - // more than the desired limit. - // - // Ensure we don't return more - // than the caller asked for. - if len(items) == limit { - break outer - } - } - - if len(items) != 0 { - // We've got some items left after - // filtering, happily break + return. - break - } - - if attempts >= maxAttempts { - // We reached our attempts limit. - // Be nice + warn about it. - log.Warn(ctx, "reached max attempts to find items in public timeline") - break - } - - // We filtered out all items before we - // found anything we could return, but - // we still have attempts left to try - // fetching again. Set paging params - // and allow loop to continue. - if minID != "" { - // Paging up. - minID = prevMinIDValue - } else { - // Paging down. - maxID = nextMaxIDValue - } - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/public", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - ExtraQueryParams: []string{ - "local=" + strconv.FormatBool(local), + return }, - }) + + // Per-request filtering function. + func(s *gtsmodel.Status) bool { + if local { + return !*s.Local + } + return false + }, + ) } diff --git a/internal/processing/timeline/public_test.go b/internal/processing/timeline/public_test.go index ab8e33429..b3ff87951 100644 --- a/internal/processing/timeline/public_test.go +++ b/internal/processing/timeline/public_test.go @@ -25,6 +25,7 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -46,10 +47,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGet() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) @@ -79,10 +81,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetNotEmpty() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) @@ -133,10 +136,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) suite.NoError(errWithCode) @@ -161,10 +165,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { resp, errWithCode = suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) diff --git a/internal/processing/timeline/tag.go b/internal/processing/timeline/tag.go index 811d0bb33..5a771431f 100644 --- a/internal/processing/timeline/tag.go +++ b/internal/processing/timeline/tag.go @@ -30,6 +30,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/text" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -58,7 +59,13 @@ func (p *Processor) TagTimelineGet( return nil, gtserror.NewErrorNotFound(err, err.Error()) } - statuses, err := p.state.DB.GetTagTimeline(ctx, tag.ID, maxID, sinceID, minID, limit) + page := paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + } + + statuses, err := p.state.DB.GetTagTimeline(ctx, tag.ID, &page) if err != nil && !errors.Is(err, db.ErrNoEntries) { err = gtserror.Newf("db error getting statuses: %w", err) return nil, gtserror.NewErrorInternalError(err) diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 5966fe864..b285265ff 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -18,7 +18,22 @@ package timeline import ( + "context" + "errors" + "net/url" + "slices" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/cache" + "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" + "github.com/superseriousbusiness/gotosocial/internal/db" + statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" ) @@ -36,3 +51,273 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili visFilter: visFilter, } } + +func (p *Processor) getStatusTimeline( + ctx context.Context, + requester *gtsmodel.Account, + timeline *timeline.StatusTimeline, + page *paging.Page, + pgPath string, // timeline page path + pgQuery url.Values, // timeline query parameters + filterCtx statusfilter.FilterContext, + loadPage func(*paging.Page) (statuses []*gtsmodel.Status, err error), + preFilter func(*gtsmodel.Status) (bool, error), + postFilter func(*timeline.StatusMeta) bool, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + var ( + filters []*gtsmodel.Filter + mutes *usermute.CompiledUserMuteList + ) + + if requester != nil { + var err error + + // Fetch all filters relevant for requesting account. + filters, err = p.state.DB.GetFiltersForAccountID(ctx, + requester.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account filters: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Get a list of all account mutes for requester. + allMutes, err := p.state.DB.GetAccountMutes(ctx, + requester.ID, + nil, // nil page, i.e. all + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account mutes: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Compile all account mutes to useable form. + mutes = usermute.NewCompiledUserMuteList(allMutes) + } + + // ... + statuses, err := timeline.Load(ctx, + page, + + // ... + loadPage, + + // ... + func(ids []string) ([]*gtsmodel.Status, error) { + return p.state.DB.GetStatusesByIDs(ctx, ids) + }, + + // ... + preFilter, + + // ... + postFilter, + + // ... + func(status *gtsmodel.Status) (*apimodel.Status, error) { + apiStatus, err := p.converter.StatusToAPIStatus(ctx, + status, + requester, + filterCtx, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + return nil, err + } + return apiStatus, nil + }, + ) + if err != nil { + panic(err) + } +} + +func (p *Processor) getTimeline( + ctx context.Context, + requester *gtsmodel.Account, + timeline *cache.TimelineCache[*gtsmodel.Status], + page *paging.Page, + pgPath string, // timeline page path + pgQuery url.Values, // timeline query parameters + filterCtx statusfilter.FilterContext, + load func(*paging.Page) (statuses []*gtsmodel.Status, next *paging.Page, err error), // timeline cache load function + filter func(*gtsmodel.Status) bool, // per-request filtering function, done AFTER timeline caching +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + // Load timeline with cache / loader funcs. + statuses, errWithCode := p.loadTimeline(ctx, + timeline, + page, + load, + filter, + ) + if errWithCode != nil { + return nil, errWithCode + } + + if len(statuses) == 0 { + // Check for an empty timeline rsp. + return paging.EmptyResponse(), nil + } + + // Get the lowest and highest + // ID values, used for paging. + lo := statuses[len(statuses)-1].ID + hi := statuses[0].ID + + var ( + filters []*gtsmodel.Filter + mutes *usermute.CompiledUserMuteList + ) + + if requester != nil { + var err error + + // Fetch all filters relevant for requesting account. + filters, err = p.state.DB.GetFiltersForAccountID(ctx, + requester.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account filters: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Get a list of all account mutes for requester. + allMutes, err := p.state.DB.GetAccountMutes(ctx, + requester.ID, + nil, // nil page, i.e. all + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account mutes: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Compile all account mutes to useable form. + mutes = usermute.NewCompiledUserMuteList(allMutes) + } + + // NOTE: + // Right now this is not ideal, as we perform mute and + // status filtering *after* the above load loop, so we + // could end up with no statuses still AFTER all loading. + // + // In a PR coming *soon* we will move the filtering and + // status muting into separate module similar to the visibility + // filtering and caching which should move it to the above + // load loop and provided function. + + // API response requires them in interface{} form. + items := make([]interface{}, 0, len(statuses)) + + for _, status := range statuses { + // Convert internal status model to frontend model. + apiStatus, err := p.converter.StatusToAPIStatus(ctx, + status, + requester, + filterCtx, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf(ctx, "error converting status: %v", err) + continue + } + + if apiStatus != nil { + // Append status to return slice. + items = append(items, apiStatus) + } + } + + // Package converted API statuses as pageable response. + return paging.PackageResponse(paging.ResponseParams{ + Items: items, + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + Path: pgPath, + Query: pgQuery, + }), nil +} + +func (p *Processor) loadTimeline( + ctx context.Context, + timeline *cache.TimelineCache[*gtsmodel.Status], + page *paging.Page, + load func(*paging.Page) (statuses []*gtsmodel.Status, next *paging.Page, err error), + filter func(*gtsmodel.Status) bool, +) ( + []*gtsmodel.Status, + gtserror.WithCode, +) { + if load == nil { + // nil check outside + // below main loop. + panic("nil func") + } + + if page == nil { + const text = "timeline must be paged" + return nil, gtserror.NewErrorBadRequest( + errors.New(text), + text, + ) + } + + // Try load statuses from cache. + statuses := timeline.Select(page) + + // Filter statuses using provided function. + statuses = slices.DeleteFunc(statuses, filter) + + // Check if more statuses need to be loaded. + if limit := page.Limit; len(statuses) < limit { + + // Set first page + // query to load. + nextPg := page + + for i := 0; i < 5; i++ { + var err error + var next []*gtsmodel.Status + + // Load next timeline statuses. + next, nextPg, err = load(nextPg) + if err != nil { + err := gtserror.Newf("error loading timeline: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // An empty next page means no more. + if len(next) == 0 && nextPg == nil { + break + } + + // Cache loaded statuses. + timeline.Insert(next...) + + // Filter statuses using provided function, + // this must be done AFTER cache insert but + // BEFORE adding to slice, as this is used + // for request-specific timeline filtering, + // as opposed to filtering for entire cache. + next = slices.DeleteFunc(next, filter) + + // Append loaded statuses to return. + statuses = append(statuses, next...) + + if len(statuses) >= limit { + // We loaded all the statuses + // that were requested of us! + break + } + } + } + + return statuses, nil +} diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 28a2b37b9..18106d393 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -682,23 +682,13 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) } - // Remove blockee's statuses from blocker's timeline. - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.AccountID, - block.TargetAccountID, - ); err != nil { - return gtserror.Newf("error wiping timeline items for block: %w", err) - } + // Remove blocker's statuses from blocker's timeline. + p.state.Caches.Timelines.Home.InvalidateFrom(block.AccountID, "AccountID", block.TargetAccountID) + p.state.Caches.Timelines.Home.InvalidateFrom(block.AccountID, "BoostOfAccountID", block.TargetAccountID) - // Remove blocker's statuses from blockee's timeline. - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.TargetAccountID, - block.AccountID, - ); err != nil { - return gtserror.Newf("error wiping timeline items for block: %w", err) - } + // Remove blockee's statuses from blockee's timeline. + p.state.Caches.Timelines.Home.InvalidateFrom(block.TargetAccountID, "AccountID", block.AccountID) + p.state.Caches.Timelines.Home.InvalidateFrom(block.TargetAccountID, "BoostOfAccountID", block.AccountID) // TODO: same with notifications? // TODO: same with bookmarks? diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index b071bd72e..43213a197 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -559,6 +559,7 @@ func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string if err := s.State.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil { return err } + s.Stream.Delete(ctx, statusID) return nil } diff --git a/internal/state/state.go b/internal/state/state.go index 8aefa658a..d6f58e714 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -23,7 +23,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/cache" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/storage" - "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/workers" ) @@ -34,11 +33,10 @@ import ( // subpackage initialization, while the returned subpackage type will later // then be set and stored within the State{} itself. type State struct { - // Caches provides access to this state's collection of caches. - Caches cache.Caches - // Timelines provides access to this state's collection of timelines. - Timelines timeline.Timelines + // Caches provides access to this + // state's collection of caches. + Caches cache.Caches // DB provides access to the database. DB db.DB @@ -59,7 +57,8 @@ type State struct { // pinned statuses, creating notifs, etc. ProcessingLocks mutexes.MutexMap - // Storage provides access to the storage driver. + // Storage provides access + // to the storage driver. Storage *storage.Driver // Workers provides access to this