From 3db2d42247c5f88196ae7fb68b6bbec603bb7f26 Mon Sep 17 00:00:00 2001 From: kim Date: Wed, 24 Sep 2025 15:12:25 +0200 Subject: [PATCH 01/11] [chore] ffmpeg webassembly fiddling (#4454) This disables ffmpeg / ffprobe support on platforms where the wazero compiler is not available. The slowness introduced is hard to pindown for admins (and us!), so it's easier to just return an error message linking to docs on attempted media processing. It still allows the instance to run, just erroring if anything other than a jpeg is attempted to be processed. This should hopefully make it easier for users to notice these issues. Also further locks down our wazero 'allowFiles' fs and other media code to address: https://codeberg.org/superseriousbusiness/gotosocial/issues/4408 relates to: https://codeberg.org/superseriousbusiness/gotosocial/issues/4427 also relates to issues raised in #gotosocial-help on matrix closes https://codeberg.org/superseriousbusiness/gotosocial/issues/4408 Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4454 Co-authored-by: kim Co-committed-by: kim --- README.md | 2 +- internal/media/ffmpeg.go | 48 ++++++------------- internal/media/ffmpeg/wasm.go | 36 +++++++-------- internal/media/metadata.go | 20 +++++--- internal/media/probe.go | 4 +- internal/media/thumbnail.go | 12 ++--- internal/media/util.go | 86 +++++++++++++++++++++++------------ 7 files changed, 111 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 7739f1ad2..605f654f6 100644 --- a/README.md +++ b/README.md @@ -328,7 +328,7 @@ This is the current status of support offered by GoToSocial for different platfo Notes on 64-bit CPU feature requirements: -- x86_64 requires the SSE4.1 instruction set. (CPUs manufactured after ~2010) +- x86_64 requires the [x86-64-v2](https://en.wikipedia.org/wiki/X86-64-v2) level instruction sets. (CPUs manufactured after ~2010) - ARM64 requires no specific features, ARMv8 CPUs (and later) have all required features. 
diff --git a/internal/media/ffmpeg.go b/internal/media/ffmpeg.go index d98e93baf..938a10894 100644 --- a/internal/media/ffmpeg.go +++ b/internal/media/ffmpeg.go @@ -21,8 +21,6 @@ import ( "context" "encoding/json" "errors" - "os" - "path" "strconv" "strings" @@ -158,34 +156,20 @@ func ffmpeg(ctx context.Context, inpath string, outpath string, args ...string) Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { fscfg := wazero.NewFSConfig() - // Needs read-only access to - // /dev/urandom for some types. - urandom := &allowFiles{ - { - abs: "/dev/urandom", - flag: os.O_RDONLY, - perm: 0, - }, - } - fscfg = fscfg.WithFSMount(urandom, "/dev") + // Needs read-only access /dev/urandom, + // required by some ffmpeg operations. + fscfg = fscfg.WithFSMount(&allowFiles{ + allowRead("/dev/urandom"), + }, "/dev") // In+out dirs are always the same (tmp), // so we can share one file system for // both + grant different perms to inpath // (read only) and outpath (read+write). - shared := &allowFiles{ - { - abs: inpath, - flag: os.O_RDONLY, - perm: 0, - }, - { - abs: outpath, - flag: os.O_RDWR | os.O_CREATE | os.O_TRUNC, - perm: 0666, - }, - } - fscfg = fscfg.WithFSMount(shared, path.Dir(inpath)) + fscfg = fscfg.WithFSMount(&allowFiles{ + allowCreate(outpath), + allowRead(inpath), + }, tmpdir) // Set anonymous module name. modcfg = modcfg.WithName("") @@ -246,16 +230,10 @@ func ffprobe(ctx context.Context, filepath string) (*result, error) { Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { fscfg := wazero.NewFSConfig() - // Needs read-only access - // to file being probed. - in := &allowFiles{ - { - abs: filepath, - flag: os.O_RDONLY, - perm: 0, - }, - } - fscfg = fscfg.WithFSMount(in, path.Dir(filepath)) + // Needs read-only access to probed file. + fscfg = fscfg.WithFSMount(&allowFiles{ + allowRead(filepath), + }, tmpdir) // Set anonymous module name. 
modcfg = modcfg.WithName("") diff --git a/internal/media/ffmpeg/wasm.go b/internal/media/ffmpeg/wasm.go index 1cd92f05d..bcf73725e 100644 --- a/internal/media/ffmpeg/wasm.go +++ b/internal/media/ffmpeg/wasm.go @@ -21,12 +21,12 @@ package ffmpeg import ( "context" + "errors" "os" "runtime" "sync/atomic" "unsafe" - "code.superseriousbusiness.org/gotosocial/internal/log" "codeberg.org/gruf/go-ffmpreg/embed" "codeberg.org/gruf/go-ffmpreg/wasm" "github.com/tetratelabs/wazero" @@ -49,24 +49,19 @@ func initWASM(ctx context.Context) error { return nil } - var cfg wazero.RuntimeConfig - - // Allocate new runtime config, letting - // wazero determine compiler / interpreter. - cfg = wazero.NewRuntimeConfig() - - // Though still perform a check of CPU features at - // runtime to warn about slow interpreter performance. - if reason, supported := compilerSupported(); !supported { - log.Warn(ctx, "!!! WAZERO COMPILER MAY NOT BE AVAILABLE !!!"+ - " Reason: "+reason+"."+ - " Wazero will likely fall back to interpreter mode,"+ - " resulting in poor performance for media processing (and SQLite, if in use)."+ - " For more info and possible workarounds, please check:"+ - " https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms", - ) + // Check at runtime whether Wazero compiler support is available, + // interpreter mode is too slow for a usable gotosocial experience. + if reason, supported := isCompilerSupported(); !supported { + return errors.New("!!! WAZERO COMPILER SUPPORT NOT AVAILABLE !!!" + + " Reason: " + reason + "." + + " Wazero in interpreter mode is too slow to use ffmpeg" + + " (this will also affect SQLite if in use)." + + " For more info and possible workarounds, please check: https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms") } + // Allocate new runtime compiler config. 
+ cfg := wazero.NewRuntimeConfigCompiler() + if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" { // Use on-filesystem compilation cache given by env. cache, err := wazero.NewCompilationCacheWithDir(dir) @@ -128,7 +123,7 @@ func initWASM(ctx context.Context) error { return nil } -func compilerSupported() (string, bool) { +func isCompilerSupported() (string, bool) { switch runtime.GOOS { case "linux", "android", "windows", "darwin", @@ -141,10 +136,11 @@ func compilerSupported() (string, bool) { switch runtime.GOARCH { case "amd64": // NOTE: wazero in the future may decouple the - // requirement of simd (sse4_1) from requirements + // requirement of simd (sse4_1+2) from requirements // for compiler support in the future, but even // still our module go-ffmpreg makes use of them. - return "amd64 SSE4.1 required", cpu.X86.HasSSE41 + return "amd64 x86-64-v2 required (see: https://en.wikipedia.org/wiki/X86-64-v2)", + cpu.Initialized && cpu.X86.HasSSE3 && cpu.X86.HasSSE41 && cpu.X86.HasSSE42 case "arm64": // NOTE: this particular check may change if we // later update go-ffmpreg to a version that makes diff --git a/internal/media/metadata.go b/internal/media/metadata.go index 44b1a87b6..c1fa58645 100644 --- a/internal/media/metadata.go +++ b/internal/media/metadata.go @@ -74,20 +74,28 @@ func clearMetadata(ctx context.Context, filepath string) error { // terminateExif cleans exif data from file at input path, into file // at output path, using given file extension to determine cleaning type. -func terminateExif(outpath, inpath string, ext string) error { +func terminateExif(outpath, inpath string, ext string) (err error) { + var inFile *os.File + var outFile *os.File + + // Ensure handles + // closed on return. + defer func() { + outFile.Close() + inFile.Close() + }() + // Open input file at given path. 
- inFile, err := os.Open(inpath) + inFile, err = openRead(inpath) if err != nil { return gtserror.Newf("error opening input file %s: %w", inpath, err) } - defer inFile.Close() - // Open output file at given path. - outFile, err := os.Create(outpath) + // Create output file at given path. + outFile, err = openWrite(outpath) if err != nil { return gtserror.Newf("error opening output file %s: %w", outpath, err) } - defer outFile.Close() // Terminate EXIF data from 'inFile' -> 'outFile'. err = terminator.TerminateInto(outFile, inFile, ext) diff --git a/internal/media/probe.go b/internal/media/probe.go index 791b6a8c2..c66254b90 100644 --- a/internal/media/probe.go +++ b/internal/media/probe.go @@ -38,8 +38,9 @@ const ( // probe will first attempt to probe the file at path using native Go code // (for performance), but falls back to using ffprobe to retrieve media details. func probe(ctx context.Context, filepath string) (*result, error) { + // Open input file at given path. - file, err := os.Open(filepath) + file, err := openRead(filepath) if err != nil { return nil, gtserror.Newf("error opening file %s: %w", filepath, err) } @@ -80,6 +81,7 @@ func probe(ctx context.Context, filepath string) (*result, error) { // probeJPEG decodes the given file as JPEG and determines // image details from the decoded JPEG using native Go code. func probeJPEG(file *os.File) (*result, error) { + // Attempt to decode JPEG, adding back hdr magic. cfg, err := jpeg.DecodeConfig(io.MultiReader( strings.NewReader(magicJPEG), diff --git a/internal/media/thumbnail.go b/internal/media/thumbnail.go index d9a2e522a..5fccaf5ce 100644 --- a/internal/media/thumbnail.go +++ b/internal/media/thumbnail.go @@ -24,7 +24,6 @@ import ( "image/jpeg" "image/png" "io" - "os" "strings" "code.superseriousbusiness.org/gotosocial/internal/gtserror" @@ -89,8 +88,8 @@ func generateThumb( // Default type is webp. mimeType = "image/webp" - // Generate thumb output path REPLACING extension. 
- if i := strings.IndexByte(filepath, '.'); i != -1 { + // Generate thumb output path REPLACING file extension. + if i := strings.LastIndexByte(filepath, '.'); i != -1 { outpath = filepath[:i] + "_thumb.webp" ext = filepath[i+1:] // old extension } else { @@ -231,7 +230,7 @@ func generateNativeThumb( error, ) { // Open input file at given path. - infile, err := os.Open(inpath) + infile, err := openRead(inpath) if err != nil { return "", gtserror.Newf("error opening input file %s: %w", inpath, err) } @@ -272,7 +271,7 @@ func generateNativeThumb( ) // Open output file at given path. - outfile, err := os.Create(outpath) + outfile, err := openWrite(outpath) if err != nil { return "", gtserror.Newf("error opening output file %s: %w", outpath, err) } @@ -313,8 +312,9 @@ func generateNativeThumb( // generateWebpBlurhash generates a blurhash for Webp at filepath. func generateWebpBlurhash(filepath string) (string, error) { + // Open the file at given path. - file, err := os.Open(filepath) + file, err := openRead(filepath) if err != nil { return "", gtserror.Newf("error opening input file %s: %w", filepath, err) } diff --git a/internal/media/util.go b/internal/media/util.go index ea52b415b..d73206434 100644 --- a/internal/media/util.go +++ b/internal/media/util.go @@ -30,14 +30,41 @@ import ( "codeberg.org/gruf/go-iotools" ) +// media processing tmpdir. +var tmpdir = os.TempDir() + // file represents one file // with the given flag and perms. type file struct { - abs string + abs string // absolute file path, including root + dir string // containing directory of abs + rel string // relative to root, i.e. trim_prefix(abs, dir) flag int perm os.FileMode } +// allowRead returns a new file{} for filepath permitted only to read. +func allowRead(filepath string) file { + return newFile(filepath, os.O_RDONLY, 0) +} + +// allowCreate returns a new file{} for filepath permitted to read / write / create. 
+func allowCreate(filepath string) file { + return newFile(filepath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +} + +// newFile returns a new instance of file{} for given path and open args. +func newFile(filepath string, flag int, perms os.FileMode) file { + dir, rel := path.Split(filepath) + return file{ + abs: filepath, + rel: rel, + dir: dir, + flag: flag, + perm: perms, + } +} + // allowFiles implements fs.FS to allow // access to a specified slice of files. type allowFiles []file @@ -45,36 +72,32 @@ type allowFiles []file // Open implements fs.FS. func (af allowFiles) Open(name string) (fs.File, error) { for _, file := range af { - var ( - abs = file.abs - flag = file.flag - perm = file.perm - ) - + switch name { // Allowed to open file - // at absolute path. - if name == file.abs { - return os.OpenFile(abs, flag, perm) - } + // at absolute path, or + // relative as ffmpeg likes. + case file.abs, file.rel: + return os.OpenFile(file.abs, file.flag, file.perm) - // Check for other valid reads. - thisDir, thisFile := path.Split(file.abs) - - // Allowed to read directory itself. - if name == thisDir || name == "." { - return os.OpenFile(thisDir, flag, perm) - } - - // Allowed to read file - // itself (at relative path). - if name == thisFile { - return os.OpenFile(abs, flag, perm) + // Ffmpeg likes to read containing + // dir as '.'. Allow RO access here. + case ".": + return openRead(file.dir) } } - return nil, os.ErrPermission } +// openRead opens the existing file at path for reads only. +func openRead(path string) (*os.File, error) { + return os.OpenFile(path, os.O_RDONLY, 0) +} + +// openWrite opens the (new!) file at path for read / writes. +func openWrite(path string) (*os.File, error) { + return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +} + // getExtension splits file extension from path. 
func getExtension(path string) string { for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- { @@ -93,17 +116,24 @@ func getExtension(path string) string { // chance that Linux's sendfile syscall can be utilised for optimal // draining of data source to temporary file storage. func drainToTmp(rc io.ReadCloser) (string, error) { - defer rc.Close() + var tmp *os.File + var err error + + // Close handles + // on func return. + defer func() { + tmp.Close() + rc.Close() + }() // Open new temporary file. - tmp, err := os.CreateTemp( - os.TempDir(), + tmp, err = os.CreateTemp( + tmpdir, "gotosocial-*", ) if err != nil { return "", err } - defer tmp.Close() // Extract file path. path := tmp.Name() From dfdf06e4add02e66c3b3aca7d0f144b88b6008a5 Mon Sep 17 00:00:00 2001 From: kim Date: Thu, 25 Sep 2025 16:38:19 +0200 Subject: [PATCH 02/11] [chore] update dependencies (#4458) - codeberg.org/gruf/go-ffmpreg: v0.6.11 -> v0.6.12 Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4458 Co-authored-by: kim Co-committed-by: kim --- go.mod | 2 +- go.sum | 4 +- internal/media/ffmpeg/wasm.go | 4 +- .../codeberg.org/gruf/go-ffmpreg/embed/lib.go | 35 ++++--- .../gruf/go-ffmpreg/wasm/funcs.go | 97 ++++++++++++++++--- .../codeberg.org/gruf/go-ffmpreg/wasm/run.go | 1 + .../gruf/go-ffmpreg/wasm/runtime.go | 1 + vendor/modules.txt | 2 +- 8 files changed, 110 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 8accf4398..b1d744ffa 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( codeberg.org/gruf/go-errors/v2 v2.3.2 codeberg.org/gruf/go-fastcopy v1.1.3 codeberg.org/gruf/go-fastpath/v2 v2.0.0 - codeberg.org/gruf/go-ffmpreg v0.6.11 + codeberg.org/gruf/go-ffmpreg v0.6.12 codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf codeberg.org/gruf/go-kv/v2 v2.0.7 codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f diff --git a/go.sum b/go.sum index 6b9b97d00..507d2d38f 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ 
codeberg.org/gruf/go-fastcopy v1.1.3 h1:Jo9VTQjI6KYimlw25PPc7YLA3Xm+XMQhaHwKnM7x codeberg.org/gruf/go-fastcopy v1.1.3/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s= codeberg.org/gruf/go-fastpath/v2 v2.0.0 h1:iAS9GZahFhyWEH0KLhFEJR+txx1ZhMXxYzu2q5Qo9c0= codeberg.org/gruf/go-fastpath/v2 v2.0.0/go.mod h1:3pPqu5nZjpbRrOqvLyAK7puS1OfEtQvjd6342Cwz56Q= -codeberg.org/gruf/go-ffmpreg v0.6.11 h1:+lvB5Loy0KUAKfv6nOZRWHFVgN08cpHhUlYcZxL8M20= -codeberg.org/gruf/go-ffmpreg v0.6.11/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E= +codeberg.org/gruf/go-ffmpreg v0.6.12 h1:mPdRx1TAQJQPhRkTOOHnRSY6omNCLJ7M6ajjuEMNNvE= +codeberg.org/gruf/go-ffmpreg v0.6.12/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf h1:84s/ii8N6lYlskZjHH+DG6jyia8w2mXMZlRwFn8Gs3A= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf/go.mod h1:zZAICsp5rY7+hxnws2V0ePrWxE0Z2Z/KXcN3p/RQCfk= codeberg.org/gruf/go-kv v1.6.5 h1:ttPf0NA8F79pDqBttSudPTVCZmGncumeNIxmeM9ztz0= diff --git a/internal/media/ffmpeg/wasm.go b/internal/media/ffmpeg/wasm.go index bcf73725e..f395032fa 100644 --- a/internal/media/ffmpeg/wasm.go +++ b/internal/media/ffmpeg/wasm.go @@ -83,7 +83,7 @@ func initWASM(ctx context.Context) error { defer func() { if err == nil && set { // Drop binary. - embed.B = nil + embed.Free() return } @@ -105,7 +105,7 @@ func initWASM(ctx context.Context) error { } // Compile ffmpreg WebAssembly into memory. 
- mod, err = run.CompileModule(ctx, embed.B) + mod, err = run.CompileModule(ctx, embed.B()) if err != nil { return err } diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go b/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go index 7829b5524..b39d7d10e 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go @@ -1,39 +1,46 @@ package embed import ( - "bytes" "compress/gzip" _ "embed" "io" - "os" + "strings" ) func init() { var err error - if path := os.Getenv("FFMPREG_WASM"); path != "" { - // Read file into memory. - B, err = os.ReadFile(path) - if err != nil { - panic(err) - } - } - // Wrap bytes in reader. - b := bytes.NewReader(B) + r := strings.NewReader(s) // Create unzipper from reader. - gz, err := gzip.NewReader(b) + gz, err := gzip.NewReader(r) if err != nil { panic(err) } // Extract gzipped binary. - B, err = io.ReadAll(gz) + b, err := io.ReadAll(gz) if err != nil { panic(err) } + + // Set binary. + s = string(b) } +// B returns a copy of +// embedded binary data. +func B() []byte { + if s == "" { + panic("binary already dropped from memory") + } + return []byte(s) +} + +// Free will drop embedded +// binary from runtime mem. 
+func Free() { s = "" } + //go:embed ffmpreg.wasm.gz -var B []byte +var s string diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go index a809ff120..a0a199ca1 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go @@ -9,22 +9,93 @@ import ( type snapshotskey struct{} +type snapshotctx struct { + context.Context + snaps *snapshots +} + +func (ctx snapshotctx) Value(key any) any { + if _, ok := key.(snapshotskey); ok { + return ctx.snaps + } + return ctx.Context.Value(key) +} + +const ringsz uint = 8 + +type snapshots struct { + r [ringsz]struct { + eptr uint32 + snap experimental.Snapshot + } + n uint +} + +func (s *snapshots) get(envptr uint32) experimental.Snapshot { + start := (s.n % ringsz) + + for i := start; i != ^uint(0); i-- { + if s.r[i].eptr == envptr { + snap := s.r[i].snap + s.r[i].eptr = 0 + s.r[i].snap = nil + s.n = i - 1 + return snap + } + } + + for i := ringsz - 1; i > start; i-- { + if s.r[i].eptr == envptr { + snap := s.r[i].snap + s.r[i].eptr = 0 + s.r[i].snap = nil + s.n = i - 1 + return snap + } + } + + panic("snapshot not found") +} + +func (s *snapshots) set(envptr uint32, snapshot experimental.Snapshot) { + start := (s.n % ringsz) + + for i := start; i < ringsz; i++ { + switch s.r[i].eptr { + case 0, envptr: + s.r[i].eptr = envptr + s.r[i].snap = snapshot + s.n = i + return + } + } + + for i := uint(0); i < start; i++ { + switch s.r[i].eptr { + case 0, envptr: + s.r[i].eptr = envptr + s.r[i].snap = snapshot + s.n = i + return + } + } + + panic("snapshots full") +} + // withSetjmpLongjmp updates the context to contain wazero/experimental.Snapshotter{} support, // and embeds the necessary snapshots map required for later calls to Setjmp() / Longjmp(). 
func withSetjmpLongjmp(ctx context.Context) context.Context { - snapshots := make(map[uint32]experimental.Snapshot, 10) - ctx = experimental.WithSnapshotter(ctx) - ctx = context.WithValue(ctx, snapshotskey{}, snapshots) - return ctx + return snapshotctx{Context: experimental.WithSnapshotter(ctx), snaps: new(snapshots)} } -func getSnapshots(ctx context.Context) map[uint32]experimental.Snapshot { - v, _ := ctx.Value(snapshotskey{}).(map[uint32]experimental.Snapshot) +func getSnapshots(ctx context.Context) *snapshots { + v, _ := ctx.Value(snapshotskey{}).(*snapshots) return v } // setjmp implements the C function: setjmp(env jmp_buf) -func setjmp(ctx context.Context, mod api.Module, stack []uint64) { +func setjmp(ctx context.Context, _ api.Module, stack []uint64) { // Input arguments. envptr := api.DecodeU32(stack[0]) @@ -35,19 +106,16 @@ func setjmp(ctx context.Context, mod api.Module, stack []uint64) { // Get stored snapshots map. snapshots := getSnapshots(ctx) - if snapshots == nil { - panic("setjmp / longjmp not supported") - } // Set latest snapshot in map. - snapshots[envptr] = snapshot + snapshots.set(envptr, snapshot) // Set return. stack[0] = 0 } // longjmp implements the C function: int longjmp(env jmp_buf, value int) -func longjmp(ctx context.Context, mod api.Module, stack []uint64) { +func longjmp(ctx context.Context, _ api.Module, stack []uint64) { // Input arguments. envptr := api.DecodeU32(stack[0]) @@ -60,10 +128,7 @@ func longjmp(ctx context.Context, mod api.Module, stack []uint64) { } // Get snapshot stored in map. - snapshot := snapshots[envptr] - if snapshot == nil { - panic("must first call setjmp") - } + snapshot := snapshots.get(envptr) // Set return. 
stack[0] = 0 diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go index 7b07d851d..c247abaf0 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go @@ -53,6 +53,7 @@ func Run( modcfg = modcfg.WithStdin(args.Stdin) modcfg = modcfg.WithStdout(args.Stdout) modcfg = modcfg.WithStderr(args.Stderr) + modcfg = modcfg.WithName("") if args.Config != nil { // Pass through config fn. diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go index 328a26193..ca13bf775 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go @@ -28,6 +28,7 @@ func NewRuntime(ctx context.Context, cfg wazero.RuntimeConfig) (wazero.Runtime, // Set core features ffmpeg compiled with. cfg = cfg.WithCoreFeatures(CoreFeatures) + cfg = cfg.WithDebugInfoEnabled(false) // Instantiate runtime with prepared config. 
rt := wazero.NewRuntimeWithConfig(ctx, cfg) diff --git a/vendor/modules.txt b/vendor/modules.txt index ac4bab587..7d2b53ad4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -247,7 +247,7 @@ codeberg.org/gruf/go-fastcopy # codeberg.org/gruf/go-fastpath/v2 v2.0.0 ## explicit; go 1.14 codeberg.org/gruf/go-fastpath/v2 -# codeberg.org/gruf/go-ffmpreg v0.6.11 +# codeberg.org/gruf/go-ffmpreg v0.6.12 ## explicit; go 1.22.0 codeberg.org/gruf/go-ffmpreg/embed codeberg.org/gruf/go-ffmpreg/wasm From ae8ac4dd6c56d5d3a562d6d03c9eb82334f8b876 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 15:14:26 +0200 Subject: [PATCH 03/11] finalize indexes etc --- CONTRIBUTING.md | 39 +++ .../20250415111056_thread_all_statuses.go | 259 ++++++++++-------- .../new/status.go | 78 +++--- .../util/utiltypes.go | 25 ++ internal/db/bundb/migrations/util.go | 92 ------- internal/gtsmodel/status.go | 100 +++---- 6 files changed, 296 insertions(+), 297 deletions(-) create mode 100644 internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 611bea97c..7359d65fa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) - [CI/CD configuration](#ci-cd-configuration) +- [Other Useful Stuff](#other-useful-stuff) + - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) ## Introduction @@ -525,3 +527,40 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). + +## Other Useful Stuff + +Various bits and bobs. 
+ +### Running migrations on a Postgres DB backup locally + +It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. + +Basic steps for this: + +1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine. +2. Create a local Postgres container and mount the dump into it with, for example: + + ```bash + docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres + ``` +3. Get a terminal inside the running container: + + ```bash + docker exec -it --user postgres postgres bash + ``` +4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine): + + ```bash + psql -X postgres < /db_dump + ``` +5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: + + ```bash + GTS_HOST=example.org \ + GTS_DB_TYPE=postgres \ + GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ + ./gotosocial migrations run + ``` + +When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. 
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fc02d1e40..0b0389e7a 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -28,10 +28,13 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" + "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" ) func init() { @@ -44,12 +47,60 @@ func init() { return gtserror.Newf("error getting bun column def: %w", err) } - // Update column def to use '${name}_new'. + // Update column def to use temporary + // '${name}_new' while we migrate. newColDef = strings.Replace(newColDef, "thread_id", "thread_id_new", 1) + // Create thread_id_new already + // so we can populate it as we go. + log.Info(ctx, "creating statuses column thread_id_new") + if _, err := db.NewAddColumn(). + Table("statuses"). + ColumnExpr(newColDef). + Exec(ctx); err != nil { + return gtserror.Newf("error adding statuses column thread_id_new: %w", err) + } + + // Create an index on thread_id_new so we can keep + // track of it as we update, and use it to avoid + // selecting statuses we've already updated. + // + // We'll remove this at the end of the migration. 
+ log.Info(ctx, "creating temporary thread_id_new index") + if db.Dialect().Name() == dialect.PG { + // On Postgres we can use a partial index + // to indicate we only care about default + // value thread IDs (ie., not set yet). + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating temporary thread_id_new index: %w", err) + } + } else { + // On SQLite we can use an index expression + // to do the same thing. While we *can* use + // a partial index for this, an index expression + // is magnitudes faster. Databases! + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating temporary thread_id_new index: %w", err) + } + } + + // Attempt to merge any sqlite write-ahead-log. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + var sr statusRethreader - var count int + var updatedRows int64 var maxID string var statuses []*oldmodel.Status @@ -63,47 +114,45 @@ func init() { // possible ULID value. maxID = id.Highest - log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") - for /* TOP LEVEL STATUS LOOP */ { + log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) + for { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select top-level statuses. + // Select IDs of next batch, paging down + // and skipping statuses to which we've + // already allocated a (new) thread ID. if err := db.NewSelect(). Model(&statuses). - Column("id", "thread_id"). 
- - // We specifically use in_reply_to_account_id instead of in_reply_to_id as - // they should both be set / unset in unison, but we specifically have an - // index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. - Where("? IS NULL", bun.Ident("in_reply_to_account_id")). + Column("id"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(5000). + Limit(200). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting top level statuses: %w", err) + return gtserror.Newf("error selecting unthreaded statuses: %w", err) } - // Reached end of block. - if len(statuses) == 0 { + // No more statuses! + l := len(statuses) + if l == 0 { + log.Info(ctx, "done migrating statuses!") break } // Set next maxID value from statuses. - maxID = statuses[len(statuses)-1].ID + maxID = statuses[l-1].ID - // Rethread each selected batch of top-level statuses in a transaction. + // Rethread each selected status in a transaction. if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - count += n + updatedRows += n } return nil @@ -111,7 +160,24 @@ func init() { return err } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) + // Show percent migrated. + // + // Will maybe end up wonky due to approximations + // and batching, so show a generic message at 100%. + percentDone := (float64(updatedRows) / float64(total)) * 100 + if percentDone <= 100 { + log.Infof( + ctx, + "[updated %d rows] migrated approx. %.2f%% of statuses", + updatedRows, percentDone, + ) + } else { + log.Infof( + ctx, + "[updated %d rows] almost done migrating... 
", + updatedRows, + ) + } } // Attempt to merge any sqlite write-ahead-log. @@ -119,58 +185,11 @@ func init() { return err } - log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") - for /* STRAGGLER STATUS LOOP */ { - - // Reset slice. - clear(statuses) - statuses = statuses[:0] - - // Select straggler statuses. - if err := db.NewSelect(). - Model(&statuses). - Column("id", "in_reply_to_id", "thread_id"). - Where("? IS NULL", bun.Ident("thread_id")). - - // We select in smaller batches for this part - // of the migration as there is a chance that - // we may be fetching statuses that might be - // part of the same thread, i.e. one call to - // rethreadStatus() may effect other statuses - // later in the slice. - Limit(1000). - Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting straggler statuses: %w", err) - } - - // Reached end of block. - if len(statuses) == 0 { - break - } - - // Rethread each selected batch of straggler statuses in a transaction. - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - count += n - } - - return nil - }); err != nil { - return err - } - - log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). 
+ Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) } log.Info(ctx, "dropping old thread_to_statuses table") @@ -180,33 +199,6 @@ func init() { return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) } - log.Info(ctx, "creating new statuses thread_id column") - if _, err := db.NewAddColumn(). - Table("statuses"). - ColumnExpr(newColDef). - Exec(ctx); err != nil { - return gtserror.Newf("error adding new thread_id column: %w", err) - } - - log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - return batchUpdateByID(ctx, tx, - "statuses", // table - "id", // batchByCol - "UPDATE ? SET ? = ?", // updateQuery - []any{bun.Ident("statuses"), - bun.Ident("thread_id_new"), - bun.Ident("thread_id")}, - ) - }); err != nil { - return err - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - log.Info(ctx, "dropping old statuses thread_id index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_idx"). @@ -289,8 +281,8 @@ type statusRethreader struct { } // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration -// in order to trigger a status rethreading operation for the given status, returning total number rethreaded. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { +// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) { // Zero slice and // map ptr values. 
@@ -346,6 +338,8 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu return 0, gtserror.Newf("error getting children: %w", err) } + // Dedupe thread IDs. + // Check for newly picked-up threads // to find stragglers for below. Else // we've reached end of what we can do. @@ -371,10 +365,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu threadIdx = len(sr.threadIDs) } - // Total number of - // statuses threaded. - total := len(sr.statusIDs) - // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! @@ -417,29 +407,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } } - // Update all the statuses to - // use determined thread_id. - if _, err := tx.NewUpdate(). - Table("statuses"). - Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). - Set("? = ?", bun.Ident("thread_id"), threadID). - Exec(ctx); err != nil { + // Use a bulk update to update all the + // statuses to use determined thread_id. + // + // https://bun.uptrace.dev/guide/query-update.html#bulk-update + values := make([]*util.Status, 0, len(sr.statusIDs)) + for _, statusID := range sr.statusIDs { + values = append(values, &util.Status{ + ID: statusID, + ThreadIDNew: threadID, + }) + } + + res, err := tx.NewUpdate(). + With("_data", tx.NewValues(&values)). + Model((*util.Status)(nil)). + TableExpr("_data"). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")). + // "Join" on status ID. + Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). + // To avoid spurious writes, + // only update unmigrated statuses. + Where("? 
= ?", bun.Ident("status.thread_id_new"), id.Lowest). + Exec(ctx) + if err != nil { return 0, gtserror.Newf("error updating status thread ids: %w", err) } + rowsAffected, err := res.RowsAffected() + if err != nil { + return 0, gtserror.Newf("error counting rows affected: %w", err) + } + if len(sr.threadIDs) > 0 { // Update any existing thread // mutes to use latest thread_id. + + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs + threadIDs = xslices.Deduplicate(threadIDs) if _, err := tx.NewUpdate(). Table("thread_mutes"). - Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). Set("? = ?", bun.Ident("thread_id"), threadID). Exec(ctx); err != nil { return 0, gtserror.Newf("error updating mute thread ids: %w", err) } } - return total, nil + return rowsAffected, nil } // append will append the given status to the internal tracking of statusRethreader{} for @@ -560,6 +582,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in clear(sr.statuses) sr.statuses = sr.statuses[:0] + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs[idx:] + threadIDs = xslices.Deduplicate(threadIDs) + // Select stragglers that // also have thread IDs. if err := tx.NewSelect(). @@ -567,7 +594,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in Column("id", "thread_id", "in_reply_to_id"). Where("? IN (?) AND ? NOT IN (?)", bun.Ident("thread_id"), - bun.In(sr.threadIDs[idx:]), + bun.In(threadIDs), bun.Ident("id"), bun.In(sr.statusIDs), ). 
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go index a03e93859..0f20bb7a4 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go @@ -23,45 +23,45 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? 
- AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // - PollID string `bun:"type:CHAR(26),nullzero"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. 
- Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. 
+ AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // + PollID string `bun:"type:CHAR(26),nullzero"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. 
+ ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. 
} // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go new file mode 100644 index 000000000..ed7256aad --- /dev/null +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go @@ -0,0 +1,25 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package util + +// Status is a helper type specifically +// for updating the thread ID of a status. +type Status struct { + ID string `bun:"type:CHAR(26)"` + ThreadIDNew string `bun:"type:CHAR(26)"` +} diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 4a3a62b21..b5543c824 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { return nil } -// batchUpdateByID performs the given updateQuery with updateArgs -// over the entire given table, batching by the ID of batchByCol. -func batchUpdateByID( - ctx context.Context, - tx bun.Tx, - table string, - batchByCol string, - updateQuery string, - updateArgs []any, -) error { - // Get a count of all in table.
- total, err := tx.NewSelect(). - Table(table). - Count(ctx) - if err != nil { - return gtserror.Newf("error selecting total count: %w", err) - } - - // Query batch size - // in number of rows. - const batchsz = 5000 - - // Stores highest batch value - // used in iterate queries, - // starting at highest possible. - highest := id.Highest - - // Total updated rows. - var updated int - - for { - // Limit to batchsz - // items at once. - batchQ := tx. - NewSelect(). - Table(table). - Column(batchByCol). - Where("? < ?", bun.Ident(batchByCol), highest). - OrderExpr("? DESC", bun.Ident(batchByCol)). - Limit(batchsz) - - // Finalize UPDATE to act only on batch. - qStr := updateQuery + " WHERE ? IN (?)" - args := append(slices.Clone(updateArgs), - bun.Ident(batchByCol), - batchQ, - ) - - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qStr, args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) - } - - // Check how many items we updated. - thisUpdated, err := res.RowsAffected() - if err != nil { - return gtserror.Newf("error counting affected rows: %w", err) - } - - if thisUpdated == 0 { - // Nothing updated - // means we're done. - break - } - - // Update the overall count. - updated += int(thisUpdated) - - // Log helpful message to admin. - log.Infof(ctx, "migrated %d of %d %s (up to %s)", - updated, total, table, highest) - - // Get next highest - // id for next batch. - if err := tx. - NewSelect(). - With("batch_query", batchQ). - ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). - Scan(ctx, &highest); err != nil { - return gtserror.Newf("error selecting next highest: %w", err) - } - } - - if total != int(updated) { - // Return error here in order to rollback the whole transaction. 
- return fmt.Errorf("total=%d does not match updated=%d", total, updated) - } - - return nil -} - // convertEnums performs a transaction that converts // a table's column of our old-style enums (strings) to // more performant and space-saving integer types. diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 31e8fe881..0f0fb8404 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -27,56 +27,56 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. 
https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. 
- BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. - Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. - PollID string `bun:"type:CHAR(26),nullzero"` // - Poll *Poll `bun:"-"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. 
If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. 
https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. 
+ BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. + Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. + PollID string `bun:"type:CHAR(26),nullzero"` // + Poll *Poll `bun:"-"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. 
If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. } // GetID implements timeline.Timelineable{}. From 487292e6f323eec47ea50c1a3a43fed65d6715da Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 15:17:30 +0200 Subject: [PATCH 04/11] remove errant comment --- .../db/bundb/migrations/20250415111056_thread_all_statuses.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 0b0389e7a..86fbbd49f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -338,8 +338,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu return 0, gtserror.Newf("error getting children: %w", err) } - // Dedupe thread IDs. - // Check for newly picked-up threads // to find stragglers for below. Else // we've reached end of what we can do. 
From 365b9efb12cfc56e31d48f1c5f51b2f743b3e477 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 26 Sep 2025 17:14:42 +0200 Subject: [PATCH 05/11] whew --- .../20250415111056_thread_all_statuses.go | 79 +++++-------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 86fbbd49f..bfa4dd84f 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,6 +24,7 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" @@ -34,7 +35,6 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" - "github.com/uptrace/bun/dialect" ) func init() { @@ -62,45 +62,8 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - // Create an index on thread_id_new so we can keep - // track of it as we update, and use it to avoid - // selecting statuses we've already updated. - // - // We'll remove this at the end of the migration. - log.Info(ctx, "creating temporary thread_id_new index") - if db.Dialect().Name() == dialect.PG { - // On Postgres we can use a partial index - // to indicate we only care about default - // value thread IDs (ie., not set yet). - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - Column("thread_id_new"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } else { - // On SQLite we can use an index expression - // to do the same thing. 
While we *can* use - // a partial index for this, an index expression - // is magnitudes faster. Databases! - if _, err := db.NewCreateIndex(). - Table("statuses"). - Index("statuses_thread_id_new_idx"). - ColumnExpr("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Exec(ctx); err != nil { - return gtserror.Newf("error creating temporary thread_id_new index: %w", err) - } - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - var sr statusRethreader - var updatedRows int64 + var updatedRowsTotal int64 var maxID string var statuses []*oldmodel.Status @@ -116,21 +79,20 @@ func init() { log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) for { + start := time.Now() // Reset slice. clear(statuses) statuses = statuses[:0] - // Select IDs of next batch, paging down - // and skipping statuses to which we've - // already allocated a (new) thread ID. + // Select IDs of next + // batch, paging down. if err := db.NewSelect(). Model(&statuses). Column("id"). - Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -146,13 +108,15 @@ func init() { maxID = statuses[l-1].ID // Rethread each selected status in a transaction. + var updatedRowsThisBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { for _, status := range statuses { n, err := sr.rethreadStatus(ctx, tx, status) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - updatedRows += n + updatedRowsThisBatch += n + updatedRowsTotal += n } return nil @@ -160,22 +124,26 @@ func init() { return err } - // Show percent migrated. + // Show current speed + percent migrated. 
// - // Will maybe end up wonky due to approximations + // Percent may end up wonky due to approximations // and batching, so show a generic message at 100%. - percentDone := (float64(updatedRows) / float64(total)) * 100 + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 if percentDone <= 100 { log.Infof( ctx, - "[updated %d rows] migrated approx. %.2f%% of statuses", - updatedRows, percentDone, + "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d rows] almost done migrating... ", - updatedRows, + "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + updatedRowsTotal, rowsPerSecond, ) } } @@ -185,13 +153,6 @@ func init() { return err } - log.Info(ctx, "dropping temporary thread_id_new index") - if _, err := db.NewDropIndex(). - Index("statuses_thread_id_new_idx"). - Exec(ctx); err != nil { - return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) - } - log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). 
From dd3a32acdb1c5f88f9198f8005ffbc4b111ccbfc Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 11:59:35 +0200 Subject: [PATCH 06/11] few more little tweaks --- .../20250415111056_thread_all_statuses.go | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index bfa4dd84f..fada3ea29 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -136,13 +136,13 @@ func init() { if percentDone <= 100 { log.Infof( ctx, - "[updated %d total rows, now @ ~%.0f rows/s] done ~%.2f%% of statuses", + "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", updatedRowsTotal, rowsPerSecond, percentDone, ) } else { log.Infof( ctx, - "[updated %d total rows, now @ ~%.0f rows/s] almost done... ", + "[~%.0f rows/s; updated %d total rows] almost done... ", updatedRowsTotal, rowsPerSecond, ) } @@ -268,14 +268,33 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // This may have changed from // the initial batch selection // to the rethreadStatus() call. + upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). - Model(status). - Column("in_reply_to_id", "thread_id"). + TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). + Column("in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), status.ID). - Scan(ctx); err != nil { + Scan(ctx, &upToDateValues); err != nil { return 0, gtserror.Newf("error selecting status: %w", err) } + // If we've just threaded this status by setting + // thread_id_new, then by definition anything we + // could find from the entire thread must now be + // threaded, so we can save some database calls + // by skipping iterating up + down from here. 
+ if v, ok := upToDateValues["thread_id_new"]; ok && v.(string) != id.Lowest { + log.Debug(ctx, "skipping just rethreaded status") + return 0, nil + } + + // Set up-to-date values on the status. + if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil { + status.InReplyToID = inReplyToID.(string) + } + if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil { + status.ThreadID = threadID.(string) + } + // status and thread ID cursor // index values. these are used // to keep track of newly loaded @@ -328,6 +347,7 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // batch of statuses is already correctly // threaded. Then we have nothing to do! if sr.allThreaded && len(sr.threadIDs) == 1 { + log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } From 5c000620e225b13faf1e0cac920b22b757e5c69d Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 12:02:19 +0200 Subject: [PATCH 07/11] whoops --- .../bundb/migrations/20250415111056_thread_all_statuses.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fada3ea29..7b864a445 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -92,7 +92,7 @@ func init() { Column("id"). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(250). + Limit(1000). 
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -137,13 +137,13 @@ func init() { log.Infof( ctx, "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", - updatedRowsTotal, rowsPerSecond, percentDone, + rowsPerSecond, updatedRowsTotal, percentDone, ) } else { log.Infof( ctx, "[~%.0f rows/s; updated %d total rows] almost done... ", - updatedRowsTotal, rowsPerSecond, + rowsPerSecond, updatedRowsTotal, ) } } From 4fd0bdcf2f180d66f03107e26fb270c1c7ec4d42 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 12:11:37 +0200 Subject: [PATCH 08/11] should be done poking now --- .../bundb/migrations/20250415111056_thread_all_statuses.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 7b864a445..0e67d1d7d 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -92,7 +92,7 @@ func init() { Column("id"). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(1000). + Limit(200). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -265,9 +265,6 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // Ensure the passed status // has up-to-date information. - // This may have changed from - // the initial batch selection - // to the rethreadStatus() call. upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). 
From 1725769733615706c3736b84a7ef0126931c8d72 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 16:48:46 +0200 Subject: [PATCH 09/11] i'm adjusting the PR, pray i don't adjust it further --- .../20250415111056_thread_all_statuses.go | 200 +++++++++++++----- 1 file changed, 150 insertions(+), 50 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index 0e67d1d7d..a4b910fa6 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -26,7 +26,6 @@ import ( "strings" "time" - "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" @@ -62,44 +61,80 @@ func init() { return gtserror.Newf("error adding statuses column thread_id_new: %w", err) } - var sr statusRethreader - var updatedRowsTotal int64 - var maxID string - var statuses []*oldmodel.Status + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } - // Get a total count of all statuses before migration. - total, err := db.NewSelect().Table("statuses").Count(ctx) + // Get a total count of all + // statuses before migration. + totalStatuses, err := db. + NewSelect(). + Table("statuses"). + Count(ctx) if err != nil { return gtserror.Newf("error getting status table count: %w", err) } + log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses) - // Start at largest + var sr statusRethreader + var updatedRowsTotal int64 + var statuses []*oldmodel.Status + + // Page starting at largest // possible ULID value. 
- maxID = id.Highest + var maxID = id.Highest - log.Warnf(ctx, "migrating %d statuses, this may take a *long* time", total) - for { - start := time.Now() + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] + start := time.Now() + // Select IDs of next // batch, paging down. - if err := db.NewSelect(). + if err := tx.NewSelect(). Model(&statuses). Column("id"). + Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(200). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting unthreaded statuses: %w", err) + return gtserror.Newf("error selecting top-level statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } // No more statuses! l := len(statuses) if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + log.Info(ctx, "done migrating statuses!") break } @@ -107,52 +142,117 @@ func init() { // Set next maxID value from statuses. maxID = statuses[l-1].ID - // Rethread each selected status in a transaction. + // Rethread inside the transaction. 
var updatedRowsThisBatch int64 - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - updatedRowsThisBatch += n - updatedRowsTotal += n + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - - return nil - }); err != nil { - return err + updatedRowsThisBatch += n + updatedRowsTotal += n } - // Show current speed + percent migrated. - // - // Percent may end up wonky due to approximations - // and batching, so show a generic message at 100%. + // Show speed for this batch. timeTaken := time.Since(start).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs - percentDone := (float64(updatedRowsTotal) / float64(total)) * 100 - if percentDone <= 100 { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] migrated ~%.2f%% of statuses", - rowsPerSecond, updatedRowsTotal, percentDone, - ) - } else { - log.Infof( - ctx, - "[~%.0f rows/s; updated %d total rows] almost done... ", - rowsPerSecond, updatedRowsTotal, - ) - } + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. if err := doWALCheckpoint(ctx, db); err != nil { return err } + // Open a new transaction lads. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { + + // Reset slice. + clear(statuses) + statuses = statuses[:0] + + start := time.Now() + + // Select IDs of stragglers for + // which we haven't set thread_id yet. 
+ if err := tx.NewSelect(). + Model(&statuses). + Column("id"). + Where("? IS NULL", bun.Ident("thread_id")). + Limit(250). + Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { + return gtserror.Newf("error selecting unthreaded statuses: %w", err) + } + + // Every 50 loops, flush wal and begin new + // transaction, to avoid silly wal sizes. + if i%50 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } + } + + // No more statuses! + l := len(statuses) + if l == 0 { + if err := tx.Commit(); err != nil { + return err + } + + log.Info(ctx, "done migrating statuses!") + break + } + + // Rethread inside the transaction. + var updatedRowsThisBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n + } + + // Show speed for this batch. + timeTaken := time.Since(start).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", + totalDone, rowsPerSecond, + ) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -496,7 +596,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { Model(&parent). Column("id", "in_reply_to_id", "thread_id"). Where("? = ?", bun.Ident("id"), id). 
- Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -535,7 +635,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) Model(&sr.statuses). Column("id", "thread_id"). Where("? = ?", bun.Ident("in_reply_to_id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -574,7 +674,7 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in bun.Ident("id"), bun.In(sr.statusIDs), ). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } From 9c544e732cf38d79f83902c70fb836cfe1251e23 Mon Sep 17 00:00:00 2001 From: tobi Date: Mon, 29 Sep 2025 18:56:44 +0200 Subject: [PATCH 10/11] boobs --- .../20250415111056_thread_all_statuses.go | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index a4b910fa6..daf392ee6 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -106,7 +106,7 @@ func init() { Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(500). + Limit(100). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting top-level statuses: %w", err) } @@ -173,6 +173,19 @@ func init() { return err } + // Reset max ID. + maxID = id.Highest + + // Create a temporary index on thread_id_new for stragglers. + log.Info(ctx, "creating temporary statuses thread_id_new index") + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). 
+ Exec(ctx); err != nil { + return gtserror.Newf("error creating new thread_id index: %w", err) + } + // Open a new transaction lads. tx, err = db.BeginTx(ctx, nil) if err != nil { @@ -192,8 +205,8 @@ func init() { if err := tx.NewSelect(). Model(&statuses). Column("id"). - Where("? IS NULL", bun.Ident("thread_id")). - Limit(250). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting unthreaded statuses: %w", err) } @@ -253,6 +266,13 @@ func init() { ) } + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). From 2563568ccce15de8925355ea4d29ea9d59ef6945 Mon Sep 17 00:00:00 2001 From: tobi Date: Tue, 30 Sep 2025 14:50:21 +0200 Subject: [PATCH 11/11] that'll do --- CONTRIBUTING.md | 32 ++-- .../20250415111056_thread_all_statuses.go | 150 +++++++++--------- 2 files changed, 86 insertions(+), 96 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7359d65fa..10a05002c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -538,23 +538,21 @@ It may be useful when testing or debugging migrations to be able to run them aga Basic steps for this: -1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine. -2. Create a local Postgres container and mount the dump into it with, for example: - - ```bash - docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres - ``` -3. Get a terminal inside the running container: - - ```bash - docker exec -it --user postgres postgres bash - ``` -4. 
Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine): - - ```bash - psql -X postgres < /db_dump - ``` -5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: +First dump the Postgres database on the remote machine, and copy the dump over to your development machine. + +Now create a local Postgres container and mount the dump into it with, for example: + +```bash +docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres +``` + +In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: + +```bash +docker exec -it --user postgres postgres psql -X -f /db_dump postgres +``` + +With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: ```bash GTS_HOST=example.org \ diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index daf392ee6..9115dfe90 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -96,7 +96,7 @@ func init() { clear(statuses) statuses = statuses[:0] - start := time.Now() + batchStart := time.Now() // Select IDs of next // batch, paging down. @@ -106,46 +106,51 @@ func init() { Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(100). + Limit(500). 
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting top-level statuses: %w", err) + return gtserror.Newf("error selecting statuses: %w", err) } - // Every 50 loops, flush wal and begin new - // transaction, to avoid silly wal sizes. - if i%50 == 0 { + l := len(statuses) + if l == 0 { + // No more statuses! + // + // Transaction will be closed + // after leaving the loop. + break + + } else if i%100 == 0 { + // Begin a new transaction every + // 100 batches (~50000 statuses), + // to avoid massive commits. + + // Close existing transaction. if err := tx.Commit(); err != nil { return err } + // Try to flush the wal + // to avoid silly wal sizes. if err := doWALCheckpoint(ctx, db); err != nil { return err } + // Open new transaction. tx, err = db.BeginTx(ctx, nil) if err != nil { return err } } - // No more statuses! - l := len(statuses) - if l == 0 { - if err := tx.Commit(); err != nil { - return err - } - - log.Info(ctx, "done migrating statuses!") - break - } - - // Set next maxID value from statuses. + // Set next maxID + // value from statuses. maxID = statuses[l-1].ID - // Rethread inside the transaction. + // Rethread using the + // open transaction. var updatedRowsThisBatch int64 for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) + n, err := sr.rethreadStatus(ctx, tx, status, false) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } @@ -154,7 +159,7 @@ func init() { } // Show speed for this batch. 
- timeTaken := time.Since(start).Milliseconds() + timeTaken := time.Since(batchStart).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs @@ -164,94 +169,73 @@ func init() { log.Infof( ctx, - "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses", + "[~%.2f%% done; ~%.0f rows/s] migrating threads", totalDone, rowsPerSecond, ) } - if err := doWALCheckpoint(ctx, db); err != nil { + // Close transaction. + if err := tx.Commit(); err != nil { return err } - // Reset max ID. - maxID = id.Highest - - // Create a temporary index on thread_id_new for stragglers. + // Create a partial index on thread_id_new to find stragglers. + // This index will be removed at the end of the migration. log.Info(ctx, "creating temporary statuses thread_id_new index") if _, err := db.NewCreateIndex(). Table("statuses"). Index("statuses_thread_id_new_idx"). Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Exec(ctx); err != nil { return gtserror.Newf("error creating new thread_id index: %w", err) } - // Open a new transaction lads. - tx, err = db.BeginTx(ctx, nil) - if err != nil { - return err - } - for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - start := time.Now() + batchStart := time.Now() - // Select IDs of stragglers for - // which we haven't set thread_id yet. - if err := tx.NewSelect(). + // Get stragglers for which + // we haven't set thread ID yet. + if err := db.NewSelect(). Model(&statuses). Column("id"). Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). - Limit(500). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting unthreaded statuses: %w", err) + return gtserror.Newf("error selecting straggler: %w", err) } - // Every 50 loops, flush wal and begin new - // transaction, to avoid silly wal sizes. 
- if i%50 == 0 { - if err := tx.Commit(); err != nil { - return err - } - - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - - tx, err = db.BeginTx(ctx, nil) - if err != nil { - return err - } - } - - // No more statuses! - l := len(statuses) - if l == 0 { - if err := tx.Commit(); err != nil { - return err - } - - log.Info(ctx, "done migrating statuses!") + if len(statuses) == 0 { + // No more + // statuses! break } - // Rethread inside the transaction. + // Update this batch + // inside a transaction. var updatedRowsThisBatch int64 - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + for _, status := range statuses { + + n, err := sr.rethreadStatus(ctx, tx, status, true) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n } - updatedRowsThisBatch += n - updatedRowsTotal += n + return nil + }); err != nil { + return err } // Show speed for this batch. - timeTaken := time.Since(start).Milliseconds() + timeTaken := time.Since(batchStart).Milliseconds() msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) rowsPerMs := float64(1) / float64(msPerRow) rowsPerSecond := 1000 * rowsPerMs @@ -261,11 +245,16 @@ func init() { log.Infof( ctx, - "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers", + "[~%.2f%% done; ~%.0f rows/s] migrating stragglers", totalDone, rowsPerSecond, ) } + // Try to merge everything we've done so far. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + log.Info(ctx, "dropping temporary thread_id_new index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_new_idx"). @@ -363,7 +352,7 @@ type statusRethreader struct { // rethreadStatus is the main logic handler for statusRethreader{}. 
this is what gets called from the migration // in order to trigger a status rethreading operation for the given status, returning total number of rows changed. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) { +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { // Zero slice and // map ptr values. @@ -405,11 +394,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } // Set up-to-date values on the status. - if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil { - status.InReplyToID = inReplyToID.(string) + if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { + status.InReplyToID = v.(string) } - if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil { - status.ThreadID = threadID.(string) + if v, ok := upToDateValues["thread_id"]; ok && v != nil { + status.ThreadID = v.(string) } // status and thread ID cursor @@ -463,7 +452,10 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! - if sr.allThreaded && len(sr.threadIDs) == 1 { + // + // Skip this check for straggler statuses + // that are part of broken threads. + if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { log.Debug(ctx, "skipping just rethreaded thread") return 0, nil }