| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | package streaming | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/sirupsen/logrus" | 
					
						
							|  |  |  | 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | 
					
						
							|  |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/stream" | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { | 
					
						
							|  |  |  | 	l := p.log.WithFields(logrus.Fields{ | 
					
						
							|  |  |  | 		"func":    "StreamStatusForAccount", | 
					
						
							|  |  |  | 		"account": account.ID, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	v, ok := p.streamMap.Load(account.ID) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		// no open connections so nothing to stream | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	streamsForAccount, ok := v.(*stream.StreamsForAccount) | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return errors.New("stream map error") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	statusBytes, err := json.Marshal(s) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return fmt.Errorf("error marshalling status to json: %s", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	streamsForAccount.Lock() | 
					
						
							|  |  |  | 	defer streamsForAccount.Unlock() | 
					
						
							| 
									
										
										
										
											2021-09-01 18:29:25 +02:00
										 |  |  | 	for _, s := range streamsForAccount.Streams { | 
					
						
							|  |  |  | 		s.Lock() | 
					
						
							|  |  |  | 		defer s.Unlock() | 
					
						
							|  |  |  | 		if s.Connected { | 
					
						
							|  |  |  | 			l.Debugf("streaming status to stream id %s", s.ID) | 
					
						
							|  |  |  | 			s.Messages <- &stream.Message{ | 
					
						
							|  |  |  | 				Stream:  []string{s.Type}, | 
					
						
							| 
									
										
										
										
											2021-06-19 11:18:55 +02:00
										 |  |  | 				Event:   "update", | 
					
						
							|  |  |  | 				Payload: string(statusBytes), | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |