mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 08:32:24 -05:00
i'm adjusting the PR, pray i don't adjust it further
This commit is contained in:
parent
4fd0bdcf2f
commit
1725769733
1 changed file with 150 additions and 50 deletions
|
|
@@ -26,7 +26,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
||||
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
|
||||
|
|
@@ -62,44 +61,80 @@ func init() {
|
|||
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
||||
}
|
||||
|
||||
var sr statusRethreader
|
||||
var updatedRowsTotal int64
|
||||
var maxID string
|
||||
var statuses []*oldmodel.Status
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get a total count of all statuses before migration.
|
||||
total, err := db.NewSelect().Table("statuses").Count(ctx)
|
||||
// Get a total count of all
|
||||
// statuses before migration.
|
||||
totalStatuses, err := db.
|
||||
NewSelect().
|
||||
Table("statuses").
|
||||
Count(ctx)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error getting status table count: %w", err)
|
||||
}
|
||||
log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses)
|
||||
|
||||
// Start at largest
|
||||
var sr statusRethreader
|
||||
var updatedRowsTotal int64
|
||||
var statuses []*oldmodel.Status
|
||||
|
||||
// Page starting at largest
|
||||
// possible ULID value.
|
||||
maxID = id.Highest
|
||||
var maxID = id.Highest
|
||||
|
||||
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
|
||||
for {
|
||||
start := time.Now()
|
||||
// Open initial transaction.
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 1; ; i++ {
|
||||
|
||||
// Reset slice.
|
||||
clear(statuses)
|
||||
statuses = statuses[:0]
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Select IDs of next
|
||||
// batch, paging down.
|
||||
if err := db.NewSelect().
|
||||
if err := tx.NewSelect().
|
||||
Model(&statuses).
|
||||
Column("id").
|
||||
Where("? IS NULL", bun.Ident("in_reply_to_id")).
|
||||
Where("? < ?", bun.Ident("id"), maxID).
|
||||
OrderExpr("? DESC", bun.Ident("id")).
|
||||
Limit(200).
|
||||
Limit(500).
|
||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
|
||||
return gtserror.Newf("error selecting top-level statuses: %w", err)
|
||||
}
|
||||
|
||||
// Every 50 loops, flush wal and begin new
|
||||
// transaction, to avoid silly wal sizes.
|
||||
if i%50 == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err = db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// No more statuses!
|
||||
l := len(statuses)
|
||||
if l == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info(ctx, "done migrating statuses!")
|
||||
break
|
||||
}
|
||||
|
|
@@ -107,9 +142,8 @@ func init() {
|
|||
// Set next maxID value from statuses.
|
||||
maxID = statuses[l-1].ID
|
||||
|
||||
// Rethread each selected status in a transaction.
|
||||
// Rethread inside the transaction.
|
||||
var updatedRowsThisBatch int64
|
||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||
for _, status := range statuses {
|
||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
||||
if err != nil {
|
||||
|
|
@@ -119,40 +153,106 @@ func init() {
|
|||
updatedRowsTotal += n
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Show current speed + percent migrated.
|
||||
//
|
||||
// Percent may end up wonky due to approximations
|
||||
// and batching, so show a generic message at 100%.
|
||||
// Show speed for this batch.
|
||||
timeTaken := time.Since(start).Milliseconds()
|
||||
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
|
||||
rowsPerMs := float64(1) / float64(msPerRow)
|
||||
rowsPerSecond := 1000 * rowsPerMs
|
||||
percentDone := (float64(updatedRowsTotal) / float64(total)) * 100
|
||||
if percentDone <= 100 {
|
||||
|
||||
// Show percent migrated overall.
|
||||
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
|
||||
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses",
|
||||
rowsPerSecond, updatedRowsTotal, percentDone,
|
||||
)
|
||||
} else {
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[~%.0f rows/s; updated %d total rows] almost done... ",
|
||||
rowsPerSecond, updatedRowsTotal,
|
||||
"[~%.2f%% done; ~%.0f rows/s] paging top-level statuses",
|
||||
totalDone, rowsPerSecond,
|
||||
)
|
||||
}
|
||||
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open a new transaction lads.
|
||||
tx, err = db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 1; ; i++ {
|
||||
|
||||
// Reset slice.
|
||||
clear(statuses)
|
||||
statuses = statuses[:0]
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Select IDs of stragglers for
|
||||
// which we haven't set thread_id yet.
|
||||
if err := tx.NewSelect().
|
||||
Model(&statuses).
|
||||
Column("id").
|
||||
Where("? IS NULL", bun.Ident("thread_id")).
|
||||
Limit(250).
|
||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
|
||||
}
|
||||
|
||||
// Every 50 loops, flush wal and begin new
|
||||
// transaction, to avoid silly wal sizes.
|
||||
if i%50 == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err = db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
// No more statuses!
|
||||
l := len(statuses)
|
||||
if l == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info(ctx, "done migrating statuses!")
|
||||
break
|
||||
}
|
||||
|
||||
// Rethread inside the transaction.
|
||||
var updatedRowsThisBatch int64
|
||||
for _, status := range statuses {
|
||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||
}
|
||||
updatedRowsThisBatch += n
|
||||
updatedRowsTotal += n
|
||||
}
|
||||
|
||||
// Show speed for this batch.
|
||||
timeTaken := time.Since(start).Milliseconds()
|
||||
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
|
||||
rowsPerMs := float64(1) / float64(msPerRow)
|
||||
rowsPerSecond := 1000 * rowsPerMs
|
||||
|
||||
// Show percent migrated overall.
|
||||
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
|
||||
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers",
|
||||
totalDone, rowsPerSecond,
|
||||
)
|
||||
}
|
||||
|
||||
log.Info(ctx, "dropping old thread_to_statuses table")
|
||||
if _, err := db.NewDropTable().
|
||||
Table("thread_to_statuses").
|
||||
|
|
@@ -496,7 +596,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
|
|||
Model(&parent).
|
||||
Column("id", "in_reply_to_id", "thread_id").
|
||||
Where("? = ?", bun.Ident("id"), id).
|
||||
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@@ -535,7 +635,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int)
|
|||
Model(&sr.statuses).
|
||||
Column("id", "thread_id").
|
||||
Where("? = ?", bun.Ident("in_reply_to_id"), id).
|
||||
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@@ -574,7 +674,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
|
|||
bun.Ident("id"),
|
||||
bun.In(sr.statusIDs),
|
||||
).
|
||||
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue