i'm adjusting the PR, pray i don't adjust it further

This commit is contained in:
tobi 2025-09-29 16:48:46 +02:00
commit 6c04ae231c

View file

@ -26,7 +26,6 @@ import (
"strings"
"time"
"code.superseriousbusiness.org/gotosocial/internal/db"
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
@ -62,44 +61,80 @@ func init() {
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
}
var sr statusRethreader
var updatedRowsTotal int64
var maxID string
var statuses []*oldmodel.Status
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
// Get a total count of all statuses before migration.
total, err := db.NewSelect().Table("statuses").Count(ctx)
// Get a total count of all
// statuses before migration.
totalStatuses, err := db.
NewSelect().
Table("statuses").
Count(ctx)
if err != nil {
return gtserror.Newf("error getting status table count: %w", err)
}
log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses)
// Start at largest
var sr statusRethreader
var updatedRowsTotal int64
var statuses []*oldmodel.Status
// Page starting at largest
// possible ULID value.
maxID = id.Highest
var maxID = id.Highest
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
for {
start := time.Now()
// Open initial transaction.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
for i := 1; ; i++ {
// Reset slice.
clear(statuses)
statuses = statuses[:0]
start := time.Now()
// Select IDs of next
// batch, paging down.
if err := db.NewSelect().
if err := tx.NewSelect().
Model(&statuses).
Column("id").
Where("? IS NULL", bun.Ident("in_reply_to_id")).
Where("? < ?", bun.Ident("id"), maxID).
OrderExpr("? DESC", bun.Ident("id")).
Limit(200).
Limit(500).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
return gtserror.Newf("error selecting top-level statuses: %w", err)
}
// Every 50 loops, flush wal and begin new
// transaction, to avoid silly wal sizes.
if i%50 == 0 {
if err := tx.Commit(); err != nil {
return err
}
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
}
// No more statuses!
l := len(statuses)
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break
}
@ -107,52 +142,117 @@ func init() {
// Set next maxID value from statuses.
maxID = statuses[l-1].ID
// Rethread each selected status in a transaction.
// Rethread inside the transaction.
var updatedRowsThisBatch int64
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status)
if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
}
updatedRowsThisBatch += n
updatedRowsTotal += n
for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status)
if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
}
return nil
}); err != nil {
return err
updatedRowsThisBatch += n
updatedRowsTotal += n
}
// Show current speed + percent migrated.
//
// Percent may end up wonky due to approximations
// and batching, so show a generic message at 100%.
// Show speed for this batch.
timeTaken := time.Since(start).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs
percentDone := (float64(updatedRowsTotal) / float64(total)) * 100
if percentDone <= 100 {
log.Infof(
ctx,
"[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses",
rowsPerSecond, updatedRowsTotal, percentDone,
)
} else {
log.Infof(
ctx,
"[~%.0f rows/s; updated %d total rows] almost done... ",
rowsPerSecond, updatedRowsTotal,
)
}
// Show percent migrated overall.
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
log.Infof(
ctx,
"[~%.2f%% done; ~%.0f rows/s] paging top-level statuses",
totalDone, rowsPerSecond,
)
}
// Attempt to merge any sqlite write-ahead-log.
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
// Open a new transaction lads.
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
for i := 1; ; i++ {
// Reset slice.
clear(statuses)
statuses = statuses[:0]
start := time.Now()
// Select IDs of stragglers for
// which we haven't set thread_id yet.
if err := tx.NewSelect().
Model(&statuses).
Column("id").
Where("? IS NULL", bun.Ident("thread_id")).
Limit(250).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
}
// Every 50 loops, flush wal and begin new
// transaction, to avoid silly wal sizes.
if i%50 == 0 {
if err := tx.Commit(); err != nil {
return err
}
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
}
// No more statuses!
l := len(statuses)
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break
}
// Rethread inside the transaction.
var updatedRowsThisBatch int64
for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status)
if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
}
updatedRowsThisBatch += n
updatedRowsTotal += n
}
// Show speed for this batch.
timeTaken := time.Since(start).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs
// Show percent migrated overall.
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
log.Infof(
ctx,
"[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers",
totalDone, rowsPerSecond,
)
}
log.Info(ctx, "dropping old thread_to_statuses table")
if _, err := db.NewDropTable().
Table("thread_to_statuses").
@ -496,7 +596,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
Model(&parent).
Column("id", "in_reply_to_id", "thread_id").
Where("? = ?", bun.Ident("id"), id).
Scan(ctx); err != nil && err != db.ErrNoEntries {
Scan(ctx); err != nil && err != sql.ErrNoRows {
return err
}
@ -535,7 +635,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int)
Model(&sr.statuses).
Column("id", "thread_id").
Where("? = ?", bun.Ident("in_reply_to_id"), id).
Scan(ctx); err != nil && err != db.ErrNoEntries {
Scan(ctx); err != nil && err != sql.ErrNoRows {
return err
}
@ -574,7 +674,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
bun.Ident("id"),
bun.In(sr.statusIDs),
).
Scan(ctx); err != nil && err != db.ErrNoEntries {
Scan(ctx); err != nil && err != sql.ErrNoRows {
return err
}