2023-03-12 16:00:57 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								// GoToSocial  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Copyright (C) GoToSocial Authors admin@gotosocial.org  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// SPDX-License-Identifier: AGPL-3.0-or-later  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This program is free software: you can redistribute it and/or modify  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// it under the terms of the GNU Affero General Public License as published by  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// the Free Software Foundation, either version 3 of the License, or  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// (at your option) any later version.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This program is distributed in the hope that it will be useful,  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// GNU Affero General Public License for more details.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// You should have received a copy of the GNU Affero General Public License  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// along with this program.  If not, see <http://www.gnu.org/licenses/>.  
						 
					
						
							
								
									
										
										
										
											2023-01-02 13:10:50 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								package  streaming  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  (  
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"context" 
							 
						 
					
						
							
								
									
										
										
										
											2024-01-17 14:54:30 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"slices" 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									"time" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-01-02 13:10:50 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									apiutil  "github.com/superseriousbusiness/gotosocial/internal/api/util" 
							 
						 
					
						
							
								
									
										
										
										
											2022-06-08 20:38:03 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/superseriousbusiness/gotosocial/internal/gtserror" 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/superseriousbusiness/gotosocial/internal/id" 
							 
						 
					
						
							
								
									
										
										
										
											2022-07-19 09:47:55 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/superseriousbusiness/gotosocial/internal/log" 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/superseriousbusiness/gotosocial/internal/oauth" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									streampkg  "github.com/superseriousbusiness/gotosocial/internal/stream" 
							 
						 
					
						
							
								
									
										
										
										
											2022-06-08 20:38:03 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									"github.com/gin-gonic/gin" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"github.com/gorilla/websocket" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								)  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2021-08-02 19:06:44 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								// StreamGETHandler swagger:operation GET /api/v1/streaming streamGet  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Initiate a websocket connection for live streaming of statuses and notifications.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// The scheme used should *always* be `wss`. The streaming basepath can be viewed at `/api/v1/instance`.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// On a successful connection, a code `101` will be returned, which indicates that the connection is being upgraded to a secure websocket connection.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// As long as the connection is open, various message types will be streamed into it.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// GoToSocial will ping the connection every 30 seconds to check whether the client is still receiving.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// If the ping fails, or something else goes wrong during transmission, then the connection will be dropped, and the client will be expected to start it again.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
									
										
										
										
											2022-09-28 18:30:40 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								//	---  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	tags:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	- streaming  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	produces:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	- application/json  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	schemes:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	- wss  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	parameters:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		name: access_token  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		description: Access token for the requesting account.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		in: query  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		required: true  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		name: stream  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		description: |-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			Type of stream to request.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			Options are:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`user`: receive updates for the account's home timeline.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`public`: receive updates for the public timeline.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`public:local`: receive updates for the local timeline.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`hashtag`: receive updates for a given hashtag.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`hashtag:local`: receive local updates for a given hashtag.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`list`: receive updates for a certain list of accounts.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			`direct`: receive updates for direct messages.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		in: query  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		required: true  
						 
					
						
							
								
									
										
										
										
											2023-05-25 10:37:38 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								//	-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		name: list  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		description: |-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			ID of the list to subscribe to.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			Only used if stream type is 'list'.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		in: query  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		name: tag  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		description: |-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			Name of the tag to subscribe to.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			Only used if stream type is 'hashtag' or 'hashtag:local'.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		in: query  
						 
					
						
							
								
									
										
										
										
											2022-09-28 18:30:40 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	security:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	- OAuth2 Bearer:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		- read:streaming  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//	responses:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		'101':  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			schema:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//				type: object  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//				properties:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//					stream:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						type: array  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						items:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							enum:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- user  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- public  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- public:local  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- hashtag  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- hashtag:local  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- list  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							- direct  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//					event:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						description: |-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							The type of event being received.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							`update`: a new status has been received.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							`notification`: a new notification has been received.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							`delete`: a status has been deleted.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							`filters_changed`: not implemented.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						enum:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						- update  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						- notification  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						- delete  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						- filters_changed  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//					payload:  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						description: |-  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							The payload of the streamed message.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							Different depending on the `event` type.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							If present, it should be parsed as a string.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							If `event` = `update`, then the payload will be a JSON string of a status.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							If `event` = `notification`, then the payload will be a JSON string of a notification.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//							If `event` = `delete`, then the payload will be a status ID.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						type: string  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//						example: "{\"id\":\"01FC3TZ5CFG6H65GCKCJRKA669\",\"created_at\":\"2021-08-02T16:25:52Z\",\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://gts.superseriousbusiness.org/users/dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"url\":\"https://gts.superseriousbusiness.org/@dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":fals…//gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/original/019036W043D8FXPJKSKCX7G965.png\",\"header_static\":\"https://gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/small/019036W043D8FXPJKSKCX7G965.png\",\"followers_count\":33,\"following_count\":28,\"statuses_count\":126,\"last_status_at\":\"2021-08-02T16:25:52Z\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null,\"text\":\"a\"}"  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		'401':  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			description: unauthorized  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//		'400':  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//			description: bad request  
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								func  ( m  * Module )  StreamGETHandler ( c  * gin . Context )  {  
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									var  ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										account      * gtsmodel . Account 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										errWithCode  gtserror . WithCode 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									) 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Try query param access token. 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									token  :=  c . Query ( AccessTokenQueryKey ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  token  ==  ""  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Try fallback HTTP header provided token. 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										token  =  c . GetHeader ( AccessTokenHeader ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  token  !=  ""  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-11-28 11:05:07 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Token was provided, use it to authorize stream. 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										account ,  errWithCode  =  m . processor . Stream ( ) . Authorize ( c . Request . Context ( ) ,  token ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-11-28 11:05:07 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										if  errWithCode  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											apiutil . ErrorHandler ( c ,  errWithCode ,  m . processor . InstanceGetV1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									}  else  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-11-28 11:05:07 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// No explicit token was provided: 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// try regular oauth as a last resort. 
							 
						 
					
						
							
								
									
										
										
										
											2023-11-28 11:05:07 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										authed ,  err  :=  oauth . Authed ( c ,  true ,  true ,  true ,  true ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											errWithCode  :=  gtserror . NewErrorUnauthorized ( err ,  err . Error ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											apiutil . ErrorHandler ( c ,  errWithCode ,  m . processor . InstanceGetV1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-11-28 11:05:07 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Set the auth'ed account. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										account  =  authed . Account 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-03-13 13:53:29 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									if  account . IsMoving ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Moving accounts can't 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// use streaming endpoints. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										apiutil . NotFoundAfterMove ( c ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Get the initial requested stream type, if there is one. 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									streamType  :=  c . Query ( StreamQueryKey ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// By appending other query params to the streamType, we 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// can allow streaming for specific list IDs or hashtags. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// The streamType in this case will end up looking like 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// `hashtag:example` or `list:01H3YF48G8B7KTPQFS8D2QBVG8`. 
							 
						 
					
						
							
								
									
										
										
										
											2023-05-25 10:37:38 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									if  list  :=  c . Query ( StreamListKey ) ;  list  !=  ""  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										streamType  +=  ":"  +  list 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									}  else  if  tag  :=  c . Query ( StreamTagKey ) ;  tag  !=  ""  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										streamType  +=  ":"  +  tag 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Open a stream with the processor; this lets processor 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// functions pass messages into a channel, which we can 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// then read from and put into a websockets connection. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									stream ,  errWithCode  :=  m . processor . Stream ( ) . Open ( 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										c . Request . Context ( ) ,  // this ctx is only used for logging 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										account , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										streamType , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									) 
							 
						 
					
						
							
								
									
										
										
										
											2022-06-08 20:38:03 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									if  errWithCode  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-02-02 14:08:13 +01:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										apiutil . ErrorHandler ( c ,  errWithCode ,  m . processor . InstanceGetV1 ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-06-08 20:38:03 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									l  :=  log . 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										WithContext ( c . Request . Context ( ) ) . 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										WithField ( "streamID" ,  id . NewULID ( ) ) . 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										WithField ( "username" ,  account . Username ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-06-08 20:38:03 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Upgrade the incoming HTTP request. This hijacks the 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// underlying connection and reuses it for the websocket 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// (non-http) protocol. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// If the upgrade fails, then Upgrade replies to the client 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// with an HTTP error response. 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									wsConn ,  err  :=  m . wsUpgrade . Upgrade ( c . Writer ,  c . Request ,  nil ) 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										l . Errorf ( "error upgrading websocket connection: %v" ,  err ) 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										stream . Close ( ) 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// We perform the main websocket rw loops in a separate 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// goroutine in order to let the upgrade handler return. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// This prevents the upgrade handler from holding open any 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// throttle / rate-limit request tokens which could become 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// problematic on instances with multiple users. 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									go  m . handleWSConn ( & l ,  wsConn ,  stream ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// handleWSConn handles a two-way websocket streaming connection.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// It will both read messages from the connection, and push messages  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// into the connection. If any errors are encountered while reading  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// or writing (including expected errors like clients leaving), the  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// connection will be closed.  
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								func  ( m  * Module )  handleWSConn ( l  * log . Entry ,  wsConn  * websocket . Conn ,  stream  * streampkg . Stream )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									l . Info ( "opened websocket connection" ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Create new async context with cancel. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx ,  cncl  :=  context . WithCancel ( context . Background ( ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									go  func ( )  { 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										defer  cncl ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Read messages from websocket to server. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										m . readFromWSConn ( ctx ,  wsConn ,  stream ,  l ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									} ( ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									go  func ( )  { 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										defer  cncl ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Write messages from processor in websocket conn. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										m . writeToWSConn ( ctx ,  wsConn ,  stream ,  m . dTicker ,  l ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									} ( ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Wait for ctx 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// to be closed. 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									<- ctx . Done ( ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Close stream 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// straightaway. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									stream . Close ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									// Tidy up underlying websocket connection. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  :=  wsConn . Close ( ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										l . Errorf ( "error closing websocket connection: %v" ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									l . Info ( "closed websocket connection" ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								// readFromWSConn reads control messages coming in from the given  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// websockets connection, and modifies the subscription StreamTypes  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// of the given stream accordingly after acquiring a lock on it.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This is a blocking function; will return only on read error or  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// if the given context is canceled.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( m  * Module )  readFromWSConn (  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx  context . Context , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									wsConn  * websocket . Conn , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									stream  * streampkg . Stream , 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									l  * log . Entry , 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								)  {  
						 
					
						
							
								
									
										
										
										
											2023-05-25 10:37:38 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									for  { 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										var  msg  struct  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											Type    string  ` json:"type" ` 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											Stream  string  ` json:"stream" ` 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											List    string  ` json:"list,omitempty" ` 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-11 02:10:58 -08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Read JSON objects from the client and act on them. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  err  :=  wsConn . ReadJSON ( & msg ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// Only log an error if something weird happened. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// See: https://www.rfc-editor.org/rfc/rfc6455.html#section-11.7 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  ! websocket . IsCloseError ( err ,  [ ] int { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												websocket . CloseNormalClosure , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												websocket . CloseGoingAway , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												websocket . CloseNoStatusReceived , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} ... )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												l . Errorf ( "error during websocket read: %v" ,  err ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
									
										
										
										
											2023-05-25 10:37:38 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											// The connection is gone; no 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// further streaming possible. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											break 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Messages *from* the WS connection are infrequent 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// and usually interesting, so log this at info. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										l . Infof ( "received websocket message: %+v" ,  msg ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-08 11:43:08 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Ignore if the updateStreamType is unknown (or missing), 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// so a bad client can't cause extra memory allocations 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ! slices . Contains ( streampkg . AllStatusTimelines ,  msg . Stream )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											l . Warnf ( "unknown 'stream' field: %v" ,  msg ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										if  msg . List  !=  ""  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// If a list is given, add this to 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// the stream name as this is how we 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// we track stream types internally. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											msg . Stream  +=  ":"  +  msg . List 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										switch  msg . Type  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  "subscribe" : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stream . Subscribe ( msg . Stream ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  "unsubscribe" : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stream . Unsubscribe ( msg . Stream ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										default : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											l . Warnf ( "invalid 'type' field: %v" ,  msg ) 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									l . Debug ( "finished websocket read" ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// writeToWSConn receives messages coming from the processor via the  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// given stream, and writes them into the given websockets connection.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This function also handles sending ping messages into the websockets  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// connection to keep it alive when no other activity occurs.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This is a blocking function; will return only on write error or  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// if the given context is canceled.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( m  * Module )  writeToWSConn (  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx  context . Context , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									wsConn  * websocket . Conn , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									stream  * streampkg . Stream , 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									ping  time . Duration , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									l  * log . Entry , 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								)  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  { 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Wrap context with timeout to send a ping. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										pingctx ,  cncl  :=  context . WithTimeout ( ctx ,  ping ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Block on receipt of msg. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										msg ,  ok  :=  stream . Recv ( pingctx ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Check if cancel because ping. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										pinged  :=  ( pingctx . Err ( )  !=  nil ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										cncl ( ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										switch  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  ! ok  &&  pinged : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// The ping context timed out! 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											l . Trace ( "writing websocket ping" ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// Wrapped context time-out, send a keep-alive "ping". 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											if  err  :=  wsConn . WriteControl ( websocket . PingMessage ,  nil ,  time . Time { } ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												l . Debugf ( "error writing websocket ping: %v" ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												break 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  ! ok : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// Stream was 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// closed. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										l . Trace ( "writing websocket message: %+v" ,  msg ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Received a new message from the processor. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  err  :=  wsConn . WriteJSON ( msg ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											l . Debugf ( "error writing websocket message: %v" ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											break 
							 
						 
					
						
							
								
									
										
										
										
											2023-07-04 12:55:10 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-02-20 18:07:49 +00:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									l . Debug ( "finished websocket write" ) 
							 
						 
					
						
							
								
									
										
										
										
											2021-06-19 11:18:55 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								}