wowee some serious moving stuff around

This commit is contained in:
tsmethurst 2021-05-05 19:10:13 +02:00
commit 41e6e8ed10
35 changed files with 611 additions and 459 deletions

View file

@ -23,6 +23,7 @@ import (
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
@ -39,8 +40,12 @@ import (
type Processor interface {
// ToClientAPI returns a channel for putting in messages that need to go to the gts client API.
ToClientAPI() chan ToClientAPI
// FromClientAPI returns a channel for putting messages in that come from the client api going to the processor
FromClientAPI() chan FromClientAPI
// ToFederator returns a channel for putting in messages that need to go to the federator (activitypub).
ToFederator() chan ToFederator
// FromFederator returns a channel for putting messages in that come from the federator going into the processor
FromFederator() chan FromFederator
/*
API-FACING PROCESSING FUNCTIONS
@ -86,53 +91,67 @@ type Processor interface {
// processor just implements the Processor interface
type processor struct {
// federator pub.FederatingActor
toClientAPI chan ToClientAPI
toFederator chan ToFederator
stop chan interface{}
log *logrus.Logger
config *config.Config
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaHandler media.Handler
storage storage.Storage
db db.DB
toClientAPI chan ToClientAPI
fromClientAPI chan FromClientAPI
toFederator chan ToFederator
fromFederator chan FromFederator
federator federation.Federator
stop chan interface{}
log *logrus.Logger
config *config.Config
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaHandler media.Handler
storage storage.Storage
db db.DB
}
// NewProcessor returns a new Processor that uses the given federator and logger
func NewProcessor(config *config.Config, tc typeutils.TypeConverter, oauthServer oauth.Server, mediaHandler media.Handler, storage storage.Storage, db db.DB, log *logrus.Logger) Processor {
func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage storage.Storage, db db.DB, log *logrus.Logger) Processor {
return &processor{
toClientAPI: make(chan ToClientAPI, 100),
toFederator: make(chan ToFederator, 100),
stop: make(chan interface{}),
log: log,
config: config,
tc: tc,
oauthServer: oauthServer,
mediaHandler: mediaHandler,
storage: storage,
db: db,
toClientAPI: make(chan ToClientAPI, 100),
fromClientAPI: make(chan FromClientAPI, 100),
toFederator: make(chan ToFederator, 100),
fromFederator: make(chan FromFederator, 100),
federator: federator,
stop: make(chan interface{}),
log: log,
config: config,
tc: tc,
oauthServer: oauthServer,
mediaHandler: mediaHandler,
storage: storage,
db: db,
}
}
func (d *processor) ToClientAPI() chan ToClientAPI {
return d.toClientAPI
func (p *processor) ToClientAPI() chan ToClientAPI {
return p.toClientAPI
}
func (d *processor) ToFederator() chan ToFederator {
return d.toFederator
func (p *processor) FromClientAPI() chan FromClientAPI {
return p.fromClientAPI
}
func (p *processor) ToFederator() chan ToFederator {
return p.toFederator
}
func (p *processor) FromFederator() chan FromFederator {
return p.fromFederator
}
// Start starts the Processor, reading from its channels and passing messages back and forth.
func (d *processor) Start() error {
func (p *processor) Start() error {
go func() {
DistLoop:
for {
select {
case clientMsg := <-d.toClientAPI:
d.log.Infof("received message TO client API: %+v", clientMsg)
case federatorMsg := <-d.toFederator:
d.log.Infof("received message TO federator: %+v", federatorMsg)
case <-d.stop:
case clientMsg := <-p.toClientAPI:
p.log.Infof("received message TO client API: %+v", clientMsg)
case federatorMsg := <-p.toFederator:
p.log.Infof("received message TO federator: %+v", federatorMsg)
case <-p.stop:
break DistLoop
}
}
@ -142,8 +161,8 @@ func (d *processor) Start() error {
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages.
func (d *processor) Stop() error {
close(d.stop)
func (p *processor) Stop() error {
close(p.stop)
return nil
}
@ -154,9 +173,23 @@ type ToClientAPI struct {
Activity interface{}
}
// FromClientAPI wraps a message that travels from client API into the processor
type FromClientAPI struct {
APObjectType gtsmodel.ActivityStreamsObject
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}
// ToFederator wraps a message that travels from the processor into the federator
type ToFederator struct {
APObjectType gtsmodel.ActivityStreamsObject
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}
// FromFederator wraps a message that travels from the federator into the processor
type FromFederator struct {
APObjectType gtsmodel.ActivityStreamsObject
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}