mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-04 05:02:25 -06:00 
			
		
		
		
	* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering or server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
		
	
			
		
			
				
	
	
		
			478 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			478 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// GoToSocial
 | 
						|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
 | 
						|
// SPDX-License-Identifier: AGPL-3.0-or-later
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package workers
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/db"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/id"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/log"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/messages"
 | 
						|
)
 | 
						|
 | 
						|
// ShouldProcessMove checks whether we should attempt
 | 
						|
// to process a move with the given object and target,
 | 
						|
// based on whether or not a move with those values
 | 
						|
// was attempted or succeeded recently.
 | 
						|
func (p *fediAPI) ShouldProcessMove(
 | 
						|
	ctx context.Context,
 | 
						|
	object string,
 | 
						|
	target string,
 | 
						|
) (bool, error) {
 | 
						|
	// If a Move has been *attempted* within last 5m,
 | 
						|
	// that involved the origin and target in any way,
 | 
						|
	// then we shouldn't try to reprocess immediately.
 | 
						|
	//
 | 
						|
	// This avoids the potential DDOS vector of a given
 | 
						|
	// origin account spamming out moves to various
 | 
						|
	// target accounts, causing loads of dereferences.
 | 
						|
	latestMoveAttempt, err := p.state.DB.GetLatestMoveAttemptInvolvingURIs(
 | 
						|
		ctx, object, target,
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return false, gtserror.Newf(
 | 
						|
			"error checking latest Move attempt involving object %s and target %s: %w",
 | 
						|
			object, target, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if !latestMoveAttempt.IsZero() &&
 | 
						|
		time.Since(latestMoveAttempt) < 5*time.Minute {
 | 
						|
		log.Infof(ctx,
 | 
						|
			"object %s or target %s have been involved in a Move attempt within the last 5 minutes, will not process Move",
 | 
						|
			object, target,
 | 
						|
		)
 | 
						|
		return false, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// If a Move has *succeeded* within the last week
 | 
						|
	// that involved the origin and target in any way,
 | 
						|
	// then we shouldn't process again for a while.
 | 
						|
	latestMoveSuccess, err := p.state.DB.GetLatestMoveSuccessInvolvingURIs(
 | 
						|
		ctx, object, target,
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return false, gtserror.Newf(
 | 
						|
			"error checking latest Move success involving object %s and target %s: %w",
 | 
						|
			object, target, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if !latestMoveSuccess.IsZero() &&
 | 
						|
		time.Since(latestMoveSuccess) < 168*time.Hour {
 | 
						|
		log.Infof(ctx,
 | 
						|
			"object %s or target %s have been involved in a successful Move within the last 7 days, will not process Move",
 | 
						|
			object, target,
 | 
						|
		)
 | 
						|
		return false, nil
 | 
						|
	}
 | 
						|
 | 
						|
	return true, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetOrCreateMove takes a stub move created by the
 | 
						|
// requesting account, and either retrieves or creates
 | 
						|
// a corresponding move in the database. If a move is
 | 
						|
// created in this way, requestingAcct will be updated
 | 
						|
// with the correct moveID.
 | 
						|
func (p *fediAPI) GetOrCreateMove(
 | 
						|
	ctx context.Context,
 | 
						|
	requestingAcct *gtsmodel.Account,
 | 
						|
	stubMove *gtsmodel.Move,
 | 
						|
) (*gtsmodel.Move, error) {
 | 
						|
	var (
 | 
						|
		moveURIStr = stubMove.URI
 | 
						|
		objectStr  = stubMove.OriginURI
 | 
						|
		object     = stubMove.Origin
 | 
						|
		targetStr  = stubMove.TargetURI
 | 
						|
		target     = stubMove.Target
 | 
						|
 | 
						|
		move *gtsmodel.Move
 | 
						|
		err  error
 | 
						|
	)
 | 
						|
 | 
						|
	// See if we have a move with
 | 
						|
	// this ID/URI stored already.
 | 
						|
	move, err = p.state.DB.GetMoveByURI(ctx, moveURIStr)
 | 
						|
	if err != nil && !errors.Is(err, db.ErrNoEntries) {
 | 
						|
		return nil, gtserror.Newf(
 | 
						|
			"db error retrieving move with URI %s: %w",
 | 
						|
			moveURIStr, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if move != nil {
 | 
						|
		// We had a Move with this ID/URI.
 | 
						|
		//
 | 
						|
		// Make sure the Move we already had
 | 
						|
		// stored has the same origin + target.
 | 
						|
		if move.OriginURI != objectStr ||
 | 
						|
			move.TargetURI != targetStr {
 | 
						|
			return nil, gtserror.Newf(
 | 
						|
				"Move object %s and/or target %s differ from stored object and target for this ID (%s)",
 | 
						|
				objectStr, targetStr, moveURIStr,
 | 
						|
			)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// If we didn't have a move stored for
 | 
						|
	// this ID/URI, then see if we have a
 | 
						|
	// Move with this origin and target
 | 
						|
	// already (but a different ID/URI).
 | 
						|
	if move == nil {
 | 
						|
		move, err = p.state.DB.GetMoveByOriginTarget(ctx, objectStr, targetStr)
 | 
						|
		if err != nil && !errors.Is(err, db.ErrNoEntries) {
 | 
						|
			return nil, gtserror.Newf(
 | 
						|
				"db error retrieving Move with object %s and target %s: %w",
 | 
						|
				objectStr, targetStr, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		if move != nil {
 | 
						|
			// We had a move for this object and
 | 
						|
			// target, but the ID/URI has changed.
 | 
						|
			// Update the Move's URI in the db to
 | 
						|
			// reflect that this is but the latest
 | 
						|
			// attempt with this origin + target.
 | 
						|
			//
 | 
						|
			// The remote may be trying to retry
 | 
						|
			// the Move but their server might
 | 
						|
			// not reuse the same Activity URIs,
 | 
						|
			// and we don't want to store a brand
 | 
						|
			// new Move for each attempt!
 | 
						|
			move.URI = moveURIStr
 | 
						|
			if err := p.state.DB.UpdateMove(ctx, move, "uri"); err != nil {
 | 
						|
				return nil, gtserror.Newf(
 | 
						|
					"db error updating Move with object %s and target %s: %w",
 | 
						|
					objectStr, targetStr, err,
 | 
						|
				)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if move == nil {
 | 
						|
		// If Move is still nil then
 | 
						|
		// we didn't have this Move
 | 
						|
		// stored yet, so it's new.
 | 
						|
		// Store it now!
 | 
						|
		move = >smodel.Move{
 | 
						|
			ID:          id.NewULID(),
 | 
						|
			AttemptedAt: time.Now(),
 | 
						|
			OriginURI:   objectStr,
 | 
						|
			Origin:      object,
 | 
						|
			TargetURI:   targetStr,
 | 
						|
			Target:      target,
 | 
						|
			URI:         moveURIStr,
 | 
						|
		}
 | 
						|
		if err := p.state.DB.PutMove(ctx, move); err != nil {
 | 
						|
			return nil, gtserror.Newf(
 | 
						|
				"db error storing move %s: %w",
 | 
						|
				moveURIStr, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// If move_id isn't set on the requesting
 | 
						|
	// account yet, set it so other processes
 | 
						|
	// know there's a Move in progress.
 | 
						|
	if requestingAcct.MoveID != move.ID {
 | 
						|
		requestingAcct.Move = move
 | 
						|
		requestingAcct.MoveID = move.ID
 | 
						|
		if err := p.state.DB.UpdateAccount(ctx,
 | 
						|
			requestingAcct, "move_id",
 | 
						|
		); err != nil {
 | 
						|
			return nil, gtserror.Newf(
 | 
						|
				"db error updating move_id on account: %w",
 | 
						|
				err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return move, nil
 | 
						|
}
 | 
						|
 | 
						|
// MoveAccount processes the given
 | 
						|
// Move FromFediAPI message:
 | 
						|
//
 | 
						|
//	APObjectType:     "Profile"
 | 
						|
//	APActivityType:   "Move"
 | 
						|
//	GTSModel:         stub *gtsmodel.Move.
 | 
						|
//	ReceivingAccount: Account of inbox owner receiving the Move.
 | 
						|
func (p *fediAPI) MoveAccount(ctx context.Context, fMsg *messages.FromFediAPI) error {
 | 
						|
	// *gtsmodel.Move activity.
 | 
						|
	stubMove, ok := fMsg.GTSModel.(*gtsmodel.Move)
 | 
						|
	if !ok {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"%T not parseable as *gtsmodel.Move",
 | 
						|
			fMsg.GTSModel,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	// Move origin and target info.
 | 
						|
	var (
 | 
						|
		originAcctURIStr = stubMove.OriginURI
 | 
						|
		originAcct       = fMsg.Requesting
 | 
						|
		targetAcctURIStr = stubMove.TargetURI
 | 
						|
		targetAcctURI    = stubMove.Target
 | 
						|
	)
 | 
						|
 | 
						|
	// Assemble log context.
 | 
						|
	l := log.
 | 
						|
		WithContext(ctx).
 | 
						|
		WithField("originAcct", originAcctURIStr).
 | 
						|
		WithField("targetAcct", targetAcctURIStr)
 | 
						|
 | 
						|
	// We can't/won't validate Move activities
 | 
						|
	// to domains we have blocked, so check this.
 | 
						|
	targetDomainBlocked, err := p.state.DB.IsDomainBlocked(ctx, targetAcctURI.Host)
 | 
						|
	if err != nil {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"db error checking if target domain %s blocked: %w",
 | 
						|
			targetAcctURI.Host, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if targetDomainBlocked {
 | 
						|
		l.Info("target domain is blocked, will not process Move")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// Next steps require making calls to remote +
 | 
						|
	// setting values that may be attempted by other
 | 
						|
	// in-process Moves. To avoid race conditions,
 | 
						|
	// ensure we're only trying to process this
 | 
						|
	// Move combo one attempt at a time.
 | 
						|
	//
 | 
						|
	// We use a custom lock because remotes might
 | 
						|
	// try to send the same Move several times with
 | 
						|
	// different IDs (you never know), but we only
 | 
						|
	// want to process them based on origin + target.
 | 
						|
	unlock := p.state.FedLocks.Lock(
 | 
						|
		"move:" + originAcctURIStr + ":" + targetAcctURIStr,
 | 
						|
	)
 | 
						|
	defer unlock()
 | 
						|
 | 
						|
	// Check if Move is rate limited based
 | 
						|
	// on previous attempts / successes.
 | 
						|
	shouldProcess, err := p.ShouldProcessMove(ctx,
 | 
						|
		originAcctURIStr, targetAcctURIStr,
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"error checking if Move should be processed now: %w",
 | 
						|
			err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if !shouldProcess {
 | 
						|
		// Move is rate limited, so don't process.
 | 
						|
		// Reason why should already be logged.
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// Store new or retrieve existing Move. This will
 | 
						|
	// also update moveID on originAcct if necessary.
 | 
						|
	move, err := p.GetOrCreateMove(ctx, originAcct, stubMove)
 | 
						|
	if err != nil {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"error refreshing target account %s: %w",
 | 
						|
			targetAcctURIStr, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	// Account to which the Move is taking place.
 | 
						|
	targetAcct, targetAcctable, err := p.federate.GetAccountByURI(
 | 
						|
		ctx,
 | 
						|
		fMsg.Receiving.Username,
 | 
						|
		targetAcctURI,
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"error getting target account %s: %w",
 | 
						|
			targetAcctURIStr, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	// If target is suspended from this instance,
 | 
						|
	// then we can't/won't process any move side
 | 
						|
	// effects to that account, because:
 | 
						|
	//
 | 
						|
	//   1. We can't verify that it's aliased correctly
 | 
						|
	//      back to originAcct without dereferencing it.
 | 
						|
	//   2. We can't/won't forward follows to a suspended
 | 
						|
	//      account, since suspension would remove follows
 | 
						|
	//      etc. targeting the new account anyways.
 | 
						|
	//   3. If someone is moving to a suspended account
 | 
						|
	//      they probably totally suck ass (according to
 | 
						|
	//      the moderators of this instance, anyway) so
 | 
						|
	//      to hell with it.
 | 
						|
	if targetAcct.IsSuspended() {
 | 
						|
		l.Info("target account is suspended, will not process Move")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if targetAcct.IsRemote() {
 | 
						|
		// Force refresh Move target account
 | 
						|
		// to ensure we have up-to-date version.
 | 
						|
		targetAcct, _, err = p.federate.RefreshAccount(ctx,
 | 
						|
			fMsg.Receiving.Username,
 | 
						|
			targetAcct,
 | 
						|
			targetAcctable,
 | 
						|
			dereferencing.Freshest,
 | 
						|
		)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error refreshing target account %s: %w",
 | 
						|
				targetAcctURIStr, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Target must not itself have moved somewhere.
 | 
						|
	// You can't move to an already-moved account.
 | 
						|
	targetAcctMovedTo := targetAcct.MovedToURI
 | 
						|
	if targetAcctMovedTo != "" {
 | 
						|
		l.Infof(
 | 
						|
			"target account has, itself, already moved to %s, will not process Move",
 | 
						|
			targetAcctMovedTo,
 | 
						|
		)
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// Target must be aliased back to origin account.
 | 
						|
	// Ie., its alsoKnownAs values must include the
 | 
						|
	// origin account, so we know it's for real.
 | 
						|
	if !targetAcct.IsAliasedTo(originAcctURIStr) {
 | 
						|
		l.Info("target account is not aliased back to origin account, will not process Move")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	/*
 | 
						|
		At this point we know that the move
 | 
						|
		looks valid and we should process it.
 | 
						|
	*/
 | 
						|
 | 
						|
	// Transfer originAcct's followers
 | 
						|
	// on this instance to targetAcct.
 | 
						|
	redirectOK := p.utils.redirectFollowers(
 | 
						|
		ctx,
 | 
						|
		originAcct,
 | 
						|
		targetAcct,
 | 
						|
	)
 | 
						|
 | 
						|
	// Remove follows on this
 | 
						|
	// instance owned by originAcct.
 | 
						|
	removeFollowingOK := p.RemoveAccountFollowing(
 | 
						|
		ctx,
 | 
						|
		originAcct,
 | 
						|
	)
 | 
						|
 | 
						|
	// Whatever happened above, error or
 | 
						|
	// not, we've just at least attempted
 | 
						|
	// the Move so we'll need to update it.
 | 
						|
	move.AttemptedAt = time.Now()
 | 
						|
	updateColumns := []string{"attempted_at"}
 | 
						|
 | 
						|
	if redirectOK && removeFollowingOK {
 | 
						|
		// All OK means we can mark the
 | 
						|
		// Move as definitively succeeded.
 | 
						|
		//
 | 
						|
		// Take same time so SucceededAt
 | 
						|
		// isn't 0.0001s later or something.
 | 
						|
		move.SucceededAt = move.AttemptedAt
 | 
						|
		updateColumns = append(updateColumns, "succeeded_at")
 | 
						|
	}
 | 
						|
 | 
						|
	// Update whatever columns we need to update.
 | 
						|
	if err := p.state.DB.UpdateMove(ctx,
 | 
						|
		move, updateColumns...,
 | 
						|
	); err != nil {
 | 
						|
		return gtserror.Newf(
 | 
						|
			"db error updating Move %s: %w",
 | 
						|
			move.URI, err,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// RemoveAccountFollowing removes all
 | 
						|
// follows owned by the move originAcct.
 | 
						|
//
 | 
						|
// originAcct must be fully dereferenced
 | 
						|
// already, and the Move must be valid.
 | 
						|
//
 | 
						|
// Callers to this function MUST have obtained
 | 
						|
// a lock already by calling FedLocks.Lock.
 | 
						|
//
 | 
						|
// Return bool will be true if all goes OK.
 | 
						|
func (p *fediAPI) RemoveAccountFollowing(
 | 
						|
	ctx context.Context,
 | 
						|
	originAcct *gtsmodel.Account,
 | 
						|
) bool {
 | 
						|
	// Any follows owned by originAcct which target
 | 
						|
	// accounts on our instance should be removed.
 | 
						|
	//
 | 
						|
	// We should rely on the target instance
 | 
						|
	// to send out new follows from targetAcct.
 | 
						|
	following, err := p.state.DB.GetAccountLocalFollows(
 | 
						|
		gtscontext.SetBarebones(ctx),
 | 
						|
		originAcct.ID,
 | 
						|
	)
 | 
						|
	if err != nil && !errors.Is(err, db.ErrNoEntries) {
 | 
						|
		log.Errorf(ctx,
 | 
						|
			"db error getting follows owned by originAcct: %v",
 | 
						|
			err,
 | 
						|
		)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	for _, follow := range following {
 | 
						|
		// Ditch it. This is a one-way action
 | 
						|
		// from our side so we don't need to
 | 
						|
		// send any messages this time.
 | 
						|
		if err := p.state.DB.DeleteFollowByID(ctx, follow.ID); err != nil {
 | 
						|
			log.Errorf(ctx,
 | 
						|
				"error removing old follow owned by account %s: %v",
 | 
						|
				follow.AccountID, err,
 | 
						|
			)
 | 
						|
			return false
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Finally delete any follow requests
 | 
						|
	// owned by or targeting the originAcct.
 | 
						|
	if err := p.state.DB.DeleteAccountFollowRequests(
 | 
						|
		ctx, originAcct.ID,
 | 
						|
	); err != nil {
 | 
						|
		log.Errorf(ctx,
 | 
						|
			"db error deleting follow requests involving originAcct %s: %v",
 | 
						|
			originAcct.URI, err,
 | 
						|
		)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	return true
 | 
						|
}
 |