[chore] migration to update statuses.thread_id to be notnull (#4160)

# Description

This is quite a complex database migration that updates the `statuses.thread_id` column to be notnull, in order that statuses always be threaded, which will be useful in various pieces of upcoming work. This is unfortunately a migration that acts over the entire statuses table, and is quite complex in order to ensure that all existing statuses get correctly threaded together, and where possible fix any issues of statuses in the same thread having incorrect thread_ids.

TODO:
- ~~update testrig models to all be threaded~~
- ~~update code to ensure thread_id is always set~~
- ~~run on **a copy** of an sqlite production database~~
- ~~run on **a copy** of a postgres production database~~

## Checklist

- [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md).
- [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat.
- [x] I/we have not leveraged AI to create the proposed changes.
- [x] I/we have performed a self-review of added code.
- [x] I/we have written code that is legible and maintainable by others.
- [x] I/we have commented the added code, particularly in hard-to-understand areas.
- [ ] I/we have made any necessary changes to documentation.
- [x] I/we have added tests that cover new code.
- [x] I/we have run tests and they pass locally with the changes.
- [x] I/we have run `go fmt ./...` and `golangci-lint run`.

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4160
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2025-05-26 15:33:42 +02:00 committed by tobi
commit 311d9a1697
19 changed files with 1660 additions and 386 deletions

View file

@ -21,11 +21,13 @@ import (
"context"
"errors"
"slices"
"strings"
"code.superseriousbusiness.org/gotosocial/internal/db"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
"code.superseriousbusiness.org/gotosocial/internal/id"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/state"
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
@ -335,115 +337,284 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// create links between this status and any emojis it uses
for _, i := range status.EmojiIDs {
if status.BoostOfID != "" {
var threadID string
// Boost wrappers always inherit thread
// of the origin status they're boosting.
if err := tx.
NewSelect().
Table("statuses").
Column("thread_id").
Where("? = ?", bun.Ident("id"), status.BoostOfID).
Scan(ctx, &threadID); err != nil {
return gtserror.Newf("error selecting boosted status: %w", err)
}
// Set the selected thread.
status.ThreadID = threadID
// They also require no further
// checks! Simply insert status here.
return insertStatus(ctx, tx, status)
}
// Gather a list of possible thread IDs
// of all the possible related statuses
// to this one. If one exists we can use
// the end result, and if too many exist
// we can fix the status threading.
var threadIDs []string
if status.InReplyToID != "" {
var threadID string
// A stored parent status exists,
// select its thread ID to ideally
// inherit this for status.
if err := tx.
NewSelect().
Table("statuses").
Column("thread_id").
Where("? = ?", bun.Ident("id"), status.InReplyToID).
Scan(ctx, &threadID); err != nil {
return gtserror.Newf("error selecting status parent: %w", err)
}
// Append possible ID to threads slice.
threadIDs = append(threadIDs, threadID)
} else if status.InReplyToURI != "" {
var ids []string
// A parent status exists but is not
// yet stored. See if any siblings for
// this shared parent exist with their
// own thread IDs.
if err := tx.
NewSelect().
Table("statuses").
Column("thread_id").
Where("? = ?", bun.Ident("in_reply_to_uri"), status.InReplyToURI).
Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error selecting status siblings: %w", err)
}
// Append possible IDs to threads slice.
threadIDs = append(threadIDs, ids...)
}
if !*status.Local {
var ids []string
// For remote statuses specifically, check to
// see if any children are stored for this new
// stored parent with their own thread IDs.
if err := tx.
NewSelect().
Table("statuses").
Column("thread_id").
Where("? = ?", bun.Ident("in_reply_to_uri"), status.URI).
Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error selecting status children: %w", err)
}
// Append possible IDs to threads slice.
threadIDs = append(threadIDs, ids...)
}
// Ensure only *unique* posssible thread IDs.
threadIDs = xslices.Deduplicate(threadIDs)
switch len(threadIDs) {
case 0:
// No related status with thread ID already exists,
// so create new thread ID from status creation time.
threadID := id.NewULIDFromTime(status.CreatedAt)
// Insert new thread.
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToEmoji{
StatusID: status.ID,
EmojiID: i,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
Model(&gtsmodel.Thread{ID: threadID}).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
return gtserror.Newf("error inserting thread: %w", err)
}
// Update status thread ID.
status.ThreadID = threadID
case 1:
// Inherit single known thread.
status.ThreadID = threadIDs[0]
default:
var err error
log.Infof(ctx, "reconciling status threading for %s: [%s]", status.URI, strings.Join(threadIDs, ","))
status.ThreadID, err = s.fixStatusThreading(ctx, tx, threadIDs)
if err != nil {
return err
}
}
// create links between this status and any tags it uses
for _, i := range status.TagIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToTag{
StatusID: status.ID,
TagID: i,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
}
}
// change the status ID of the media
// attachments to the current status
for _, a := range status.Attachments {
a.StatusID = status.ID
if _, err := tx.
NewUpdate().
Model(a).
Column("status_id").
Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
}
}
// If the status is threaded, create
// link between thread and status.
if status.ThreadID != "" {
if _, err := tx.
NewInsert().
Model(&gtsmodel.ThreadToStatus{
ThreadID: status.ThreadID,
StatusID: status.ID,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
}
}
// Finally, insert the status
_, err := tx.NewInsert().
Model(status).
Exec(ctx)
return err
// And after threading, insert status.
// This will error if ThreadID is unset.
return insertStatus(ctx, tx, status)
})
})
}
// fixStatusThreading can be called to reconcile statuses in the same thread but known to be using multiple given threads.
func (s *statusDB) fixStatusThreading(ctx context.Context, tx bun.Tx, threadIDs []string) (string, error) {
if len(threadIDs) <= 1 {
panic("invalid call to fixStatusThreading()")
}
// Sort ascending, i.e.
// oldest thread ID first.
slices.Sort(threadIDs)
// Drop the oldest thread ID
// from slice, we'll keep this.
threadID := threadIDs[0]
threadIDs = threadIDs[1:]
// On updates, gather IDs of changed model
// IDs for later stage of cache invalidation,
// preallocating slices for worst-case scenarios.
statusIDs := make([]string, 0, 4*len(threadIDs))
muteIDs := make([]string, 0, 4*len(threadIDs))
// Update all statuses with
// thread IDs to use oldest.
if _, err := tx.
NewUpdate().
Table("statuses").
Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
Set("? = ?", bun.Ident("thread_id"), threadID).
Returning("?", bun.Ident("id")).
Exec(ctx, &statusIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
return "", gtserror.Newf("error updating statuses: %w", err)
}
// Update all thread mutes with
// thread IDs to use oldest.
if _, err := tx.
NewUpdate().
Table("thread_mutes").
Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
Set("? = ?", bun.Ident("thread_id"), threadID).
Returning("?", bun.Ident("id")).
Exec(ctx, &muteIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
return "", gtserror.Newf("error updating thread mutes: %w", err)
}
// Delete all now
// unused thread IDs.
if _, err := tx.
NewDelete().
Table("threads").
Where("? IN (?)", bun.Ident("id"), bun.In(threadIDs)).
Exec(ctx); err != nil {
return "", gtserror.Newf("error deleting threads: %w", err)
}
// Invalidate caches for changed statuses and mutes.
s.state.Caches.DB.Status.InvalidateIDs("ID", statusIDs)
s.state.Caches.DB.ThreadMute.InvalidateIDs("ID", muteIDs)
return threadID, nil
}
// insertStatus handles the base status insert logic, that is the status itself,
// any intermediary table links, and updating media attachments to point to status.
func insertStatus(ctx context.Context, tx bun.Tx, status *gtsmodel.Status) error {
// create links between this
// status and any emojis it uses
for _, id := range status.EmojiIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToEmoji{
StatusID: status.ID,
EmojiID: id,
}).
Exec(ctx); err != nil {
return gtserror.Newf("error inserting status_to_emoji: %w", err)
}
}
// create links between this
// status and any tags it uses
for _, id := range status.TagIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToTag{
StatusID: status.ID,
TagID: id,
}).
Exec(ctx); err != nil {
return gtserror.Newf("error inserting status_to_tag: %w", err)
}
}
// change the status ID of the media
// attachments to the current status
for _, a := range status.Attachments {
a.StatusID = status.ID
if _, err := tx.
NewUpdate().
Model(a).
Column("status_id").
Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
Exec(ctx); err != nil {
return gtserror.Newf("error updating media: %w", err)
}
}
// Finally, insert the status
if _, err := tx.NewInsert().
Model(status).
Exec(ctx); err != nil {
return gtserror.Newf("error inserting status: %w", err)
}
return nil
}
func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, columns ...string) error {
return s.state.Caches.DB.Status.Store(status, func() error {
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// create links between this status and any emojis it uses
for _, i := range status.EmojiIDs {
// create links between this
// status and any emojis it uses
for _, id := range status.EmojiIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToEmoji{
StatusID: status.ID,
EmojiID: i,
EmojiID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
return err
}
}
// create links between this status and any tags it uses
for _, i := range status.TagIDs {
// create links between this
// status and any tags it uses
for _, id := range status.TagIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToTag{
StatusID: status.ID,
TagID: i,
TagID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
return err
}
}
@ -457,26 +628,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co
Column("status_id").
Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
}
}
// If the status is threaded, create
// link between thread and status.
if status.ThreadID != "" {
if _, err := tx.
NewInsert().
Model(&gtsmodel.ThreadToStatus{
ThreadID: status.ThreadID,
StatusID: status.ID,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
Exec(ctx); err != nil {
if !errors.Is(err, db.ErrAlreadyExists) {
return err
}
return err
}
}
@ -499,7 +651,9 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
// Delete status from database and any related links in a transaction.
if err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// delete links between this status and any emojis it uses
// delete links between this
// status and any emojis it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_emojis"), bun.Ident("status_to_emoji")).
@ -508,7 +662,8 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
// delete links between this status and any tags it uses
// delete links between this
// status and any tags it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_tags"), bun.Ident("status_to_tag")).
@ -517,16 +672,6 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
// Delete links between this status
// and any threads it was a part of.
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("thread_to_statuses"), bun.Ident("thread_to_status")).
Where("? = ?", bun.Ident("thread_to_status.status_id"), id).
Exec(ctx); err != nil {
return err
}
// delete the status itself
if _, err := tx.
NewDelete().