From e7cd8bb43ef3a17b0a856f45b64bdc2c8336ba44 Mon Sep 17 00:00:00 2001 From: tobi Date: Fri, 3 Oct 2025 12:28:55 +0200 Subject: [PATCH 1/3] [chore] Use bulk updates + fewer loops in status rethreading migration (#4459) This pull request tries to optimize our status rethreading migration by using bulk updates + avoiding unnecessary writes, and doing the migration in one top-level loop and one stragglers loop, without the extra loop to copy thread_id over. On my machine it runs at about 2400 rows per second on Postgres, now, and about 9000 rows per second on SQLite. Tried *many* different ways of doing this, with and without temporary indexes, with different batch and transaction sizes, etc., and this seems to be just about the most performant way of getting stuff done. With the changes, a few minutes have been shaved off migration time testing on my development machine. *Hopefully* this will translate to more time shaved off when running on a vps with slower read/write speed and less processor power. SQLite before: ``` real 20m58,446s user 16m26,635s sys 5m53,648s ``` SQLite after: ``` real 14m25,435s user 12m47,449s sys 2m27,898s ``` Postgres before: ``` real 28m25,307s user 3m40,005s sys 4m45,018s ``` Postgres after: ``` real 22m31,999s user 3m46,674s sys 4m39,592s ``` Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4459 Co-authored-by: tobi Co-committed-by: tobi --- CONTRIBUTING.md | 37 ++ .../20250415111056_thread_all_statuses.go | 361 +++++++++++++----- .../new/status.go | 78 ++-- .../old/status.go | 8 +- .../util/utiltypes.go | 24 ++ internal/db/bundb/migrations/util.go | 92 ----- internal/gtsmodel/status.go | 100 ++--- 7 files changed, 429 insertions(+), 271 deletions(-) create mode 100644 internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 611bea97c..10a05002c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) - [CI/CD configuration](#ci-cd-configuration) +- [Other Useful Stuff](#other-useful-stuff) + - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) ## Introduction @@ -525,3 +527,38 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). + +## Other Useful Stuff + +Various bits and bobs. + +### Running migrations on a Postgres DB backup locally + +It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. + +Basic steps for this: + +First dump the Postgres database on the remote machine, and copy the dump over to your development machine. + +Now create a local Postgres container and mount the dump into it with, for example: + +```bash +docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres +``` + +In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: + +```bash +docker exec -it --user postgres postgres psql -X -f /db_dump postgres +``` + +With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: + + ```bash + GTS_HOST=example.org \ + GTS_DB_TYPE=postgres \ + GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ + ./gotosocial migrations run + ``` + +When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fc02d1e40..2a3808120 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,13 +24,16 @@ import ( "reflect" "slices" "strings" + "time" "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" + "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" ) @@ -49,10 +52,26 @@ func init() { "thread_id", "thread_id_new", 1) var sr statusRethreader - var count int + var updatedTotal int64 var maxID string var statuses []*oldmodel.Status + // Create thread_id_new already + // so we can populate it as we go. + log.Info(ctx, "creating statuses column thread_id_new") + if _, err := db.NewAddColumn(). + Table("statuses"). + ColumnExpr(newColDef). + Exec(ctx); err != nil { + return gtserror.Newf("error adding statuses column thread_id_new: %w", err) + } + + // Try to merge the wal so we're + // not working on the wal file. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + // Get a total count of all statuses before migration. total, err := db.NewSelect().Table("statuses").Count(ctx) if err != nil { @@ -63,74 +82,129 @@ func init() { // possible ULID value. maxID = id.Highest - log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") - for /* TOP LEVEL STATUS LOOP */ { + log.Warnf(ctx, "rethreading %d statuses, this will take a *long* time", total) + + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select top-level statuses. - if err := db.NewSelect(). - Model(&statuses). - Column("id", "thread_id"). + batchStart := time.Now() + // Select top-level statuses. + if err := tx.NewSelect(). + Model(&statuses). + Column("id"). // We specifically use in_reply_to_account_id instead of in_reply_to_id as // they should both be set / unset in unison, but we specifically have an // index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. Where("? IS NULL", bun.Ident("in_reply_to_account_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(5000). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting top level statuses: %w", err) } - // Reached end of block. - if len(statuses) == 0 { + l := len(statuses) + if l == 0 { + // No more statuses! + // + // Transaction will be closed + // after leaving the loop. break + + } else if i%200 == 0 { + // Begin a new transaction every + // 200 batches (~100,000 statuses), + // to avoid massive commits. + + // Close existing transaction. + if err := tx.Commit(); err != nil { + return err + } + + // Try to flush the wal + // to avoid silly wal sizes. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + // Open new transaction. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } // Set next maxID value from statuses. maxID = statuses[len(statuses)-1].ID - // Rethread each selected batch of top-level statuses in a transaction. - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - count += n + // Rethread using the + // open transaction. + var updatedInBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status, false) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - - return nil - }); err != nil { - return err + updatedInBatch += n + updatedTotal += n } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedInBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedTotal) / float64(total)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating threads", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { + // Close transaction. + if err := tx.Commit(); err != nil { return err } - log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") - for /* STRAGGLER STATUS LOOP */ { + // Create a partial index on thread_id_new to find stragglers. + // This index will be removed at the end of the migration. + log.Info(ctx, "creating temporary statuses thread_id_new index") + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating new thread_id index: %w", err) + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] + batchStart := time.Now() + // Select straggler statuses. if err := db.NewSelect(). Model(&statuses). - Column("id", "in_reply_to_id", "thread_id"). - Where("? IS NULL", bun.Ident("thread_id")). + Column("id"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). // We select in smaller batches for this part // of the migration as there is a chance that @@ -138,7 +212,7 @@ func init() { // part of the same thread, i.e. one call to // rethreadStatus() may effect other statuses // later in the slice. - Limit(1000). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { return gtserror.Newf("error selecting straggler statuses: %w", err) } @@ -149,23 +223,35 @@ func init() { } // Rethread each selected batch of straggler statuses in a transaction. + var updatedInBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) + n, err := sr.rethreadStatus(ctx, tx, status, true) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - count += n + updatedInBatch += n + updatedTotal += n } - return nil }); err != nil { return err } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedInBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedTotal) / float64(total)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating stragglers", + totalDone, rowsPerSecond, + ) } // Attempt to merge any sqlite write-ahead-log. @@ -173,6 +259,13 @@ func init() { return err } + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -180,33 +273,6 @@ func init() { return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) } - log.Info(ctx, "creating new statuses thread_id column") - if _, err := db.NewAddColumn(). - Table("statuses"). - ColumnExpr(newColDef). - Exec(ctx); err != nil { - return gtserror.Newf("error adding new thread_id column: %w", err) - } - - log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - return batchUpdateByID(ctx, tx, - "statuses", // table - "id", // batchByCol - "UPDATE ? SET ? = ?", // updateQuery - []any{bun.Ident("statuses"), - bun.Ident("thread_id_new"), - bun.Ident("thread_id")}, - ) - }); err != nil { - return err - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - log.Info(ctx, "dropping old statuses thread_id index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_idx"). @@ -274,6 +340,11 @@ type statusRethreader struct { // its contents are ephemeral. statuses []*oldmodel.Status + // newThreadIDSet is used to track whether + // statuses in statusIDs have already have + // thread_id_new set on them. + newThreadIDSet map[string]struct{} + // seenIDs tracks the unique status and // thread IDs we have seen, ensuring we // don't append duplicates to statusIDs @@ -289,14 +360,15 @@ type statusRethreader struct { } // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration -// in order to trigger a status rethreading operation for the given status, returning total number rethreaded. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { +// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { // Zero slice and // map ptr values. clear(sr.statusIDs) clear(sr.threadIDs) clear(sr.statuses) + clear(sr.newThreadIDSet) clear(sr.seenIDs) // Reset slices and values for use. @@ -305,6 +377,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu sr.statuses = sr.statuses[:0] sr.allThreaded = true + if sr.newThreadIDSet == nil { + // Allocate new hash set for newThreadIDSet. + sr.newThreadIDSet = make(map[string]struct{}) + } + if sr.seenIDs == nil { // Allocate new hash set for status IDs. sr.seenIDs = make(map[string]struct{}) @@ -317,12 +394,22 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // to the rethreadStatus() call. if err := tx.NewSelect(). Model(status). - Column("in_reply_to_id", "thread_id"). + Column("in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), status.ID). Scan(ctx); err != nil { return 0, gtserror.Newf("error selecting status: %w", err) } + // If we've just threaded this status by setting + // thread_id_new, then by definition anything we + // could find from the entire thread must now be + // threaded, so we can save some database calls + // by skipping iterating up + down from here. + if status.ThreadIDNew != id.Lowest { + log.Debugf(ctx, "skipping just rethreaded status: %s", status.ID) + return 0, nil + } + // status and thread ID cursor // index values. these are used // to keep track of newly loaded @@ -371,14 +458,14 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu threadIdx = len(sr.threadIDs) } - // Total number of - // statuses threaded. - total := len(sr.statusIDs) - // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! - if sr.allThreaded && len(sr.threadIDs) == 1 { + // + // Skip this check for straggler statuses + // that are part of broken threads. + if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { + log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } @@ -417,36 +504,120 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } } - // Update all the statuses to - // use determined thread_id. - if _, err := tx.NewUpdate(). - Table("statuses"). - Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). - Set("? = ?", bun.Ident("thread_id"), threadID). - Exec(ctx); err != nil { + var ( + res sql.Result + err error + ) + + if len(sr.statusIDs) == 1 { + + // If we're only updating one status + // we can use a simple update query. + res, err = tx.NewUpdate(). + // Update the status model. + TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), threadID). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), threadID). + Where("? = ?", bun.Ident("status.id"), sr.statusIDs[0]). + Exec(ctx) + + } else { + + // If we're updating multiple statuses at once, + // build up a common table expression to update + // all statuses in this thread to use threadID. + // + // This ought to be a little more readable than + // using an "IN(*)" query, and PG or SQLite *may* + // be able to optimize it better. + // + // See: + // + // - https://sqlite.org/lang_with.html + // - https://www.postgresql.org/docs/current/queries-with.html + // - https://bun.uptrace.dev/guide/query-update.html#bulk-update + values := make([]*util.Status, 0, len(sr.statusIDs)) + for _, statusID := range sr.statusIDs { + // Filter out statusIDs that have already had + // thread_id_new set, to avoid spurious writes. + if _, set := sr.newThreadIDSet[statusID]; !set { + values = append(values, &util.Status{ + ID: statusID, + }) + } + } + + // Resulting query will look something like this: + // + // WITH "_data" ("id") AS ( + // VALUES + // ('01JR6PZED0DDR2VZHQ8H87ZW98'), + // ('01JR6PZED0J91MJCAFDTCCCG8Q') + // ) + // UPDATE "statuses" AS "status" + // SET + // "thread_id_new" = '01K6MGKX54BBJ3Y1FBPQY45E5P', + // "thread_id" = '01K6MGKX54BBJ3Y1FBPQY45E5P' + // FROM _data + // WHERE "status"."id" = "_data"."id" + res, err = tx.NewUpdate(). + // Update the status model. + Model((*oldmodel.Status)(nil)). + // Provide the CTE values as "_data". + With("_data", tx.NewValues(&values)). + // Include `FROM _data` statement so we can use + // `_data` table in SET and WHERE components. + TableExpr("_data"). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), threadID). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), threadID). + // "Join" to the CTE on status ID. + Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). + Exec(ctx) + } + + if err != nil { return 0, gtserror.Newf("error updating status thread ids: %w", err) } + rowsAffected, err := res.RowsAffected() + if err != nil { + return 0, gtserror.Newf("error counting rows affected: %w", err) + } + if len(sr.threadIDs) > 0 { // Update any existing thread // mutes to use latest thread_id. + + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs + threadIDs = xslices.Deduplicate(threadIDs) if _, err := tx.NewUpdate(). Table("thread_mutes"). - Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). Set("? = ?", bun.Ident("thread_id"), threadID). Exec(ctx); err != nil { return 0, gtserror.Newf("error updating mute thread ids: %w", err) } } - return total, nil + return rowsAffected, nil } // append will append the given status to the internal tracking of statusRethreader{} for // potential future operations, checking for uniqueness. it tracks the inReplyToID value // for the next call to getParents(), it tracks the status ID for list of statuses that -// need updating, the thread ID for the list of thread links and mutes that need updating, -// and whether all the statuses all have a provided thread ID (i.e. allThreaded). +// may need updating, whether a new thread ID has been set for each status, the thread ID +// for the list of thread links and mutes that need updating, and whether all the statuses +// all have a provided thread ID (i.e. allThreaded). func (sr *statusRethreader) append(status *oldmodel.Status) { // Check if status already seen before. @@ -479,7 +650,14 @@ func (sr *statusRethreader) append(status *oldmodel.Status) { } // Add status ID to map of seen IDs. - sr.seenIDs[status.ID] = struct{}{} + mark := struct{}{} + sr.seenIDs[status.ID] = mark + + // If new thread ID has already been + // set, add status ID to map of set IDs. + if status.ThreadIDNew != id.Lowest { + sr.newThreadIDSet[status.ID] = mark + } } func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { @@ -496,7 +674,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { // Select next parent status. if err := tx.NewSelect(). Model(&parent). - Column("id", "in_reply_to_id", "thread_id"). + Column("id", "in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), id). Scan(ctx); err != nil && err != db.ErrNoEntries { return err @@ -535,7 +713,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) // Select children of ID. if err := tx.NewSelect(). Model(&sr.statuses). - Column("id", "thread_id"). + Column("id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("in_reply_to_id"), id). Scan(ctx); err != nil && err != db.ErrNoEntries { return err @@ -560,14 +738,19 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in clear(sr.statuses) sr.statuses = sr.statuses[:0] + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs[idx:] + threadIDs = xslices.Deduplicate(threadIDs) + // Select stragglers that // also have thread IDs. if err := tx.NewSelect(). Model(&sr.statuses). - Column("id", "thread_id", "in_reply_to_id"). + Column("id", "thread_id", "in_reply_to_id", "thread_id_new"). Where("? IN (?) AND ? NOT IN (?)", bun.Ident("thread_id"), - bun.In(sr.threadIDs[idx:]), + bun.In(threadIDs), bun.Ident("id"), bun.In(sr.statusIDs), ). diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go index a03e93859..0f20bb7a4 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go @@ -23,45 +23,45 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // - PollID string `bun:"type:CHAR(26),nullzero"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // + PollID string `bun:"type:CHAR(26),nullzero"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. } // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go index f33a2b29e..8cfce2e6b 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go @@ -21,7 +21,10 @@ import ( "time" ) -// Status represents a user-created 'post' or 'status' in the database, either remote or local +// Status represents a user-created 'post' or 'status' in the database, either remote or local. +// +// Note: this model differs from an exact representation of the old model at the time of migration, +// as it includes the intermediate field "ThreadIDNew", which is only used during the migration. type Status struct { ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created @@ -60,6 +63,9 @@ type Status struct { PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + + // This field is *only* used during the migration, it was not on the original status model. + ThreadIDNew string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` } // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go new file mode 100644 index 000000000..0acb18e21 --- /dev/null +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go @@ -0,0 +1,24 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package util + +// Status is a helper type specifically +// for updating the thread ID of a status. +type Status struct { + ID string `bun:"type:CHAR(26)"` +} diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 4a3a62b21..b5543c824 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { return nil } -// batchUpdateByID performs the given updateQuery with updateArgs -// over the entire given table, batching by the ID of batchByCol. -func batchUpdateByID( - ctx context.Context, - tx bun.Tx, - table string, - batchByCol string, - updateQuery string, - updateArgs []any, -) error { - // Get a count of all in table. - total, err := tx.NewSelect(). - Table(table). - Count(ctx) - if err != nil { - return gtserror.Newf("error selecting total count: %w", err) - } - - // Query batch size - // in number of rows. - const batchsz = 5000 - - // Stores highest batch value - // used in iterate queries, - // starting at highest possible. - highest := id.Highest - - // Total updated rows. - var updated int - - for { - // Limit to batchsz - // items at once. - batchQ := tx. - NewSelect(). - Table(table). - Column(batchByCol). - Where("? < ?", bun.Ident(batchByCol), highest). - OrderExpr("? DESC", bun.Ident(batchByCol)). - Limit(batchsz) - - // Finalize UPDATE to act only on batch. - qStr := updateQuery + " WHERE ? IN (?)" - args := append(slices.Clone(updateArgs), - bun.Ident(batchByCol), - batchQ, - ) - - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qStr, args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) - } - - // Check how many items we updated. - thisUpdated, err := res.RowsAffected() - if err != nil { - return gtserror.Newf("error counting affected rows: %w", err) - } - - if thisUpdated == 0 { - // Nothing updated - // means we're done. - break - } - - // Update the overall count. - updated += int(thisUpdated) - - // Log helpful message to admin. - log.Infof(ctx, "migrated %d of %d %s (up to %s)", - updated, total, table, highest) - - // Get next highest - // id for next batch. - if err := tx. - NewSelect(). - With("batch_query", batchQ). - ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). - Scan(ctx, &highest); err != nil { - return gtserror.Newf("error selecting next highest: %w", err) - } - } - - if total != int(updated) { - // Return error here in order to rollback the whole transaction. - return fmt.Errorf("total=%d does not match updated=%d", total, updated) - } - - return nil -} - // convertEnums performs a transaction that converts // a table's column of our old-style enums (strings) to // more performant and space-saving integer types. diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 31e8fe881..0f0fb8404 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -27,56 +27,56 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. - Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. - PollID string `bun:"type:CHAR(26),nullzero"` // - Poll *Poll `bun:"-"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. + Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. + PollID string `bun:"type:CHAR(26),nullzero"` // + Poll *Poll `bun:"-"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. } // GetID implements timeline.Timelineable{}. From ff950e94bb8a2e1b3c905bdba4c44d0232704b18 Mon Sep 17 00:00:00 2001 From: kim Date: Fri, 3 Oct 2025 15:29:41 +0200 Subject: [PATCH 2/3] [chore] update dependencies (#4468) - github.com/ncruces/go-sqlite3 - codeberg.org/gruf/go-mempool - codeberg.org/gruf/go-structr (changes related on the above) * - codeberg.org/gruf/go-mutexes (changes related on the above) * * this is largely just fiddling around with package internals in structr and mutexes to rely on changes in mempool, which added a new concurrency-safe pool Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4468 Co-authored-by: kim Co-committed-by: kim --- go.mod | 8 +- go.sum | 16 +- internal/queue/simple.go | 2 +- vendor/codeberg.org/gruf/go-mempool/README.md | 2 +- vendor/codeberg.org/gruf/go-mempool/pool.go | 158 +++++++++++------- vendor/codeberg.org/gruf/go-mempool/simple.go | 111 ++++++++++++ vendor/codeberg.org/gruf/go-mutexes/map.go | 23 ++- vendor/codeberg.org/gruf/go-structr/index.go | 17 +- vendor/codeberg.org/gruf/go-structr/item.go | 18 +- vendor/codeberg.org/gruf/go-structr/list.go | 30 ++-- .../codeberg.org/gruf/go-structr/runtime.go | 10 +- .../codeberg.org/gruf/go-structr/timeline.go | 19 ++- vendor/github.com/ncruces/go-sqlite3/conn.go | 27 +-- .../github.com/ncruces/go-sqlite3/context.go | 16 -- .../ncruces/go-sqlite3/driver/driver.go | 107 ++++++------ .../ncruces/go-sqlite3/internal/util/json.go | 2 + .../go-sqlite3/internal/util/json_v2.go | 52 ++++++ vendor/github.com/ncruces/go-sqlite3/json.go | 83 ++++++++- .../github.com/ncruces/go-sqlite3/json_v2.go | 113 +++++++++++++ .../github.com/ncruces/go-sqlite3/sqlite.go | 8 +- vendor/github.com/ncruces/go-sqlite3/stmt.go | 40 ----- vendor/github.com/ncruces/go-sqlite3/time.go | 2 +- vendor/github.com/ncruces/go-sqlite3/value.go | 23 --- .../ncruces/go-sqlite3/vfs/const.go | 4 + .../github.com/ncruces/go-sqlite3/vfs/file.go | 68 ++++---- .../github.com/ncruces/go-sqlite3/vfs/lock.go | 2 +- .../ncruces/go-sqlite3/vfs/memdb/README.md | 5 +- .../ncruces/go-sqlite3/vfs/memdb/api.go | 19 ++- .../ncruces/go-sqlite3/vfs/os_darwin.go | 22 ++- .../ncruces/go-sqlite3/vfs/os_linux.go | 2 +- .../ncruces/go-sqlite3/vfs/os_std_sync.go | 2 +- vendor/modules.txt | 12 +- 32 files changed, 706 insertions(+), 317 deletions(-) create mode 100644 vendor/codeberg.org/gruf/go-mempool/simple.go create mode 100644 vendor/github.com/ncruces/go-sqlite3/internal/util/json_v2.go create mode 100644 vendor/github.com/ncruces/go-sqlite3/json_v2.go diff --git a/go.mod b/go.mod index b1d744ffa..9d3d8c988 100644 --- a/go.mod +++ b/go.mod @@ -25,13 +25,13 @@ require ( codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf codeberg.org/gruf/go-kv/v2 v2.0.7 codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f - codeberg.org/gruf/go-mempool v0.0.0-20240507125005-cef10d64a760 - codeberg.org/gruf/go-mutexes v1.5.3 + codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253 + codeberg.org/gruf/go-mutexes v1.5.8 codeberg.org/gruf/go-runners v1.6.3 codeberg.org/gruf/go-sched v1.2.4 codeberg.org/gruf/go-split v1.2.0 codeberg.org/gruf/go-storage v0.3.1 - codeberg.org/gruf/go-structr v0.9.9 + codeberg.org/gruf/go-structr v0.9.12 github.com/DmitriyVTitov/size v1.5.0 github.com/KimMachineGun/automemlimit v0.7.4 github.com/SherClockHolmes/webpush-go v1.4.0 @@ -53,7 +53,7 @@ require ( github.com/miekg/dns v1.1.68 github.com/minio/minio-go/v7 v7.0.95 github.com/mitchellh/mapstructure v1.5.0 - github.com/ncruces/go-sqlite3 v0.29.0 + github.com/ncruces/go-sqlite3 v0.29.1 github.com/oklog/ulid v1.3.1 github.com/pquerna/otp v1.5.0 github.com/rivo/uniseg v0.4.7 diff --git a/go.sum b/go.sum index 507d2d38f..f68de3494 100644 --- a/go.sum +++ b/go.sum @@ -42,10 +42,10 @@ codeberg.org/gruf/go-mangler/v2 v2.0.6 h1:c3cwnI6Mi17EAwGSYGNMN6+9PMzaIj2GLAKx9D codeberg.org/gruf/go-mangler/v2 v2.0.6/go.mod h1:CXIm7zAWPdNmZVAGM1NRiF/ekJTPE7YTb8kiRxiEFaQ= codeberg.org/gruf/go-maps v1.0.4 h1:K+Ww4vvR3TZqm5jqrKVirmguZwa3v1VUvmig2SE8uxY= codeberg.org/gruf/go-maps v1.0.4/go.mod h1:ASX7osM7kFwt5O8GfGflcFjrwYGD8eIuRLl/oMjhEi8= -codeberg.org/gruf/go-mempool v0.0.0-20240507125005-cef10d64a760 h1:m2/UCRXhjDwAg4vyji6iKCpomKw6P4PmBOUi5DvAMH4= -codeberg.org/gruf/go-mempool v0.0.0-20240507125005-cef10d64a760/go.mod h1:E3RcaCFNq4zXpvaJb8lfpPqdUAmSkP5F1VmMiEUYTEk= -codeberg.org/gruf/go-mutexes v1.5.3 h1:RIEy1UuDxKgAiINRMrPxTUWSGW6pFx9DzeJN4WPqra8= -codeberg.org/gruf/go-mutexes v1.5.3/go.mod h1:AnhagsMzUISL/nBVwhnHwDwTZOAxMILwCOG8/wKOblg= +codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253 h1:qPAY72xCWlySVROSNZecfLGAyeV/SiXmPmfhUU+o3Xw= +codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253/go.mod h1:761koiXmqfgzvu5mez2Rk7YlwWilpqJ/zv5hIA6NoNI= +codeberg.org/gruf/go-mutexes v1.5.8 h1:HRGnvT4COb3jX9xdeoSUUbjPgmk5kXPuDfld9ksUJKA= +codeberg.org/gruf/go-mutexes v1.5.8/go.mod h1:21sy/hWH8dDQBk7ocsxqo2GNpWiIir+e82RG3hjnN20= codeberg.org/gruf/go-runners v1.6.3 h1:To/AX7eTrWuXrTkA3RA01YTP5zha1VZ68LQ+0D4RY7E= codeberg.org/gruf/go-runners v1.6.3/go.mod h1:oXAaUmG2VxoKttpCqZGv5nQBeSvZSR2BzIk7h1yTRlU= codeberg.org/gruf/go-sched v1.2.4 h1:ddBB9o0D/2oU8NbQ0ldN5aWxogpXPRBATWi58+p++Hw= @@ -54,8 +54,8 @@ codeberg.org/gruf/go-split v1.2.0 h1:PmzL23nVEVHm8VxjsJmv4m4wGQz2bGgQw52dgSSj65c codeberg.org/gruf/go-split v1.2.0/go.mod h1:0rejWJpqvOoFAd7nwm5tIXYKaAqjtFGOXmTqQV+VO38= codeberg.org/gruf/go-storage v0.3.1 h1:g66UIM/xXnEk9ejT+W0T9s/PODBZhXa/8ajzeY/MELI= codeberg.org/gruf/go-storage v0.3.1/go.mod h1:r43n/zi7YGOCl2iSl7AMI27D1zcWS65Bi2+5xDzypeo= -codeberg.org/gruf/go-structr v0.9.9 h1:fwIzi/94yBNSWleXZIfVW/QyNK5+/xxI2reVYzu5V/c= -codeberg.org/gruf/go-structr v0.9.9/go.mod h1:5dsazOsIeJyV8Dl2DdSXqCDEZUx3e3dc41N6f2mPtgw= +codeberg.org/gruf/go-structr v0.9.12 h1:yMopvexnuKgZme9WgvIhrJaAuAjfper/x38xsVuJOOo= +codeberg.org/gruf/go-structr v0.9.12/go.mod h1:sP2ZSjM5X5XKlxuhAbTKuVQm9DWbHsrQRuTl3MUwbHw= codeberg.org/gruf/go-xunsafe v0.0.0-20250809104800-512a9df57d73 h1:pRaOwIOS1WSZoPCAvE0H1zpv+D4gF37OVppybffqdI8= codeberg.org/gruf/go-xunsafe v0.0.0-20250809104800-512a9df57d73/go.mod h1:9wkq+dmHjUhB/0ZxDUWAwsWuXwwGyx5N1dDCB9hpWs8= codeberg.org/superseriousbusiness/go-swagger v0.32.3-gts-go1.23-fix h1:k76/Th+bruqU/d+dB0Ru466ctTF2aVjKpisy/471ILE= @@ -338,8 +338,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/ncruces/go-sqlite3 v0.29.0 h1:1tsLiagCoqZEfcHDeKsNSv5jvrY/Iu393pAnw2wLNJU= -github.com/ncruces/go-sqlite3 v0.29.0/go.mod h1:r1hSvYKPNJ+OlUA1O3r8o9LAawzPAlqeZiIdxTBBBJ0= +github.com/ncruces/go-sqlite3 v0.29.1 h1:NIi8AISWBToRHyoz01FXiTNvU147Tqdibgj2tFzJCqM= +github.com/ncruces/go-sqlite3 v0.29.1/go.mod h1:PpccBNNhvjwUOwDQEn2gXQPFPTWdlromj0+fSkd5KSg= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= diff --git a/internal/queue/simple.go b/internal/queue/simple.go index b5201f891..41dc695db 100644 --- a/internal/queue/simple.go +++ b/internal/queue/simple.go @@ -31,7 +31,7 @@ import ( // elements to reduce overall memory usage. type SimpleQueue[T any] struct { l list.List[T] - p mempool.UnsafePool + p mempool.UnsafeSimplePool w chan struct{} m sync.Mutex } diff --git a/vendor/codeberg.org/gruf/go-mempool/README.md b/vendor/codeberg.org/gruf/go-mempool/README.md index af4cb6770..1acc23d26 100644 --- a/vendor/codeberg.org/gruf/go-mempool/README.md +++ b/vendor/codeberg.org/gruf/go-mempool/README.md @@ -1,3 +1,3 @@ # go-mempool -very simple memory pool implementation \ No newline at end of file +very simple memory pool implementation diff --git a/vendor/codeberg.org/gruf/go-mempool/pool.go b/vendor/codeberg.org/gruf/go-mempool/pool.go index e5ff6ba3d..5bb80234c 100644 --- a/vendor/codeberg.org/gruf/go-mempool/pool.go +++ b/vendor/codeberg.org/gruf/go-mempool/pool.go @@ -1,17 +1,17 @@ package mempool import ( + "sync" + "sync/atomic" "unsafe" + + "golang.org/x/sys/cpu" ) -const DefaultDirtyFactor = 128 - -// Pool provides a type-safe form -// of UnsafePool using generics. -// -// Note it is NOT safe for concurrent -// use, you must protect it yourself! +// Pool provides a form of SimplePool +// with the addition of concurrency safety. type Pool[T any] struct { + UnsafePool // New is an optionally provided // allocator used when no value @@ -21,79 +21,119 @@ type Pool[T any] struct { // Reset is an optionally provided // value resetting function called // on passed value to Put(). - Reset func(T) + Reset func(T) bool +} - UnsafePool +func NewPool[T any](new func() T, reset func(T) bool, check func(current, victim int) bool) Pool[T] { + return Pool[T]{ + New: new, + Reset: reset, + UnsafePool: NewUnsafePool(check), + } } func (p *Pool[T]) Get() T { if ptr := p.UnsafePool.Get(); ptr != nil { return *(*T)(ptr) - } else if p.New != nil { - return p.New() } - var z T - return z + var t T + if p.New != nil { + t = p.New() + } + return t } func (p *Pool[T]) Put(t T) { - if p.Reset != nil { - p.Reset(t) + if p.Reset != nil && !p.Reset(t) { + return } ptr := unsafe.Pointer(&t) p.UnsafePool.Put(ptr) } -// UnsafePool provides an incredibly -// simple memory pool implementation -// that stores ptrs to memory values, -// and regularly flushes internal pool -// structures according to DirtyFactor. -// -// Note it is NOT safe for concurrent -// use, you must protect it yourself! +// UnsafePool provides a form of UnsafeSimplePool +// with the addition of concurrency safety. type UnsafePool struct { - - // DirtyFactor determines the max - // number of $dirty count before - // pool is garbage collected. Where: - // $dirty = len(current) - len(victim) - DirtyFactor int - - current []unsafe.Pointer - victim []unsafe.Pointer + internal + _ [cache_line_size - unsafe.Sizeof(internal{})%cache_line_size]byte } -func (p *UnsafePool) Get() unsafe.Pointer { - // First try current list. - if len(p.current) > 0 { - ptr := p.current[len(p.current)-1] - p.current = p.current[:len(p.current)-1] +func NewUnsafePool(check func(current, victim int) bool) UnsafePool { + return UnsafePool{internal: internal{ + pool: UnsafeSimplePool{Check: check}, + }} +} + +const ( + // current platform integer size. + int_size = 32 << (^uint(0) >> 63) + + // platform CPU cache line size to avoid false sharing. + cache_line_size = unsafe.Sizeof(cpu.CacheLinePad{}) +) + +type internal struct { + // fast-access ring-buffer of + // pointers accessible by index. + // + // if Go ever exposes goroutine IDs + // to us we can make this a lot faster. + ring [int_size / 4]unsafe.Pointer + index atomic.Uint64 + + // underlying pool and + // slow mutex protection. + pool UnsafeSimplePool + mutex sync.Mutex +} + +func (p *internal) Check(fn func(current, victim int) bool) func(current, victim int) bool { + p.mutex.Lock() + if fn == nil { + if p.pool.Check == nil { + fn = defaultCheck + } else { + fn = p.pool.Check + } + } else { + p.pool.Check = fn + } + p.mutex.Unlock() + return fn +} + +func (p *internal) Get() unsafe.Pointer { + if ptr := atomic.SwapPointer(&p.ring[p.index.Load()%uint64(cap(p.ring))], nil); ptr != nil { + p.index.Add(^uint64(0)) // i.e. -1 return ptr } - - // Fallback to victim. - if len(p.victim) > 0 { - ptr := p.victim[len(p.victim)-1] - p.victim = p.victim[:len(p.victim)-1] - return ptr - } - - return nil + p.mutex.Lock() + ptr := p.pool.Get() + p.mutex.Unlock() + return ptr } -func (p *UnsafePool) Put(ptr unsafe.Pointer) { - p.current = append(p.current, ptr) - - // Get dirty factor. - df := p.DirtyFactor - if df == 0 { - df = DefaultDirtyFactor - } - - if len(p.current)-len(p.victim) > df { - // Garbage collection! - p.victim = p.current - p.current = nil +func (p *internal) Put(ptr unsafe.Pointer) { + if atomic.CompareAndSwapPointer(&p.ring[p.index.Add(1)%uint64(cap(p.ring))], nil, ptr) { + return } + p.mutex.Lock() + p.pool.Put(ptr) + p.mutex.Unlock() +} + +func (p *internal) GC() { + for i := range p.ring { + atomic.StorePointer(&p.ring[i], nil) + } + p.mutex.Lock() + p.pool.GC() + p.mutex.Unlock() +} + +func (p *internal) Size() int { + p.mutex.Lock() + sz := p.pool.Size() + p.mutex.Unlock() + return sz } diff --git a/vendor/codeberg.org/gruf/go-mempool/simple.go b/vendor/codeberg.org/gruf/go-mempool/simple.go new file mode 100644 index 000000000..c9f459890 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-mempool/simple.go @@ -0,0 +1,111 @@ +package mempool + +import ( + "unsafe" +) + +// SimplePool provides a type-safe form +// of UnsafePool using generics. +// +// Note it is NOT safe for concurrent +// use, you must protect it yourself! +type SimplePool[T any] struct { + UnsafeSimplePool + + // New is an optionally provided + // allocator used when no value + // is available for use in pool. + New func() T + + // Reset is an optionally provided + // value resetting function called + // on passed value to Put(). + Reset func(T) bool +} + +func (p *SimplePool[T]) Get() T { + if ptr := p.UnsafeSimplePool.Get(); ptr != nil { + return *(*T)(ptr) + } + var t T + if p.New != nil { + t = p.New() + } + return t +} + +func (p *SimplePool[T]) Put(t T) { + if p.Reset != nil && !p.Reset(t) { + return + } + ptr := unsafe.Pointer(&t) + p.UnsafeSimplePool.Put(ptr) +} + +// UnsafeSimplePool provides an incredibly +// simple memory pool implementation +// that stores ptrs to memory values, +// and regularly flushes internal pool +// structures according to CheckGC(). +// +// Note it is NOT safe for concurrent +// use, you must protect it yourself! +type UnsafeSimplePool struct { + + // Check determines how often to flush + // internal pools based on underlying + // current and victim pool sizes. It gets + // called on every pool Put() operation. + // + // A flush will start a new current + // pool, make victim the old current, + // and drop the existing victim pool. + Check func(current, victim int) bool + + current []unsafe.Pointer + victim []unsafe.Pointer +} + +func (p *UnsafeSimplePool) Get() unsafe.Pointer { + // First try current list. + if len(p.current) > 0 { + ptr := p.current[len(p.current)-1] + p.current = p.current[:len(p.current)-1] + return ptr + } + + // Fallback to victim. + if len(p.victim) > 0 { + ptr := p.victim[len(p.victim)-1] + p.victim = p.victim[:len(p.victim)-1] + return ptr + } + + return nil +} + +func (p *UnsafeSimplePool) Put(ptr unsafe.Pointer) { + p.current = append(p.current, ptr) + + // Get GC check func. + if p.Check == nil { + p.Check = defaultCheck + } + + if p.Check(len(p.current), len(p.victim)) { + p.GC() // garbage collection time! + } +} + +func (p *UnsafeSimplePool) GC() { + p.victim = p.current + p.current = nil +} + +func (p *UnsafeSimplePool) Size() int { + return len(p.current) + len(p.victim) +} + +func defaultCheck(current, victim int) bool { + return current-victim > 128 || victim > 256 +} diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index 2f21ae0bb..e8c4d0058 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -26,14 +26,13 @@ const ( type MutexMap struct { mapmu sync.Mutex mumap hashmap - mupool mempool.UnsafePool + mupool mempool.UnsafeSimplePool } // checkInit ensures MutexMap is initialized (UNSAFE). func (mm *MutexMap) checkInit() { if mm.mumap.m == nil { mm.mumap.init(0) - mm.mupool.DirtyFactor = 256 } } @@ -175,13 +174,9 @@ func (mu *rwmutex) Lock(lt uint8) bool { // sleeping goroutines waiting on this mutex. func (mu *rwmutex) Unlock() bool { switch mu.l--; { - case mu.l > 0 && mu.t == lockTypeWrite: - panic("BUG: multiple writer locks") - case mu.l < 0: - panic("BUG: negative lock count") - case mu.l == 0: - // Fully unlocked. + // Fully + // unlock. mu.t = 0 // Awake all blocked goroutines and check @@ -197,11 +192,15 @@ func (mu *rwmutex) Unlock() bool { // (before == after) => (waiters = 0) return (before == after) - default: - // i.e. mutex still - // locked by others. - return false + case mu.l < 0: + panic("BUG: negative lock count") + case mu.t == lockTypeWrite: + panic("BUG: multiple write locks") } + + // i.e. mutex still + // locked by others. + return false } // WaitRelock expects a mutex to be passed in, already in the diff --git a/vendor/codeberg.org/gruf/go-structr/index.go b/vendor/codeberg.org/gruf/go-structr/index.go index d5bd5562e..d8469577d 100644 --- a/vendor/codeberg.org/gruf/go-structr/index.go +++ b/vendor/codeberg.org/gruf/go-structr/index.go @@ -4,10 +4,10 @@ import ( "os" "reflect" "strings" - "sync" "unsafe" "codeberg.org/gruf/go-byteutil" + "codeberg.org/gruf/go-mempool" "codeberg.org/gruf/go-xunsafe" ) @@ -371,17 +371,15 @@ type index_entry struct { key string } -var index_entry_pool sync.Pool +var index_entry_pool mempool.UnsafePool // new_index_entry returns a new prepared index_entry. func new_index_entry() *index_entry { - v := index_entry_pool.Get() - if v == nil { - e := new(index_entry) - e.elem.data = unsafe.Pointer(e) - v = e + if ptr := index_entry_pool.Get(); ptr != nil { + return (*index_entry)(ptr) } - entry := v.(*index_entry) + entry := new(index_entry) + entry.elem.data = unsafe.Pointer(entry) return entry } @@ -396,7 +394,8 @@ func free_index_entry(entry *index_entry) { entry.key = "" entry.index = nil entry.item = nil - index_entry_pool.Put(entry) + ptr := unsafe.Pointer(entry) + index_entry_pool.Put(ptr) } func is_unique(f uint8) bool { diff --git a/vendor/codeberg.org/gruf/go-structr/item.go b/vendor/codeberg.org/gruf/go-structr/item.go index 08f054907..4c63b97c6 100644 --- a/vendor/codeberg.org/gruf/go-structr/item.go +++ b/vendor/codeberg.org/gruf/go-structr/item.go @@ -2,8 +2,9 @@ package structr import ( "os" - "sync" "unsafe" + + "codeberg.org/gruf/go-mempool" ) type indexed_item struct { @@ -19,17 +20,15 @@ type indexed_item struct { indexed []*index_entry } -var indexed_item_pool sync.Pool +var indexed_item_pool mempool.UnsafePool // new_indexed_item returns a new prepared indexed_item. func new_indexed_item() *indexed_item { - v := indexed_item_pool.Get() - if v == nil { - i := new(indexed_item) - i.elem.data = unsafe.Pointer(i) - v = i + if ptr := indexed_item_pool.Get(); ptr != nil { + return (*indexed_item)(ptr) } - item := v.(*indexed_item) + item := new(indexed_item) + item.elem.data = unsafe.Pointer(item) return item } @@ -43,7 +42,8 @@ func free_indexed_item(item *indexed_item) { return } item.data = nil - indexed_item_pool.Put(item) + ptr := unsafe.Pointer(item) + indexed_item_pool.Put(ptr) } // drop_index will drop the given index entry from item's indexed. diff --git a/vendor/codeberg.org/gruf/go-structr/list.go b/vendor/codeberg.org/gruf/go-structr/list.go index a2cb5b658..1c931fd51 100644 --- a/vendor/codeberg.org/gruf/go-structr/list.go +++ b/vendor/codeberg.org/gruf/go-structr/list.go @@ -2,8 +2,9 @@ package structr import ( "os" - "sync" "unsafe" + + "codeberg.org/gruf/go-mempool" ) // elem represents an elem @@ -27,16 +28,14 @@ type list struct { len int } -var list_pool sync.Pool +var list_pool mempool.UnsafePool // new_list returns a new prepared list. func new_list() *list { - v := list_pool.Get() - if v == nil { - v = new(list) + if ptr := list_pool.Get(); ptr != nil { + return (*list)(ptr) } - list := v.(*list) - return list + return new(list) } // free_list releases the list. @@ -48,11 +47,13 @@ func free_list(list *list) { os.Stderr.WriteString(msg + "\n") return } - list_pool.Put(list) + ptr := unsafe.Pointer(list) + list_pool.Put(ptr) } // push_front will push the given elem to front (head) of list. func (l *list) push_front(elem *list_elem) { + // Set new head. oldHead := l.head l.head = elem @@ -66,12 +67,14 @@ func (l *list) push_front(elem *list_elem) { l.tail = elem } - // Incr count + // Incr + // count l.len++ } // push_back will push the given elem to back (tail) of list. func (l *list) push_back(elem *list_elem) { + // Set new tail. oldTail := l.tail l.tail = elem @@ -85,7 +88,8 @@ func (l *list) push_back(elem *list_elem) { l.head = elem } - // Incr count + // Incr + // count l.len++ } @@ -131,7 +135,8 @@ func (l *list) insert(elem *list_elem, at *list_elem) { elem.next = oldNext } - // Incr count + // Incr + // count l.len++ } @@ -174,6 +179,7 @@ func (l *list) remove(elem *list_elem) { prev.next = next } - // Decr count + // Decr + // count l.len-- } diff --git a/vendor/codeberg.org/gruf/go-structr/runtime.go b/vendor/codeberg.org/gruf/go-structr/runtime.go index 8a8d53ede..508cd6e4c 100644 --- a/vendor/codeberg.org/gruf/go-structr/runtime.go +++ b/vendor/codeberg.org/gruf/go-structr/runtime.go @@ -146,7 +146,7 @@ func find_field(t xunsafe.TypeIter, names []string) (sfield struct_field, ftype sfield.mangle = mangler.Get(t) // Calculate zero value string. - zptr := zero_value_field(o, sfield.offsets) + zptr := zero_value_ptr(o, sfield.offsets) zstr := string(sfield.mangle(nil, zptr)) sfield.zerostr = zstr sfield.zero = zptr @@ -154,7 +154,9 @@ func find_field(t xunsafe.TypeIter, names []string) (sfield struct_field, ftype return } -// zero_value ... +// zero_value iterates the type contained in TypeIter{} along the given +// next_offset{} values, creating new ptrs where necessary, returning the +// zero reflect.Value{} after fully iterating the next_offset{} slice. func zero_value(t xunsafe.TypeIter, offsets []next_offset) reflect.Value { v := reflect.New(t.Type).Elem() for _, offset := range offsets { @@ -175,8 +177,8 @@ func zero_value(t xunsafe.TypeIter, offsets []next_offset) reflect.Value { return v } -// zero_value_field ... -func zero_value_field(t xunsafe.TypeIter, offsets []next_offset) unsafe.Pointer { +// zero_value_ptr returns the unsafe pointer address of the result of zero_value(). +func zero_value_ptr(t xunsafe.TypeIter, offsets []next_offset) unsafe.Pointer { return zero_value(t, offsets).Addr().UnsafePointer() } diff --git a/vendor/codeberg.org/gruf/go-structr/timeline.go b/vendor/codeberg.org/gruf/go-structr/timeline.go index 1e9703fca..749ec862a 100644 --- a/vendor/codeberg.org/gruf/go-structr/timeline.go +++ b/vendor/codeberg.org/gruf/go-structr/timeline.go @@ -8,6 +8,8 @@ import ( "strings" "sync" "unsafe" + + "codeberg.org/gruf/go-mempool" ) // Direction defines a direction @@ -1133,18 +1135,16 @@ func to_timeline_item(item *indexed_item) *timeline_item { return to } -var timeline_item_pool sync.Pool +var timeline_item_pool mempool.UnsafePool // new_timeline_item returns a new prepared timeline_item. func new_timeline_item() *timeline_item { - v := timeline_item_pool.Get() - if v == nil { - i := new(timeline_item) - i.elem.data = unsafe.Pointer(i) - i.ck = ^uint(0) - v = i + if ptr := timeline_item_pool.Get(); ptr != nil { + return (*timeline_item)(ptr) } - item := v.(*timeline_item) + item := new(timeline_item) + item.elem.data = unsafe.Pointer(item) + item.ck = ^uint(0) return item } @@ -1159,5 +1159,6 @@ func free_timeline_item(item *timeline_item) { } item.data = nil item.pk = nil - timeline_item_pool.Put(item) + ptr := unsafe.Pointer(item) + timeline_item_pool.Put(ptr) } diff --git a/vendor/github.com/ncruces/go-sqlite3/conn.go b/vendor/github.com/ncruces/go-sqlite3/conn.go index 7e88d8c85..a7eca1652 100644 --- a/vendor/github.com/ncruces/go-sqlite3/conn.go +++ b/vendor/github.com/ncruces/go-sqlite3/conn.go @@ -444,20 +444,27 @@ func (c *Conn) Status(op DBStatus, reset bool) (current, highwater int, err erro // https://sqlite.org/c3ref/table_column_metadata.html func (c *Conn) TableColumnMetadata(schema, table, column string) (declType, collSeq string, notNull, primaryKey, autoInc bool, err error) { defer c.arena.mark()() - - var schemaPtr, columnPtr ptr_t - declTypePtr := c.arena.new(ptrlen) - collSeqPtr := c.arena.new(ptrlen) - notNullPtr := c.arena.new(ptrlen) - autoIncPtr := c.arena.new(ptrlen) - primaryKeyPtr := c.arena.new(ptrlen) + var ( + declTypePtr ptr_t + collSeqPtr ptr_t + notNullPtr ptr_t + primaryKeyPtr ptr_t + autoIncPtr ptr_t + columnPtr ptr_t + schemaPtr ptr_t + ) + if column != "" { + declTypePtr = c.arena.new(ptrlen) + collSeqPtr = c.arena.new(ptrlen) + notNullPtr = c.arena.new(ptrlen) + primaryKeyPtr = c.arena.new(ptrlen) + autoIncPtr = c.arena.new(ptrlen) + columnPtr = c.arena.string(column) + } if schema != "" { schemaPtr = c.arena.string(schema) } tablePtr := c.arena.string(table) - if column != "" { - columnPtr = c.arena.string(column) - } rc := res_t(c.call("sqlite3_table_column_metadata", stk_t(c.handle), stk_t(schemaPtr), stk_t(tablePtr), stk_t(columnPtr), diff --git a/vendor/github.com/ncruces/go-sqlite3/context.go b/vendor/github.com/ncruces/go-sqlite3/context.go index 154c228cf..269bf52f9 100644 --- a/vendor/github.com/ncruces/go-sqlite3/context.go +++ b/vendor/github.com/ncruces/go-sqlite3/context.go @@ -1,7 +1,6 @@ package sqlite3 import ( - "encoding/json" "errors" "math" "time" @@ -173,21 +172,6 @@ func (ctx Context) ResultPointer(ptr any) { stk_t(ctx.handle), stk_t(valPtr)) } -// ResultJSON sets the result of the function to the JSON encoding of value. -// -// https://sqlite.org/c3ref/result_blob.html -func (ctx Context) ResultJSON(value any) { - err := json.NewEncoder(callbackWriter(func(p []byte) (int, error) { - ctx.ResultRawText(p[:len(p)-1]) // remove the newline - return 0, nil - })).Encode(value) - - if err != nil { - ctx.ResultError(err) - return // notest - } -} - // ResultValue sets the result of the function to a copy of [Value]. // // https://sqlite.org/c3ref/result_blob.html diff --git a/vendor/github.com/ncruces/go-sqlite3/driver/driver.go b/vendor/github.com/ncruces/go-sqlite3/driver/driver.go index 27496f6cb..5d2847369 100644 --- a/vendor/github.com/ncruces/go-sqlite3/driver/driver.go +++ b/vendor/github.com/ncruces/go-sqlite3/driver/driver.go @@ -607,14 +607,24 @@ func (r resultRowsAffected) RowsAffected() (int64, error) { type scantype byte const ( - _ANY scantype = iota - _INT scantype = scantype(sqlite3.INTEGER) - _REAL scantype = scantype(sqlite3.FLOAT) - _TEXT scantype = scantype(sqlite3.TEXT) - _BLOB scantype = scantype(sqlite3.BLOB) - _NULL scantype = scantype(sqlite3.NULL) - _BOOL scantype = iota + _ANY scantype = iota + _INT + _REAL + _TEXT + _BLOB + _NULL + _BOOL _TIME + _NOT_NULL +) + +var ( + _ [0]struct{} = [scantype(sqlite3.INTEGER) - _INT]struct{}{} + _ [0]struct{} = [scantype(sqlite3.FLOAT) - _REAL]struct{}{} + _ [0]struct{} = [scantype(sqlite3.TEXT) - _TEXT]struct{}{} + _ [0]struct{} = [scantype(sqlite3.BLOB) - _BLOB]struct{}{} + _ [0]struct{} = [scantype(sqlite3.NULL) - _NULL]struct{}{} + _ [0]struct{} = [_NOT_NULL & (_NOT_NULL - 1)]struct{}{} ) func scanFromDecl(decl string) scantype { @@ -644,8 +654,8 @@ type rows struct { *stmt names []string types []string - nulls []bool scans []scantype + dest []driver.Value } var ( @@ -675,34 +685,36 @@ func (r *rows) Columns() []string { func (r *rows) scanType(index int) scantype { if r.scans == nil { - count := r.Stmt.ColumnCount() + count := len(r.names) scans := make([]scantype, count) for i := range scans { scans[i] = scanFromDecl(strings.ToUpper(r.Stmt.ColumnDeclType(i))) } r.scans = scans } - return r.scans[index] + return r.scans[index] &^ _NOT_NULL } func (r *rows) loadColumnMetadata() { - if r.nulls == nil { + if r.types == nil { c := r.Stmt.Conn() - count := r.Stmt.ColumnCount() - nulls := make([]bool, count) + count := len(r.names) types := make([]string, count) scans := make([]scantype, count) - for i := range nulls { + for i := range types { + var notnull bool if col := r.Stmt.ColumnOriginName(i); col != "" { - types[i], _, nulls[i], _, _, _ = c.TableColumnMetadata( + types[i], _, notnull, _, _, _ = c.TableColumnMetadata( r.Stmt.ColumnDatabaseName(i), r.Stmt.ColumnTableName(i), col) types[i] = strings.ToUpper(types[i]) scans[i] = scanFromDecl(types[i]) + if notnull { + scans[i] |= _NOT_NULL + } } } - r.nulls = nulls r.types = types r.scans = scans } @@ -721,15 +733,13 @@ func (r *rows) ColumnTypeDatabaseTypeName(index int) string { func (r *rows) ColumnTypeNullable(index int) (nullable, ok bool) { r.loadColumnMetadata() - if r.nulls[index] { - return false, true - } - return true, false + nullable = r.scans[index]&^_NOT_NULL == 0 + return nullable, !nullable } func (r *rows) ColumnTypeScanType(index int) (typ reflect.Type) { r.loadColumnMetadata() - scan := r.scans[index] + scan := r.scans[index] &^ _NOT_NULL if r.Stmt.Busy() { // SQLite is dynamically typed and we now have a row. @@ -772,6 +782,7 @@ func (r *rows) ColumnTypeScanType(index int) (typ reflect.Type) { } func (r *rows) Next(dest []driver.Value) error { + r.dest = nil c := r.Stmt.Conn() if old := c.SetInterrupt(r.ctx); old != r.ctx { defer c.SetInterrupt(old) @@ -790,18 +801,7 @@ func (r *rows) Next(dest []driver.Value) error { } for i := range dest { scan := r.scanType(i) - switch v := dest[i].(type) { - case int64: - if scan == _BOOL { - switch v { - case 1: - dest[i] = true - case 0: - dest[i] = false - } - continue - } - case []byte: + if v, ok := dest[i].([]byte); ok { if len(v) == cap(v) { // a BLOB continue } @@ -816,38 +816,49 @@ func (r *rows) Next(dest []driver.Value) error { } } dest[i] = string(v) - case float64: - break - default: - continue } - if scan == _TIME { + switch scan { + case _TIME: t, err := r.tmRead.Decode(dest[i]) if err == nil { dest[i] = t - continue + } + case _BOOL: + switch dest[i] { + case int64(0): + dest[i] = false + case int64(1): + dest[i] = true } } } + r.dest = dest return nil } -func (r *rows) ScanColumn(dest any, index int) error { +func (r *rows) ScanColumn(dest any, index int) (err error) { // notest // Go 1.26 - var ptr *time.Time + var tm *time.Time + var ok *bool switch d := dest.(type) { case *time.Time: - ptr = d + tm = d case *sql.NullTime: - ptr = &d.Time + tm = &d.Time + ok = &d.Valid case *sql.Null[time.Time]: - ptr = &d.V + tm = &d.V + ok = &d.Valid default: return driver.ErrSkip } - if t := r.Stmt.ColumnTime(index, r.tmRead); !t.IsZero() { - *ptr = t - return nil + value := r.dest[index] + *tm, err = r.tmRead.Decode(value) + if ok != nil { + *ok = err == nil + if value == nil { + return nil + } } - return driver.ErrSkip + return err } diff --git a/vendor/github.com/ncruces/go-sqlite3/internal/util/json.go b/vendor/github.com/ncruces/go-sqlite3/internal/util/json.go index 846237405..f582734cf 100644 --- a/vendor/github.com/ncruces/go-sqlite3/internal/util/json.go +++ b/vendor/github.com/ncruces/go-sqlite3/internal/util/json.go @@ -1,3 +1,5 @@ +//go:build !goexperiment.jsonv2 + package util import ( diff --git a/vendor/github.com/ncruces/go-sqlite3/internal/util/json_v2.go b/vendor/github.com/ncruces/go-sqlite3/internal/util/json_v2.go new file mode 100644 index 000000000..2fb052233 --- /dev/null +++ b/vendor/github.com/ncruces/go-sqlite3/internal/util/json_v2.go @@ -0,0 +1,52 @@ +//go:build goexperiment.jsonv2 + +package util + +import ( + "encoding/json/v2" + "math" + "strconv" + "time" + "unsafe" +) + +type JSON struct{ Value any } + +func (j JSON) Scan(value any) error { + var buf []byte + + switch v := value.(type) { + case []byte: + buf = v + case string: + buf = unsafe.Slice(unsafe.StringData(v), len(v)) + case int64: + buf = strconv.AppendInt(nil, v, 10) + case float64: + buf = AppendNumber(nil, v) + case time.Time: + buf = append(buf, '"') + buf = v.AppendFormat(buf, time.RFC3339Nano) + buf = append(buf, '"') + case nil: + buf = []byte("null") + default: + panic(AssertErr()) + } + + return json.Unmarshal(buf, j.Value) +} + +func AppendNumber(dst []byte, f float64) []byte { + switch { + case math.IsNaN(f): + dst = append(dst, "null"...) + case math.IsInf(f, 1): + dst = append(dst, "9.0e999"...) + case math.IsInf(f, -1): + dst = append(dst, "-9.0e999"...) + default: + return strconv.AppendFloat(dst, f, 'g', -1, 64) + } + return dst +} diff --git a/vendor/github.com/ncruces/go-sqlite3/json.go b/vendor/github.com/ncruces/go-sqlite3/json.go index 2b762c092..78195f2e3 100644 --- a/vendor/github.com/ncruces/go-sqlite3/json.go +++ b/vendor/github.com/ncruces/go-sqlite3/json.go @@ -1,6 +1,13 @@ +//go:build !goexperiment.jsonv2 + package sqlite3 -import "github.com/ncruces/go-sqlite3/internal/util" +import ( + "encoding/json" + "strconv" + + "github.com/ncruces/go-sqlite3/internal/util" +) // JSON returns a value that can be used as an argument to // [database/sql.DB.Exec], [database/sql.Row.Scan] and similar methods to @@ -10,3 +17,77 @@ import "github.com/ncruces/go-sqlite3/internal/util" func JSON(value any) any { return util.JSON{Value: value} } + +// ResultJSON sets the result of the function to the JSON encoding of value. +// +// https://sqlite.org/c3ref/result_blob.html +func (ctx Context) ResultJSON(value any) { + err := json.NewEncoder(callbackWriter(func(p []byte) (int, error) { + ctx.ResultRawText(p[:len(p)-1]) // remove the newline + return 0, nil + })).Encode(value) + + if err != nil { + ctx.ResultError(err) + return // notest + } +} + +// BindJSON binds the JSON encoding of value to the prepared statement. +// The leftmost SQL parameter has an index of 1. +// +// https://sqlite.org/c3ref/bind_blob.html +func (s *Stmt) BindJSON(param int, value any) error { + return json.NewEncoder(callbackWriter(func(p []byte) (int, error) { + return 0, s.BindRawText(param, p[:len(p)-1]) // remove the newline + })).Encode(value) +} + +// ColumnJSON parses the JSON-encoded value of the result column +// and stores it in the value pointed to by ptr. +// The leftmost column of the result set has the index 0. +// +// https://sqlite.org/c3ref/column_blob.html +func (s *Stmt) ColumnJSON(col int, ptr any) error { + var data []byte + switch s.ColumnType(col) { + case NULL: + data = []byte("null") + case TEXT: + data = s.ColumnRawText(col) + case BLOB: + data = s.ColumnRawBlob(col) + case INTEGER: + data = strconv.AppendInt(nil, s.ColumnInt64(col), 10) + case FLOAT: + data = util.AppendNumber(nil, s.ColumnFloat(col)) + default: + panic(util.AssertErr()) + } + return json.Unmarshal(data, ptr) +} + +// JSON parses a JSON-encoded value +// and stores the result in the value pointed to by ptr. +func (v Value) JSON(ptr any) error { + var data []byte + switch v.Type() { + case NULL: + data = []byte("null") + case TEXT: + data = v.RawText() + case BLOB: + data = v.RawBlob() + case INTEGER: + data = strconv.AppendInt(nil, v.Int64(), 10) + case FLOAT: + data = util.AppendNumber(nil, v.Float()) + default: + panic(util.AssertErr()) + } + return json.Unmarshal(data, ptr) +} + +type callbackWriter func(p []byte) (int, error) + +func (fn callbackWriter) Write(p []byte) (int, error) { return fn(p) } diff --git a/vendor/github.com/ncruces/go-sqlite3/json_v2.go b/vendor/github.com/ncruces/go-sqlite3/json_v2.go new file mode 100644 index 000000000..4b74bc7a4 --- /dev/null +++ b/vendor/github.com/ncruces/go-sqlite3/json_v2.go @@ -0,0 +1,113 @@ +//go:build goexperiment.jsonv2 + +package sqlite3 + +import ( + "encoding/json/v2" + "strconv" + + "github.com/ncruces/go-sqlite3/internal/util" +) + +// JSON returns a value that can be used as an argument to +// [database/sql.DB.Exec], [database/sql.Row.Scan] and similar methods to +// store value as JSON, or decode JSON into value. +// JSON should NOT be used with [Stmt.BindJSON], [Stmt.ColumnJSON], +// [Value.JSON], or [Context.ResultJSON]. +func JSON(value any) any { + return util.JSON{Value: value} +} + +// ResultJSON sets the result of the function to the JSON encoding of value. +// +// https://sqlite.org/c3ref/result_blob.html +func (ctx Context) ResultJSON(value any) { + w := bytesWriter{sqlite: ctx.c.sqlite} + if err := json.MarshalWrite(&w, value); err != nil { + ctx.c.free(w.ptr) + ctx.ResultError(err) + return // notest + } + ctx.c.call("sqlite3_result_text_go", + stk_t(ctx.handle), stk_t(w.ptr), stk_t(len(w.buf))) +} + +// BindJSON binds the JSON encoding of value to the prepared statement. +// The leftmost SQL parameter has an index of 1. +// +// https://sqlite.org/c3ref/bind_blob.html +func (s *Stmt) BindJSON(param int, value any) error { + w := bytesWriter{sqlite: s.c.sqlite} + if err := json.MarshalWrite(&w, value); err != nil { + s.c.free(w.ptr) + return err + } + rc := res_t(s.c.call("sqlite3_bind_text_go", + stk_t(s.handle), stk_t(param), + stk_t(w.ptr), stk_t(len(w.buf)))) + return s.c.error(rc) +} + +// ColumnJSON parses the JSON-encoded value of the result column +// and stores it in the value pointed to by ptr. +// The leftmost column of the result set has the index 0. +// +// https://sqlite.org/c3ref/column_blob.html +func (s *Stmt) ColumnJSON(col int, ptr any) error { + var data []byte + switch s.ColumnType(col) { + case NULL: + data = []byte("null") + case TEXT: + data = s.ColumnRawText(col) + case BLOB: + data = s.ColumnRawBlob(col) + case INTEGER: + data = strconv.AppendInt(nil, s.ColumnInt64(col), 10) + case FLOAT: + data = util.AppendNumber(nil, s.ColumnFloat(col)) + default: + panic(util.AssertErr()) + } + return json.Unmarshal(data, ptr) +} + +// JSON parses a JSON-encoded value +// and stores the result in the value pointed to by ptr. +func (v Value) JSON(ptr any) error { + var data []byte + switch v.Type() { + case NULL: + data = []byte("null") + case TEXT: + data = v.RawText() + case BLOB: + data = v.RawBlob() + case INTEGER: + data = strconv.AppendInt(nil, v.Int64(), 10) + case FLOAT: + data = util.AppendNumber(nil, v.Float()) + default: + panic(util.AssertErr()) + } + return json.Unmarshal(data, ptr) +} + +type bytesWriter struct { + *sqlite + buf []byte + ptr ptr_t +} + +func (b *bytesWriter) Write(p []byte) (n int, err error) { + if len(p) > cap(b.buf)-len(b.buf) { + want := int64(len(b.buf)) + int64(len(p)) + grow := int64(cap(b.buf)) + grow += grow >> 1 + want = max(want, grow) + b.ptr = b.realloc(b.ptr, want) + b.buf = util.View(b.mod, b.ptr, want)[:len(b.buf)] + } + b.buf = append(b.buf, p...) + return len(p), nil +} diff --git a/vendor/github.com/ncruces/go-sqlite3/sqlite.go b/vendor/github.com/ncruces/go-sqlite3/sqlite.go index c05a86fde..fb64ac5c0 100644 --- a/vendor/github.com/ncruces/go-sqlite3/sqlite.go +++ b/vendor/github.com/ncruces/go-sqlite3/sqlite.go @@ -5,6 +5,7 @@ import ( "context" "math/bits" "os" + "strings" "sync" "unsafe" @@ -128,11 +129,10 @@ func (sqlt *sqlite) error(rc res_t, handle ptr_t, sql ...string) error { var msg, query string if ptr := ptr_t(sqlt.call("sqlite3_errmsg", stk_t(handle))); ptr != 0 { msg = util.ReadString(sqlt.mod, ptr, _MAX_LENGTH) - switch { - case msg == "not an error": - msg = "" - case msg == util.ErrorCodeString(uint32(rc))[len("sqlite3: "):]: + if msg == "not an error" { msg = "" + } else { + msg = strings.TrimPrefix(msg, util.ErrorCodeString(uint32(rc))[len("sqlite3: "):]) } } diff --git a/vendor/github.com/ncruces/go-sqlite3/stmt.go b/vendor/github.com/ncruces/go-sqlite3/stmt.go index 706182f9f..e2523b6cb 100644 --- a/vendor/github.com/ncruces/go-sqlite3/stmt.go +++ b/vendor/github.com/ncruces/go-sqlite3/stmt.go @@ -1,9 +1,7 @@ package sqlite3 import ( - "encoding/json" "math" - "strconv" "time" "github.com/ncruces/go-sqlite3/internal/util" @@ -362,16 +360,6 @@ func (s *Stmt) BindPointer(param int, ptr any) error { return s.c.error(rc) } -// BindJSON binds the JSON encoding of value to the prepared statement. -// The leftmost SQL parameter has an index of 1. -// -// https://sqlite.org/c3ref/bind_blob.html -func (s *Stmt) BindJSON(param int, value any) error { - return json.NewEncoder(callbackWriter(func(p []byte) (int, error) { - return 0, s.BindRawText(param, p[:len(p)-1]) // remove the newline - })).Encode(value) -} - // BindValue binds a copy of value to the prepared statement. // The leftmost SQL parameter has an index of 1. // @@ -598,30 +586,6 @@ func (s *Stmt) columnRawBytes(col int, ptr ptr_t, nul int32) []byte { return util.View(s.c.mod, ptr, int64(n+nul))[:n] } -// ColumnJSON parses the JSON-encoded value of the result column -// and stores it in the value pointed to by ptr. -// The leftmost column of the result set has the index 0. -// -// https://sqlite.org/c3ref/column_blob.html -func (s *Stmt) ColumnJSON(col int, ptr any) error { - var data []byte - switch s.ColumnType(col) { - case NULL: - data = []byte("null") - case TEXT: - data = s.ColumnRawText(col) - case BLOB: - data = s.ColumnRawBlob(col) - case INTEGER: - data = strconv.AppendInt(nil, s.ColumnInt64(col), 10) - case FLOAT: - data = util.AppendNumber(nil, s.ColumnFloat(col)) - default: - panic(util.AssertErr()) - } - return json.Unmarshal(data, ptr) -} - // ColumnValue returns the unprotected value of the result column. // The leftmost column of the result set has the index 0. // @@ -748,7 +712,3 @@ func (s *Stmt) columns(count int64) ([]byte, ptr_t, error) { return util.View(s.c.mod, typePtr, count), dataPtr, nil } - -type callbackWriter func(p []byte) (int, error) - -func (fn callbackWriter) Write(p []byte) (int, error) { return fn(p) } diff --git a/vendor/github.com/ncruces/go-sqlite3/time.go b/vendor/github.com/ncruces/go-sqlite3/time.go index d9c516c81..19bcd2b0b 100644 --- a/vendor/github.com/ncruces/go-sqlite3/time.go +++ b/vendor/github.com/ncruces/go-sqlite3/time.go @@ -94,7 +94,7 @@ func (f TimeFormat) Encode(t time.Time) any { case TimeFormatUnix: return t.Unix() case TimeFormatUnixFrac: - return float64(t.Unix()) + float64(t.Nanosecond())*1e-9 + return math.FMA(1e-9, float64(t.Nanosecond()), float64(t.Unix())) case TimeFormatUnixMilli: return t.UnixMilli() case TimeFormatUnixMicro: diff --git a/vendor/github.com/ncruces/go-sqlite3/value.go b/vendor/github.com/ncruces/go-sqlite3/value.go index 6806e9a79..994743f82 100644 --- a/vendor/github.com/ncruces/go-sqlite3/value.go +++ b/vendor/github.com/ncruces/go-sqlite3/value.go @@ -1,9 +1,7 @@ package sqlite3 import ( - "encoding/json" "math" - "strconv" "time" "github.com/ncruces/go-sqlite3/internal/util" @@ -162,27 +160,6 @@ func (v Value) Pointer() any { return util.GetHandle(v.c.ctx, ptr) } -// JSON parses a JSON-encoded value -// and stores the result in the value pointed to by ptr. -func (v Value) JSON(ptr any) error { - var data []byte - switch v.Type() { - case NULL: - data = []byte("null") - case TEXT: - data = v.RawText() - case BLOB: - data = v.RawBlob() - case INTEGER: - data = strconv.AppendInt(nil, v.Int64(), 10) - case FLOAT: - data = util.AppendNumber(nil, v.Float()) - default: - panic(util.AssertErr()) - } - return json.Unmarshal(data, ptr) -} - // NoChange returns true if and only if the value is unchanged // in a virtual table update operatiom. // diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/const.go b/vendor/github.com/ncruces/go-sqlite3/vfs/const.go index 11afb1254..9ed67e385 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/const.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/const.go @@ -94,6 +94,10 @@ const ( OPEN_PRIVATECACHE OpenFlag = 0x00040000 /* Ok for sqlite3_open_v2() */ OPEN_WAL OpenFlag = 0x00080000 /* VFS only */ OPEN_NOFOLLOW OpenFlag = 0x01000000 /* Ok for sqlite3_open_v2() */ + _FLAG_ATOMIC OpenFlag = 0x10000000 + _FLAG_KEEP_WAL OpenFlag = 0x20000000 + _FLAG_PSOW OpenFlag = 0x40000000 + _FLAG_SYNC_DIR OpenFlag = 0x80000000 ) // AccessFlag is a flag for the [VFS] Access method. diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/file.go b/vendor/github.com/ncruces/go-sqlite3/vfs/file.go index 06906c961..bdebdf6aa 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/file.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/file.go @@ -51,7 +51,7 @@ func (vfsOS) Delete(path string, syncDir bool) error { return _OK } defer f.Close() - err = osSync(f, false, false) + err = osSync(f, 0, SYNC_FULL) if err != nil { return _IOERR_DIR_FSYNC } @@ -131,27 +131,24 @@ func (vfsOS) OpenFilename(name *Filename, flags OpenFlag) (File, OpenFlag, error } file := vfsFile{ - File: f, - psow: true, - atomic: osBatchAtomic(f), - readOnly: flags&OPEN_READONLY != 0, - syncDir: isUnix && isCreate && isJournl, - delete: !isUnix && flags&OPEN_DELETEONCLOSE != 0, - shm: NewSharedMemory(name.String()+"-shm", flags), + File: f, + flags: flags | _FLAG_PSOW, + shm: NewSharedMemory(name.String()+"-shm", flags), + } + if osBatchAtomic(f) { + file.flags |= _FLAG_ATOMIC + } + if isUnix && isCreate && isJournl { + file.flags |= _FLAG_SYNC_DIR } return &file, flags, nil } type vfsFile struct { *os.File - shm SharedMemory - lock LockLevel - readOnly bool - keepWAL bool - syncDir bool - atomic bool - delete bool - psow bool + shm SharedMemory + lock LockLevel + flags OpenFlag } var ( @@ -164,7 +161,7 @@ var ( ) func (f *vfsFile) Close() error { - if f.delete { + if !isUnix && f.flags&OPEN_DELETEONCLOSE != 0 { defer os.Remove(f.Name()) } if f.shm != nil { @@ -183,21 +180,18 @@ func (f *vfsFile) WriteAt(p []byte, off int64) (n int, err error) { } func (f *vfsFile) Sync(flags SyncFlag) error { - dataonly := (flags & SYNC_DATAONLY) != 0 - fullsync := (flags & 0x0f) == SYNC_FULL - - err := osSync(f.File, fullsync, dataonly) + err := osSync(f.File, f.flags, flags) if err != nil { return err } - if isUnix && f.syncDir { - f.syncDir = false + if isUnix && f.flags&_FLAG_SYNC_DIR != 0 { + f.flags ^= _FLAG_SYNC_DIR d, err := os.Open(filepath.Dir(f.File.Name())) if err != nil { return nil } defer d.Close() - err = osSync(d, false, false) + err = osSync(f.File, f.flags, flags) if err != nil { return _IOERR_DIR_FSYNC } @@ -215,10 +209,10 @@ func (f *vfsFile) SectorSize() int { func (f *vfsFile) DeviceCharacteristics() DeviceCharacteristic { ret := IOCAP_SUBPAGE_READ - if f.atomic { + if f.flags&_FLAG_ATOMIC != 0 { ret |= IOCAP_BATCH_ATOMIC } - if f.psow { + if f.flags&_FLAG_PSOW != 0 { ret |= IOCAP_POWERSAFE_OVERWRITE } if runtime.GOOS == "windows" { @@ -249,8 +243,20 @@ func (f *vfsFile) HasMoved() (bool, error) { return !os.SameFile(fi, pi), nil } -func (f *vfsFile) LockState() LockLevel { return f.lock } -func (f *vfsFile) PowersafeOverwrite() bool { return f.psow } -func (f *vfsFile) PersistWAL() bool { return f.keepWAL } -func (f *vfsFile) SetPowersafeOverwrite(psow bool) { f.psow = psow } -func (f *vfsFile) SetPersistWAL(keepWAL bool) { f.keepWAL = keepWAL } +func (f *vfsFile) LockState() LockLevel { return f.lock } +func (f *vfsFile) PowersafeOverwrite() bool { return f.flags&_FLAG_PSOW != 0 } +func (f *vfsFile) PersistWAL() bool { return f.flags&_FLAG_KEEP_WAL != 0 } + +func (f *vfsFile) SetPowersafeOverwrite(psow bool) { + f.flags &^= _FLAG_PSOW + if psow { + f.flags |= _FLAG_PSOW + } +} + +func (f *vfsFile) SetPersistWAL(keepWAL bool) { + f.flags &^= _FLAG_KEEP_WAL + if keepWAL { + f.flags |= _FLAG_KEEP_WAL + } +} diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/lock.go b/vendor/github.com/ncruces/go-sqlite3/vfs/lock.go index b28d83230..253057aea 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/lock.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/lock.go @@ -41,7 +41,7 @@ func (f *vfsFile) Lock(lock LockLevel) error { } // Do not allow any kind of write-lock on a read-only database. - if f.readOnly && lock >= LOCK_RESERVED { + if lock >= LOCK_RESERVED && f.flags&OPEN_READONLY != 0 { return _IOERR_LOCK } diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/README.md b/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/README.md index 2e2611bf8..e37db1be6 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/README.md +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/README.md @@ -6,4 +6,7 @@ SQLite VFS in pure Go. It has some benefits over the C version: - the memory backing the database needs not be contiguous, - the database can grow/shrink incrementally without copying, -- reader-writer concurrency is slightly improved. \ No newline at end of file +- reader-writer concurrency is slightly improved. + +[`memdb.TestDB`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/memdb#TestDB) +is the preferred way to setup an in-memory database for testing. \ No newline at end of file diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/api.go b/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/api.go index eb12eba09..a12819855 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/api.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/memdb/api.go @@ -10,6 +10,7 @@ package memdb import ( + "crypto/rand" "fmt" "net/url" "sync" @@ -74,11 +75,27 @@ func Delete(name string) { // TestDB creates an empty shared memory database for the test to use. // The database is automatically deleted when the test and all its subtests complete. +// Returns a URI filename appropriate to call Open with. // Each subsequent call to TestDB returns a unique database. +// +// func Test_something(t *testing.T) { +// t.Parallel() +// dsn := memdb.TestDB(t, url.Values{ +// "_pragma": {"busy_timeout(1000)"}, +// }) +// +// db, err := sql.Open("sqlite3", dsn) +// if err != nil { +// t.Fatal(err) +// } +// defer db.Close() +// +// // ... +// } func TestDB(tb testing.TB, params ...url.Values) string { tb.Helper() - name := fmt.Sprintf("%s_%p", tb.Name(), tb) + name := fmt.Sprintf("%s_%s", tb.Name(), rand.Text()) tb.Cleanup(func() { Delete(name) }) Create(name, nil) diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/os_darwin.go b/vendor/github.com/ncruces/go-sqlite3/vfs/os_darwin.go index ee08e9a7b..9bb8b559c 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/os_darwin.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/os_darwin.go @@ -23,12 +23,26 @@ type flocktimeout_t struct { timeout unix.Timespec } -func osSync(file *os.File, fullsync, _ /*dataonly*/ bool) error { - if fullsync { - return file.Sync() +func osSync(file *os.File, open OpenFlag, sync SyncFlag) error { + var cmd int + if sync&SYNC_FULL == SYNC_FULL { + // For rollback journals all we really need is a barrier. + if open&OPEN_MAIN_JOURNAL != 0 { + cmd = unix.F_BARRIERFSYNC + } else { + cmd = unix.F_FULLFSYNC + } } + + fd := file.Fd() for { - err := unix.Fsync(int(file.Fd())) + err := error(unix.ENOTSUP) + if cmd != 0 { + _, err = unix.FcntlInt(fd, cmd, 0) + } + if err == unix.ENOTSUP { + err = unix.Fsync(int(fd)) + } if err != unix.EINTR { return err } diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/os_linux.go b/vendor/github.com/ncruces/go-sqlite3/vfs/os_linux.go index d112c5a99..893f1512c 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/os_linux.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/os_linux.go @@ -10,7 +10,7 @@ import ( "golang.org/x/sys/unix" ) -func osSync(file *os.File, _ /*fullsync*/, _ /*dataonly*/ bool) error { +func osSync(file *os.File, _ OpenFlag, _ SyncFlag) error { // SQLite trusts Linux's fdatasync for all fsync's. for { err := unix.Fdatasync(int(file.Fd())) diff --git a/vendor/github.com/ncruces/go-sqlite3/vfs/os_std_sync.go b/vendor/github.com/ncruces/go-sqlite3/vfs/os_std_sync.go index b32e83e08..87427d9ed 100644 --- a/vendor/github.com/ncruces/go-sqlite3/vfs/os_std_sync.go +++ b/vendor/github.com/ncruces/go-sqlite3/vfs/os_std_sync.go @@ -4,6 +4,6 @@ package vfs import "os" -func osSync(file *os.File, _ /*fullsync*/, _ /*dataonly*/ bool) error { +func osSync(file *os.File, _ OpenFlag, _ SyncFlag) error { return file.Sync() } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7d2b53ad4..10c10e1c2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -271,11 +271,11 @@ codeberg.org/gruf/go-mangler/v2 # codeberg.org/gruf/go-maps v1.0.4 ## explicit; go 1.20 codeberg.org/gruf/go-maps -# codeberg.org/gruf/go-mempool v0.0.0-20240507125005-cef10d64a760 -## explicit; go 1.22.2 +# codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253 +## explicit; go 1.24.0 codeberg.org/gruf/go-mempool -# codeberg.org/gruf/go-mutexes v1.5.3 -## explicit; go 1.22.2 +# codeberg.org/gruf/go-mutexes v1.5.8 +## explicit; go 1.24.0 codeberg.org/gruf/go-mutexes # codeberg.org/gruf/go-runners v1.6.3 ## explicit; go 1.19 @@ -293,7 +293,7 @@ codeberg.org/gruf/go-storage/disk codeberg.org/gruf/go-storage/internal codeberg.org/gruf/go-storage/memory codeberg.org/gruf/go-storage/s3 -# codeberg.org/gruf/go-structr v0.9.9 +# codeberg.org/gruf/go-structr v0.9.12 ## explicit; go 1.24.5 codeberg.org/gruf/go-structr # codeberg.org/gruf/go-xunsafe v0.0.0-20250809104800-512a9df57d73 @@ -727,7 +727,7 @@ github.com/modern-go/reflect2 # github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 ## explicit github.com/munnerz/goautoneg -# github.com/ncruces/go-sqlite3 v0.29.0 +# github.com/ncruces/go-sqlite3 v0.29.1 ## explicit; go 1.24.0 github.com/ncruces/go-sqlite3 github.com/ncruces/go-sqlite3/driver From 57cb4fe7482962aa8e5a05874a343474d5a453e7 Mon Sep 17 00:00:00 2001 From: kim Date: Fri, 3 Oct 2025 15:50:57 +0200 Subject: [PATCH 3/3] [bugfix] status refresh race condition causing double edit notifications (#4470) # Description fixes possible race condition of existing status being out-of-date in enrichStatus() ## Checklist - [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md). - [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat. - [x] I/we have not leveraged AI to create the proposed changes. - [x] I/we have performed a self-review of added code. - [x] I/we have written code that is legible and maintainable by others. - [x] I/we have commented the added code, particularly in hard-to-understand areas. - [ ] I/we have made any necessary changes to documentation. - [x] I/we have added tests that cover new code. - [x] I/we have run tests and they pass locally with the changes. - [x] I/we have run `go fmt ./...` and `golangci-lint run`. Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4470 Co-authored-by: kim Co-committed-by: kim --- internal/federation/dereferencing/status.go | 39 ++++--- .../federation/dereferencing/status_test.go | 103 +++++++++++++++++- 2 files changed, 120 insertions(+), 22 deletions(-) diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 0e86dca7c..fffaa88a6 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -277,18 +277,6 @@ func (d *Dereferencer) enrichStatusSafely( ) (*gtsmodel.Status, ap.Statusable, bool, error) { uriStr := status.URI - var isNew bool - - // Check if this is a new status (to us). - if isNew = (status.ID == ""); !isNew { - - // This is an existing status, first try to populate it. This - // is required by the checks below for existing tags, media etc. - if err := d.state.DB.PopulateStatus(ctx, status); err != nil { - log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err) - } - } - // Acquire per-URI deref lock, wraping unlock // to safely defer in case of panic, while still // performing more granular unlocks when needed. @@ -296,6 +284,23 @@ func (d *Dereferencer) enrichStatusSafely( unlock = util.DoOnce(unlock) defer unlock() + var err error + var isNew bool + + // Check if this is a new status (to us). + if isNew = (status.ID == ""); !isNew { + + // We reload the existing status, just to ensure we have the + // latest version of it. e.g. another racing thread might have + // just input a change but we still have an old status copy. + // + // Note: returned status will be fully populated, required below. + status, err = d.state.DB.GetStatusByID(ctx, status.ID) + if err != nil { + return nil, nil, false, gtserror.Newf("error getting up-to-date existing status: %w", err) + } + } + // Perform status enrichment with passed vars. latest, statusable, err := d.enrichStatus(ctx, requestUser, @@ -479,12 +484,10 @@ func (d *Dereferencer) enrichStatus( // Ensure the final parsed status URI or URL matches // the input URI we fetched (or received) it as. - matches, err := util.URIMatches(uri, - append( - ap.GetURL(statusable), // status URL(s) - ap.GetJSONLDId(statusable), // status URI - )..., - ) + matches, err := util.URIMatches(uri, append( + ap.GetURL(statusable), // status URL(s) + ap.GetJSONLDId(statusable), // status URI + )...) if err != nil { return nil, nil, gtserror.Newf( "error checking dereferenced status uri %s: %w", diff --git a/internal/federation/dereferencing/status_test.go b/internal/federation/dereferencing/status_test.go index 62b38f188..189fa2f23 100644 --- a/internal/federation/dereferencing/status_test.go +++ b/internal/federation/dereferencing/status_test.go @@ -18,7 +18,6 @@ package dereferencing_test import ( - "context" "fmt" "testing" "time" @@ -237,9 +236,7 @@ func (suite *StatusTestSuite) TestDereferenceStatusWithNonMatchingURI() { } func (suite *StatusTestSuite) TestDereferencerRefreshStatusUpdated() { - // Create a new context for this test. - ctx, cncl := context.WithCancel(suite.T().Context()) - defer cncl() + ctx := suite.T().Context() // The local account we will be fetching statuses as. fetchingAccount := suite.testAccounts["local_account_1"] @@ -343,6 +340,104 @@ func (suite *StatusTestSuite) TestDereferencerRefreshStatusUpdated() { } } +func (suite *StatusTestSuite) TestDereferencerRefreshStatusRace() { + ctx := suite.T().Context() + + // The local account we will be fetching statuses as. + fetchingAccount := suite.testAccounts["local_account_1"] + + // The test status in question that we will be dereferencing from "remote". + testURIStr := "https://unknown-instance.com/users/brand_new_person/statuses/01FE4NTHKWW7THT67EF10EB839" + testURI := testrig.URLMustParse(testURIStr) + testStatusable := suite.client.TestRemoteStatuses[testURIStr] + + // Fetch the remote status first to load it into instance. + testStatus, statusable, err := suite.dereferencer.GetStatusByURI(ctx, + fetchingAccount.Username, + testURI, + ) + suite.NotNil(statusable) + suite.NoError(err) + + // Take a snapshot of current + // state of the test status. + beforeEdit := copyStatus(testStatus) + + // Edit the "remote" statusable obj. + suite.editStatusable(testStatusable, + "updated status content!", + "CW: edited status content", + beforeEdit.Language, // no change + *beforeEdit.Sensitive, // no change + beforeEdit.AttachmentIDs, // no change + getPollOptions(beforeEdit), // no change + getPollVotes(beforeEdit), // no change + time.Now(), + ) + + // Refresh with a given statusable to updated to edited copy. + afterEdit, statusable, err := suite.dereferencer.RefreshStatus(ctx, + fetchingAccount.Username, + testStatus, + testStatusable, + instantFreshness, + ) + suite.NotNil(statusable) + suite.NoError(err) + + // verify updated status details. + suite.verifyEditedStatusUpdate( + + // the original status + // before any changes. + beforeEdit, + + // latest status + // being tested. + afterEdit, + + // expected current state. + >smodel.StatusEdit{ + Content: "updated status content!", + ContentWarning: "CW: edited status content", + Language: beforeEdit.Language, + Sensitive: beforeEdit.Sensitive, + AttachmentIDs: beforeEdit.AttachmentIDs, + PollOptions: getPollOptions(beforeEdit), + PollVotes: getPollVotes(beforeEdit), + // createdAt never changes + }, + + // expected historic edit. + >smodel.StatusEdit{ + Content: beforeEdit.Content, + ContentWarning: beforeEdit.ContentWarning, + Language: beforeEdit.Language, + Sensitive: beforeEdit.Sensitive, + AttachmentIDs: beforeEdit.AttachmentIDs, + PollOptions: getPollOptions(beforeEdit), + PollVotes: getPollVotes(beforeEdit), + CreatedAt: beforeEdit.UpdatedAt(), + }, + ) + + // Now make another attempt to refresh, using the old copy of the + // status. This should still successfully update based on our passed + // freshness window, but it *should* refetch the provided status to + // check for race shenanigans and realize that no edit has occurred. + afterBodge, statusable, err := suite.dereferencer.RefreshStatus(ctx, + fetchingAccount.Username, + beforeEdit, + testStatusable, + instantFreshness, + ) + suite.NotNil(statusable) + suite.NoError(err) + + // Check that no further edit occurred on status. + suite.Equal(afterEdit.EditIDs, afterBodge.EditIDs) +} + // editStatusable updates the given statusable attributes. // note that this acts on the original object, no copying. func (suite *StatusTestSuite) editStatusable(