mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 11:42:24 -05:00
Compare commits
2 commits
2563568ccc
...
18a08d9165
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18a08d9165 | ||
|
|
bd1c43d55e |
2 changed files with 58 additions and 47 deletions
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||||
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
||||||
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
|
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
|
||||||
|
|
@ -46,11 +47,15 @@ func init() {
|
||||||
return gtserror.Newf("error getting bun column def: %w", err)
|
return gtserror.Newf("error getting bun column def: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update column def to use temporary
|
// Update column def to use '${name}_new'.
|
||||||
// '${name}_new' while we migrate.
|
|
||||||
newColDef = strings.Replace(newColDef,
|
newColDef = strings.Replace(newColDef,
|
||||||
"thread_id", "thread_id_new", 1)
|
"thread_id", "thread_id_new", 1)
|
||||||
|
|
||||||
|
var sr statusRethreader
|
||||||
|
var updatedTotal int64
|
||||||
|
var maxID string
|
||||||
|
var statuses []*oldmodel.Status
|
||||||
|
|
||||||
// Create thread_id_new already
|
// Create thread_id_new already
|
||||||
// so we can populate it as we go.
|
// so we can populate it as we go.
|
||||||
log.Info(ctx, "creating statuses column thread_id_new")
|
log.Info(ctx, "creating statuses column thread_id_new")
|
||||||
|
|
@ -61,28 +66,23 @@ func init() {
|
||||||
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to merge the wal so we're
|
||||||
|
// not working on the wal file.
|
||||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a total count of all
|
// Get a total count of all statuses before migration.
|
||||||
// statuses before migration.
|
total, err := db.NewSelect().Table("statuses").Count(ctx)
|
||||||
totalStatuses, err := db.
|
|
||||||
NewSelect().
|
|
||||||
Table("statuses").
|
|
||||||
Count(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return gtserror.Newf("error getting status table count: %w", err)
|
return gtserror.Newf("error getting status table count: %w", err)
|
||||||
}
|
}
|
||||||
log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses)
|
|
||||||
|
|
||||||
var sr statusRethreader
|
// Start at largest
|
||||||
var updatedRowsTotal int64
|
|
||||||
var statuses []*oldmodel.Status
|
|
||||||
|
|
||||||
// Page starting at largest
|
|
||||||
// possible ULID value.
|
// possible ULID value.
|
||||||
var maxID = id.Highest
|
maxID = id.Highest
|
||||||
|
|
||||||
|
log.Warnf(ctx, "rethreading %d statuses, this will take a *long* time", total)
|
||||||
|
|
||||||
// Open initial transaction.
|
// Open initial transaction.
|
||||||
tx, err := db.BeginTx(ctx, nil)
|
tx, err := db.BeginTx(ctx, nil)
|
||||||
|
|
@ -98,17 +98,19 @@ func init() {
|
||||||
|
|
||||||
batchStart := time.Now()
|
batchStart := time.Now()
|
||||||
|
|
||||||
// Select IDs of next
|
// Select top-level statuses.
|
||||||
// batch, paging down.
|
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
Model(&statuses).
|
Model(&statuses).
|
||||||
Column("id").
|
Column("id").
|
||||||
Where("? IS NULL", bun.Ident("in_reply_to_id")).
|
// We specifically use in_reply_to_account_id instead of in_reply_to_id as
|
||||||
|
// they should both be set / unset in unison, but we specifically have an
|
||||||
|
// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id.
|
||||||
|
Where("? IS NULL", bun.Ident("in_reply_to_account_id")).
|
||||||
Where("? < ?", bun.Ident("id"), maxID).
|
Where("? < ?", bun.Ident("id"), maxID).
|
||||||
OrderExpr("? DESC", bun.Ident("id")).
|
OrderExpr("? DESC", bun.Ident("id")).
|
||||||
Limit(500).
|
Limit(500).
|
||||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||||
return gtserror.Newf("error selecting statuses: %w", err)
|
return gtserror.Newf("error selecting top level statuses: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
l := len(statuses)
|
l := len(statuses)
|
||||||
|
|
@ -119,9 +121,9 @@ func init() {
|
||||||
// after leaving the loop.
|
// after leaving the loop.
|
||||||
break
|
break
|
||||||
|
|
||||||
} else if i%100 == 0 {
|
} else if i%200 == 0 {
|
||||||
// Begin a new transaction every
|
// Begin a new transaction every
|
||||||
// 100 batches (~50000 statuses),
|
// 200 batches (~100,000 statuses),
|
||||||
// to avoid massive commits.
|
// to avoid massive commits.
|
||||||
|
|
||||||
// Close existing transaction.
|
// Close existing transaction.
|
||||||
|
|
@ -142,30 +144,29 @@ func init() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set next maxID
|
// Set next maxID value from statuses.
|
||||||
// value from statuses.
|
maxID = statuses[len(statuses)-1].ID
|
||||||
maxID = statuses[l-1].ID
|
|
||||||
|
|
||||||
// Rethread using the
|
// Rethread using the
|
||||||
// open transaction.
|
// open transaction.
|
||||||
var updatedRowsThisBatch int64
|
var updatedInBatch int64
|
||||||
for _, status := range statuses {
|
for _, status := range statuses {
|
||||||
n, err := sr.rethreadStatus(ctx, tx, status, false)
|
n, err := sr.rethreadStatus(ctx, tx, status, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||||
}
|
}
|
||||||
updatedRowsThisBatch += n
|
updatedInBatch += n
|
||||||
updatedRowsTotal += n
|
updatedTotal += n
|
||||||
}
|
}
|
||||||
|
|
||||||
// Show speed for this batch.
|
// Show speed for this batch.
|
||||||
timeTaken := time.Since(batchStart).Milliseconds()
|
timeTaken := time.Since(batchStart).Milliseconds()
|
||||||
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
|
msPerRow := float64(timeTaken) / float64(updatedInBatch)
|
||||||
rowsPerMs := float64(1) / float64(msPerRow)
|
rowsPerMs := float64(1) / float64(msPerRow)
|
||||||
rowsPerSecond := 1000 * rowsPerMs
|
rowsPerSecond := 1000 * rowsPerMs
|
||||||
|
|
||||||
// Show percent migrated overall.
|
// Show percent migrated overall.
|
||||||
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
|
totalDone := (float64(updatedTotal) / float64(total)) * 100
|
||||||
|
|
||||||
log.Infof(
|
log.Infof(
|
||||||
ctx,
|
ctx,
|
||||||
|
|
@ -199,35 +200,38 @@ func init() {
|
||||||
|
|
||||||
batchStart := time.Now()
|
batchStart := time.Now()
|
||||||
|
|
||||||
// Get stragglers for which
|
// Select straggler statuses.
|
||||||
// we haven't set thread ID yet.
|
|
||||||
if err := db.NewSelect().
|
if err := db.NewSelect().
|
||||||
Model(&statuses).
|
Model(&statuses).
|
||||||
Column("id").
|
Column("id").
|
||||||
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||||
|
|
||||||
|
// We select in smaller batches for this part
|
||||||
|
// of the migration as there is a chance that
|
||||||
|
// we may be fetching statuses that might be
|
||||||
|
// part of the same thread, i.e. one call to
|
||||||
|
// rethreadStatus() may effect other statuses
|
||||||
|
// later in the slice.
|
||||||
Limit(250).
|
Limit(250).
|
||||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||||
return gtserror.Newf("error selecting straggler: %w", err)
|
return gtserror.Newf("error selecting straggler statuses: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reached end of block.
|
||||||
if len(statuses) == 0 {
|
if len(statuses) == 0 {
|
||||||
// No more
|
|
||||||
// statuses!
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update this batch
|
// Rethread each selected batch of straggler statuses in a transaction.
|
||||||
// inside a transaction.
|
var updatedInBatch int64
|
||||||
var updatedRowsThisBatch int64
|
|
||||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||||
for _, status := range statuses {
|
for _, status := range statuses {
|
||||||
|
|
||||||
n, err := sr.rethreadStatus(ctx, tx, status, true)
|
n, err := sr.rethreadStatus(ctx, tx, status, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||||
}
|
}
|
||||||
updatedRowsThisBatch += n
|
updatedInBatch += n
|
||||||
updatedRowsTotal += n
|
updatedTotal += n
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
|
@ -236,12 +240,12 @@ func init() {
|
||||||
|
|
||||||
// Show speed for this batch.
|
// Show speed for this batch.
|
||||||
timeTaken := time.Since(batchStart).Milliseconds()
|
timeTaken := time.Since(batchStart).Milliseconds()
|
||||||
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
|
msPerRow := float64(timeTaken) / float64(updatedInBatch)
|
||||||
rowsPerMs := float64(1) / float64(msPerRow)
|
rowsPerMs := float64(1) / float64(msPerRow)
|
||||||
rowsPerSecond := 1000 * rowsPerMs
|
rowsPerSecond := 1000 * rowsPerMs
|
||||||
|
|
||||||
// Show percent migrated overall.
|
// Show percent migrated overall.
|
||||||
totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100
|
totalDone := (float64(updatedTotal) / float64(total)) * 100
|
||||||
|
|
||||||
log.Infof(
|
log.Infof(
|
||||||
ctx,
|
ctx,
|
||||||
|
|
@ -250,7 +254,7 @@ func init() {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to merge everything we've done so far.
|
// Attempt to merge any sqlite write-ahead-log.
|
||||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -374,6 +378,13 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
||||||
|
|
||||||
// Ensure the passed status
|
// Ensure the passed status
|
||||||
// has up-to-date information.
|
// has up-to-date information.
|
||||||
|
// This may have changed from
|
||||||
|
// the initial batch selection
|
||||||
|
// to the rethreadStatus() call.
|
||||||
|
//
|
||||||
|
// Note: Use a map for this so we
|
||||||
|
// can also select thread_id_new,
|
||||||
|
// which is not part of *oldmodel.Status.
|
||||||
upToDateValues := make(map[string]any, 3)
|
upToDateValues := make(map[string]any, 3)
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")).
|
TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")).
|
||||||
|
|
@ -608,7 +619,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
|
||||||
Model(&parent).
|
Model(&parent).
|
||||||
Column("id", "in_reply_to_id", "thread_id").
|
Column("id", "in_reply_to_id", "thread_id").
|
||||||
Where("? = ?", bun.Ident("id"), id).
|
Where("? = ?", bun.Ident("id"), id).
|
||||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -647,7 +658,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int)
|
||||||
Model(&sr.statuses).
|
Model(&sr.statuses).
|
||||||
Column("id", "thread_id").
|
Column("id", "thread_id").
|
||||||
Where("? = ?", bun.Ident("in_reply_to_id"), id).
|
Where("? = ?", bun.Ident("in_reply_to_id"), id).
|
||||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -686,7 +697,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
|
||||||
bun.Ident("id"),
|
bun.Ident("id"),
|
||||||
bun.In(sr.statusIDs),
|
bun.In(sr.statusIDs),
|
||||||
).
|
).
|
||||||
Scan(ctx); err != nil && err != sql.ErrNoRows {
|
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -269,7 +269,7 @@ ol {
|
||||||
blockquote {
|
blockquote {
|
||||||
padding: 0.5rem;
|
padding: 0.5rem;
|
||||||
border-left: 0.2rem solid $border-accent;
|
border-left: 0.2rem solid $border-accent;
|
||||||
margin: 0;
|
margin-inline: 0;
|
||||||
font-style: normal;
|
font-style: normal;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue