From 6c04ae231c89095fc43d15daabd6f8cd73788c89 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 16:48:46 +0200 Subject: [PATCH] i'm adjusting the PR, pray i don't adjust it further --- .../20250415111056_thread_all_statuses.go | 200 +++++++++++++----- 1 file changed, 150 insertions(+), 50 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 0e67d1d7d..a4b910fa6 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -26,7 +26,6 @@ import ( "strings" "time" - "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" @@ -62,44 +61,80 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - var sr statusRethreader - var updatedRowsTotal int64 - var maxID string - var statuses []*oldmodel.Status + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } - // Get a total count of all statuses before migration. - total, err := db.NewSelect().Table("statuses").Count(ctx) + // Get a total count of all + // statuses before migration. + totalStatuses, err := db. + NewSelect(). + Table("statuses"). + Count(ctx) if err != nil { return gtserror.Newf("error getting status table count: %w", err) } + log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses) - // Start at largest + var sr statusRethreader + var updatedRowsTotal int64 + var statuses []*oldmodel.Status + + // Page starting at largest // possible ULID value. - maxID = id.Highest + var maxID = id.Highest - log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) - for { - start := time.Now() + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] + start := time.Now() + // Select IDs of next // batch, paging down. - if err := db.NewSelect(). + if err := tx.NewSelect(). Model(&statuses). Column("id"). + Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting unthreaded statuses: %w", err) + return gtserror.Newf("error selecting top-level statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } // No more statuses! l := len(statuses) if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + log.Info(ctx, "done migrating statuses!") break } @@ -107,52 +142,117 @@ func init() { // Set next maxID value from statuses. maxID = statuses[l-1].ID - // Rethread each selected status in a transaction. + // Rethread inside the transaction. var updatedRowsThisBatch int64 - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - updatedRowsThisBatch += n - updatedRowsTotal += n + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - - return nil - }); err != nil { - return err + updatedRowsThisBatch += n + updatedRowsTotal += n } - // Show current speed + percent migrated. - // - // Percent may end up wonky due to approximations - // and batching, so show a generic message at 100%. + // Show speed for this batch. timeTaken := time.Since(start).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs - percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 - if percentDone <= 100 { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", - rowsPerSecond, updatedRowsTotal, percentDone, - ) - } else { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] almost done... ", - rowsPerSecond, updatedRowsTotal, - ) - } + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. if err := doWALCheckpoint(ctx, db); err != nil { return err } + // Open a new transaction lads. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { + + // Reset slice. + clear(statuses) + statuses = statuses[:0] + + start := time.Now() + + // Select IDs of stragglers for + // which we haven't set thread_id yet. + if err := tx.NewSelect(). + Model(&statuses). + Column("id"). + Where("? IS NULL", bun.Ident("thread_id")). + Limit(250). + Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { + return gtserror.Newf("error selecting unthreaded statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + } + + // No more statuses! + l := len(statuses) + if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + + log.Info(ctx, "done migrating statuses!") + break + } + + // Rethread inside the transaction. + var updatedRowsThisBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n + } + + // Show speed for this batch. + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", + totalDone, rowsPerSecond, + ) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -496,7 +596,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { Model(&parent). Column("id", "in_reply_to_id", "thread_id"). Where("? = ?", bun.Ident("id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -535,7 +635,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) Model(&sr.statuses). Column("id", "thread_id"). Where("? = ?", bun.Ident("in_reply_to_id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -574,7 +674,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in bun.Ident("id"), bun.In(sr.statusIDs), ). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err }