From c7b0150834fb878a86a52e615b303dc00a66fe32 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 17:14:42 +0200 Subject: [PATCH] whew --- .../20250415111056_thread_all_statuses.go | 79 +++++-------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 86fbbd49f..bfa4dd84f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,6 +24,7 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" @@ -34,7 +35,6 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" - "github.com/uptrace/bun/dialect" ) func init() { @@ -62,45 +62,8 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - // Create an index on thread_id_new so we can keep - // track of it as we update, and use it to avoid - // selecting statuses we've already updated. - // - // We'll remove this at the end of the migration. - log.Info(ctx, "creating temporary thread_id_new index") - if db.Dialect().Name() == dialect.PG { - // On Postgres we can use a partial index - // to indicate we only care about default - // value thread IDs (ie., not set yet). - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - Column("thread_id_new"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } else { - // On SQLite we can use an index expression - // to do the same thing. While we *can* use - // a partial index for this, an index expression - // is magnitudes faster. Databases! - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - var sr statusRethreader - var updatedRows int64 + var updatedRowsTotal int64 var maxID string var statuses []*oldmodel.Status @@ -116,21 +79,20 @@ func init() { log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) for { + start := time.Now() // Reset slice. clear(statuses) statuses = statuses[:0] - // Select IDs of next batch, paging down - // and skipping statuses to which we've - // already allocated a (new) thread ID. + // Select IDs of next + // batch, paging down. if err := db.NewSelect(). Model(&statuses). Column("id"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -146,13 +108,15 @@ func init() { maxID = statuses[l-1].ID // Rethread each selected status in a transaction. + var updatedRowsThisBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - updatedRows += n + updatedRowsThisBatch += n + updatedRowsTotal += n } return nil @@ -160,22 +124,26 @@ func init() { return err } - // Show percent migrated. + // Show current speed + percent migrated. // - // Will maybe end up wonky due to approximations + // Percent may end up wonky due to approximations // and batching, so show a generic message at 100%. - percentDone := (float64(updatedRows) / float64(total)) * 100 + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 if percentDone <= 100 { log.Infof( ctx, - "[updated %d rows] migrated approx. %.2f%% of statuses", - updatedRows, percentDone, + "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d rows] almost done migrating... ", - updatedRows, + "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + updatedRowsTotal, rowsPerSecond, ) } } @@ -185,13 +153,6 @@ func init() { return err } - log.Info(ctx, "dropping temporary thread_id_new index") - if _, err := db.NewDropIndex(). - Index("statuses_thread_id_new_idx"). - Exec(ctx); err != nil { - return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) - } - log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses").