start messing about with worker functions etc

This commit is contained in:
tobi 2025-05-24 17:17:26 +02:00
commit 060ef4149b
15 changed files with 649 additions and 119 deletions

View file

@ -203,26 +203,23 @@ func (p *Processor) acceptAnnounce(
ctx context.Context,
req *gtsmodel.InteractionRequest,
) gtserror.WithCode {
// If the Announce is missing, that means it's
// probably already been undone by someone,
// so there's nothing to actually accept.
if req.Reply == nil {
err := gtserror.Newf("no Announce found for interaction request %s", req.ID)
return gtserror.NewErrorNotFound(err)
}
// Update the Announce.
req.Announce.PendingApproval = util.Ptr(false)
req.Announce.PreApproved = false
req.Announce.ApprovedByURI = req.URI
if err := p.state.DB.UpdateStatus(
ctx,
req.Announce,
"pending_approval",
"approved_by_uri",
); err != nil {
err := gtserror.Newf("db error updating status announce: %w", err)
return gtserror.NewErrorInternalError(err)
// If the Announce is set, that means it comes
// from someone straight up sending the Announce
// instead of AnnounceRequest, so we already have
// the Announce in the db. We can update it now.
if req.Announce != nil {
req.Announce.PendingApproval = util.Ptr(false)
req.Announce.PreApproved = false
req.Announce.ApprovedByURI = req.URI
if err := p.state.DB.UpdateStatus(
ctx,
req.Announce,
"pending_approval",
"approved_by_uri",
); err != nil {
err := gtserror.Newf("db error updating status announce: %w", err)
return gtserror.NewErrorInternalError(err)
}
}
// Send the accepted request off through the

View file

@ -287,7 +287,7 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
if err := p.utils.requestReply(ctx, status); err != nil {
if err := p.utils.replyToRequestReply(ctx, status); err != nil {
return gtserror.Newf("error pending reply: %w", err)
}
@ -494,7 +494,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
if err := p.utils.requestFave(ctx, fave); err != nil {
if err := p.utils.faveToPendingFave(ctx, fave); err != nil {
return gtserror.Newf("error pending fave: %w", err)
}
@ -555,7 +555,11 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// Don't return, just continue as normal.
}
if err := p.surface.notifyFave(ctx, fave); err != nil {
if err := p.surface.notifyFave(ctx,
fave.Account,
fave.TargetAccount,
fave.Status,
); err != nil {
log.Errorf(ctx, "error notifying fave: %v", err)
}
@ -589,7 +593,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
if err := p.utils.requestAnnounce(ctx, boost); err != nil {
if err := p.utils.announceToRequestAnnounce(ctx, boost); err != nil {
return gtserror.Newf("error pending boost: %w", err)
}
@ -661,7 +665,11 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
}
// Notify the boost target account.
if err := p.surface.notifyAnnounce(ctx, boost); err != nil {
if err := p.surface.notifyAnnounce(ctx,
boost.Account,
boost.BoostOfAccount,
boost.BoostOf,
); err != nil {
log.Errorf(ctx, "error notifying boost: %v", err)
}
@ -1217,8 +1225,13 @@ func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI
return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
}
// Notify the fave (distinct from the notif for the pending fave).
if err := p.surface.notifyFave(ctx, req.Like); err != nil {
// Notify the fave (distinct from
// the notif for the pending fave).
if err := p.surface.notifyFave(ctx,
req.InteractingAccount,
req.TargetAccount,
req.Status,
); err != nil {
log.Errorf(ctx, "error notifying fave: %v", err)
}
@ -1273,6 +1286,25 @@ func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClien
return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
}
// Send out the Accept.
if err := p.federate.AcceptInteraction(ctx, req); err != nil {
log.Errorf(ctx, "error federating approval of announce: %v", err)
}
// If req.Announce is not set, that means we got
// the request politely as AnnounceRequest, and
// so we don't have the Announce wrapper stored
// because it hasn't been sent yet.
if req.Announce == nil {
// Nothing to do but wait for the
// remote to send the Announce.
return nil
}
// If it *is* set, that means it comes from someone
// straight up sending the Announce first instead of
// AnnounceRequest, so we already have the Announce
// in the db, and we can process it now.
var (
interactingAcct = req.InteractingAccount
boost = req.Announce
@ -1288,16 +1320,17 @@ func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClien
log.Errorf(ctx, "error timelining and notifying status: %v", err)
}
// Notify the announce (distinct from the notif for the pending announce).
if err := p.surface.notifyAnnounce(ctx, boost); err != nil {
// Notify the announce (distinct from
// the notif for the pending announce).
if err := p.surface.notifyAnnounce(
ctx,
boost.Account,
boost.BoostOfAccount,
boost.BoostOf,
); err != nil {
log.Errorf(ctx, "error notifying announce: %v", err)
}
// Send out the Accept.
if err := p.federate.AcceptInteraction(ctx, req); err != nil {
log.Errorf(ctx, "error federating approval of announce: %v", err)
}
// Interaction counts changed on the original status;
// uncache the prepared version from all timelines.
p.surface.invalidateStatusFromTimelines(boost.BoostOfID)

View file

@ -96,6 +96,10 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg *messages.FromF
case ap.ActivityLike:
return p.fediAPI.CreateLike(ctx, fMsg)
// CREATE LIKE/FAVE REQUEST
case ap.ActivityLikeRequest:
return p.fediAPI.CreateLikeRequest(ctx, fMsg)
// CREATE ANNOUNCE/BOOST
case ap.ActivityAnnounce:
return p.fediAPI.CreateAnnounce(ctx, fMsg)
@ -291,7 +295,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
if err := p.utils.requestReply(ctx, status); err != nil {
if err := p.utils.replyToRequestReply(ctx, status); err != nil {
return gtserror.Newf("error pending reply: %w", err)
}
@ -525,7 +529,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
if err := p.utils.requestFave(ctx, fave); err != nil {
if err := p.utils.faveToPendingFave(ctx, fave); err != nil {
return gtserror.Newf("error pending fave: %w", err)
}
@ -580,7 +584,11 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// Don't return, just continue as normal.
}
if err := p.surface.notifyFave(ctx, fave); err != nil {
if err := p.surface.notifyFave(ctx,
fave.Account,
fave.TargetAccount,
fave.Status,
); err != nil {
log.Errorf(ctx, "error notifying fave: %v", err)
}
@ -591,6 +599,76 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
return nil
}
func (p *fediAPI) CreateLikeRequest(
ctx context.Context,
fMsg *messages.FromFediAPI,
) error {
// Unlike InteractionReq from xyz, InteractionReq from
// xyzRequest will only ever have xyz set on it if xyz
// is a reply, not a Like or an Announce.
//
// In this case, that means there's no Fave set on it.
interactionReq, ok := fMsg.GTSModel.(*gtsmodel.InteractionRequest)
if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", fMsg.GTSModel)
}
// Whatever happens, we'll need to store the request.
//
// AcceptedAt or RejectedAt should already be set on the
// request if it's automatically accepted, or not permitted,
// so we don't have to do that here.
err := p.state.DB.PutInteractionRequest(ctx, interactionReq)
switch {
case err == nil:
// All good,
// it's stored.
case errors.Is(err, db.ErrAlreadyExists):
// Request already stored,
// did something race?
// Nothing to in that case.
return nil
default:
// Real error, cannot continue.
return gtserror.Newf("db error storing like request: %w", err)
}
// Process side effects.
switch {
case interactionReq.IsPending():
// If pending, ie., manual approval
// required, the only side effect is
// to notify the interactee.
if err := p.utils.surface.notifyPendingFave(
ctx,
interactionReq.InteractingAccount,
interactionReq.TargetAccount,
interactionReq.Status,
); err != nil {
log.Errorf(ctx, "error storing pending like notif: %v", err)
}
case interactionReq.IsAccepted():
// If accepted, ie., automatic approval,
// just send out the Accept message and
// wait for the Like to be delivered.
if err := p.federate.AcceptInteraction(ctx, interactionReq); err != nil {
log.Errorf(ctx, "error sending like accept: %v", err)
}
case interactionReq.IsRejected():
// If rejected, ie., not permitted,
// just send out the Reject message.
if err := p.federate.RejectInteraction(ctx, interactionReq); err != nil {
log.Errorf(ctx, "error sending like reject: %v", err)
}
}
return nil
}
func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI) error {
boost, ok := fMsg.GTSModel.(*gtsmodel.Status)
if !ok {
@ -632,7 +710,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
if err := p.utils.requestAnnounce(ctx, boost); err != nil {
if err := p.utils.announceToRequestAnnounce(ctx, boost); err != nil {
return gtserror.Newf("error pending boost: %w", err)
}
@ -697,7 +775,12 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
log.Errorf(ctx, "error timelining and notifying status: %v", err)
}
if err := p.surface.notifyAnnounce(ctx, boost); err != nil {
if err := p.surface.notifyAnnounce(
ctx,
boost.Account,
boost.BoostOfAccount,
boost.BoostOf,
); err != nil {
log.Errorf(ctx, "error notifying announce: %v", err)
}
@ -708,6 +791,76 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
return nil
}
func (p *fediAPI) CreateAnnounceRequest(
ctx context.Context,
fMsg *messages.FromFediAPI,
) error {
// Unlike InteractionReq from xyz, InteractionReq from
// xyzRequest will only ever have xyz set on it if xyz
// is a reply, not a Like or an Announce.
//
// In this case, that means there's no Announce set on it.
interactionReq, ok := fMsg.GTSModel.(*gtsmodel.InteractionRequest)
if !ok {
return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", fMsg.GTSModel)
}
// Whatever happens, we'll need to store the request.
//
// AcceptedAt or RejectedAt should already be set on the
// request if it's automatically accepted, or not permitted,
// so we don't have to do that here.
err := p.state.DB.PutInteractionRequest(ctx, interactionReq)
switch {
case err == nil:
// All good,
// it's stored.
case errors.Is(err, db.ErrAlreadyExists):
// Request already stored,
// did something race?
// Nothing to in that case.
return nil
default:
// Real error, cannot continue.
return gtserror.Newf("db error storing announce request: %w", err)
}
// Process side effects.
switch {
case interactionReq.IsPending():
// If pending, ie., manual approval
// required, the only side effect is
// to notify the interactee.
if err := p.utils.surface.notifyPendingAnnounce(
ctx,
interactionReq.InteractingAccount,
interactionReq.TargetAccount,
interactionReq.Status,
); err != nil {
log.Errorf(ctx, "error storing pending announce notif: %v", err)
}
case interactionReq.IsAccepted():
// If accepted, ie., automatic approval,
// just send out the Accept message and
// wait for the Announce to be delivered.
if err := p.federate.AcceptInteraction(ctx, interactionReq); err != nil {
log.Errorf(ctx, "error sending announce accept: %v", err)
}
case interactionReq.IsRejected():
// If rejected, ie., not permitted,
// just send out the Reject message.
if err := p.federate.RejectInteraction(ctx, interactionReq); err != nil {
log.Errorf(ctx, "error sending announce reject: %v", err)
}
}
return nil
}
func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) error {
block, ok := fMsg.GTSModel.(*gtsmodel.Block)
if !ok {

View file

@ -253,13 +253,19 @@ func (s *Surface) notifyFollow(
return nil
}
// notifyFave notifies the target of the given
// fave that their status has been liked/faved.
// notifyFave notifies the target of of a
// fave that their status has been faved.
func (s *Surface) notifyFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
status *gtsmodel.Status,
) error {
notifyable, err := s.notifyableFave(ctx, fave)
notifyable, err := s.notifyableFave(ctx,
account,
targetAccount,
status,
)
if err != nil {
return err
}
@ -273,24 +279,30 @@ func (s *Surface) notifyFave(
// of fave by account.
if err := s.Notify(ctx,
gtsmodel.NotificationFavourite,
fave.TargetAccount,
fave.Account,
fave.StatusID,
targetAccount,
account,
status.ID,
); err != nil {
return gtserror.Newf("error notifying status author %s: %w", fave.TargetAccountID, err)
return gtserror.Newf("error notifying status author %s: %w", targetAccount.ID, err)
}
return nil
}
// notifyPendingFave notifies the target of the
// given fave that their status has been faved
// and that approval is required.
// notifyPendingFave notifies the target of a
// fave that their status has been faved and
// that approval is required.
func (s *Surface) notifyPendingFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
status *gtsmodel.Status,
) error {
notifyable, err := s.notifyableFave(ctx, fave)
notifyable, err := s.notifyableFave(ctx,
account,
targetAccount,
status,
)
if err != nil {
return err
}
@ -304,34 +316,31 @@ func (s *Surface) notifyPendingFave(
// of fave by account.
if err := s.Notify(ctx,
gtsmodel.NotificationPendingFave,
fave.TargetAccount,
fave.Account,
fave.StatusID,
targetAccount,
account,
status.ID,
); err != nil {
return gtserror.Newf("error notifying status author %s: %w", fave.TargetAccountID, err)
return gtserror.Newf("error notifying status author %s: %w", targetAccount.ID, err)
}
return nil
}
// notifyableFave checks that the given
// fave should be notified, taking account
// of localness of receiving account, and mutes.
// notifyableFave checks if a fave should
// be notified, taking account of localness
// of target account, and thread mutes.
func (s *Surface) notifyableFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
status *gtsmodel.Status,
) (bool, error) {
if fave.TargetAccountID == fave.AccountID {
if targetAccount.ID == account.ID {
// Self-fave, nothing to do.
return false, nil
}
// Beforehand, ensure the passed status fave is fully populated.
if err := s.State.DB.PopulateStatusFave(ctx, fave); err != nil {
return false, gtserror.Newf("error populating fave %s: %w", fave.ID, err)
}
if fave.TargetAccount.IsRemote() {
if targetAccount.IsRemote() {
// no need to notify
// remote accounts.
return false, nil
@ -341,11 +350,11 @@ func (s *Surface) notifyableFave(
// muted the thread.
muted, err := s.State.DB.IsThreadMutedByAccount(
ctx,
fave.Status.ThreadID,
fave.TargetAccountID,
status.ThreadID,
targetAccount.ID,
)
if err != nil {
return false, gtserror.Newf("error checking status thread mute %s: %w", fave.StatusID, err)
return false, gtserror.Newf("error checking status thread mute %s: %w", status.ID, err)
}
if muted {
@ -357,13 +366,20 @@ func (s *Surface) notifyableFave(
return true, nil
}
// notifyAnnounce notifies the status boost target
// account that their status has been boosted.
// notifyAnnounce notifies the target
// acct that their status has been boosted.
func (s *Surface) notifyAnnounce(
ctx context.Context,
boost *gtsmodel.Status,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
targetStatus *gtsmodel.Status,
) error {
notifyable, err := s.notifyableAnnounce(ctx, boost)
notifyable, err := s.notifyableAnnounce(
ctx,
account,
targetAccount,
targetStatus,
)
if err != nil {
return err
}
@ -377,11 +393,11 @@ func (s *Surface) notifyAnnounce(
// of boost by account.
if err := s.Notify(ctx,
gtsmodel.NotificationReblog,
boost.BoostOfAccount,
boost.Account,
boost.ID,
targetAccount,
account,
targetStatus.ID,
); err != nil {
return gtserror.Newf("error notifying boost target %s: %w", boost.BoostOfAccountID, err)
return gtserror.Newf("error notifying boost target %s: %w", targetAccount.ID, err)
}
return nil
@ -392,9 +408,16 @@ func (s *Surface) notifyAnnounce(
// and that the boost requires approval.
func (s *Surface) notifyPendingAnnounce(
ctx context.Context,
boost *gtsmodel.Status,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
targetStatus *gtsmodel.Status,
) error {
notifyable, err := s.notifyableAnnounce(ctx, boost)
notifyable, err := s.notifyableAnnounce(
ctx,
account,
targetAccount,
targetStatus,
)
if err != nil {
return err
}
@ -408,11 +431,11 @@ func (s *Surface) notifyPendingAnnounce(
// of boost by account.
if err := s.Notify(ctx,
gtsmodel.NotificationPendingReblog,
boost.BoostOfAccount,
boost.Account,
boost.ID,
targetAccount,
account,
targetStatus.ID,
); err != nil {
return gtserror.Newf("error notifying boost target %s: %w", boost.BoostOfAccountID, err)
return gtserror.Newf("error notifying pending boost target %s: %w", targetAccount.ID, err)
}
return nil
@ -423,24 +446,21 @@ func (s *Surface) notifyPendingAnnounce(
// of localness of receiving account, and mutes.
func (s *Surface) notifyableAnnounce(
ctx context.Context,
status *gtsmodel.Status,
account *gtsmodel.Account,
targetAccount *gtsmodel.Account,
targetStatus *gtsmodel.Status,
) (bool, error) {
if status.BoostOfID == "" {
// Not a boost, nothing to do.
return false, nil
}
if status.BoostOfAccountID == status.AccountID {
if account.ID == targetStatus.AccountID {
// Self-boost, nothing to do.
return false, nil
}
// Beforehand, ensure the passed status is fully populated.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
return false, gtserror.Newf("error populating status %s: %w", status.ID, err)
// Ensure boosted status is populated.
if err := s.State.DB.PopulateStatus(ctx, targetStatus); err != nil {
return false, gtserror.Newf("error populating status %s: %w", targetStatus.ID, err)
}
if status.BoostOfAccount.IsRemote() {
if targetStatus.Account.IsRemote() {
// no need to notify
// remote accounts.
return false, nil
@ -450,12 +470,11 @@ func (s *Surface) notifyableAnnounce(
// muted the thread.
muted, err := s.State.DB.IsThreadMutedByAccount(
ctx,
status.BoostOf.ThreadID,
status.BoostOfAccountID,
targetStatus.ThreadID,
targetAccount.ID,
)
if err != nil {
return false, gtserror.Newf("error checking status thread mute %s: %w", status.BoostOfID, err)
return false, gtserror.Newf("error checking status thread mute %s: %w", targetStatus.ID, err)
}
if muted {

View file

@ -526,9 +526,14 @@ func (u *utils) decrementFollowRequestsCount(
return nil
}
// requestFave stores an interaction request
// faveToPendingFave stores an interaction request
// for the given fave, and notifies the interactee.
func (u *utils) requestFave(
//
// This is useful when a local account needs to send
// out a LikeRequest, or when a remote account has
// sent us a Like instead of a LikeRequest for a
// status where Liking requires approval.
func (u *utils) faveToPendingFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
) error {
@ -561,17 +566,26 @@ func (u *utils) requestFave(
return gtserror.Newf("db error storing interaction request: %w", err)
}
// Notify *local* account of pending announce.
if err := u.surface.notifyPendingFave(ctx, fave); err != nil {
// Notify *local* account of pending fave.
if err := u.surface.notifyPendingFave(ctx,
fave.Account,
fave.TargetAccount,
fave.Status,
); err != nil {
return gtserror.Newf("error notifying pending fave: %w", err)
}
return nil
}
// requestReply stores an interaction request
// replyToRequestReply stores an interaction request
// for the given reply, and notifies the interactee.
func (u *utils) requestReply(
//
// This is useful when a local account needs to send
// out a ReplyRequest, or when a remote account has
// sent us a Create instead of a ReplyRequest for a
// status where replying requires approval.
func (u *utils) replyToRequestReply(
ctx context.Context,
reply *gtsmodel.Status,
) error {
@ -612,9 +626,14 @@ func (u *utils) requestReply(
return nil
}
// requestAnnounce stores an interaction request
// announceToRequestAnnounce stores an interaction request
// for the given announce, and notifies the interactee.
func (u *utils) requestAnnounce(
//
// This is useful when a local account needs to send
// out an AnnounceRequest, or when a remote account has
// sent us an Announce instead of an AnnounceRequest for
// a status where announcing requires approval.
func (u *utils) announceToRequestAnnounce(
ctx context.Context,
boost *gtsmodel.Status,
) error {
@ -648,7 +667,12 @@ func (u *utils) requestAnnounce(
}
// Notify *local* account of pending announce.
if err := u.surface.notifyPendingAnnounce(ctx, boost); err != nil {
if err := u.surface.notifyPendingAnnounce(
ctx,
boost.Account,
boost.BoostOfAccount,
boost.BoostOf,
); err != nil {
return gtserror.Newf("error notifying pending announce: %w", err)
}