implement streaming notifications

This commit is contained in:
tsmethurst 2021-06-19 10:47:05 +02:00
commit 03a6f387e2
9 changed files with 152 additions and 29 deletions

View file

@ -96,6 +96,16 @@ func (p *processor) notifyStatus(status *gtsmodel.Status) error {
if err := p.db.Put(notif); err != nil {
return fmt.Errorf("notifyStatus: error putting notification in database: %s", err)
}
// now stream the notification to the user
mastoNotif, err := p.tc.NotificationToMasto(notif)
if err != nil {
return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err)
}
if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, m.GTSAccount); err != nil {
return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err)
}
}
return nil
@ -123,6 +133,16 @@ func (p *processor) notifyFollowRequest(followRequest *gtsmodel.FollowRequest, r
return fmt.Errorf("notifyFollowRequest: error putting notification in database: %s", err)
}
// now stream the notification to the user
mastoNotif, err := p.tc.NotificationToMasto(notif)
if err != nil {
return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err)
}
if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil {
return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err)
}
return nil
}
@ -157,6 +177,16 @@ func (p *processor) notifyFollow(follow *gtsmodel.Follow, receivingAccount *gtsm
return fmt.Errorf("notifyFollow: error putting notification in database: %s", err)
}
// now stream the notification to the user
mastoNotif, err := p.tc.NotificationToMasto(notif)
if err != nil {
return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err)
}
if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil {
return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err)
}
return nil
}
@ -183,6 +213,16 @@ func (p *processor) notifyFave(fave *gtsmodel.StatusFave, receivingAccount *gtsm
return fmt.Errorf("notifyFave: error putting notification in database: %s", err)
}
// now stream the notification to the user
mastoNotif, err := p.tc.NotificationToMasto(notif)
if err != nil {
return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err)
}
if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil {
return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err)
}
return nil
}
@ -242,6 +282,16 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error {
return fmt.Errorf("notifyAnnounce: error putting notification in database: %s", err)
}
// now stream the notification to the user
mastoNotif, err := p.tc.NotificationToMasto(notif)
if err != nil {
return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err)
}
if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, boostedAcct); err != nil {
return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err)
}
return nil
}
@ -321,16 +371,22 @@ func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID
return
}
if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil {
// stick the status in the timeline for the account and then immediately prepare it so they can see it right away
inserted, err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID)
if err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %s", status.ID, err)
return
}
mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount)
if err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err)
} else {
if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err)
// the status was inserted to stream it to the user
if inserted {
mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount)
if err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err)
} else {
if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err)
}
}
}
}