diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 611bea97c..10a05002c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,6 +33,8 @@ These contribution guidelines were adapted from / inspired by those of Gitea (ht - [Federation](#federation) - [Updating Swagger docs](#updating-swagger-docs) - [CI/CD configuration](#ci-cd-configuration) +- [Other Useful Stuff](#other-useful-stuff) + - [Running migrations on a Postgres DB backup locally](#running-migrations-on-a-postgres-db-backup-locally) ## Introduction @@ -525,3 +527,38 @@ The `woodpecker` pipeline files are in the `.woodpecker` directory of this repos The Woodpecker instance for GoToSocial is [here](https://woodpecker.superseriousbusiness.org/repos/2). Documentation for Woodpecker is [here](https://woodpecker-ci.org/docs/intro). + +## Other Useful Stuff + +Various bits and bobs. + +### Running migrations on a Postgres DB backup locally + +It may be useful when testing or debugging migrations to be able to run them against a copy of a real instance's Postgres database locally. + +Basic steps for this: + +First dump the Postgres database on the remote machine, and copy the dump over to your development machine. + +Now create a local Postgres container and mount the dump into it with, for example: + +```bash +docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres +``` + +In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: + +```bash +docker exec -it --user postgres postgres psql -X -f /db_dump postgres +``` + +With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped: + + ```bash + GTS_HOST=example.org \ + GTS_DB_TYPE=postgres \ + GTS_DB_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@localhost:5432/postgres \ + ./gotosocial migrations run + ``` + +When you're done messing around, don't forget to remove any containers that you started up, and remove any lingering volumes with `docker volume prune`, else you might end up filling your disk with unused temporary volumes. diff --git a/README.md b/README.md index 7739f1ad2..605f654f6 100644 --- a/README.md +++ b/README.md @@ -328,7 +328,7 @@ This is the current status of support offered by GoToSocial for different platfo Notes on 64-bit CPU feature requirements: -- x86_64 requires the SSE4.1 instruction set. (CPUs manufactured after ~2010) +- x86_64 requires the [x86-64-v2](https://en.wikipedia.org/wiki/X86-64-v2) level instruction sets. (CPUs manufactured after ~2010) - ARM64 requires no specific features, ARMv8 CPUs (and later) have all required features. diff --git a/go.mod b/go.mod index 8accf4398..b1d744ffa 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( codeberg.org/gruf/go-errors/v2 v2.3.2 codeberg.org/gruf/go-fastcopy v1.1.3 codeberg.org/gruf/go-fastpath/v2 v2.0.0 - codeberg.org/gruf/go-ffmpreg v0.6.11 + codeberg.org/gruf/go-ffmpreg v0.6.12 codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf codeberg.org/gruf/go-kv/v2 v2.0.7 codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f diff --git a/go.sum b/go.sum index 6b9b97d00..507d2d38f 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ codeberg.org/gruf/go-fastcopy v1.1.3 h1:Jo9VTQjI6KYimlw25PPc7YLA3Xm+XMQhaHwKnM7x codeberg.org/gruf/go-fastcopy v1.1.3/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s= codeberg.org/gruf/go-fastpath/v2 v2.0.0 h1:iAS9GZahFhyWEH0KLhFEJR+txx1ZhMXxYzu2q5Qo9c0= codeberg.org/gruf/go-fastpath/v2 v2.0.0/go.mod h1:3pPqu5nZjpbRrOqvLyAK7puS1OfEtQvjd6342Cwz56Q= -codeberg.org/gruf/go-ffmpreg v0.6.11 h1:+lvB5Loy0KUAKfv6nOZRWHFVgN08cpHhUlYcZxL8M20= -codeberg.org/gruf/go-ffmpreg v0.6.11/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E= +codeberg.org/gruf/go-ffmpreg v0.6.12 h1:mPdRx1TAQJQPhRkTOOHnRSY6omNCLJ7M6ajjuEMNNvE= +codeberg.org/gruf/go-ffmpreg v0.6.12/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf h1:84s/ii8N6lYlskZjHH+DG6jyia8w2mXMZlRwFn8Gs3A= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf/go.mod h1:zZAICsp5rY7+hxnws2V0ePrWxE0Z2Z/KXcN3p/RQCfk= codeberg.org/gruf/go-kv v1.6.5 h1:ttPf0NA8F79pDqBttSudPTVCZmGncumeNIxmeM9ztz0= diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go index fc02d1e40..9115dfe90 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go @@ -24,13 +24,15 @@ import ( "reflect" "slices" "strings" + "time" - "code.superseriousbusiness.org/gotosocial/internal/db" newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new" oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old" + "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" + "code.superseriousbusiness.org/gotosocial/internal/util/xslices" "github.com/uptrace/bun" ) @@ -44,135 +46,222 @@ func init() { return gtserror.Newf("error getting bun column def: %w", err) } - // Update column def to use '${name}_new'. + // Update column def to use temporary + // '${name}_new' while we migrate. newColDef = strings.Replace(newColDef, "thread_id", "thread_id_new", 1) - var sr statusRethreader - var count int - var maxID string - var statuses []*oldmodel.Status + // Create thread_id_new already + // so we can populate it as we go. + log.Info(ctx, "creating statuses column thread_id_new") + if _, err := db.NewAddColumn(). + Table("statuses"). + ColumnExpr(newColDef). + Exec(ctx); err != nil { + return gtserror.Newf("error adding statuses column thread_id_new: %w", err) + } - // Get a total count of all statuses before migration. - total, err := db.NewSelect().Table("statuses").Count(ctx) + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + // Get a total count of all + // statuses before migration. + totalStatuses, err := db. + NewSelect(). + Table("statuses"). + Count(ctx) if err != nil { return gtserror.Newf("error getting status table count: %w", err) } + log.Warnf(ctx, "migrating %d statuses total, this may take a *long* time", totalStatuses) - // Start at largest + var sr statusRethreader + var updatedRowsTotal int64 + var statuses []*oldmodel.Status + + // Page starting at largest // possible ULID value. - maxID = id.Highest + var maxID = id.Highest - log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time") - for /* TOP LEVEL STATUS LOOP */ { + // Open initial transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select top-level statuses. - if err := db.NewSelect(). - Model(&statuses). - Column("id", "thread_id"). + batchStart := time.Now() - // We specifically use in_reply_to_account_id instead of in_reply_to_id as - // they should both be set / unset in unison, but we specifically have an - // index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id. - Where("? IS NULL", bun.Ident("in_reply_to_account_id")). + // Select IDs of next + // batch, paging down. + if err := tx.NewSelect(). + Model(&statuses). + Column("id"). + Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? < ?", bun.Ident("id"), maxID). OrderExpr("? DESC", bun.Ident("id")). - Limit(5000). + Limit(500). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting top level statuses: %w", err) + return gtserror.Newf("error selecting statuses: %w", err) } - // Reached end of block. - if len(statuses) == 0 { + l := len(statuses) + if l == 0 { + // No more statuses! + // + // Transaction will be closed + // after leaving the loop. break - } - // Set next maxID value from statuses. - maxID = statuses[len(statuses)-1].ID + } else if i%100 == 0 { + // Begin a new transaction every + // 100 batches (~50000 statuses), + // to avoid massive commits. - // Rethread each selected batch of top-level statuses in a transaction. - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. - for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) - if err != nil { - return gtserror.Newf("error rethreading status %s: %w", status.URI, err) - } - count += n + // Close existing transaction. + if err := tx.Commit(); err != nil { + return err } - return nil - }); err != nil { - return err + // Try to flush the wal + // to avoid silly wal sizes. + if err := doWALCheckpoint(ctx, db); err != nil { + return err + } + + // Open new transaction. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + return err + } } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (top-level)", count, total) + // Set next maxID + // value from statuses. + maxID = statuses[l-1].ID + + // Rethread using the + // open transaction. + var updatedRowsThisBatch int64 + for _, status := range statuses { + n, err := sr.rethreadStatus(ctx, tx, status, false) + if err != nil { + return gtserror.Newf("error rethreading status %s: %w", status.URI, err) + } + updatedRowsThisBatch += n + updatedRowsTotal += n + } + + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating threads", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { + // Close transaction. + if err := tx.Commit(); err != nil { return err } - log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time") - for /* STRAGGLER STATUS LOOP */ { + // Create a partial index on thread_id_new to find stragglers. + // This index will be removed at the end of the migration. + log.Info(ctx, "creating temporary statuses thread_id_new index") + if _, err := db.NewCreateIndex(). + Table("statuses"). + Index("statuses_thread_id_new_idx"). + Column("thread_id_new"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Exec(ctx); err != nil { + return gtserror.Newf("error creating new thread_id index: %w", err) + } + + for i := 1; ; i++ { // Reset slice. clear(statuses) statuses = statuses[:0] - // Select straggler statuses. + batchStart := time.Now() + + // Get stragglers for which + // we haven't set thread ID yet. if err := db.NewSelect(). Model(&statuses). - Column("id", "in_reply_to_id", "thread_id"). - Where("? IS NULL", bun.Ident("thread_id")). - - // We select in smaller batches for this part - // of the migration as there is a chance that - // we may be fetching statuses that might be - // part of the same thread, i.e. one call to - // rethreadStatus() may effect other statuses - // later in the slice. - Limit(1000). + Column("id"). + Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). + Limit(250). Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { - return gtserror.Newf("error selecting straggler statuses: %w", err) + return gtserror.Newf("error selecting straggler: %w", err) } - // Reached end of block. if len(statuses) == 0 { + // No more + // statuses! break } - // Rethread each selected batch of straggler statuses in a transaction. + // Update this batch + // inside a transaction. + var updatedRowsThisBatch int64 if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - - // Rethread each top-level status. for _, status := range statuses { - n, err := sr.rethreadStatus(ctx, tx, status) + + n, err := sr.rethreadStatus(ctx, tx, status, true) if err != nil { return gtserror.Newf("error rethreading status %s: %w", status.URI, err) } - count += n + updatedRowsThisBatch += n + updatedRowsTotal += n } - return nil }); err != nil { return err } - log.Infof(ctx, "[approx %d of %d] rethreading statuses (stragglers)", count, total) + // Show speed for this batch. + timeTaken := time.Since(batchStart).Milliseconds() + msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) + rowsPerMs := float64(1) / float64(msPerRow) + rowsPerSecond := 1000 * rowsPerMs + + // Show percent migrated overall. + totalDone := (float64(updatedRowsTotal) / float64(totalStatuses)) * 100 + + log.Infof( + ctx, + "[~%.2f%% done; ~%.0f rows/s] migrating stragglers", + totalDone, rowsPerSecond, + ) } - // Attempt to merge any sqlite write-ahead-log. + // Try to merge everything we've done so far. if err := doWALCheckpoint(ctx, db); err != nil { return err } + log.Info(ctx, "dropping temporary thread_id_new index") + if _, err := db.NewDropIndex(). + Index("statuses_thread_id_new_idx"). + Exec(ctx); err != nil { + return gtserror.Newf("error dropping temporary thread_id_new index: %w", err) + } + log.Info(ctx, "dropping old thread_to_statuses table") if _, err := db.NewDropTable(). Table("thread_to_statuses"). @@ -180,33 +269,6 @@ func init() { return gtserror.Newf("error dropping old thread_to_statuses table: %w", err) } - log.Info(ctx, "creating new statuses thread_id column") - if _, err := db.NewAddColumn(). - Table("statuses"). - ColumnExpr(newColDef). - Exec(ctx); err != nil { - return gtserror.Newf("error adding new thread_id column: %w", err) - } - - log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)") - if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - return batchUpdateByID(ctx, tx, - "statuses", // table - "id", // batchByCol - "UPDATE ? SET ? = ?", // updateQuery - []any{bun.Ident("statuses"), - bun.Ident("thread_id_new"), - bun.Ident("thread_id")}, - ) - }); err != nil { - return err - } - - // Attempt to merge any sqlite write-ahead-log. - if err := doWALCheckpoint(ctx, db); err != nil { - return err - } - log.Info(ctx, "dropping old statuses thread_id index") if _, err := db.NewDropIndex(). Index("statuses_thread_id_idx"). @@ -289,8 +351,8 @@ type statusRethreader struct { } // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration -// in order to trigger a status rethreading operation for the given status, returning total number rethreaded. -func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int, error) { +// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. +func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { // Zero slice and // map ptr values. @@ -312,17 +374,33 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu // Ensure the passed status // has up-to-date information. - // This may have changed from - // the initial batch selection - // to the rethreadStatus() call. + upToDateValues := make(map[string]any, 3) if err := tx.NewSelect(). - Model(status). - Column("in_reply_to_id", "thread_id"). + TableExpr("? AS ?", bun.Ident("statuses"), bun.Ident("status")). + Column("in_reply_to_id", "thread_id", "thread_id_new"). Where("? = ?", bun.Ident("id"), status.ID). - Scan(ctx); err != nil { + Scan(ctx, &upToDateValues); err != nil { return 0, gtserror.Newf("error selecting status: %w", err) } + // If we've just threaded this status by setting + // thread_id_new, then by definition anything we + // could find from the entire thread must now be + // threaded, so we can save some database calls + // by skipping iterating up + down from here. + if v, ok := upToDateValues["thread_id_new"]; ok && v.(string) != id.Lowest { + log.Debug(ctx, "skipping just rethreaded status") + return 0, nil + } + + // Set up-to-date values on the status. + if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { + status.InReplyToID = v.(string) + } + if v, ok := upToDateValues["thread_id"]; ok && v != nil { + status.ThreadID = v.(string) + } + // status and thread ID cursor // index values. these are used // to keep track of newly loaded @@ -371,14 +449,14 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu threadIdx = len(sr.threadIDs) } - // Total number of - // statuses threaded. - total := len(sr.statusIDs) - // Check for the case where the entire // batch of statuses is already correctly // threaded. Then we have nothing to do! - if sr.allThreaded && len(sr.threadIDs) == 1 { + // + // Skip this check for straggler statuses + // that are part of broken threads. + if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 { + log.Debug(ctx, "skipping just rethreaded thread") return 0, nil } @@ -417,29 +495,61 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu } } - // Update all the statuses to - // use determined thread_id. - if _, err := tx.NewUpdate(). - Table("statuses"). - Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)). - Set("? = ?", bun.Ident("thread_id"), threadID). - Exec(ctx); err != nil { + // Use a bulk update to update all the + // statuses to use determined thread_id. + // + // https://bun.uptrace.dev/guide/query-update.html#bulk-update + values := make([]*util.Status, 0, len(sr.statusIDs)) + for _, statusID := range sr.statusIDs { + values = append(values, &util.Status{ + ID: statusID, + ThreadIDNew: threadID, + }) + } + + res, err := tx.NewUpdate(). + With("_data", tx.NewValues(&values)). + Model((*util.Status)(nil)). + TableExpr("_data"). + // Set the new thread ID, which we can use as + // an indication that we've migrated this batch. + Set("? = ?", bun.Ident("thread_id_new"), bun.Ident("_data.thread_id_new")). + // While we're here, also set old thread_id, as + // we'll use it for further rethreading purposes. + Set("? = ?", bun.Ident("thread_id"), bun.Ident("_data.thread_id_new")). + // "Join" on status ID. + Where("? = ?", bun.Ident("status.id"), bun.Ident("_data.id")). + // To avoid spurious writes, + // only update unmigrated statuses. + Where("? = ?", bun.Ident("status.thread_id_new"), id.Lowest). + Exec(ctx) + if err != nil { return 0, gtserror.Newf("error updating status thread ids: %w", err) } + rowsAffected, err := res.RowsAffected() + if err != nil { + return 0, gtserror.Newf("error counting rows affected: %w", err) + } + if len(sr.threadIDs) > 0 { // Update any existing thread // mutes to use latest thread_id. + + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs + threadIDs = xslices.Deduplicate(threadIDs) if _, err := tx.NewUpdate(). Table("thread_mutes"). - Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). Set("? = ?", bun.Ident("thread_id"), threadID). Exec(ctx); err != nil { return 0, gtserror.Newf("error updating mute thread ids: %w", err) } } - return total, nil + return rowsAffected, nil } // append will append the given status to the internal tracking of statusRethreader{} for @@ -498,7 +608,7 @@ func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error { Model(&parent). Column("id", "in_reply_to_id", "thread_id"). Where("? = ?", bun.Ident("id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -537,7 +647,7 @@ func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) Model(&sr.statuses). Column("id", "thread_id"). Where("? = ?", bun.Ident("in_reply_to_id"), id). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } @@ -560,6 +670,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in clear(sr.statuses) sr.statuses = sr.statuses[:0] + // Dedupe thread IDs before query + // to avoid ludicrous "IN" clause. + threadIDs := sr.threadIDs[idx:] + threadIDs = xslices.Deduplicate(threadIDs) + // Select stragglers that // also have thread IDs. if err := tx.NewSelect(). @@ -567,11 +682,11 @@ func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx in Column("id", "thread_id", "in_reply_to_id"). Where("? IN (?) AND ? NOT IN (?)", bun.Ident("thread_id"), - bun.In(sr.threadIDs[idx:]), + bun.In(threadIDs), bun.Ident("id"), bun.In(sr.statusIDs), ). - Scan(ctx); err != nil && err != db.ErrNoEntries { + Scan(ctx); err != nil && err != sql.ErrNoRows { return err } diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go index a03e93859..0f20bb7a4 100644 --- a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go @@ -23,45 +23,45 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // - PollID string `bun:"type:CHAR(26),nullzero"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // + PollID string `bun:"type:CHAR(26),nullzero"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to. } // enumType is the type we (at least, should) use diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go new file mode 100644 index 000000000..ed7256aad --- /dev/null +++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/util/utiltypes.go @@ -0,0 +1,25 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package util + +// Status is a helper type specifically +// for updating the thread ID of a status. +type Status struct { + ID string `bun:"type:CHAR(26)"` + ThreadIDNew string `bun:"type:CHAR(26)"` +} diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 4a3a62b21..b5543c824 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -66,98 +66,6 @@ func doWALCheckpoint(ctx context.Context, db *bun.DB) error { return nil } -// batchUpdateByID performs the given updateQuery with updateArgs -// over the entire given table, batching by the ID of batchByCol. -func batchUpdateByID( - ctx context.Context, - tx bun.Tx, - table string, - batchByCol string, - updateQuery string, - updateArgs []any, -) error { - // Get a count of all in table. - total, err := tx.NewSelect(). - Table(table). - Count(ctx) - if err != nil { - return gtserror.Newf("error selecting total count: %w", err) - } - - // Query batch size - // in number of rows. - const batchsz = 5000 - - // Stores highest batch value - // used in iterate queries, - // starting at highest possible. - highest := id.Highest - - // Total updated rows. - var updated int - - for { - // Limit to batchsz - // items at once. - batchQ := tx. - NewSelect(). - Table(table). - Column(batchByCol). - Where("? < ?", bun.Ident(batchByCol), highest). - OrderExpr("? DESC", bun.Ident(batchByCol)). - Limit(batchsz) - - // Finalize UPDATE to act only on batch. - qStr := updateQuery + " WHERE ? IN (?)" - args := append(slices.Clone(updateArgs), - bun.Ident(batchByCol), - batchQ, - ) - - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qStr, args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) - } - - // Check how many items we updated. - thisUpdated, err := res.RowsAffected() - if err != nil { - return gtserror.Newf("error counting affected rows: %w", err) - } - - if thisUpdated == 0 { - // Nothing updated - // means we're done. - break - } - - // Update the overall count. - updated += int(thisUpdated) - - // Log helpful message to admin. - log.Infof(ctx, "migrated %d of %d %s (up to %s)", - updated, total, table, highest) - - // Get next highest - // id for next batch. - if err := tx. - NewSelect(). - With("batch_query", batchQ). - ColumnExpr("min(?) FROM ?", bun.Ident(batchByCol), bun.Ident("batch_query")). - Scan(ctx, &highest); err != nil { - return gtserror.Newf("error selecting next highest: %w", err) - } - } - - if total != int(updated) { - // Return error here in order to rollback the whole transaction. - return fmt.Errorf("total=%d does not match updated=%d", total, updated) - } - - return nil -} - // convertEnums performs a transaction that converts // a table's column of our old-style enums (strings) to // more performant and space-saving integer types. diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 31e8fe881..0f0fb8404 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -27,56 +27,56 @@ import ( // Status represents a user-created 'post' or 'status' in the database, either remote or local type Status struct { - ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database - CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created - EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) - FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. - PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. - URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status - URL string `bun:",nullzero"` // web url for viewing this status - Content string `bun:""` // Content HTML for this status. - AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status - Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs - TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status - Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status - Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs - EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status - Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation - Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? - AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? - Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID - AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status - InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to - InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to - InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to - InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID - InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID - BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of - BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. - BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status - BoostOf *Status `bun:"-"` // status that corresponds to boostOfID - BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID - ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs - EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. - Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. - PollID string `bun:"type:CHAR(26),nullzero"` // - Poll *Poll `bun:"-"` // - ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. - ContentWarningText string `bun:""` // Original text of the content warning without formatting - Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status - Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? - Language string `bun:",nullzero"` // what language is this status written in? - CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? - CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID - ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. - Text string `bun:""` // Original text of the status without formatting - ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status - Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) - InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. - PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. - PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. - ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set) + FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched. + PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time. + URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status + URL string `bun:",nullzero"` // web url for viewing this status + Content string `bun:""` // Content HTML for this status. + AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status + Attachments []*MediaAttachment `bun:"attached_media,rel:has-many"` // Attachments corresponding to attachmentIDs + TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status + Tags []*Tag `bun:"attached_tags,m2m:status_to_tags"` // Tags corresponding to tagIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status + Mentions []*Mention `bun:"attached_mentions,rel:has-many"` // Mentions corresponding to mentionIDs + EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status + Emojis []*Emoji `bun:"attached_emojis,m2m:status_to_emojis"` // Emojis corresponding to emojiIDs. https://bun.uptrace.dev/guide/relations.html#many-to-many-relation + Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account? + AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status? + Account *Account `bun:"rel:belongs-to"` // account corresponding to accountID + AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status + InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to + InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to + InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to + InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID + InReplyToAccount *Account `bun:"rel:belongs-to"` // account corresponding to inReplyToAccountID + BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of + BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes. + BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status + BoostOf *Status `bun:"-"` // status that corresponds to boostOfID + BoostOfAccount *Account `bun:"rel:belongs-to"` // account that corresponds to boostOfAccountID + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:'00000000000000000000000000'"` // id of the thread to which this status belongs + EditIDs []string `bun:"edits,array"` // IDs of status edits for this status, ordered from smallest (oldest) -> largest (newest) ID. + Edits []*StatusEdit `bun:"-"` // Edits of this status, ordered from oldest -> newest edit. + PollID string `bun:"type:CHAR(26),nullzero"` // + Poll *Poll `bun:"-"` // + ContentWarning string `bun:",nullzero"` // Content warning HTML for this status. + ContentWarningText string `bun:""` // Original text of the content warning without formatting + Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status + Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive? + Language string `bun:",nullzero"` // what language is this status written in? + CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status? + CreatedWithApplication *Application `bun:"rel:belongs-to"` // application corresponding to createdWithApplicationID + ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!. + Text string `bun:""` // Original text of the status without formatting + ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status + Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s) + InteractionPolicy *InteractionPolicy `bun:""` // InteractionPolicy for this status. If null then the default InteractionPolicy should be assumed for this status's Visibility. Always null for boost wrappers. + PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed. + PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB. + ApprovedByURI string `bun:",nullzero"` // URI of *either* an Accept Activity, or a ReplyAuthorization or AnnounceAuthorization, which approves the Announce, Create or interaction request Activity that this status was/will be attached to. } // GetID implements timeline.Timelineable{}. diff --git a/internal/media/ffmpeg.go b/internal/media/ffmpeg.go index d98e93baf..938a10894 100644 --- a/internal/media/ffmpeg.go +++ b/internal/media/ffmpeg.go @@ -21,8 +21,6 @@ import ( "context" "encoding/json" "errors" - "os" - "path" "strconv" "strings" @@ -158,34 +156,20 @@ func ffmpeg(ctx context.Context, inpath string, outpath string, args ...string) Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { fscfg := wazero.NewFSConfig() - // Needs read-only access to - // /dev/urandom for some types. - urandom := &allowFiles{ - { - abs: "/dev/urandom", - flag: os.O_RDONLY, - perm: 0, - }, - } - fscfg = fscfg.WithFSMount(urandom, "/dev") + // Needs read-only access /dev/urandom, + // required by some ffmpeg operations. + fscfg = fscfg.WithFSMount(&allowFiles{ + allowRead("/dev/urandom"), + }, "/dev") // In+out dirs are always the same (tmp), // so we can share one file system for // both + grant different perms to inpath // (read only) and outpath (read+write). - shared := &allowFiles{ - { - abs: inpath, - flag: os.O_RDONLY, - perm: 0, - }, - { - abs: outpath, - flag: os.O_RDWR | os.O_CREATE | os.O_TRUNC, - perm: 0666, - }, - } - fscfg = fscfg.WithFSMount(shared, path.Dir(inpath)) + fscfg = fscfg.WithFSMount(&allowFiles{ + allowCreate(outpath), + allowRead(inpath), + }, tmpdir) // Set anonymous module name. modcfg = modcfg.WithName("") @@ -246,16 +230,10 @@ func ffprobe(ctx context.Context, filepath string) (*result, error) { Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { fscfg := wazero.NewFSConfig() - // Needs read-only access - // to file being probed. - in := &allowFiles{ - { - abs: filepath, - flag: os.O_RDONLY, - perm: 0, - }, - } - fscfg = fscfg.WithFSMount(in, path.Dir(filepath)) + // Needs read-only access to probed file. + fscfg = fscfg.WithFSMount(&allowFiles{ + allowRead(filepath), + }, tmpdir) // Set anonymous module name. modcfg = modcfg.WithName("") diff --git a/internal/media/ffmpeg/wasm.go b/internal/media/ffmpeg/wasm.go index 1cd92f05d..f395032fa 100644 --- a/internal/media/ffmpeg/wasm.go +++ b/internal/media/ffmpeg/wasm.go @@ -21,12 +21,12 @@ package ffmpeg import ( "context" + "errors" "os" "runtime" "sync/atomic" "unsafe" - "code.superseriousbusiness.org/gotosocial/internal/log" "codeberg.org/gruf/go-ffmpreg/embed" "codeberg.org/gruf/go-ffmpreg/wasm" "github.com/tetratelabs/wazero" @@ -49,24 +49,19 @@ func initWASM(ctx context.Context) error { return nil } - var cfg wazero.RuntimeConfig - - // Allocate new runtime config, letting - // wazero determine compiler / interpreter. - cfg = wazero.NewRuntimeConfig() - - // Though still perform a check of CPU features at - // runtime to warn about slow interpreter performance. - if reason, supported := compilerSupported(); !supported { - log.Warn(ctx, "!!! WAZERO COMPILER MAY NOT BE AVAILABLE !!!"+ - " Reason: "+reason+"."+ - " Wazero will likely fall back to interpreter mode,"+ - " resulting in poor performance for media processing (and SQLite, if in use)."+ - " For more info and possible workarounds, please check:"+ - " https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms", - ) + // Check at runtime whether Wazero compiler support is available, + // interpreter mode is too slow for a usable gotosocial experience. + if reason, supported := isCompilerSupported(); !supported { + return errors.New("!!! WAZERO COMPILER SUPPORT NOT AVAILABLE !!!" + + " Reason: " + reason + "." + + " Wazero in interpreter mode is too slow to use ffmpeg" + + " (this will also affect SQLite if in use)." + + " For more info and possible workarounds, please check: https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms") } + // Allocate new runtime compiler config. + cfg := wazero.NewRuntimeConfigCompiler() + if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" { // Use on-filesystem compilation cache given by env. cache, err := wazero.NewCompilationCacheWithDir(dir) @@ -88,7 +83,7 @@ func initWASM(ctx context.Context) error { defer func() { if err == nil && set { // Drop binary. - embed.B = nil + embed.Free() return } @@ -110,7 +105,7 @@ func initWASM(ctx context.Context) error { } // Compile ffmpreg WebAssembly into memory. - mod, err = run.CompileModule(ctx, embed.B) + mod, err = run.CompileModule(ctx, embed.B()) if err != nil { return err } @@ -128,7 +123,7 @@ func initWASM(ctx context.Context) error { return nil } -func compilerSupported() (string, bool) { +func isCompilerSupported() (string, bool) { switch runtime.GOOS { case "linux", "android", "windows", "darwin", @@ -141,10 +136,11 @@ func compilerSupported() (string, bool) { switch runtime.GOARCH { case "amd64": // NOTE: wazero in the future may decouple the - // requirement of simd (sse4_1) from requirements + // requirement of simd (sse4_1+2) from requirements // for compiler support in the future, but even // still our module go-ffmpreg makes use of them. - return "amd64 SSE4.1 required", cpu.X86.HasSSE41 + return "amd64 x86-64-v2 required (see: https://en.wikipedia.org/wiki/X86-64-v2)", + cpu.Initialized && cpu.X86.HasSSE3 && cpu.X86.HasSSE41 && cpu.X86.HasSSE42 case "arm64": // NOTE: this particular check may change if we // later update go-ffmpreg to a version that makes diff --git a/internal/media/metadata.go b/internal/media/metadata.go index 44b1a87b6..c1fa58645 100644 --- a/internal/media/metadata.go +++ b/internal/media/metadata.go @@ -74,20 +74,28 @@ func clearMetadata(ctx context.Context, filepath string) error { // terminateExif cleans exif data from file at input path, into file // at output path, using given file extension to determine cleaning type. -func terminateExif(outpath, inpath string, ext string) error { +func terminateExif(outpath, inpath string, ext string) (err error) { + var inFile *os.File + var outFile *os.File + + // Ensure handles + // closed on return. + defer func() { + outFile.Close() + inFile.Close() + }() + // Open input file at given path. - inFile, err := os.Open(inpath) + inFile, err = openRead(inpath) if err != nil { return gtserror.Newf("error opening input file %s: %w", inpath, err) } - defer inFile.Close() - // Open output file at given path. - outFile, err := os.Create(outpath) + // Create output file at given path. + outFile, err = openWrite(outpath) if err != nil { return gtserror.Newf("error opening output file %s: %w", outpath, err) } - defer outFile.Close() // Terminate EXIF data from 'inFile' -> 'outFile'. err = terminator.TerminateInto(outFile, inFile, ext) diff --git a/internal/media/probe.go b/internal/media/probe.go index 791b6a8c2..c66254b90 100644 --- a/internal/media/probe.go +++ b/internal/media/probe.go @@ -38,8 +38,9 @@ const ( // probe will first attempt to probe the file at path using native Go code // (for performance), but falls back to using ffprobe to retrieve media details. func probe(ctx context.Context, filepath string) (*result, error) { + // Open input file at given path. - file, err := os.Open(filepath) + file, err := openRead(filepath) if err != nil { return nil, gtserror.Newf("error opening file %s: %w", filepath, err) } @@ -80,6 +81,7 @@ func probe(ctx context.Context, filepath string) (*result, error) { // probeJPEG decodes the given file as JPEG and determines // image details from the decoded JPEG using native Go code. func probeJPEG(file *os.File) (*result, error) { + // Attempt to decode JPEG, adding back hdr magic. cfg, err := jpeg.DecodeConfig(io.MultiReader( strings.NewReader(magicJPEG), diff --git a/internal/media/thumbnail.go b/internal/media/thumbnail.go index d9a2e522a..5fccaf5ce 100644 --- a/internal/media/thumbnail.go +++ b/internal/media/thumbnail.go @@ -24,7 +24,6 @@ import ( "image/jpeg" "image/png" "io" - "os" "strings" "code.superseriousbusiness.org/gotosocial/internal/gtserror" @@ -89,8 +88,8 @@ func generateThumb( // Default type is webp. mimeType = "image/webp" - // Generate thumb output path REPLACING extension. - if i := strings.IndexByte(filepath, '.'); i != -1 { + // Generate thumb output path REPLACING file extension. + if i := strings.LastIndexByte(filepath, '.'); i != -1 { outpath = filepath[:i] + "_thumb.webp" ext = filepath[i+1:] // old extension } else { @@ -231,7 +230,7 @@ func generateNativeThumb( error, ) { // Open input file at given path. - infile, err := os.Open(inpath) + infile, err := openRead(inpath) if err != nil { return "", gtserror.Newf("error opening input file %s: %w", inpath, err) } @@ -272,7 +271,7 @@ func generateNativeThumb( ) // Open output file at given path. - outfile, err := os.Create(outpath) + outfile, err := openWrite(outpath) if err != nil { return "", gtserror.Newf("error opening output file %s: %w", outpath, err) } @@ -313,8 +312,9 @@ func generateNativeThumb( // generateWebpBlurhash generates a blurhash for Webp at filepath. func generateWebpBlurhash(filepath string) (string, error) { + // Open the file at given path. - file, err := os.Open(filepath) + file, err := openRead(filepath) if err != nil { return "", gtserror.Newf("error opening input file %s: %w", filepath, err) } diff --git a/internal/media/util.go b/internal/media/util.go index ea52b415b..d73206434 100644 --- a/internal/media/util.go +++ b/internal/media/util.go @@ -30,14 +30,41 @@ import ( "codeberg.org/gruf/go-iotools" ) +// media processing tmpdir. +var tmpdir = os.TempDir() + // file represents one file // with the given flag and perms. type file struct { - abs string + abs string // absolute file path, including root + dir string // containing directory of abs + rel string // relative to root, i.e. trim_prefix(abs, dir) flag int perm os.FileMode } +// allowRead returns a new file{} for filepath permitted only to read. +func allowRead(filepath string) file { + return newFile(filepath, os.O_RDONLY, 0) +} + +// allowCreate returns a new file{} for filepath permitted to read / write / create. +func allowCreate(filepath string) file { + return newFile(filepath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +} + +// newFile returns a new instance of file{} for given path and open args. +func newFile(filepath string, flag int, perms os.FileMode) file { + dir, rel := path.Split(filepath) + return file{ + abs: filepath, + rel: rel, + dir: dir, + flag: flag, + perm: perms, + } +} + // allowFiles implements fs.FS to allow // access to a specified slice of files. type allowFiles []file @@ -45,36 +72,32 @@ type allowFiles []file // Open implements fs.FS. func (af allowFiles) Open(name string) (fs.File, error) { for _, file := range af { - var ( - abs = file.abs - flag = file.flag - perm = file.perm - ) - + switch name { // Allowed to open file - // at absolute path. - if name == file.abs { - return os.OpenFile(abs, flag, perm) - } + // at absolute path, or + // relative as ffmpeg likes. + case file.abs, file.rel: + return os.OpenFile(file.abs, file.flag, file.perm) - // Check for other valid reads. - thisDir, thisFile := path.Split(file.abs) - - // Allowed to read directory itself. - if name == thisDir || name == "." { - return os.OpenFile(thisDir, flag, perm) - } - - // Allowed to read file - // itself (at relative path). - if name == thisFile { - return os.OpenFile(abs, flag, perm) + // Ffmpeg likes to read containing + // dir as '.'. Allow RO access here. + case ".": + return openRead(file.dir) } } - return nil, os.ErrPermission } +// openRead opens the existing file at path for reads only. +func openRead(path string) (*os.File, error) { + return os.OpenFile(path, os.O_RDONLY, 0) +} + +// openWrite opens the (new!) file at path for read / writes. +func openWrite(path string) (*os.File, error) { + return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +} + // getExtension splits file extension from path. func getExtension(path string) string { for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- { @@ -93,17 +116,24 @@ func getExtension(path string) string { // chance that Linux's sendfile syscall can be utilised for optimal // draining of data source to temporary file storage. func drainToTmp(rc io.ReadCloser) (string, error) { - defer rc.Close() + var tmp *os.File + var err error + + // Close handles + // on func return. + defer func() { + tmp.Close() + rc.Close() + }() // Open new temporary file. - tmp, err := os.CreateTemp( - os.TempDir(), + tmp, err = os.CreateTemp( + tmpdir, "gotosocial-*", ) if err != nil { return "", err } - defer tmp.Close() // Extract file path. path := tmp.Name() diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go b/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go index 7829b5524..b39d7d10e 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/embed/lib.go @@ -1,39 +1,46 @@ package embed import ( - "bytes" "compress/gzip" _ "embed" "io" - "os" + "strings" ) func init() { var err error - if path := os.Getenv("FFMPREG_WASM"); path != "" { - // Read file into memory. - B, err = os.ReadFile(path) - if err != nil { - panic(err) - } - } - // Wrap bytes in reader. - b := bytes.NewReader(B) + r := strings.NewReader(s) // Create unzipper from reader. - gz, err := gzip.NewReader(b) + gz, err := gzip.NewReader(r) if err != nil { panic(err) } // Extract gzipped binary. - B, err = io.ReadAll(gz) + b, err := io.ReadAll(gz) if err != nil { panic(err) } + + // Set binary. + s = string(b) } +// B returns a copy of +// embedded binary data. +func B() []byte { + if s == "" { + panic("binary already dropped from memory") + } + return []byte(s) +} + +// Free will drop embedded +// binary from runtime mem. +func Free() { s = "" } + //go:embed ffmpreg.wasm.gz -var B []byte +var s string diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go index a809ff120..a0a199ca1 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/funcs.go @@ -9,22 +9,93 @@ import ( type snapshotskey struct{} +type snapshotctx struct { + context.Context + snaps *snapshots +} + +func (ctx snapshotctx) Value(key any) any { + if _, ok := key.(snapshotskey); ok { + return ctx.snaps + } + return ctx.Context.Value(key) +} + +const ringsz uint = 8 + +type snapshots struct { + r [ringsz]struct { + eptr uint32 + snap experimental.Snapshot + } + n uint +} + +func (s *snapshots) get(envptr uint32) experimental.Snapshot { + start := (s.n % ringsz) + + for i := start; i != ^uint(0); i-- { + if s.r[i].eptr == envptr { + snap := s.r[i].snap + s.r[i].eptr = 0 + s.r[i].snap = nil + s.n = i - 1 + return snap + } + } + + for i := ringsz - 1; i > start; i-- { + if s.r[i].eptr == envptr { + snap := s.r[i].snap + s.r[i].eptr = 0 + s.r[i].snap = nil + s.n = i - 1 + return snap + } + } + + panic("snapshot not found") +} + +func (s *snapshots) set(envptr uint32, snapshot experimental.Snapshot) { + start := (s.n % ringsz) + + for i := start; i < ringsz; i++ { + switch s.r[i].eptr { + case 0, envptr: + s.r[i].eptr = envptr + s.r[i].snap = snapshot + s.n = i + return + } + } + + for i := uint(0); i < start; i++ { + switch s.r[i].eptr { + case 0, envptr: + s.r[i].eptr = envptr + s.r[i].snap = snapshot + s.n = i + return + } + } + + panic("snapshots full") +} + // withSetjmpLongjmp updates the context to contain wazero/experimental.Snapshotter{} support, // and embeds the necessary snapshots map required for later calls to Setjmp() / Longjmp(). func withSetjmpLongjmp(ctx context.Context) context.Context { - snapshots := make(map[uint32]experimental.Snapshot, 10) - ctx = experimental.WithSnapshotter(ctx) - ctx = context.WithValue(ctx, snapshotskey{}, snapshots) - return ctx + return snapshotctx{Context: experimental.WithSnapshotter(ctx), snaps: new(snapshots)} } -func getSnapshots(ctx context.Context) map[uint32]experimental.Snapshot { - v, _ := ctx.Value(snapshotskey{}).(map[uint32]experimental.Snapshot) +func getSnapshots(ctx context.Context) *snapshots { + v, _ := ctx.Value(snapshotskey{}).(*snapshots) return v } // setjmp implements the C function: setjmp(env jmp_buf) -func setjmp(ctx context.Context, mod api.Module, stack []uint64) { +func setjmp(ctx context.Context, _ api.Module, stack []uint64) { // Input arguments. envptr := api.DecodeU32(stack[0]) @@ -35,19 +106,16 @@ func setjmp(ctx context.Context, mod api.Module, stack []uint64) { // Get stored snapshots map. snapshots := getSnapshots(ctx) - if snapshots == nil { - panic("setjmp / longjmp not supported") - } // Set latest snapshot in map. - snapshots[envptr] = snapshot + snapshots.set(envptr, snapshot) // Set return. stack[0] = 0 } // longjmp implements the C function: int longjmp(env jmp_buf, value int) -func longjmp(ctx context.Context, mod api.Module, stack []uint64) { +func longjmp(ctx context.Context, _ api.Module, stack []uint64) { // Input arguments. envptr := api.DecodeU32(stack[0]) @@ -60,10 +128,7 @@ func longjmp(ctx context.Context, mod api.Module, stack []uint64) { } // Get snapshot stored in map. - snapshot := snapshots[envptr] - if snapshot == nil { - panic("must first call setjmp") - } + snapshot := snapshots.get(envptr) // Set return. stack[0] = 0 diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go index 7b07d851d..c247abaf0 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/run.go @@ -53,6 +53,7 @@ func Run( modcfg = modcfg.WithStdin(args.Stdin) modcfg = modcfg.WithStdout(args.Stdout) modcfg = modcfg.WithStderr(args.Stderr) + modcfg = modcfg.WithName("") if args.Config != nil { // Pass through config fn. diff --git a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go index 328a26193..ca13bf775 100644 --- a/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go +++ b/vendor/codeberg.org/gruf/go-ffmpreg/wasm/runtime.go @@ -28,6 +28,7 @@ func NewRuntime(ctx context.Context, cfg wazero.RuntimeConfig) (wazero.Runtime, // Set core features ffmpeg compiled with. cfg = cfg.WithCoreFeatures(CoreFeatures) + cfg = cfg.WithDebugInfoEnabled(false) // Instantiate runtime with prepared config. rt := wazero.NewRuntimeWithConfig(ctx, cfg) diff --git a/vendor/modules.txt b/vendor/modules.txt index ac4bab587..7d2b53ad4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -247,7 +247,7 @@ codeberg.org/gruf/go-fastcopy # codeberg.org/gruf/go-fastpath/v2 v2.0.0 ## explicit; go 1.14 codeberg.org/gruf/go-fastpath/v2 -# codeberg.org/gruf/go-ffmpreg v0.6.11 +# codeberg.org/gruf/go-ffmpreg v0.6.12 ## explicit; go 1.22.0 codeberg.org/gruf/go-ffmpreg/embed codeberg.org/gruf/go-ffmpreg/wasm