diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 634850d42..7359d65fa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,6 +32,7 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [CLI Tests](#cli-tests) - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) + - [CI/CD configuration](#ci-cd-configuration) - [Other Useful Stuff](#other-useful-stuff) - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index f4ca7cd90..bfa4dd84f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,6 +24,7 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" @@ -61,21 +62,9 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - // Create an index on thread_id_new so - // we can keep track of it as we update. - // - // We'll remove this at the end of the migration. - log.Info(ctx, "creating temporary thread_id_new index") - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - Column("thread_id_new"). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - var sr statusRethreader - var updatedRows int64 + var updatedRowsTotal int64 + var maxID string var statuses []*oldmodel.Status // Get a total count of all statuses before migration. @@ -84,41 +73,50 @@ func init() { return gtserror.Newf("error getting status table count: %w", err) } - log.Warnf(ctx, "migrating %d statuses, this may take a *long* time, and the first few queries will likely be slower than the rest", total) + // Start at largest + // possible ULID value. + maxID = id.Highest + + log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) for { + start := time.Now() // Reset slice. clear(statuses) statuses = statuses[:0] - // Select IDs of next batch, choosing - // only statuses we haven't migrated yet. - // - // Let the database give us these in whatever order - // it likes, as it's faster than doing an ORDER BY. + // Select IDs of next + // batch, paging down. if err := db.NewSelect(). Model(&statuses). Column("id"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Where("? < ?", bun.Ident("id"), maxID). + OrderExpr("? DESC", bun.Ident("id")). Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } // No more statuses! - if len(statuses) == 0 { + l := len(statuses) + if l == 0 { log.Info(ctx, "done migrating statuses!") break } + // Set next maxID value from statuses. + maxID = statuses[l-1].ID + // Rethread each selected status in a transaction. + var updatedRowsThisBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - updatedRows += n + updatedRowsThisBatch += n + updatedRowsTotal += n } return nil @@ -126,22 +124,26 @@ func init() { return err } - // Show percent migrated. + // Show current speed + percent migrated. // - // Will maybe end up wonky due to approximations - // and batching, so stop showing it after 99%. - percentDone := (float64(updatedRows) / float64(total)) * 100 - if percentDone <= 99 { + // Percent may end up wonky due to approximations + // and batching, so show a generic message at 100%. + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 + if percentDone <= 100 { log.Infof( ctx, - "[updated %d rows] migrated approx. %.2f%% of statuses", - updatedRows, percentDone, + "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d rows] almost done migrating... ", - updatedRows, + "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + updatedRowsTotal, rowsPerSecond, ) } } @@ -151,13 +153,6 @@ func init() { return err } - log.Info(ctx, "dropping temporary thread_id_new index") - if _, err := db.NewDropIndex(). - Index("statuses_thread_id_new_idx"). - Exec(ctx); err != nil { - return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) - } - log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -304,8 +299,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu return 0, gtserror.Newf("error getting children: %w", err) } - // Dedupe thread IDs. - // Check for newly picked-up threads // to find stragglers for below. Else // we've reached end of what we can do.