Compare commits

..

3 commits

Author SHA1 Message Date
tobi
c7b0150834 whew 2025-09-26 17:14:42 +02:00
tobi
cde373143a remove errant comment 2025-09-26 15:17:30 +02:00
tobi
c99412af98 finalize indexes etc 2025-09-26 15:14:26 +02:00
2 changed files with 35 additions and 41 deletions

View file

@ -32,6 +32,7 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht
- [CLI Tests](#cli-tests) - [CLI Tests](#cli-tests)
- [Federation](#federation) - [Federation](#federation)
- [Updating Swagger docs](#updating-swagger-docs) - [Updating Swagger docs](#updating-swagger-docs)
- [CI/CD configuration](#ci-cd-configuration)
- [Other Useful Stuff](#other-useful-stuff) - [Other Useful Stuff](#other-useful-stuff)
- [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally)

View file

@ -24,6 +24,7 @@ import (
"reflect" "reflect"
"slices" "slices"
"strings" "strings"
"time"
"code.superseriousbusiness.org/gotosocial/internal/db" "code.superseriousbusiness.org/gotosocial/internal/db"
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
@ -61,21 +62,9 @@ func init() {
return gtserror.Newf("error adding statuses column thread_id_new: %w", err) return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
} }
// Create an index on thread_id_new so
// we can keep track of it as we update.
//
// We'll remove this at the end of the migration.
log.Info(ctx, "creating temporary thread_id_new index")
if _, err := db.NewCreateIndex().
Table("statuses").
Index("statuses_thread_id_new_idx").
Column("thread_id_new").
Exec(ctx); err != nil {
return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
}
var sr statusRethreader var sr statusRethreader
var updatedRows int64 var updatedRowsTotal int64
var maxID string
var statuses []*oldmodel.Status var statuses []*oldmodel.Status
// Get a total count of all statuses before migration. // Get a total count of all statuses before migration.
@ -84,41 +73,50 @@ func init() {
return gtserror.Newf("error getting status table count: %w", err) return gtserror.Newf("error getting status table count: %w", err)
} }
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time, and the first few queries will likely be slower than the rest", total) // Start at largest
// possible ULID value.
maxID = id.Highest
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
for { for {
start := time.Now()
// Reset slice. // Reset slice.
clear(statuses) clear(statuses)
statuses = statuses[:0] statuses = statuses[:0]
// Select IDs of next batch, choosing // Select IDs of next
// only statuses we haven't migrated yet. // batch, paging down.
//
// Let the database give us these in whatever order
// it likes, as it's faster than doing an ORDER BY.
if err := db.NewSelect(). if err := db.NewSelect().
Model(&statuses). Model(&statuses).
Column("id"). Column("id").
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID).
OrderExpr("? DESC", bun.Ident("id")).
Limit(250). Limit(250).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting unthreaded statuses: %w", err) return gtserror.Newf("error selecting unthreaded statuses: %w", err)
} }
// No more statuses! // No more statuses!
if len(statuses) == 0 { l := len(statuses)
if l == 0 {
log.Info(ctx, "done migrating statuses!") log.Info(ctx, "done migrating statuses!")
break break
} }
// Set next maxID value from statuses.
maxID = statuses[l-1].ID
// Rethread each selected status in a transaction. // Rethread each selected status in a transaction.
var updatedRowsThisBatch int64
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
for _, status := range statuses { for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status) n, err := sr.rethreadStatus(ctx, tx, status)
if err != nil { if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err) return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
} }
updatedRows += n updatedRowsThisBatch += n
updatedRowsTotal += n
} }
return nil return nil
@ -126,22 +124,26 @@ func init() {
return err return err
} }
// Show percent migrated. // Show current speed + percent migrated.
// //
// Will maybe end up wonky due to approximations // Percent may end up wonky due to approximations
// and batching, so stop showing it after 99%. // and batching, so show a generic message once it exceeds 100%.
percentDone := (float64(updatedRows) / float64(total)) * 100 timeTaken := time.Since(start).Milliseconds()
if percentDone <= 99 { msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs
percentDone := (float64(updatedRowsTotal) / float64(total)) * 100
if percentDone <= 100 {
log.Infof( log.Infof(
ctx, ctx,
"[updated %d rows] migrated approx. %.2f%% of statuses", "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses",
updatedRows, percentDone, updatedRowsTotal, rowsPerSecond, percentDone,
) )
} else { } else {
log.Infof( log.Infof(
ctx, ctx,
"[updated %d rows] almost done migrating... ", "[updated %d total rows, now @ ~%.0f rows/s] almost done... ",
updatedRows, updatedRowsTotal, rowsPerSecond,
) )
} }
} }
@ -151,13 +153,6 @@ func init() {
return err return err
} }
log.Info(ctx, "dropping temporary thread_id_new index")
if _, err := db.NewDropIndex().
Index("statuses_thread_id_new_idx").
Exec(ctx); err != nil {
return gtserror.Newf("error dropping temporary thread_id_new index: %w", err)
}
log.Info(ctx, "dropping old thread_to_statuses table") log.Info(ctx, "dropping old thread_to_statuses table")
if _, err := db.NewDropTable(). if _, err := db.NewDropTable().
Table("thread_to_statuses"). Table("thread_to_statuses").
@ -304,8 +299,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
return 0, gtserror.Newf("error getting children: %w", err) return 0, gtserror.Newf("error getting children: %w", err)
} }
// Dedupe thread IDs.
// Check for newly picked-up threads // Check for newly picked-up threads
// to find stragglers for below. Else // to find stragglers for below. Else
// we've reached end of what we can do. // we've reached end of what we can do.