mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-02 20:02:25 -06:00 
			
		
		
		
	* Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables
		
			
				
	
	
		
			389 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			389 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// GoToSocial
 | 
						|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
 | 
						|
// SPDX-License-Identifier: AGPL-3.0-or-later
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package stream
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"maps"
 | 
						|
	"slices"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
)
 | 
						|
 | 
						|
// Event types that can be carried in Message.Event.
const (
	// EventTypeNotification tells the client
	// to show the user a new notification.
	EventTypeNotification = "notification"

	// EventTypeUpdate tells the client to show
	// the user an update in their timeline.
	EventTypeUpdate = "update"

	// EventTypeDelete tells the client that
	// something should be deleted from a user.
	EventTypeDelete = "delete"

	// EventTypeStatusUpdate tells the client that
	// something in the user's timeline was edited.
	// (The confusing name comes from Mastodon.)
	EventTypeStatusUpdate = "status.update"

	// EventTypeFiltersChanged tells the client that
	// the user's filters (including their keywords
	// and statuses) have changed.
	EventTypeFiltersChanged = "filters_changed"

	// EventTypeConversation tells the client to
	// show the user an updated conversation.
	EventTypeConversation = "conversation"
)
 | 
						|
 | 
						|
// Stream (timeline) types that a Stream can subscribe
// to, and that a Message can be addressed to.
const (
	// TimelineLocal carries all public posts
	// that originate from this server; it is
	// the analogue of the local timeline.
	TimelineLocal = "public:local"

	// TimelinePublic carries all public posts
	// known to the server; it is the analogue
	// of the federated timeline.
	TimelinePublic = "public"

	// TimelineHome carries events related to
	// the current user, such as home feed
	// updates and notifications.
	TimelineHome = "user"

	// TimelineNotifications carries
	// notifications for the current user.
	TimelineNotifications = "user:notification"

	// TimelineDirect carries updates
	// to direct conversations.
	TimelineDirect = "direct"

	// TimelineList carries updates
	// to a specific list.
	TimelineList = "list"
)
 | 
						|
 | 
						|
// AllStatusTimelines contains all Timelines
 | 
						|
// that a status could conceivably be delivered
 | 
						|
// to, useful for sending out status deletes.
 | 
						|
var AllStatusTimelines = []string{
 | 
						|
	TimelineLocal,
 | 
						|
	TimelinePublic,
 | 
						|
	TimelineHome,
 | 
						|
	TimelineDirect,
 | 
						|
	TimelineList,
 | 
						|
}
 | 
						|
 | 
						|
type Streams struct {
 | 
						|
	streams map[string][]*Stream
 | 
						|
	mutex   sync.Mutex
 | 
						|
}
 | 
						|
 | 
						|
// Open will open open a new Stream for given account ID and stream types, the given context will be passed to Stream.
 | 
						|
func (s *Streams) Open(accountID string, streamTypes ...string) *Stream {
 | 
						|
	if len(streamTypes) == 0 {
 | 
						|
		panic("no stream types given")
 | 
						|
	}
 | 
						|
 | 
						|
	// Prep new Stream.
 | 
						|
	str := new(Stream)
 | 
						|
	str.done = make(chan struct{})
 | 
						|
	str.msgCh = make(chan Message, 50) // TODO: make configurable
 | 
						|
	for _, streamType := range streamTypes {
 | 
						|
		str.Subscribe(streamType)
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO: add configurable
 | 
						|
	// max streams per account.
 | 
						|
 | 
						|
	// Acquire lock.
 | 
						|
	s.mutex.Lock()
 | 
						|
 | 
						|
	if s.streams == nil {
 | 
						|
		// Main stream-map needs allocating.
 | 
						|
		s.streams = make(map[string][]*Stream)
 | 
						|
	}
 | 
						|
 | 
						|
	// Add new stream for account.
 | 
						|
	strs := s.streams[accountID]
 | 
						|
	strs = append(strs, str)
 | 
						|
	s.streams[accountID] = strs
 | 
						|
 | 
						|
	// Register close callback
 | 
						|
	// to remove stream from our
 | 
						|
	// internal map for this account.
 | 
						|
	str.close = func() {
 | 
						|
		s.mutex.Lock()
 | 
						|
		strs := s.streams[accountID]
 | 
						|
		strs = slices.DeleteFunc(strs, func(s *Stream) bool {
 | 
						|
			return s == str // remove 'str' ptr
 | 
						|
		})
 | 
						|
		s.streams[accountID] = strs
 | 
						|
		s.mutex.Unlock()
 | 
						|
	}
 | 
						|
 | 
						|
	// Done with lock.
 | 
						|
	s.mutex.Unlock()
 | 
						|
 | 
						|
	return str
 | 
						|
}
 | 
						|
 | 
						|
// Post will post the given message to all streams of given account ID matching type.
 | 
						|
func (s *Streams) Post(ctx context.Context, accountID string, msg Message) bool {
 | 
						|
	var deferred []func() bool
 | 
						|
 | 
						|
	// Acquire lock.
 | 
						|
	s.mutex.Lock()
 | 
						|
 | 
						|
	// Iterate all streams stored for account.
 | 
						|
	for _, str := range s.streams[accountID] {
 | 
						|
 | 
						|
		// Check whether stream supports any of our message targets.
 | 
						|
		if stype := str.getStreamType(msg.Stream...); stype != "" {
 | 
						|
 | 
						|
			// Rescope var
 | 
						|
			// to prevent
 | 
						|
			// ptr reuse.
 | 
						|
			stream := str
 | 
						|
 | 
						|
			// Use a message copy to *only*
 | 
						|
			// include the supported stream.
 | 
						|
			msgCopy := Message{
 | 
						|
				Stream:  []string{stype},
 | 
						|
				Event:   msg.Event,
 | 
						|
				Payload: msg.Payload,
 | 
						|
			}
 | 
						|
 | 
						|
			// Send message to supported stream
 | 
						|
			// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
 | 
						|
			// This prevents deadlocks between each
 | 
						|
			// msg channel and main Streams{} mutex.
 | 
						|
			deferred = append(deferred, func() bool {
 | 
						|
				return stream.send(ctx, msgCopy)
 | 
						|
			})
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Done with lock.
 | 
						|
	s.mutex.Unlock()
 | 
						|
 | 
						|
	var ok bool
 | 
						|
 | 
						|
	// Execute deferred outside lock.
 | 
						|
	for _, deferfn := range deferred {
 | 
						|
		v := deferfn()
 | 
						|
		ok = ok && v
 | 
						|
	}
 | 
						|
 | 
						|
	return ok
 | 
						|
}
 | 
						|
 | 
						|
// PostAll will post the given message to all streams with matching types.
 | 
						|
func (s *Streams) PostAll(ctx context.Context, msg Message) bool {
 | 
						|
	var deferred []func() bool
 | 
						|
 | 
						|
	// Acquire lock.
 | 
						|
	s.mutex.Lock()
 | 
						|
 | 
						|
	// Iterate ALL stored streams.
 | 
						|
	for _, strs := range s.streams {
 | 
						|
		for _, str := range strs {
 | 
						|
 | 
						|
			// Check whether stream supports any of our message targets.
 | 
						|
			if stype := str.getStreamType(msg.Stream...); stype != "" {
 | 
						|
 | 
						|
				// Rescope var
 | 
						|
				// to prevent
 | 
						|
				// ptr reuse.
 | 
						|
				stream := str
 | 
						|
 | 
						|
				// Use a message copy to *only*
 | 
						|
				// include the supported stream.
 | 
						|
				msgCopy := Message{
 | 
						|
					Stream:  []string{stype},
 | 
						|
					Event:   msg.Event,
 | 
						|
					Payload: msg.Payload,
 | 
						|
				}
 | 
						|
 | 
						|
				// Send message to supported stream
 | 
						|
				// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
 | 
						|
				// This prevents deadlocks between each
 | 
						|
				// msg channel and main Streams{} mutex.
 | 
						|
				deferred = append(deferred, func() bool {
 | 
						|
					return stream.send(ctx, msgCopy)
 | 
						|
				})
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Done with lock.
 | 
						|
	s.mutex.Unlock()
 | 
						|
 | 
						|
	var ok bool
 | 
						|
 | 
						|
	// Execute deferred outside lock.
 | 
						|
	for _, deferfn := range deferred {
 | 
						|
		v := deferfn()
 | 
						|
		ok = ok && v
 | 
						|
	}
 | 
						|
 | 
						|
	return ok
 | 
						|
}
 | 
						|
 | 
						|
// Stream represents one
 | 
						|
// open stream for a client.
 | 
						|
type Stream struct {
 | 
						|
 | 
						|
	// atomically updated ptr to a read-only copy
 | 
						|
	// of supported stream types in a hashmap. this
 | 
						|
	// gets updated via CAS operations in .cas().
 | 
						|
	types atomic.Pointer[map[string]struct{}]
 | 
						|
 | 
						|
	// protects stream close.
 | 
						|
	done chan struct{}
 | 
						|
 | 
						|
	// inbound msg ch.
 | 
						|
	msgCh chan Message
 | 
						|
 | 
						|
	// close hook to remove
 | 
						|
	// stream from Streams{}.
 | 
						|
	close func()
 | 
						|
}
 | 
						|
 | 
						|
// Subscribe will add given type to given types this stream supports.
 | 
						|
func (s *Stream) Subscribe(streamType string) {
 | 
						|
	s.cas(func(m map[string]struct{}) bool {
 | 
						|
		if _, ok := m[streamType]; ok {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
		m[streamType] = struct{}{}
 | 
						|
		return true
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// Unsubscribe will remove given type (if found) from types this stream supports.
 | 
						|
func (s *Stream) Unsubscribe(streamType string) {
 | 
						|
	s.cas(func(m map[string]struct{}) bool {
 | 
						|
		if _, ok := m[streamType]; !ok {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
		delete(m, streamType)
 | 
						|
		return true
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// getStreamType returns the first stream type in given list that stream supports.
 | 
						|
func (s *Stream) getStreamType(streamTypes ...string) string {
 | 
						|
	if ptr := s.types.Load(); ptr != nil {
 | 
						|
		for _, streamType := range streamTypes {
 | 
						|
			if _, ok := (*ptr)[streamType]; ok {
 | 
						|
				return streamType
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
// send will block on posting a new Message{}, returning early with
 | 
						|
// a false value if provided context is canceled, or stream closed.
 | 
						|
func (s *Stream) send(ctx context.Context, msg Message) bool {
 | 
						|
	select {
 | 
						|
	case <-s.done:
 | 
						|
		return false
 | 
						|
	case <-ctx.Done():
 | 
						|
		return false
 | 
						|
	case s.msgCh <- msg:
 | 
						|
		return true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Recv will block on receiving Message{}, returning early with a
 | 
						|
// false value if provided context is canceled, or stream closed.
 | 
						|
func (s *Stream) Recv(ctx context.Context) (Message, bool) {
 | 
						|
	select {
 | 
						|
	case <-s.done:
 | 
						|
		return Message{}, false
 | 
						|
	case <-ctx.Done():
 | 
						|
		return Message{}, false
 | 
						|
	case msg := <-s.msgCh:
 | 
						|
		return msg, true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Close will close the underlying context, finally
 | 
						|
// removing it from the parent Streams per-account-map.
 | 
						|
func (s *Stream) Close() {
 | 
						|
	select {
 | 
						|
	case <-s.done:
 | 
						|
	default:
 | 
						|
		close(s.done)
 | 
						|
		s.close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// cas will perform a Compare And Swap operation on s.types using modifier func.
 | 
						|
func (s *Stream) cas(fn func(map[string]struct{}) bool) {
 | 
						|
	if fn == nil {
 | 
						|
		panic("nil function")
 | 
						|
	}
 | 
						|
	for {
 | 
						|
		var m map[string]struct{}
 | 
						|
 | 
						|
		// Get current value.
 | 
						|
		ptr := s.types.Load()
 | 
						|
 | 
						|
		if ptr == nil {
 | 
						|
			// Allocate new types map.
 | 
						|
			m = make(map[string]struct{})
 | 
						|
		} else {
 | 
						|
			// Clone r-only map.
 | 
						|
			m = maps.Clone(*ptr)
 | 
						|
		}
 | 
						|
 | 
						|
		// Apply
 | 
						|
		// changes.
 | 
						|
		if !fn(m) {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// Attempt to Compare And Swap ptr.
 | 
						|
		if s.types.CompareAndSwap(ptr, &m) {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Message is a single message
// sent over a stream to a client.
type Message struct {

	// Stream lists every stream type
	// this message should be delivered to.
	Stream []string `json:"stream"`

	// Event is the event type of the message
	// (update / delete / notification etc).
	Event string `json:"event"`

	// Payload is the actual content of the
	// message. For an update or notification
	// this will be a JSON string.
	Payload string `json:"payload"`
}
 |