that'll do

This commit is contained in:
tobi 2025-09-30 14:50:21 +02:00 committed by tobi
commit 2563568ccc
2 changed files with 86 additions and 96 deletions

View file

@ -538,23 +538,21 @@ It may be useful when testing or debugging migrations to be able to run them aga
Basic steps for this: Basic steps for this:
1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine. First dump the Postgres database on the remote machine, and copy the dump over to your development machine.
2. Create a local Postgres container and mount the dump into it with, for example:
```bash Now create a local Postgres container and mount the dump into it with, for example:
docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
```
3. Get a terminal inside the running container:
```bash ```bash
docker exec -it --user postgres postgres bash docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
``` ```
4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine):
```bash In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database:
psql -X postgres < /db_dump
``` ```bash
5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: docker exec -it --user postgres postgres psql -X -f /db_dump postgres
```
With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
```bash ```bash
GTS_HOST=example.org \ GTS_HOST=example.org \

View file

@ -96,7 +96,7 @@ func init() {
clear(statuses) clear(statuses)
statuses = statuses[:0] statuses = statuses[:0]
start := time.Now() batchStart := time.Now()
// Select IDs of next // Select IDs of next
// batch, paging down. // batch, paging down.
@ -106,46 +106,51 @@ func init() {
Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? IS NULL", bun.Ident("in_reply_to_id")).
Where("? < ?", bun.Ident("id"), maxID). Where("? < ?", bun.Ident("id"), maxID).
OrderExpr("? DESC", bun.Ident("id")). OrderExpr("? DESC", bun.Ident("id")).
Limit(100). Limit(500).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting top-level statuses: %w", err) return gtserror.Newf("error selecting statuses: %w", err)
} }
// Every 50 loops, flush wal and begin new l := len(statuses)
// transaction, to avoid silly wal sizes. if l == 0 {
if i%50 == 0 { // No more statuses!
//
// Transaction will be closed
// after leaving the loop.
break
} else if i%100 == 0 {
// Begin a new transaction every
// 100 batches (~50000 statuses),
// to avoid massive commits.
// Close existing transaction.
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return err return err
} }
// Try to flush the wal
// to avoid silly wal sizes.
if err := doWALCheckpoint(ctx, db); err != nil { if err := doWALCheckpoint(ctx, db); err != nil {
return err return err
} }
// Open new transaction.
tx, err = db.BeginTx(ctx, nil) tx, err = db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return err return err
} }
} }
// No more statuses! // Set next maxID
l := len(statuses) // value from statuses.
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break
}
// Set next maxID value from statuses.
maxID = statuses[l-1].ID maxID = statuses[l-1].ID
// Rethread inside the transaction. // Rethread using the
// open transaction.
var updatedRowsThisBatch int64 var updatedRowsThisBatch int64
for _, status := range statuses { for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status) n, err := sr.rethreadStatus(ctx, tx, status, false)
if err != nil { if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err) return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
} }
@ -154,7 +159,7 @@ func init() {
} }
// Show speed for this batch. // Show speed for this batch.
timeTaken := time.Since(start).Milliseconds() timeTaken := time.Since(batchStart).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow) rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs rowsPerSecond := 1000 * rowsPerMs
@ -164,94 +169,73 @@ func init() {
log.Infof( log.Infof(
ctx, ctx,
"[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", "[~%.2f%% done; ~%.0f rows/s] migrating threads",
totalDone, rowsPerSecond, totalDone, rowsPerSecond,
) )
} }
if err := doWALCheckpoint(ctx, db); err != nil { // Close transaction.
if err := tx.Commit(); err != nil {
return err return err
} }
// Reset max ID. // Create a partial index on thread_id_new to find stragglers.
maxID = id.Highest // This index will be removed at the end of the migration.
// Create a temporary index on thread_id_new for stragglers.
log.Info(ctx, "creating temporary statuses thread_id_new index") log.Info(ctx, "creating temporary statuses thread_id_new index")
if _, err := db.NewCreateIndex(). if _, err := db.NewCreateIndex().
Table("statuses"). Table("statuses").
Index("statuses_thread_id_new_idx"). Index("statuses_thread_id_new_idx").
Column("thread_id_new"). Column("thread_id_new").
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
Exec(ctx); err != nil { Exec(ctx); err != nil {
return gtserror.Newf("error creating new thread_id index: %w", err) return gtserror.Newf("error creating new thread_id index: %w", err)
} }
// Open a new transaction lads.
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
for i := 1; ; i++ { for i := 1; ; i++ {
// Reset slice. // Reset slice.
clear(statuses) clear(statuses)
statuses = statuses[:0] statuses = statuses[:0]
start := time.Now() batchStart := time.Now()
// Select IDs of stragglers for // Get stragglers for which
// which we haven't set thread_id yet. // we haven't set thread ID yet.
if err := tx.NewSelect(). if err := db.NewSelect().
Model(&statuses). Model(&statuses).
Column("id"). Column("id").
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
Limit(500). Limit(250).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting unthreaded statuses: %w", err) return gtserror.Newf("error selecting straggler: %w", err)
} }
// Every 50 loops, flush wal and begin new if len(statuses) == 0 {
// transaction, to avoid silly wal sizes. // No more
if i%50 == 0 { // statuses!
if err := tx.Commit(); err != nil {
return err
}
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
}
// No more statuses!
l := len(statuses)
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break break
} }
// Rethread inside the transaction. // Update this batch
// inside a transaction.
var updatedRowsThisBatch int64 var updatedRowsThisBatch int64
for _, status := range statuses { if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
n, err := sr.rethreadStatus(ctx, tx, status) for _, status := range statuses {
if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err) n, err := sr.rethreadStatus(ctx, tx, status, true)
if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
}
updatedRowsThisBatch += n
updatedRowsTotal += n
} }
updatedRowsThisBatch += n return nil
updatedRowsTotal += n }); err != nil {
return err
} }
// Show speed for this batch. // Show speed for this batch.
timeTaken := time.Since(start).Milliseconds() timeTaken := time.Since(batchStart).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow) rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs rowsPerSecond := 1000 * rowsPerMs
@ -261,11 +245,16 @@ func init() {
log.Infof( log.Infof(
ctx, ctx,
"[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", "[~%.2f%% done; ~%.0f rows/s] migrating stragglers",
totalDone, rowsPerSecond, totalDone, rowsPerSecond,
) )
} }
// Try to merge everything we've done so far.
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
log.Info(ctx, "dropping temporary thread_id_new index") log.Info(ctx, "dropping temporary thread_id_new index")
if _, err := db.NewDropIndex(). if _, err := db.NewDropIndex().
Index("statuses_thread_id_new_idx"). Index("statuses_thread_id_new_idx").
@ -363,7 +352,7 @@ type statusRethreader struct {
// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. // in order to trigger a status rethreading operation for the given status, returning total number of rows changed.
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) { func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) {
// Zero slice and // Zero slice and
// map ptr values. // map ptr values.
@ -405,11 +394,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
} }
// Set up-to-date values on the status. // Set up-to-date values on the status.
if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil { if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil {
status.InReplyToID = inReplyToID.(string) status.InReplyToID = v.(string)
} }
if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil { if v, ok := upToDateValues["thread_id"]; ok && v != nil {
status.ThreadID = threadID.(string) status.ThreadID = v.(string)
} }
// status and thread ID cursor // status and thread ID cursor
@ -463,7 +452,10 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
// Check for the case where the entire // Check for the case where the entire
// batch of statuses is already correctly // batch of statuses is already correctly
// threaded. Then we have nothing to do! // threaded. Then we have nothing to do!
if sr.allThreaded && len(sr.threadIDs) == 1 { //
// Skip this check for straggler statuses
// that are part of broken threads.
if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 {
log.Debug(ctx, "skipping just rethreaded thread") log.Debug(ctx, "skipping just rethreaded thread")
return 0, nil return 0, nil
} }