[bugfix] process account delete synchronously to prevent OOM (#4260)

# Description

- updates account delete processing to handle side-effects synchronously to prevent OOM
- updates account delete processing to check more often if account.IsLocal() for certain deletes / side-effects
- ensures that mutes get removed from database on delete

## Checklist

- [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md).
- [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat.
- [x] I/we have not leveraged AI to create the proposed changes.
- [x] I/we have performed a self-review of added code.
- [x] I/we have written code that is legible and maintainable by others.
- [x] I/we have commented the added code, particularly in hard-to-understand areas.
- [ ] I/we have made any necessary changes to documentation.
- [ ] I/we have added tests that cover new code.
- [ ] I/we have run tests and they pass locally with the changes.
- [x] I/we have run `go fmt ./...` and `golangci-lint run`.

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4260
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2025-06-11 11:38:10 +02:00 committed by tobi
commit e87681d433
11 changed files with 433 additions and 377 deletions

View file

@ -20,6 +20,7 @@ package bundb
import (
"context"
"errors"
"slices"
"time"
"code.superseriousbusiness.org/gotosocial/internal/db"
@ -170,16 +171,24 @@ func (r *relationshipDB) GetAccountFollowRequesting(ctx context.Context, account
return r.GetFollowRequestsByIDs(ctx, followReqIDs)
}
func (r *relationshipDB) GetAccountBlocks(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Block, error) {
blockIDs, err := r.GetAccountBlockIDs(ctx, accountID, page)
func (r *relationshipDB) GetAccountBlocking(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Block, error) {
blockIDs, err := r.GetAccountBlockingIDs(ctx, accountID, page)
if err != nil {
return nil, err
}
return r.GetBlocksByIDs(ctx, blockIDs)
}
func (r *relationshipDB) CountAccountBlocks(ctx context.Context, accountID string) (int, error) {
blockIDs, err := r.GetAccountBlockIDs(ctx, accountID, nil)
func (r *relationshipDB) GetAccountBlockedBy(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Block, error) {
blockIDs, err := r.GetAccountBlockedByIDs(ctx, accountID, page)
if err != nil {
return nil, err
}
return r.GetBlocksByIDs(ctx, blockIDs)
}
func (r *relationshipDB) CountAccountBlocking(ctx context.Context, accountID string) (int, error) {
blockIDs, err := r.GetAccountBlockingIDs(ctx, accountID, nil)
return len(blockIDs), err
}
@ -273,12 +282,12 @@ func (r *relationshipDB) GetAccountFollowRequestingIDs(ctx context.Context, acco
})
}
func (r *relationshipDB) GetAccountBlockIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
func (r *relationshipDB) GetAccountBlockingIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
return loadPagedIDs(&r.state.Caches.DB.BlockIDs, accountID, page, func() ([]string, error) {
var blockIDs []string
// Block IDs not in cache, perform DB query!
q := newSelectBlocks(r.db, accountID)
q := newSelectBlocking(r.db, accountID)
if _, err := q.Exec(ctx, &blockIDs); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, err
@ -288,6 +297,33 @@ func (r *relationshipDB) GetAccountBlockIDs(ctx context.Context, accountID strin
})
}
func (r *relationshipDB) GetAccountBlockedByIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
var blockIDs []string
// NOTE that we are specifically not using
// any caching here, as this is only called
// when deleting an account, i.e. pointless!
// Block IDs not in cache, perform DB query!
q := newSelectBlockedBy(r.db, accountID)
if _, err := q.Exec(ctx, &blockIDs); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, err
}
// Our selected IDs are ALWAYS fetched
// from `loadDESC` in descending order.
// Depending on the paging requested
// this may be an unexpected order.
if page.GetOrder().Ascending() {
slices.Reverse(blockIDs)
}
// Page the resulting block IDs.
blockIDs = page.Page(blockIDs)
return blockIDs, nil
}
// newSelectFollowRequests returns a new select query for all rows in the follow_requests table with target_account_id = accountID.
func newSelectFollowRequests(db *bun.DB, accountID string) *bun.SelectQuery {
return db.NewSelect().
@ -360,11 +396,20 @@ func newSelectLocalFollowers(db *bun.DB, accountID string) *bun.SelectQuery {
OrderExpr("? DESC", bun.Ident("created_at"))
}
// newSelectBlocks returns a new select query for all rows in the blocks table with account_id = accountID.
func newSelectBlocks(db *bun.DB, accountID string) *bun.SelectQuery {
// newSelectBlocking returns a new select query for all rows in the blocks table with account_id = accountID.
func newSelectBlocking(db *bun.DB, accountID string) *bun.SelectQuery {
return db.NewSelect().
TableExpr("?", bun.Ident("blocks")).
ColumnExpr("?", bun.Ident("id")).
Where("? = ?", bun.Ident("account_id"), accountID).
OrderExpr("? DESC", bun.Ident("id"))
}
// newSelectBlocking returns a new select query for all rows in the blocks table with target_account_id = accountID.
func newSelectBlockedBy(db *bun.DB, accountID string) *bun.SelectQuery {
return db.NewSelect().
TableExpr("?", bun.Ident("blocks")).
ColumnExpr("?", bun.Ident("id")).
Where("? = ?", bun.Ident("target_account_id"), accountID).
OrderExpr("? DESC", bun.Ident("id"))
}

View file

@ -176,14 +176,20 @@ type Relationship interface {
// GetAccountFollowRequestingIDs is like GetAccountFollowRequesting, but returns just IDs.
GetAccountFollowRequestingIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error)
// GetAccountBlocks returns all blocks originating from the given account, with given optional paging parameters.
GetAccountBlocks(ctx context.Context, accountID string, paging *paging.Page) ([]*gtsmodel.Block, error)
// GetAccountBlocking returns all blocks originating from the given account, with given optional paging parameters.
GetAccountBlocking(ctx context.Context, accountID string, paging *paging.Page) ([]*gtsmodel.Block, error)
// GetAccountBlockIDs is like GetAccountBlocks, but returns just IDs.
GetAccountBlockIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error)
// GetAccountBlockingIDs is like GetAccountBlocking, but returns just IDs.
GetAccountBlockingIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error)
// CountAccountBlocks counts the number of blocks owned by the given account.
CountAccountBlocks(ctx context.Context, accountID string) (int, error)
// GetAccountBlockedBy returns all blocks targeting the given account, with optional paging parameters.
GetAccountBlockedBy(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Block, error)
// GetAccountBlockedByIDs is like GetAccountBlockedBy, but returns just IDs.
GetAccountBlockedByIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error)
// CountAccountBlocking counts the number of blocks owned by the given account.
CountAccountBlocking(ctx context.Context, accountID string) (int, error)
// GetNote gets a private note from a source account on a target account, if it exists.
GetNote(ctx context.Context, sourceAccountID string, targetAccountID string) (*gtsmodel.AccountNote, error)

View file

@ -20,15 +20,14 @@ package middleware
import (
"fmt"
"net/http"
"runtime"
"strings"
"time"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/util"
"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-kv"
"github.com/gin-gonic/gin"
)
@ -49,23 +48,18 @@ func Logger(logClientIP bool) gin.HandlerFunc {
// Get request context.
ctx := c.Request.Context()
if r := recover(); r != nil {
// Recover from any panics
// and dump stack to stderr.
if r := util.Recover(); r != nil {
if code == 0 {
// No response was written, send a generic Internal Error
c.Writer.WriteHeader(http.StatusInternalServerError)
}
// Append panic information to the request ctx
// Append panic information to the request.
err := fmt.Errorf("recovered panic: %v", r)
_ = c.Error(err)
// Dump a stacktrace to error log
pcs := make([]uintptr, 10)
n := runtime.Callers(3, pcs)
iter := runtime.CallersFrames(pcs[:n])
callers := errors.Callers(gatherFrames(iter, n))
log.WithContext(c.Request.Context()).
WithField("stacktrace", callers).Error(err)
}
// Initialize the logging fields
@ -134,7 +128,7 @@ func Logger(logClientIP bool) gin.HandlerFunc {
}
}
// Generate a nicer looking bytecount
// Generate a nicer looking bytecount.
size := bytesize.Size(c.Writer.Size()) // #nosec G115 -- Just logging
// Write log entry with status text + body size.
@ -152,19 +146,3 @@ func Logger(logClientIP bool) gin.HandlerFunc {
func sensitiveQuery(query string) bool {
return strings.Contains(query, "token")
}
// gatherFrames gathers runtime frames from a frame iterator.
func gatherFrames(iter *runtime.Frames, n int) []runtime.Frame {
if iter == nil {
return nil
}
frames := make([]runtime.Frame, 0, n)
for {
f, ok := iter.Next()
if !ok {
break
}
frames = append(frames, f)
}
return frames
}

View file

@ -137,7 +137,7 @@ func (p *Processor) BlocksGet(
requestingAccount *gtsmodel.Account,
page *paging.Page,
) (*apimodel.PageableResponse, gtserror.WithCode) {
blocks, err := p.state.DB.GetAccountBlocks(ctx,
blocks, err := p.state.DB.GetAccountBlocking(ctx,
requestingAccount.ID,
page,
)

View file

@ -36,309 +36,300 @@ import (
"golang.org/x/crypto/bcrypt"
)
const deleteSelectLimit = 50
const deleteSelectLimit = 100
// Delete deletes an account, and all of that account's statuses, media, follows, notifications, etc etc etc.
// The origin passed here should be either the ID of the account doing the delete (can be itself), or the ID of a domain block.
func (p *Processor) Delete(
ctx context.Context,
account *gtsmodel.Account,
origin string,
) gtserror.WithCode {
l := log.WithContext(ctx).WithFields(kv.Fields{
{"username", account.Username},
{"domain", account.Domain},
//
// This delete function handles the case of both local and remote accounts, and processes side
// effects synchronously to not clog worker queues with potentially tens-of-thousands of requests.
func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origin string) error {
// Prepare a new log entry for account delete.
log := log.WithContext(ctx).WithFields(kv.Fields{
{"uri", account.URI},
{"origin", origin},
}...)
l.Trace("beginning account delete process")
// Delete statuses *before* follows to ensure correct addressing
// of any outgoing fedi messages generated by deleting statuses.
if err := p.deleteAccountStatuses(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
}
var err error
if err := p.deleteAccountFollows(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
}
// Log operation start / stop.
log.Info("start account delete")
defer func() {
if err != nil {
log.Errorf("fatal error during account delete: %v", err)
} else {
log.Info("finished account delete")
}
}()
if err := p.deleteAccountBlocks(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
}
// Delete statuses *before* anything else as for local
// accounts we need to federate out deletes, which relies
// on follows for addressing the appropriate accounts.
p.deleteAccountStatuses(ctx, &log, account)
if err := p.deleteAccountNotifications(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
}
// Now delete relationships to / from account.
p.deleteAccountRelations(ctx, &log, account)
if err := p.deleteAccountPeripheral(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
}
// Now delete any notifications to / from account.
p.deleteAccountNotifications(ctx, &log, account)
// Delete other peripheral objects ownable /
// manageable by any local / remote account.
p.deleteAccountPeripheral(ctx, &log, account)
if account.IsLocal() {
// We delete tokens, applications and clients for
// account as one of the last stages during deletion,
// as other database models rely on these.
if err := p.deleteUserAndTokensForAccount(ctx, account); err != nil {
l.Errorf("continuing after error during account delete: %v", err)
if err = p.deleteUserAndTokensForAccount(ctx, &log, account); err != nil {
return err
}
}
// To prevent the account being created again,
// stubbify it and update it in the db.
// The account will not be deleted, but it
// will become completely unusable.
// (which would cause horrible federation shenanigans),
// the account will be stubbed out to an unusable state
// with no identifying info remaining, but NOT deleted.
columns := stubbifyAccount(account, origin)
if err := p.state.DB.UpdateAccount(ctx, account, columns...); err != nil {
return gtserror.NewErrorInternalError(err)
if err = p.state.DB.UpdateAccount(ctx, account, columns...); err != nil {
return gtserror.Newf("error stubbing out account: %v", err)
}
l.Info("account delete process complete")
return nil
}
// deleteUserAndTokensForAccount deletes the gtsmodel.User and
// any OAuth tokens, applications, and Web Push subscriptions for the given account.
//
// Callers to this function should already have checked that
// this is a local account, or else it won't have a user associated
// with it, and this will fail.
func (p *Processor) deleteUserAndTokensForAccount(ctx context.Context, account *gtsmodel.Account) error {
func (p *Processor) deleteUserAndTokensForAccount(
ctx context.Context,
log *log.Entry,
account *gtsmodel.Account,
) error {
// Fetch the associated user for account, on fail return
// early as all other parts of this func rely on this user.
user, err := p.state.DB.GetUserByAccountID(ctx, account.ID)
if err != nil {
return gtserror.Newf("db error getting user: %w", err)
return gtserror.Newf("error getting account user: %v", err)
}
// Get all applications owned by user.
apps, err := p.state.DB.GetApplicationsManagedByUserID(ctx, user.ID, nil)
// Get list of applications managed by deleting user.
apps, err := p.state.DB.GetApplicationsManagedByUserID(ctx,
user.ID,
nil, // i.e. all
)
if err != nil {
return gtserror.Newf("db error getting apps: %w", err)
log.Errorf("error getting user applications: %v", err)
}
// Delete each app and any tokens it had created
// (not necessarily owned by deleted account).
for _, a := range apps {
if err := p.state.DB.DeleteApplicationByID(ctx, a.ID); err != nil {
return gtserror.Newf("db error deleting app: %w", err)
for _, app := range apps {
if err := p.state.DB.DeleteTokensByClientID(ctx, app.ClientID); err != nil {
log.Errorf("error deleting application token: %v", err)
}
if err := p.state.DB.DeleteTokensByClientID(ctx, a.ClientID); err != nil {
return gtserror.Newf("db error deleting tokens for app: %w", err)
if err := p.state.DB.DeleteApplicationByID(ctx, app.ID); err != nil {
log.Errorf("error deleting user application: %v", err)
}
}
// Get any remaining access tokens owned by user.
tokens, err := p.state.DB.GetAccessTokens(ctx, user.ID, nil)
tokens, err := p.state.DB.GetAccessTokens(ctx,
user.ID,
nil, // i.e. all
)
if err != nil {
return gtserror.Newf("db error getting tokens: %w", err)
log.Errorf("error getting user access tokens: %v", err)
}
// Delete each token.
for _, t := range tokens {
if err := p.state.DB.DeleteTokenByID(ctx, t.ID); err != nil {
return gtserror.Newf("db error deleting token: %w", err)
// Delete user access tokens.
for _, token := range tokens {
if err := p.state.DB.DeleteTokenByID(ctx, token.ID); err != nil {
log.Errorf("error deleting user access token: %v", err)
}
}
// Delete any web push subscriptions created by this local user account.
if err := p.state.DB.DeleteWebPushSubscriptionsByAccountID(ctx, account.ID); err != nil {
return gtserror.Newf("db error deleting Web Push subscriptions: %w", err)
}
columns, err := stubbifyUser(user)
if err != nil {
return gtserror.Newf("error stubbifying user: %w", err)
log.Errorf("error deleting account web push subscriptions: %v", err)
}
// To prevent the user being created again,
// the user will be stubbed out to an unusable state
// with no identifying info remaining, but NOT deleted.
columns := stubbifyUser(user)
if err := p.state.DB.UpdateUser(ctx, user, columns...); err != nil {
return gtserror.Newf("db error updating user: %w", err)
return gtserror.Newf("error stubbing out user: %w", err)
}
return nil
}
// deleteAccountFollows deletes:
// - Follows targeting account.
// - Follow requests targeting account.
// - Follows created by account.
// - Follow requests created by account.
func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel.Account) error {
// Delete follows targeting this account.
followedBy, err := p.state.DB.GetAccountFollowers(ctx, account.ID, nil)
func (p *Processor) deleteAccountRelations(
ctx context.Context,
log *log.Entry,
account *gtsmodel.Account,
) {
// Get a list of the follows targeting this account.
followedBy, err := p.state.DB.GetAccountFollowers(ctx,
account.ID,
nil, // i.e. all
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error getting follows targeting account %s: %w", account.ID, err)
log.Errorf("error getting account followed-bys: %v", err)
}
// Delete these follows from database.
for _, follow := range followedBy {
if err := p.state.DB.DeleteFollowByID(ctx, follow.ID); err != nil {
return gtserror.Newf("db error unfollowing account followedBy: %w", err)
log.Errorf("error deleting account followed-by %s: %v", follow.URI, err)
}
}
// Delete follow requests targeting this account.
followRequestedBy, err := p.state.DB.GetAccountFollowRequests(ctx, account.ID, nil)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error getting follow requests targeting account %s: %w", account.ID, err)
}
for _, followRequest := range followRequestedBy {
if err := p.state.DB.DeleteFollowRequestByID(ctx, followRequest.ID); err != nil {
return gtserror.Newf("db error unfollowing account followRequestedBy: %w", err)
}
}
var (
// Use this slice to batch unfollow messages.
msgs = []*messages.FromClientAPI{}
// To avoid checking if account is local over + over
// inside the subsequent loops, just generate static
// side effects function once now.
unfollowSideEffects = p.unfollowSideEffectsFunc(account.IsLocal())
// Get a list of the follow requests targeting this account.
followRequestedBy, err := p.state.DB.GetAccountFollowRequests(ctx,
account.ID,
nil, // i.e. all
)
// Delete follows originating from this account.
following, err := p.state.DB.GetAccountFollows(ctx, account.ID, nil)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error getting follows owned by account %s: %w", account.ID, err)
log.Errorf("error getting account follow-requested-bys: %v", err)
}
// For each follow owned by this account, unfollow
// and process side effects (noop if remote account).
// Delete these follow requests from database.
for _, followReq := range followRequestedBy {
if err := p.state.DB.DeleteFollowRequestByID(ctx, followReq.ID); err != nil {
log.Errorf("error deleting account follow-requested-by %s: %v", followReq.URI, err)
}
}
// Get a list of the blocks targeting this account.
blockedBy, err := p.state.DB.GetAccountBlockedBy(ctx,
account.ID,
nil, // i.e. all
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error getting account blocked-bys: %v", err)
}
// Delete these blocks from database.
for _, block := range blockedBy {
if err := p.state.DB.DeleteBlockByID(ctx, block.ID); err != nil {
log.Errorf("error deleting account blocked-by %s: %v", block.URI, err)
}
}
// Get the follows originating from this account.
following, err := p.state.DB.GetAccountFollows(ctx,
account.ID,
nil, // i.e. all
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error getting account follows: %v", err)
}
// Delete these follows from database.
for _, follow := range following {
if err := p.state.DB.DeleteFollowByID(ctx, follow.ID); err != nil {
return gtserror.Newf("db error unfollowing account: %w", err)
}
if msg := unfollowSideEffects(ctx, account, follow); msg != nil {
// There was a side effect to process.
msgs = append(msgs, msg)
log.Errorf("error deleting account followed %s: %v", follow.URI, err)
}
}
// Delete follow requests originating from this account.
followRequesting, err := p.state.DB.GetAccountFollowRequesting(ctx, account.ID, nil)
// Get a list of the follow requests originating from this account.
followRequesting, err := p.state.DB.GetAccountFollowRequesting(ctx,
account.ID,
nil, // i.e. all
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error getting follow requests owned by account %s: %w", account.ID, err)
log.Errorf("error getting account follow-requests: %v", err)
}
// For each follow owned by this account, unfollow
// and process side effects (noop if remote account).
for _, followRequest := range followRequesting {
if err := p.state.DB.DeleteFollowRequestByID(ctx, followRequest.ID); err != nil {
return gtserror.Newf("db error unfollowingRequesting account: %w", err)
}
// Dummy out a follow so our side effects func
// has something to work with. This follow will
// never enter the db, it's just for convenience.
follow := &gtsmodel.Follow{
URI: followRequest.URI,
AccountID: followRequest.AccountID,
Account: followRequest.Account,
TargetAccountID: followRequest.TargetAccountID,
TargetAccount: followRequest.TargetAccount,
}
if msg := unfollowSideEffects(ctx, account, follow); msg != nil {
// There was a side effect to process.
msgs = append(msgs, msg)
// Delete these follow requests from database.
for _, followReq := range followRequesting {
if err := p.state.DB.DeleteFollowRequestByID(ctx, followReq.ID); err != nil {
log.Errorf("error deleting account follow-request %s: %v", followReq.URI, err)
}
}
// Process accreted messages in serial.
for _, msg := range msgs {
if err := p.state.Workers.Client.Process(ctx, msg); err != nil {
log.Errorf(
ctx,
"error processing %s of %s during Delete of account %s: %v",
msg.APActivityType, msg.APObjectType, account.ID, err,
// Get the blocks originating from this account.
blocking, err := p.state.DB.GetAccountBlocking(ctx,
account.ID,
nil, // i.e. all
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error getting account blocks: %v", err)
}
// Delete these blocks from database.
for _, block := range blocking {
if err := p.state.DB.DeleteBlockByID(ctx, block.ID); err != nil {
log.Errorf("error deleting account block %s: %v", block.URI, err)
}
}
// Delete all mutes targetting / originating from account.
if err := p.state.DB.DeleteAccountMutes(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting mutes to / from account: %v", err)
}
if account.IsLocal() {
// Process side-effects for deleting
// of account follows from local user.
for _, follow := range following {
p.processSideEffect(ctx, log,
ap.ActivityUndo,
ap.ActivityFollow,
follow,
account,
follow.TargetAccount,
)
}
// Process side-effects for deleting of account follow requests
// from local user. Though handled as though UNDO of a follow.
for _, followReq := range followRequesting {
p.processSideEffect(ctx, log,
ap.ActivityUndo,
ap.ActivityFollow,
&gtsmodel.Follow{
ID: followReq.ID,
URI: followReq.URI,
AccountID: followReq.AccountID,
Account: followReq.Account,
TargetAccountID: followReq.TargetAccountID,
TargetAccount: followReq.TargetAccount,
ShowReblogs: new(bool),
Notify: new(bool),
},
account,
followReq.TargetAccount,
)
}
// Process side-effects for deleting
// of account blocks from local user.
for _, block := range blocking {
p.processSideEffect(ctx, log,
ap.ActivityUndo,
ap.ActivityBlock,
block,
account,
block.TargetAccount,
)
}
}
return nil
}
func (p *Processor) unfollowSideEffectsFunc(local bool) func(
ctx context.Context,
account *gtsmodel.Account,
follow *gtsmodel.Follow,
) *messages.FromClientAPI {
if !local {
// Don't try to process side effects
// for accounts that aren't local.
return func(
_ context.Context,
_ *gtsmodel.Account,
_ *gtsmodel.Follow,
) *messages.FromClientAPI {
// noop
return nil
}
}
return func(
ctx context.Context,
account *gtsmodel.Account,
follow *gtsmodel.Follow,
) *messages.FromClientAPI {
if follow.TargetAccount == nil {
// TargetAccount seems to have gone;
// race condition? db corruption?
log.
WithContext(ctx).
WithField("follow", follow).
Warn("follow had no TargetAccount, likely race condition")
return nil
}
if follow.TargetAccount.IsLocal() {
// No side effects
// for local unfollows.
return nil
}
// There was a follow, process side effects.
return &messages.FromClientAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityUndo,
GTSModel: follow,
Origin: account,
Target: follow.TargetAccount,
}
}
}
func (p *Processor) deleteAccountBlocks(ctx context.Context, account *gtsmodel.Account) error {
if err := p.state.DB.DeleteAccountBlocks(ctx, account.ID); err != nil {
return gtserror.Newf("db error deleting account blocks for %s: %w", account.ID, err)
}
return nil
}
// deleteAccountStatuses iterates through all statuses owned by
// the given account, passing each discovered status (and boosts
// thereof) to the processor workers for further processing.
func (p *Processor) deleteAccountStatuses(
ctx context.Context,
log *log.Entry,
account *gtsmodel.Account,
) error {
// We'll select statuses 50 at a time so we don't wreck the db,
// and pass them through to the client api worker to handle.
//
// Deleting the statuses in this way also handles deleting the
// account's media attachments, mentions, and polls, since these
// are all attached to statuses.
) {
var maxID string
var (
statuses []*gtsmodel.Status
err error
maxID string
msgs = []*messages.FromClientAPI{}
)
statusLoop:
for {
// Page through account's statuses.
statuses, err = p.state.DB.GetAccountStatuses(
ctx,
// Page through deleting account's statuses.
statuses, err := p.state.DB.GetAccountStatuses(
gtscontext.SetBarebones(ctx),
account.ID,
deleteSelectLimit,
false,
@ -349,12 +340,14 @@ statusLoop:
false,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
// Make sure we don't have a real error.
return err
log.Errorf("error getting account statuses: %v", err)
return
}
if len(statuses) == 0 {
break statusLoop
// reached
// the end.
break
}
// Update next maxID from last status.
@ -365,140 +358,162 @@ statusLoop:
status.Account = account
// Look for any boosts of this status in DB.
//
// We put these in the msgs slice first so
// that they're handled first, before the
// parent status that's being boosted.
//
// Use a barebones context and just select the
// origin account separately. The rest will be
// populated later anyway, and we don't want to
// stop now because we couldn't get something.
boosts, err := p.state.DB.GetStatusBoosts(
gtscontext.SetBarebones(ctx),
status.ID,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error fetching status boosts for %s: %w", status.ID, err)
log.Errorf("error getting status boosts for %s: %v", status.URI, err)
continue
}
// Prepare to Undo each boost.
for _, boost := range boosts {
// Fetch the owning account of this boost.
boost.Account, err = p.state.DB.GetAccountByID(
gtscontext.SetBarebones(ctx),
boost.AccountID,
)
if err != nil {
log.Warnf(
ctx,
"db error getting owner %s of status boost %s: %v",
boost.AccountID, boost.ID, err,
)
log.Errorf("error getting owner %s of status boost %s: %v",
boost.AccountURI, boost.URI, err)
continue
}
msgs = append(msgs, &messages.FromClientAPI{
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityUndo,
GTSModel: status,
Origin: boost.Account,
Target: account,
})
// Process boost undo event.
p.processSideEffect(ctx, log,
ap.ActivityUndo,
ap.ActivityAnnounce,
boost,
account,
account,
)
}
// Now prepare to Delete status.
msgs = append(msgs, &messages.FromClientAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityDelete,
GTSModel: status,
Origin: account,
Target: account,
})
}
}
// Process accreted messages in serial.
for _, msg := range msgs {
if err := p.state.Workers.Client.Process(ctx, msg); err != nil {
log.Errorf(
ctx,
"error processing %s of %s during Delete of account %s: %v",
msg.APActivityType, msg.APObjectType, account.ID, err,
// Process status delete event.
p.processSideEffect(ctx, log,
ap.ActivityDelete,
ap.ObjectNote,
status,
account,
account,
)
}
}
return nil
}
func (p *Processor) deleteAccountNotifications(ctx context.Context, account *gtsmodel.Account) error {
// Delete all notifications of all types targeting given account.
if err := p.state.DB.DeleteNotifications(ctx, nil, account.ID, ""); err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting notifications targeting account: %w", err)
func (p *Processor) deleteAccountNotifications(
ctx context.Context,
log *log.Entry,
account *gtsmodel.Account,
) {
if account.IsLocal() {
// Delete all types of notifications targeting this local account.
if err := p.state.DB.DeleteNotifications(ctx, nil, account.ID, ""); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting notifications targeting account: %v", err)
}
}
// Delete all notifications of all types originating from given account.
if err := p.state.DB.DeleteNotifications(ctx, nil, "", account.ID); err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting notifications by account: %w", err)
}
return nil
}
func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmodel.Account) error {
// Delete all bookmarks owned by given account.
if err := p.state.DB.DeleteStatusBookmarks(ctx, account.ID, ""); // nocollapse
// Delete all types of notifications originating from this account.
if err := p.state.DB.DeleteNotifications(ctx, nil, "", account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting bookmarks by account: %w", err)
log.Errorf("error deleting notifications originating from account: %v", err)
}
}
func (p *Processor) deleteAccountPeripheral(
ctx context.Context,
log *log.Entry,
account *gtsmodel.Account,
) {
if account.IsLocal() {
// Delete all bookmarks owned by given account, only for local.
if err := p.state.DB.DeleteStatusBookmarks(ctx, account.ID, ""); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting bookmarks by account: %v", err)
}
// Delete all faves owned by given account, only for local.
if err := p.state.DB.DeleteStatusFaves(ctx, account.ID, ""); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting faves by account: %v", err)
}
// Delete all conversations owned by given account, only for local.
//
// *Participated* conversations will be retained, leaving up to *their* owners.
if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting conversations by account: %v", err)
}
// Delete all followed tags owned by given account, only for local.
if err := p.state.DB.DeleteFollowedTagsByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf("error deleting followed tags by account: %v", err)
}
// Delete stats model stored for given account, only for local.
if err := p.state.DB.DeleteAccountStats(ctx, account.ID); err != nil {
log.Errorf("error deleting stats for account: %v", err)
}
}
// Delete all bookmarks targeting given account.
// Delete all bookmarks targeting given account, local and remote.
if err := p.state.DB.DeleteStatusBookmarks(ctx, "", account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting bookmarks targeting account: %w", err)
log.Errorf("error deleting bookmarks targeting account: %v", err)
}
// Delete all faves owned by given account.
if err := p.state.DB.DeleteStatusFaves(ctx, account.ID, ""); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting faves by account: %w", err)
}
// Delete all faves targeting given account.
// Delete all faves targeting given account, local and remote.
if err := p.state.DB.DeleteStatusFaves(ctx, "", account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting faves targeting account: %w", err)
log.Errorf("error deleting faves targeting account: %v", err)
}
// TODO: add status mutes here when they're implemented.
// Delete all conversations owned by given account.
// Conversations in which it has only participated will be retained;
// they can always be deleted by their owners.
if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting conversations owned by account: %w", err)
}
// Delete all poll votes owned by given account.
// Delete all poll votes owned by given account, local and remote.
if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting poll votes by account: %w", err)
log.Errorf("error deleting poll votes by account: %v", err)
}
}
// Delete all followed tags owned by given account.
if err := p.state.DB.DeleteFollowedTagsByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting followed tags by account: %w", err)
// processSideEffect will process the given side effect details,
// with appropriate worker depending on if origin is local / remote.
func (p *Processor) processSideEffect(
ctx context.Context,
log *log.Entry,
activityType string,
objectType string,
gtsModel any,
origin *gtsmodel.Account,
target *gtsmodel.Account,
) {
if origin.IsLocal() {
// Process side-effect through our client API as this is a local account.
if err := p.state.Workers.Client.Process(ctx, &messages.FromClientAPI{
APActivityType: activityType,
APObjectType: objectType,
GTSModel: gtsModel,
Origin: origin,
Target: target,
}); err != nil {
log.Errorf("error processing %s of %s during local account %s delete: %v", activityType, objectType, origin.ID, err)
}
} else {
// Process side-effect through our fedi API as this is a remote account.
if err := p.state.Workers.Federator.Process(ctx, &messages.FromFediAPI{
APActivityType: activityType,
APObjectType: objectType,
GTSModel: gtsModel,
Requesting: origin,
Receiving: target,
}); err != nil {
log.Errorf("error processing %s of %s during local account %s delete: %v", activityType, objectType, origin.ID, err)
}
}
// Delete account stats model.
if err := p.state.DB.DeleteAccountStats(ctx, account.ID); err != nil {
return gtserror.Newf("error deleting stats for account: %w", err)
}
return nil
}
// stubbifyAccount renders the given account as a stub,
@ -567,15 +582,21 @@ func stubbifyAccount(account *gtsmodel.Account, origin string) []string {
//
// For caller's convenience, this function returns the db
// names of all columns that are updated by it.
func stubbifyUser(user *gtsmodel.User) ([]string, error) {
func stubbifyUser(user *gtsmodel.User) []string {
uuid, err := uuid.New().MarshalBinary()
if err != nil {
return nil, err
// this should never happen,
// it indicates /dev/random
// is misbehaving.
panic(err)
}
dummyPassword, err := bcrypt.GenerateFromPassword(uuid, bcrypt.DefaultCost)
if err != nil {
return nil, err
// this should never happen,
// it indicates /dev/random
// is misbehaving.
panic(err)
}
never := time.Time{}
@ -600,5 +621,5 @@ func stubbifyUser(user *gtsmodel.User) ([]string, error) {
"confirmation_sent_at",
"reset_password_token",
"reset_password_sent_at",
}, nil
}
}

View file

@ -120,7 +120,7 @@ func (p *Processor) ExportBlocks(
ctx context.Context,
requester *gtsmodel.Account,
) ([][]string, gtserror.WithCode) {
blocks, err := p.state.DB.GetAccountBlocks(ctx, requester.ID, nil)
blocks, err := p.state.DB.GetAccountBlocking(ctx, requester.ID, nil)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
err = gtserror.Newf("db error getting blocks: %w", err)
return nil, gtserror.NewErrorInternalError(err)

View file

@ -298,7 +298,7 @@ func importBlocksAsyncF(
err error
)
prevBlocks, err = p.state.DB.GetAccountBlocks(ctx, requester.ID, nil)
prevBlocks, err = p.state.DB.GetAccountBlocking(ctx, requester.ID, nil)
if err != nil {
log.Errorf(ctx, "db error getting blocks: %v", err)
return

View file

@ -1093,11 +1093,13 @@ func (p *clientAPI) DeleteAccountOrUser(ctx context.Context, cMsg *messages.From
p.state.Caches.Timelines.List.Delete(listID)
}
// Federate out a delete activity targeting account to remote servers.
if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil {
log.Errorf(ctx, "error federating account delete: %v", err)
}
if err := p.account.Delete(ctx, cMsg.Target, originID); err != nil {
// And finally, perform the actual account deletion synchronously.
if err := p.account.Delete(ctx, account, originID); err != nil {
log.Errorf(ctx, "error deleting account: %v", err)
}

View file

@ -1120,7 +1120,7 @@ func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg *messages.FromFediAPI)
// Remove any entries authored by account from timelines.
p.surface.removeTimelineEntriesByAccount(account.ID)
// First perform the actual account deletion.
// And finally, perform the actual account deletion synchronously.
if err := p.account.Delete(ctx, account, account.ID); err != nil {
log.Errorf(ctx, "error deleting account: %v", err)
}

View file

@ -50,7 +50,7 @@ func (c *Converter) AccountToExportStats(
)
}
blockingCount, err := c.state.DB.CountAccountBlocks(ctx, a.ID)
blockingCount, err := c.state.DB.CountAccountBlocking(ctx, a.ID)
if err != nil {
return nil, gtserror.Newf(
"error counting lists for account %s: %w",

View file

@ -32,18 +32,7 @@ func Must(fn func()) {
panic("nil func")
}
for !func() (done bool) {
defer func() {
if r := recover(); r != nil {
// Gather calling func frames.
pcs := make([]uintptr, 10)
n := runtime.Callers(3, pcs)
i := runtime.CallersFrames(pcs[:n])
c := gatherFrames(i, n)
const msg = "recovered panic: %v\n\n%s\n"
fmt.Fprintf(os.Stderr, msg, r, c.String())
}
}()
defer Recover()
fn()
done = true
return
@ -51,6 +40,21 @@ func Must(fn func()) {
}
}
// Recover wraps runtime.recover() to dump the current
// stack to stderr on panic and return the panic value.
func Recover() any {
if r := recover(); r != nil {
// Gather calling func frames.
pcs := make([]uintptr, 10)
n := runtime.Callers(3, pcs)
i := runtime.CallersFrames(pcs[:n])
c := gatherFrames(i, n)
fmt.Fprintf(os.Stderr, "recovered panic: %v\n\n%s\n", r, c.String())
return r
}
return nil
}
// gatherFrames collates runtime frames from a frame iterator.
func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
if iter == nil {