mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 05:12:25 -05:00 
			
		
		
		
	[chore] thread_id migration tweaks (#4198)
# Description - add a `migrations run` sub command to allow simply starting / stopping the db service, useful if you want to run *only* the migrations, both for testing or if you have a speedier box you want to run them on - tweaks to log messages - moved more stages outside of transactions, on sqlite if the transactions were getting too lengthy it could occasionally show an `sqlite: disk i/o error` ## Checklist - [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md). - [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat. - [x] I/we have not leveraged AI to create the proposed changes. - [x] I/we have performed a self-review of added code. - [x] I/we have written code that is legible and maintainable by others. - [x] I/we have commented the added code, particularly in hard-to-understand areas. - [ ] I/we have made any necessary changes to documentation. - [ ] I/we have added tests that cover new code. - [x] I/we have run tests and they pass locally with the changes. - [x] I/we have run `go fmt ./...` and `golangci-lint run`. Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4198 Co-authored-by: kim <grufwub@gmail.com> Co-committed-by: kim <grufwub@gmail.com>
This commit is contained in:
		
					parent
					
						
							
								143febb318
							
						
					
				
			
			
				commit
				
					
						f3c4ea0106
					
				
			
		
					 5 changed files with 164 additions and 51 deletions
				
			
		
							
								
								
									
										65
									
								
								cmd/gotosocial/action/migration/run.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								cmd/gotosocial/action/migration/run.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,65 @@ | ||||||
|  | // GoToSocial | ||||||
|  | // Copyright (C) GoToSocial Authors admin@gotosocial.org | ||||||
|  | // SPDX-License-Identifier: AGPL-3.0-or-later | ||||||
|  | // | ||||||
|  | // This program is free software: you can redistribute it and/or modify | ||||||
|  | // it under the terms of the GNU Affero General Public License as published by | ||||||
|  | // the Free Software Foundation, either version 3 of the License, or | ||||||
|  | // (at your option) any later version. | ||||||
|  | // | ||||||
|  | // This program is distributed in the hope that it will be useful, | ||||||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | // GNU Affero General Public License for more details. | ||||||
|  | // | ||||||
|  | // You should have received a copy of the GNU Affero General Public License | ||||||
|  | // along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | package migration | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 
 | ||||||
|  | 	"code.superseriousbusiness.org/gotosocial/cmd/gotosocial/action" | ||||||
|  | 	"code.superseriousbusiness.org/gotosocial/internal/db/bundb" | ||||||
|  | 	"code.superseriousbusiness.org/gotosocial/internal/log" | ||||||
|  | 	"code.superseriousbusiness.org/gotosocial/internal/state" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // Run will initialize the database, running any available migrations. | ||||||
|  | var Run action.GTSAction = func(ctx context.Context) error { | ||||||
|  | 	var state state.State | ||||||
|  | 
 | ||||||
|  | 	defer func() { | ||||||
|  | 		if state.DB != nil { | ||||||
|  | 			// Lastly, if database service was started, | ||||||
|  | 			// ensure it gets closed now all else stopped. | ||||||
|  | 			if err := state.DB.Close(); err != nil { | ||||||
|  | 				log.Errorf(ctx, "error stopping database: %v", err) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// Finally reached end of shutdown. | ||||||
|  | 		log.Info(ctx, "done! exiting...") | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// Initialize caches | ||||||
|  | 	state.Caches.Init() | ||||||
|  | 	if err := state.Caches.Start(); err != nil { | ||||||
|  | 		return fmt.Errorf("error starting caches: %w", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	log.Info(ctx, "starting db service...") | ||||||
|  | 
 | ||||||
|  | 	// Open connection to the database now caches started. | ||||||
|  | 	dbService, err := bundb.NewBunDBService(ctx, &state) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("error creating dbservice: %s", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Set DB on state. | ||||||
|  | 	state.DB = dbService | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | @ -55,6 +55,7 @@ func main() { | ||||||
| 	rootCmd.AddCommand(serverCommands()) | 	rootCmd.AddCommand(serverCommands()) | ||||||
| 	rootCmd.AddCommand(debugCommands()) | 	rootCmd.AddCommand(debugCommands()) | ||||||
| 	rootCmd.AddCommand(adminCommands()) | 	rootCmd.AddCommand(adminCommands()) | ||||||
|  | 	rootCmd.AddCommand(migrationCommands()) | ||||||
| 
 | 
 | ||||||
| 	// Testrigcmd will only be set when debug is enabled. | 	// Testrigcmd will only be set when debug is enabled. | ||||||
| 	if testrigCmd := testrigCommands(); testrigCmd != nil { | 	if testrigCmd := testrigCommands(); testrigCmd != nil { | ||||||
|  |  | ||||||
							
								
								
									
										43
									
								
								cmd/gotosocial/migrations.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								cmd/gotosocial/migrations.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,43 @@ | ||||||
|  | // GoToSocial | ||||||
|  | // Copyright (C) GoToSocial Authors admin@gotosocial.org | ||||||
|  | // SPDX-License-Identifier: AGPL-3.0-or-later | ||||||
|  | // | ||||||
|  | // This program is free software: you can redistribute it and/or modify | ||||||
|  | // it under the terms of the GNU Affero General Public License as published by | ||||||
|  | // the Free Software Foundation, either version 3 of the License, or | ||||||
|  | // (at your option) any later version. | ||||||
|  | // | ||||||
|  | // This program is distributed in the hope that it will be useful, | ||||||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | // GNU Affero General Public License for more details. | ||||||
|  | // | ||||||
|  | // You should have received a copy of the GNU Affero General Public License | ||||||
|  | // along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"code.superseriousbusiness.org/gotosocial/cmd/gotosocial/action/migration" | ||||||
|  | 	"github.com/spf13/cobra" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // migrationCommands returns the 'migrations' subcommand | ||||||
|  | func migrationCommands() *cobra.Command { | ||||||
|  | 	migrationCmd := &cobra.Command{ | ||||||
|  | 		Use:   "migrations", | ||||||
|  | 		Short: "gotosocial migrations-related tasks", | ||||||
|  | 	} | ||||||
|  | 	migrationRunCmd := &cobra.Command{ | ||||||
|  | 		Use:   "run", | ||||||
|  | 		Short: "starts and stops the database, running any outstanding migrations", | ||||||
|  | 		PreRunE: func(cmd *cobra.Command, args []string) error { | ||||||
|  | 			return preRun(preRunArgs{cmd: cmd}) | ||||||
|  | 		}, | ||||||
|  | 		RunE: func(cmd *cobra.Command, args []string) error { | ||||||
|  | 			return run(cmd.Context(), migration.Run) | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	migrationCmd.AddCommand(migrationRunCmd) | ||||||
|  | 	return migrationCmd | ||||||
|  | } | ||||||
|  | @ -49,10 +49,16 @@ func init() { | ||||||
| 			"thread_id", "thread_id_new", 1) | 			"thread_id", "thread_id_new", 1) | ||||||
| 
 | 
 | ||||||
| 		var sr statusRethreader | 		var sr statusRethreader | ||||||
| 		var total uint64 | 		var count int | ||||||
| 		var maxID string | 		var maxID string | ||||||
| 		var statuses []*oldmodel.Status | 		var statuses []*oldmodel.Status | ||||||
| 
 | 
 | ||||||
|  | 		// Get a total count of all statuses before migration. | ||||||
|  | 		total, err := db.NewSelect().Table("statuses").Count(ctx) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return gtserror.Newf("error getting status table count: %w", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		// Start at largest | 		// Start at largest | ||||||
| 		// possible ULID value. | 		// possible ULID value. | ||||||
| 		maxID = id.Highest | 		maxID = id.Highest | ||||||
|  | @ -97,7 +103,7 @@ func init() { | ||||||
| 					if err != nil { | 					if err != nil { | ||||||
| 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | ||||||
| 					} | 					} | ||||||
| 					total += n | 					count += n | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				return nil | 				return nil | ||||||
|  | @ -105,7 +111,12 @@ func init() { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			log.Infof(ctx, "[%d] rethreading statuses (top-level)", total) | 			log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// Attempt to merge any sqlite write-ahead-log. | ||||||
|  | 		if err := doWALCheckpoint(ctx, db); err != nil { | ||||||
|  | 			return err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") | 		log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") | ||||||
|  | @ -146,7 +157,7 @@ func init() { | ||||||
| 					if err != nil { | 					if err != nil { | ||||||
| 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | 						return gtserror.Newf("error rethreading status %s: %w", status.URI, err) | ||||||
| 					} | 					} | ||||||
| 					total += n | 					count += n | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				return nil | 				return nil | ||||||
|  | @ -154,7 +165,7 @@ func init() { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			log.Infof(ctx, "[%d] rethreading statuses (stragglers)", total) | 			log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// Attempt to merge any sqlite write-ahead-log. | 		// Attempt to merge any sqlite write-ahead-log. | ||||||
|  | @ -165,59 +176,28 @@ func init() { | ||||||
| 		log.Info(ctx, "dropping old thread_to_statuses table") | 		log.Info(ctx, "dropping old thread_to_statuses table") | ||||||
| 		if _, err := db.NewDropTable(). | 		if _, err := db.NewDropTable(). | ||||||
| 			Table("thread_to_statuses"). | 			Table("thread_to_statuses"). | ||||||
| 			IfExists(). |  | ||||||
| 			Exec(ctx); err != nil { | 			Exec(ctx); err != nil { | ||||||
| 			return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) | 			return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// Run the majority of the thread_id_new -> thread_id migration in a tx. |  | ||||||
| 		if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { |  | ||||||
| 		log.Info(ctx, "creating new statuses thread_id column") | 		log.Info(ctx, "creating new statuses thread_id column") | ||||||
| 			if _, err := tx.NewAddColumn(). | 		if _, err := db.NewAddColumn(). | ||||||
| 			Table("statuses"). | 			Table("statuses"). | ||||||
| 			ColumnExpr(newColDef). | 			ColumnExpr(newColDef). | ||||||
| 			Exec(ctx); err != nil { | 			Exec(ctx); err != nil { | ||||||
| 				return gtserror.Newf("error creating new thread_id column: %w", err) | 			return gtserror.Newf("error adding new thread_id column: %w", err) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") | 		log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") | ||||||
| 			if err := batchUpdateByID(ctx, tx, | 		if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { | ||||||
|  | 			return batchUpdateByID(ctx, tx, | ||||||
| 				"statuses",           // table | 				"statuses",           // table | ||||||
| 				"id",                 // batchByCol | 				"id",                 // batchByCol | ||||||
| 				"UPDATE ? SET ? = ?", // updateQuery | 				"UPDATE ? SET ? = ?", // updateQuery | ||||||
| 				[]any{bun.Ident("statuses"), | 				[]any{bun.Ident("statuses"), | ||||||
| 					bun.Ident("thread_id_new"), | 					bun.Ident("thread_id_new"), | ||||||
| 					bun.Ident("thread_id")}, | 					bun.Ident("thread_id")}, | ||||||
| 			); err != nil { | 			) | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			log.Info(ctx, "dropping old statuses thread_id index") |  | ||||||
| 			if _, err := tx.NewDropIndex(). |  | ||||||
| 				Index("statuses_thread_id_idx"). |  | ||||||
| 				Exec(ctx); err != nil { |  | ||||||
| 				return gtserror.Newf("error dropping old thread_id index: %w", err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			log.Info(ctx, "dropping old statuses thread_id column") |  | ||||||
| 			if _, err := tx.NewDropColumn(). |  | ||||||
| 				Table("statuses"). |  | ||||||
| 				Column("thread_id"). |  | ||||||
| 				Exec(ctx); err != nil { |  | ||||||
| 				return gtserror.Newf("error dropping old thread_id column: %w", err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			log.Info(ctx, "renaming thread_id_new to thread_id") |  | ||||||
| 			if _, err := tx.NewRaw( |  | ||||||
| 				"ALTER TABLE ? RENAME COLUMN ? TO ?", |  | ||||||
| 				bun.Ident("statuses"), |  | ||||||
| 				bun.Ident("thread_id_new"), |  | ||||||
| 				bun.Ident("thread_id"), |  | ||||||
| 			).Exec(ctx); err != nil { |  | ||||||
| 				return gtserror.Newf("error renaming new column: %w", err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			return nil |  | ||||||
| 		}); err != nil { | 		}); err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  | @ -227,12 +207,36 @@ func init() { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		log.Info(ctx, "dropping old statuses thread_id index") | ||||||
|  | 		if _, err := db.NewDropIndex(). | ||||||
|  | 			Index("statuses_thread_id_idx"). | ||||||
|  | 			Exec(ctx); err != nil { | ||||||
|  | 			return gtserror.Newf("error dropping old thread_id index: %w", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		log.Info(ctx, "dropping old statuses thread_id column") | ||||||
|  | 		if _, err := db.NewDropColumn(). | ||||||
|  | 			Table("statuses"). | ||||||
|  | 			Column("thread_id"). | ||||||
|  | 			Exec(ctx); err != nil { | ||||||
|  | 			return gtserror.Newf("error dropping old thread_id column: %w", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		log.Info(ctx, "renaming thread_id_new to thread_id") | ||||||
|  | 		if _, err := db.NewRaw( | ||||||
|  | 			"ALTER TABLE ? RENAME COLUMN ? TO ?", | ||||||
|  | 			bun.Ident("statuses"), | ||||||
|  | 			bun.Ident("thread_id_new"), | ||||||
|  | 			bun.Ident("thread_id"), | ||||||
|  | 		).Exec(ctx); err != nil { | ||||||
|  | 			return gtserror.Newf("error renaming new column: %w", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		log.Info(ctx, "creating new statuses thread_id index") | 		log.Info(ctx, "creating new statuses thread_id index") | ||||||
| 		if _, err := db.NewCreateIndex(). | 		if _, err := db.NewCreateIndex(). | ||||||
| 			Table("statuses"). | 			Table("statuses"). | ||||||
| 			Index("statuses_thread_id_idx"). | 			Index("statuses_thread_id_idx"). | ||||||
| 			Column("thread_id"). | 			Column("thread_id"). | ||||||
| 			IfNotExists(). |  | ||||||
| 			Exec(ctx); err != nil { | 			Exec(ctx); err != nil { | ||||||
| 			return gtserror.Newf("error creating new thread_id index: %w", err) | 			return gtserror.Newf("error creating new thread_id index: %w", err) | ||||||
| 		} | 		} | ||||||
|  | @ -286,7 +290,7 @@ type statusRethreader struct { | ||||||
| 
 | 
 | ||||||
| // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration | // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration | ||||||
| // in order to trigger a status rethreading operation for the given status, returning total number rethreaded. | // in order to trigger a status rethreading operation for the given status, returning total number rethreaded. | ||||||
| func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (uint64, error) { | func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { | ||||||
| 
 | 
 | ||||||
| 	// Zero slice and | 	// Zero slice and | ||||||
| 	// map ptr values. | 	// map ptr values. | ||||||
|  | @ -435,7 +439,7 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return uint64(total), nil | 	return total, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // append will append the given status to the internal tracking of statusRethreader{} for | // append will append the given status to the internal tracking of statusRethreader{} for | ||||||
|  |  | ||||||
|  | @ -417,7 +417,7 @@ func getModelField(db bun.IDB, rtype reflect.Type, fieldName string) (*schema.Fi | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // doesColumnExist safely checks whether given column exists on table, handling both SQLite and PostgreSQL appropriately. | // doesColumnExist safely checks whether given column exists on table, handling both SQLite and PostgreSQL appropriately. | ||||||
| func doesColumnExist(ctx context.Context, tx bun.Tx, table, col string) (bool, error) { | func doesColumnExist(ctx context.Context, tx bun.IDB, table, col string) (bool, error) { | ||||||
| 	var n int | 	var n int | ||||||
| 	var err error | 	var err error | ||||||
| 	switch tx.Dialect().Name() { | 	switch tx.Dialect().Name() { | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue