[feature] add streaming of statuses and status updates to LOCAL / PUBLIC timelines (#4353)

This adds streaming of statuses and edits to LOCAL and PUBLIC timeline types. Note that in the future we should probably rearrange some of the surface code so we don't perform so many repeated mute and visibility checks on the same status in sequence.

closes https://codeberg.org/superseriousbusiness/gotosocial/issues/4342

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4353
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2025-07-30 17:54:07 +02:00 committed by kim
commit 7e45168d33
5 changed files with 275 additions and 34 deletions

View file

@ -154,6 +154,7 @@ type DBCaches struct {
Domains atomic.Pointer[int]
Statuses atomic.Pointer[int]
Users atomic.Pointer[int]
UserIDs atomic.Pointer[[]string]
}
// InteractionRequest provides access to the gtsmodel InteractionRequest database cache.

View file

@ -365,7 +365,8 @@ func (c *Caches) OnInvalidateUser(user *gtsmodel.User) {
c.Visibility.Invalidate("ItemID", user.AccountID)
c.Visibility.Invalidate("RequesterID", user.AccountID)
// Invalidate the local users count.
// Invalidate the local user IDs / count.
c.DB.LocalInstance.UserIDs.Store(nil)
c.DB.LocalInstance.Users.Store(nil)
}

View file

@ -19,12 +19,15 @@ package bundb
import (
"context"
"slices"
"time"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/state"
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
"github.com/uptrace/bun"
)
@ -45,27 +48,47 @@ func (u *userDB) GetUserByID(ctx context.Context, id string) (*gtsmodel.User, er
}
func (u *userDB) GetUsersByIDs(ctx context.Context, ids []string) ([]*gtsmodel.User, error) {
var (
users = make([]*gtsmodel.User, 0, len(ids))
// Load all input user IDs via cache loader callback.
users, err := u.state.Caches.DB.User.LoadIDs("ID",
ids,
func(uncached []string) ([]*gtsmodel.User, error) {
// Preallocate expected length of uncached users.
users := make([]*gtsmodel.User, 0, len(uncached))
// Collect errors instead of
// returning early on any.
errs gtserror.MultiError
// Perform database query scanning
// the remaining (uncached) user IDs.
if err := u.db.NewSelect().
Model(&users).
Where("? IN (?)", bun.Ident("id"), bun.In(uncached)).
Scan(ctx); err != nil {
return nil, err
}
return users, nil
},
)
for _, id := range ids {
// Attempt to fetch user from DB.
user, err := u.GetUserByID(ctx, id)
if err != nil {
errs.Appendf("error getting user %s: %w", id, err)
continue
}
// Append user to return slice.
users = append(users, user)
if err != nil {
return nil, err
}
return users, errs.Combine()
// Reorder the users by their
// IDs to ensure in correct order.
getID := func(s *gtsmodel.User) string { return s.ID }
xslices.OrderBy(users, ids, getID)
if gtscontext.Barebones(ctx) {
// no need to fully populate.
return users, nil
}
// Populate all loaded users.
for _, user := range users {
if err := u.PopulateUser(ctx, user); err != nil {
log.Errorf(ctx, "error populating user %s: %v", user.ID, err)
}
}
return users, nil
}
func (u *userDB) GetUserByAccountID(ctx context.Context, accountID string) (*gtsmodel.User, error) {
@ -161,7 +184,11 @@ func (u *userDB) PopulateUser(ctx context.Context, user *gtsmodel.User) error {
return errs.Combine()
}
func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
func (u *userDB) GetAllUserIDs(ctx context.Context) ([]string, error) {
if p := u.state.Caches.DB.LocalInstance.UserIDs.Load(); p != nil {
return slices.Clone(*p), nil
}
var userIDs []string
// Scan all user IDs into slice.
@ -172,7 +199,16 @@ func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
return nil, err
}
// Transform user IDs into user slice.
// Store the scanned user IDs in our local cache ptr.
u.state.Caches.DB.LocalInstance.UserIDs.Store(&userIDs)
return userIDs, nil
}
func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
userIDs, err := u.GetAllUserIDs(ctx)
if err != nil {
return nil, err
}
return u.GetUsersByIDs(ctx, userIDs)
}

View file

@ -262,9 +262,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
receivingAccount,
[]string{testList.ID},
)
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
// Admin account posts a new top-level status.
status = suite.newStatus(
@ -310,6 +311,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
receivingAccount,
)
// Check message in public stream.
suite.checkStreamed(
publicStream,
true,
statusJSON,
stream.EventTypeUpdate,
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -379,9 +388,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi
receivingAccount,
[]string{testList.ID},
)
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
// Admin account posts a new top-level status.
status = suite.newStatus(
@ -420,6 +430,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi
suite.FailNow(err.Error())
}
// There should be no message in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// There should be no message in the home stream.
suite.checkStreamed(
homeStream,
@ -530,6 +548,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -571,6 +590,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
receivingAccount,
)
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -732,6 +759,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -778,6 +806,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
receivingAccount,
)
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -811,6 +847,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -863,6 +900,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
receivingAccount,
)
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -896,6 +941,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -942,6 +988,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
receivingAccount,
)
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -972,6 +1026,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -1009,6 +1064,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
receivingAccount,
)
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message in home stream.
suite.checkStreamed(
homeStream,
@ -1039,6 +1102,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@ -1078,6 +1142,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
suite.FailNow(err.Error())
}
// Check message *not* in public stream.
suite.checkStreamed(
publicStream,
false,
"",
"",
)
// Check message NOT in home stream.
suite.checkStreamed(
homeStream,
@ -1763,8 +1835,9 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
receivingAccount,
[]string{testList.ID},
)
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
// postingAccount posts a new public status not mentioning anyone.
status = suite.newStatus(
@ -1802,6 +1875,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
// Check status in public stream.
suite.checkStreamed(
publicStream,
true,
"",
stream.EventTypeUpdate,
)
// Check status in list stream.
suite.checkStreamed(
listStream,
@ -1857,6 +1938,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
testExclusiveList.ID,
},
)
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
inclusiveListStream = streams[stream.TimelineList+":"+testInclusiveList.ID]
exclusiveListStream = streams[stream.TimelineList+":"+testExclusiveList.ID]
@ -1911,6 +1993,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
// Check status in public stream.
suite.checkStreamed(
publicStream,
true,
"",
stream.EventTypeUpdate,
)
// Check status in inclusive list stream.
suite.checkStreamed(
inclusiveListStream,
@ -1957,9 +2047,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
receivingAccount,
[]string{testList.ID},
)
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
notifStream = streams[stream.TimelineNotifications]
// postingAccount posts a new public status not mentioning anyone.
status = suite.newStatus(
@ -2005,6 +2096,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
// Check status in public stream.
suite.checkStreamed(
publicStream,
true,
"",
stream.EventTypeUpdate,
)
// Check status in list stream.
suite.checkStreamed(
listStream,

View file

@ -20,6 +20,7 @@ package workers
import (
"context"
apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model"
"code.superseriousbusiness.org/gotosocial/internal/cache/timeline"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
@ -37,6 +38,7 @@ import (
// the account, notifications for any local accounts that want
// to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
@ -59,6 +61,11 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
})
}
// Stream the status for public timelines for all local users as update msg.
if err := s.timelineStatusForPublic(ctx, status, s.Stream.Update); err != nil {
return err
}
// Timeline the status for each local follower of this account. This will
// also handle notifying any followers with notify set to true on their follow.
homeTimelinedAccountIDs := s.timelineAndNotifyStatusForFollowers(ctx, status, follows)
@ -68,16 +75,18 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error timelining status %s for tag followers: %w", status.ID, err)
}
// Notify each local account that's mentioned by this status.
// Notify each local account mentioned by status.
if err := s.notifyMentions(ctx, status); err != nil {
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
}
// Update any conversations containing this status, and send conversation notifications.
// Update any conversations containing this status, and get notifications for them.
notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
if err != nil {
return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
}
// Stream these conversation notfications.
for _, notification := range notifications {
s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
}
@ -85,6 +94,95 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return nil
}
// timelineStatusForPublic timelines the given status
// to LOCAL and PUBLIC (i.e. federated) timelines.
func (s *Surface) timelineStatusForPublic(
ctx context.Context,
status *gtsmodel.Status,
streamFn func(context.Context, *gtsmodel.Account, *apimodel.Status, string),
) error {
// Nil check function
// outside main loop.
if streamFn == nil {
panic("nil func")
}
if status.Visibility != gtsmodel.VisibilityPublic ||
status.BoostOfID != "" {
// Fast code path, if it's not "public"
// or a boost, don't public timeline it.
return nil
}
// Get a list of all our local users.
users, err := s.State.DB.GetAllUsers(ctx)
if err != nil {
return gtserror.Newf("error getting local users: %v", err)
}
// Iterate our list of users.
isLocal := status.IsLocal()
for _, user := range users {
// Check whether this status should be visible this user on public timelines.
visible, err := s.VisFilter.StatusPublicTimelineable(ctx, user.Account, status)
if err != nil {
log.Errorf(ctx, "error checking status %s visibility: %v", status.URI, err)
continue
}
if !visible {
continue
}
// Check whether this status is muted in any form by this user.
muted, err := s.MuteFilter.StatusMuted(ctx, user.Account, status)
if err != nil {
log.Errorf(ctx, "error checking status %s mutes: %v", status.URI, err)
continue
}
if muted {
continue
}
// Get status-filter results for this status in context by this user.
filtered, hidden, err := s.StatusFilter.StatusFilterResultsInContext(ctx,
user.Account,
status,
gtsmodel.FilterContextPublic,
)
if err != nil {
log.Errorf(ctx, "error getting status %s filter results: %v", status.URI, err)
continue
}
if hidden {
continue
}
// Now all checks / filters are passed, convert status to frontend model.
apiStatus, err := s.Converter.StatusToAPIStatus(ctx, status, user.Account)
if err != nil {
log.Errorf(ctx, "error converting status %s: %v", status.URI, err)
continue
}
// Set API model filter results.
apiStatus.Filtered = filtered
if isLocal {
// This is local status, send it to local timeline stream.
streamFn(ctx, user.Account, apiStatus, stream.TimelineLocal)
}
// For public timeline stream, send all local / remote statuses.
streamFn(ctx, user.Account, apiStatus, stream.TimelinePublic)
}
return nil
}
// timelineAndNotifyStatusForFollowers iterates through the given
// slice of followers of the account that posted the given status,
// adding the status to list timelines + home timelines of each
@ -409,8 +507,9 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers(
status = status.BoostOf
}
var errs gtserror.MultiError
// Insert the status into the home timeline of each tag follower.
errs := gtserror.MultiError{}
for _, tagFollowerAccount := range tagFollowerAccounts {
_ = s.timelineStatus(ctx,
s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID),
@ -537,6 +636,11 @@ func (s *Surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Sta
})
}
// Stream the status update for public timelines for all of our local users.
if err := s.timelineStatusForPublic(ctx, status, s.Stream.StatusUpdate); err != nil {
return err
}
// Push updated status to streams for each local follower of this account.
homeTimelinedAccountIDs := s.timelineStatusUpdateForFollowers(ctx, status, follows)