mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 05:32:25 -05:00 
			
		
		
		
	* rewrite Stream{} to use much less mutex locking, update related code
* use new context for the stream context
* ensure stream gets closed on return of writeTo / readFrom WSConn()
* ensure stream write timeout gets cancelled
* remove embedded context type from Stream{}, reformat log messages for consistency
* use c.Request.Context() for context passed into Stream().Open()
* only return 1 boolean, fix tests to expect multiple stream types in messages
* changes to ping logic
* further improved ping logic
* don't export unused function types, update message sending to only include relevant stream type
* ensure stream gets closed 🤦
* update to error log on failed json marshal (instead of panic)
* inverse websocket read error checking to _ignore_ expected close errors
		
	
			
		
			
				
	
	
		
			381 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			381 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // GoToSocial
 | |
| // Copyright (C) GoToSocial Authors admin@gotosocial.org
 | |
| // SPDX-License-Identifier: AGPL-3.0-or-later
 | |
| //
 | |
| // This program is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Affero General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // This program is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| // GNU Affero General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Affero General Public License
 | |
| // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package stream
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"maps"
 | |
| 	"slices"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| )
 | |
| 
 | |
// Event type identifiers, carried in
// the Event field of a streamed Message.
const (
	// EventTypeNotification marks a message
	// telling the client to show a notification.
	EventTypeNotification = "notification"

	// EventTypeUpdate marks a message telling the
	// client to show an update in a user's timeline.
	EventTypeUpdate = "update"

	// EventTypeDelete marks a message telling
	// the client that something should be deleted.
	EventTypeDelete = "delete"

	// EventTypeStatusUpdate marks a message telling
	// the client that something in the user's timeline
	// was edited. The name is confusing, but it is
	// what Mastodon uses for this event.
	EventTypeStatusUpdate = "status.update"
)
 | |
| 
 | |
// Stream (timeline) type identifiers that
// a Stream can be subscribed to, and that a
// Message can be addressed to.
const (
	// TimelineLocal covers all public posts
	// originating from this server, i.e. the
	// local timeline.
	TimelineLocal = "public:local"

	// TimelinePublic covers all public posts
	// known to the server, i.e. the federated
	// timeline.
	TimelinePublic = "public"

	// TimelineHome covers events related to
	// the current user, such as home feed
	// updates and notifications.
	TimelineHome = "user"

	// TimelineNotifications covers
	// notifications for the current user.
	TimelineNotifications = "user:notification"

	// TimelineDirect covers updates
	// to direct conversations.
	TimelineDirect = "direct"

	// TimelineList covers updates
	// to a specific list.
	TimelineList = "list"
)
 | |
| 
 | |
| // AllStatusTimelines contains all Timelines
 | |
| // that a status could conceivably be delivered
 | |
| // to, useful for sending out status deletes.
 | |
| var AllStatusTimelines = []string{
 | |
| 	TimelineLocal,
 | |
| 	TimelinePublic,
 | |
| 	TimelineHome,
 | |
| 	TimelineDirect,
 | |
| 	TimelineList,
 | |
| }
 | |
| 
 | |
| type Streams struct {
 | |
| 	streams map[string][]*Stream
 | |
| 	mutex   sync.Mutex
 | |
| }
 | |
| 
 | |
| // Open will open open a new Stream for given account ID and stream types, the given context will be passed to Stream.
 | |
| func (s *Streams) Open(accountID string, streamTypes ...string) *Stream {
 | |
| 	if len(streamTypes) == 0 {
 | |
| 		panic("no stream types given")
 | |
| 	}
 | |
| 
 | |
| 	// Prep new Stream.
 | |
| 	str := new(Stream)
 | |
| 	str.done = make(chan struct{})
 | |
| 	str.msgCh = make(chan Message, 50) // TODO: make configurable
 | |
| 	for _, streamType := range streamTypes {
 | |
| 		str.Subscribe(streamType)
 | |
| 	}
 | |
| 
 | |
| 	// TODO: add configurable
 | |
| 	// max streams per account.
 | |
| 
 | |
| 	// Acquire lock.
 | |
| 	s.mutex.Lock()
 | |
| 
 | |
| 	if s.streams == nil {
 | |
| 		// Main stream-map needs allocating.
 | |
| 		s.streams = make(map[string][]*Stream)
 | |
| 	}
 | |
| 
 | |
| 	// Add new stream for account.
 | |
| 	strs := s.streams[accountID]
 | |
| 	strs = append(strs, str)
 | |
| 	s.streams[accountID] = strs
 | |
| 
 | |
| 	// Register close callback
 | |
| 	// to remove stream from our
 | |
| 	// internal map for this account.
 | |
| 	str.close = func() {
 | |
| 		s.mutex.Lock()
 | |
| 		strs := s.streams[accountID]
 | |
| 		strs = slices.DeleteFunc(strs, func(s *Stream) bool {
 | |
| 			return s == str // remove 'str' ptr
 | |
| 		})
 | |
| 		s.streams[accountID] = strs
 | |
| 		s.mutex.Unlock()
 | |
| 	}
 | |
| 
 | |
| 	// Done with lock.
 | |
| 	s.mutex.Unlock()
 | |
| 
 | |
| 	return str
 | |
| }
 | |
| 
 | |
| // Post will post the given message to all streams of given account ID matching type.
 | |
| func (s *Streams) Post(ctx context.Context, accountID string, msg Message) bool {
 | |
| 	var deferred []func() bool
 | |
| 
 | |
| 	// Acquire lock.
 | |
| 	s.mutex.Lock()
 | |
| 
 | |
| 	// Iterate all streams stored for account.
 | |
| 	for _, str := range s.streams[accountID] {
 | |
| 
 | |
| 		// Check whether stream supports any of our message targets.
 | |
| 		if stype := str.getStreamType(msg.Stream...); stype != "" {
 | |
| 
 | |
| 			// Rescope var
 | |
| 			// to prevent
 | |
| 			// ptr reuse.
 | |
| 			stream := str
 | |
| 
 | |
| 			// Use a message copy to *only*
 | |
| 			// include the supported stream.
 | |
| 			msgCopy := Message{
 | |
| 				Stream:  []string{stype},
 | |
| 				Event:   msg.Event,
 | |
| 				Payload: msg.Payload,
 | |
| 			}
 | |
| 
 | |
| 			// Send message to supported stream
 | |
| 			// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
 | |
| 			// This prevents deadlocks between each
 | |
| 			// msg channel and main Streams{} mutex.
 | |
| 			deferred = append(deferred, func() bool {
 | |
| 				return stream.send(ctx, msgCopy)
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Done with lock.
 | |
| 	s.mutex.Unlock()
 | |
| 
 | |
| 	var ok bool
 | |
| 
 | |
| 	// Execute deferred outside lock.
 | |
| 	for _, deferfn := range deferred {
 | |
| 		v := deferfn()
 | |
| 		ok = ok && v
 | |
| 	}
 | |
| 
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| // PostAll will post the given message to all streams with matching types.
 | |
| func (s *Streams) PostAll(ctx context.Context, msg Message) bool {
 | |
| 	var deferred []func() bool
 | |
| 
 | |
| 	// Acquire lock.
 | |
| 	s.mutex.Lock()
 | |
| 
 | |
| 	// Iterate ALL stored streams.
 | |
| 	for _, strs := range s.streams {
 | |
| 		for _, str := range strs {
 | |
| 
 | |
| 			// Check whether stream supports any of our message targets.
 | |
| 			if stype := str.getStreamType(msg.Stream...); stype != "" {
 | |
| 
 | |
| 				// Rescope var
 | |
| 				// to prevent
 | |
| 				// ptr reuse.
 | |
| 				stream := str
 | |
| 
 | |
| 				// Use a message copy to *only*
 | |
| 				// include the supported stream.
 | |
| 				msgCopy := Message{
 | |
| 					Stream:  []string{stype},
 | |
| 					Event:   msg.Event,
 | |
| 					Payload: msg.Payload,
 | |
| 				}
 | |
| 
 | |
| 				// Send message to supported stream
 | |
| 				// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
 | |
| 				// This prevents deadlocks between each
 | |
| 				// msg channel and main Streams{} mutex.
 | |
| 				deferred = append(deferred, func() bool {
 | |
| 					return stream.send(ctx, msgCopy)
 | |
| 				})
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Done with lock.
 | |
| 	s.mutex.Unlock()
 | |
| 
 | |
| 	var ok bool
 | |
| 
 | |
| 	// Execute deferred outside lock.
 | |
| 	for _, deferfn := range deferred {
 | |
| 		v := deferfn()
 | |
| 		ok = ok && v
 | |
| 	}
 | |
| 
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| // Stream represents one
 | |
| // open stream for a client.
 | |
| type Stream struct {
 | |
| 
 | |
| 	// atomically updated ptr to a read-only copy
 | |
| 	// of supported stream types in a hashmap. this
 | |
| 	// gets updated via CAS operations in .cas().
 | |
| 	types atomic.Pointer[map[string]struct{}]
 | |
| 
 | |
| 	// protects stream close.
 | |
| 	done chan struct{}
 | |
| 
 | |
| 	// inbound msg ch.
 | |
| 	msgCh chan Message
 | |
| 
 | |
| 	// close hook to remove
 | |
| 	// stream from Streams{}.
 | |
| 	close func()
 | |
| }
 | |
| 
 | |
| // Subscribe will add given type to given types this stream supports.
 | |
| func (s *Stream) Subscribe(streamType string) {
 | |
| 	s.cas(func(m map[string]struct{}) bool {
 | |
| 		if _, ok := m[streamType]; ok {
 | |
| 			return false
 | |
| 		}
 | |
| 		m[streamType] = struct{}{}
 | |
| 		return true
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Unsubscribe will remove given type (if found) from types this stream supports.
 | |
| func (s *Stream) Unsubscribe(streamType string) {
 | |
| 	s.cas(func(m map[string]struct{}) bool {
 | |
| 		if _, ok := m[streamType]; !ok {
 | |
| 			return false
 | |
| 		}
 | |
| 		delete(m, streamType)
 | |
| 		return true
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // getStreamType returns the first stream type in given list that stream supports.
 | |
| func (s *Stream) getStreamType(streamTypes ...string) string {
 | |
| 	if ptr := s.types.Load(); ptr != nil {
 | |
| 		for _, streamType := range streamTypes {
 | |
| 			if _, ok := (*ptr)[streamType]; ok {
 | |
| 				return streamType
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return ""
 | |
| }
 | |
| 
 | |
| // send will block on posting a new Message{}, returning early with
 | |
| // a false value if provided context is canceled, or stream closed.
 | |
| func (s *Stream) send(ctx context.Context, msg Message) bool {
 | |
| 	select {
 | |
| 	case <-s.done:
 | |
| 		return false
 | |
| 	case <-ctx.Done():
 | |
| 		return false
 | |
| 	case s.msgCh <- msg:
 | |
| 		return true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Recv will block on receiving Message{}, returning early with a
 | |
| // false value if provided context is canceled, or stream closed.
 | |
| func (s *Stream) Recv(ctx context.Context) (Message, bool) {
 | |
| 	select {
 | |
| 	case <-s.done:
 | |
| 		return Message{}, false
 | |
| 	case <-ctx.Done():
 | |
| 		return Message{}, false
 | |
| 	case msg := <-s.msgCh:
 | |
| 		return msg, true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Close will close the underlying context, finally
 | |
| // removing it from the parent Streams per-account-map.
 | |
| func (s *Stream) Close() {
 | |
| 	select {
 | |
| 	case <-s.done:
 | |
| 	default:
 | |
| 		close(s.done)
 | |
| 		s.close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // cas will perform a Compare And Swap operation on s.types using modifier func.
 | |
| func (s *Stream) cas(fn func(map[string]struct{}) bool) {
 | |
| 	if fn == nil {
 | |
| 		panic("nil function")
 | |
| 	}
 | |
| 	for {
 | |
| 		var m map[string]struct{}
 | |
| 
 | |
| 		// Get current value.
 | |
| 		ptr := s.types.Load()
 | |
| 
 | |
| 		if ptr == nil {
 | |
| 			// Allocate new types map.
 | |
| 			m = make(map[string]struct{})
 | |
| 		} else {
 | |
| 			// Clone r-only map.
 | |
| 			m = maps.Clone(*ptr)
 | |
| 		}
 | |
| 
 | |
| 		// Apply
 | |
| 		// changes.
 | |
| 		if !fn(m) {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// Attempt to Compare And Swap ptr.
 | |
| 		if s.types.CompareAndSwap(ptr, &m) {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// Message is a single message
// delivered over a stream.
type Message struct {

	// Stream holds every stream type
	// this message is addressed to.
	Stream []string `json:"stream"`

	// Event is the message's event type
	// (update/delete/notification etc).
	Event string `json:"event"`

	// Payload is the message body. For an update
	// or notification this will be a JSON string.
	Payload string `json:"payload"`
}
 |