mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 09:52:26 -05:00
[chore] move client/federator workerpools to Workers{} (#1575)
* replace concurrency worker pools with base models in State.Workers, update code and tests accordingly * improve code comment * change back testrig default log level * un-comment-out TestAnnounceTwice() and fix --------- Signed-off-by: kim <grufwub@gmail.com> Reviewed-by: tobi
This commit is contained in:
parent
24cec4e7aa
commit
baf933cb9f
130 changed files with 1037 additions and 1083 deletions
|
|
@ -35,7 +35,6 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/middleware"
|
||||
"go.uber.org/automaxprocs/maxprocs"
|
||||
|
||||
"github.com/superseriousbusiness/gotosocial/internal/concurrency"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/config"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/db/bundb"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/email"
|
||||
|
|
@ -45,7 +44,6 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/media"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/messages"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/oauth"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/oidc"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/processing"
|
||||
|
|
@ -107,19 +105,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
state.Workers.Start()
|
||||
defer state.Workers.Stop()
|
||||
|
||||
// Create the client API and federator worker pools
|
||||
// NOTE: these MUST NOT be used until they are passed to the
|
||||
// processor and it is started. The reason being that the processor
|
||||
// sets the Worker process functions and start the underlying pools
|
||||
// TODO: move these into state.Workers (and maybe reformat worker pools).
|
||||
clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
|
||||
fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)
|
||||
|
||||
// build backend handlers
|
||||
mediaManager := media.NewManager(&state)
|
||||
oauthServer := oauth.New(ctx, dbService)
|
||||
typeConverter := typeutils.NewConverter(dbService)
|
||||
federatingDB := federatingdb.New(dbService, fedWorker, typeConverter)
|
||||
federatingDB := federatingdb.New(&state, typeConverter)
|
||||
transportController := transport.NewController(dbService, federatingDB, &federation.Clock{}, client)
|
||||
federator := federation.NewFederator(dbService, federatingDB, transportController, typeConverter, mediaManager)
|
||||
|
||||
|
|
@ -140,11 +130,15 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// create the message processor using the other services we've created so far
|
||||
processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, storage, dbService, emailSender, clientWorker, fedWorker)
|
||||
processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, &state, emailSender)
|
||||
if err := processor.Start(); err != nil {
|
||||
return fmt.Errorf("error creating processor: %s", err)
|
||||
}
|
||||
|
||||
// Set state client / federator worker enqueue functions
|
||||
state.Workers.EnqueueClientAPI = processor.EnqueueClientAPI
|
||||
state.Workers.EnqueueFederator = processor.EnqueueFederator
|
||||
|
||||
/*
|
||||
HTTP router initialization
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -33,14 +33,13 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/cmd/gotosocial/action"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/api"
|
||||
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/concurrency"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/config"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gotosocial"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/messages"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/middleware"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/oidc"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/state"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/storage"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/web"
|
||||
"github.com/superseriousbusiness/gotosocial/testrig"
|
||||
|
|
@ -48,37 +47,44 @@ import (
|
|||
|
||||
// Start creates and starts a gotosocial testrig server
|
||||
var Start action.GTSAction = func(ctx context.Context) error {
|
||||
var state state.State
|
||||
|
||||
testrig.InitTestConfig()
|
||||
testrig.InitTestLog()
|
||||
|
||||
dbService := testrig.NewTestDB()
|
||||
testrig.StandardDBSetup(dbService, nil)
|
||||
var storageBackend *storage.Driver
|
||||
if os.Getenv("GTS_STORAGE_BACKEND") == "s3" {
|
||||
storageBackend, _ = storage.NewS3Storage()
|
||||
} else {
|
||||
storageBackend = testrig.NewInMemoryStorage()
|
||||
}
|
||||
testrig.StandardStorageSetup(storageBackend, "./testrig/media")
|
||||
// Initialize caches
|
||||
state.Caches.Init()
|
||||
state.Caches.Start()
|
||||
defer state.Caches.Stop()
|
||||
|
||||
// Create client API and federator worker pools
|
||||
clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
|
||||
fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)
|
||||
state.DB = testrig.NewTestDB(&state)
|
||||
testrig.StandardDBSetup(state.DB, nil)
|
||||
|
||||
if os.Getenv("GTS_STORAGE_BACKEND") == "s3" {
|
||||
state.Storage, _ = storage.NewS3Storage()
|
||||
} else {
|
||||
state.Storage = testrig.NewInMemoryStorage()
|
||||
}
|
||||
testrig.StandardStorageSetup(state.Storage, "./testrig/media")
|
||||
|
||||
// Initialize workers.
|
||||
state.Workers.Start()
|
||||
defer state.Workers.Stop()
|
||||
|
||||
// build backend handlers
|
||||
transportController := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||
transportController := testrig.NewTestTransportController(&state, testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||
r := io.NopCloser(bytes.NewReader([]byte{}))
|
||||
return &http.Response{
|
||||
StatusCode: 200,
|
||||
Body: r,
|
||||
}, nil
|
||||
}, ""), dbService, fedWorker)
|
||||
mediaManager := testrig.NewTestMediaManager(dbService, storageBackend)
|
||||
federator := testrig.NewTestFederator(dbService, transportController, storageBackend, mediaManager, fedWorker)
|
||||
}, ""))
|
||||
mediaManager := testrig.NewTestMediaManager(&state)
|
||||
federator := testrig.NewTestFederator(&state, transportController, mediaManager)
|
||||
|
||||
emailSender := testrig.NewEmailSender("./web/template/", nil)
|
||||
|
||||
processor := testrig.NewTestProcessor(dbService, storageBackend, federator, emailSender, mediaManager, clientWorker, fedWorker)
|
||||
processor := testrig.NewTestProcessor(&state, federator, emailSender, mediaManager)
|
||||
if err := processor.Start(); err != nil {
|
||||
return fmt.Errorf("error starting processor: %s", err)
|
||||
}
|
||||
|
|
@ -87,7 +93,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
HTTP router initialization
|
||||
*/
|
||||
|
||||
router := testrig.NewTestRouter(dbService)
|
||||
router := testrig.NewTestRouter(state.DB)
|
||||
|
||||
// attach global middlewares which are used for every request
|
||||
router.AttachGlobalMiddleware(
|
||||
|
|
@ -112,7 +118,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
routerSession, err := dbService.GetSession(ctx)
|
||||
routerSession, err := state.DB.GetSession(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving router session for session middleware: %w", err)
|
||||
}
|
||||
|
|
@ -123,13 +129,13 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
}
|
||||
|
||||
var (
|
||||
authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths
|
||||
clientModule = api.NewClient(dbService, processor) // api client endpoints
|
||||
fileserverModule = api.NewFileserver(processor) // fileserver endpoints
|
||||
wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
|
||||
nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
|
||||
activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints
|
||||
webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc
|
||||
authModule = api.NewAuth(state.DB, processor, idp, routerSession, sessionName) // auth/oauth paths
|
||||
clientModule = api.NewClient(state.DB, processor) // api client endpoints
|
||||
fileserverModule = api.NewFileserver(processor) // fileserver endpoints
|
||||
wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
|
||||
nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
|
||||
activityPubModule = api.NewActivityPub(state.DB, processor) // ActivityPub endpoints
|
||||
webModule = web.New(state.DB, processor) // web pages + user profiles + settings panels etc
|
||||
)
|
||||
|
||||
// these should be routed in order
|
||||
|
|
@ -142,7 +148,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
activityPubModule.RoutePublicKey(router)
|
||||
webModule.Route(router)
|
||||
|
||||
gts, err := gotosocial.NewServer(dbService, router, federator, mediaManager)
|
||||
gts, err := gotosocial.NewServer(state.DB, router, federator, mediaManager)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating gotosocial service: %s", err)
|
||||
}
|
||||
|
|
@ -157,8 +163,8 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||
sig := <-sigs
|
||||
log.Infof(ctx, "received signal %s, shutting down", sig)
|
||||
|
||||
testrig.StandardDBTeardown(dbService)
|
||||
testrig.StandardStorageTeardown(storageBackend)
|
||||
testrig.StandardDBTeardown(state.DB)
|
||||
testrig.StandardStorageTeardown(state.Storage)
|
||||
|
||||
// close down all running services in order
|
||||
if err := gts.Stop(ctx); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue