diff --git a/internal/cache/db.go b/internal/cache/db.go index 6b482d5f8..d31017ccd 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -154,6 +154,7 @@ type DBCaches struct { Domains atomic.Pointer[int] Statuses atomic.Pointer[int] Users atomic.Pointer[int] + UserIDs atomic.Pointer[[]string] } // InteractionRequest provides access to the gtsmodel InteractionRequest database cache. diff --git a/internal/cache/invalidate.go b/internal/cache/invalidate.go index 569238e9b..863719b77 100644 --- a/internal/cache/invalidate.go +++ b/internal/cache/invalidate.go @@ -365,7 +365,8 @@ func (c *Caches) OnInvalidateUser(user *gtsmodel.User) { c.Visibility.Invalidate("ItemID", user.AccountID) c.Visibility.Invalidate("RequesterID", user.AccountID) - // Invalidate the local users count. + // Invalidate the local user IDs / count. + c.DB.LocalInstance.UserIDs.Store(nil) c.DB.LocalInstance.Users.Store(nil) } diff --git a/internal/db/bundb/user.go b/internal/db/bundb/user.go index 2800a32e9..f51d1bf74 100644 --- a/internal/db/bundb/user.go +++ b/internal/db/bundb/user.go @@ -19,12 +19,15 @@ package bundb import ( "context" + "slices" "time" "code.superseriousbusiness.org/gotosocial/internal/gtscontext" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/gtsmodel" + "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/state" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" ) @@ -45,27 +48,47 @@ func (u *userDB) GetUserByID(ctx context.Context, id string) (*gtsmodel.User, er } func (u *userDB) GetUsersByIDs(ctx context.Context, ids []string) ([]*gtsmodel.User, error) { - var ( - users = make([]*gtsmodel.User, 0, len(ids)) + // Load all input user IDs via cache loader callback. + users, err := u.state.Caches.DB.User.LoadIDs("ID", + ids, + func(uncached []string) ([]*gtsmodel.User, error) { + // Preallocate expected length of uncached users. + users := make([]*gtsmodel.User, 0, len(uncached)) - // Collect errors instead of - // returning early on any. - errs gtserror.MultiError + // Perform database query scanning + // the remaining (uncached) user IDs. + if err := u.db.NewSelect(). + Model(&users). + Where("? IN (?)", bun.Ident("id"), bun.In(uncached)). + Scan(ctx); err != nil { + return nil, err + } + + return users, nil + }, ) - - for _, id := range ids { - // Attempt to fetch user from DB. - user, err := u.GetUserByID(ctx, id) - if err != nil { - errs.Appendf("error getting user %s: %w", id, err) - continue - } - - // Append user to return slice. - users = append(users, user) + if err != nil { + return nil, err } - return users, errs.Combine() + // Reorder the users by their + // IDs to ensure in correct order. + getID := func(s *gtsmodel.User) string { return s.ID } + xslices.OrderBy(users, ids, getID) + + if gtscontext.Barebones(ctx) { + // no need to fully populate. + return users, nil + } + + // Populate all loaded users. + for _, user := range users { + if err := u.PopulateUser(ctx, user); err != nil { + log.Errorf(ctx, "error populating user %s: %v", user.ID, err) + } + } + + return users, nil } func (u *userDB) GetUserByAccountID(ctx context.Context, accountID string) (*gtsmodel.User, error) { @@ -161,7 +184,11 @@ func (u *userDB) PopulateUser(ctx context.Context, user *gtsmodel.User) error { return errs.Combine() } -func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) { +func (u *userDB) GetAllUserIDs(ctx context.Context) ([]string, error) { + if p := u.state.Caches.DB.LocalInstance.UserIDs.Load(); p != nil { + return slices.Clone(*p), nil + } + var userIDs []string // Scan all user IDs into slice. @@ -172,7 +199,16 @@ func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) { return nil, err } - // Transform user IDs into user slice. + // Store the scanned user IDs in our local cache ptr. + u.state.Caches.DB.LocalInstance.UserIDs.Store(&userIDs) + return userIDs, nil +} + +func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) { + userIDs, err := u.GetAllUserIDs(ctx) + if err != nil { + return nil, err + } return u.GetUsersByIDs(ctx, userIDs) } diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index 7da34ff42..5967d4d34 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -262,9 +262,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { receivingAccount, []string{testList.ID}, ) - homeStream = streams[stream.TimelineHome] - listStream = streams[stream.TimelineList+":"+testList.ID] - notifStream = streams[stream.TimelineNotifications] + publicStream = streams[stream.TimelinePublic] + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + notifStream = streams[stream.TimelineNotifications] // Admin account posts a new top-level status. status = suite.newStatus( @@ -310,6 +311,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { receivingAccount, ) + // Check message in public stream. + suite.checkStreamed( + publicStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -379,9 +388,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi receivingAccount, []string{testList.ID}, ) - homeStream = streams[stream.TimelineHome] - listStream = streams[stream.TimelineList+":"+testList.ID] - notifStream = streams[stream.TimelineNotifications] + publicStream = streams[stream.TimelinePublic] + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + notifStream = streams[stream.TimelineNotifications] // Admin account posts a new top-level status. status = suite.newStatus( @@ -420,6 +430,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi suite.FailNow(err.Error()) } + // There should be no message in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // There should be no message in the home stream. suite.checkStreamed( homeStream, @@ -530,6 +548,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { receivingAccount = suite.testAccounts["local_account_1"] testList = suite.testLists["local_account_1_list_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -571,6 +590,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { receivingAccount, ) + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -732,6 +759,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis postingAccount = suite.testAccounts["admin_account"] receivingAccount = suite.testAccounts["local_account_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -778,6 +806,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis receivingAccount, ) + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -811,6 +847,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis postingAccount = suite.testAccounts["admin_account"] receivingAccount = suite.testAccounts["local_account_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -863,6 +900,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis receivingAccount, ) + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -896,6 +941,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli postingAccount = suite.testAccounts["admin_account"] receivingAccount = suite.testAccounts["local_account_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -942,6 +988,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli receivingAccount, ) + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -972,6 +1026,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { receivingAccount = suite.testAccounts["local_account_1"] testList = suite.testLists["local_account_1_list_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -1009,6 +1064,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { receivingAccount, ) + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message in home stream. suite.checkStreamed( homeStream, @@ -1039,6 +1102,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { receivingAccount = suite.testAccounts["local_account_1"] testList = suite.testLists["local_account_1_list_1"] streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID}) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] listStream = streams[stream.TimelineList+":"+testList.ID] @@ -1078,6 +1142,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { suite.FailNow(err.Error()) } + // Check message *not* in public stream. + suite.checkStreamed( + publicStream, + false, + "", + "", + ) + // Check message NOT in home stream. suite.checkStreamed( homeStream, @@ -1763,8 +1835,9 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv receivingAccount, []string{testList.ID}, ) - homeStream = streams[stream.TimelineHome] - listStream = streams[stream.TimelineList+":"+testList.ID] + publicStream = streams[stream.TimelinePublic] + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] // postingAccount posts a new public status not mentioning anyone. status = suite.newStatus( @@ -1802,6 +1875,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv suite.FailNow(err.Error()) } + // Check status in public stream. + suite.checkStreamed( + publicStream, + true, + "", + stream.EventTypeUpdate, + ) + // Check status in list stream. suite.checkStreamed( listStream, @@ -1857,6 +1938,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv testExclusiveList.ID, }, ) + publicStream = streams[stream.TimelinePublic] homeStream = streams[stream.TimelineHome] inclusiveListStream = streams[stream.TimelineList+":"+testInclusiveList.ID] exclusiveListStream = streams[stream.TimelineList+":"+testExclusiveList.ID] @@ -1911,6 +1993,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv suite.FailNow(err.Error()) } + // Check status in public stream. + suite.checkStreamed( + publicStream, + true, + "", + stream.EventTypeUpdate, + ) + // Check status in inclusive list stream. suite.checkStreamed( inclusiveListStream, @@ -1957,9 +2047,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv receivingAccount, []string{testList.ID}, ) - homeStream = streams[stream.TimelineHome] - listStream = streams[stream.TimelineList+":"+testList.ID] - notifStream = streams[stream.TimelineNotifications] + publicStream = streams[stream.TimelinePublic] + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + notifStream = streams[stream.TimelineNotifications] // postingAccount posts a new public status not mentioning anyone. status = suite.newStatus( @@ -2005,6 +2096,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv suite.FailNow(err.Error()) } + // Check status in public stream. + suite.checkStreamed( + publicStream, + true, + "", + stream.EventTypeUpdate, + ) + // Check status in list stream. suite.checkStreamed( listStream, diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index b1177cd28..0e30f54f7 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -20,6 +20,7 @@ package workers import ( "context" + apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model" "code.superseriousbusiness.org/gotosocial/internal/cache/timeline" "code.superseriousbusiness.org/gotosocial/internal/gtscontext" "code.superseriousbusiness.org/gotosocial/internal/gtserror" @@ -37,6 +38,7 @@ import ( // the account, notifications for any local accounts that want // to know when this account posts, and conversations containing the status. func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { + // Ensure status fully populated; including account, mentions, etc. if err := s.State.DB.PopulateStatus(ctx, status); err != nil { return gtserror.Newf("error populating status with id %s: %w", status.ID, err) @@ -59,6 +61,11 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel. }) } + // Stream the status for public timelines for all local users as update msg. + if err := s.timelineStatusForPublic(ctx, status, s.Stream.Update); err != nil { + return err + } + // Timeline the status for each local follower of this account. This will // also handle notifying any followers with notify set to true on their follow. homeTimelinedAccountIDs := s.timelineAndNotifyStatusForFollowers(ctx, status, follows) @@ -68,16 +75,18 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel. return gtserror.Newf("error timelining status %s for tag followers: %w", status.ID, err) } - // Notify each local account that's mentioned by this status. + // Notify each local account mentioned by status. if err := s.notifyMentions(ctx, status); err != nil { return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) } - // Update any conversations containing this status, and send conversation notifications. + // Update any conversations containing this status, and get notifications for them. notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status) if err != nil { return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err) } + + // Stream these conversation notfications. for _, notification := range notifications { s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation) } @@ -85,6 +94,95 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel. return nil } +// timelineStatusForPublic timelines the given status +// to LOCAL and PUBLIC (i.e. federated) timelines. +func (s *Surface) timelineStatusForPublic( + ctx context.Context, + status *gtsmodel.Status, + streamFn func(context.Context, *gtsmodel.Account, *apimodel.Status, string), +) error { + // Nil check function + // outside main loop. + if streamFn == nil { + panic("nil func") + } + + if status.Visibility != gtsmodel.VisibilityPublic || + status.BoostOfID != "" { + // Fast code path, if it's not "public" + // or a boost, don't public timeline it. + return nil + } + + // Get a list of all our local users. + users, err := s.State.DB.GetAllUsers(ctx) + if err != nil { + return gtserror.Newf("error getting local users: %v", err) + } + + // Iterate our list of users. + isLocal := status.IsLocal() + for _, user := range users { + + // Check whether this status should be visible this user on public timelines. + visible, err := s.VisFilter.StatusPublicTimelineable(ctx, user.Account, status) + if err != nil { + log.Errorf(ctx, "error checking status %s visibility: %v", status.URI, err) + continue + } + + if !visible { + continue + } + + // Check whether this status is muted in any form by this user. + muted, err := s.MuteFilter.StatusMuted(ctx, user.Account, status) + if err != nil { + log.Errorf(ctx, "error checking status %s mutes: %v", status.URI, err) + continue + } + + if muted { + continue + } + + // Get status-filter results for this status in context by this user. + filtered, hidden, err := s.StatusFilter.StatusFilterResultsInContext(ctx, + user.Account, + status, + gtsmodel.FilterContextPublic, + ) + if err != nil { + log.Errorf(ctx, "error getting status %s filter results: %v", status.URI, err) + continue + } + + if hidden { + continue + } + + // Now all checks / filters are passed, convert status to frontend model. + apiStatus, err := s.Converter.StatusToAPIStatus(ctx, status, user.Account) + if err != nil { + log.Errorf(ctx, "error converting status %s: %v", status.URI, err) + continue + } + + // Set API model filter results. + apiStatus.Filtered = filtered + + if isLocal { + // This is local status, send it to local timeline stream. + streamFn(ctx, user.Account, apiStatus, stream.TimelineLocal) + } + + // For public timeline stream, send all local / remote statuses. + streamFn(ctx, user.Account, apiStatus, stream.TimelinePublic) + } + + return nil +} + // timelineAndNotifyStatusForFollowers iterates through the given // slice of followers of the account that posted the given status, // adding the status to list timelines + home timelines of each @@ -409,8 +507,9 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers( status = status.BoostOf } + var errs gtserror.MultiError + // Insert the status into the home timeline of each tag follower. - errs := gtserror.MultiError{} for _, tagFollowerAccount := range tagFollowerAccounts { _ = s.timelineStatus(ctx, s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID), @@ -537,6 +636,11 @@ func (s *Surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Sta }) } + // Stream the status update for public timelines for all of our local users. + if err := s.timelineStatusForPublic(ctx, status, s.Stream.StatusUpdate); err != nil { + return err + } + // Push updated status to streams for each local follower of this account. homeTimelinedAccountIDs := s.timelineStatusUpdateForFollowers(ctx, status, follows)