some performance tweaks and adding more comments

kim 2025-02-14 16:26:13 +00:00
commit 60fe595913
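In broad terms, the diff below stops issuing one UPDATE ... RETURNING per batch and instead first SELECTs the next batch of key values (descending, below a moving boundary), then runs a single UPDATE ... WHERE ... IN (...) against exactly those rows. The sketch that follows only illustrates that general select-then-update batching pattern: it uses database/sql with SQLite-style ? placeholders and a hypothetical statuses table with id and visibility_new columns (not the repository's bun-based convertEnums helper), and it advances the boundary to the smallest key of each batch, the usual keyset-pagination step.

package migrate

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// updateInBatches walks a hypothetical statuses table from the highest
// key downwards, updating batchSize rows per UPDATE statement, and
// returns the number of rows updated.
func updateInBatches(ctx context.Context, tx *sql.Tx, startBoundary string, batchSize int) (int, error) {
	boundary := startBoundary
	updated := 0

	for {
		// Select the next batch of keys strictly below the current
		// boundary, highest first, so each pass makes forward progress.
		rows, err := tx.QueryContext(ctx,
			"SELECT id FROM statuses WHERE id < ? ORDER BY id DESC LIMIT ?",
			boundary, batchSize,
		)
		if err != nil {
			return updated, fmt.Errorf("selecting batch: %w", err)
		}

		var ids []string
		for rows.Next() {
			var id string
			if err := rows.Scan(&id); err != nil {
				rows.Close()
				return updated, fmt.Errorf("scanning id: %w", err)
			}
			ids = append(ids, id)
		}
		rows.Close()
		if err := rows.Err(); err != nil {
			return updated, fmt.Errorf("reading batch: %w", err)
		}

		// Nothing left below the boundary: done.
		if len(ids) == 0 {
			return updated, nil
		}

		// Update exactly the rows selected above, in one statement,
		// with one placeholder per id in the IN (...) list.
		in := strings.TrimSuffix(strings.Repeat("?, ", len(ids)), ", ")
		args := make([]any, len(ids))
		for i, id := range ids {
			args[i] = id
		}
		query := "UPDATE statuses SET visibility_new = 1 WHERE id IN (" + in + ")"
		if _, err := tx.ExecContext(ctx, query, args...); err != nil {
			return updated, fmt.Errorf("updating batch: %w", err)
		}
		updated += len(ids)

		// Advance the boundary to the smallest key in this batch so the
		// next SELECT picks up where this one stopped.
		boundary = ids[len(ids)-1]
	}
}

Keeping the SELECT separate from the UPDATE means the batch's key values are in hand for logging progress and choosing the next boundary, which is the same reason the diff's own comment gives for not relying on the row order returned by RETURNING.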


@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"reflect"
+	"slices"
 	"strconv"
 	"strings"
@@ -89,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16](
 	var qbuf byteutil.Buffer
 	// Prepare a singular UPDATE statement using
-	// SET $newColumn = (CASE $column WHEN $old THEN $new ... END)
+	// SET $newColumn = (CASE $column WHEN $old THEN $new ... END).
 	qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...)
 	args = append(args, bun.Ident(table))
 	args = append(args, bun.Ident(newColumn))
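For orientation, once the batch WHERE clause from the next hunk is appended, the statement assembled here expands, after bun substitutes the ? placeholders and quotes the bun.Ident arguments, into something shaped like the constant below. The table, columns, enum strings and int16 values are purely illustrative stand-ins; the real ones come from the mapping passed to convertEnums.

package migrate

// renderedExample shows the rough shape of the generated statement for a
// hypothetical "statuses" table mapping a string "visibility" column onto
// an int16 "visibility_new" column; all names and values are made up.
const renderedExample = `
UPDATE "statuses" SET "visibility_new" =
  (CASE "visibility" WHEN 'public' THEN 1 WHEN 'followers_only' THEN 2 ELSE 0 END)
WHERE "id" IN ('01EXAMPLEAAAA', '01EXAMPLEBBBB')`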
@@ -100,56 +101,75 @@ func convertEnums[OldType ~string, NewType ~int16](
 	}
 	qbuf.B = append(qbuf.B, "ELSE ? END)"...)
 	args = append(args, *defaultValue)
-	qbuf.B = append(qbuf.B, " WHERE ? IN (?)"...)
-	args = append(args, bun.Ident(batchByColumn))
-	baseQ := qbuf.String()
-	var (
-		nextHighest = id.Highest
-		updated     int64
-	)
+
+	// Serialize it here to be
+	// used as the base for each
+	// set of batch queries below.
+	baseQStr := string(qbuf.B)
+	baseArgs := args
+
+	// Query batch size
+	// in number of rows.
+	const batchsz = 5000
+
+	// Prepare storage slice for each
+	// returned batch of column values.
+	vals := make([]string, 0, batchsz)
+
+	// Stores highest batch value
+	// used in iterate queries,
+	// starting at highest possible.
+	highest := id.Highest
+
+	// Total updated rows.
+	var updated int
+
 	for {
-		batchQ := tx.NewRaw(
-			"SELECT ? FROM ? WHERE ? < ? ORDER BY ? DESC LIMIT ?",
+		// Reset values.
+		vals = vals[:0]
+
+		// SELECT next batch of column values to iteratively update since embedding
+		// it in the below UPDATE statement with RETURNING guarantees no return order.
+		if err := tx.NewRaw("SELECT ? FROM ? WHERE ? < ? ORDER BY ? DESC LIMIT ?",
 			bun.Ident(batchByColumn),
 			bun.Ident(table),
 			bun.Ident(batchByColumn),
-			nextHighest,
+			highest,
 			bun.Ident(batchByColumn),
-			5000,
-		)
-		q := baseQ + " RETURNING ?"
-		qArgs := append(args, batchQ) // nolint:gocritic
-		qArgs = append(qArgs, bun.Ident(batchByColumn))
+			batchsz,
+		).Scan(ctx, &vals); err != nil {
+			return gtserror.Newf("error selecting batch: %w", err)
+		}
+
+		// Check if at end.
+		if len(vals) == 0 {
+			break
+		}
+
+		// Finalize UPDATE to operate on batch.
+		qStr := baseQStr + " WHERE ? IN (?)"
+		args := append(slices.Clone(baseArgs), bun.Ident(batchByColumn))
+		args = append(args, bun.In(vals))
 
 		// Execute the prepared raw query with arguments.
-		var ids []string
-		res, err := tx.NewRaw(q, qArgs...).Exec(ctx, &ids)
+		_, err := tx.NewRaw(qStr, args...).Exec(ctx)
 		if err != nil {
 			return gtserror.Newf("error updating old column values: %w", err)
 		}
 
-		// Count number items updated.
-		thisUpdated, _ := res.RowsAffected()
-		if thisUpdated == 0 {
-			break
-		}
-		updated += thisUpdated
-		highestID := ids[0]
-		lowestID := ids[len(ids)-1]
-		log.Infof(ctx,
-			"updated %d of %d %s (just done from %s to %s)",
-			updated, total, table, highestID, lowestID,
-		)
-		nextHighest = lowestID
+		// Update the count.
+		updated += len(vals)
+
+		log.Infof(ctx, "updated %d of %d %s (up to %s)",
+			updated, total, table, highest)
+
+		// Get next highest.
+		highest = vals[0]
 	}
 
 	if total != int(updated) {
-		log.Warnf(ctx, "total=%d does not match updated=%d", total, updated)
+		// Return error here in order to rollback the whole transaction.
+		return fmt.Errorf("total=%d does not match updated=%d", total, updated)
 	}
 
 	// Run index cleanup callback if set.