| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | /* | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Copyright 2023 gRPC authors. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  |  * you may not use this file except in compliance with the License. | 
					
						
							|  |  |  |  * You may obtain a copy of the License at | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  |  * distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  |  * See the License for the specific language governing permissions and | 
					
						
							|  |  |  |  * limitations under the License. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package grpcsync | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							// Subscriber is the receiving side of a PubSub. Any type that wants to be
// notified about published messages implements this interface and registers
// itself with the PubSub, which then invokes the callback for each message.
type Subscriber interface {
	// OnMessage is called with each newly published message. It must return
	// promptly: implementations are not allowed to block here.
	OnMessage(msg any)
}
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // PubSub is a simple one-to-many publish-subscribe system that supports | 
					
						
							|  |  |  | // messages of arbitrary type. It guarantees that messages are delivered in | 
					
						
							|  |  |  | // the same order in which they were published. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Publisher invokes the Publish() method to publish new messages, while | 
					
						
							|  |  |  | // subscribers interested in receiving these messages register a callback | 
					
						
							|  |  |  | // via the Subscribe() method. | 
					
						
							|  |  |  | // | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | // Once a PubSub is stopped, no more messages can be published, but any pending | 
					
						
							|  |  |  | // published messages will be delivered to the subscribers.  Done may be used | 
					
						
							|  |  |  | // to determine when all published messages have been delivered. | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | type PubSub struct { | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	cs *CallbackSerializer | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Access to the below fields are guarded by this mutex. | 
					
						
							|  |  |  | 	mu          sync.Mutex | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	msg         any | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	subscribers map[Subscriber]bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | // NewPubSub returns a new PubSub instance.  Users should cancel the | 
					
						
							|  |  |  | // provided context to shutdown the PubSub. | 
					
						
							|  |  |  | func NewPubSub(ctx context.Context) *PubSub { | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	return &PubSub{ | 
					
						
							|  |  |  | 		cs:          NewCallbackSerializer(ctx), | 
					
						
							|  |  |  | 		subscribers: map[Subscriber]bool{}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Subscribe registers the provided Subscriber to the PubSub. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // If the PubSub contains a previously published message, the Subscriber's | 
					
						
							|  |  |  | // OnMessage() callback will be invoked asynchronously with the existing | 
					
						
							|  |  |  | // message to begin with, and subsequently for every newly published message. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The caller is responsible for invoking the returned cancel function to | 
					
						
							|  |  |  | // unsubscribe itself from the PubSub. | 
					
						
							|  |  |  | func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { | 
					
						
							|  |  |  | 	ps.mu.Lock() | 
					
						
							|  |  |  | 	defer ps.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ps.subscribers[sub] = true | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if ps.msg != nil { | 
					
						
							|  |  |  | 		msg := ps.msg | 
					
						
							|  |  |  | 		ps.cs.Schedule(func(context.Context) { | 
					
						
							|  |  |  | 			ps.mu.Lock() | 
					
						
							|  |  |  | 			defer ps.mu.Unlock() | 
					
						
							|  |  |  | 			if !ps.subscribers[sub] { | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			sub.OnMessage(msg) | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return func() { | 
					
						
							|  |  |  | 		ps.mu.Lock() | 
					
						
							|  |  |  | 		defer ps.mu.Unlock() | 
					
						
							|  |  |  | 		delete(ps.subscribers, sub) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Publish publishes the provided message to the PubSub, and invokes | 
					
						
							|  |  |  | // callbacks registered by subscribers asynchronously. | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | func (ps *PubSub) Publish(msg any) { | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	ps.mu.Lock() | 
					
						
							|  |  |  | 	defer ps.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ps.msg = msg | 
					
						
							|  |  |  | 	for sub := range ps.subscribers { | 
					
						
							|  |  |  | 		s := sub | 
					
						
							|  |  |  | 		ps.cs.Schedule(func(context.Context) { | 
					
						
							|  |  |  | 			ps.mu.Lock() | 
					
						
							|  |  |  | 			defer ps.mu.Unlock() | 
					
						
							|  |  |  | 			if !ps.subscribers[s] { | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			s.OnMessage(msg) | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | // Done returns a channel that is closed after the context passed to NewPubSub | 
					
						
							|  |  |  | // is canceled and all updates have been sent to subscribers. | 
					
						
							|  |  |  | func (ps *PubSub) Done() <-chan struct{} { | 
					
						
							|  |  |  | 	return ps.cs.Done() | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | } |