mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-28 18:42:25 -05:00
[chore] Use bulk updates + fewer loops in status rethreading migration (#4459)
This pull request tries to optimize our status rethreading migration by using bulk updates + avoiding unnecessary writes, and doing the migration in one top-level loop and one stragglers loop, without the extra loop to copy thread_id over. On my machine it runs at about 2400 rows per second on Postgres, now, and about 9000 rows per second on SQLite. Tried *many* different ways of doing this, with and without temporary indexes, with different batch and transaction sizes, etc., and this seems to be just about the most performant way of getting stuff done. With the changes, a few minutes have been shaved off migration time testing on my development machine. *Hopefully* this will translate to more time shaved off when running on a vps with slower read/write speed and less processor power. SQLite before: ``` real 20m58,446s user 16m26,635s sys 5m53,648s ``` SQLite after: ``` real 14m25,435s user 12m47,449s sys 2m27,898s ``` Postgres before: ``` real 28m25,307s user 3m40,005s sys 4m45,018s ``` Postgres after: ``` real 22m31,999s user 3m46,674s sys 4m39,592s ``` Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4459 Co-authored-by: tobi <tobi.smethurst@protonmail.com> Co-committed-by: tobi <tobi.smethurst@protonmail.com>
This commit is contained in:
parent
bd1c43d55e
commit
e7cd8bb43e
7 changed files with 429 additions and 271 deletions
|
|
@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht
|
||||||
- [Federation](#federation)
|
- [Federation](#federation)
|
||||||
- [Updating Swagger docs](#updating-swagger-docs)
|
- [Updating Swagger docs](#updating-swagger-docs)
|
||||||
- [CI/CD configuration](#ci-cd-configuration)
|
- [CI/CD configuration](#ci-cd-configuration)
|
||||||
|
- [Other Useful Stuff](#other-useful-stuff)
|
||||||
|
- [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally)
|
||||||
|
|
||||||
## Introduction
|
## Introduction
|
||||||
|
|
||||||
|
|
@ -525,3 +527,38 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos
|
||||||
The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2).
|
The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2).
|
||||||
|
|
||||||
Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro).
|
Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro).
|
||||||
|
|
||||||
|
## Other Useful Stuff
|
||||||
|
|
||||||
|
Various bits and bobs.
|
||||||
|
|
||||||
|
### Running migrations on a Postgres DB backup locally
|
||||||
|
|
||||||
|
It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally.
|
||||||
|
|
||||||
|
Basic steps for this:
|
||||||
|
|
||||||
|
First dump the Postgres database on the remote machine, and copy the dump over to your development machine.
|
||||||
|
|
||||||
|
Now create a local Postgres container and mount the dump into it with, for example:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
|
||||||
|
```
|
||||||
|
|
||||||
|
In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker exec -it --user postgres postgres psql -X -f /db_dump postgres
|
||||||
|
```
|
||||||
|
|
||||||
|
With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
GTS_HOST=example.org \
|
||||||
|
GTS_DB_TYPE=postgres \
|
||||||
|
GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \
|
||||||
|
./gotosocial migrations run
|
||||||
|
```
|
||||||
|
|
||||||
|
When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes.
|
||||||
|
|
|
||||||
|
|
@ -24,13 +24,16 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||||
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
|
||||||
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
|
||||||
|
"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/id"
|
"code.superseriousbusiness.org/gotosocial/internal/id"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||||
|
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
|
||||||
"github.com/uptrace/bun"
|
"github.com/uptrace/bun"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -49,10 +52,26 @@ func init() {
|
||||||
"thread_id", "thread_id_new", 1)
|
"thread_id", "thread_id_new", 1)
|
||||||
|
|
||||||
var sr statusRethreader
|
var sr statusRethreader
|
||||||
var count int
|
var updatedTotal int64
|
||||||
var maxID string
|
var maxID string
|
||||||
var statuses []*oldmodel.Status
|
var statuses []*oldmodel.Status
|
||||||
|
|
||||||
|
// Create thread_id_new already
|
||||||
|
// so we can populate it as we go.
|
||||||
|
log.Info(ctx, "creating statuses column thread_id_new")
|
||||||
|
if _, err := db.NewAddColumn().
|
||||||
|
Table("statuses").
|
||||||
|
ColumnExpr(newColDef).
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return gtserror.Newf("error adding statuses column thread_id_new: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to merge the wal so we're
|
||||||
|
// not working on the wal file.
|
||||||
|
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Get a total count of all statuses before migration.
|
// Get a total count of all statuses before migration.
|
||||||
total, err := db.NewSelect().Table("statuses").Count(ctx)
|
total, err := db.NewSelect().Table("statuses").Count(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -63,74 +82,129 @@ func init() {
|
||||||
// possible ULID value.
|
// possible ULID value.
|
||||||
maxID = id.Highest
|
maxID = id.Highest
|
||||||
|
|
||||||
log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time")
|
log.Warnf(ctx, "rethreading %d statuses, this will take a *long* time", total)
|
||||||
for /* TOP LEVEL STATUS LOOP */ {
|
|
||||||
|
// Open initial transaction.
|
||||||
|
tx, err := db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 1; ; i++ {
|
||||||
|
|
||||||
// Reset slice.
|
// Reset slice.
|
||||||
clear(statuses)
|
clear(statuses)
|
||||||
statuses = statuses[:0]
|
statuses = statuses[:0]
|
||||||
|
|
||||||
// Select top-level statuses.
|
batchStart := time.Now()
|
||||||
if err := db.NewSelect().
|
|
||||||
Model(&statuses).
|
|
||||||
Column("id", "thread_id").
|
|
||||||
|
|
||||||
|
// Select top-level statuses.
|
||||||
|
if err := tx.NewSelect().
|
||||||
|
Model(&statuses).
|
||||||
|
Column("id").
|
||||||
// We specifically use in_reply_to_account_id instead of in_reply_to_id as
|
// We specifically use in_reply_to_account_id instead of in_reply_to_id as
|
||||||
// they should both be set / unset in unison, but we specifically have an
|
// they should both be set / unset in unison, but we specifically have an
|
||||||
// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id.
|
// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id.
|
||||||
Where("? IS NULL", bun.Ident("in_reply_to_account_id")).
|
Where("? IS NULL", bun.Ident("in_reply_to_account_id")).
|
||||||
Where("? < ?", bun.Ident("id"), maxID).
|
Where("? < ?", bun.Ident("id"), maxID).
|
||||||
OrderExpr("? DESC", bun.Ident("id")).
|
OrderExpr("? DESC", bun.Ident("id")).
|
||||||
Limit(5000).
|
Limit(500).
|
||||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||||
return gtserror.Newf("error selecting top level statuses: %w", err)
|
return gtserror.Newf("error selecting top level statuses: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reached end of block.
|
l := len(statuses)
|
||||||
if len(statuses) == 0 {
|
if l == 0 {
|
||||||
|
// No more statuses!
|
||||||
|
//
|
||||||
|
// Transaction will be closed
|
||||||
|
// after leaving the loop.
|
||||||
break
|
break
|
||||||
|
|
||||||
|
} else if i%200 == 0 {
|
||||||
|
// Begin a new transaction every
|
||||||
|
// 200 batches (~100,000 statuses),
|
||||||
|
// to avoid massive commits.
|
||||||
|
|
||||||
|
// Close existing transaction.
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to flush the wal
|
||||||
|
// to avoid silly wal sizes.
|
||||||
|
if err := doWALCheckpoint(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open new transaction.
|
||||||
|
tx, err = db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set next maxID value from statuses.
|
// Set next maxID value from statuses.
|
||||||
maxID = statuses[len(statuses)-1].ID
|
maxID = statuses[len(statuses)-1].ID
|
||||||
|
|
||||||
// Rethread each selected batch of top-level statuses in a transaction.
|
// Rethread using the
|
||||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
// open transaction.
|
||||||
|
var updatedInBatch int64
|
||||||
// Rethread each top-level status.
|
for _, status := range statuses {
|
||||||
for _, status := range statuses {
|
n, err := sr.rethreadStatus(ctx, tx, status, false)
|
||||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
if err != nil {
|
||||||
if err != nil {
|
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
|
||||||
}
|
|
||||||
count += n
|
|
||||||
}
|
}
|
||||||
|
updatedInBatch += n
|
||||||
return nil
|
updatedTotal += n
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total)
|
// Show speed for this batch.
|
||||||
|
timeTaken := time.Since(batchStart).Milliseconds()
|
||||||
|
msPerRow := float64(timeTaken) / float64(updatedInBatch)
|
||||||
|
rowsPerMs := float64(1) / float64(msPerRow)
|
||||||
|
rowsPerSecond := 1000 * rowsPerMs
|
||||||
|
|
||||||
|
// Show percent migrated overall.
|
||||||
|
totalDone := (float64(updatedTotal) / float64(total)) * 100
|
||||||
|
|
||||||
|
log.Infof(
|
||||||
|
ctx,
|
||||||
|
"[~%.2f%% done; ~%.0f rows/s] migrating threads",
|
||||||
|
totalDone, rowsPerSecond,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to merge any sqlite write-ahead-log.
|
// Close transaction.
|
||||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time")
|
// Create a partial index on thread_id_new to find stragglers.
|
||||||
for /* STRAGGLER STATUS LOOP */ {
|
// This index will be removed at the end of the migration.
|
||||||
|
log.Info(ctx, "creating temporary statuses thread_id_new index")
|
||||||
|
if _, err := db.NewCreateIndex().
|
||||||
|
Table("statuses").
|
||||||
|
Index("statuses_thread_id_new_idx").
|
||||||
|
Column("thread_id_new").
|
||||||
|
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return gtserror.Newf("error creating new thread_id index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 1; ; i++ {
|
||||||
|
|
||||||
// Reset slice.
|
// Reset slice.
|
||||||
clear(statuses)
|
clear(statuses)
|
||||||
statuses = statuses[:0]
|
statuses = statuses[:0]
|
||||||
|
|
||||||
|
batchStart := time.Now()
|
||||||
|
|
||||||
// Select straggler statuses.
|
// Select straggler statuses.
|
||||||
if err := db.NewSelect().
|
if err := db.NewSelect().
|
||||||
Model(&statuses).
|
Model(&statuses).
|
||||||
Column("id", "in_reply_to_id", "thread_id").
|
Column("id").
|
||||||
Where("? IS NULL", bun.Ident("thread_id")).
|
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
|
||||||
|
|
||||||
// We select in smaller batches for this part
|
// We select in smaller batches for this part
|
||||||
// of the migration as there is a chance that
|
// of the migration as there is a chance that
|
||||||
|
|
@ -138,7 +212,7 @@ func init() {
|
||||||
// part of the same thread, i.e. one call to
|
// part of the same thread, i.e. one call to
|
||||||
// rethreadStatus() may effect other statuses
|
// rethreadStatus() may effect other statuses
|
||||||
// later in the slice.
|
// later in the slice.
|
||||||
Limit(1000).
|
Limit(250).
|
||||||
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||||
return gtserror.Newf("error selecting straggler statuses: %w", err)
|
return gtserror.Newf("error selecting straggler statuses: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -149,23 +223,35 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rethread each selected batch of straggler statuses in a transaction.
|
// Rethread each selected batch of straggler statuses in a transaction.
|
||||||
|
var updatedInBatch int64
|
||||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
||||||
|
|
||||||
// Rethread each top-level status.
|
|
||||||
for _, status := range statuses {
|
for _, status := range statuses {
|
||||||
n, err := sr.rethreadStatus(ctx, tx, status)
|
n, err := sr.rethreadStatus(ctx, tx, status, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
|
||||||
}
|
}
|
||||||
count += n
|
updatedInBatch += n
|
||||||
|
updatedTotal += n
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total)
|
// Show speed for this batch.
|
||||||
|
timeTaken := time.Since(batchStart).Milliseconds()
|
||||||
|
msPerRow := float64(timeTaken) / float64(updatedInBatch)
|
||||||
|
rowsPerMs := float64(1) / float64(msPerRow)
|
||||||
|
rowsPerSecond := 1000 * rowsPerMs
|
||||||
|
|
||||||
|
// Show percent migrated overall.
|
||||||
|
totalDone := (float64(updatedTotal) / float64(total)) * 100
|
||||||
|
|
||||||
|
log.Infof(
|
||||||
|
ctx,
|
||||||
|
"[~%.2f%% done; ~%.0f rows/s] migrating stragglers",
|
||||||
|
totalDone, rowsPerSecond,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to merge any sqlite write-ahead-log.
|
// Attempt to merge any sqlite write-ahead-log.
|
||||||
|
|
@ -173,6 +259,13 @@ func init() {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info(ctx, "dropping temporary thread_id_new index")
|
||||||
|
if _, err := db.NewDropIndex().
|
||||||
|
Index("statuses_thread_id_new_idx").
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return gtserror.Newf("error dropping temporary thread_id_new index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.Info(ctx, "dropping old thread_to_statuses table")
|
log.Info(ctx, "dropping old thread_to_statuses table")
|
||||||
if _, err := db.NewDropTable().
|
if _, err := db.NewDropTable().
|
||||||
Table("thread_to_statuses").
|
Table("thread_to_statuses").
|
||||||
|
|
@ -180,33 +273,6 @@ func init() {
|
||||||
return gtserror.Newf("error dropping old thread_to_statuses table: %w", err)
|
return gtserror.Newf("error dropping old thread_to_statuses table: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info(ctx, "creating new statuses thread_id column")
|
|
||||||
if _, err := db.NewAddColumn().
|
|
||||||
Table("statuses").
|
|
||||||
ColumnExpr(newColDef).
|
|
||||||
Exec(ctx); err != nil {
|
|
||||||
return gtserror.Newf("error adding new thread_id column: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)")
|
|
||||||
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
|
|
||||||
return batchUpdateByID(ctx, tx,
|
|
||||||
"statuses", // table
|
|
||||||
"id", // batchByCol
|
|
||||||
"UPDATE ? SET ? = ?", // updateQuery
|
|
||||||
[]any{bun.Ident("statuses"),
|
|
||||||
bun.Ident("thread_id_new"),
|
|
||||||
bun.Ident("thread_id")},
|
|
||||||
)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to merge any sqlite write-ahead-log.
|
|
||||||
if err := doWALCheckpoint(ctx, db); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info(ctx, "dropping old statuses thread_id index")
|
log.Info(ctx, "dropping old statuses thread_id index")
|
||||||
if _, err := db.NewDropIndex().
|
if _, err := db.NewDropIndex().
|
||||||
Index("statuses_thread_id_idx").
|
Index("statuses_thread_id_idx").
|
||||||
|
|
@ -274,6 +340,11 @@ type statusRethreader struct {
|
||||||
// its contents are ephemeral.
|
// its contents are ephemeral.
|
||||||
statuses []*oldmodel.Status
|
statuses []*oldmodel.Status
|
||||||
|
|
||||||
|
// newThreadIDSet is used to track whether
|
||||||
|
// statuses in statusIDs have already have
|
||||||
|
// thread_id_new set on them.
|
||||||
|
newThreadIDSet map[string]struct{}
|
||||||
|
|
||||||
// seenIDs tracks the unique status and
|
// seenIDs tracks the unique status and
|
||||||
// thread IDs we have seen, ensuring we
|
// thread IDs we have seen, ensuring we
|
||||||
// don't append duplicates to statusIDs
|
// don't append duplicates to statusIDs
|
||||||
|
|
@ -289,14 +360,15 @@ type statusRethreader struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
|
// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
|
||||||
// in order to trigger a status rethreading operation for the given status, returning total number rethreaded.
|
// in order to trigger a status rethreading operation for the given status, returning total number of rows changed.
|
||||||
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) {
|
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) {
|
||||||
|
|
||||||
// Zero slice and
|
// Zero slice and
|
||||||
// map ptr values.
|
// map ptr values.
|
||||||
clear(sr.statusIDs)
|
clear(sr.statusIDs)
|
||||||
clear(sr.threadIDs)
|
clear(sr.threadIDs)
|
||||||
clear(sr.statuses)
|
clear(sr.statuses)
|
||||||
|
clear(sr.newThreadIDSet)
|
||||||
clear(sr.seenIDs)
|
clear(sr.seenIDs)
|
||||||
|
|
||||||
// Reset slices and values for use.
|
// Reset slices and values for use.
|
||||||
|
|
@ -305,6 +377,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
||||||
sr.statuses = sr.statuses[:0]
|
sr.statuses = sr.statuses[:0]
|
||||||
sr.allThreaded = true
|
sr.allThreaded = true
|
||||||
|
|
||||||
|
if sr.newThreadIDSet == nil {
|
||||||
|
// Allocate new hash set for newThreadIDSet.
|
||||||
|
sr.newThreadIDSet = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
if sr.seenIDs == nil {
|
if sr.seenIDs == nil {
|
||||||
// Allocate new hash set for status IDs.
|
// Allocate new hash set for status IDs.
|
||||||
sr.seenIDs = make(map[string]struct{})
|
sr.seenIDs = make(map[string]struct{})
|
||||||
|
|
@ -317,12 +394,22 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
||||||
// to the rethreadStatus() call.
|
// to the rethreadStatus() call.
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
Model(status).
|
Model(status).
|
||||||
Column("in_reply_to_id", "thread_id").
|
Column("in_reply_to_id", "thread_id", "thread_id_new").
|
||||||
Where("? = ?", bun.Ident("id"), status.ID).
|
Where("? = ?", bun.Ident("id"), status.ID).
|
||||||
Scan(ctx); err != nil {
|
Scan(ctx); err != nil {
|
||||||
return 0, gtserror.Newf("error selecting status: %w", err)
|
return 0, gtserror.Newf("error selecting status: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we've just threaded this status by setting
|
||||||
|
// thread_id_new, then by definition anything we
|
||||||
|
// could find from the entire thread must now be
|
||||||
|
// threaded, so we can save some database calls
|
||||||
|
// by skipping iterating up + down from here.
|
||||||
|
if status.ThreadIDNew != id.Lowest {
|
||||||
|
log.Debugf(ctx, "skipping just rethreaded status: %s", status.ID)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// status and thread ID cursor
|
// status and thread ID cursor
|
||||||
// index values. these are used
|
// index values. these are used
|
||||||
// to keep track of newly loaded
|
// to keep track of newly loaded
|
||||||
|
|
@ -371,14 +458,14 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
||||||
threadIdx = len(sr.threadIDs)
|
threadIdx = len(sr.threadIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Total number of
|
|
||||||
// statuses threaded.
|
|
||||||
total := len(sr.statusIDs)
|
|
||||||
|
|
||||||
// Check for the case where the entire
|
// Check for the case where the entire
|
||||||
// batch of statuses is already correctly
|
// batch of statuses is already correctly
|
||||||
// threaded. Then we have nothing to do!
|
// threaded. Then we have nothing to do!
|
||||||
if sr.allThreaded && len(sr.threadIDs) == 1 {
|
//
|
||||||
|
// Skip this check for straggler statuses
|
||||||
|
// that are part of broken threads.
|
||||||
|
if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 {
|
||||||
|
log.Debug(ctx, "skipping just rethreaded thread")
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -417,36 +504,120 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update all the statuses to
|
var (
|
||||||
// use determined thread_id.
|
res sql.Result
|
||||||
if _, err := tx.NewUpdate().
|
err error
|
||||||
Table("statuses").
|
)
|
||||||
Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)).
|
|
||||||
Set("? = ?", bun.Ident("thread_id"), threadID).
|
if len(sr.statusIDs) == 1 {
|
||||||
Exec(ctx); err != nil {
|
|
||||||
|
// If we're only updating one status
|
||||||
|
// we can use a simple update query.
|
||||||
|
res, err = tx.NewUpdate().
|
||||||
|
// Update the status model.
|
||||||
|
TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")).
|
||||||
|
// Set the new thread ID, which we can use as
|
||||||
|
// an indication that we've migrated this batch.
|
||||||
|
Set("? = ?", bun.Ident("thread_id_new"), threadID).
|
||||||
|
// While we're here, also set old thread_id, as
|
||||||
|
// we'll use it for further rethreading purposes.
|
||||||
|
Set("? = ?", bun.Ident("thread_id"), threadID).
|
||||||
|
Where("? = ?", bun.Ident("status.id"), sr.statusIDs[0]).
|
||||||
|
Exec(ctx)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
|
||||||
|
// If we're updating multiple statuses at once,
|
||||||
|
// build up a common table expression to update
|
||||||
|
// all statuses in this thread to use threadID.
|
||||||
|
//
|
||||||
|
// This ought to be a little more readable than
|
||||||
|
// using an "IN(*)" query, and PG or SQLite *may*
|
||||||
|
// be able to optimize it better.
|
||||||
|
//
|
||||||
|
// See:
|
||||||
|
//
|
||||||
|
// - https://sqlite.org/lang_with.html
|
||||||
|
// - https://www.postgresql.org/docs/current/queries-with.html
|
||||||
|
// - https://bun.uptrace.dev/guide/query-update.html#bulk-update
|
||||||
|
values := make([]*util.Status, 0, len(sr.statusIDs))
|
||||||
|
for _, statusID := range sr.statusIDs {
|
||||||
|
// Filter out statusIDs that have already had
|
||||||
|
// thread_id_new set, to avoid spurious writes.
|
||||||
|
if _, set := sr.newThreadIDSet[statusID]; !set {
|
||||||
|
values = append(values, &util.Status{
|
||||||
|
ID: statusID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resulting query will look something like this:
|
||||||
|
//
|
||||||
|
// WITH "_data" ("id") AS (
|
||||||
|
// VALUES
|
||||||
|
// ('01JR6PZED0DDR2VZHQ8H87ZW98'),
|
||||||
|
// ('01JR6PZED0J91MJCAFDTCCCG8Q')
|
||||||
|
// )
|
||||||
|
// UPDATE "statuses" AS "status"
|
||||||
|
// SET
|
||||||
|
// "thread_id_new" = '01K6MGKX54BBJ3Y1FBPQY45E5P',
|
||||||
|
// "thread_id" = '01K6MGKX54BBJ3Y1FBPQY45E5P'
|
||||||
|
// FROM _data
|
||||||
|
// WHERE "status"."id" = "_data"."id"
|
||||||
|
res, err = tx.NewUpdate().
|
||||||
|
// Update the status model.
|
||||||
|
Model((*oldmodel.Status)(nil)).
|
||||||
|
// Provide the CTE values as "_data".
|
||||||
|
With("_data", tx.NewValues(&values)).
|
||||||
|
// Include `FROM _data` statement so we can use
|
||||||
|
// `_data` table in SET and WHERE components.
|
||||||
|
TableExpr("_data").
|
||||||
|
// Set the new thread ID, which we can use as
|
||||||
|
// an indication that we've migrated this batch.
|
||||||
|
Set("? = ?", bun.Ident("thread_id_new"), threadID).
|
||||||
|
// While we're here, also set old thread_id, as
|
||||||
|
// we'll use it for further rethreading purposes.
|
||||||
|
Set("? = ?", bun.Ident("thread_id"), threadID).
|
||||||
|
// "Join" to the CTE on status ID.
|
||||||
|
Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")).
|
||||||
|
Exec(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
return 0, gtserror.Newf("error updating status thread ids: %w", err)
|
return 0, gtserror.Newf("error updating status thread ids: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rowsAffected, err := res.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return 0, gtserror.Newf("error counting rows affected: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if len(sr.threadIDs) > 0 {
|
if len(sr.threadIDs) > 0 {
|
||||||
// Update any existing thread
|
// Update any existing thread
|
||||||
// mutes to use latest thread_id.
|
// mutes to use latest thread_id.
|
||||||
|
|
||||||
|
// Dedupe thread IDs before query
|
||||||
|
// to avoid ludicrous "IN" clause.
|
||||||
|
threadIDs := sr.threadIDs
|
||||||
|
threadIDs = xslices.Deduplicate(threadIDs)
|
||||||
if _, err := tx.NewUpdate().
|
if _, err := tx.NewUpdate().
|
||||||
Table("thread_mutes").
|
Table("thread_mutes").
|
||||||
Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)).
|
Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
|
||||||
Set("? = ?", bun.Ident("thread_id"), threadID).
|
Set("? = ?", bun.Ident("thread_id"), threadID).
|
||||||
Exec(ctx); err != nil {
|
Exec(ctx); err != nil {
|
||||||
return 0, gtserror.Newf("error updating mute thread ids: %w", err)
|
return 0, gtserror.Newf("error updating mute thread ids: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return total, nil
|
return rowsAffected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// append will append the given status to the internal tracking of statusRethreader{} for
|
// append will append the given status to the internal tracking of statusRethreader{} for
|
||||||
// potential future operations, checking for uniqueness. it tracks the inReplyToID value
|
// potential future operations, checking for uniqueness. it tracks the inReplyToID value
|
||||||
// for the next call to getParents(), it tracks the status ID for list of statuses that
|
// for the next call to getParents(), it tracks the status ID for list of statuses that
|
||||||
// need updating, the thread ID for the list of thread links and mutes that need updating,
|
// may need updating, whether a new thread ID has been set for each status, the thread ID
|
||||||
// and whether all the statuses all have a provided thread ID (i.e. allThreaded).
|
// for the list of thread links and mutes that need updating, and whether all the statuses
|
||||||
|
// all have a provided thread ID (i.e. allThreaded).
|
||||||
func (sr *statusRethreader) append(status *oldmodel.Status) {
|
func (sr *statusRethreader) append(status *oldmodel.Status) {
|
||||||
|
|
||||||
// Check if status already seen before.
|
// Check if status already seen before.
|
||||||
|
|
@ -479,7 +650,14 @@ func (sr *statusRethreader) append(status *oldmodel.Status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add status ID to map of seen IDs.
|
// Add status ID to map of seen IDs.
|
||||||
sr.seenIDs[status.ID] = struct{}{}
|
mark := struct{}{}
|
||||||
|
sr.seenIDs[status.ID] = mark
|
||||||
|
|
||||||
|
// If new thread ID has already been
|
||||||
|
// set, add status ID to map of set IDs.
|
||||||
|
if status.ThreadIDNew != id.Lowest {
|
||||||
|
sr.newThreadIDSet[status.ID] = mark
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
|
func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
|
||||||
|
|
@ -496,7 +674,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
|
||||||
// Select next parent status.
|
// Select next parent status.
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
Model(&parent).
|
Model(&parent).
|
||||||
Column("id", "in_reply_to_id", "thread_id").
|
Column("id", "in_reply_to_id", "thread_id", "thread_id_new").
|
||||||
Where("? = ?", bun.Ident("id"), id).
|
Where("? = ?", bun.Ident("id"), id).
|
||||||
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||||
return err
|
return err
|
||||||
|
|
@ -535,7 +713,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int)
|
||||||
// Select children of ID.
|
// Select children of ID.
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
Model(&sr.statuses).
|
Model(&sr.statuses).
|
||||||
Column("id", "thread_id").
|
Column("id", "thread_id", "thread_id_new").
|
||||||
Where("? = ?", bun.Ident("in_reply_to_id"), id).
|
Where("? = ?", bun.Ident("in_reply_to_id"), id).
|
||||||
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
Scan(ctx); err != nil && err != db.ErrNoEntries {
|
||||||
return err
|
return err
|
||||||
|
|
@ -560,14 +738,19 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in
|
||||||
clear(sr.statuses)
|
clear(sr.statuses)
|
||||||
sr.statuses = sr.statuses[:0]
|
sr.statuses = sr.statuses[:0]
|
||||||
|
|
||||||
|
// Dedupe thread IDs before query
|
||||||
|
// to avoid ludicrous "IN" clause.
|
||||||
|
threadIDs := sr.threadIDs[idx:]
|
||||||
|
threadIDs = xslices.Deduplicate(threadIDs)
|
||||||
|
|
||||||
// Select stragglers that
|
// Select stragglers that
|
||||||
// also have thread IDs.
|
// also have thread IDs.
|
||||||
if err := tx.NewSelect().
|
if err := tx.NewSelect().
|
||||||
Model(&sr.statuses).
|
Model(&sr.statuses).
|
||||||
Column("id", "thread_id", "in_reply_to_id").
|
Column("id", "thread_id", "in_reply_to_id", "thread_id_new").
|
||||||
Where("? IN (?) AND ? NOT IN (?)",
|
Where("? IN (?) AND ? NOT IN (?)",
|
||||||
bun.Ident("thread_id"),
|
bun.Ident("thread_id"),
|
||||||
bun.In(sr.threadIDs[idx:]),
|
bun.In(threadIDs),
|
||||||
bun.Ident("id"),
|
bun.Ident("id"),
|
||||||
bun.In(sr.statusIDs),
|
bun.In(sr.statusIDs),
|
||||||
).
|
).
|
||||||
|
|
|
||||||
|
|
@ -23,45 +23,45 @@ import (
|
||||||
|
|
||||||
// Status represents a user-created 'post' or 'status' in the database, either remote or local
|
// Status represents a user-created 'post' or 'status' in the database, either remote or local
|
||||||
type Status struct {
|
type Status struct {
|
||||||
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
||||||
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
||||||
EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
|
EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
|
||||||
FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
|
FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
|
||||||
PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
|
PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
|
||||||
URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
|
URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
|
||||||
URL string `bun:",nullzero"` // web url for viewing this status
|
URL string `bun:",nullzero"` // web url for viewing this status
|
||||||
Content string `bun:""` // Content HTML for this status.
|
Content string `bun:""` // Content HTML for this status.
|
||||||
AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
|
AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
|
||||||
TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
|
TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
|
||||||
MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
|
MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
|
||||||
EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
|
EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
|
||||||
Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
|
Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
|
||||||
AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
|
AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
|
||||||
AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
|
AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
|
||||||
InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
|
InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
|
||||||
InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
|
InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
|
||||||
InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
|
InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
|
||||||
InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID
|
InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID
|
||||||
BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
|
BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
|
||||||
BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
|
BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
|
||||||
BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
|
BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
|
||||||
BoostOf *Status `bun:"-"` // status that corresponds to boostOfID
|
BoostOf *Status `bun:"-"` // status that corresponds to boostOfID
|
||||||
ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs
|
ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs
|
||||||
EditIDs []string `bun:"edits,array"` //
|
EditIDs []string `bun:"edits,array"` //
|
||||||
PollID string `bun:"type:CHAR(26),nullzero"` //
|
PollID string `bun:"type:CHAR(26),nullzero"` //
|
||||||
ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
|
ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
|
||||||
ContentWarningText string `bun:""` // Original text of the content warning without formatting
|
ContentWarningText string `bun:""` // Original text of the content warning without formatting
|
||||||
Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
|
Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
|
||||||
Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
|
Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
|
||||||
Language string `bun:",nullzero"` // what language is this status written in?
|
Language string `bun:",nullzero"` // what language is this status written in?
|
||||||
CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
|
CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
|
||||||
ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
|
ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
|
||||||
Text string `bun:""` // Original text of the status without formatting
|
Text string `bun:""` // Original text of the status without formatting
|
||||||
ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
|
ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
|
||||||
Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
|
Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
|
||||||
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
||||||
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
||||||
ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
|
ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
|
||||||
}
|
}
|
||||||
|
|
||||||
// enumType is the type we (at least, should) use
|
// enumType is the type we (at least, should) use
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Status represents a user-created 'post' or 'status' in the database, either remote or local
|
// Status represents a user-created 'post' or 'status' in the database, either remote or local.
|
||||||
|
//
|
||||||
|
// Note: this model differs from an exact representation of the old model at the time of migration,
|
||||||
|
// as it includes the intermediate field "ThreadIDNew", which is only used during the migration.
|
||||||
type Status struct {
|
type Status struct {
|
||||||
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
||||||
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
||||||
|
|
@ -60,6 +63,9 @@ type Status struct {
|
||||||
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
||||||
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
||||||
ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
|
ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
|
||||||
|
|
||||||
|
// This field is *only* used during the migration, it was not on the original status model.
|
||||||
|
ThreadIDNew string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// enumType is the type we (at least, should) use
|
// enumType is the type we (at least, should) use
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
// GoToSocial
|
||||||
|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
// Status is a helper type specifically
|
||||||
|
// for updating the thread ID of a status.
|
||||||
|
type Status struct {
|
||||||
|
ID string `bun:"type:CHAR(26)"`
|
||||||
|
}
|
||||||
|
|
@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// batchUpdateByID performs the given updateQuery with updateArgs
|
|
||||||
// over the entire given table, batching by the ID of batchByCol.
|
|
||||||
func batchUpdateByID(
|
|
||||||
ctx context.Context,
|
|
||||||
tx bun.Tx,
|
|
||||||
table string,
|
|
||||||
batchByCol string,
|
|
||||||
updateQuery string,
|
|
||||||
updateArgs []any,
|
|
||||||
) error {
|
|
||||||
// Get a count of all in table.
|
|
||||||
total, err := tx.NewSelect().
|
|
||||||
Table(table).
|
|
||||||
Count(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return gtserror.Newf("error selecting total count: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query batch size
|
|
||||||
// in number of rows.
|
|
||||||
const batchsz = 5000
|
|
||||||
|
|
||||||
// Stores highest batch value
|
|
||||||
// used in iterate queries,
|
|
||||||
// starting at highest possible.
|
|
||||||
highest := id.Highest
|
|
||||||
|
|
||||||
// Total updated rows.
|
|
||||||
var updated int
|
|
||||||
|
|
||||||
for {
|
|
||||||
// Limit to batchsz
|
|
||||||
// items at once.
|
|
||||||
batchQ := tx.
|
|
||||||
NewSelect().
|
|
||||||
Table(table).
|
|
||||||
Column(batchByCol).
|
|
||||||
Where("? < ?", bun.Ident(batchByCol), highest).
|
|
||||||
OrderExpr("? DESC", bun.Ident(batchByCol)).
|
|
||||||
Limit(batchsz)
|
|
||||||
|
|
||||||
// Finalize UPDATE to act only on batch.
|
|
||||||
qStr := updateQuery + " WHERE ? IN (?)"
|
|
||||||
args := append(slices.Clone(updateArgs),
|
|
||||||
bun.Ident(batchByCol),
|
|
||||||
batchQ,
|
|
||||||
)
|
|
||||||
|
|
||||||
// Execute the prepared raw query with arguments.
|
|
||||||
res, err := tx.NewRaw(qStr, args...).Exec(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return gtserror.Newf("error updating old column values: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check how many items we updated.
|
|
||||||
thisUpdated, err := res.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return gtserror.Newf("error counting affected rows: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if thisUpdated == 0 {
|
|
||||||
// Nothing updated
|
|
||||||
// means we're done.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the overall count.
|
|
||||||
updated += int(thisUpdated)
|
|
||||||
|
|
||||||
// Log helpful message to admin.
|
|
||||||
log.Infof(ctx, "migrated %d of %d %s (up to %s)",
|
|
||||||
updated, total, table, highest)
|
|
||||||
|
|
||||||
// Get next highest
|
|
||||||
// id for next batch.
|
|
||||||
if err := tx.
|
|
||||||
NewSelect().
|
|
||||||
With("batch_query", batchQ).
|
|
||||||
ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")).
|
|
||||||
Scan(ctx, &highest); err != nil {
|
|
||||||
return gtserror.Newf("error selecting next highest: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if total != int(updated) {
|
|
||||||
// Return error here in order to rollback the whole transaction.
|
|
||||||
return fmt.Errorf("total=%d does not match updated=%d", total, updated)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// convertEnums performs a transaction that converts
|
// convertEnums performs a transaction that converts
|
||||||
// a table's column of our old-style enums (strings) to
|
// a table's column of our old-style enums (strings) to
|
||||||
// more performant and space-saving integer types.
|
// more performant and space-saving integer types.
|
||||||
|
|
|
||||||
|
|
@ -27,56 +27,56 @@ import (
|
||||||
|
|
||||||
// Status represents a user-created 'post' or 'status' in the database, either remote or local
|
// Status represents a user-created 'post' or 'status' in the database, either remote or local
|
||||||
type Status struct {
|
type Status struct {
|
||||||
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
|
||||||
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
|
||||||
EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
|
EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
|
||||||
FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
|
FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
|
||||||
PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
|
PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
|
||||||
URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
|
URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
|
||||||
URL string `bun:",nullzero"` // web url for viewing this status
|
URL string `bun:",nullzero"` // web url for viewing this status
|
||||||
Content string `bun:""` // Content HTML for this status.
|
Content string `bun:""` // Content HTML for this status.
|
||||||
AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
|
AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
|
||||||
Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs
|
Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs
|
||||||
TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
|
TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
|
||||||
Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation
|
Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation
|
||||||
MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
|
MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
|
||||||
Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs
|
Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs
|
||||||
EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
|
EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
|
||||||
Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation
|
Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation
|
||||||
Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
|
Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
|
||||||
AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
|
AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
|
||||||
Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID
|
Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID
|
||||||
AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
|
AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
|
||||||
InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
|
InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
|
||||||
InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
|
InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
|
||||||
InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
|
InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
|
||||||
InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID
|
InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID
|
||||||
InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID
|
InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID
|
||||||
BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
|
BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
|
||||||
BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
|
BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
|
||||||
BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
|
BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
|
||||||
BoostOf *Status `bun:"-"` // status that corresponds to boostOfID
|
BoostOf *Status `bun:"-"` // status that corresponds to boostOfID
|
||||||
BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID
|
BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID
|
||||||
ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs
|
ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs
|
||||||
EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID.
|
EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID.
|
||||||
Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit.
|
Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit.
|
||||||
PollID string `bun:"type:CHAR(26),nullzero"` //
|
PollID string `bun:"type:CHAR(26),nullzero"` //
|
||||||
Poll *Poll `bun:"-"` //
|
Poll *Poll `bun:"-"` //
|
||||||
ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
|
ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
|
||||||
ContentWarningText string `bun:""` // Original text of the content warning without formatting
|
ContentWarningText string `bun:""` // Original text of the content warning without formatting
|
||||||
Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
|
Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
|
||||||
Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
|
Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
|
||||||
Language string `bun:",nullzero"` // what language is this status written in?
|
Language string `bun:",nullzero"` // what language is this status written in?
|
||||||
CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
|
CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
|
||||||
CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID
|
CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID
|
||||||
ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
|
ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
|
||||||
Text string `bun:""` // Original text of the status without formatting
|
Text string `bun:""` // Original text of the status without formatting
|
||||||
ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
|
ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
|
||||||
Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
|
Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
|
||||||
InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers.
|
InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers.
|
||||||
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
|
||||||
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
|
||||||
ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to.
|
ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to.
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetID implements timeline.Timelineable{}.
|
// GetID implements timeline.Timelineable{}.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue