From 2563568ccce15de8925355ea4d29ea9d59ef6945 Mon Sep 17 00:00:00 2001
From: tobi
Date: Tue, 30 Sep 2025 14:50:21 +0200
Subject: [PATCH] that'll do

---
 CONTRIBUTING.md                            |  32 ++--
 .../20250415111056_thread_all_statuses.go  | 150 +++++++++---------
 2 files changed, 86 insertions(+), 96 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7359d65fa..10a05002c 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -538,23 +538,21 @@ It may be useful when testing or debugging migrations to be able to run them aga
 
 Basic steps for this:
 
-1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine.
-2. Create a local Postgres container and mount the dump into it with, for example:
-
-   ```bash
-   docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
-   ```
-3. Get a terminal inside the running container:
-
-   ```bash
-   docker exec -it --user postgres postgres bash
-   ```
-4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine):
-
-   ```bash
-   psql -X postgres < /db_dump
-   ```
-5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
+First dump the Postgres database on the remote machine, and copy the dump over to your development machine.
+
+Now create a local Postgres container and mount the dump into it with, for example:
+
+```bash
+docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
+```
+
+In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database:
+
+```bash
+docker exec -it --user postgres postgres psql -X -f /db_dump postgres
+```
+
+With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
 
 ```bash
 GTS_HOST=example.org \
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go
index daf392ee6..9115dfe90 100644
--- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go
+++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go
@@ -96,7 +96,7 @@ func init() {
 			clear(statuses)
 			statuses = statuses[:0]
 
-			start := time.Now()
+			batchStart := time.Now()
 
 			// Select IDs of next
 			// batch, paging down.
@@ -106,46 +106,51 @@ func init() {
 				Where("? IS NULL", bun.Ident("in_reply_to_id")).
 				Where("? < ?", bun.Ident("id"), maxID).
 				OrderExpr("? DESC", bun.Ident("id")).
-				Limit(100).
+				Limit(500).
 				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
-				return gtserror.Newf("error selecting top-level statuses: %w", err)
+				return gtserror.Newf("error selecting statuses: %w", err)
 			}
 
-			// Every 50 loops, flush wal and begin new
-			// transaction, to avoid silly wal sizes.
-			if i%50 == 0 {
+			l := len(statuses)
+			if l == 0 {
+				// No more statuses!
+				//
+				// Transaction will be closed
+				// after leaving the loop.
+				break
+
+			} else if i%100 == 0 {
+				// Begin a new transaction every
+				// 100 batches (~50000 statuses),
+				// to avoid massive commits.
+
+				// Close existing transaction.
 				if err := tx.Commit(); err != nil {
 					return err
 				}
 
+				// Try to flush the wal
+				// to avoid silly wal sizes.
 				if err := doWALCheckpoint(ctx, db); err != nil {
 					return err
 				}
 
+				// Open new transaction.
 				tx, err = db.BeginTx(ctx, nil)
 				if err != nil {
 					return err
 				}
 			}
 
-			// No more statuses!
-			l := len(statuses)
-			if l == 0 {
-				if err := tx.Commit(); err != nil {
-					return err
-				}
-
-				log.Info(ctx, "done migrating statuses!")
-				break
-			}
-
-			// Set next maxID value from statuses.
+			// Set next maxID
+			// value from statuses.
 			maxID = statuses[l-1].ID
 
-			// Rethread inside the transaction.
+			// Rethread using the
+			// open transaction.
 			var updatedRowsThisBatch int64
 			for _, status := range statuses {
-				n, err := sr.rethreadStatus(ctx, tx, status)
+				n, err := sr.rethreadStatus(ctx, tx, status, false)
 				if err != nil {
 					return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
 				}
@@ -154,7 +159,7 @@ func init() {
 			}
 
 			// Show speed for this batch.
-			timeTaken := time.Since(start).Milliseconds()
+			timeTaken := time.Since(batchStart).Milliseconds()
 			msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
 			rowsPerMs := float64(1) / float64(msPerRow)
 			rowsPerSecond := 1000 * rowsPerMs
@@ -164,94 +169,73 @@ func init() {
 			log.Infof(
 				ctx,
-				"[~%.2f%% done; ~%.0f rows/s] paging top-level statuses",
+				"[~%.2f%% done; ~%.0f rows/s] migrating threads",
 				totalDone, rowsPerSecond,
 			)
 		}
 
-		if err := doWALCheckpoint(ctx, db); err != nil {
+		// Close transaction.
+		if err := tx.Commit(); err != nil {
 			return err
 		}
 
-		// Reset max ID.
-		maxID = id.Highest
-
-		// Create a temporary index on thread_id_new for stragglers.
+		// Create a partial index on thread_id_new to find stragglers.
+		// This index will be removed at the end of the migration.
 		log.Info(ctx, "creating temporary statuses thread_id_new index")
 		if _, err := db.NewCreateIndex().
 			Table("statuses").
 			Index("statuses_thread_id_new_idx").
 			Column("thread_id_new").
+			Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
 			Exec(ctx); err != nil {
 			return gtserror.Newf("error creating new thread_id index: %w", err)
 		}
 
-		// Open a new transaction lads.
-		tx, err = db.BeginTx(ctx, nil)
-		if err != nil {
-			return err
-		}
-
 		for i := 1; ; i++ {
 			// Reset slice.
 			clear(statuses)
 			statuses = statuses[:0]
 
-			start := time.Now()
+			batchStart := time.Now()
 
-			// Select IDs of stragglers for
-			// which we haven't set thread_id yet.
-			if err := tx.NewSelect().
+			// Get stragglers for which
+			// we haven't set thread ID yet.
+			if err := db.NewSelect().
 				Model(&statuses).
 				Column("id").
 				Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
-				Limit(500).
+				Limit(250).
 				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
-				return gtserror.Newf("error selecting unthreaded statuses: %w", err)
+				return gtserror.Newf("error selecting straggler: %w", err)
 			}
 
-			// Every 50 loops, flush wal and begin new
-			// transaction, to avoid silly wal sizes.
-			if i%50 == 0 {
-				if err := tx.Commit(); err != nil {
-					return err
-				}
-
-				if err := doWALCheckpoint(ctx, db); err != nil {
-					return err
-				}
-
-				tx, err = db.BeginTx(ctx, nil)
-				if err != nil {
-					return err
-				}
-			}
-
-			// No more statuses!
-			l := len(statuses)
-			if l == 0 {
-				if err := tx.Commit(); err != nil {
-					return err
-				}
-
-				log.Info(ctx, "done migrating statuses!")
+			if len(statuses) == 0 {
+				// No more
+				// statuses!
 				break
 			}
 
-			// Rethread inside the transaction.
+			// Update this batch
+			// inside a transaction.
 			var updatedRowsThisBatch int64
-			for _, status := range statuses {
-				n, err := sr.rethreadStatus(ctx, tx, status)
-				if err != nil {
-					return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
+			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+				for _, status := range statuses {
+
+					n, err := sr.rethreadStatus(ctx, tx, status, true)
+					if err != nil {
+						return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
+					}
+					updatedRowsThisBatch += n
+					updatedRowsTotal += n
 				}
-				updatedRowsThisBatch += n
-				updatedRowsTotal += n
+				return nil
+			}); err != nil {
+				return err
 			}
 
 			// Show speed for this batch.
-			timeTaken := time.Since(start).Milliseconds()
+			timeTaken := time.Since(batchStart).Milliseconds()
 			msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
 			rowsPerMs := float64(1) / float64(msPerRow)
 			rowsPerSecond := 1000 * rowsPerMs
@@ -261,11 +245,16 @@ func init() {
 			log.Infof(
 				ctx,
-				"[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers",
+				"[~%.2f%% done; ~%.0f rows/s] migrating stragglers",
 				totalDone, rowsPerSecond,
 			)
 		}
 
+		// Try to merge everything we've done so far.
+		if err := doWALCheckpoint(ctx, db); err != nil {
+			return err
+		}
+
 		log.Info(ctx, "dropping temporary thread_id_new index")
 		if _, err := db.NewDropIndex().
 			Index("statuses_thread_id_new_idx").
 			Exec(ctx); err != nil {
@@ -363,7 +352,7 @@ type statusRethreader struct {
 
 // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
 // in order to trigger a status rethreading operation for the given status, returning total number of rows changed.
-func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) {
+func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) {
 
 	// Zero slice and
 	// map ptr values.
@@ -405,11 +394,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
 	}
 
 	// Set up-to-date values on the status.
-	if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil {
-		status.InReplyToID = inReplyToID.(string)
+	if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil {
+		status.InReplyToID = v.(string)
 	}
-	if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil {
-		status.ThreadID = threadID.(string)
+	if v, ok := upToDateValues["thread_id"]; ok && v != nil {
+		status.ThreadID = v.(string)
 	}
 
 	// status and thread ID cursor
@@ -463,7 +452,10 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
 	// Check for the case where the entire
 	// batch of statuses is already correctly
 	// threaded. Then we have nothing to do!
-	if sr.allThreaded && len(sr.threadIDs) == 1 {
+	//
+	// Skip this check for straggler statuses
+	// that are part of broken threads.
+	if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 {
 		log.Debug(ctx, "skipping just rethreaded thread")
 		return 0, nil
 	}
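
Reviewer note: the migration above calls a `doWALCheckpoint` helper that is referenced but not included in this patch. For anyone reading the diff in isolation, the sketch below shows roughly what such a checkpoint can look like: on SQLite, `PRAGMA wal_checkpoint(TRUNCATE)` flushes the write-ahead log into the main database file and truncates it. The function name, the dialect check, and the no-op behaviour on non-SQLite databases are assumptions for illustration only, not code taken from GoToSocial.

```go
package migrations

import (
	"context"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect"
)

// walCheckpointSketch is a hypothetical stand-in for the
// doWALCheckpoint helper referenced by the migration. It asks
// SQLite to checkpoint and truncate its write-ahead log so the
// -wal file doesn't grow without bound during a long migration;
// on other dialects it does nothing.
func walCheckpointSketch(ctx context.Context, db *bun.DB) error {
	// Postgres and other servers manage their own WAL;
	// only SQLite exposes this pragma to clients.
	if db.Dialect().Name() != dialect.SQLite {
		return nil
	}

	// TRUNCATE waits until the checkpoint can run, writes the WAL
	// contents into the main database file, then resets the WAL
	// to zero length.
	_, err := db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE);")
	return err
}
```

Running the checkpoint only every ~100 batches, as the patch's comments describe, keeps commit sizes and WAL growth bounded without paying the checkpoint cost on every loop iteration.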