mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 09:52:26 -05:00 
			
		
		
		
	[chore] Use bulk updates + fewer loops in status rethreading migration
This commit is contained in:
		
					parent
					
						
							
								bd1c43d55e
							
						
					
				
			
			
				commit
				
					
						18a08d9165
					
				
			
		
					 6 changed files with 362 additions and 266 deletions
				
			
		|  | @ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht | |||
|     - [Federation](#federation) | ||||
|   - [Updating Swagger docs](#updating-swagger-docs) | ||||
|   - [CI/CD configuration](#ci-cd-configuration) | ||||
| - [Other Useful Stuff](#other-useful-stuff) | ||||
|   - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) | ||||
| 
 | ||||
| ## Introduction | ||||
| 
 | ||||
|  | @ -525,3 +527,38 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos | |||
| The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). | ||||
| 
 | ||||
| Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). | ||||
| 
 | ||||
| ## Other Useful Stuff | ||||
| 
 | ||||
| Various bits and bobs. | ||||
| 
 | ||||
| ### Running migrations on a Postgres DB backup locally | ||||
| 
 | ||||
| It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. | ||||
| 
 | ||||
| Basic steps for this: | ||||
| 
 | ||||
| First dump the Postgres database on the remote machine, and copy the dump over to your development machine. | ||||
| 
 | ||||
| Now create a local Postgres container and mount the dump into it with, for example: | ||||
| 
 | ||||
| ```bash | ||||
| docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres | ||||
| ``` | ||||
| 
 | ||||
| In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: | ||||
| 
 | ||||
| ```bash | ||||
| docker exec -it --user postgres postgres psql -X -f /db_dump postgres | ||||
| ``` | ||||
| 
 | ||||
| With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: | ||||
|    | ||||
|   ```bash | ||||
|   GTS_HOST=example.org \ | ||||
|   GTS_DB_TYPE=postgres \ | ||||
|   GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ | ||||
|   ./gotosocial migrations run | ||||
|   ``` | ||||
| 
 | ||||
| When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. | ||||
|  |  | |||
|  | @ -24,13 +24,16 @@ import ( | |||
| 	"reflect" | ||||
| 	"slices" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/db" | ||||
| 	newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" | ||||
| 	oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/gtserror" | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/id" | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/log" | ||||
| 	"code.superseriousbusiness.org/gotosocial/internal/util/xslices" | ||||
| 	"github.com/uptrace/bun" | ||||
| ) | ||||
| 
 | ||||
|  | @ -49,10 +52,26 @@ func init() { | |||
| 			"thread_id", "thread_id_new", 1) | ||||
| 
 | ||||
| 		var sr statusRethreader | ||||
| 		var count int | ||||
| 		var updatedTotal int64 | ||||
| 		var maxID string | ||||
| 		var statuses []*oldmodel.Status | ||||
| 
 | ||||
| 		// Create thread_id_new already | ||||
| 		// so we can populate it as we go. | ||||
| 		log.Info(ctx, "creating statuses column thread_id_new") | ||||
| 		if _, err := db.NewAddColumn(). | ||||
| 			Table("statuses"). | ||||
| 			ColumnExpr(newColDef). | ||||
| 			Exec(ctx); err != nil { | ||||
| 			return gtserror.Newf("error adding statuses column thread_id_new: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		// Try to merge the wal so we're | ||||
| 		// not working on the wal file. | ||||
| 		if err := doWALCheckpoint(ctx, db); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// Get a total count of all statuses before migration. | ||||
| 		total, err := db.NewSelect().Table("statuses").Count(ctx) | ||||
| 		if err != nil { | ||||
|  | @ -63,74 +82,129 @@ func init() { | |||
| 		// possible ULID value. | ||||
| 		maxID = id.Highest | ||||
| 
 | ||||
| 		log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") | ||||
| 		for /* TOP LEVEL STATUS LOOP */ { | ||||
| 		log.Warnf(ctx, "rethreading %d statuses, this will take a *long* time", total) | ||||
| 
 | ||||
| 		// Open initial transaction. | ||||
| 		tx, err := db.BeginTx(ctx, nil) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		for i := 1; ; i++ { | ||||
| 
 | ||||
| 			// Reset slice. | ||||
| 			clear(statuses) | ||||
| 			statuses = statuses[:0] | ||||
| 
 | ||||
| 			// Select top-level statuses. | ||||
| 			if err := db.NewSelect(). | ||||
| 				Model(&statuses). | ||||
| 				Column("id", "thread_id"). | ||||
| 			batchStart := time.Now() | ||||
| 
 | ||||
| 			// Select top-level statuses. | ||||
| 			if err := tx.NewSelect(). | ||||
| 				Model(&statuses). | ||||
| 				Column("id"). | ||||
| 				// We specifically use in_reply_to_account_id instead of in_reply_to_id as | ||||
| 				// they should both be set / unset in unison, but we specifically have an | ||||
| 				// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. | ||||
| 				Where("? IS NULL", bun.Ident("in_reply_to_account_id")). | ||||
| 				Where("? < ?", bun.Ident("id"), maxID). | ||||
| 				OrderExpr("? DESC", bun.Ident("id")). | ||||
| 				Limit(5000). | ||||
| 				Limit(500). | ||||
| 				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { | ||||
| 				return gtserror.Newf("error selecting top level statuses: %w", err) | ||||
| 			} | ||||
| 
 | ||||
| 			// Reached end of block. | ||||
| 			if len(statuses) == 0 { | ||||
| 			l := len(statuses) | ||||
| 			if l == 0 { | ||||
| 				// No more statuses! | ||||
| 				// | ||||
| 				// Transaction will be closed | ||||
| 				// after leaving the loop. | ||||
| 				break | ||||
| 
 | ||||
| 			} else if i%200 == 0 { | ||||
| 				// Begin a new transaction every | ||||
| 				// 200 batches (~100,000 statuses), | ||||
| 				// to avoid massive commits. | ||||
| 
 | ||||
| 				// Close existing transaction. | ||||
| 				if err := tx.Commit(); err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 
 | ||||
| 				// Try to flush the wal | ||||
| 				// to avoid silly wal sizes. | ||||
| 				if err := doWALCheckpoint(ctx, db); err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 
 | ||||
| 				// Open new transaction. | ||||
| 				tx, err = db.BeginTx(ctx, nil) | ||||
| 				if err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			// Set next maxID value from statuses. | ||||
| 			maxID = statuses[len(statuses)-1].ID | ||||
| 
 | ||||
| 			// Rethread each selected batch of top-level statuses in a transaction. | ||||
| 			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { | ||||
| 
 | ||||
| 				// Rethread each top-level status. | ||||
| 			// Rethread using the | ||||
| 			// open transaction. | ||||
| 			var updatedInBatch int64 | ||||
| 			for _, status := range statuses { | ||||
| 					n, err := sr.rethreadStatus(ctx, tx, status) | ||||
| 				n, err := sr.rethreadStatus(ctx, tx, status, false) | ||||
| 				if err != nil { | ||||
| 					return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | ||||
| 				} | ||||
| 					count += n | ||||
| 				updatedInBatch += n | ||||
| 				updatedTotal += n | ||||
| 			} | ||||
| 
 | ||||
| 				return nil | ||||
| 			}); err != nil { | ||||
| 			// Show speed for this batch. | ||||
| 			timeTaken := time.Since(batchStart).Milliseconds() | ||||
| 			msPerRow := float64(timeTaken) / float64(updatedInBatch) | ||||
| 			rowsPerMs := float64(1) / float64(msPerRow) | ||||
| 			rowsPerSecond := 1000 * rowsPerMs | ||||
| 
 | ||||
| 			// Show percent migrated overall. | ||||
| 			totalDone := (float64(updatedTotal) / float64(total)) * 100 | ||||
| 
 | ||||
| 			log.Infof( | ||||
| 				ctx, | ||||
| 				"[~%.2f%% done; ~%.0f rows/s] migrating threads", | ||||
| 				totalDone, rowsPerSecond, | ||||
| 			) | ||||
| 		} | ||||
| 
 | ||||
| 		// Close transaction. | ||||
| 		if err := tx.Commit(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 			log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) | ||||
| 		// Create a partial index on thread_id_new to find stragglers. | ||||
| 		// This index will be removed at the end of the migration. | ||||
| 		log.Info(ctx, "creating temporary statuses thread_id_new index") | ||||
| 		if _, err := db.NewCreateIndex(). | ||||
| 			Table("statuses"). | ||||
| 			Index("statuses_thread_id_new_idx"). | ||||
| 			Column("thread_id_new"). | ||||
| 			Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). | ||||
| 			Exec(ctx); err != nil { | ||||
| 			return gtserror.Newf("error creating new thread_id index: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		// Attempt to merge any sqlite write-ahead-log. | ||||
| 		if err := doWALCheckpoint(ctx, db); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") | ||||
| 		for /* STRAGGLER STATUS LOOP */ { | ||||
| 		for i := 1; ; i++ { | ||||
| 
 | ||||
| 			// Reset slice. | ||||
| 			clear(statuses) | ||||
| 			statuses = statuses[:0] | ||||
| 
 | ||||
| 			batchStart := time.Now() | ||||
| 
 | ||||
| 			// Select straggler statuses. | ||||
| 			if err := db.NewSelect(). | ||||
| 				Model(&statuses). | ||||
| 				Column("id", "in_reply_to_id", "thread_id"). | ||||
| 				Where("? IS NULL", bun.Ident("thread_id")). | ||||
| 				Column("id"). | ||||
| 				Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). | ||||
| 
 | ||||
| 				// We select in smaller batches for this part | ||||
| 				// of the migration as there is a chance that | ||||
|  | @ -138,7 +212,7 @@ func init() { | |||
| 				// part of the same thread, i.e. one call to | ||||
| 				// rethreadStatus() may effect other statuses | ||||
| 				// later in the slice. | ||||
| 				Limit(1000). | ||||
| 				Limit(250). | ||||
| 				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { | ||||
| 				return gtserror.Newf("error selecting straggler statuses: %w", err) | ||||
| 			} | ||||
|  | @ -149,23 +223,35 @@ func init() { | |||
| 			} | ||||
| 
 | ||||
| 			// Rethread each selected batch of straggler statuses in a transaction. | ||||
| 			var updatedInBatch int64 | ||||
| 			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { | ||||
| 
 | ||||
| 				// Rethread each top-level status. | ||||
| 				for _, status := range statuses { | ||||
| 					n, err := sr.rethreadStatus(ctx, tx, status) | ||||
| 					n, err := sr.rethreadStatus(ctx, tx, status, true) | ||||
| 					if err != nil { | ||||
| 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | ||||
| 					} | ||||
| 					count += n | ||||
| 					updatedInBatch += n | ||||
| 					updatedTotal += n | ||||
| 				} | ||||
| 
 | ||||
| 				return nil | ||||
| 			}); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 
 | ||||
| 			log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) | ||||
| 			// Show speed for this batch. | ||||
| 			timeTaken := time.Since(batchStart).Milliseconds() | ||||
| 			msPerRow := float64(timeTaken) / float64(updatedInBatch) | ||||
| 			rowsPerMs := float64(1) / float64(msPerRow) | ||||
| 			rowsPerSecond := 1000 * rowsPerMs | ||||
| 
 | ||||
| 			// Show percent migrated overall. | ||||
| 			totalDone := (float64(updatedTotal) / float64(total)) * 100 | ||||
| 
 | ||||
| 			log.Infof( | ||||
| 				ctx, | ||||
| 				"[~%.2f%% done; ~%.0f rows/s] migrating stragglers", | ||||
| 				totalDone, rowsPerSecond, | ||||
| 			) | ||||
| 		} | ||||
| 
 | ||||
| 		// Attempt to merge any sqlite write-ahead-log. | ||||
|  | @ -173,6 +259,13 @@ func init() { | |||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		log.Info(ctx, "dropping temporary thread_id_new index") | ||||
| 		if _, err := db.NewDropIndex(). | ||||
| 			Index("statuses_thread_id_new_idx"). | ||||
| 			Exec(ctx); err != nil { | ||||
| 			return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		log.Info(ctx, "dropping old thread_to_statuses table") | ||||
| 		if _, err := db.NewDropTable(). | ||||
| 			Table("thread_to_statuses"). | ||||
|  | @ -180,33 +273,6 @@ func init() { | |||
| 			return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		log.Info(ctx, "creating new statuses thread_id column") | ||||
| 		if _, err := db.NewAddColumn(). | ||||
| 			Table("statuses"). | ||||
| 			ColumnExpr(newColDef). | ||||
| 			Exec(ctx); err != nil { | ||||
| 			return gtserror.Newf("error adding new thread_id column: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") | ||||
| 		if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { | ||||
| 			return batchUpdateByID(ctx, tx, | ||||
| 				"statuses",           // table | ||||
| 				"id",                 // batchByCol | ||||
| 				"UPDATE ? SET ? = ?", // updateQuery | ||||
| 				[]any{bun.Ident("statuses"), | ||||
| 					bun.Ident("thread_id_new"), | ||||
| 					bun.Ident("thread_id")}, | ||||
| 			) | ||||
| 		}); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// Attempt to merge any sqlite write-ahead-log. | ||||
| 		if err := doWALCheckpoint(ctx, db); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		log.Info(ctx, "dropping old statuses thread_id index") | ||||
| 		if _, err := db.NewDropIndex(). | ||||
| 			Index("statuses_thread_id_idx"). | ||||
|  | @ -289,8 +355,8 @@ type statusRethreader struct { | |||
| } | ||||
| 
 | ||||
| // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration | ||||
| // in order to trigger a status rethreading operation for the given status, returning total number rethreaded. | ||||
| func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { | ||||
| // in order to trigger a status rethreading operation for the given status, returning total number of rows changed. | ||||
| func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { | ||||
| 
 | ||||
| 	// Zero slice and | ||||
| 	// map ptr values. | ||||
|  | @ -315,14 +381,37 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu | |||
| 	// This may have changed from | ||||
| 	// the initial batch selection | ||||
| 	// to the rethreadStatus() call. | ||||
| 	// | ||||
| 	// Note: Use a map for this so we | ||||
| 	// can also select thread_id_new, | ||||
| 	// which is not part of *oldmodel.Status. | ||||
| 	upToDateValues := make(map[string]any, 3) | ||||
| 	if err := tx.NewSelect(). | ||||
| 		Model(status). | ||||
| 		Column("in_reply_to_id", "thread_id"). | ||||
| 		TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). | ||||
| 		Column("in_reply_to_id", "thread_id", "thread_id_new"). | ||||
| 		Where("? = ?", bun.Ident("id"), status.ID). | ||||
| 		Scan(ctx); err != nil { | ||||
| 		Scan(ctx, &upToDateValues); err != nil { | ||||
| 		return 0, gtserror.Newf("error selecting status: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// If we've just threaded this status by setting | ||||
| 	// thread_id_new, then by definition anything we | ||||
| 	// could find from the entire thread must now be | ||||
| 	// threaded, so we can save some database calls | ||||
| 	// by skipping iterating up + down from here. | ||||
| 	if v, ok := upToDateValues["thread_id_new"]; ok && v.(string) != id.Lowest { | ||||
| 		log.Debug(ctx, "skipping just rethreaded status") | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Set up-to-date values on the status. | ||||
| 	if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { | ||||
| 		status.InReplyToID = v.(string) | ||||
| 	} | ||||
| 	if v, ok := upToDateValues["thread_id"]; ok && v != nil { | ||||
| 		status.ThreadID = v.(string) | ||||
| 	} | ||||
| 
 | ||||
| 	// status and thread ID cursor | ||||
| 	// index values. these are used | ||||
| 	// to keep track of newly loaded | ||||
|  | @ -371,14 +460,14 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu | |||
| 		threadIdx = len(sr.threadIDs) | ||||
| 	} | ||||
| 
 | ||||
| 	// Total number of | ||||
| 	// statuses threaded. | ||||
| 	total := len(sr.statusIDs) | ||||
| 
 | ||||
| 	// Check for the case where the entire | ||||
| 	// batch of statuses is already correctly | ||||
| 	// threaded. Then we have nothing to do! | ||||
| 	if sr.allThreaded && len(sr.threadIDs) == 1 { | ||||
| 	// | ||||
| 	// Skip this check for straggler statuses | ||||
| 	// that are part of broken threads. | ||||
| 	if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { | ||||
| 		log.Debug(ctx, "skipping just rethreaded thread") | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 
 | ||||
|  | @ -417,29 +506,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Update all the statuses to | ||||
| 	// use determined thread_id. | ||||
| 	if _, err := tx.NewUpdate(). | ||||
| 		Table("statuses"). | ||||
| 		Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). | ||||
| 		Set("? = ?", bun.Ident("thread_id"), threadID). | ||||
| 		Exec(ctx); err != nil { | ||||
| 	// Use a bulk update to update all the | ||||
| 	// statuses to use determined thread_id. | ||||
| 	// | ||||
| 	// https://bun.uptrace.dev/guide/query-update.html#bulk-update | ||||
| 	values := make([]*util.Status, 0, len(sr.statusIDs)) | ||||
| 	for _, statusID := range sr.statusIDs { | ||||
| 		values = append(values, &util.Status{ | ||||
| 			ID:          statusID, | ||||
| 			ThreadIDNew: threadID, | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	res, err := tx.NewUpdate(). | ||||
| 		With("_data", tx.NewValues(&values)). | ||||
| 		Model((*util.Status)(nil)). | ||||
| 		TableExpr("_data"). | ||||
| 		// Set the new thread ID, which we can use as | ||||
| 		// an indication that we've migrated this batch. | ||||
| 		Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")). | ||||
| 		// While we're here, also set old thread_id, as | ||||
| 		// we'll use it for further rethreading purposes. | ||||
| 		Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")). | ||||
| 		// "Join" on status ID. | ||||
| 		Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). | ||||
| 		// To avoid spurious writes, | ||||
| 		// only update unmigrated statuses. | ||||
| 		Where("? = ?", bun.Ident("status.thread_id_new"), id.Lowest). | ||||
| 		Exec(ctx) | ||||
| 	if err != nil { | ||||
| 		return 0, gtserror.Newf("error updating status thread ids: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	rowsAffected, err := res.RowsAffected() | ||||
| 	if err != nil { | ||||
| 		return 0, gtserror.Newf("error counting rows affected: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(sr.threadIDs) > 0 { | ||||
| 		// Update any existing thread | ||||
| 		// mutes to use latest thread_id. | ||||
| 
 | ||||
| 		// Dedupe thread IDs before query | ||||
| 		// to avoid ludicrous "IN" clause. | ||||
| 		threadIDs := sr.threadIDs | ||||
| 		threadIDs = xslices.Deduplicate(threadIDs) | ||||
| 		if _, err := tx.NewUpdate(). | ||||
| 			Table("thread_mutes"). | ||||
| 			Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). | ||||
| 			Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). | ||||
| 			Set("? = ?", bun.Ident("thread_id"), threadID). | ||||
| 			Exec(ctx); err != nil { | ||||
| 			return 0, gtserror.Newf("error updating mute thread ids: %w", err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return total, nil | ||||
| 	return rowsAffected, nil | ||||
| } | ||||
| 
 | ||||
| // append will append the given status to the internal tracking of statusRethreader{} for | ||||
|  | @ -560,6 +681,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in | |||
| 	clear(sr.statuses) | ||||
| 	sr.statuses = sr.statuses[:0] | ||||
| 
 | ||||
| 	// Dedupe thread IDs before query | ||||
| 	// to avoid ludicrous "IN" clause. | ||||
| 	threadIDs := sr.threadIDs[idx:] | ||||
| 	threadIDs = xslices.Deduplicate(threadIDs) | ||||
| 
 | ||||
| 	// Select stragglers that | ||||
| 	// also have thread IDs. | ||||
| 	if err := tx.NewSelect(). | ||||
|  | @ -567,7 +693,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in | |||
| 		Column("id", "thread_id", "in_reply_to_id"). | ||||
| 		Where("? IN (?) AND ? NOT IN (?)", | ||||
| 			bun.Ident("thread_id"), | ||||
| 			bun.In(sr.threadIDs[idx:]), | ||||
| 			bun.In(threadIDs), | ||||
| 			bun.Ident("id"), | ||||
| 			bun.In(sr.statusIDs), | ||||
| 		). | ||||
|  |  | |||
|  | @ -46,7 +46,7 @@ type Status struct { | |||
| 	BoostOfURI               string            `bun:"-"`                                                                   // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. | ||||
| 	BoostOfAccountID         string            `bun:"type:CHAR(26),nullzero"`                                              // id of the account that owns the boosted status | ||||
| 	BoostOf                  *Status           `bun:"-"`                                                                   // status that corresponds to boostOfID | ||||
| 	ThreadID                 string            `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs | ||||
| 	ThreadID                 string            `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs | ||||
| 	EditIDs                  []string          `bun:"edits,array"`                                                         // | ||||
| 	PollID                   string            `bun:"type:CHAR(26),nullzero"`                                              // | ||||
| 	ContentWarning           string            `bun:",nullzero"`                                                           // Content warning HTML for this status. | ||||
|  |  | |||
|  | @ -0,0 +1,25 @@ | |||
| // GoToSocial | ||||
| // Copyright (C) GoToSocial Authors admin@gotosocial.org | ||||
| // SPDX-License-Identifier: AGPL-3.0-or-later | ||||
| // | ||||
| // This program is free software: you can redistribute it and/or modify | ||||
| // it under the terms of the GNU Affero General Public License as published by | ||||
| // the Free Software Foundation, either version 3 of the License, or | ||||
| // (at your option) any later version. | ||||
| // | ||||
| // This program is distributed in the hope that it will be useful, | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| // GNU Affero General Public License for more details. | ||||
| // | ||||
| // You should have received a copy of the GNU Affero General Public License | ||||
| // along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| package util | ||||
| 
 | ||||
| // Status is a helper type specifically | ||||
| // for updating the thread ID of a status. | ||||
| type Status struct { | ||||
| 	ID          string `bun:"type:CHAR(26)"` | ||||
| 	ThreadIDNew string `bun:"type:CHAR(26)"` | ||||
| } | ||||
|  | @ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // batchUpdateByID performs the given updateQuery with updateArgs | ||||
| // over the entire given table, batching by the ID of batchByCol. | ||||
| func batchUpdateByID( | ||||
| 	ctx context.Context, | ||||
| 	tx bun.Tx, | ||||
| 	table string, | ||||
| 	batchByCol string, | ||||
| 	updateQuery string, | ||||
| 	updateArgs []any, | ||||
| ) error { | ||||
| 	// Get a count of all in table. | ||||
| 	total, err := tx.NewSelect(). | ||||
| 		Table(table). | ||||
| 		Count(ctx) | ||||
| 	if err != nil { | ||||
| 		return gtserror.Newf("error selecting total count: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Query batch size | ||||
| 	// in number of rows. | ||||
| 	const batchsz = 5000 | ||||
| 
 | ||||
| 	// Stores highest batch value | ||||
| 	// used in iterate queries, | ||||
| 	// starting at highest possible. | ||||
| 	highest := id.Highest | ||||
| 
 | ||||
| 	// Total updated rows. | ||||
| 	var updated int | ||||
| 
 | ||||
| 	for { | ||||
| 		// Limit to batchsz | ||||
| 		// items at once. | ||||
| 		batchQ := tx. | ||||
| 			NewSelect(). | ||||
| 			Table(table). | ||||
| 			Column(batchByCol). | ||||
| 			Where("? < ?", bun.Ident(batchByCol), highest). | ||||
| 			OrderExpr("? DESC", bun.Ident(batchByCol)). | ||||
| 			Limit(batchsz) | ||||
| 
 | ||||
| 		// Finalize UPDATE to act only on batch. | ||||
| 		qStr := updateQuery + " WHERE ? IN (?)" | ||||
| 		args := append(slices.Clone(updateArgs), | ||||
| 			bun.Ident(batchByCol), | ||||
| 			batchQ, | ||||
| 		) | ||||
| 
 | ||||
| 		// Execute the prepared raw query with arguments. | ||||
| 		res, err := tx.NewRaw(qStr, args...).Exec(ctx) | ||||
| 		if err != nil { | ||||
| 			return gtserror.Newf("error updating old column values: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		// Check how many items we updated. | ||||
| 		thisUpdated, err := res.RowsAffected() | ||||
| 		if err != nil { | ||||
| 			return gtserror.Newf("error counting affected rows: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		if thisUpdated == 0 { | ||||
| 			// Nothing updated | ||||
| 			// means we're done. | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		// Update the overall count. | ||||
| 		updated += int(thisUpdated) | ||||
| 
 | ||||
| 		// Log helpful message to admin. | ||||
| 		log.Infof(ctx, "migrated %d of %d %s (up to %s)", | ||||
| 			updated, total, table, highest) | ||||
| 
 | ||||
| 		// Get next highest | ||||
| 		// id for next batch. | ||||
| 		if err := tx. | ||||
| 			NewSelect(). | ||||
| 			With("batch_query", batchQ). | ||||
| 			ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). | ||||
| 			Scan(ctx, &highest); err != nil { | ||||
| 			return gtserror.Newf("error selecting next highest: %w", err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if total != int(updated) { | ||||
| 		// Return error here in order to rollback the whole transaction. | ||||
| 		return fmt.Errorf("total=%d does not match updated=%d", total, updated) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // convertEnums performs a transaction that converts | ||||
| // a table's column of our old-style enums (strings) to | ||||
| // more performant and space-saving integer types. | ||||
|  |  | |||
|  | @ -57,7 +57,7 @@ type Status struct { | |||
| 	BoostOfAccountID         string             `bun:"type:CHAR(26),nullzero"`                                              // id of the account that owns the boosted status | ||||
| 	BoostOf                  *Status            `bun:"-"`                                                                   // status that corresponds to boostOfID | ||||
| 	BoostOfAccount           *Account           `bun:"rel:belongs-to"`                                                      // account that corresponds to boostOfAccountID | ||||
| 	ThreadID                 string             `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs | ||||
| 	ThreadID                 string             `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs | ||||
| 	EditIDs                  []string           `bun:"edits,array"`                                                         // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. | ||||
| 	Edits                    []*StatusEdit      `bun:"-"`                                                                   // Edits of this status, ordered from oldest -> newest edit. | ||||
| 	PollID                   string             `bun:"type:CHAR(26),nullzero"`                                              // | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue