From ae8ac4dd6c56d5d3a562d6d03c9eb82334f8b876 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 15:14:26 +0200 Subject: [PATCH 01/11] finalize indexes etc --- CONTRIBUTING.md | 39 +++ .../20250415111056_thread_all_statuses.go | 259 ++++++++++-------- .../new/status.go | 78 +++--- .../util/utiltypes.go | 25 ++ internal/db/bundb/migrations/util.go | 92 ------- internal/gtsmodel/status.go | 100 +++---- 6 files changed, 296 insertions(+), 297 deletions(-) create mode 100644 internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 611bea97c..7359d65fa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) - [CI/CD configuration](#ci-cd-configuration) +- [Other Useful Stuff](#other-useful-stuff) + - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) ## Introduction @@ -525,3 +527,40 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). + +## Other Useful Stuff + +Various bits and bobs. + +### Running migrations on a Postgres DB backup locally + +It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. + +Basic steps for this: + +1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine. +2. Create a local Postgres container and mount the dump into it with, for example: + + ```bash + docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres + ``` +3. Get a terminal inside the running container: + + ```bash + docker exec -it --user postgres postgres bash + ``` +4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine): + + ```bash + psql -X postgres < /db_dump + ``` +5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: + + ```bash + GTS_HOST=example.org \ + GTS_DB_TYPE=postgres \ + GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ + ./gotosocial migrations run + ``` + +When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fc02d1e40..0b0389e7a 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -28,10 +28,13 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" + "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" ) func init() { @@ -44,12 +47,60 @@ func init() { return gtserror.Newf("error getting bun column def: %w", err) } - // Update column def to use '${name}_new'. + // Update column def to use temporary + // '${name}_new' while we migrate. newColDef = strings.Replace(newColDef, "thread_id", "thread_id_new", 1) + // Create thread_id_new already + // so we can populate it as we go. + log.Info(ctx, "creating statuses column thread_id_new") + if _, err := db.NewAddColumn(). + Table("statuses"). + ColumnExpr(newColDef). + Exec(ctx); err != nil { + return gtserror.Newf("error adding statuses column thread_id_new: %w", err) + } + + // Create an index on thread_id_new so we can keep + // track of it as we update, and use it to avoid + // selecting statuses we've already updated. + // + // We'll remove this at the end of the migration. + log.Info(ctx, "creating temporary thread_id_new index") + if db.Dialect().Name() == dialect.PG { + // On Postgres we can use a partial index + // to indicate we only care about default + // value thread IDs (ie., not set yet). + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating temporary thread_id_new index: %w", err) + } + } else { + // On SQLite we can use an index expression + // to do the same thing. While we *can* use + // a partial index for this, an index expression + // is magnitudes faster. Databases! + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating temporary thread_id_new index: %w", err) + } + } + + // Attempt to merge any sqlite write-ahead-log. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + var sr statusRethreader - var count int + var updatedRows int64 var maxID string var statuses []*oldmodel.Status @@ -63,47 +114,45 @@ func init() { // possible ULID value. maxID = id.Highest - log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") - for /* TOP LEVEL STATUS LOOP */ { + log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) + for { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select top-level statuses. + // Select IDs of next batch, paging down + // and skipping statuses to which we've + // already allocated a (new) thread ID. if err := db.NewSelect(). Model(&statuses). - Column("id", "thread_id"). - - // We specifically use in_reply_to_account_id instead of in_reply_to_id as - // they should both be set / unset in unison, but we specifically have an - // index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. - Where("? IS NULL", bun.Ident("in_reply_to_account_id")). + Column("id"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(5000). + Limit(200). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting top level statuses: %w", err) + return gtserror.Newf("error selecting unthreaded statuses: %w", err) } - // Reached end of block. - if len(statuses) == 0 { + // No more statuses! + l := len(statuses) + if l == 0 { + log.Info(ctx, "done migrating statuses!") break } // Set next maxID value from statuses. - maxID = statuses[len(statuses)-1].ID + maxID = statuses[l-1].ID - // Rethread each selected batch of top-level statuses in a transaction. + // Rethread each selected status in a transaction. if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - count += n + updatedRows += n } return nil @@ -111,7 +160,24 @@ func init() { return err } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) + // Show percent migrated. + // + // Will maybe end up wonky due to approximations + // and batching, so show a generic message at 100%. + percentDone := (float64(updatedRows) / float64(total)) * 100 + if percentDone <= 100 { + log.Infof( + ctx, + "[updated %d rows] migrated approx. %.2f%% of statuses", + updatedRows, percentDone, + ) + } else { + log.Infof( + ctx, + "[updated %d rows] almost done migrating... ", + updatedRows, + ) + } } // Attempt to merge any sqlite write-ahead-log. @@ -119,58 +185,11 @@ func init() { return err } - log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") - for /* STRAGGLER STATUS LOOP */ { - - // Reset slice. - clear(statuses) - statuses = statuses[:0] - - // Select straggler statuses. - if err := db.NewSelect(). - Model(&statuses). - Column("id", "in_reply_to_id", "thread_id"). - Where("? IS NULL", bun.Ident("thread_id")). - - // We select in smaller batches for this part - // of the migration as there is a chance that - // we may be fetching statuses that might be - // part of the same thread, i.e. one call to - // rethreadStatus() may effect other statuses - // later in the slice. - Limit(1000). - Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting straggler statuses: %w", err) - } - - // Reached end of block. - if len(statuses) == 0 { - break - } - - // Rethread each selected batch of straggler statuses in a transaction. - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - count += n - } - - return nil - }); err != nil { - return err - } - - log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) } log.Info(ctx, "dropping old thread_to_statuses table") @@ -180,33 +199,6 @@ func init() { return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) } - log.Info(ctx, "creating new statuses thread_id column") - if _, err := db.NewAddColumn(). - Table("statuses"). - ColumnExpr(newColDef). - Exec(ctx); err != nil { - return gtserror.Newf("error adding new thread_id column: %w", err) - } - - log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - return batchUpdateByID(ctx, tx, - "statuses", // table - "id", // batchByCol - "UPDATE ? SET ? = ?", // updateQuery - []any{bun.Ident("statuses"), - bun.Ident("thread_id_new"), - bun.Ident("thread_id")}, - ) - }); err != nil { - return err - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - log.Info(ctx, "dropping old statuses thread_id index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_idx"). @@ -289,8 +281,8 @@ type statusRethreader struct { } // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration -// in order to trigger a status rethreading operation for the given status, returning total number rethreaded. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { +// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) { // Zero slice and // map ptr values. @@ -346,6 +338,8 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu return 0, gtserror.Newf("error getting children: %w", err) } + // Dedupe thread IDs. + // Check for newly picked-up threads // to find stragglers for below. Else // we've reached end of what we can do. @@ -371,10 +365,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu threadIdx = len(sr.threadIDs) } - // Total number of - // statuses threaded. - total := len(sr.statusIDs) - // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! @@ -417,29 +407,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } } - // Update all the statuses to - // use determined thread_id. - if _, err := tx.NewUpdate(). - Table("statuses"). - Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). - Set("? = ?", bun.Ident("thread_id"), threadID). - Exec(ctx); err != nil { + // Use a bulk update to update all the + // statuses to use determined thread_id. + // + // https://bun.uptrace.dev/guide/query-update.html#bulk-update + values := make([]*util.Status, 0, len(sr.statusIDs)) + for _, statusID := range sr.statusIDs { + values = append(values, &util.Status{ + ID: statusID, + ThreadIDNew: threadID, + }) + } + + res, err := tx.NewUpdate(). + With("_data", tx.NewValues(&values)). + Model((*util.Status)(nil)). + TableExpr("_data"). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")). + // "Join" on status ID. + Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). + // To avoid spurious writes, + // only update unmigrated statuses. + Where("? = ?", bun.Ident("status.thread_id_new"), id.Lowest). + Exec(ctx) + if err != nil { return 0, gtserror.Newf("error updating status thread ids: %w", err) } + rowsAffected, err := res.RowsAffected() + if err != nil { + return 0, gtserror.Newf("error counting rows affected: %w", err) + } + if len(sr.threadIDs) > 0 { // Update any existing thread // mutes to use latest thread_id. + + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs + threadIDs = xslices.Deduplicate(threadIDs) if _, err := tx.NewUpdate(). Table("thread_mutes"). - Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). Set("? = ?", bun.Ident("thread_id"), threadID). Exec(ctx); err != nil { return 0, gtserror.Newf("error updating mute thread ids: %w", err) } } - return total, nil + return rowsAffected, nil } // append will append the given status to the internal tracking of statusRethreader{} for @@ -560,6 +582,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in clear(sr.statuses) sr.statuses = sr.statuses[:0] + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs[idx:] + threadIDs = xslices.Deduplicate(threadIDs) + // Select stragglers that // also have thread IDs. if err := tx.NewSelect(). @@ -567,7 +594,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in Column("id", "thread_id", "in_reply_to_id"). Where("? IN (?) AND ? NOT IN (?)", bun.Ident("thread_id"), - bun.In(sr.threadIDs[idx:]), + bun.In(threadIDs), bun.Ident("id"), bun.In(sr.statusIDs), ). diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go index a03e93859..0f20bb7a4 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go @@ -23,45 +23,45 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // - PollID string `bun:"type:CHAR(26),nullzero"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // + PollID string `bun:"type:CHAR(26),nullzero"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. } // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go new file mode 100644 index 000000000..ed7256aad --- /dev/null +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go @@ -0,0 +1,25 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package util + +// Status is a helper type specifically +// for updating the thread ID of a status. +type Status struct { + ID string `bun:"type:CHAR(26)"` + ThreadIDNew string `bun:"type:CHAR(26)"` +} diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 4a3a62b21..b5543c824 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { return nil } -// batchUpdateByID performs the given updateQuery with updateArgs -// over the entire given table, batching by the ID of batchByCol. -func batchUpdateByID( - ctx context.Context, - tx bun.Tx, - table string, - batchByCol string, - updateQuery string, - updateArgs []any, -) error { - // Get a count of all in table. - total, err := tx.NewSelect(). - Table(table). - Count(ctx) - if err != nil { - return gtserror.Newf("error selecting total count: %w", err) - } - - // Query batch size - // in number of rows. - const batchsz = 5000 - - // Stores highest batch value - // used in iterate queries, - // starting at highest possible. - highest := id.Highest - - // Total updated rows. - var updated int - - for { - // Limit to batchsz - // items at once. - batchQ := tx. - NewSelect(). - Table(table). - Column(batchByCol). - Where("? < ?", bun.Ident(batchByCol), highest). - OrderExpr("? DESC", bun.Ident(batchByCol)). - Limit(batchsz) - - // Finalize UPDATE to act only on batch. - qStr := updateQuery + " WHERE ? IN (?)" - args := append(slices.Clone(updateArgs), - bun.Ident(batchByCol), - batchQ, - ) - - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qStr, args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) - } - - // Check how many items we updated. - thisUpdated, err := res.RowsAffected() - if err != nil { - return gtserror.Newf("error counting affected rows: %w", err) - } - - if thisUpdated == 0 { - // Nothing updated - // means we're done. - break - } - - // Update the overall count. - updated += int(thisUpdated) - - // Log helpful message to admin. - log.Infof(ctx, "migrated %d of %d %s (up to %s)", - updated, total, table, highest) - - // Get next highest - // id for next batch. - if err := tx. - NewSelect(). - With("batch_query", batchQ). - ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). - Scan(ctx, &highest); err != nil { - return gtserror.Newf("error selecting next highest: %w", err) - } - } - - if total != int(updated) { - // Return error here in order to rollback the whole transaction. - return fmt.Errorf("total=%d does not match updated=%d", total, updated) - } - - return nil -} - // convertEnums performs a transaction that converts // a table's column of our old-style enums (strings) to // more performant and space-saving integer types. diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 31e8fe881..0f0fb8404 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -27,56 +27,56 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. - Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. - PollID string `bun:"type:CHAR(26),nullzero"` // - Poll *Poll `bun:"-"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. + Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. + PollID string `bun:"type:CHAR(26),nullzero"` // + Poll *Poll `bun:"-"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. } // GetID implements timeline.Timelineable{}. From 487292e6f323eec47ea50c1a3a43fed65d6715da Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 15:17:30 +0200 Subject: [PATCH 02/11] remove errant comment --- .../db/bundb/migrations/20250415111056_thread_all_statuses.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 0b0389e7a..86fbbd49f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -338,8 +338,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu return 0, gtserror.Newf("error getting children: %w", err) } - // Dedupe thread IDs. - // Check for newly picked-up threads // to find stragglers for below. Else // we've reached end of what we can do. From 365b9efb12cfc56e31d48f1c5f51b2f743b3e477 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 17:14:42 +0200 Subject: [PATCH 03/11] whew --- .../20250415111056_thread_all_statuses.go | 79 +++++-------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 86fbbd49f..bfa4dd84f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,6 +24,7 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" @@ -34,7 +35,6 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" - "github.com/uptrace/bun/dialect" ) func init() { @@ -62,45 +62,8 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - // Create an index on thread_id_new so we can keep - // track of it as we update, and use it to avoid - // selecting statuses we've already updated. - // - // We'll remove this at the end of the migration. - log.Info(ctx, "creating temporary thread_id_new index") - if db.Dialect().Name() == dialect.PG { - // On Postgres we can use a partial index - // to indicate we only care about default - // value thread IDs (ie., not set yet). - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - Column("thread_id_new"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } else { - // On SQLite we can use an index expression - // to do the same thing. While we *can* use - // a partial index for this, an index expression - // is magnitudes faster. Databases! - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - var sr statusRethreader - var updatedRows int64 + var updatedRowsTotal int64 var maxID string var statuses []*oldmodel.Status @@ -116,21 +79,20 @@ func init() { log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) for { + start := time.Now() // Reset slice. clear(statuses) statuses = statuses[:0] - // Select IDs of next batch, paging down - // and skipping statuses to which we've - // already allocated a (new) thread ID. + // Select IDs of next + // batch, paging down. if err := db.NewSelect(). Model(&statuses). Column("id"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -146,13 +108,15 @@ func init() { maxID = statuses[l-1].ID // Rethread each selected status in a transaction. + var updatedRowsThisBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - updatedRows += n + updatedRowsThisBatch += n + updatedRowsTotal += n } return nil @@ -160,22 +124,26 @@ func init() { return err } - // Show percent migrated. + // Show current speed + percent migrated. // - // Will maybe end up wonky due to approximations + // Percent may end up wonky due to approximations // and batching, so show a generic message at 100%. - percentDone := (float64(updatedRows) / float64(total)) * 100 + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 if percentDone <= 100 { log.Infof( ctx, - "[updated %d rows] migrated approx. %.2f%% of statuses", - updatedRows, percentDone, + "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d rows] almost done migrating... ", - updatedRows, + "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + updatedRowsTotal, rowsPerSecond, ) } } @@ -185,13 +153,6 @@ func init() { return err } - log.Info(ctx, "dropping temporary thread_id_new index") - if _, err := db.NewDropIndex(). - Index("statuses_thread_id_new_idx"). - Exec(ctx); err != nil { - return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) - } - log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). From dd3a32acdb1c5f88f9198f8005ffbc4b111ccbfc Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 11:59:35 +0200 Subject: [PATCH 04/11] few more little tweaks --- .../20250415111056_thread_all_statuses.go | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index bfa4dd84f..fada3ea29 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -136,13 +136,13 @@ func init() { if percentDone <= 100 { log.Infof( ctx, - "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + "[~%.0f rows/s; updated %d total rows] almost done... ", updatedRowsTotal, rowsPerSecond, ) } @@ -268,14 +268,33 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // This may have changed from // the initial batch selection // to the rethreadStatus() call. + upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). - Model(status). - Column("in_reply_to_id", "thread_id"). + TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). + Column("in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), status.ID). - Scan(ctx); err != nil { + Scan(ctx, &upToDateValues); err != nil { return 0, gtserror.Newf("error selecting status: %w", err) } + // If we've just threaded this status by setting + // thread_id_new, then by definition anything we + // could find from the entire thread must now be + // threaded, so we can save some database calls + // by skipping iterating up + down from here. + if v, ok := upToDateValues["thread_id_new"]; ok && v.(string) != id.Lowest { + log.Debug(ctx, "skipping just rethreaded status") + return 0, nil + } + + // Set up-to-date values on the status. + if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil { + status.InReplyToID = inReplyToID.(string) + } + if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil { + status.ThreadID = threadID.(string) + } + // status and thread ID cursor // index values. these are used // to keep track of newly loaded @@ -328,6 +347,7 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // batch of statuses is already correctly // threaded. Then we have nothing to do! if sr.allThreaded && len(sr.threadIDs) == 1 { + log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } From 5c000620e225b13faf1e0cac920b22b757e5c69d Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 12:02:19 +0200 Subject: [PATCH 05/11] whoops --- .../bundb/migrations/20250415111056_thread_all_statuses.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fada3ea29..7b864a445 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -92,7 +92,7 @@ func init() { Column("id"). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(250). + Limit(1000). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -137,13 +137,13 @@ func init() { log.Infof( ctx, "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", - updatedRowsTotal, rowsPerSecond, percentDone, + rowsPerSecond, updatedRowsTotal, percentDone, ) } else { log.Infof( ctx, "[~%.0f rows/s; updated %d total rows] almost done... ", - updatedRowsTotal, rowsPerSecond, + rowsPerSecond, updatedRowsTotal, ) } } From 4fd0bdcf2f180d66f03107e26fb270c1c7ec4d42 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 12:11:37 +0200 Subject: [PATCH 06/11] should be done poking now --- .../bundb/migrations/20250415111056_thread_all_statuses.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 7b864a445..0e67d1d7d 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -92,7 +92,7 @@ func init() { Column("id"). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(1000). + Limit(200). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -265,9 +265,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // Ensure the passed status // has up-to-date information. - // This may have changed from - // the initial batch selection - // to the rethreadStatus() call. upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). From 1725769733615706c3736b84a7ef0126931c8d72 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 16:48:46 +0200 Subject: [PATCH 07/11] i'm adjusting the PR, pray i don't adjust it further --- .../20250415111056_thread_all_statuses.go | 200 +++++++++++++----- 1 file changed, 150 insertions(+), 50 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 0e67d1d7d..a4b910fa6 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -26,7 +26,6 @@ import ( "strings" "time" - "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" @@ -62,44 +61,80 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - var sr statusRethreader - var updatedRowsTotal int64 - var maxID string - var statuses []*oldmodel.Status + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } - // Get a total count of all statuses before migration. - total, err := db.NewSelect().Table("statuses").Count(ctx) + // Get a total count of all + // statuses before migration. + totalStatuses, err := db. + NewSelect(). + Table("statuses"). + Count(ctx) if err != nil { return gtserror.Newf("error getting status table count: %w", err) } + log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses) - // Start at largest + var sr statusRethreader + var updatedRowsTotal int64 + var statuses []*oldmodel.Status + + // Page starting at largest // possible ULID value. - maxID = id.Highest + var maxID = id.Highest - log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) - for { - start := time.Now() + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] + start := time.Now() + // Select IDs of next // batch, paging down. - if err := db.NewSelect(). + if err := tx.NewSelect(). Model(&statuses). Column("id"). + Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting unthreaded statuses: %w", err) + return gtserror.Newf("error selecting top-level statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } // No more statuses! l := len(statuses) if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + log.Info(ctx, "done migrating statuses!") break } @@ -107,52 +142,117 @@ func init() { // Set next maxID value from statuses. maxID = statuses[l-1].ID - // Rethread each selected status in a transaction. + // Rethread inside the transaction. var updatedRowsThisBatch int64 - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - updatedRowsThisBatch += n - updatedRowsTotal += n + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - - return nil - }); err != nil { - return err + updatedRowsThisBatch += n + updatedRowsTotal += n } - // Show current speed + percent migrated. - // - // Percent may end up wonky due to approximations - // and batching, so show a generic message at 100%. + // Show speed for this batch. timeTaken := time.Since(start).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs - percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 - if percentDone <= 100 { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", - rowsPerSecond, updatedRowsTotal, percentDone, - ) - } else { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] almost done... ", - rowsPerSecond, updatedRowsTotal, - ) - } + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. if err := doWALCheckpoint(ctx, db); err != nil { return err } + // Open a new transaction lads. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { + + // Reset slice. + clear(statuses) + statuses = statuses[:0] + + start := time.Now() + + // Select IDs of stragglers for + // which we haven't set thread_id yet. + if err := tx.NewSelect(). + Model(&statuses). + Column("id"). + Where("? IS NULL", bun.Ident("thread_id")). + Limit(250). + Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { + return gtserror.Newf("error selecting unthreaded statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + } + + // No more statuses! + l := len(statuses) + if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + + log.Info(ctx, "done migrating statuses!") + break + } + + // Rethread inside the transaction. + var updatedRowsThisBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n + } + + // Show speed for this batch. + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", + totalDone, rowsPerSecond, + ) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -496,7 +596,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { Model(&parent). Column("id", "in_reply_to_id", "thread_id"). Where("? = ?", bun.Ident("id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -535,7 +635,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) Model(&sr.statuses). Column("id", "thread_id"). Where("? = ?", bun.Ident("in_reply_to_id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -574,7 +674,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in bun.Ident("id"), bun.In(sr.statusIDs), ). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } From 9c544e732cf38d79f83902c70fb836cfe1251e23 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 18:56:44 +0200 Subject: [PATCH 08/11] boobs --- .../20250415111056_thread_all_statuses.go | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index a4b910fa6..daf392ee6 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -106,7 +106,7 @@ func init() { Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(500). + Limit(100). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting top-level statuses: %w", err) } @@ -173,6 +173,19 @@ func init() { return err } + // Reset max ID. + maxID = id.Highest + + // Create a temporary index on thread_id_new for stragglers. + log.Info(ctx, "creating temporary statuses thread_id_new index") + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Exec(ctx); err != nil { + return gtserror.Newf("error creating new thread_id index: %w", err) + } + // Open a new transaction lads. tx, err = db.BeginTx(ctx, nil) if err != nil { @@ -192,8 +205,8 @@ func init() { if err := tx.NewSelect(). Model(&statuses). Column("id"). - Where("? IS NULL", bun.Ident("thread_id")). - Limit(250). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -253,6 +266,13 @@ func init() { ) } + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). From 2563568ccce15de8925355ea4d29ea9d59ef6945 Mon Sep 17 00:00:00 2001 From: tobi Date: Tue, 30 Sep 2025 14:50:21 +0200 Subject: [PATCH 09/11] that'll do --- CONTRIBUTING.md | 32 ++-- .../20250415111056_thread_all_statuses.go | 150 +++++++++--------- 2 files changed, 86 insertions(+), 96 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7359d65fa..10a05002c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -538,23 +538,21 @@ It may be useful when testing or debugging migrations to be able to run them aga Basic steps for this: -1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine. -2. Create a local Postgres container and mount the dump into it with, for example: - - ```bash - docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres - ``` -3. Get a terminal inside the running container: - - ```bash - docker exec -it --user postgres postgres bash - ``` -4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine): - - ```bash - psql -X postgres < /db_dump - ``` -5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: +First dump the Postgres database on the remote machine, and copy the dump over to your development machine. + +Now create a local Postgres container and mount the dump into it with, for example: + +```bash +docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres +``` + +In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: + +```bash +docker exec -it --user postgres postgres psql -X -f /db_dump postgres +``` + +With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: ```bash GTS_HOST=example.org \ diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index daf392ee6..9115dfe90 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -96,7 +96,7 @@ func init() { clear(statuses) statuses = statuses[:0] - start := time.Now() + batchStart := time.Now() // Select IDs of next // batch, paging down. @@ -106,46 +106,51 @@ func init() { Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(100). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting top-level statuses: %w", err) + return gtserror.Newf("error selecting statuses: %w", err) } - // Every 50 loops, flush wal and begin new - // transaction, to avoid silly wal sizes. - if i%50 == 0 { + l := len(statuses) + if l == 0 { + // No more statuses! + // + // Transaction will be closed + // after leaving the loop. + break + + } else if i%100 == 0 { + // Begin a new transaction every + // 100 batches (~50000 statuses), + // to avoid massive commits. + + // Close existing transaction. if err := tx.Commit(); err != nil { return err } + // Try to flush the wal + // to avoid silly wal sizes. if err := doWALCheckpoint(ctx, db); err != nil { return err } + // Open new transaction. tx, err = db.BeginTx(ctx, nil) if err != nil { return err } } - // No more statuses! - l := len(statuses) - if l == 0 { - if err := tx.Commit(); err != nil { - return err - } - - log.Info(ctx, "done migrating statuses!") - break - } - - // Set next maxID value from statuses. + // Set next maxID + // value from statuses. maxID = statuses[l-1].ID - // Rethread inside the transaction. + // Rethread using the + // open transaction. var updatedRowsThisBatch int64 for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) + n, err := sr.rethreadStatus(ctx, tx, status, false) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } @@ -154,7 +159,7 @@ func init() { } // Show speed for this batch. - timeTaken := time.Since(start).Milliseconds() + timeTaken := time.Since(batchStart).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs @@ -164,94 +169,73 @@ func init() { log.Infof( ctx, - "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", + "[~%.2f%% done; ~%.0f rows/s] migrating threads", totalDone, rowsPerSecond, ) } - if err := doWALCheckpoint(ctx, db); err != nil { + // Close transaction. + if err := tx.Commit(); err != nil { return err } - // Reset max ID. - maxID = id.Highest - - // Create a temporary index on thread_id_new for stragglers. + // Create a partial index on thread_id_new to find stragglers. + // This index will be removed at the end of the migration. log.Info(ctx, "creating temporary statuses thread_id_new index") if _, err := db.NewCreateIndex(). Table("statuses"). Index("statuses_thread_id_new_idx"). Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Exec(ctx); err != nil { return gtserror.Newf("error creating new thread_id index: %w", err) } - // Open a new transaction lads. - tx, err = db.BeginTx(ctx, nil) - if err != nil { - return err - } - for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - start := time.Now() + batchStart := time.Now() - // Select IDs of stragglers for - // which we haven't set thread_id yet. - if err := tx.NewSelect(). + // Get stragglers for which + // we haven't set thread ID yet. + if err := db.NewSelect(). Model(&statuses). Column("id"). Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Limit(500). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting unthreaded statuses: %w", err) + return gtserror.Newf("error selecting straggler: %w", err) } - // Every 50 loops, flush wal and begin new - // transaction, to avoid silly wal sizes. - if i%50 == 0 { - if err := tx.Commit(); err != nil { - return err - } - - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - - tx, err = db.BeginTx(ctx, nil) - if err != nil { - return err - } - } - - // No more statuses! - l := len(statuses) - if l == 0 { - if err := tx.Commit(); err != nil { - return err - } - - log.Info(ctx, "done migrating statuses!") + if len(statuses) == 0 { + // No more + // statuses! break } - // Rethread inside the transaction. + // Update this batch + // inside a transaction. var updatedRowsThisBatch int64 - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + for _, status := range statuses { + + n, err := sr.rethreadStatus(ctx, tx, status, true) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n } - updatedRowsThisBatch += n - updatedRowsTotal += n + return nil + }); err != nil { + return err } // Show speed for this batch. - timeTaken := time.Since(start).Milliseconds() + timeTaken := time.Since(batchStart).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs @@ -261,11 +245,16 @@ func init() { log.Infof( ctx, - "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", + "[~%.2f%% done; ~%.0f rows/s] migrating stragglers", totalDone, rowsPerSecond, ) } + // Try to merge everything we've done so far. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + log.Info(ctx, "dropping temporary thread_id_new index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_new_idx"). @@ -363,7 +352,7 @@ type statusRethreader struct { // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration // in order to trigger a status rethreading operation for the given status, returning total number of rows changed. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) { +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { // Zero slice and // map ptr values. @@ -405,11 +394,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } // Set up-to-date values on the status. - if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil { - status.InReplyToID = inReplyToID.(string) + if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { + status.InReplyToID = v.(string) } - if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil { - status.ThreadID = threadID.(string) + if v, ok := upToDateValues["thread_id"]; ok && v != nil { + status.ThreadID = v.(string) } // status and thread ID cursor @@ -463,7 +452,10 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! - if sr.allThreaded && len(sr.threadIDs) == 1 { + // + // Skip this check for straggler statuses + // that are part of broken threads. + if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } From bd1c43d55e0bf113a49abb3aefdb89a07e8cf259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zoe=CC=88=20Bijl?= Date: Wed, 1 Oct 2025 19:04:44 +0200 Subject: [PATCH 10/11] =?UTF-8?q?[bugfix/frontend]=20restore=20blockquote?= =?UTF-8?q?=20=E2=80=9Cblock=E2=80=9D=20margin=20(#4465)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stripping `
` of all the margin looks a bit funky. this only removes the inline margin. in English this generally means that it won’t have horizontal margin but will still have vertical margin. Closes #4466 ![before the change any content after the blockquote is flush against it without space](/attachments/7cc808ee-a999-435d-9235-60651a3d9bca) ![after the changes there is vertical rhythm](/attachments/3240480a-14ee-4739-a497-14237879993c) ## Checklist Please put an x inside each checkbox to indicate that you've read and followed it: `[ ]` -> `[x]` If this is a documentation change, only the first checkbox must be filled (you can delete the others if you want). - [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md). - [ ] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat. - [x] I/we have not leveraged AI to create the proposed changes. - [x] I/we have performed a self-review of added code. - [x] I/we have written code that is legible and maintainable by others. - [ ] I/we have commented the added code, particularly in hard-to-understand areas. - [ ] I/we have made any necessary changes to documentation. - [ ] I/we have added tests that cover new code. - [x] I/we have run tests and they pass locally with the changes. (I ran `go test ./...` from the main dir, they passed with one exception related to thumbnail file size, most likely caused by testing on macOS) - [x] I/we have run `go fmt ./...` and `golangci-lint run`. Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4465 Co-authored-by: Zoë Bijl Co-committed-by: Zoë Bijl --- web/source/css/base.css | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/source/css/base.css b/web/source/css/base.css index 0b8c246f4..7f3a04187 100644 --- a/web/source/css/base.css +++ b/web/source/css/base.css @@ -269,7 +269,7 @@ ol { blockquote { padding: 0.5rem; border-left: 0.2rem solid $border-accent; - margin: 0; + margin-inline: 0; font-style: normal; /* From 18a08d9165b985b7e1f4794c353c60639f6b95e8 Mon Sep 17 00:00:00 2001 From: tobi Date: Thu, 2 Oct 2025 11:52:53 +0200 Subject: [PATCH 11/11] [chore] Use bulk updates + fewer loops in status rethreading migration --- CONTRIBUTING.md | 37 +++ .../20250415111056_thread_all_statuses.go | 296 +++++++++++++----- .../new/status.go | 78 ++--- .../util/utiltypes.go | 25 ++ internal/db/bundb/migrations/util.go | 92 ------ internal/gtsmodel/status.go | 100 +++--- 6 files changed, 362 insertions(+), 266 deletions(-) create mode 100644 internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 611bea97c..10a05002c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) - [CI/CD configuration](#ci-cd-configuration) +- [Other Useful Stuff](#other-useful-stuff) + - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) ## Introduction @@ -525,3 +527,38 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). + +## Other Useful Stuff + +Various bits and bobs. + +### Running migrations on a Postgres DB backup locally + +It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. + +Basic steps for this: + +First dump the Postgres database on the remote machine, and copy the dump over to your development machine. + +Now create a local Postgres container and mount the dump into it with, for example: + +```bash +docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres +``` + +In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: + +```bash +docker exec -it --user postgres postgres psql -X -f /db_dump postgres +``` + +With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: + + ```bash + GTS_HOST=example.org \ + GTS_DB_TYPE=postgres \ + GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ + ./gotosocial migrations run + ``` + +When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fc02d1e40..4dff02a0c 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,13 +24,16 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" + "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" ) @@ -49,10 +52,26 @@ func init() { "thread_id", "thread_id_new", 1) var sr statusRethreader - var count int + var updatedTotal int64 var maxID string var statuses []*oldmodel.Status + // Create thread_id_new already + // so we can populate it as we go. + log.Info(ctx, "creating statuses column thread_id_new") + if _, err := db.NewAddColumn(). + Table("statuses"). + ColumnExpr(newColDef). + Exec(ctx); err != nil { + return gtserror.Newf("error adding statuses column thread_id_new: %w", err) + } + + // Try to merge the wal so we're + // not working on the wal file. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + // Get a total count of all statuses before migration. total, err := db.NewSelect().Table("statuses").Count(ctx) if err != nil { @@ -63,74 +82,129 @@ func init() { // possible ULID value. maxID = id.Highest - log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") - for /* TOP LEVEL STATUS LOOP */ { + log.Warnf(ctx, "rethreading %d statuses, this will take a *long* time", total) + + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select top-level statuses. - if err := db.NewSelect(). - Model(&statuses). - Column("id", "thread_id"). + batchStart := time.Now() + // Select top-level statuses. + if err := tx.NewSelect(). + Model(&statuses). + Column("id"). // We specifically use in_reply_to_account_id instead of in_reply_to_id as // they should both be set / unset in unison, but we specifically have an // index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. Where("? IS NULL", bun.Ident("in_reply_to_account_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(5000). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting top level statuses: %w", err) } - // Reached end of block. - if len(statuses) == 0 { + l := len(statuses) + if l == 0 { + // No more statuses! + // + // Transaction will be closed + // after leaving the loop. break + + } else if i%200 == 0 { + // Begin a new transaction every + // 200 batches (~100,000 statuses), + // to avoid massive commits. + + // Close existing transaction. + if err := tx.Commit(); err != nil { + return err + } + + // Try to flush the wal + // to avoid silly wal sizes. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + // Open new transaction. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } // Set next maxID value from statuses. maxID = statuses[len(statuses)-1].ID - // Rethread each selected batch of top-level statuses in a transaction. - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - count += n + // Rethread using the + // open transaction. + var updatedInBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status, false) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - - return nil - }); err != nil { - return err + updatedInBatch += n + updatedTotal += n } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedInBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedTotal) / float64(total)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating threads", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { + // Close transaction. + if err := tx.Commit(); err != nil { return err } - log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") - for /* STRAGGLER STATUS LOOP */ { + // Create a partial index on thread_id_new to find stragglers. + // This index will be removed at the end of the migration. + log.Info(ctx, "creating temporary statuses thread_id_new index") + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating new thread_id index: %w", err) + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] + batchStart := time.Now() + // Select straggler statuses. if err := db.NewSelect(). Model(&statuses). - Column("id", "in_reply_to_id", "thread_id"). - Where("? IS NULL", bun.Ident("thread_id")). + Column("id"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). // We select in smaller batches for this part // of the migration as there is a chance that @@ -138,7 +212,7 @@ func init() { // part of the same thread, i.e. one call to // rethreadStatus() may effect other statuses // later in the slice. - Limit(1000). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting straggler statuses: %w", err) } @@ -149,23 +223,35 @@ func init() { } // Rethread each selected batch of straggler statuses in a transaction. + var updatedInBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) + n, err := sr.rethreadStatus(ctx, tx, status, true) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - count += n + updatedInBatch += n + updatedTotal += n } - return nil }); err != nil { return err } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedInBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedTotal) / float64(total)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating stragglers", + totalDone, rowsPerSecond, + ) } // Attempt to merge any sqlite write-ahead-log. @@ -173,6 +259,13 @@ func init() { return err } + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -180,33 +273,6 @@ func init() { return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) } - log.Info(ctx, "creating new statuses thread_id column") - if _, err := db.NewAddColumn(). - Table("statuses"). - ColumnExpr(newColDef). - Exec(ctx); err != nil { - return gtserror.Newf("error adding new thread_id column: %w", err) - } - - log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - return batchUpdateByID(ctx, tx, - "statuses", // table - "id", // batchByCol - "UPDATE ? SET ? = ?", // updateQuery - []any{bun.Ident("statuses"), - bun.Ident("thread_id_new"), - bun.Ident("thread_id")}, - ) - }); err != nil { - return err - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - log.Info(ctx, "dropping old statuses thread_id index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_idx"). @@ -289,8 +355,8 @@ type statusRethreader struct { } // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration -// in order to trigger a status rethreading operation for the given status, returning total number rethreaded. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { +// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { // Zero slice and // map ptr values. @@ -315,14 +381,37 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // This may have changed from // the initial batch selection // to the rethreadStatus() call. + // + // Note: Use a map for this so we + // can also select thread_id_new, + // which is not part of *oldmodel.Status. + upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). - Model(status). - Column("in_reply_to_id", "thread_id"). + TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). + Column("in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), status.ID). - Scan(ctx); err != nil { + Scan(ctx, &upToDateValues); err != nil { return 0, gtserror.Newf("error selecting status: %w", err) } + // If we've just threaded this status by setting + // thread_id_new, then by definition anything we + // could find from the entire thread must now be + // threaded, so we can save some database calls + // by skipping iterating up + down from here. + if v, ok := upToDateValues["thread_id_new"]; ok && v.(string) != id.Lowest { + log.Debug(ctx, "skipping just rethreaded status") + return 0, nil + } + + // Set up-to-date values on the status. + if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { + status.InReplyToID = v.(string) + } + if v, ok := upToDateValues["thread_id"]; ok && v != nil { + status.ThreadID = v.(string) + } + // status and thread ID cursor // index values. these are used // to keep track of newly loaded @@ -371,14 +460,14 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu threadIdx = len(sr.threadIDs) } - // Total number of - // statuses threaded. - total := len(sr.statusIDs) - // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! - if sr.allThreaded && len(sr.threadIDs) == 1 { + // + // Skip this check for straggler statuses + // that are part of broken threads. + if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { + log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } @@ -417,29 +506,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } } - // Update all the statuses to - // use determined thread_id. - if _, err := tx.NewUpdate(). - Table("statuses"). - Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). - Set("? = ?", bun.Ident("thread_id"), threadID). - Exec(ctx); err != nil { + // Use a bulk update to update all the + // statuses to use determined thread_id. + // + // https://bun.uptrace.dev/guide/query-update.html#bulk-update + values := make([]*util.Status, 0, len(sr.statusIDs)) + for _, statusID := range sr.statusIDs { + values = append(values, &util.Status{ + ID: statusID, + ThreadIDNew: threadID, + }) + } + + res, err := tx.NewUpdate(). + With("_data", tx.NewValues(&values)). + Model((*util.Status)(nil)). + TableExpr("_data"). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")). + // "Join" on status ID. + Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). + // To avoid spurious writes, + // only update unmigrated statuses. + Where("? = ?", bun.Ident("status.thread_id_new"), id.Lowest). + Exec(ctx) + if err != nil { return 0, gtserror.Newf("error updating status thread ids: %w", err) } + rowsAffected, err := res.RowsAffected() + if err != nil { + return 0, gtserror.Newf("error counting rows affected: %w", err) + } + if len(sr.threadIDs) > 0 { // Update any existing thread // mutes to use latest thread_id. + + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs + threadIDs = xslices.Deduplicate(threadIDs) if _, err := tx.NewUpdate(). Table("thread_mutes"). - Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). Set("? = ?", bun.Ident("thread_id"), threadID). Exec(ctx); err != nil { return 0, gtserror.Newf("error updating mute thread ids: %w", err) } } - return total, nil + return rowsAffected, nil } // append will append the given status to the internal tracking of statusRethreader{} for @@ -560,6 +681,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in clear(sr.statuses) sr.statuses = sr.statuses[:0] + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs[idx:] + threadIDs = xslices.Deduplicate(threadIDs) + // Select stragglers that // also have thread IDs. if err := tx.NewSelect(). @@ -567,7 +693,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in Column("id", "thread_id", "in_reply_to_id"). Where("? IN (?) AND ? NOT IN (?)", bun.Ident("thread_id"), - bun.In(sr.threadIDs[idx:]), + bun.In(threadIDs), bun.Ident("id"), bun.In(sr.statusIDs), ). diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go index a03e93859..0f20bb7a4 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go @@ -23,45 +23,45 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // - PollID string `bun:"type:CHAR(26),nullzero"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // + PollID string `bun:"type:CHAR(26),nullzero"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. } // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go new file mode 100644 index 000000000..ed7256aad --- /dev/null +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go @@ -0,0 +1,25 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package util + +// Status is a helper type specifically +// for updating the thread ID of a status. +type Status struct { + ID string `bun:"type:CHAR(26)"` + ThreadIDNew string `bun:"type:CHAR(26)"` +} diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 4a3a62b21..b5543c824 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { return nil } -// batchUpdateByID performs the given updateQuery with updateArgs -// over the entire given table, batching by the ID of batchByCol. -func batchUpdateByID( - ctx context.Context, - tx bun.Tx, - table string, - batchByCol string, - updateQuery string, - updateArgs []any, -) error { - // Get a count of all in table. - total, err := tx.NewSelect(). - Table(table). - Count(ctx) - if err != nil { - return gtserror.Newf("error selecting total count: %w", err) - } - - // Query batch size - // in number of rows. - const batchsz = 5000 - - // Stores highest batch value - // used in iterate queries, - // starting at highest possible. - highest := id.Highest - - // Total updated rows. - var updated int - - for { - // Limit to batchsz - // items at once. - batchQ := tx. - NewSelect(). - Table(table). - Column(batchByCol). - Where("? < ?", bun.Ident(batchByCol), highest). - OrderExpr("? DESC", bun.Ident(batchByCol)). - Limit(batchsz) - - // Finalize UPDATE to act only on batch. - qStr := updateQuery + " WHERE ? IN (?)" - args := append(slices.Clone(updateArgs), - bun.Ident(batchByCol), - batchQ, - ) - - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qStr, args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) - } - - // Check how many items we updated. - thisUpdated, err := res.RowsAffected() - if err != nil { - return gtserror.Newf("error counting affected rows: %w", err) - } - - if thisUpdated == 0 { - // Nothing updated - // means we're done. - break - } - - // Update the overall count. - updated += int(thisUpdated) - - // Log helpful message to admin. - log.Infof(ctx, "migrated %d of %d %s (up to %s)", - updated, total, table, highest) - - // Get next highest - // id for next batch. - if err := tx. - NewSelect(). - With("batch_query", batchQ). - ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). - Scan(ctx, &highest); err != nil { - return gtserror.Newf("error selecting next highest: %w", err) - } - } - - if total != int(updated) { - // Return error here in order to rollback the whole transaction. - return fmt.Errorf("total=%d does not match updated=%d", total, updated) - } - - return nil -} - // convertEnums performs a transaction that converts // a table's column of our old-style enums (strings) to // more performant and space-saving integer types. diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 31e8fe881..0f0fb8404 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -27,56 +27,56 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. - Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. - PollID string `bun:"type:CHAR(26),nullzero"` // - Poll *Poll `bun:"-"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. + Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. + PollID string `bun:"type:CHAR(26),nullzero"` // + Poll *Poll `bun:"-"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. } // GetID implements timeline.Timelineable{}.