Compare commits

..

8 commits

Author SHA1 Message Date
tobi
d212736165 boobs 2025-09-29 18:56:44 +02:00
tobi
6c04ae231c i'm adjusting the PR, pray i don't adjust it further 2025-09-29 16:48:46 +02:00
tobi
9e2fd4734b should be done poking now 2025-09-29 12:11:37 +02:00
tobi
408ddc367d whoops 2025-09-29 12:02:19 +02:00
tobi
228b41cb53 few more little tweaks 2025-09-29 11:59:35 +02:00
tobi
c7b0150834 whew 2025-09-26 17:14:42 +02:00
tobi
cde373143a remove errant comment 2025-09-26 15:17:30 +02:00
tobi
c99412af98 finalize indexes etc 2025-09-26 15:14:26 +02:00
16 changed files with 230 additions and 308 deletions

View file

@ -538,21 +538,23 @@ It may be useful when testing or debugging migrations to be able to run them aga
Basic steps for this: Basic steps for this:
First dump the Postgres database on the remote machine, and copy the dump over to your development machine. 1. Dump the Postgres database on the remote machine, and copy the dump over to your development machine.
2. Create a local Postgres container and mount the dump into it with, for example:
Now create a local Postgres container and mount the dump into it with, for example: ```bash
docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres
```
3. Get a terminal inside the running container:
```bash ```bash
docker run -it --name postgres --network host -e POSTGRES_PASSWORD=postgres -v /path/to/db_dump:/db_dump postgres docker exec -it --user postgres postgres bash
``` ```
4. Using that terminal, restore the dump (this will probably take a little while depending on the dump size and the specs of your machine):
In a separate terminal window, execute a command inside the running container to load the dump into the "postgres" database: ```bash
psql -X postgres < /db_dump
```bash ```
docker exec -it --user postgres postgres psql -X -f /db_dump postgres 5. With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
```
With the Postgres container still running, run GoToSocial and point it towards the container. Use the appropriate `GTS_HOST` (and `GTS_ACCOUNT_DOMAIN`) values for the instance you dumped:
```bash ```bash
GTS_HOST=example.org \ GTS_HOST=example.org \

View file

@ -328,7 +328,7 @@ This is the current status of support offered by GoToSocial for different platfo
Notes on 64-bit CPU feature requirements: Notes on 64-bit CPU feature requirements:
- x86_64 requires the [x86-64-v2](https://en.wikipedia.org/wiki/X86-64-v2) level instruction sets. (CPUs manufactured after ~2010) - x86_64 requires the SSE4.1 instruction set. (CPUs manufactured after ~2010)
- ARM64 requires no specific features, ARMv8 CPUs (and later) have all required features. - ARM64 requires no specific features, ARMv8 CPUs (and later) have all required features.

2
go.mod
View file

@ -21,7 +21,7 @@ require (
codeberg.org/gruf/go-errors/v2 v2.3.2 codeberg.org/gruf/go-errors/v2 v2.3.2
codeberg.org/gruf/go-fastcopy v1.1.3 codeberg.org/gruf/go-fastcopy v1.1.3
codeberg.org/gruf/go-fastpath/v2 v2.0.0 codeberg.org/gruf/go-fastpath/v2 v2.0.0
codeberg.org/gruf/go-ffmpreg v0.6.12 codeberg.org/gruf/go-ffmpreg v0.6.11
codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf
codeberg.org/gruf/go-kv/v2 v2.0.7 codeberg.org/gruf/go-kv/v2 v2.0.7
codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f

4
go.sum generated
View file

@ -26,8 +26,8 @@ codeberg.org/gruf/go-fastcopy v1.1.3 h1:Jo9VTQjI6KYimlw25PPc7YLA3Xm+XMQhaHwKnM7x
codeberg.org/gruf/go-fastcopy v1.1.3/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s= codeberg.org/gruf/go-fastcopy v1.1.3/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s=
codeberg.org/gruf/go-fastpath/v2 v2.0.0 h1:iAS9GZahFhyWEH0KLhFEJR+txx1ZhMXxYzu2q5Qo9c0= codeberg.org/gruf/go-fastpath/v2 v2.0.0 h1:iAS9GZahFhyWEH0KLhFEJR+txx1ZhMXxYzu2q5Qo9c0=
codeberg.org/gruf/go-fastpath/v2 v2.0.0/go.mod h1:3pPqu5nZjpbRrOqvLyAK7puS1OfEtQvjd6342Cwz56Q= codeberg.org/gruf/go-fastpath/v2 v2.0.0/go.mod h1:3pPqu5nZjpbRrOqvLyAK7puS1OfEtQvjd6342Cwz56Q=
codeberg.org/gruf/go-ffmpreg v0.6.12 h1:mPdRx1TAQJQPhRkTOOHnRSY6omNCLJ7M6ajjuEMNNvE= codeberg.org/gruf/go-ffmpreg v0.6.11 h1:+lvB5Loy0KUAKfv6nOZRWHFVgN08cpHhUlYcZxL8M20=
codeberg.org/gruf/go-ffmpreg v0.6.12/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E= codeberg.org/gruf/go-ffmpreg v0.6.11/go.mod h1:tGqIMh/I2cizqauxxNAN+WGkICI0j5G3xwF1uBkyw1E=
codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf h1:84s/ii8N6lYlskZjHH+DG6jyia8w2mXMZlRwFn8Gs3A= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf h1:84s/ii8N6lYlskZjHH+DG6jyia8w2mXMZlRwFn8Gs3A=
codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf/go.mod h1:zZAICsp5rY7+hxnws2V0ePrWxE0Z2Z/KXcN3p/RQCfk= codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf/go.mod h1:zZAICsp5rY7+hxnws2V0ePrWxE0Z2Z/KXcN3p/RQCfk=
codeberg.org/gruf/go-kv v1.6.5 h1:ttPf0NA8F79pDqBttSudPTVCZmGncumeNIxmeM9ztz0= codeberg.org/gruf/go-kv v1.6.5 h1:ttPf0NA8F79pDqBttSudPTVCZmGncumeNIxmeM9ztz0=

View file

@ -96,7 +96,7 @@ func init() {
clear(statuses) clear(statuses)
statuses = statuses[:0] statuses = statuses[:0]
batchStart := time.Now() start := time.Now()
// Select IDs of next // Select IDs of next
// batch, paging down. // batch, paging down.
@ -106,51 +106,46 @@ func init() {
Where("? IS NULL", bun.Ident("in_reply_to_id")). Where("? IS NULL", bun.Ident("in_reply_to_id")).
Where("? < ?", bun.Ident("id"), maxID). Where("? < ?", bun.Ident("id"), maxID).
OrderExpr("? DESC", bun.Ident("id")). OrderExpr("? DESC", bun.Ident("id")).
Limit(500). Limit(100).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting statuses: %w", err) return gtserror.Newf("error selecting top-level statuses: %w", err)
} }
l := len(statuses) // Every 50 loops, flush wal and begin new
if l == 0 { // transaction, to avoid silly wal sizes.
// No more statuses! if i%50 == 0 {
//
// Transaction will be closed
// after leaving the loop.
break
} else if i%100 == 0 {
// Begin a new transaction every
// 100 batches (~50000 statuses),
// to avoid massive commits.
// Close existing transaction.
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return err return err
} }
// Try to flush the wal
// to avoid silly wal sizes.
if err := doWALCheckpoint(ctx, db); err != nil { if err := doWALCheckpoint(ctx, db); err != nil {
return err return err
} }
// Open new transaction.
tx, err = db.BeginTx(ctx, nil) tx, err = db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return err return err
} }
} }
// Set next maxID // No more statuses!
// value from statuses. l := len(statuses)
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break
}
// Set next maxID value from statuses.
maxID = statuses[l-1].ID maxID = statuses[l-1].ID
// Rethread using the // Rethread inside the transaction.
// open transaction.
var updatedRowsThisBatch int64 var updatedRowsThisBatch int64
for _, status := range statuses { for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status, false) n, err := sr.rethreadStatus(ctx, tx, status)
if err != nil { if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err) return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
} }
@ -159,7 +154,7 @@ func init() {
} }
// Show speed for this batch. // Show speed for this batch.
timeTaken := time.Since(batchStart).Milliseconds() timeTaken := time.Since(start).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow) rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs rowsPerSecond := 1000 * rowsPerMs
@ -169,73 +164,94 @@ func init() {
log.Infof( log.Infof(
ctx, ctx,
"[~%.2f%% done; ~%.0f rows/s] migrating threads", "[~%.2f%% done; ~%.0f rows/s] paging top-level statuses",
totalDone, rowsPerSecond, totalDone, rowsPerSecond,
) )
} }
// Close transaction. if err := doWALCheckpoint(ctx, db); err != nil {
if err := tx.Commit(); err != nil {
return err return err
} }
// Create a partial index on thread_id_new to find stragglers. // Reset max ID.
// This index will be removed at the end of the migration. maxID = id.Highest
// Create a temporary index on thread_id_new for stragglers.
log.Info(ctx, "creating temporary statuses thread_id_new index") log.Info(ctx, "creating temporary statuses thread_id_new index")
if _, err := db.NewCreateIndex(). if _, err := db.NewCreateIndex().
Table("statuses"). Table("statuses").
Index("statuses_thread_id_new_idx"). Index("statuses_thread_id_new_idx").
Column("thread_id_new"). Column("thread_id_new").
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
Exec(ctx); err != nil { Exec(ctx); err != nil {
return gtserror.Newf("error creating new thread_id index: %w", err) return gtserror.Newf("error creating new thread_id index: %w", err)
} }
// Open a new transaction lads.
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
for i := 1; ; i++ { for i := 1; ; i++ {
// Reset slice. // Reset slice.
clear(statuses) clear(statuses)
statuses = statuses[:0] statuses = statuses[:0]
batchStart := time.Now() start := time.Now()
// Get stragglers for which // Select IDs of stragglers for
// we haven't set thread ID yet. // which we haven't set thread_id yet.
if err := db.NewSelect(). if err := tx.NewSelect().
Model(&statuses). Model(&statuses).
Column("id"). Column("id").
Where("? = ?", bun.Ident("thread_id_new"), id.Lowest). Where("? = ?", bun.Ident("thread_id_new"), id.Lowest).
Limit(250). Limit(500).
Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) { Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
return gtserror.Newf("error selecting straggler: %w", err) return gtserror.Newf("error selecting unthreaded statuses: %w", err)
} }
if len(statuses) == 0 { // Every 50 loops, flush wal and begin new
// No more // transaction, to avoid silly wal sizes.
// statuses! if i%50 == 0 {
if err := tx.Commit(); err != nil {
return err
}
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}
}
// No more statuses!
l := len(statuses)
if l == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Info(ctx, "done migrating statuses!")
break break
} }
// Update this batch // Rethread inside the transaction.
// inside a transaction.
var updatedRowsThisBatch int64 var updatedRowsThisBatch int64
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
for _, status := range statuses { for _, status := range statuses {
n, err := sr.rethreadStatus(ctx, tx, status)
n, err := sr.rethreadStatus(ctx, tx, status, true)
if err != nil { if err != nil {
return gtserror.Newf("error rethreading status %s: %w", status.URI, err) return gtserror.Newf("error rethreading status %s: %w", status.URI, err)
} }
updatedRowsThisBatch += n updatedRowsThisBatch += n
updatedRowsTotal += n updatedRowsTotal += n
} }
return nil
}); err != nil {
return err
}
// Show speed for this batch. // Show speed for this batch.
timeTaken := time.Since(batchStart).Milliseconds() timeTaken := time.Since(start).Milliseconds()
msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch) msPerRow := float64(timeTaken) / float64(updatedRowsThisBatch)
rowsPerMs := float64(1) / float64(msPerRow) rowsPerMs := float64(1) / float64(msPerRow)
rowsPerSecond := 1000 * rowsPerMs rowsPerSecond := 1000 * rowsPerMs
@ -245,16 +261,11 @@ func init() {
log.Infof( log.Infof(
ctx, ctx,
"[~%.2f%% done; ~%.0f rows/s] migrating stragglers", "[~%.2f%% done; ~%.0f rows/s] cleaning up stragglers",
totalDone, rowsPerSecond, totalDone, rowsPerSecond,
) )
} }
// Try to merge everything we've done so far.
if err := doWALCheckpoint(ctx, db); err != nil {
return err
}
log.Info(ctx, "dropping temporary thread_id_new index") log.Info(ctx, "dropping temporary thread_id_new index")
if _, err := db.NewDropIndex(). if _, err := db.NewDropIndex().
Index("statuses_thread_id_new_idx"). Index("statuses_thread_id_new_idx").
@ -352,7 +363,7 @@ type statusRethreader struct {
// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration // rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
// in order to trigger a status rethreading operation for the given status, returning total number of rows changed. // in order to trigger a status rethreading operation for the given status, returning total number of rows changed.
func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status, straggler bool) (int64, error) { func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (int64, error) {
// Zero slice and // Zero slice and
// map ptr values. // map ptr values.
@ -394,11 +405,11 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
} }
// Set up-to-date values on the status. // Set up-to-date values on the status.
if v, ok := upToDateValues["in_reply_to_id"]; ok && v != nil { if inReplyToID, ok := upToDateValues["in_reply_to_id"]; ok && inReplyToID != nil {
status.InReplyToID = v.(string) status.InReplyToID = inReplyToID.(string)
} }
if v, ok := upToDateValues["thread_id"]; ok && v != nil { if threadID, ok := upToDateValues["thread_id"]; ok && threadID != nil {
status.ThreadID = v.(string) status.ThreadID = threadID.(string)
} }
// status and thread ID cursor // status and thread ID cursor
@ -452,10 +463,7 @@ func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, statu
// Check for the case where the entire // Check for the case where the entire
// batch of statuses is already correctly // batch of statuses is already correctly
// threaded. Then we have nothing to do! // threaded. Then we have nothing to do!
// if sr.allThreaded && len(sr.threadIDs) == 1 {
// Skip this check for straggler statuses
// that are part of broken threads.
if !straggler && sr.allThreaded && len(sr.threadIDs) == 1 {
log.Debug(ctx, "skipping just rethreaded thread") log.Debug(ctx, "skipping just rethreaded thread")
return 0, nil return 0, nil
} }

View file

@ -21,6 +21,8 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"os"
"path"
"strconv" "strconv"
"strings" "strings"
@ -156,20 +158,34 @@ func ffmpeg(ctx context.Context, inpath string, outpath string, args ...string)
Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig {
fscfg := wazero.NewFSConfig() fscfg := wazero.NewFSConfig()
// Needs read-only access /dev/urandom, // Needs read-only access to
// required by some ffmpeg operations. // /dev/urandom for some types.
fscfg = fscfg.WithFSMount(&allowFiles{ urandom := &allowFiles{
allowRead("/dev/urandom"), {
}, "/dev") abs: "/dev/urandom",
flag: os.O_RDONLY,
perm: 0,
},
}
fscfg = fscfg.WithFSMount(urandom, "/dev")
// In+out dirs are always the same (tmp), // In+out dirs are always the same (tmp),
// so we can share one file system for // so we can share one file system for
// both + grant different perms to inpath // both + grant different perms to inpath
// (read only) and outpath (read+write). // (read only) and outpath (read+write).
fscfg = fscfg.WithFSMount(&allowFiles{ shared := &allowFiles{
allowCreate(outpath), {
allowRead(inpath), abs: inpath,
}, tmpdir) flag: os.O_RDONLY,
perm: 0,
},
{
abs: outpath,
flag: os.O_RDWR | os.O_CREATE | os.O_TRUNC,
perm: 0666,
},
}
fscfg = fscfg.WithFSMount(shared, path.Dir(inpath))
// Set anonymous module name. // Set anonymous module name.
modcfg = modcfg.WithName("") modcfg = modcfg.WithName("")
@ -230,10 +246,16 @@ func ffprobe(ctx context.Context, filepath string) (*result, error) {
Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig {
fscfg := wazero.NewFSConfig() fscfg := wazero.NewFSConfig()
// Needs read-only access to probed file. // Needs read-only access
fscfg = fscfg.WithFSMount(&allowFiles{ // to file being probed.
allowRead(filepath), in := &allowFiles{
}, tmpdir) {
abs: filepath,
flag: os.O_RDONLY,
perm: 0,
},
}
fscfg = fscfg.WithFSMount(in, path.Dir(filepath))
// Set anonymous module name. // Set anonymous module name.
modcfg = modcfg.WithName("") modcfg = modcfg.WithName("")

View file

@ -21,12 +21,12 @@ package ffmpeg
import ( import (
"context" "context"
"errors"
"os" "os"
"runtime" "runtime"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
"code.superseriousbusiness.org/gotosocial/internal/log"
"codeberg.org/gruf/go-ffmpreg/embed" "codeberg.org/gruf/go-ffmpreg/embed"
"codeberg.org/gruf/go-ffmpreg/wasm" "codeberg.org/gruf/go-ffmpreg/wasm"
"github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero"
@ -49,18 +49,23 @@ func initWASM(ctx context.Context) error {
return nil return nil
} }
// Check at runtime whether Wazero compiler support is available, var cfg wazero.RuntimeConfig
// interpreter mode is too slow for a usable gotosocial experience.
if reason, supported := isCompilerSupported(); !supported {
return errors.New("!!! WAZERO COMPILER SUPPORT NOT AVAILABLE !!!" +
" Reason: " + reason + "." +
" Wazero in interpreter mode is too slow to use ffmpeg" +
" (this will also affect SQLite if in use)." +
" For more info and possible workarounds, please check: https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms")
}
// Allocate new runtime compiler config. // Allocate new runtime config, letting
cfg := wazero.NewRuntimeConfigCompiler() // wazero determine compiler / interpreter.
cfg = wazero.NewRuntimeConfig()
// Though still perform a check of CPU features at
// runtime to warn about slow interpreter performance.
if reason, supported := compilerSupported(); !supported {
log.Warn(ctx, "!!! WAZERO COMPILER MAY NOT BE AVAILABLE !!!"+
" Reason: "+reason+"."+
" Wazero will likely fall back to interpreter mode,"+
" resulting in poor performance for media processing (and SQLite, if in use)."+
" For more info and possible workarounds, please check:"+
" https://docs.gotosocial.org/en/latest/getting_started/releases/#supported-platforms",
)
}
if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" { if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" {
// Use on-filesystem compilation cache given by env. // Use on-filesystem compilation cache given by env.
@ -83,7 +88,7 @@ func initWASM(ctx context.Context) error {
defer func() { defer func() {
if err == nil && set { if err == nil && set {
// Drop binary. // Drop binary.
embed.Free() embed.B = nil
return return
} }
@ -105,7 +110,7 @@ func initWASM(ctx context.Context) error {
} }
// Compile ffmpreg WebAssembly into memory. // Compile ffmpreg WebAssembly into memory.
mod, err = run.CompileModule(ctx, embed.B()) mod, err = run.CompileModule(ctx, embed.B)
if err != nil { if err != nil {
return err return err
} }
@ -123,7 +128,7 @@ func initWASM(ctx context.Context) error {
return nil return nil
} }
func isCompilerSupported() (string, bool) { func compilerSupported() (string, bool) {
switch runtime.GOOS { switch runtime.GOOS {
case "linux", "android", case "linux", "android",
"windows", "darwin", "windows", "darwin",
@ -136,11 +141,10 @@ func isCompilerSupported() (string, bool) {
switch runtime.GOARCH { switch runtime.GOARCH {
case "amd64": case "amd64":
// NOTE: wazero in the future may decouple the // NOTE: wazero in the future may decouple the
// requirement of simd (sse4_1+2) from requirements // requirement of simd (sse4_1) from requirements
// for compiler support in the future, but even // for compiler support in the future, but even
// still our module go-ffmpreg makes use of them. // still our module go-ffmpreg makes use of them.
return "amd64 x86-64-v2 required (see: https://en.wikipedia.org/wiki/X86-64-v2)", return "amd64 SSE4.1 required", cpu.X86.HasSSE41
cpu.Initialized && cpu.X86.HasSSE3 && cpu.X86.HasSSE41 && cpu.X86.HasSSE42
case "arm64": case "arm64":
// NOTE: this particular check may change if we // NOTE: this particular check may change if we
// later update go-ffmpreg to a version that makes // later update go-ffmpreg to a version that makes

View file

@ -74,28 +74,20 @@ func clearMetadata(ctx context.Context, filepath string) error {
// terminateExif cleans exif data from file at input path, into file // terminateExif cleans exif data from file at input path, into file
// at output path, using given file extension to determine cleaning type. // at output path, using given file extension to determine cleaning type.
func terminateExif(outpath, inpath string, ext string) (err error) { func terminateExif(outpath, inpath string, ext string) error {
var inFile *os.File
var outFile *os.File
// Ensure handles
// closed on return.
defer func() {
outFile.Close()
inFile.Close()
}()
// Open input file at given path. // Open input file at given path.
inFile, err = openRead(inpath) inFile, err := os.Open(inpath)
if err != nil { if err != nil {
return gtserror.Newf("error opening input file %s: %w", inpath, err) return gtserror.Newf("error opening input file %s: %w", inpath, err)
} }
defer inFile.Close()
// Create output file at given path. // Open output file at given path.
outFile, err = openWrite(outpath) outFile, err := os.Create(outpath)
if err != nil { if err != nil {
return gtserror.Newf("error opening output file %s: %w", outpath, err) return gtserror.Newf("error opening output file %s: %w", outpath, err)
} }
defer outFile.Close()
// Terminate EXIF data from 'inFile' -> 'outFile'. // Terminate EXIF data from 'inFile' -> 'outFile'.
err = terminator.TerminateInto(outFile, inFile, ext) err = terminator.TerminateInto(outFile, inFile, ext)

View file

@ -38,9 +38,8 @@ const (
// probe will first attempt to probe the file at path using native Go code // probe will first attempt to probe the file at path using native Go code
// (for performance), but falls back to using ffprobe to retrieve media details. // (for performance), but falls back to using ffprobe to retrieve media details.
func probe(ctx context.Context, filepath string) (*result, error) { func probe(ctx context.Context, filepath string) (*result, error) {
// Open input file at given path. // Open input file at given path.
file, err := openRead(filepath) file, err := os.Open(filepath)
if err != nil { if err != nil {
return nil, gtserror.Newf("error opening file %s: %w", filepath, err) return nil, gtserror.Newf("error opening file %s: %w", filepath, err)
} }
@ -81,7 +80,6 @@ func probe(ctx context.Context, filepath string) (*result, error) {
// probeJPEG decodes the given file as JPEG and determines // probeJPEG decodes the given file as JPEG and determines
// image details from the decoded JPEG using native Go code. // image details from the decoded JPEG using native Go code.
func probeJPEG(file *os.File) (*result, error) { func probeJPEG(file *os.File) (*result, error) {
// Attempt to decode JPEG, adding back hdr magic. // Attempt to decode JPEG, adding back hdr magic.
cfg, err := jpeg.DecodeConfig(io.MultiReader( cfg, err := jpeg.DecodeConfig(io.MultiReader(
strings.NewReader(magicJPEG), strings.NewReader(magicJPEG),

View file

@ -24,6 +24,7 @@ import (
"image/jpeg" "image/jpeg"
"image/png" "image/png"
"io" "io"
"os"
"strings" "strings"
"code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/gtserror"
@ -88,8 +89,8 @@ func generateThumb(
// Default type is webp. // Default type is webp.
mimeType = "image/webp" mimeType = "image/webp"
// Generate thumb output path REPLACING file extension. // Generate thumb output path REPLACING extension.
if i := strings.LastIndexByte(filepath, '.'); i != -1 { if i := strings.IndexByte(filepath, '.'); i != -1 {
outpath = filepath[:i] + "_thumb.webp" outpath = filepath[:i] + "_thumb.webp"
ext = filepath[i+1:] // old extension ext = filepath[i+1:] // old extension
} else { } else {
@ -230,7 +231,7 @@ func generateNativeThumb(
error, error,
) { ) {
// Open input file at given path. // Open input file at given path.
infile, err := openRead(inpath) infile, err := os.Open(inpath)
if err != nil { if err != nil {
return "", gtserror.Newf("error opening input file %s: %w", inpath, err) return "", gtserror.Newf("error opening input file %s: %w", inpath, err)
} }
@ -271,7 +272,7 @@ func generateNativeThumb(
) )
// Open output file at given path. // Open output file at given path.
outfile, err := openWrite(outpath) outfile, err := os.Create(outpath)
if err != nil { if err != nil {
return "", gtserror.Newf("error opening output file %s: %w", outpath, err) return "", gtserror.Newf("error opening output file %s: %w", outpath, err)
} }
@ -312,9 +313,8 @@ func generateNativeThumb(
// generateWebpBlurhash generates a blurhash for Webp at filepath. // generateWebpBlurhash generates a blurhash for Webp at filepath.
func generateWebpBlurhash(filepath string) (string, error) { func generateWebpBlurhash(filepath string) (string, error) {
// Open the file at given path. // Open the file at given path.
file, err := openRead(filepath) file, err := os.Open(filepath)
if err != nil { if err != nil {
return "", gtserror.Newf("error opening input file %s: %w", filepath, err) return "", gtserror.Newf("error opening input file %s: %w", filepath, err)
} }

View file

@ -30,41 +30,14 @@ import (
"codeberg.org/gruf/go-iotools" "codeberg.org/gruf/go-iotools"
) )
// media processing tmpdir.
var tmpdir = os.TempDir()
// file represents one file // file represents one file
// with the given flag and perms. // with the given flag and perms.
type file struct { type file struct {
abs string // absolute file path, including root abs string
dir string // containing directory of abs
rel string // relative to root, i.e. trim_prefix(abs, dir)
flag int flag int
perm os.FileMode perm os.FileMode
} }
// allowRead returns a new file{} for filepath permitted only to read.
func allowRead(filepath string) file {
return newFile(filepath, os.O_RDONLY, 0)
}
// allowCreate returns a new file{} for filepath permitted to read / write / create.
func allowCreate(filepath string) file {
return newFile(filepath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
}
// newFile returns a new instance of file{} for given path and open args.
func newFile(filepath string, flag int, perms os.FileMode) file {
dir, rel := path.Split(filepath)
return file{
abs: filepath,
rel: rel,
dir: dir,
flag: flag,
perm: perms,
}
}
// allowFiles implements fs.FS to allow // allowFiles implements fs.FS to allow
// access to a specified slice of files. // access to a specified slice of files.
type allowFiles []file type allowFiles []file
@ -72,32 +45,36 @@ type allowFiles []file
// Open implements fs.FS. // Open implements fs.FS.
func (af allowFiles) Open(name string) (fs.File, error) { func (af allowFiles) Open(name string) (fs.File, error) {
for _, file := range af { for _, file := range af {
switch name { var (
abs = file.abs
flag = file.flag
perm = file.perm
)
// Allowed to open file // Allowed to open file
// at absolute path, or // at absolute path.
// relative as ffmpeg likes. if name == file.abs {
case file.abs, file.rel: return os.OpenFile(abs, flag, perm)
return os.OpenFile(file.abs, file.flag, file.perm) }
// Ffmpeg likes to read containing // Check for other valid reads.
// dir as '.'. Allow RO access here. thisDir, thisFile := path.Split(file.abs)
case ".":
return openRead(file.dir) // Allowed to read directory itself.
if name == thisDir || name == "." {
return os.OpenFile(thisDir, flag, perm)
}
// Allowed to read file
// itself (at relative path).
if name == thisFile {
return os.OpenFile(abs, flag, perm)
} }
} }
return nil, os.ErrPermission return nil, os.ErrPermission
} }
// openRead opens the existing file at path for reads only.
func openRead(path string) (*os.File, error) {
return os.OpenFile(path, os.O_RDONLY, 0)
}
// openWrite opens the (new!) file at path for read / writes.
func openWrite(path string) (*os.File, error) {
return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
}
// getExtension splits file extension from path. // getExtension splits file extension from path.
func getExtension(path string) string { func getExtension(path string) string {
for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- { for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- {
@ -116,24 +93,17 @@ func getExtension(path string) string {
// chance that Linux's sendfile syscall can be utilised for optimal // chance that Linux's sendfile syscall can be utilised for optimal
// draining of data source to temporary file storage. // draining of data source to temporary file storage.
func drainToTmp(rc io.ReadCloser) (string, error) { func drainToTmp(rc io.ReadCloser) (string, error) {
var tmp *os.File defer rc.Close()
var err error
// Close handles
// on func return.
defer func() {
tmp.Close()
rc.Close()
}()
// Open new temporary file. // Open new temporary file.
tmp, err = os.CreateTemp( tmp, err := os.CreateTemp(
tmpdir, os.TempDir(),
"gotosocial-*", "gotosocial-*",
) )
if err != nil { if err != nil {
return "", err return "", err
} }
defer tmp.Close()
// Extract file path. // Extract file path.
path := tmp.Name() path := tmp.Name()

View file

@ -1,46 +1,39 @@
package embed package embed
import ( import (
"bytes"
"compress/gzip" "compress/gzip"
_ "embed" _ "embed"
"io" "io"
"strings" "os"
) )
func init() { func init() {
var err error var err error
if path := os.Getenv("FFMPREG_WASM"); path != "" {
// Read file into memory.
B, err = os.ReadFile(path)
if err != nil {
panic(err)
}
}
// Wrap bytes in reader. // Wrap bytes in reader.
r := strings.NewReader(s) b := bytes.NewReader(B)
// Create unzipper from reader. // Create unzipper from reader.
gz, err := gzip.NewReader(r) gz, err := gzip.NewReader(b)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// Extract gzipped binary. // Extract gzipped binary.
b, err := io.ReadAll(gz) B, err = io.ReadAll(gz)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// Set binary.
s = string(b)
} }
// B returns a copy of
// embedded binary data.
func B() []byte {
if s == "" {
panic("binary already dropped from memory")
}
return []byte(s)
}
// Free will drop embedded
// binary from runtime mem.
func Free() { s = "" }
//go:embed ffmpreg.wasm.gz //go:embed ffmpreg.wasm.gz
var s string var B []byte

View file

@ -9,93 +9,22 @@ import (
type snapshotskey struct{} type snapshotskey struct{}
type snapshotctx struct {
context.Context
snaps *snapshots
}
func (ctx snapshotctx) Value(key any) any {
if _, ok := key.(snapshotskey); ok {
return ctx.snaps
}
return ctx.Context.Value(key)
}
const ringsz uint = 8
type snapshots struct {
r [ringsz]struct {
eptr uint32
snap experimental.Snapshot
}
n uint
}
func (s *snapshots) get(envptr uint32) experimental.Snapshot {
start := (s.n % ringsz)
for i := start; i != ^uint(0); i-- {
if s.r[i].eptr == envptr {
snap := s.r[i].snap
s.r[i].eptr = 0
s.r[i].snap = nil
s.n = i - 1
return snap
}
}
for i := ringsz - 1; i > start; i-- {
if s.r[i].eptr == envptr {
snap := s.r[i].snap
s.r[i].eptr = 0
s.r[i].snap = nil
s.n = i - 1
return snap
}
}
panic("snapshot not found")
}
func (s *snapshots) set(envptr uint32, snapshot experimental.Snapshot) {
start := (s.n % ringsz)
for i := start; i < ringsz; i++ {
switch s.r[i].eptr {
case 0, envptr:
s.r[i].eptr = envptr
s.r[i].snap = snapshot
s.n = i
return
}
}
for i := uint(0); i < start; i++ {
switch s.r[i].eptr {
case 0, envptr:
s.r[i].eptr = envptr
s.r[i].snap = snapshot
s.n = i
return
}
}
panic("snapshots full")
}
// withSetjmpLongjmp updates the context to contain wazero/experimental.Snapshotter{} support, // withSetjmpLongjmp updates the context to contain wazero/experimental.Snapshotter{} support,
// and embeds the necessary snapshots map required for later calls to Setjmp() / Longjmp(). // and embeds the necessary snapshots map required for later calls to Setjmp() / Longjmp().
func withSetjmpLongjmp(ctx context.Context) context.Context { func withSetjmpLongjmp(ctx context.Context) context.Context {
return snapshotctx{Context: experimental.WithSnapshotter(ctx), snaps: new(snapshots)} snapshots := make(map[uint32]experimental.Snapshot, 10)
ctx = experimental.WithSnapshotter(ctx)
ctx = context.WithValue(ctx, snapshotskey{}, snapshots)
return ctx
} }
func getSnapshots(ctx context.Context) *snapshots { func getSnapshots(ctx context.Context) map[uint32]experimental.Snapshot {
v, _ := ctx.Value(snapshotskey{}).(*snapshots) v, _ := ctx.Value(snapshotskey{}).(map[uint32]experimental.Snapshot)
return v return v
} }
// setjmp implements the C function: setjmp(env jmp_buf) // setjmp implements the C function: setjmp(env jmp_buf)
func setjmp(ctx context.Context, _ api.Module, stack []uint64) { func setjmp(ctx context.Context, mod api.Module, stack []uint64) {
// Input arguments. // Input arguments.
envptr := api.DecodeU32(stack[0]) envptr := api.DecodeU32(stack[0])
@ -106,16 +35,19 @@ func setjmp(ctx context.Context, _ api.Module, stack []uint64) {
// Get stored snapshots map. // Get stored snapshots map.
snapshots := getSnapshots(ctx) snapshots := getSnapshots(ctx)
if snapshots == nil {
panic("setjmp / longjmp not supported")
}
// Set latest snapshot in map. // Set latest snapshot in map.
snapshots.set(envptr, snapshot) snapshots[envptr] = snapshot
// Set return. // Set return.
stack[0] = 0 stack[0] = 0
} }
// longjmp implements the C function: int longjmp(env jmp_buf, value int) // longjmp implements the C function: int longjmp(env jmp_buf, value int)
func longjmp(ctx context.Context, _ api.Module, stack []uint64) { func longjmp(ctx context.Context, mod api.Module, stack []uint64) {
// Input arguments. // Input arguments.
envptr := api.DecodeU32(stack[0]) envptr := api.DecodeU32(stack[0])
@ -128,7 +60,10 @@ func longjmp(ctx context.Context, _ api.Module, stack []uint64) {
} }
// Get snapshot stored in map. // Get snapshot stored in map.
snapshot := snapshots.get(envptr) snapshot := snapshots[envptr]
if snapshot == nil {
panic("must first call setjmp")
}
// Set return. // Set return.
stack[0] = 0 stack[0] = 0

View file

@ -53,7 +53,6 @@ func Run(
modcfg = modcfg.WithStdin(args.Stdin) modcfg = modcfg.WithStdin(args.Stdin)
modcfg = modcfg.WithStdout(args.Stdout) modcfg = modcfg.WithStdout(args.Stdout)
modcfg = modcfg.WithStderr(args.Stderr) modcfg = modcfg.WithStderr(args.Stderr)
modcfg = modcfg.WithName("")
if args.Config != nil { if args.Config != nil {
// Pass through config fn. // Pass through config fn.

View file

@ -28,7 +28,6 @@ func NewRuntime(ctx context.Context, cfg wazero.RuntimeConfig) (wazero.Runtime,
// Set core features ffmpeg compiled with. // Set core features ffmpeg compiled with.
cfg = cfg.WithCoreFeatures(CoreFeatures) cfg = cfg.WithCoreFeatures(CoreFeatures)
cfg = cfg.WithDebugInfoEnabled(false)
// Instantiate runtime with prepared config. // Instantiate runtime with prepared config.
rt := wazero.NewRuntimeWithConfig(ctx, cfg) rt := wazero.NewRuntimeWithConfig(ctx, cfg)

2
vendor/modules.txt vendored
View file

@ -247,7 +247,7 @@ codeberg.org/gruf/go-fastcopy
# codeberg.org/gruf/go-fastpath/v2 v2.0.0 # codeberg.org/gruf/go-fastpath/v2 v2.0.0
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-fastpath/v2 codeberg.org/gruf/go-fastpath/v2
# codeberg.org/gruf/go-ffmpreg v0.6.12 # codeberg.org/gruf/go-ffmpreg v0.6.11
## explicit; go 1.22.0 ## explicit; go 1.22.0
codeberg.org/gruf/go-ffmpreg/embed codeberg.org/gruf/go-ffmpreg/embed
codeberg.org/gruf/go-ffmpreg/wasm codeberg.org/gruf/go-ffmpreg/wasm