This commit is contained in:
tobi 2025-04-25 12:36:55 +02:00
commit 6c2a614851
884 changed files with 1876 additions and 3539 deletions

View file

@ -0,0 +1,270 @@
# pub
Implements the Social and Federating Protocols in the ActivityPub specification.
## Reference & Tutorial
The [go-fed website](https://go-fed.org/) contains tutorials and reference
materials, in addition to the rest of this README.
## How To Use
```
go get github.com/go-fed/activity
```
The root of all ActivityPub behavior is the `Actor`, which requires you to
implement a few interfaces:
```golang
import (
"code.superseriousbusiness.org/activity/pub"
)
type myActivityPubApp struct { /* ... */ }
type myAppsDatabase struct { /* ... */ }
type myAppsClock struct { /* ... */ }
var (
// Your app will implement pub.CommonBehavior, and either
// pub.SocialProtocol, pub.FederatingProtocol, or both.
myApp = &myActivityPubApp{}
myCommonBehavior pub.CommonBehavior = myApp
mySocialProtocol pub.SocialProtocol = myApp
myFederatingProtocol pub.FederatingProtocol = myApp
// Your app's database implementation.
myDatabase pub.Database = &myAppsDatabase{}
// Your app's clock.
myClock pub.Clock = &myAppsClock{}
)
// Only support the C2S Social protocol
actor := pub.NewSocialActor(
myCommonBehavior,
mySocialProtocol,
myDatabase,
myClock)
// OR
//
// Only support S2S Federating protocol
actor = pub.NewFederatingActor(
myCommonBehavior,
myFederatingProtocol,
myDatabase,
myClock)
// OR
//
// Support both C2S Social and S2S Federating protocol.
actor = pub.NewActor(
myCommonBehavior,
mySocialProtocol,
myFederatingProtocol,
myDatabase,
myClock)
```
Next, hook the `Actor` into your web server:
```golang
// The application's actor
var actor pub.Actor
var outboxHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
c := context.Background()
// Populate c with request-specific information
if handled, err := actor.PostOutbox(c, w, r); err != nil {
// Write to w
return
} else if handled {
return
} else if handled, err = actor.GetOutbox(c, w, r); err != nil {
// Write to w
return
} else if handled {
return
}
// else:
//
// Handle non-ActivityPub request, such as serving a webpage.
}
var inboxHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
c := context.Background()
// Populate c with request-specific information
if handled, err := actor.PostInbox(c, w, r); err != nil {
// Write to w
return
} else if handled {
return
} else if handled, err = actor.GetInbox(c, w, r); err != nil {
// Write to w
return
} else if handled {
return
}
// else:
//
// Handle non-ActivityPub request, such as serving a webpage.
}
// Add the handlers to a HTTP server
serveMux := http.NewServeMux()
serveMux.HandleFunc("/actor/outbox", outboxHandler)
serveMux.HandleFunc("/actor/inbox", inboxHandler)
var server http.Server
server.Handler = serveMux
```
To serve ActivityStreams data:
```golang
myHander := pub.NewActivityStreamsHandler(myDatabase, myClock)
var activityStreamsHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
c := context.Background()
// Populate c with request-specific information
if handled, err := myHandler(c, w, r); err != nil {
// Write to w
return
} else if handled {
return
}
// else:
//
// Handle non-ActivityPub request, such as serving a webpage.
}
serveMux.HandleFunc("/some/data/like/a/note", activityStreamsHandler)
```
### Dependency Injection
Package `pub` relies on dependency injection to provide out-of-the-box support
for ActivityPub. The interfaces to be satisfied are:
* `CommonBehavior` - Behavior needed regardless of which Protocol is used.
* `SocialProtocol` - Behavior needed for the Social Protocol.
* `FederatingProtocol` - Behavior needed for the Federating Protocol.
* `Database` - The data store abstraction, not tied to the `database/sql`
package.
* `Clock` - The server's internal clock.
* `Transport` - Responsible for the network that serves requests and deliveries
of ActivityStreams data. A `HttpSigTransport` type is provided.
These implementations form the core of an application's behavior without
worrying about the particulars and pitfalls of the ActivityPub protocol.
Implementing these interfaces gives you greater assurance about being
ActivityPub compliant.
### Application Logic
The `SocialProtocol` and `FederatingProtocol` are responsible for returning
callback functions compatible with `streams.TypeResolver`. They also return
`SocialWrappedCallbacks` and `FederatingWrappedCallbacks`, which are nothing
more than a bundle of default behaviors for types like `Create`, `Update`, and
so on.
Applications will want to focus on implementing their specific behaviors in the
callbacks, and have fine-grained control over customization:
```golang
// Implements the FederatingProtocol interface.
//
// This illustration can also be applied for the Social Protocol.
func (m *myAppsFederatingProtocol) Callbacks(c context.Context) (wrapped pub.FederatingWrappedCallbacks, other []interface{}) {
// The context 'c' has request-specific logic and can be used to apply complex
// logic building the right behaviors, if desired.
//
// 'c' will later be passed through to the callbacks created below.
wrapped = pub.FederatingWrappedCallbacks{
Create: func(ctx context.Context, create vocab.ActivityStreamsCreate) error {
// This function is wrapped by default behavior.
//
// More application specific logic can be written here.
//
// 'ctx' will have request-specific information from the HTTP handler. It
// is the same as the 'c' passed to the Callbacks method.
// 'create' has, at this point, already triggered the recommended
// ActivityPub side effect behavior. The application can process it
// further as needed.
return nil
},
}
// The 'other' must contain functions that satisfy the signature pattern
// required by streams.JSONResolver.
//
// If they are not, at runtime errors will be returned to indicate this.
other = []interface{}{
// The FederatingWrappedCallbacks has default behavior for an "Update" type,
// but since we are providing this behavior in "other" and not in the
// FederatingWrappedCallbacks.Update member, we will entirely replace the
// default behavior provided by go-fed. Be careful that this still
// implements ActivityPub properly.
func(ctx context.Context, update vocab.ActivityStreamsUpdate) error {
// This function is NOT wrapped by default behavior.
//
// Application specific logic can be written here.
//
// 'ctx' will have request-specific information from the HTTP handler. It
// is the same as the 'c' passed to the Callbacks method.
// 'update' will NOT trigger the recommended ActivityPub side effect
// behavior. The application should do so in addition to any other custom
// side effects required.
return nil
},
// The "Listen" type has no default suggested behavior in ActivityPub, so
// this just makes this application able to handle "Listen" activities.
func(ctx context.Context, listen vocab.ActivityStreamsListen) error {
// This function is NOT wrapped by default behavior. There's not a
// FederatingWrappedCallbacks.Listen member to wrap.
//
// Application specific logic can be written here.
//
// 'ctx' will have request-specific information from the HTTP handler. It
// is the same as the 'c' passed to the Callbacks method.
// 'listen' can be processed with side effects as the application needs.
return nil
},
}
return
}
```
The `pub` package supports applications that grow into more custom solutions by
overriding the default behaviors as needed.
### ActivityStreams Extensions: Future-Proofing An Application
Package `pub` relies on the `streams.TypeResolver` and `streams.JSONResolver`
code generated types. As new ActivityStreams extensions are developed and their
code is generated, `pub` will automatically pick up support for these
extensions.
The steps to rapidly implement a new extension in a `pub` application are:
1. Generate an OWL definition of the ActivityStreams extension. This definition
could be the same one defining the vocabulary at the `@context` IRI.
2. Run `astool` to autogenerate the golang types in the `streams` package.
3. Implement the application's callbacks in the `FederatingProtocol.Callbacks`
or `SocialProtocol.Callbacks` for the new behaviors needed.
4. Build the application, which builds `pub`, with the newly generated `streams`
code. No code changes in `pub` are required.
Whether an author of an ActivityStreams extension or an application developer,
these quick steps should reduce the barrier to adopion in a statically-typed
environment.
### DelegateActor
For those that need a near-complete custom ActivityPub solution, or want to have
that possibility in the future after adopting go-fed, the `DelegateActor`
interface can be used to obtain an `Actor`:
```golang
// Use custom ActivityPub implementation
actor = pub.NewCustomActor(
myDelegateActor,
isSocialProtocolEnabled,
isFederatedProtocolEnabled,
myAppsClock)
```
It does not guarantee that an implementation adheres to the ActivityPub
specification. It acts as a stepping stone for applications that want to build
up to a fully custom solution and not be locked into the `pub` package
implementation.

View file

@ -0,0 +1,49 @@
package pub
import (
"code.superseriousbusiness.org/activity/streams/vocab"
)
// Activity represents any ActivityStreams Activity type.
//
// The Activity types provided in the streams package implement this.
type Activity interface {
// Activity is also a vocab.Type
vocab.Type
// GetActivityStreamsActor returns the "actor" property if it exists, and
// nil otherwise.
GetActivityStreamsActor() vocab.ActivityStreamsActorProperty
// GetActivityStreamsAudience returns the "audience" property if it
// exists, and nil otherwise.
GetActivityStreamsAudience() vocab.ActivityStreamsAudienceProperty
// GetActivityStreamsBcc returns the "bcc" property if it exists, and nil
// otherwise.
GetActivityStreamsBcc() vocab.ActivityStreamsBccProperty
// GetActivityStreamsBto returns the "bto" property if it exists, and nil
// otherwise.
GetActivityStreamsBto() vocab.ActivityStreamsBtoProperty
// GetActivityStreamsCc returns the "cc" property if it exists, and nil
// otherwise.
GetActivityStreamsCc() vocab.ActivityStreamsCcProperty
// GetActivityStreamsTo returns the "to" property if it exists, and nil
// otherwise.
GetActivityStreamsTo() vocab.ActivityStreamsToProperty
// GetActivityStreamsAttributedTo returns the "attributedTo" property if
// it exists, and nil otherwise.
GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty
// GetActivityStreamsObject returns the "object" property if it exists,
// and nil otherwise.
GetActivityStreamsObject() vocab.ActivityStreamsObjectProperty
// SetActivityStreamsActor sets the "actor" property.
SetActivityStreamsActor(i vocab.ActivityStreamsActorProperty)
// SetActivityStreamsObject sets the "object" property.
SetActivityStreamsObject(i vocab.ActivityStreamsObjectProperty)
// SetActivityStreamsTo sets the "to" property.
SetActivityStreamsTo(i vocab.ActivityStreamsToProperty)
// SetActivityStreamsBto sets the "bto" property.
SetActivityStreamsBto(i vocab.ActivityStreamsBtoProperty)
// SetActivityStreamsBcc sets the "bcc" property.
SetActivityStreamsBcc(i vocab.ActivityStreamsBccProperty)
// SetActivityStreamsAttributedTo sets the "attributedTo" property.
SetActivityStreamsAttributedTo(i vocab.ActivityStreamsAttributedToProperty)
}

View file

@ -0,0 +1,128 @@
package pub
import (
"context"
"net/http"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// Actor represents ActivityPub's actor concept. It conceptually has an inbox
// and outbox that receives either a POST or GET request, which triggers side
// effects in the federating application.
//
// An Actor within an application may federate server-to-server (Federation
// Protocol), client-to-server (Social API), or both. The Actor represents the
// server in either use case.
//
// An actor can be created by calling NewSocialActor (only the Social Protocol
// is supported), NewFederatingActor (only the Federating Protocol is
// supported), NewActor (both are supported), or NewCustomActor (neither are).
//
// Not all Actors have the same behaviors depending on the constructor used to
// create them. Refer to the constructor's documentation to determine the exact
// behavior of the Actor on an application.
//
// The behaviors documented here are common to all Actors returned by any
// constructor.
type Actor interface {
// PostInbox returns true if the request was handled as an ActivityPub
// POST to an actor's inbox. If false, the request was not an
// ActivityPub request and may still be handled by the caller in
// another way, such as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the Actor was constructed with the Federated Protocol enabled,
// side effects will occur.
//
// If the Federated Protocol is not enabled, writes the
// http.StatusMethodNotAllowed status code in the response. No side
// effects occur.
//
// The request and data of your application will be interpreted as
// having an HTTPS protocol scheme.
PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// PostInboxScheme is similar to PostInbox, except clients are able to
// specify which protocol scheme to handle the incoming request and the
// data stored within the application (HTTP, HTTPS, etc).
PostInboxScheme(c context.Context, w http.ResponseWriter, r *http.Request, scheme string) (bool, error)
// GetInbox returns true if the request was handled as an ActivityPub
// GET to an actor's inbox. If false, the request was not an ActivityPub
// request and may still be handled by the caller in another way, such
// as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the request is an ActivityPub request, the Actor will defer to the
// application to determine the correct authorization of the request and
// the resulting OrderedCollection to respond with. The Actor handles
// serializing this OrderedCollection and responding with the correct
// headers and http.StatusOK.
GetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// PostOutbox returns true if the request was handled as an ActivityPub
// POST to an actor's outbox. If false, the request was not an
// ActivityPub request and may still be handled by the caller in another
// way, such as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the Actor was constructed with the Social Protocol enabled, side
// effects will occur.
//
// If the Social Protocol is not enabled, writes the
// http.StatusMethodNotAllowed status code in the response. No side
// effects occur.
//
// If the Social and Federated Protocol are both enabled, it will handle
// the side effects of receiving an ActivityStream Activity, and then
// federate the Activity to peers.
//
// The request will be interpreted as having an HTTPS scheme.
PostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// PostOutboxScheme is similar to PostOutbox, except clients are able to
// specify which protocol scheme to handle the incoming request and the
// data stored within the application (HTTP, HTTPS, etc).
PostOutboxScheme(c context.Context, w http.ResponseWriter, r *http.Request, scheme string) (bool, error)
// GetOutbox returns true if the request was handled as an ActivityPub
// GET to an actor's outbox. If false, the request was not an
// ActivityPub request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the request is an ActivityPub request, the Actor will defer to the
// application to determine the correct authorization of the request and
// the resulting OrderedCollection to respond with. The Actor handles
// serializing this OrderedCollection and responding with the correct
// headers and http.StatusOK.
GetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
}
// FederatingActor is an Actor that allows programmatically delivering an
// Activity to a federating peer.
type FederatingActor interface {
Actor
// Send a federated activity.
//
// The provided url must be the outbox of the sender. All processing of
// the activity occurs similarly to the C2S flow:
// - If t is not an Activity, it is wrapped in a Create activity.
// - A new ID is generated for the activity.
// - The activity is added to the specified outbox.
// - The activity is prepared and delivered to recipients.
//
// Note that this function will only behave as expected if the
// implementation has been constructed to support federation. This
// method will guaranteed work for non-custom Actors. For custom actors,
// care should be used to not call this method if only C2S is supported.
Send(c context.Context, outbox *url.URL, t vocab.Type) (Activity, error)
}

View file

@ -0,0 +1,475 @@
package pub
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"code.superseriousbusiness.org/activity/streams"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// baseActor must satisfy the Actor interface.
var _ Actor = &baseActor{}
// baseActor is an application-independent ActivityPub implementation. It does
// not implement the entire protocol, and relies on a delegate to do so. It
// only implements the part of the protocol that is side-effect-free, allowing
// an existing application to write a DelegateActor that glues their application
// into the ActivityPub world.
//
// It is preferred to use a DelegateActor provided by this library, so that the
// application does not need to worry about the ActivityPub implementation.
type baseActor struct {
// delegate contains application-specific delegation logic.
delegate DelegateActor
// enableSocialProtocol enables or disables the Social API, the client to
// server part of ActivityPub. Useful if permitting remote clients to
// act on behalf of the users of the client application.
enableSocialProtocol bool
// enableFederatedProtocol enables or disables the Federated Protocol, or the
// server to server part of ActivityPub. Useful to permit integrating
// with the rest of the federative web.
enableFederatedProtocol bool
// clock simply tracks the current time.
clock Clock
}
// baseActorFederating must satisfy the FederatingActor interface.
var _ FederatingActor = &baseActorFederating{}
// baseActorFederating is a baseActor that also satisfies the FederatingActor
// interface.
//
// The baseActor is preserved as an Actor which will not successfully cast to a
// FederatingActor.
type baseActorFederating struct {
baseActor
}
// NewSocialActor builds a new Actor concept that handles only the Social
// Protocol part of ActivityPub.
//
// This Actor can be created once in an application and reused to handle
// multiple requests concurrently and for different endpoints.
//
// It leverages as much of go-fed as possible to ensure the implementation is
// compliant with the ActivityPub specification, while providing enough freedom
// to be productive without shooting one's self in the foot.
//
// Do not try to use NewSocialActor and NewFederatingActor together to cover
// both the Social and Federating parts of the protocol. Instead, use NewActor.
func NewSocialActor(c CommonBehavior,
c2s SocialProtocol,
db Database,
clock Clock) Actor {
return &baseActor{
// Use SideEffectActor without s2s.
delegate: NewSideEffectActor(c, nil, c2s, db, clock),
enableSocialProtocol: true,
clock: clock,
}
}
// NewFederatingActor builds a new Actor concept that handles only the Federating
// Protocol part of ActivityPub.
//
// This Actor can be created once in an application and reused to handle
// multiple requests concurrently and for different endpoints.
//
// It leverages as much of go-fed as possible to ensure the implementation is
// compliant with the ActivityPub specification, while providing enough freedom
// to be productive without shooting one's self in the foot.
//
// Do not try to use NewSocialActor and NewFederatingActor together to cover
// both the Social and Federating parts of the protocol. Instead, use NewActor.
func NewFederatingActor(c CommonBehavior,
s2s FederatingProtocol,
db Database,
clock Clock) FederatingActor {
return &baseActorFederating{
baseActor{
// Use SideEffectActor without c2s.
delegate: NewSideEffectActor(c, s2s, nil, db, clock),
enableFederatedProtocol: true,
clock: clock,
},
}
}
// NewActor builds a new Actor concept that handles both the Social and
// Federating Protocol parts of ActivityPub.
//
// This Actor can be created once in an application and reused to handle
// multiple requests concurrently and for different endpoints.
//
// It leverages as much of go-fed as possible to ensure the implementation is
// compliant with the ActivityPub specification, while providing enough freedom
// to be productive without shooting one's self in the foot.
func NewActor(c CommonBehavior,
c2s SocialProtocol,
s2s FederatingProtocol,
db Database,
clock Clock) FederatingActor {
return &baseActorFederating{
baseActor{
delegate: NewSideEffectActor(c, s2s, c2s, db, clock),
enableSocialProtocol: true,
enableFederatedProtocol: true,
clock: clock,
},
}
}
// NewCustomActor allows clients to create a custom ActivityPub implementation
// for the Social Protocol, Federating Protocol, or both.
//
// It still uses the library as a high-level scaffold, which has the benefit of
// allowing applications to grow into a custom ActivityPub solution without
// having to refactor the code that passes HTTP requests into the Actor.
//
// It is possible to create a DelegateActor that is not ActivityPub compliant.
// Use with due care.
//
// If you find yourself passing a SideEffectActor in as the DelegateActor,
// consider using NewActor, NewFederatingActor, or NewSocialActor instead.
func NewCustomActor(delegate DelegateActor,
enableSocialProtocol, enableFederatedProtocol bool,
clock Clock) FederatingActor {
return &baseActorFederating{
baseActor{
delegate: delegate,
enableSocialProtocol: enableSocialProtocol,
enableFederatedProtocol: enableFederatedProtocol,
clock: clock,
},
}
}
// PostInbox implements the generic algorithm for handling a POST request to an
// actor's inbox independent on an application. It relies on a delegate to
// implement application specific functionality.
//
// Only supports serving data with identifiers having the HTTPS scheme.
func (b *baseActor) PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
return b.PostInboxScheme(c, w, r, "https")
}
// PostInbox implements the generic algorithm for handling a POST request to an
// actor's inbox independent on an application. It relies on a delegate to
// implement application specific functionality.
//
// Specifying the "scheme" allows for retrieving ActivityStreams content with
// identifiers such as HTTP, HTTPS, or other protocol schemes.
func (b *baseActor) PostInboxScheme(c context.Context, w http.ResponseWriter, r *http.Request, scheme string) (bool, error) {
// Do nothing if it is not an ActivityPub POST request.
if !isActivityPubPost(r) {
return false, nil
}
// If the Federated Protocol is not enabled, then this endpoint is not
// enabled.
if !b.enableFederatedProtocol {
w.WriteHeader(http.StatusMethodNotAllowed)
return true, nil
}
// Check the peer request is authentic.
c, authenticated, err := b.delegate.AuthenticatePostInbox(c, w, r)
if err != nil {
return true, err
} else if !authenticated {
return true, nil
}
// Begin processing the request, but have not yet applied
// authorization (ex: blocks). Obtain the activity reject unknown
// activities.
m, err := readActivityPubReq(r)
if err != nil {
return true, err
}
asValue, err := streams.ToType(c, m)
if err != nil && !streams.IsUnmatchedErr(err) {
return true, err
} else if streams.IsUnmatchedErr(err) {
// Respond with bad request -- we do not understand the type.
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
activity, ok := asValue.(Activity)
if !ok {
return true, fmt.Errorf("activity streams value is not an Activity: %T", asValue)
}
if activity.GetJSONLDId() == nil {
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
// Allow server implementations to set context data with a hook.
c, err = b.delegate.PostInboxRequestBodyHook(c, r, activity)
if err != nil {
return true, err
}
// Check authorization of the activity.
authorized, err := b.delegate.AuthorizePostInbox(c, w, activity)
if err != nil {
return true, err
} else if !authorized {
return true, nil
}
// Post the activity to the actor's inbox and trigger side effects for
// that particular Activity type. It is up to the delegate to resolve
// the given map.
inboxId := requestId(r, scheme)
err = b.delegate.PostInbox(c, inboxId, activity)
if err != nil {
// Special case: We know it is a bad request if the object or
// target properties needed to be populated, but weren't.
//
// Send the rejection to the peer.
if err == ErrObjectRequired || err == ErrTargetRequired {
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
return true, err
}
// Our side effects are complete, now delegate determining whether to
// do inbox forwarding, as well as the action to do it.
if err := b.delegate.InboxForwarding(c, inboxId, activity); err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Simply respond with an OK status to the peer.
w.WriteHeader(http.StatusOK)
return true, nil
}
// GetInbox implements the generic algorithm for handling a GET request to an
// actor's inbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) GetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub GET request.
if !isActivityPubGet(r) {
return false, nil
}
// Delegate authenticating and authorizing the request.
c, authenticated, err := b.delegate.AuthenticateGetInbox(c, w, r)
if err != nil {
return true, err
} else if !authenticated {
return true, nil
}
// Everything is good to begin processing the request.
oc, err := b.delegate.GetInbox(c, r)
if err != nil {
return true, err
}
// Deduplicate the 'orderedItems' property by ID.
err = dedupeOrderedItems(oc)
if err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Serialize the OrderedCollection.
m, err := streams.Serialize(oc)
if err != nil {
return true, err
}
raw, err := json.Marshal(m)
if err != nil {
return true, err
}
// Write the response.
addResponseHeaders(w.Header(), b.clock, raw)
w.WriteHeader(http.StatusOK)
n, err := w.Write(raw)
if err != nil {
return true, err
} else if n != len(raw) {
return true, fmt.Errorf("ResponseWriter.Write wrote %d of %d bytes", n, len(raw))
}
return true, nil
}
// PostOutbox implements the generic algorithm for handling a POST request to an
// actor's outbox independent on an application. It relies on a delegate to
// implement application specific functionality.
//
// Only supports serving data with identifiers having the HTTPS scheme.
func (b *baseActor) PostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
return b.PostOutboxScheme(c, w, r, "https")
}
// PostOutbox implements the generic algorithm for handling a POST request to an
// actor's outbox independent on an application. It relies on a delegate to
// implement application specific functionality.
//
// Specifying the "scheme" allows for retrieving ActivityStreams content with
// identifiers such as HTTP, HTTPS, or other protocol schemes.
func (b *baseActor) PostOutboxScheme(c context.Context, w http.ResponseWriter, r *http.Request, scheme string) (bool, error) {
// Do nothing if it is not an ActivityPub POST request.
if !isActivityPubPost(r) {
return false, nil
}
// If the Social API is not enabled, then this endpoint is not enabled.
if !b.enableSocialProtocol {
w.WriteHeader(http.StatusMethodNotAllowed)
return true, nil
}
// Delegate authenticating and authorizing the request.
c, authenticated, err := b.delegate.AuthenticatePostOutbox(c, w, r)
if err != nil {
return true, err
} else if !authenticated {
return true, nil
}
// Everything is good to begin processing the request.
m, err := readActivityPubReq(r)
if err != nil {
return true, err
}
// Note that converting to a Type will NOT successfully convert types
// not known to go-fed. This prevents accidentally wrapping an Activity
// type unknown to go-fed in a Create below. Instead,
// streams.ErrUnhandledType will be returned here.
asValue, err := streams.ToType(c, m)
if err != nil && !streams.IsUnmatchedErr(err) {
return true, err
} else if streams.IsUnmatchedErr(err) {
// Respond with bad request -- we do not understand the type.
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
// Allow server implementations to set context data with a hook.
c, err = b.delegate.PostOutboxRequestBodyHook(c, r, asValue)
if err != nil {
return true, err
}
// The HTTP request steps are complete, complete the rest of the outbox
// and delivery process.
outboxId := requestId(r, scheme)
activity, err := b.deliver(c, outboxId, asValue, m)
// Special case: We know it is a bad request if the object or
// target properties needed to be populated, but weren't.
//
// Send the rejection to the client.
if err == ErrObjectRequired || err == ErrTargetRequired {
w.WriteHeader(http.StatusBadRequest)
return true, nil
} else if err != nil {
return true, err
}
// Respond to the request with the new Activity's IRI location.
w.Header().Set(locationHeader, activity.GetJSONLDId().Get().String())
w.WriteHeader(http.StatusCreated)
return true, nil
}
// GetOutbox implements the generic algorithm for handling a Get request to an
// actor's outbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) GetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub GET request.
if !isActivityPubGet(r) {
return false, nil
}
// Delegate authenticating and authorizing the request.
c, authenticated, err := b.delegate.AuthenticateGetOutbox(c, w, r)
if err != nil {
return true, err
} else if !authenticated {
return true, nil
}
// Everything is good to begin processing the request.
oc, err := b.delegate.GetOutbox(c, r)
if err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Serialize the OrderedCollection.
m, err := streams.Serialize(oc)
if err != nil {
return true, err
}
raw, err := json.Marshal(m)
if err != nil {
return true, err
}
// Write the response.
addResponseHeaders(w.Header(), b.clock, raw)
w.WriteHeader(http.StatusOK)
n, err := w.Write(raw)
if err != nil {
return true, err
} else if n != len(raw) {
return true, fmt.Errorf("ResponseWriter.Write wrote %d of %d bytes", n, len(raw))
}
return true, nil
}
// deliver delegates all outbox handling steps and optionally will federate the
// activity if the federated protocol is enabled.
//
// This function is not exported so an Actor that only supports C2S cannot be
// type casted to a FederatingActor. It doesn't exactly fit the Send method
// signature anyways.
//
// Note: 'm' is nilable.
func (b *baseActor) deliver(c context.Context, outbox *url.URL, asValue vocab.Type, m map[string]interface{}) (activity Activity, err error) {
// If the value is not an Activity or type extending from Activity, then
// we need to wrap it in a Create Activity.
if !streams.IsOrExtendsActivityStreamsActivity(asValue) {
asValue, err = b.delegate.WrapInCreate(c, asValue, outbox)
if err != nil {
return
}
}
// At this point, this should be a safe conversion. If this error is
// triggered, then there is either a bug in the delegation of
// WrapInCreate, behavior is not lining up in the generated ExtendedBy
// code, or something else is incorrect with the type system.
var ok bool
activity, ok = asValue.(Activity)
if !ok {
err = fmt.Errorf("activity streams value is not an Activity: %T", asValue)
return
}
// Delegate generating new IDs for the activity and all new objects.
if err = b.delegate.AddNewIDs(c, activity); err != nil {
return
}
// Post the activity to the actor's outbox and trigger side effects for
// that particular Activity type.
//
// Since 'm' is nil-able and side effects may need access to literal nil
// values, such as for Update activities, ensure 'm' is non-nil.
if m == nil {
m, err = asValue.Serialize()
if err != nil {
return
}
}
deliverable, err := b.delegate.PostOutbox(c, activity, outbox, m)
if err != nil {
return
}
// Request has been processed and all side effects internal to this
// application server have finished. Begin side effects affecting other
// servers and/or the client who sent this request.
//
// If we are federating and the type is a deliverable one, then deliver
// the activity to federating peers.
if b.enableFederatedProtocol && deliverable {
if err = b.delegate.Deliver(c, outbox, activity); err != nil {
return
}
}
return
}
// Send is programmatically accessible if the federated protocol is enabled.
func (b *baseActorFederating) Send(c context.Context, outbox *url.URL, t vocab.Type) (Activity, error) {
return b.deliver(c, outbox, t, nil)
}

View file

@ -0,0 +1,11 @@
package pub
import (
"time"
)
// Clock determines the time.
type Clock interface {
// Now returns the current time.
Now() time.Time
}

View file

@ -0,0 +1,90 @@
package pub
import (
"context"
"net/http"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// Common contains functions required for both the Social API and Federating
// Protocol.
//
// It is passed to the library as a dependency injection from the client
// application.
type CommonBehavior interface {
// AuthenticateGetInbox delegates the authentication of a GET to an
// inbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// AuthenticateGetOutbox delegates the authentication of a GET to an
// outbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
// NewTransport returns a new Transport on behalf of a specific actor.
//
// The actorBoxIRI will be either the inbox or outbox of an actor who is
// attempting to do the dereferencing or delivery. Any authentication
// scheme applied on the request must be based on this actor. The
// request must contain some sort of credential of the user, such as a
// HTTP Signature.
//
// The gofedAgent passed in should be used by the Transport
// implementation in the User-Agent, as well as the application-specific
// user agent string. The gofedAgent will indicate this library's use as
// well as the library's version number.
//
// Any server-wide rate-limiting that needs to occur should happen in a
// Transport implementation. This factory function allows this to be
// created, so peer servers are not DOS'd.
//
// Any retry logic should also be handled by the Transport
// implementation.
//
// Note that the library will not maintain a long-lived pointer to the
// returned Transport so that any private credentials are able to be
// garbage collected.
NewTransport(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
}

View file

@ -0,0 +1,152 @@
package pub
import (
"context"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
type Database interface {
// Lock takes a lock for the object at the specified id. If an error
// is returned, the lock must not have been taken.
//
// The lock must be able to succeed for an id that does not exist in
// the database. This means acquiring the lock does not guarantee the
// entry exists in the database.
//
// Locks are encouraged to be lightweight and in the Go layer, as some
// processes require tight loops acquiring and releasing locks.
//
// Used to ensure race conditions in multiple requests do not occur.
Lock(c context.Context, id *url.URL) (unlock func(), err error)
// InboxContains returns true if the OrderedCollection at 'inbox'
// contains the specified 'id'.
//
// The library makes this call only after acquiring a lock first.
InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error)
// GetInbox returns the first ordered collection page of the outbox at
// the specified IRI, for prepending new items.
//
// The library makes this call only after acquiring a lock first.
GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error)
// SetInbox saves the inbox value given from GetInbox, with new items
// prepended. Note that the new items must not be added as independent
// database entries. Separate calls to Create will do that.
//
// The library makes this call only after acquiring a lock first.
SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error
// Owns returns true if the database has an entry for the IRI and it
// exists in the database.
//
// The library makes this call only after acquiring a lock first.
Owns(c context.Context, id *url.URL) (owns bool, err error)
// ActorForOutbox fetches the actor's IRI for the given outbox IRI.
//
// The library makes this call only after acquiring a lock first.
ActorForOutbox(c context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error)
// ActorForInbox fetches the actor's IRI for the given outbox IRI.
//
// The library makes this call only after acquiring a lock first.
ActorForInbox(c context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error)
// OutboxForInbox fetches the corresponding actor's outbox IRI for the
// actor's inbox IRI.
//
// The library makes this call only after acquiring a lock first.
OutboxForInbox(c context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error)
// InboxesForIRI fetches inboxes corresponding to the given iri.
// This allows your server to skip remote dereferencing of iris
// in order to speed up message delivery, if desired.
//
// It is acceptable to just return nil or an empty slice for the inboxIRIs,
// if you don't know the inbox iri, or you don't wish to use this feature.
// In this case, the library will attempt to resolve inboxes of the iri
// by remote dereferencing instead.
//
// If the input iri is the iri of an Actor, then the inbox for the actor
// should be returned as a single-entry slice.
//
// If the input iri is a Collection (such as a Collection of followers),
// then each follower inbox IRI should be returned in the inboxIRIs slice.
//
// The library makes this call only after acquiring a lock first.
InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error)
// Exists returns true if the database has an entry for the specified
// id. It may not be owned by this application instance.
//
// The library makes this call only after acquiring a lock first.
Exists(c context.Context, id *url.URL) (exists bool, err error)
// Get returns the database entry for the specified id.
//
// The library makes this call only after acquiring a lock first.
Get(c context.Context, id *url.URL) (value vocab.Type, err error)
// Create adds a new entry to the database which must be able to be
// keyed by its id.
//
// Note that Activity values received from federated peers may also be
// created in the database this way if the Federating Protocol is
// enabled. The client may freely decide to store only the id instead of
// the entire value.
//
// The library makes this call only after acquiring a lock first.
//
// Under certain conditions and network activities, Create may be called
// multiple times for the same ActivityStreams object.
Create(c context.Context, asType vocab.Type) error
// Update sets an existing entry to the database based on the value's
// id.
//
// Note that Activity values received from federated peers may also be
// updated in the database this way if the Federating Protocol is
// enabled. The client may freely decide to store only the id instead of
// the entire value.
//
// The library makes this call only after acquiring a lock first.
Update(c context.Context, asType vocab.Type) error
// Delete removes the entry with the given id.
//
// Delete is only called for federated objects. Deletes from the Social
// Protocol instead call Update to create a Tombstone.
//
// The library makes this call only after acquiring a lock first.
Delete(c context.Context, id *url.URL) error
// GetOutbox returns the first ordered collection page of the outbox
// at the specified IRI, for prepending new items.
//
// The library makes this call only after acquiring a lock first.
GetOutbox(c context.Context, outboxIRI *url.URL) (outbox vocab.ActivityStreamsOrderedCollectionPage, err error)
// SetOutbox saves the outbox value given from GetOutbox, with new items
// prepended. Note that the new items must not be added as independent
// database entries. Separate calls to Create will do that.
//
// The library makes this call only after acquiring a lock first.
SetOutbox(c context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error
// NewID creates a new IRI id for the provided activity or object. The
// implementation does not need to set the 'id' property and simply
// needs to determine the value.
//
// The go-fed library will handle setting the 'id' property on the
// activity or object provided with the value returned.
NewID(c context.Context, t vocab.Type) (id *url.URL, err error)
// Followers obtains the Followers Collection for an actor with the
// given id.
//
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
Followers(c context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error)
// Following obtains the Following Collection for an actor with the
// given id.
//
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
Following(c context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error)
// Liked obtains the Liked Collection for an actor with the
// given id.
//
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error)
}

View file

@ -0,0 +1,249 @@
package pub
import (
"context"
"net/http"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// DelegateActor contains the detailed interface an application must satisfy in
// order to implement the ActivityPub specification.
//
// Note that an implementation of this interface is implicitly provided in the
// calls to NewActor, NewSocialActor, and NewFederatingActor.
//
// Implementing the DelegateActor requires familiarity with the ActivityPub
// specification because it does not a strong enough abstraction for the client
// application to ignore the ActivityPub spec. It is very possible to implement
// this interface and build a foot-gun that trashes the fediverse without being
// ActivityPub compliant. Please use with due consideration.
//
// Alternatively, build an application that uses the parts of the pub library
// that do not require implementing a DelegateActor so that the ActivityPub
// implementation is completely provided out of the box.
type DelegateActor interface {
// Hook callback after parsing the request body for a federated request
// to the Actor's inbox.
//
// Can be used to set contextual information based on the Activity
// received.
//
// Only called if the Federated Protocol is enabled.
//
// Warning: Neither authentication nor authorization has taken place at
// this time. Doing anything beyond setting contextual information is
// strongly discouraged.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the DelegateActor implementation must not
// write a response to the ResponseWriter as is expected that the caller
// to PostInbox will do so when handling the error.
PostInboxRequestBodyHook(c context.Context, r *http.Request, activity Activity) (context.Context, error)
// Hook callback after parsing the request body for a client request
// to the Actor's outbox.
//
// Can be used to set contextual information based on the
// ActivityStreams object received.
//
// Only called if the Social API is enabled.
//
// Warning: Neither authentication nor authorization has taken place at
// this time. Doing anything beyond setting contextual information is
// strongly discouraged.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the DelegateActor implementation must not
// write a response to the ResponseWriter as is expected that the caller
// to PostOutbox will do so when handling the error.
PostOutboxRequestBodyHook(c context.Context, r *http.Request, data vocab.Type) (context.Context, error)
// AuthenticatePostInbox delegates the authentication of a POST to an
// inbox.
//
// Only called if the Federated Protocol is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// AuthenticateGetInbox delegates the authentication of a GET to an
// inbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// AuthorizePostInbox delegates the authorization of an activity that
// has been sent by POST to an inbox.
//
// Only called if the Federated Protocol is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authorized' is ignored.
//
// If no error is returned, but authorization fails, then authorized
// must be false and error nil. It is expected that the implementation
// handles writing to the ResponseWriter in this case.
//
// Finally, if the authentication and authorization succeeds, then
// authorized must be true and error nil. The request will continue
// to be processed.
AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (authorized bool, err error)
// PostInbox delegates the side effects of adding to the inbox and
// determining if it is a request that should be blocked.
//
// Only called if the Federated Protocol is enabled.
//
// As a side effect, PostInbox sets the federated data in the inbox, but
// not on its own in the database, as InboxForwarding (which is called
// later) must decide whether it has seen this activity before in order
// to determine whether to do the forwarding algorithm.
//
// If the error is ErrObjectRequired or ErrTargetRequired, then a Bad
// Request status is sent in the response.
PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error
// InboxForwarding delegates inbox forwarding logic when a POST request
// is received in the Actor's inbox.
//
// Only called if the Federated Protocol is enabled.
//
// The delegate is responsible for determining whether to do the inbox
// forwarding, as well as actually conducting it if it determines it
// needs to.
//
// As a side effect, InboxForwarding must set the federated data in the
// database, independently of the inbox, however it sees fit in order to
// determine whether it has seen the activity before.
//
// The provided url is the inbox of the recipient of the Activity. The
// Activity is examined for the information about who to inbox forward
// to.
//
// If an error is returned, it is returned to the caller of PostInbox.
InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error
// PostOutbox delegates the logic for side effects and adding to the
// outbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled. In the case of the Social API being enabled, side
// effects of the Activity must occur.
//
// The delegate is responsible for adding the activity to the database's
// general storage for independent retrieval, and not just within the
// actor's outbox.
//
// If the error is ErrObjectRequired or ErrTargetRequired, then a Bad
// Request status is sent in the response.
//
// Note that 'rawJSON' is an unfortunate consequence where an 'Update'
// Activity is the only one that explicitly cares about 'null' values in
// JSON. Since go-fed does not differentiate between 'null' values and
// values that are simply not present, the 'rawJSON' map is ONLY needed
// for this narrow and specific use case.
PostOutbox(c context.Context, a Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, e error)
// AddNewIDs sets new URL ids on the activity. It also does so for all
// 'object' properties if the Activity is a Create type.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is returned to the caller of PostOutbox.
AddNewIDs(c context.Context, a Activity) error
// Deliver sends a federated message. Called only if federation is
// enabled.
//
// Called if the Federated Protocol is enabled.
//
// The provided url is the outbox of the sender. The Activity contains
// the information about the intended recipients.
//
// If an error is returned, it is returned to the caller of PostOutbox.
Deliver(c context.Context, outbox *url.URL, activity Activity) error
// AuthenticatePostOutbox delegates the authentication and authorization
// of a POST to an outbox.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// AuthenticateGetOutbox delegates the authentication of a GET to an
// outbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// WrapInCreate wraps the provided object in a Create ActivityStreams
// activity. The provided URL is the actor's outbox endpoint.
//
// Only called if the Social API is enabled.
WrapInCreate(c context.Context, value vocab.Type, outboxIRI *url.URL) (vocab.ActivityStreamsCreate, error)
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
// GetInbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetInbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

View file

@ -0,0 +1,9 @@
// Package pub implements the ActivityPub protocol.
//
// Note that every time the ActivityStreams types are changed (added, removed)
// due to code generation, the internal function toASType needs to be modified
// to know about these types.
//
// Note that every version change should also include a change in the version.go
// file.
package pub

View file

@ -0,0 +1,125 @@
package pub
import (
"context"
"net/http"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// FederatingProtocol contains behaviors an application needs to satisfy for the
// full ActivityPub S2S implementation to be supported by this library.
//
// It is only required if the client application wants to support the server-to-
// server, or federating, protocol.
//
// It is passed to the library as a dependency injection from the client
// application.
type FederatingProtocol interface {
// Hook callback after parsing the request body for a federated request
// to the Actor's inbox.
//
// Can be used to set contextual information based on the Activity
// received.
//
// Only called if the Federated Protocol is enabled.
//
// Warning: Neither authentication nor authorization has taken place at
// this time. Doing anything beyond setting contextual information is
// strongly discouraged.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the DelegateActor implementation must not
// write a response to the ResponseWriter as is expected that the caller
// to PostInbox will do so when handling the error.
PostInboxRequestBodyHook(c context.Context, r *http.Request, activity Activity) (context.Context, error)
// AuthenticatePostInbox delegates the authentication of a POST to an
// inbox.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// Blocked should determine whether to permit a set of actors given by
// their ids are able to interact with this particular end user due to
// being blocked or other application-specific logic.
//
// If an error is returned, it is passed back to the caller of
// PostInbox.
//
// If no error is returned, but authentication or authorization fails,
// then blocked must be true and error nil. An http.StatusForbidden
// will be written in the wresponse.
//
// Finally, if the authentication and authorization succeeds, then
// blocked must be false and error nil. The request will continue
// to be processed.
Blocked(c context.Context, actorIRIs []*url.URL) (blocked bool, err error)
// FederatingCallbacks returns the application logic that handles
// ActivityStreams received from federating peers.
//
// Note that certain types of callbacks will be 'wrapped' with default
// behaviors supported natively by the library. Other callbacks
// compatible with streams.TypeResolver can be specified by 'other'.
//
// For example, setting the 'Create' field in the
// FederatingWrappedCallbacks lets an application dependency inject
// additional behaviors they want to take place, including the default
// behavior supplied by this library. This is guaranteed to be compliant
// with the ActivityPub Social protocol.
//
// To override the default behavior, instead supply the function in
// 'other', which does not guarantee the application will be compliant
// with the ActivityPub Social Protocol.
//
// Applications are not expected to handle every single ActivityStreams
// type and extension. The unhandled ones are passed to DefaultCallback.
FederatingCallbacks(c context.Context) (wrapped FederatingWrappedCallbacks, other []interface{}, err error)
// DefaultCallback is called for types that go-fed can deserialize but
// are not handled by the application's callbacks returned in the
// Callbacks method.
//
// Applications are not expected to handle every single ActivityStreams
// type and extension, so the unhandled ones are passed to
// DefaultCallback.
DefaultCallback(c context.Context, activity Activity) error
// MaxInboxForwardingRecursionDepth determines how deep to search within
// an activity to determine if inbox forwarding needs to occur.
//
// Zero or negative numbers indicate infinite recursion.
MaxInboxForwardingRecursionDepth(c context.Context) int
// MaxDeliveryRecursionDepth determines how deep to search within
// collections owned by peers when they are targeted to receive a
// delivery.
//
// Zero or negative numbers indicate infinite recursion.
MaxDeliveryRecursionDepth(c context.Context) int
// FilterForwarding allows the implementation to apply business logic
// such as blocks, spam filtering, and so on to a list of potential
// Collections and OrderedCollections of recipients when inbox
// forwarding has been triggered.
//
// The activity is provided as a reference for more intelligent
// logic to be used, but the implementation must not modify it.
FilterForwarding(c context.Context, potentialRecipients []*url.URL, a Activity) (filteredRecipients []*url.URL, err error)
// GetInbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetInbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

View file

@ -0,0 +1,917 @@
package pub
import (
"context"
"fmt"
"net/url"
"code.superseriousbusiness.org/activity/streams"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// OnFollowBehavior enumerates the different default actions that the go-fed
// library can provide when receiving a Follow Activity from a peer.
type OnFollowBehavior int
const (
// OnFollowDoNothing does not take any action when a Follow Activity
// is received.
OnFollowDoNothing OnFollowBehavior = iota
// OnFollowAutomaticallyAccept triggers the side effect of sending an
// Accept of this Follow request in response.
OnFollowAutomaticallyAccept
// OnFollowAutomaticallyAccept triggers the side effect of sending a
// Reject of this Follow request in response.
OnFollowAutomaticallyReject
)
// FederatingWrappedCallbacks lists the callback functions that already have
// some side effect behavior provided by the pub library.
//
// These functions are wrapped for the Federating Protocol.
type FederatingWrappedCallbacks struct {
// Create handles additional side effects for the Create ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping callback for the Federating Protocol ensures the
// 'object' property is created in the database.
//
// Create calls Create for each object in the federated Activity.
Create func(context.Context, vocab.ActivityStreamsCreate) error
// Update handles additional side effects for the Update ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping callback for the Federating Protocol ensures the
// 'object' property is updated in the database.
//
// Update calls Update on the federated entry from the database, with a
// new value.
Update func(context.Context, vocab.ActivityStreamsUpdate) error
// Delete handles additional side effects for the Delete ActivityStreams
// type, specific to the application using go-fed.
//
// Delete removes the federated entry from the database.
Delete func(context.Context, vocab.ActivityStreamsDelete) error
// Follow handles additional side effects for the Follow ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function can have one of several default behaviors,
// depending on the value of the OnFollow setting.
Follow func(context.Context, vocab.ActivityStreamsFollow) error
// OnFollow determines what action to take for this particular callback
// if a Follow Activity is handled.
OnFollow OnFollowBehavior
// Accept handles additional side effects for the Accept ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function determines if this 'Accept' is in response to a
// 'Follow'. If so, then the 'actor' is added to the original 'actor's
// 'following' collection.
//
// Otherwise, no side effects are done by go-fed.
Accept func(context.Context, vocab.ActivityStreamsAccept) error
// Reject handles additional side effects for the Reject ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function has no default side effects. However, if this
// 'Reject' is in response to a 'Follow' then the client MUST NOT go
// forward with adding the 'actor' to the original 'actor's 'following'
// collection by the client application.
Reject func(context.Context, vocab.ActivityStreamsReject) error
// Add handles additional side effects for the Add ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function will add the 'object' IRIs to a specific
// 'target' collection if the 'target' collection(s) live on this
// server.
Add func(context.Context, vocab.ActivityStreamsAdd) error
// Remove handles additional side effects for the Remove ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function will remove all 'object' IRIs from a specific
// 'target' collection if the 'target' collection(s) live on this
// server.
Remove func(context.Context, vocab.ActivityStreamsRemove) error
// Like handles additional side effects for the Like ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function will add the activity to the "likes" collection
// on all 'object' targets owned by this server.
Like func(context.Context, vocab.ActivityStreamsLike) error
// Announce handles additional side effects for the Announce
// ActivityStreams type, specific to the application using go-fed.
//
// The wrapping function will add the activity to the "shares"
// collection on all 'object' targets owned by this server.
Announce func(context.Context, vocab.ActivityStreamsAnnounce) error
// Undo handles additional side effects for the Undo ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function ensures the 'actor' on the 'Undo'
// is be the same as the 'actor' on all Activities being undone.
// It enforces that the actors on the Undo must correspond to all of the
// 'object' actors in some manner.
//
// It is expected that the application will implement the proper
// reversal of activities that are being undone.
Undo func(context.Context, vocab.ActivityStreamsUndo) error
// Block handles additional side effects for the Block ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function provides no default side effects. It simply
// calls the wrapped function. However, note that Blocks should not be
// received from a federated peer, as delivering Blocks explicitly
// deviates from the original ActivityPub specification.
Block func(context.Context, vocab.ActivityStreamsBlock) error
// Sidechannel data -- this is set at request handling time. These must
// be set before the callbacks are used.
// db is the Database the FederatingWrappedCallbacks should use.
db Database
// inboxIRI is the inboxIRI that is handling this callback.
inboxIRI *url.URL
// addNewIds creates new 'id' entries on an activity and its objects if
// it is a Create activity.
addNewIds func(c context.Context, activity Activity) error
// deliver delivers an outgoing message.
deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error
// newTransport creates a new Transport.
newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
}
// callbacks returns the WrappedCallbacks members into a single interface slice
// for use in streams.Resolver callbacks.
//
// If the given functions have a type that collides with the default behavior,
// then disable our default behavior
func (w FederatingWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
enableCreate := true
enableUpdate := true
enableDelete := true
enableFollow := true
enableAccept := true
enableReject := true
enableAdd := true
enableRemove := true
enableLike := true
enableAnnounce := true
enableUndo := true
enableBlock := true
for _, fn := range fns {
switch fn.(type) {
default:
continue
case func(context.Context, vocab.ActivityStreamsCreate) error:
enableCreate = false
case func(context.Context, vocab.ActivityStreamsUpdate) error:
enableUpdate = false
case func(context.Context, vocab.ActivityStreamsDelete) error:
enableDelete = false
case func(context.Context, vocab.ActivityStreamsFollow) error:
enableFollow = false
case func(context.Context, vocab.ActivityStreamsAccept) error:
enableAccept = false
case func(context.Context, vocab.ActivityStreamsReject) error:
enableReject = false
case func(context.Context, vocab.ActivityStreamsAdd) error:
enableAdd = false
case func(context.Context, vocab.ActivityStreamsRemove) error:
enableRemove = false
case func(context.Context, vocab.ActivityStreamsLike) error:
enableLike = false
case func(context.Context, vocab.ActivityStreamsAnnounce) error:
enableAnnounce = false
case func(context.Context, vocab.ActivityStreamsUndo) error:
enableUndo = false
case func(context.Context, vocab.ActivityStreamsBlock) error:
enableBlock = false
}
}
if enableCreate {
fns = append(fns, w.create)
}
if enableUpdate {
fns = append(fns, w.update)
}
if enableDelete {
fns = append(fns, w.deleteFn)
}
if enableFollow {
fns = append(fns, w.follow)
}
if enableAccept {
fns = append(fns, w.accept)
}
if enableReject {
fns = append(fns, w.reject)
}
if enableAdd {
fns = append(fns, w.add)
}
if enableRemove {
fns = append(fns, w.remove)
}
if enableLike {
fns = append(fns, w.like)
}
if enableAnnounce {
fns = append(fns, w.announce)
}
if enableUndo {
fns = append(fns, w.undo)
}
if enableBlock {
fns = append(fns, w.block)
}
return fns
}
// create implements the federating Create activity side effects.
func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
t := iter.GetType()
if t == nil && iter.IsIRI() {
// Attempt to dereference the IRI instead
tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
if err != nil {
return err
}
resp, err := tport.Dereference(c, iter.GetIRI())
if err != nil {
return err
}
m, err := readActivityPubResp(resp)
if err != nil {
return err
}
t, err = streams.ToType(c, m)
if err != nil {
return err
}
} else if t == nil {
return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
}
id, err := GetId(t)
if err != nil {
return err
}
var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil {
return err
}
defer unlock()
if err := w.db.Create(c, t); err != nil {
return err
}
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Create != nil {
return w.Create(c, a)
}
return nil
}
// update implements the federating Update activity side effects.
func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
return err
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
t := iter.GetType()
if t == nil {
return fmt.Errorf("update requires an object to be wholly provided")
}
id, err := GetId(t)
if err != nil {
return err
}
var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil {
return err
}
defer unlock()
if err := w.db.Update(c, t); err != nil {
return err
}
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Update != nil {
return w.Update(c, a)
}
return nil
}
// deleteFn implements the federating Delete activity side effects.
func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
return err
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
id, err := ToId(iter)
if err != nil {
return err
}
var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil {
return err
}
defer unlock()
if err := w.db.Delete(c, id); err != nil {
return err
}
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Delete != nil {
return w.Delete(c, a)
}
return nil
}
// follow implements the federating Follow activity side effects.
func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Check that we own at least one of the 'object' properties, and ensure
// it is to the actor that owns this inbox.
//
// If not then don't send a response. It was federated to us as an FYI,
// by mistake, or some other reason.
unlock, err := w.db.Lock(c, w.inboxIRI)
if err != nil {
return err
}
// WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
unlock() // unlock even on error
if err != nil {
return err
}
// Unlock must be called by now and every branch above.
isMe := false
if w.OnFollow != OnFollowDoNothing {
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if id.String() == actorIRI.String() {
isMe = true
break
}
}
}
if isMe {
// Prepare the response.
var response Activity
if w.OnFollow == OnFollowAutomaticallyAccept {
response = streams.NewActivityStreamsAccept()
} else if w.OnFollow == OnFollowAutomaticallyReject {
response = streams.NewActivityStreamsReject()
} else {
return fmt.Errorf("unknown OnFollowBehavior: %d", w.OnFollow)
}
// Set us as the 'actor'.
me := streams.NewActivityStreamsActorProperty()
response.SetActivityStreamsActor(me)
me.AppendIRI(actorIRI)
// Set the Follow as the 'object' property.
op := streams.NewActivityStreamsObjectProperty()
response.SetActivityStreamsObject(op)
op.AppendActivityStreamsFollow(a)
// Add all actors on the original Follow to the 'to' property.
recipients := make([]*url.URL, 0)
to := streams.NewActivityStreamsToProperty()
response.SetActivityStreamsTo(to)
followActors := a.GetActivityStreamsActor()
for iter := followActors.Begin(); iter != followActors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
to.AppendIRI(id)
recipients = append(recipients, id)
}
if w.OnFollow == OnFollowAutomaticallyAccept {
// If automatically accepting, then also update our
// followers collection with the new actors.
//
// If automatically rejecting, do not update the
// followers collection.
unlock, err := w.db.Lock(c, actorIRI)
if err != nil {
return err
}
// WARNING: Unlock not deferred.
followers, err := w.db.Followers(c, actorIRI)
if err != nil {
unlock()
return err
}
items := followers.GetActivityStreamsItems()
if items == nil {
items = streams.NewActivityStreamsItemsProperty()
followers.SetActivityStreamsItems(items)
}
for _, elem := range recipients {
items.PrependIRI(elem)
}
err = w.db.Update(c, followers)
unlock() // unlock even on error
if err != nil {
return err
}
// Unlock must be called by now and every branch above.
}
// Lock without defer!
unlock, err := w.db.Lock(c, w.inboxIRI)
if err != nil {
return err
}
outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
unlock() // unlock after, regardless
if err != nil {
return err
}
// Everything must be unlocked by now.
if err := w.addNewIds(c, response); err != nil {
return err
} else if err := w.deliver(c, outboxIRI, response); err != nil {
return err
}
}
if w.Follow != nil {
return w.Follow(c, a)
}
return nil
}
// accept implements the federating Accept activity side effects.
func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivityStreamsAccept) error {
op := a.GetActivityStreamsObject()
if op != nil && op.Len() > 0 {
// Get this actor's id.
unlock, err := w.db.Lock(c, w.inboxIRI)
if err != nil {
return err
}
// WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
unlock() // unlock after regardless
if err != nil {
return err
}
// Unlock must be called by now and every branch above.
//
// Determine if we are in a follow on the 'object' property.
//
// TODO: Handle Accept multiple Follow.
var maybeMyFollowIRI *url.URL
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil && iter.IsIRI() {
// Attempt to dereference the IRI instead
tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
if err != nil {
return err
}
resp, err := tport.Dereference(c, iter.GetIRI())
if err != nil {
return err
}
m, err := readActivityPubResp(resp)
if err != nil {
return err
}
t, err = streams.ToType(c, m)
if err != nil {
return err
}
} else if t == nil {
return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
}
// Ensure it is a Follow.
if !streams.IsOrExtendsActivityStreamsFollow(t) {
continue
}
follow, ok := t.(Activity)
if !ok {
return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
}
followId, err := GetId(follow)
if err != nil {
return err
}
// Ensure that we are one of the actors on the Follow.
actors := follow.GetActivityStreamsActor()
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if id.String() == actorIRI.String() {
maybeMyFollowIRI = followId
break
}
}
// Continue breaking if we found ourselves
if maybeMyFollowIRI != nil {
break
}
}
// If we received an Accept whose 'object' is a Follow with an
// Accept that we sent, add to the following collection.
if maybeMyFollowIRI != nil {
// Verify our Follow request exists and the peer didn't
// fabricate it.
activityActors := a.GetActivityStreamsActor()
if activityActors == nil || activityActors.Len() == 0 {
return fmt.Errorf("an Accept with a Follow has no actors")
}
// This may be a duplicate check if we dereferenced the
// Follow above. TODO: Separate this logic to avoid
// redundancy.
//
// Use an anonymous function to properly scope the
// database lock, immediately call it.
err = func() error {
unlock, err := w.db.Lock(c, maybeMyFollowIRI)
if err != nil {
return err
}
defer unlock()
t, err := w.db.Get(c, maybeMyFollowIRI)
if err != nil {
return err
}
if !streams.IsOrExtendsActivityStreamsFollow(t) {
return fmt.Errorf("peer gave an Accept wrapping a Follow but provided a non-Follow id")
}
follow, ok := t.(Activity)
if !ok {
return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
}
// Ensure that we are one of the actors on the Follow.
ok = false
actors := follow.GetActivityStreamsActor()
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if id.String() == actorIRI.String() {
ok = true
break
}
}
if !ok {
return fmt.Errorf("peer gave an Accept wrapping a Follow but we are not the actor on that Follow")
}
// Build map of original Accept actors
acceptActors := make(map[string]bool)
for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
acceptActors[id.String()] = false
}
// Verify all actor(s) were on the original Follow.
followObj := follow.GetActivityStreamsObject()
for iter := followObj.Begin(); iter != followObj.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if _, ok := acceptActors[id.String()]; ok {
acceptActors[id.String()] = true
}
}
for _, found := range acceptActors {
if !found {
return fmt.Errorf("peer gave an Accept wrapping a Follow but was not an object in the original Follow")
}
}
return nil
}()
if err != nil {
return err
}
// Add the peer to our following collection.
unlock, err := w.db.Lock(c, actorIRI)
if err != nil {
return err
}
// WARNING: Unlock not deferred.
following, err := w.db.Following(c, actorIRI)
if err != nil {
unlock()
return err
}
items := following.GetActivityStreamsItems()
if items == nil {
items = streams.NewActivityStreamsItemsProperty()
following.SetActivityStreamsItems(items)
}
for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
unlock()
return err
}
items.PrependIRI(id)
}
err = w.db.Update(c, following)
unlock() // unlock after regardless
if err != nil {
return err
}
// Unlock must be called by now and every branch above.
}
}
if w.Accept != nil {
return w.Accept(c, a)
}
return nil
}
// reject implements the federating Reject activity side effects.
func (w FederatingWrappedCallbacks) reject(c context.Context, a vocab.ActivityStreamsReject) error {
if w.Reject != nil {
return w.Reject(c, a)
}
return nil
}
// add implements the federating Add activity side effects.
func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := add(c, op, target, w.db); err != nil {
return err
}
if w.Add != nil {
return w.Add(c, a)
}
return nil
}
// remove implements the federating Remove activity side effects.
func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := remove(c, op, target, w.db); err != nil {
return err
}
if w.Remove != nil {
return w.Remove(c, a)
}
return nil
}
// like implements the federating Like activity side effects.
func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
id, err := GetId(a)
if err != nil {
return err
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
objId, err := ToId(iter)
if err != nil {
return err
}
unlock, err := w.db.Lock(c, objId)
if err != nil {
return err
}
defer unlock()
if owns, err := w.db.Owns(c, objId); err != nil {
return err
} else if !owns {
return nil
}
t, err := w.db.Get(c, objId)
if err != nil {
return err
}
l, ok := t.(likeser)
if !ok {
return fmt.Errorf("cannot add Like to likes collection for type %T", t)
}
// Get 'likes' property on the object, creating default if
// necessary.
likes := l.GetActivityStreamsLikes()
if likes == nil {
likes = streams.NewActivityStreamsLikesProperty()
l.SetActivityStreamsLikes(likes)
}
// Get 'likes' value, defaulting to a collection.
likesT := likes.GetType()
if likesT == nil {
col := streams.NewActivityStreamsCollection()
likesT = col
likes.SetActivityStreamsCollection(col)
}
// Prepend the activity's 'id' on the 'likes' Collection or
// OrderedCollection.
if col, ok := likesT.(itemser); ok {
items := col.GetActivityStreamsItems()
if items == nil {
items = streams.NewActivityStreamsItemsProperty()
col.SetActivityStreamsItems(items)
}
items.PrependIRI(id)
} else if oCol, ok := likesT.(orderedItemser); ok {
oItems := oCol.GetActivityStreamsOrderedItems()
if oItems == nil {
oItems = streams.NewActivityStreamsOrderedItemsProperty()
oCol.SetActivityStreamsOrderedItems(oItems)
}
oItems.PrependIRI(id)
} else {
return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT)
}
err = w.db.Update(c, t)
if err != nil {
return err
}
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Like != nil {
return w.Like(c, a)
}
return nil
}
// announce implements the federating Announce activity side effects.
func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.ActivityStreamsAnnounce) error {
id, err := GetId(a)
if err != nil {
return err
}
op := a.GetActivityStreamsObject()
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
objId, err := ToId(iter)
if err != nil {
return err
}
unlock, err := w.db.Lock(c, objId)
if err != nil {
return err
}
defer unlock()
if owns, err := w.db.Owns(c, objId); err != nil {
return err
} else if !owns {
return nil
}
t, err := w.db.Get(c, objId)
if err != nil {
return err
}
s, ok := t.(shareser)
if !ok {
return fmt.Errorf("cannot add Announce to Shares collection for type %T", t)
}
// Get 'shares' property on the object, creating default if
// necessary.
shares := s.GetActivityStreamsShares()
if shares == nil {
shares = streams.NewActivityStreamsSharesProperty()
s.SetActivityStreamsShares(shares)
}
// Get 'shares' value, defaulting to a collection.
sharesT := shares.GetType()
if sharesT == nil {
col := streams.NewActivityStreamsCollection()
sharesT = col
shares.SetActivityStreamsCollection(col)
}
// Prepend the activity's 'id' on the 'shares' Collection or
// OrderedCollection.
if col, ok := sharesT.(itemser); ok {
items := col.GetActivityStreamsItems()
if items == nil {
items = streams.NewActivityStreamsItemsProperty()
col.SetActivityStreamsItems(items)
}
items.PrependIRI(id)
} else if oCol, ok := sharesT.(orderedItemser); ok {
oItems := oCol.GetActivityStreamsOrderedItems()
if oItems == nil {
oItems = streams.NewActivityStreamsOrderedItemsProperty()
oCol.SetActivityStreamsOrderedItems(oItems)
}
oItems.PrependIRI(id)
} else {
return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT)
}
err = w.db.Update(c, t)
if err != nil {
return err
}
return nil
}
if op != nil {
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
}
if w.Announce != nil {
return w.Announce(c, a)
}
return nil
}
// undo implements the federating Undo activity side effects.
func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
actors := a.GetActivityStreamsActor()
if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.inboxIRI); err != nil {
return err
}
if w.Undo != nil {
return w.Undo(c, a)
}
return nil
}
// block implements the federating Block activity side effects.
func (w FederatingWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
if w.Block != nil {
return w.Block(c, a)
}
return nil
}

View file

@ -0,0 +1,115 @@
package pub
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"code.superseriousbusiness.org/activity/streams"
)
var ErrNotFound = errors.New("go-fed/activity: ActivityStreams data not found")
// HandlerFunc determines whether an incoming HTTP request is an ActivityStreams
// GET request, and if so attempts to serve ActivityStreams data.
//
// If an error is returned, then the calling function is responsible for writing
// to the ResponseWriter as part of error handling.
//
// If 'isASRequest' is false and there is no error, then the calling function
// may continue processing the request, and the HandlerFunc will not have
// written anything to the ResponseWriter. For example, a webpage may be served
// instead.
//
// If 'isASRequest' is true and there is no error, then the HandlerFunc
// successfully served the request and wrote to the ResponseWriter.
//
// Callers are responsible for authorized access to this resource.
type HandlerFunc func(c context.Context, w http.ResponseWriter, r *http.Request) (isASRequest bool, err error)
// NewActivityStreamsHandler creates a HandlerFunc to serve ActivityStreams
// requests which are coming from other clients or servers that wish to obtain
// an ActivityStreams representation of data.
//
// Strips retrieved ActivityStreams values of sensitive fields ('bto' and 'bcc')
// before responding with them. Sets the appropriate HTTP status code for
// Tombstone Activities as well.
//
// Defaults to supporting content to be retrieved by HTTPS only.
func NewActivityStreamsHandler(db Database, clock Clock) HandlerFunc {
return NewActivityStreamsHandlerScheme(db, clock, "https")
}
// NewActivityStreamsHandlerScheme creates a HandlerFunc to serve
// ActivityStreams requests which are coming from other clients or servers that
// wish to obtain an ActivityStreams representation of data provided by the
// specified protocol scheme.
//
// Strips retrieved ActivityStreams values of sensitive fields ('bto' and 'bcc')
// before responding with them. Sets the appropriate HTTP status code for
// Tombstone Activities as well.
//
// Specifying the "scheme" allows for retrieving ActivityStreams content with
// identifiers such as HTTP, HTTPS, or other protocol schemes.
//
// Returns ErrNotFound when the database does not retrieve any data and no
// errors occurred during retrieval.
func NewActivityStreamsHandlerScheme(db Database, clock Clock, scheme string) HandlerFunc {
return func(c context.Context, w http.ResponseWriter, r *http.Request) (isASRequest bool, err error) {
// Do nothing if it is not an ActivityPub GET request
if !isActivityPubGet(r) {
return
}
isASRequest = true
id := requestId(r, scheme)
var unlock func()
// Lock and obtain a copy of the requested ActivityStreams value
unlock, err = db.Lock(c, id)
if err != nil {
return
}
// WARNING: Unlock not deferred
t, err := db.Get(c, id)
unlock() // unlock even on error
if err != nil {
return
}
// Unlock must have been called by this point and in every
// branch above
if t == nil {
err = ErrNotFound
return
}
// Remove sensitive fields.
clearSensitiveFields(t)
// Serialize the fetched value.
m, err := streams.Serialize(t)
if err != nil {
return
}
raw, err := json.Marshal(m)
if err != nil {
return
}
// Construct the response.
addResponseHeaders(w.Header(), clock, raw)
// Write the response.
if streams.IsOrExtendsActivityStreamsTombstone(t) {
w.WriteHeader(http.StatusGone)
} else {
w.WriteHeader(http.StatusOK)
}
n, err := w.Write(raw)
if err != nil {
return
} else if n != len(raw) {
err = fmt.Errorf("only wrote %d of %d bytes", n, len(raw))
return
}
return
}
}

View file

@ -0,0 +1,118 @@
package pub
import (
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// inReplyToer is an ActivityStreams type with an 'inReplyTo' property
type inReplyToer interface {
GetActivityStreamsInReplyTo() vocab.ActivityStreamsInReplyToProperty
}
// objecter is an ActivityStreams type with an 'object' property
type objecter interface {
GetActivityStreamsObject() vocab.ActivityStreamsObjectProperty
}
// targeter is an ActivityStreams type with a 'target' property
type targeter interface {
GetActivityStreamsTarget() vocab.ActivityStreamsTargetProperty
}
// tagger is an ActivityStreams type with a 'tag' property
type tagger interface {
GetActivityStreamsTag() vocab.ActivityStreamsTagProperty
}
// hrefer is an ActivityStreams type with a 'href' property
type hrefer interface {
GetActivityStreamsHref() vocab.ActivityStreamsHrefProperty
}
// itemser is an ActivityStreams type with an 'items' property
type itemser interface {
GetActivityStreamsItems() vocab.ActivityStreamsItemsProperty
SetActivityStreamsItems(vocab.ActivityStreamsItemsProperty)
}
// orderedItemser is an ActivityStreams type with an 'orderedItems' property
type orderedItemser interface {
GetActivityStreamsOrderedItems() vocab.ActivityStreamsOrderedItemsProperty
SetActivityStreamsOrderedItems(vocab.ActivityStreamsOrderedItemsProperty)
}
// publisheder is an ActivityStreams type with a 'published' property
type publisheder interface {
GetActivityStreamsPublished() vocab.ActivityStreamsPublishedProperty
}
// updateder is an ActivityStreams type with an 'updateder' property
type updateder interface {
GetActivityStreamsUpdated() vocab.ActivityStreamsUpdatedProperty
}
// toer is an ActivityStreams type with a 'to' property
type toer interface {
GetActivityStreamsTo() vocab.ActivityStreamsToProperty
SetActivityStreamsTo(i vocab.ActivityStreamsToProperty)
}
// btoer is an ActivityStreams type with a 'bto' property
type btoer interface {
GetActivityStreamsBto() vocab.ActivityStreamsBtoProperty
SetActivityStreamsBto(i vocab.ActivityStreamsBtoProperty)
}
// ccer is an ActivityStreams type with a 'cc' property
type ccer interface {
GetActivityStreamsCc() vocab.ActivityStreamsCcProperty
SetActivityStreamsCc(i vocab.ActivityStreamsCcProperty)
}
// bccer is an ActivityStreams type with a 'bcc' property
type bccer interface {
GetActivityStreamsBcc() vocab.ActivityStreamsBccProperty
SetActivityStreamsBcc(i vocab.ActivityStreamsBccProperty)
}
// audiencer is an ActivityStreams type with an 'audience' property
type audiencer interface {
GetActivityStreamsAudience() vocab.ActivityStreamsAudienceProperty
SetActivityStreamsAudience(i vocab.ActivityStreamsAudienceProperty)
}
// inboxer is an ActivityStreams type with an 'inbox' property
type inboxer interface {
GetActivityStreamsInbox() vocab.ActivityStreamsInboxProperty
}
// attributedToer is an ActivityStreams type with an 'attributedTo' property
type attributedToer interface {
GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty
SetActivityStreamsAttributedTo(i vocab.ActivityStreamsAttributedToProperty)
}
// likeser is an ActivityStreams type with a 'likes' property
type likeser interface {
GetActivityStreamsLikes() vocab.ActivityStreamsLikesProperty
SetActivityStreamsLikes(i vocab.ActivityStreamsLikesProperty)
}
// shareser is an ActivityStreams type with a 'shares' property
type shareser interface {
GetActivityStreamsShares() vocab.ActivityStreamsSharesProperty
SetActivityStreamsShares(i vocab.ActivityStreamsSharesProperty)
}
// actorer is an ActivityStreams type with an 'actor' property
type actorer interface {
GetActivityStreamsActor() vocab.ActivityStreamsActorProperty
SetActivityStreamsActor(i vocab.ActivityStreamsActorProperty)
}
// appendIRIer is an ActivityStreams type that can Append IRIs.
type appendIRIer interface {
AppendIRI(v *url.URL)
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,83 @@
package pub
import (
"context"
"net/http"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// SocialProtocol contains behaviors an application needs to satisfy for the
// full ActivityPub C2S implementation to be supported by this library.
//
// It is only required if the client application wants to support the client-to-
// server, or social, protocol.
//
// It is passed to the library as a dependency injection from the client
// application.
type SocialProtocol interface {
// Hook callback after parsing the request body for a client request
// to the Actor's outbox.
//
// Can be used to set contextual information based on the
// ActivityStreams object received.
//
// Only called if the Social API is enabled.
//
// Warning: Neither authentication nor authorization has taken place at
// this time. Doing anything beyond setting contextual information is
// strongly discouraged.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the DelegateActor implementation must not
// write a response to the ResponseWriter as is expected that the caller
// to PostOutbox will do so when handling the error.
PostOutboxRequestBodyHook(c context.Context, r *http.Request, data vocab.Type) (context.Context, error)
// AuthenticatePostOutbox delegates the authentication of a POST to an
// outbox.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'authenticated' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then authenticated must be false and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error)
// SocialCallbacks returns the application logic that handles
// ActivityStreams received from C2S clients.
//
// Note that certain types of callbacks will be 'wrapped' with default
// behaviors supported natively by the library. Other callbacks
// compatible with streams.TypeResolver can be specified by 'other'.
//
// For example, setting the 'Create' field in the SocialWrappedCallbacks
// lets an application dependency inject additional behaviors they want
// to take place, including the default behavior supplied by this
// library. This is guaranteed to be compliant with the ActivityPub
// Social protocol.
//
// To override the default behavior, instead supply the function in
// 'other', which does not guarantee the application will be compliant
// with the ActivityPub Social Protocol.
//
// Applications are not expected to handle every single ActivityStreams
// type and extension. The unhandled ones are passed to DefaultCallback.
SocialCallbacks(c context.Context) (wrapped SocialWrappedCallbacks, other []interface{}, err error)
// DefaultCallback is called for types that go-fed can deserialize but
// are not handled by the application's callbacks returned in the
// Callbacks method.
//
// Applications are not expected to handle every single ActivityStreams
// type and extension, so the unhandled ones are passed to
// DefaultCallback.
DefaultCallback(c context.Context, activity Activity) error
}

View file

@ -0,0 +1,534 @@
package pub
import (
"context"
"fmt"
"net/url"
"code.superseriousbusiness.org/activity/streams"
"code.superseriousbusiness.org/activity/streams/vocab"
)
// SocialWrappedCallbacks lists the callback functions that already have some
// side effect behavior provided by the pub library.
//
// These functions are wrapped for the Social Protocol.
type SocialWrappedCallbacks struct {
// Create handles additional side effects for the Create ActivityStreams
// type.
//
// The wrapping callback copies the actor(s) to the 'attributedTo'
// property and copies recipients between the Create activity and all
// objects. It then saves the entry in the database.
Create func(context.Context, vocab.ActivityStreamsCreate) error
// Update handles additional side effects for the Update ActivityStreams
// type.
//
// The wrapping callback applies new top-level values on an object to
// the stored objects. Any top-level null literals will be deleted on
// the stored objects as well.
Update func(context.Context, vocab.ActivityStreamsUpdate) error
// Delete handles additional side effects for the Delete ActivityStreams
// type.
//
// The wrapping callback replaces the object(s) with tombstones in the
// database.
Delete func(context.Context, vocab.ActivityStreamsDelete) error
// Follow handles additional side effects for the Follow ActivityStreams
// type.
//
// The wrapping callback only ensures the 'Follow' has at least one
// 'object' entry, but otherwise has no default side effect.
Follow func(context.Context, vocab.ActivityStreamsFollow) error
// Add handles additional side effects for the Add ActivityStreams
// type.
//
//
// The wrapping function will add the 'object' IRIs to a specific
// 'target' collection if the 'target' collection(s) live on this
// server.
Add func(context.Context, vocab.ActivityStreamsAdd) error
// Remove handles additional side effects for the Remove ActivityStreams
// type.
//
// The wrapping function will remove all 'object' IRIs from a specific
// 'target' collection if the 'target' collection(s) live on this
// server.
Remove func(context.Context, vocab.ActivityStreamsRemove) error
// Like handles additional side effects for the Like ActivityStreams
// type.
//
// The wrapping function will add the objects on the activity to the
// "liked" collection of this actor.
Like func(context.Context, vocab.ActivityStreamsLike) error
// Undo handles additional side effects for the Undo ActivityStreams
// type.
//
//
// The wrapping function ensures the 'actor' on the 'Undo'
// is be the same as the 'actor' on all Activities being undone.
// It enforces that the actors on the Undo must correspond to all of the
// 'object' actors in some manner.
//
// It is expected that the application will implement the proper
// reversal of activities that are being undone.
Undo func(context.Context, vocab.ActivityStreamsUndo) error
// Block handles additional side effects for the Block ActivityStreams
// type.
//
// The wrapping callback only ensures the 'Block' has at least one
// 'object' entry, but otherwise has no default side effect. It is up
// to the wrapped application function to properly enforce the new
// blocking behavior.
//
// Note that go-fed does not federate 'Block' activities received in the
// Social Protocol.
Block func(context.Context, vocab.ActivityStreamsBlock) error
// Sidechannel data -- this is set at request handling time. These must
// be set before the callbacks are used.
// db is the Database the SocialWrappedCallbacks should use. It must be
// set before calling the callbacks.
db Database
// outboxIRI is the outboxIRI that is handling this callback.
outboxIRI *url.URL
// rawActivity is the JSON map literal received when deserializing the
// request body.
rawActivity map[string]interface{}
// clock is the server's clock.
clock Clock
// newTransport creates a new Transport.
newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
// undeliverable is a sidechannel out, indicating if the handled activity
// should not be delivered to a peer.
//
// Its provided default value will always be used when a custom function
// is called.
undeliverable *bool
}
// callbacks returns the WrappedCallbacks members into a single interface slice
// for use in streams.Resolver callbacks.
//
// If the given functions have a type that collides with the default behavior,
// then disable our default behavior
func (w SocialWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
enableCreate := true
enableUpdate := true
enableDelete := true
enableFollow := true
enableAdd := true
enableRemove := true
enableLike := true
enableUndo := true
enableBlock := true
for _, fn := range fns {
switch fn.(type) {
default:
continue
case func(context.Context, vocab.ActivityStreamsCreate) error:
enableCreate = false
case func(context.Context, vocab.ActivityStreamsUpdate) error:
enableUpdate = false
case func(context.Context, vocab.ActivityStreamsDelete) error:
enableDelete = false
case func(context.Context, vocab.ActivityStreamsFollow) error:
enableFollow = false
case func(context.Context, vocab.ActivityStreamsAdd) error:
enableAdd = false
case func(context.Context, vocab.ActivityStreamsRemove) error:
enableRemove = false
case func(context.Context, vocab.ActivityStreamsLike) error:
enableLike = false
case func(context.Context, vocab.ActivityStreamsUndo) error:
enableUndo = false
case func(context.Context, vocab.ActivityStreamsBlock) error:
enableBlock = false
}
}
if enableCreate {
fns = append(fns, w.create)
}
if enableUpdate {
fns = append(fns, w.update)
}
if enableDelete {
fns = append(fns, w.deleteFn)
}
if enableFollow {
fns = append(fns, w.follow)
}
if enableAdd {
fns = append(fns, w.add)
}
if enableRemove {
fns = append(fns, w.remove)
}
if enableLike {
fns = append(fns, w.like)
}
if enableUndo {
fns = append(fns, w.undo)
}
if enableBlock {
fns = append(fns, w.block)
}
return fns
}
// create implements the social Create activity side effects.
func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Obtain all actor IRIs.
actors := a.GetActivityStreamsActor()
createActorIds := make(map[string]*url.URL)
if actors != nil {
createActorIds = make(map[string]*url.URL, actors.Len())
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
createActorIds[id.String()] = id
}
}
// Obtain each object's 'attributedTo' IRIs.
objectAttributedToIds := make([]map[string]*url.URL, op.Len())
for i := range objectAttributedToIds {
objectAttributedToIds[i] = make(map[string]*url.URL)
}
for i := 0; i < op.Len(); i++ {
t := op.At(i).GetType()
attrToer, ok := t.(attributedToer)
if !ok {
continue
}
attr := attrToer.GetActivityStreamsAttributedTo()
if attr == nil {
attr = streams.NewActivityStreamsAttributedToProperty()
attrToer.SetActivityStreamsAttributedTo(attr)
}
for iter := attr.Begin(); iter != attr.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
objectAttributedToIds[i][id.String()] = id
}
}
// Put all missing actor IRIs onto all object attributedTo properties.
for k, v := range createActorIds {
for i, attributedToMap := range objectAttributedToIds {
if _, ok := attributedToMap[k]; !ok {
t := op.At(i).GetType()
attrToer, ok := t.(attributedToer)
if !ok {
continue
}
attr := attrToer.GetActivityStreamsAttributedTo()
attr.AppendIRI(v)
}
}
}
// Put all missing object attributedTo IRIs onto the actor property
// if there is one.
if actors != nil {
for _, attributedToMap := range objectAttributedToIds {
for k, v := range attributedToMap {
if _, ok := createActorIds[k]; !ok {
actors.AppendIRI(v)
}
}
}
}
// Copy over the 'to', 'bto', 'cc', 'bcc', and 'audience' recipients
// between the activity and all child objects and vice versa.
if err := normalizeRecipients(a); err != nil {
return err
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(i int) error {
obj := op.At(i).GetType()
id, err := GetId(obj)
if err != nil {
return err
}
var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil {
return err
}
defer unlock()
if err := w.db.Create(c, obj); err != nil {
return err
}
return nil
}
// Persist all objects we've created, which will include sensitive
// recipients such as 'bcc' and 'bto'.
for i := 0; i < op.Len(); i++ {
if err := loopFn(i); err != nil {
return err
}
}
if w.Create != nil {
return w.Create(c, a)
}
return nil
}
// update implements the social Update activity side effects.
func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Obtain all object ids, which should be owned by this server.
objIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
objIds = append(objIds, id)
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error {
unlock, err := w.db.Lock(c, loopId)
if err != nil {
return err
}
defer unlock()
t, err := w.db.Get(c, loopId)
if err != nil {
return err
}
m, err := t.Serialize()
if err != nil {
return err
}
// Copy over new top-level values.
objType := op.At(idx).GetType()
if objType == nil {
return fmt.Errorf("object at index %d is not a literal type value", idx)
}
newM, err := objType.Serialize()
if err != nil {
return err
}
for k, v := range newM {
m[k] = v
}
// Delete top-level values where the raw Activity had nils.
for k, v := range w.rawActivity {
if _, ok := m[k]; v == nil && ok {
delete(m, k)
}
}
newT, err := streams.ToType(c, m)
if err != nil {
return err
}
if err = w.db.Update(c, newT); err != nil {
return err
}
return nil
}
for i, id := range objIds {
if err := loopFn(i, id); err != nil {
return err
}
}
if w.Update != nil {
return w.Update(c, a)
}
return nil
}
// deleteFn implements the social Delete activity side effects.
func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Obtain all object ids, which should be owned by this server.
objIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
objIds = append(objIds, id)
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error {
unlock, err := w.db.Lock(c, loopId)
if err != nil {
return err
}
defer unlock()
t, err := w.db.Get(c, loopId)
if err != nil {
return err
}
tomb := toTombstone(t, loopId, w.clock.Now())
if err := w.db.Update(c, tomb); err != nil {
return err
}
return nil
}
for i, id := range objIds {
if err := loopFn(i, id); err != nil {
return err
}
}
if w.Delete != nil {
return w.Delete(c, a)
}
return nil
}
// follow implements the social Follow activity side effects.
func (w SocialWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
if w.Follow != nil {
return w.Follow(c, a)
}
return nil
}
// add implements the social Add activity side effects.
func (w SocialWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := add(c, op, target, w.db); err != nil {
return err
}
if w.Add != nil {
return w.Add(c, a)
}
return nil
}
// remove implements the social Remove activity side effects.
func (w SocialWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := remove(c, op, target, w.db); err != nil {
return err
}
if w.Remove != nil {
return w.Remove(c, a)
}
return nil
}
// like implements the social Like activity side effects.
func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Get this actor's IRI.
unlock, err := w.db.Lock(c, w.outboxIRI)
if err != nil {
return err
}
// WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI)
unlock() // unlock even on error
if err != nil {
return err
}
// Unlock must be called by now and every branch above.
//
// Now obtain this actor's 'liked' collection.
unlock, err = w.db.Lock(c, actorIRI)
if err != nil {
return err
}
defer unlock()
liked, err := w.db.Liked(c, actorIRI)
if err != nil {
return err
}
likedItems := liked.GetActivityStreamsItems()
if likedItems == nil {
likedItems = streams.NewActivityStreamsItemsProperty()
liked.SetActivityStreamsItems(likedItems)
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
objId, err := ToId(iter)
if err != nil {
return err
}
likedItems.PrependIRI(objId)
}
err = w.db.Update(c, liked)
if err != nil {
return err
}
if w.Like != nil {
return w.Like(c, a)
}
return nil
}
// undo implements the social Undo activity side effects.
func (w SocialWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
*w.undeliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
actors := a.GetActivityStreamsActor()
if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.outboxIRI); err != nil {
return err
}
if w.Undo != nil {
return w.Undo(c, a)
}
return nil
}
// block implements the social Block activity side effects.
func (w SocialWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
*w.undeliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
if w.Block != nil {
return w.Block(c, a)
}
return nil
}

View file

@ -0,0 +1,219 @@
package pub
import (
"bytes"
"context"
"crypto"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"code.superseriousbusiness.org/httpsig"
)
const (
// acceptHeaderValue is the Accept header value indicating that the
// response should contain an ActivityStreams object.
acceptHeaderValue = "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""
)
// isSuccess returns true if the HTTP status code is either OK, Created, or
// Accepted.
func isSuccess(code int) bool {
return code == http.StatusOK ||
code == http.StatusCreated ||
code == http.StatusAccepted
}
// Transport makes ActivityStreams calls to other servers in order to send or
// receive ActivityStreams data.
//
// It is responsible for setting the appropriate request headers, signing the
// requests if needed, and facilitating the traffic between this server and
// another.
//
// The transport is exclusively used to issue requests on behalf of an actor,
// and is never sending requests on behalf of the server in general.
//
// It may be reused multiple times, but never concurrently.
type Transport interface {
// Dereference fetches the ActivityStreams object located at this IRI with
// a GET request. Note that Response will only be returned on status = OK.
Dereference(c context.Context, iri *url.URL) (*http.Response, error)
// Deliver sends an ActivityStreams object.
Deliver(c context.Context, obj map[string]interface{}, to *url.URL) error
// BatchDeliver sends an ActivityStreams object to multiple recipients.
BatchDeliver(c context.Context, obj map[string]interface{}, recipients []*url.URL) error
}
// Transport must be implemented by HttpSigTransport.
var _ Transport = &HttpSigTransport{}
// HttpSigTransport makes a dereference call using HTTP signatures to
// authenticate the request on behalf of a particular actor.
//
// No rate limiting is applied.
//
// Only one request is tried per call.
type HttpSigTransport struct {
client HttpClient
appAgent string
gofedAgent string
clock Clock
getSigner httpsig.Signer
getSignerMu *sync.Mutex
postSigner httpsig.Signer
postSignerMu *sync.Mutex
pubKeyId string
privKey crypto.PrivateKey
}
// NewHttpSigTransport returns a new Transport.
//
// It sends requests specifically on behalf of a specific actor on this server.
// The actor's credentials are used to add an HTTP Signature to requests, which
// requires an actor's private key, a unique identifier for their public key,
// and an HTTP Signature signing algorithm.
//
// The client lets users issue requests through any HTTP client, including the
// standard library's HTTP client.
//
// The appAgent uniquely identifies the calling application's requests, so peers
// may aid debugging the requests incoming from this server. Note that the
// agent string will also include one for go-fed, so at minimum peer servers can
// reach out to the go-fed library to aid in notifying implementors of malformed
// or unsupported requests.
func NewHttpSigTransport(
client HttpClient,
appAgent string,
clock Clock,
getSigner, postSigner httpsig.Signer,
pubKeyId string,
privKey crypto.PrivateKey) *HttpSigTransport {
return &HttpSigTransport{
client: client,
appAgent: appAgent,
gofedAgent: goFedUserAgent(),
clock: clock,
getSigner: getSigner,
getSignerMu: &sync.Mutex{},
postSigner: postSigner,
postSignerMu: &sync.Mutex{},
pubKeyId: pubKeyId,
privKey: privKey,
}
}
// Dereference sends a GET request signed with an HTTP Signature to obtain an ActivityStreams value.
func (h HttpSigTransport) Dereference(c context.Context, iri *url.URL) (*http.Response, error) {
req, err := http.NewRequest("GET", iri.String(), nil)
if err != nil {
return nil, err
}
req = req.WithContext(c)
req.Header.Add(acceptHeader, acceptHeaderValue)
req.Header.Add("Accept-Charset", "utf-8")
req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent))
req.Header.Set("Host", iri.Host)
h.getSignerMu.Lock()
err = h.getSigner.SignRequest(h.privKey, h.pubKeyId, req, nil)
h.getSignerMu.Unlock()
if err != nil {
return nil, err
}
resp, err := h.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
return nil, fmt.Errorf("GET request to %s failed (%d): %s", iri.String(), resp.StatusCode, resp.Status)
}
return resp, nil
}
// Deliver sends a POST request with an HTTP Signature.
func (h HttpSigTransport) Deliver(c context.Context, data map[string]interface{}, to *url.URL) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
return h.deliver(c, b, to)
}
// BatchDeliver sends concurrent POST requests. Returns an error if any of the requests had an error.
func (h HttpSigTransport) BatchDeliver(c context.Context, data map[string]interface{}, recipients []*url.URL) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
var wg sync.WaitGroup
errCh := make(chan error, len(recipients))
for _, recipient := range recipients {
wg.Add(1)
go func(r *url.URL) {
defer wg.Done()
if err := h.deliver(c, b, r); err != nil {
errCh <- err
}
}(recipient)
}
wg.Wait()
errs := make([]string, 0, len(recipients))
outer:
for {
select {
case e := <-errCh:
errs = append(errs, e.Error())
default:
break outer
}
}
if len(errs) > 0 {
return fmt.Errorf("batch deliver had at least one failure: %s", strings.Join(errs, "; "))
}
return nil
}
func (h HttpSigTransport) deliver(c context.Context, b []byte, to *url.URL) error {
req, err := http.NewRequest("POST", to.String(), bytes.NewReader(b))
if err != nil {
return err
}
req = req.WithContext(c)
req.Header.Add(contentTypeHeader, contentTypeHeaderValue)
req.Header.Add("Accept-Charset", "utf-8")
req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent))
req.Header.Set("Host", to.Host)
h.postSignerMu.Lock()
err = h.postSigner.SignRequest(h.privKey, h.pubKeyId, req, b)
h.postSignerMu.Unlock()
if err != nil {
return err
}
resp, err := h.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if !isSuccess(resp.StatusCode) {
return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status)
}
return nil
}
// HttpClient sends http requests, and is an abstraction only needed by the
// HttpSigTransport. The standard library's Client satisfies this interface.
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}
// HttpClient must be implemented by http.Client.
var _ HttpClient = &http.Client{}

1077
vendor/code.superseriousbusiness.org/activity/pub/util.go generated vendored Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,15 @@
package pub
import (
"fmt"
)
const (
// Version string, used in the User-Agent
version = "v1.0.0"
)
// goFedUserAgent returns the user agent string for the go-fed library.
func goFedUserAgent() string {
return fmt.Sprintf("(go-fed/activity %s)", version)
}