| 
									
										
										
										
											2023-03-12 16:00:57 +01:00
										 |  |  | // GoToSocial | 
					
						
							|  |  |  | // Copyright (C) GoToSocial Authors admin@gotosocial.org | 
					
						
							|  |  |  | // SPDX-License-Identifier: AGPL-3.0-or-later | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  | // GNU Affero General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  | // along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							| 
									
										
										
										
											2021-10-04 15:24:19 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-22 16:05:26 +01:00
										 |  |  | package stream | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2021-08-25 15:34:33 +02:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-19 09:47:55 +01:00
										 |  |  | 	"codeberg.org/gruf/go-kv" | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | 
					
						
							|  |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 
					
						
							|  |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/id" | 
					
						
							| 
									
										
										
										
											2022-07-19 09:47:55 +01:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/log" | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/stream" | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-22 16:05:26 +01:00
										 |  |  | // Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { | 
					
						
							| 
									
										
										
										
											2023-02-22 16:05:26 +01:00
										 |  |  | 	l := log.WithContext(ctx).WithFields(kv.Fields{ | 
					
						
							|  |  |  | 		{"account", account.ID}, | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 		{"streamType", streamType}, | 
					
						
							| 
									
										
										
										
											2023-02-22 16:05:26 +01:00
										 |  |  | 	}...) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	l.Debug("received open stream request") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	var ( | 
					
						
							|  |  |  | 		streamID string | 
					
						
							|  |  |  | 		err      error | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Each stream needs a unique ID so we know to close it. | 
					
						
							|  |  |  | 	streamID, err = id.NewRandomULID() | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err)) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	// Each stream can be subscibed to multiple types. | 
					
						
							| 
									
										
										
										
											2023-03-11 02:10:58 -08:00
										 |  |  | 	// Record them in a set, and include the initial one | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	// if it was given to us. | 
					
						
							|  |  |  | 	streamTypes := map[string]any{} | 
					
						
							|  |  |  | 	if streamType != "" { | 
					
						
							|  |  |  | 		streamTypes[streamType] = true | 
					
						
							| 
									
										
										
										
											2023-03-11 02:10:58 -08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	newStream := &stream.Stream{ | 
					
						
							|  |  |  | 		ID:          streamID, | 
					
						
							|  |  |  | 		StreamTypes: streamTypes, | 
					
						
							|  |  |  | 		Messages:    make(chan *stream.Message, 100), | 
					
						
							|  |  |  | 		Hangup:      make(chan interface{}, 1), | 
					
						
							|  |  |  | 		Connected:   true, | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	go p.waitToCloseStream(account, newStream) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	v, ok := p.streamMap.Load(account.ID) | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	if ok { | 
					
						
							|  |  |  | 		// There is an entry in the streamMap | 
					
						
							|  |  |  | 		// for this account. Parse it out. | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 		streamsForAccount, ok := v.(*stream.StreamsForAccount) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 		if !ok { | 
					
						
							|  |  |  | 			return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 		// Append new stream to existing entry. | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 		streamsForAccount.Lock() | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 		streamsForAccount.Streams = append(streamsForAccount.Streams, newStream) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 		streamsForAccount.Unlock() | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	} else { | 
					
						
							|  |  |  | 		// There is no entry in the streamMap for | 
					
						
							|  |  |  | 		// this account yet. Create one and store it. | 
					
						
							|  |  |  | 		p.streamMap.Store(account.ID, &stream.StreamsForAccount{ | 
					
						
							|  |  |  | 			Streams: []*stream.Stream{ | 
					
						
							|  |  |  | 				newStream, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-25 10:37:38 +02:00
										 |  |  | 	return newStream, nil | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // waitToCloseStream waits until the hangup channel is closed for the given stream. | 
					
						
							|  |  |  | // It then iterates through the map of streams stored by the processor, removes the stream from it, | 
					
						
							|  |  |  | // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. | 
					
						
							| 
									
										
										
										
											2023-02-22 16:05:26 +01:00
										 |  |  | func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	<-thisStream.Hangup // wait for a hangup message | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// lock the stream to prevent more messages being put in it while we work | 
					
						
							|  |  |  | 	thisStream.Lock() | 
					
						
							|  |  |  | 	defer thisStream.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// indicate the stream is no longer connected | 
					
						
							|  |  |  | 	thisStream.Connected = false | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// load and parse the entry for this account from the stream map | 
					
						
							|  |  |  | 	v, ok := p.streamMap.Load(account.ID) | 
					
						
							|  |  |  | 	if !ok || v == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	streamsForAccount, ok := v.(*stream.StreamsForAccount) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// lock the streams for account while we remove this stream from its slice | 
					
						
							|  |  |  | 	streamsForAccount.Lock() | 
					
						
							|  |  |  | 	defer streamsForAccount.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// put everything into modified streams *except* the stream we're removing | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	modifiedStreams := []*stream.Stream{} | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	for _, s := range streamsForAccount.Streams { | 
					
						
							|  |  |  | 		if s.ID != thisStream.ID { | 
					
						
							|  |  |  | 			modifiedStreams = append(modifiedStreams, s) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	streamsForAccount.Streams = modifiedStreams | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// finally close the messages channel so no more messages can be read from it | 
					
						
							|  |  |  | 	close(thisStream.Messages) | 
					
						
							|  |  |  | } |