mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 17:22:24 -05:00
whew
This commit is contained in:
parent
cde373143a
commit
c7b0150834
1 changed files with 20 additions and 59 deletions
|
|
@ -24,6 +24,7 @@ import (
|
|||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
||||
|
|
@ -34,7 +35,6 @@ import (
|
|||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/dialect"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -62,45 +62,8 @@ func init() {
|
|||
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
||||
}
|
||||
|
||||
// Create an index on thread_id_new so we can keep
|
||||
// track of it as we update, and use it to avoid
|
||||
// selecting statuses we've already updated.
|
||||
//
|
||||
// We'll remove this at the end of the migration.
|
||||
log.Info(ctx, "creating temporary thread_id_new index")
|
||||
if db.Dialect().Name() == dialect.PG {
|
||||
// On Postgres we can use a partial index
|
||||
// to indicate we only care about default
|
||||
// value thread IDs (ie., not set yet).
|
||||
if _, err := db.NewCreateIndex().
|
||||
Table("statuses").
|
||||
Index("statuses_thread_id_new_idx").
|
||||
Column("thread_id_new").
|
||||
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
|
||||
}
|
||||
} else {
|
||||
// On SQLite we can use an index expression
|
||||
// to do the same thing. While we *can* use
|
||||
// a partial index for this, an index expression
|
||||
// is magnitudes faster. Databases!
|
||||
if _, err := db.NewCreateIndex().
|
||||
Table("statuses").
|
||||
Index("statuses_thread_id_new_idx").
|
||||
ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var sr statusRethreader
|
||||
var updatedRows int64
|
||||
var updatedRowsTotal int64
|
||||
var maxID string
|
||||
var statuses []*oldmodel.Status
|
||||
|
||||
|
|
@ -116,21 +79,20 @@ func init() {
|
|||
|
||||
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
|
||||
for {
|
||||
start := time.Now()
|
||||
|
||||
// Reset slice.
|
||||
clear(statuses)
|
||||
statuses = statuses[:0]
|
||||
|
||||
// Select IDs of next batch, paging down
|
||||
// and skipping statuses to which we've
|
||||
// already allocated a (new) thread ID.
|
||||
// Select IDs of next
|
||||
// batch, paging down.
|
||||
if err := db.NewSelect().
|
||||
Model(&statuses).
|
||||
Column("id").
|
||||
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Where("? < ?", bun.Ident("id"), maxID).
|
||||
OrderExpr("? DESC", bun.Ident("id")).
|
||||
Limit(200).
|
||||
Limit(250).
|
||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
|
||||
}
|
||||
|
|
@ -146,13 +108,15 @@ func init() {
|
|||
maxID = statuses[l-1].ID
|
||||
|
||||
// Rethread each selected status in a transaction.
|
||||
var updatedRowsThisBatch int64
|
||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||
for _, status := range statuses {
|
||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||
}
|
||||
updatedRows += n
|
||||
updatedRowsThisBatch += n
|
||||
updatedRowsTotal += n
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -160,22 +124,26 @@ func init() {
|
|||
return err
|
||||
}
|
||||
|
||||
// Show percent migrated.
|
||||
// Show current speed + percent migrated.
|
||||
//
|
||||
// Will maybe end up wonky due to approximations
|
||||
// Percent may end up wonky due to approximations
|
||||
// and batching, so show a generic message at 100%.
|
||||
percentDone := (float64(updatedRows) / float64(total)) * 100
|
||||
timeTaken := time.Since(start).Milliseconds()
|
||||
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
|
||||
rowsPerMs := float64(1) / float64(msPerRow)
|
||||
rowsPerSecond := 1000 * rowsPerMs
|
||||
percentDone := (float64(updatedRowsTotal) / float64(total)) * 100
|
||||
if percentDone <= 100 {
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[updated %d rows] migrated approx. %.2f%% of statuses",
|
||||
updatedRows, percentDone,
|
||||
"[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses",
|
||||
updatedRowsTotal, rowsPerSecond, percentDone,
|
||||
)
|
||||
} else {
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[updated %d rows] almost done migrating... ",
|
||||
updatedRows,
|
||||
"[updated %d total rows, now @ ~%.0f rows/s] almost done... ",
|
||||
updatedRowsTotal, rowsPerSecond,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
@ -185,13 +153,6 @@ func init() {
|
|||
return err
|
||||
}
|
||||
|
||||
log.Info(ctx, "dropping temporary thread_id_new index")
|
||||
if _, err := db.NewDropIndex().
|
||||
Index("statuses_thread_id_new_idx").
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error dropping temporary thread_id_new index: %w", err)
|
||||
}
|
||||
|
||||
log.Info(ctx, "dropping old thread_to_statuses table")
|
||||
if _, err := db.NewDropTable().
|
||||
Table("thread_to_statuses").
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue