Mirror of https://github.com/superseriousbusiness/gotosocial.git (synced 2025-11-18 13:27:28 -06:00)

Compare commits: 3 commits, e58f939278...c7b0150834
| Author | SHA1 | Date |
|---|---|---|
| | c7b0150834 | |
| | cde373143a | |
| | c99412af98 | |
2 changed files with 35 additions and 41 deletions
CONTRIBUTING.md

```diff
@@ -32,6 +32,7 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht
 - [CLI Tests](#cli-tests)
 - [Federation](#federation)
 - [Updating Swagger docs](#updating-swagger-docs)
+- [CI/CD configuration](#ci-cd-configuration)
 - [Other Useful Stuff](#other-useful-stuff)
 - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally)
```
internal/db/bundb/migrations/20250415111056_thread_all_statuses.go

```diff
@@ -24,6 +24,7 @@ import (
 	"reflect"
 	"slices"
 	"strings"
+	"time"
 
 	"code.superseriousbusiness.org/gotosocial/internal/db"
 	newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
```
```diff
@@ -61,21 +62,9 @@ func init() {
 			return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
 		}
 
-		// Create an index on thread_id_new so
-		// we can keep track of it as we update.
-		//
-		// We'll remove this at the end of the migration.
-		log.Info(ctx, "creating temporary thread_id_new index")
-		if _, err := db.NewCreateIndex().
-			Table("statuses").
-			Index("statuses_thread_id_new_idx").
-			Column("thread_id_new").
-			Exec(ctx); err != nil {
-			return gtserror.Newf("error creating temporary thread_id_new index: %w", err)
-		}
-
 		var sr statusRethreader
-		var updatedRows int64
+		var updatedRowsTotal int64
+		var maxID string
 		var statuses []*oldmodel.Status
 
 		// Get a total count of all statuses before migration.
```
```diff
@@ -84,41 +73,50 @@ func init() {
 			return gtserror.Newf("error getting status table count: %w", err)
 		}
 
-		log.Warnf(ctx, "migrating %d statuses, this may take a *long* time, and the first few queries will likely be slower than the rest", total)
+		// Start at largest
+		// possible ULID value.
+		maxID = id.Highest
+
+		log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total)
 		for {
+			start := time.Now()
+
 			// Reset slice.
 			clear(statuses)
 			statuses = statuses[:0]
 
-			// Select IDs of next batch, choosing
-			// only statuses we haven't migrated yet.
-			//
-			// Let the database give us these in whatever order
-			// it likes, as it's faster than doing an ORDER BY.
+			// Select IDs of next
+			// batch, paging down.
 			if err := db.NewSelect().
 				Model(&statuses).
 				Column("id").
-				Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
+				Where("? < ?", bun.Ident("id"), maxID).
+				OrderExpr("? DESC", bun.Ident("id")).
 				Limit(250).
 				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
 				return gtserror.Newf("error selecting unthreaded statuses: %w", err)
 			}
 
 			// No more statuses!
-			if len(statuses) == 0 {
+			l := len(statuses)
+			if l == 0 {
 				log.Info(ctx, "done migrating statuses!")
 				break
 			}
 
+			// Set next maxID value from statuses.
+			maxID = statuses[l-1].ID
+
 			// Rethread each selected status in a transaction.
+			var updatedRowsThisBatch int64
 			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
 				for _, status := range statuses {
 					n, err := sr.rethreadStatus(ctx, tx, status)
 					if err != nil {
 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
 					}
-					updatedRows += n
+					updatedRowsThisBatch += n
+					updatedRowsTotal += n
 				}
 
 				return nil
```
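This hunk swaps the old `thread_id_new`-based selection for keyset pagination: each iteration selects up to 250 status IDs strictly below the previous batch's smallest ID, ordered descending, and advances `maxID` from the last row, which is why the temporary index removed in the hunk above is no longer needed. Below is a minimal sketch of that paging pattern, assuming an uptrace/bun `*bun.DB` handle; the `Status` model, `pageStatuses` helper, and `startMaxID` bound are illustrative names, not part of the PR.

```go
// Package statuspager sketches keyset pagination over a statuses table,
// descending by ULID primary key. Not the actual migration code.
package statuspager

import (
	"context"
	"database/sql"
	"errors"

	"github.com/uptrace/bun"
)

// Status is a hypothetical model with a ULID primary key.
type Status struct {
	bun.BaseModel `bun:"table:statuses"`
	ID            string `bun:"id,pk"`
}

// pageStatuses walks the table in 250-row batches, calling process on each
// batch. startMaxID must sort above any real ID (the migration starts from
// an id.Highest-style constant).
func pageStatuses(ctx context.Context, db *bun.DB, startMaxID string, process func([]*Status) error) error {
	maxID := startMaxID

	for {
		var batch []*Status

		// Select the next page strictly below the
		// previous page's smallest ID, newest first.
		err := db.NewSelect().
			Model(&batch).
			Column("id").
			Where("? < ?", bun.Ident("id"), maxID).
			OrderExpr("? DESC", bun.Ident("id")).
			Limit(250).
			Scan(ctx)
		if err != nil && !errors.Is(err, sql.ErrNoRows) {
			return err
		}

		// No rows left to page through.
		if len(batch) == 0 {
			return nil
		}

		if err := process(batch); err != nil {
			return err
		}

		// Results are ordered DESC, so the last row in
		// the batch holds the smallest ID seen so far.
		maxID = batch[len(batch)-1].ID
	}
}
```

Paging on the primary key means each batch's `WHERE id < ?` can be served by the existing primary-key index, rather than an extra index on `thread_id_new`.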
```diff
@@ -126,22 +124,26 @@ func init() {
 				return err
 			}
 
-			// Show percent migrated.
+			// Show current speed + percent migrated.
 			//
-			// Will maybe end up wonky due to approximations
-			// and batching, so stop showing it after 99%.
-			percentDone := (float64(updatedRows) / float64(total)) * 100
-			if percentDone <= 99 {
+			// Percent may end up wonky due to approximations
+			// and batching, so show a generic message at 100%.
+			timeTaken := time.Since(start).Milliseconds()
+			msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
+			rowsPerMs := float64(1) / float64(msPerRow)
+			rowsPerSecond := 1000 * rowsPerMs
+			percentDone := (float64(updatedRowsTotal) / float64(total)) * 100
+			if percentDone <= 100 {
 				log.Infof(
 					ctx,
-					"[updated %d rows] migrated approx. %.2f%% of statuses",
-					updatedRows, percentDone,
+					"[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses",
+					updatedRowsTotal, rowsPerSecond, percentDone,
 				)
 			} else {
 				log.Infof(
 					ctx,
-					"[updated %d rows] almost done migrating... ",
-					updatedRows,
+					"[updated %d total rows, now @ ~%.0f rows/s] almost done... ",
+					updatedRowsTotal, rowsPerSecond,
 				)
 			}
 		}
```
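The new logging derives an approximate throughput from each batch's wall-clock time. A worked example with made-up numbers (none of these figures come from the PR), mirroring the rate math in the hunk above:

```go
package main

import "fmt"

func main() {
	// Illustrative numbers only.
	var (
		timeTakenMs          = 500.0    // wall-clock time for one batch
		updatedRowsThisBatch = 250.0    // rows updated in that batch
		updatedRowsTotal     = 100000.0 // running total across all batches
		total                = 400000.0 // statuses counted before migration
	)

	msPerRow := timeTakenMs / updatedRowsThisBatch // 500 / 250 = 2 ms per row
	rowsPerSecond := 1000 / msPerRow               // 1000 / 2 = 500 rows/s
	percentDone := updatedRowsTotal / total * 100  // 100000 / 400000 * 100 = 25%

	fmt.Printf("~%.0f rows/s, ~%.2f%% done\n", rowsPerSecond, percentDone)
}
```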
```diff
@@ -151,13 +153,6 @@ func init() {
 			return err
 		}
 
-		log.Info(ctx, "dropping temporary thread_id_new index")
-		if _, err := db.NewDropIndex().
-			Index("statuses_thread_id_new_idx").
-			Exec(ctx); err != nil {
-			return gtserror.Newf("error dropping temporary thread_id_new index: %w", err)
-		}
-
 		log.Info(ctx, "dropping old thread_to_statuses table")
 		if _, err := db.NewDropTable().
 			Table("thread_to_statuses").
```
```diff
@@ -304,8 +299,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
 		return 0, gtserror.Newf("error getting children: %w", err)
 	}
 
-	// Dedupe thread IDs.
-
 	// Check for newly picked-up threads
 	// to find stragglers for below. Else
 	// we've reached end of what we can do.
```