mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 15:02:25 -05:00
finalize indexes etc
This commit is contained in:
parent
121677754c
commit
c99412af98
6 changed files with 296 additions and 297 deletions
|
|
@ -28,10 +28,13 @@ import (
|
|||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
||||
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/id"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/dialect"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -44,12 +47,60 @@ func init() {
|
|||
return gtserror.Newf("error getting bun column def: %w", err)
|
||||
}
|
||||
|
||||
// Update column def to use '${name}_new'.
|
||||
// Update column def to use temporary
|
||||
// '${name}_new' while we migrate.
|
||||
newColDef = strings.Replace(newColDef,
|
||||
"thread_id", "thread_id_new", 1)
|
||||
|
||||
// Create thread_id_new already
|
||||
// so we can populate it as we go.
|
||||
log.Info(ctx, "creating statuses column thread_id_new")
|
||||
if _, err := db.NewAddColumn().
|
||||
Table("statuses").
|
||||
ColumnExpr(newColDef).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
||||
}
|
||||
|
||||
// Create an index on thread_id_new so we can keep
|
||||
// track of it as we update, and use it to avoid
|
||||
// selecting statuses we've already updated.
|
||||
//
|
||||
// We'll remove this at the end of the migration.
|
||||
log.Info(ctx, "creating temporary thread_id_new index")
|
||||
if db.Dialect().Name() == dialect.PG {
|
||||
// On Postgres we can use a partial index
|
||||
// to indicate we only care about default
|
||||
// value thread IDs (ie., not set yet).
|
||||
if _, err := db.NewCreateIndex().
|
||||
Table("statuses").
|
||||
Index("statuses_thread_id_new_idx").
|
||||
Column("thread_id_new").
|
||||
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
|
||||
}
|
||||
} else {
|
||||
// On SQLite we can use an index expression
|
||||
// to do the same thing. While we *can* use
|
||||
// a partial index for this, an index expression
|
||||
// is magnitudes faster. Databases!
|
||||
if _, err := db.NewCreateIndex().
|
||||
Table("statuses").
|
||||
Index("statuses_thread_id_new_idx").
|
||||
ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var sr statusRethreader
|
||||
var count int
|
||||
var updatedRows int64
|
||||
var maxID string
|
||||
var statuses []*oldmodel.Status
|
||||
|
||||
|
|
@ -63,47 +114,45 @@ func init() {
|
|||
// possible ULID value.
|
||||
maxID = id.Highest
|
||||
|
||||
log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time")
|
||||
for /* TOP LEVEL STATUS LOOP */ {
|
||||
log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
|
||||
for {
|
||||
|
||||
// Reset slice.
|
||||
clear(statuses)
|
||||
statuses = statuses[:0]
|
||||
|
||||
// Select top-level statuses.
|
||||
// Select IDs of next batch, paging down
|
||||
// and skipping statuses to which we've
|
||||
// already allocated a (new) thread ID.
|
||||
if err := db.NewSelect().
|
||||
Model(&statuses).
|
||||
Column("id", "thread_id").
|
||||
|
||||
// We specifically use in_reply_to_account_id instead of in_reply_to_id as
|
||||
// they should both be set / unset in unison, but we specifically have an
|
||||
// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id.
|
||||
Where("? IS NULL", bun.Ident("in_reply_to_account_id")).
|
||||
Column("id").
|
||||
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||
Where("? < ?", bun.Ident("id"), maxID).
|
||||
OrderExpr("? DESC", bun.Ident("id")).
|
||||
Limit(5000).
|
||||
Limit(200).
|
||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return gtserror.Newf("error selecting top level statuses: %w", err)
|
||||
return gtserror.Newf("error selecting unthreaded statuses: %w", err)
|
||||
}
|
||||
|
||||
// Reached end of block.
|
||||
if len(statuses) == 0 {
|
||||
// No more statuses!
|
||||
l := len(statuses)
|
||||
if l == 0 {
|
||||
log.Info(ctx, "done migrating statuses!")
|
||||
break
|
||||
}
|
||||
|
||||
// Set next maxID value from statuses.
|
||||
maxID = statuses[len(statuses)-1].ID
|
||||
maxID = statuses[l-1].ID
|
||||
|
||||
// Rethread each selected batch of top-level statuses in a transaction.
|
||||
// Rethread each selected status in a transaction.
|
||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||
|
||||
// Rethread each top-level status.
|
||||
for _, status := range statuses {
|
||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||
}
|
||||
count += n
|
||||
updatedRows += n
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -111,7 +160,24 @@ func init() {
|
|||
return err
|
||||
}
|
||||
|
||||
log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total)
|
||||
// Show percent migrated.
|
||||
//
|
||||
// Will maybe end up wonky due to approximations
|
||||
// and batching, so show a generic message at 100%.
|
||||
percentDone := (float64(updatedRows) / float64(total)) * 100
|
||||
if percentDone <= 100 {
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[updated %d rows] migrated approx. %.2f%% of statuses",
|
||||
updatedRows, percentDone,
|
||||
)
|
||||
} else {
|
||||
log.Infof(
|
||||
ctx,
|
||||
"[updated %d rows] almost done migrating... ",
|
||||
updatedRows,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
|
|
@ -119,58 +185,11 @@ func init() {
|
|||
return err
|
||||
}
|
||||
|
||||
log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time")
|
||||
for /* STRAGGLER STATUS LOOP */ {
|
||||
|
||||
// Reset slice.
|
||||
clear(statuses)
|
||||
statuses = statuses[:0]
|
||||
|
||||
// Select straggler statuses.
|
||||
if err := db.NewSelect().
|
||||
Model(&statuses).
|
||||
Column("id", "in_reply_to_id", "thread_id").
|
||||
Where("? IS NULL", bun.Ident("thread_id")).
|
||||
|
||||
// We select in smaller batches for this part
|
||||
// of the migration as there is a chance that
|
||||
// we may be fetching statuses that might be
|
||||
// part of the same thread, i.e. one call to
|
||||
// rethreadStatus() may effect other statuses
|
||||
// later in the slice.
|
||||
Limit(1000).
|
||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return gtserror.Newf("error selecting straggler statuses: %w", err)
|
||||
}
|
||||
|
||||
// Reached end of block.
|
||||
if len(statuses) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// Rethread each selected batch of straggler statuses in a transaction.
|
||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||
|
||||
// Rethread each top-level status.
|
||||
for _, status := range statuses {
|
||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||
}
|
||||
count += n
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total)
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
log.Info(ctx, "dropping temporary thread_id_new index")
|
||||
if _, err := db.NewDropIndex().
|
||||
Index("statuses_thread_id_new_idx").
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error dropping temporary thread_id_new index: %w", err)
|
||||
}
|
||||
|
||||
log.Info(ctx, "dropping old thread_to_statuses table")
|
||||
|
|
@ -180,33 +199,6 @@ func init() {
|
|||
return gtserror.Newf("error dropping old thread_to_statuses table: %w", err)
|
||||
}
|
||||
|
||||
log.Info(ctx, "creating new statuses thread_id column")
|
||||
if _, err := db.NewAddColumn().
|
||||
Table("statuses").
|
||||
ColumnExpr(newColDef).
|
||||
Exec(ctx); err != nil {
|
||||
return gtserror.Newf("error adding new thread_id column: %w", err)
|
||||
}
|
||||
|
||||
log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)")
|
||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||
return batchUpdateByID(ctx, tx,
|
||||
"statuses", // table
|
||||
"id", // batchByCol
|
||||
"UPDATE ? SET ? = ?", // updateQuery
|
||||
[]any{bun.Ident("statuses"),
|
||||
bun.Ident("thread_id_new"),
|
||||
bun.Ident("thread_id")},
|
||||
)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Attempt to merge any sqlite write-ahead-log.
|
||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info(ctx, "dropping old statuses thread_id index")
|
||||
if _, err := db.NewDropIndex().
|
||||
Index("statuses_thread_id_idx").
|
||||
|
|
@ -289,8 +281,8 @@ type statusRethreader struct {
|
|||
}
|
||||
|
||||
// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
|
||||
// in order to trigger a status rethreading operation for the given status, returning total number rethreaded.
|
||||
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) {
|
||||
// in order to trigger a status rethreading operation for the given status, returning total number of rows changed.
|
||||
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) {
|
||||
|
||||
// Zero slice and
|
||||
// map ptr values.
|
||||
|
|
@ -346,6 +338,8 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
|||
return 0, gtserror.Newf("error getting children: %w", err)
|
||||
}
|
||||
|
||||
// Dedupe thread IDs.
|
||||
|
||||
// Check for newly picked-up threads
|
||||
// to find stragglers for below. Else
|
||||
// we've reached end of what we can do.
|
||||
|
|
@ -371,10 +365,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
|||
threadIdx = len(sr.threadIDs)
|
||||
}
|
||||
|
||||
// Total number of
|
||||
// statuses threaded.
|
||||
total := len(sr.statusIDs)
|
||||
|
||||
// Check for the case where the entire
|
||||
// batch of statuses is already correctly
|
||||
// threaded. Then we have nothing to do!
|
||||
|
|
@ -417,29 +407,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
|||
}
|
||||
}
|
||||
|
||||
// Update all the statuses to
|
||||
// use determined thread_id.
|
||||
if _, err := tx.NewUpdate().
|
||||
Table("statuses").
|
||||
Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)).
|
||||
Set("? = ?", bun.Ident("thread_id"), threadID).
|
||||
Exec(ctx); err != nil {
|
||||
// Use a bulk update to update all the
|
||||
// statuses to use determined thread_id.
|
||||
//
|
||||
// https://bun.uptrace.dev/guide/query-update.html#bulk-update
|
||||
values := make([]*util.Status, 0, len(sr.statusIDs))
|
||||
for _, statusID := range sr.statusIDs {
|
||||
values = append(values, &util.Status{
|
||||
ID: statusID,
|
||||
ThreadIDNew: threadID,
|
||||
})
|
||||
}
|
||||
|
||||
res, err := tx.NewUpdate().
|
||||
With("_data", tx.NewValues(&values)).
|
||||
Model((*util.Status)(nil)).
|
||||
TableExpr("_data").
|
||||
// Set the new thread ID, which we can use as
|
||||
// an indication that we've migrated this batch.
|
||||
Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")).
|
||||
// While we're here, also set old thread_id, as
|
||||
// we'll use it for further rethreading purposes.
|
||||
Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")).
|
||||
// "Join" on status ID.
|
||||
Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")).
|
||||
// To avoid spurious writes,
|
||||
// only update unmigrated statuses.
|
||||
Where("? = ?", bun.Ident("status.thread_id_new"), id.Lowest).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return 0, gtserror.Newf("error updating status thread ids: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, gtserror.Newf("error counting rows affected: %w", err)
|
||||
}
|
||||
|
||||
if len(sr.threadIDs) > 0 {
|
||||
// Update any existing thread
|
||||
// mutes to use latest thread_id.
|
||||
|
||||
// Dedupe thread IDs before query
|
||||
// to avoid ludicrous "IN" clause.
|
||||
threadIDs := sr.threadIDs
|
||||
threadIDs = xslices.Deduplicate(threadIDs)
|
||||
if _, err := tx.NewUpdate().
|
||||
Table("thread_mutes").
|
||||
Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)).
|
||||
Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
|
||||
Set("? = ?", bun.Ident("thread_id"), threadID).
|
||||
Exec(ctx); err != nil {
|
||||
return 0, gtserror.Newf("error updating mute thread ids: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return total, nil
|
||||
return rowsAffected, nil
|
||||
}
|
||||
|
||||
// append will append the given status to the internal tracking of statusRethreader{} for
|
||||
|
|
@ -560,6 +582,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
|
|||
clear(sr.statuses)
|
||||
sr.statuses = sr.statuses[:0]
|
||||
|
||||
// Dedupe thread IDs before query
|
||||
// to avoid ludicrous "IN" clause.
|
||||
threadIDs := sr.threadIDs[idx:]
|
||||
threadIDs = xslices.Deduplicate(threadIDs)
|
||||
|
||||
// Select stragglers that
|
||||
// also have thread IDs.
|
||||
if err := tx.NewSelect().
|
||||
|
|
@ -567,7 +594,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
|
|||
Column("id", "thread_id", "in_reply_to_id").
|
||||
Where("? IN (?) AND ? NOT IN (?)",
|
||||
bun.Ident("thread_id"),
|
||||
bun.In(sr.threadIDs[idx:]),
|
||||
bun.In(threadIDs),
|
||||
bun.Ident("id"),
|
||||
bun.In(sr.statusIDs),
|
||||
).
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue