mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-30 23:22:26 -05:00 
			
		
		
		
	additional faffing around with streaming
This commit is contained in:
		
					parent
					
						
							
								0cee5aa569
							
						
					
				
			
			
				commit
				
					
						42b8333d1b
					
				
			
		
					 9 changed files with 171 additions and 28 deletions
				
			
		|  | @ -10,6 +10,8 @@ import ( | |||
| ) | ||||
| 
 | ||||
| func (m *Module) StreamGETHandler(c *gin.Context) { | ||||
| 	l := m.log.WithField("func", "StreamGETHandler") | ||||
| 
 | ||||
| 	streamType := c.Query(StreamQueryKey) | ||||
| 	if streamType == "" { | ||||
| 		c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)}) | ||||
|  | @ -33,14 +35,18 @@ func (m *Module) StreamGETHandler(c *gin.Context) { | |||
| 		ReadBufferSize:   1024, | ||||
| 		WriteBufferSize:  1024, | ||||
| 		Subprotocols:     []string{"wss"}, | ||||
| 		CheckOrigin: func(r *http.Request) bool { | ||||
| 			return true | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) | ||||
| 	if err != nil { | ||||
| 		l.Infof("error upgrading websocket connection: %s", err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if errWithCode := m.processor.StreamForAccount(conn, account, streamType); errWithCode != nil { | ||||
| 	if errWithCode := m.processor.OpenStreamForAccount(conn, account, streamType); errWithCode != nil { | ||||
| 		c.JSON(errWithCode.Code(), errWithCode.Safe()) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -39,6 +39,7 @@ import ( | |||
| 	"github.com/superseriousbusiness/gotosocial/internal/media" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/processing" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/client/streaming" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/router" | ||||
| 	timelineprocessing "github.com/superseriousbusiness/gotosocial/internal/timeline" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/transport" | ||||
|  | @ -134,6 +135,7 @@ var Start cliactions.GTSAction = func(ctx context.Context, c *config.Config, log | |||
| 	adminModule := admin.New(c, processor, log) | ||||
| 	statusModule := status.New(c, processor, log) | ||||
| 	securityModule := security.New(c, log) | ||||
| 	streamingModule := streaming.New(c, processor, log) | ||||
| 
 | ||||
| 	apis := []api.ClientModule{ | ||||
| 		// modules with middleware go first | ||||
|  | @ -157,6 +159,7 @@ var Start cliactions.GTSAction = func(ctx context.Context, c *config.Config, log | |||
| 		filtersModule, | ||||
| 		emojiModule, | ||||
| 		listsModule, | ||||
| 		streamingModule, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, m := range apis { | ||||
|  |  | |||
|  | @ -324,6 +324,15 @@ func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID | |||
| 	if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil { | ||||
| 		errors <- fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %s", status.ID, err) | ||||
| 	} | ||||
| 
 | ||||
| 	mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount) | ||||
| 	if err != nil { | ||||
| 		errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) | ||||
| 	} else { | ||||
| 		if err := p.streamingProcessor.StreamStatusForAccount(mastoStatus, timelineAccount); err != nil { | ||||
| 			errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (p *processor) deleteStatusFromTimelines(status *gtsmodel.Status) error { | ||||
|  |  | |||
|  | @ -136,8 +136,8 @@ type Processor interface { | |||
| 
 | ||||
| 	// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API | ||||
| 	AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) | ||||
| 	// StreamForAccount streams to websocket connection c for an account, with the given streamType. | ||||
| 	StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode | ||||
| 	// OpenStreamForAccount streams to websocket connection c for an account, with the given streamType. | ||||
| 	OpenStreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode | ||||
| 
 | ||||
| 	/* | ||||
| 		FEDERATION API-FACING PROCESSING FUNCTIONS | ||||
|  |  | |||
|  | @ -10,6 +10,6 @@ func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Acc | |||
| 	return p.streamingProcessor.AuthorizeStreamingRequest(accessToken) | ||||
| } | ||||
| 
 | ||||
| func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { | ||||
| 	return p.streamingProcessor.StreamForAccount(c, account, streamType) | ||||
| func (p *processor) OpenStreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { | ||||
| 	return p.streamingProcessor.OpenStreamForAccount(c, account, streamType) | ||||
| } | ||||
|  |  | |||
							
								
								
									
										108
									
								
								internal/processing/synchronous/streaming/openstream.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										108
									
								
								internal/processing/synchronous/streaming/openstream.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,108 @@ | |||
| package streaming | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/gorilla/websocket" | ||||
| 	"github.com/sirupsen/logrus" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/id" | ||||
| ) | ||||
| 
 | ||||
| func (p *processor) OpenStreamForAccount(conn *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { | ||||
| 	l := p.log.WithFields(logrus.Fields{ | ||||
| 		"func":       "OpenStreamForAccount", | ||||
| 		"account":    account.ID, | ||||
| 		"streamType": streamType, | ||||
| 	}) | ||||
| 	l.Debug("received open stream request") | ||||
| 
 | ||||
| 	streamID, err := id.NewRandomULID() | ||||
| 	if err != nil { | ||||
| 		return gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) | ||||
| 	} | ||||
| 
 | ||||
| 	thisStream := &stream{ | ||||
| 		streamID:   streamID, | ||||
| 		streamType: streamType, | ||||
| 		conn:       conn, | ||||
| 	} | ||||
| 
 | ||||
| 	v, ok := p.streamMap.Load(account.ID) | ||||
| 	if !ok || v == nil { | ||||
| 		// there is no entry in the streamMap for this account yet, so make one and store it | ||||
| 		streams := &streamsForAccount{ | ||||
| 			s: []*stream{ | ||||
| 				thisStream, | ||||
| 			}, | ||||
| 		} | ||||
| 		p.streamMap.Store(account.ID, streams) | ||||
| 	} else { | ||||
| 		// there is an entry in the streamMap for this account | ||||
| 		// parse the interface as a streamsForAccount | ||||
| 		streams, ok := v.(*streamsForAccount) | ||||
| 		if !ok { | ||||
| 			return gtserror.NewErrorInternalError(errors.New("stream map error")) | ||||
| 		} | ||||
| 
 | ||||
| 		// append this stream to it | ||||
| 		streams.Lock() | ||||
| 		streams.s = append(streams.s, thisStream) | ||||
| 		streams.Unlock() | ||||
| 	} | ||||
| 
 | ||||
| 	// set the close handler to remove the given stream from the stream map so that messages stop getting put into it | ||||
| 	conn.SetCloseHandler(func(code int, text string) error { | ||||
| 		l.Debug("closing stream") | ||||
| 		v, ok := p.streamMap.Load(account.ID) | ||||
| 		if !ok || v == nil { | ||||
| 			// the map doesn't contain an entry for the account anyway, so we can just return | ||||
| 			// this probably should never happen but let's check anyway | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		// parse the interface as a streamsForAccount | ||||
| 		streams, ok := v.(*streamsForAccount) | ||||
| 		if !ok { | ||||
| 			return gtserror.NewErrorInternalError(errors.New("stream map error")) | ||||
| 		} | ||||
| 
 | ||||
| 		// remove thisStream from the slice of streams stored in streamsForAccount | ||||
| 		streams.Lock() | ||||
| 		newStreamSlice := []*stream{} | ||||
| 		for _, s := range streams.s { | ||||
| 			if s.streamID != thisStream.streamID { | ||||
| 				newStreamSlice = append(newStreamSlice, s) | ||||
| 			} | ||||
| 		} | ||||
| 		streams.s = newStreamSlice | ||||
| 		streams.Unlock() | ||||
| 		l.Debug("stream closed") | ||||
| 		return nil | ||||
| 	}) | ||||
| 
 | ||||
| 	defer conn.Close() | ||||
| 	t := time.NewTicker(60 * time.Second) | ||||
| 	for range t.C { | ||||
| 		if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { | ||||
| 			return gtserror.NewErrorInternalError(err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type streamsForAccount struct { | ||||
| 	s []*stream | ||||
| 	sync.Mutex | ||||
| } | ||||
| 
 | ||||
| type stream struct { | ||||
| 	streamID   string | ||||
| 	streamType string | ||||
| 	conn       *websocket.Conn | ||||
| } | ||||
|  | @ -1,22 +0,0 @@ | |||
| package streaming | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/gorilla/websocket" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||
| ) | ||||
| 
 | ||||
| func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { | ||||
| 
 | ||||
| 	v, loaded := p.streamMap.LoadOrStore(account.ID, sync.Slice) | ||||
| 	if loaded { | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type streams struct { | ||||
| 	accountID string | ||||
| 	 | ||||
| } | ||||
|  | @ -12,13 +12,15 @@ import ( | |||
| 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/visibility" | ||||
| 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||
| ) | ||||
| 
 | ||||
| // Processor wraps a bunch of functions for processing streaming. | ||||
| type Processor interface { | ||||
| 	// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API | ||||
| 	AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) | ||||
| 	StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode | ||||
| 	OpenStreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode | ||||
| 	StreamStatusForAccount(s *apimodel.Status, account *gtsmodel.Account) error | ||||
| } | ||||
| 
 | ||||
| type processor struct { | ||||
|  |  | |||
							
								
								
									
										37
									
								
								internal/processing/synchronous/streaming/streamstatus.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								internal/processing/synchronous/streaming/streamstatus.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,37 @@ | |||
| package streaming | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
| 
 | ||||
| 	"github.com/sirupsen/logrus" | ||||
| 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||
| ) | ||||
| 
 | ||||
| func (p *processor) StreamStatusForAccount(s *apimodel.Status, account *gtsmodel.Account) error { | ||||
| 	l := p.log.WithFields(logrus.Fields{ | ||||
| 		"func":    "StreamStatusForAccount", | ||||
| 		"account": account.ID, | ||||
| 	}) | ||||
| 	v, ok := p.streamMap.Load(account.ID) | ||||
| 	if !ok { | ||||
| 		// no open connections so nothing to stream | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	streams, ok := v.(*streamsForAccount) | ||||
| 	if !ok { | ||||
| 		return errors.New("stream map error") | ||||
| 	} | ||||
| 
 | ||||
| 	streams.Lock() | ||||
| 	defer streams.Unlock() | ||||
| 	for _, stream := range streams.s { | ||||
| 		l.Debugf("streaming status to stream id %s", stream.streamID) | ||||
| 		if err := stream.conn.WriteJSON(s); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue