| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | /* | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Copyright 2017 gRPC authors. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  |  * you may not use this file except in compliance with the License. | 
					
						
							|  |  |  |  * You may obtain a copy of the License at | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  |  * distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  |  * See the License for the specific language governing permissions and | 
					
						
							|  |  |  |  * limitations under the License. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package grpc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"google.golang.org/grpc/balancer" | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	"google.golang.org/grpc/codes" | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	"google.golang.org/grpc/connectivity" | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	"google.golang.org/grpc/internal" | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	"google.golang.org/grpc/internal/balancer/gracefulswitch" | 
					
						
							|  |  |  | 	"google.golang.org/grpc/internal/channelz" | 
					
						
							|  |  |  | 	"google.golang.org/grpc/internal/grpcsync" | 
					
						
							|  |  |  | 	"google.golang.org/grpc/resolver" | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	"google.golang.org/grpc/status" | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | // ccBalancerWrapper sits between the ClientConn and the Balancer. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // ccBalancerWrapper implements methods corresponding to the ones on the | 
					
						
							|  |  |  | // balancer.Balancer interface. The ClientConn is free to call these methods | 
					
						
							|  |  |  | // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // to the Balancer happen in order by performing them in the serializer, without | 
					
						
							|  |  |  | // any mutexes held. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | // | 
					
						
							|  |  |  | // ccBalancerWrapper also implements the balancer.ClientConn interface and is | 
					
						
							|  |  |  | // passed to the Balancer implementations. It invokes unexported methods on the | 
					
						
							|  |  |  | // ClientConn to handle these calls from the Balancer. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // It uses the gracefulswitch.Balancer internally to ensure that balancer | 
					
						
							|  |  |  | // switches happen in a graceful manner. | 
					
						
							|  |  |  | type ccBalancerWrapper struct { | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	// The following fields are initialized when the wrapper is created and are | 
					
						
							|  |  |  | 	// read-only afterwards, and therefore can be accessed without a mutex. | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	cc               *ClientConn | 
					
						
							|  |  |  | 	opts             balancer.BuildOptions | 
					
						
							|  |  |  | 	serializer       *grpcsync.CallbackSerializer | 
					
						
							|  |  |  | 	serializerCancel context.CancelFunc | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	// The following fields are only accessed within the serializer or during | 
					
						
							|  |  |  | 	// initialization. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	curBalancerName string | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	balancer        *gracefulswitch.Balancer | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	// The following field is protected by mu.  Caller must take cc.mu before | 
					
						
							|  |  |  | 	// taking mu. | 
					
						
							|  |  |  | 	mu     sync.Mutex | 
					
						
							|  |  |  | 	closed bool | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // newCCBalancerWrapper creates a new balancer wrapper in idle state. The | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | // underlying balancer is not created until the updateClientConnState() method | 
					
						
							|  |  |  | // is invoked. | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { | 
					
						
							|  |  |  | 	ctx, cancel := context.WithCancel(cc.ctx) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	ccb := &ccBalancerWrapper{ | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		cc: cc, | 
					
						
							|  |  |  | 		opts: balancer.BuildOptions{ | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | 			DialCreds:       cc.dopts.copts.TransportCredentials, | 
					
						
							|  |  |  | 			CredsBundle:     cc.dopts.copts.CredsBundle, | 
					
						
							|  |  |  | 			Dialer:          cc.dopts.copts.Dialer, | 
					
						
							|  |  |  | 			Authority:       cc.authority, | 
					
						
							|  |  |  | 			CustomUserAgent: cc.dopts.copts.UserAgent, | 
					
						
							|  |  |  | 			ChannelzParent:  cc.channelz, | 
					
						
							|  |  |  | 			Target:          cc.parsedTarget, | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 			MetricsRecorder: cc.metricsRecorderList, | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		}, | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 		serializer:       grpcsync.NewCallbackSerializer(ctx), | 
					
						
							|  |  |  | 		serializerCancel: cancel, | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	return ccb | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // updateClientConnState is invoked by grpc to push a ClientConnState update to | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // the underlying balancer.  This is always executed from the serializer, so | 
					
						
							|  |  |  | // it is safe to call into the balancer here. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	errCh := make(chan error) | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	uccs := func(ctx context.Context) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		defer close(errCh) | 
					
						
							|  |  |  | 		if ctx.Err() != nil || ccb.balancer == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | 		name := gracefulswitch.ChildName(ccs.BalancerConfig) | 
					
						
							|  |  |  | 		if ccb.curBalancerName != name { | 
					
						
							|  |  |  | 			ccb.curBalancerName = name | 
					
						
							|  |  |  | 			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		err := ccb.balancer.UpdateClientConnState(*ccs) | 
					
						
							|  |  |  | 		if logger.V(2) && err != nil { | 
					
						
							|  |  |  | 			logger.Infof("error from balancer.UpdateClientConnState: %v", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		errCh <- err | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	onFailure := func() { close(errCh) } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// UpdateClientConnState can race with Close, and when the latter wins, the | 
					
						
							|  |  |  | 	// serializer is closed, and the attempt to schedule the callback will fail. | 
					
						
							|  |  |  | 	// It is acceptable to ignore this failure. But since we want to handle the | 
					
						
							|  |  |  | 	// state update in a blocking fashion (when we successfully schedule the | 
					
						
							|  |  |  | 	// callback), we have to use the ScheduleOr method and not the MaybeSchedule | 
					
						
							|  |  |  | 	// method on the serializer. | 
					
						
							|  |  |  | 	ccb.serializer.ScheduleOr(uccs, onFailure) | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	return <-errCh | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // resolverError is invoked by grpc to push a resolver error to the underlying | 
					
						
							|  |  |  | // balancer.  The call to the balancer is executed from the serializer. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | func (ccb *ccBalancerWrapper) resolverError(err error) { | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	ccb.serializer.TrySchedule(func(ctx context.Context) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		if ctx.Err() != nil || ccb.balancer == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 		ccb.balancer.ResolverError(err) | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // close initiates async shutdown of the wrapper.  cc.mu must be held when | 
					
						
							|  |  |  | // calling this function.  To determine the wrapper has finished shutting down, | 
					
						
							|  |  |  | // the channel should block on ccb.serializer.Done() without cc.mu held. | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | func (ccb *ccBalancerWrapper) close() { | 
					
						
							|  |  |  | 	ccb.mu.Lock() | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.closed = true | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	ccb.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | 	channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing") | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	ccb.serializer.TrySchedule(func(context.Context) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		if ccb.balancer == nil { | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		ccb.balancer.Close() | 
					
						
							|  |  |  | 		ccb.balancer = nil | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.serializerCancel() | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // exitIdle invokes the balancer's exitIdle method in the serializer. | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) exitIdle() { | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 	ccb.serializer.TrySchedule(func(ctx context.Context) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		if ctx.Err() != nil || ccb.balancer == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		ccb.balancer.ExitIdle() | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.cc.mu.Lock() | 
					
						
							|  |  |  | 	defer ccb.cc.mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ccb.mu.Lock() | 
					
						
							|  |  |  | 	if ccb.closed { | 
					
						
							|  |  |  | 		ccb.mu.Unlock() | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed") | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	if len(addrs) == 0 { | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ac, err := ccb.cc.newAddrConnLocked(addrs, opts) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | 		channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	acbw := &acBalancerWrapper{ | 
					
						
							|  |  |  | 		ccb:           ccb, | 
					
						
							|  |  |  | 		ac:            ac, | 
					
						
							|  |  |  | 		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer), | 
					
						
							|  |  |  | 		stateListener: opts.StateListener, | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		healthData:    newHealthData(connectivity.Idle), | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	ac.acbw = acbw | 
					
						
							|  |  |  | 	return acbw, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) { | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	// The graceful switch balancer will never call this. | 
					
						
							|  |  |  | 	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc") | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { | 
					
						
							|  |  |  | 	acbw, ok := sc.(*acBalancerWrapper) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	acbw.UpdateAddresses(addrs) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.cc.mu.Lock() | 
					
						
							|  |  |  | 	defer ccb.cc.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2024-08-26 18:05:54 +02:00
										 |  |  | 	if ccb.cc.conns == nil { | 
					
						
							|  |  |  | 		// The CC has been closed; ignore this update. | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	ccb.mu.Lock() | 
					
						
							|  |  |  | 	if ccb.closed { | 
					
						
							|  |  |  | 		ccb.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	// Update picker before updating state.  Even though the ordering here does | 
					
						
							|  |  |  | 	// not matter, it can lead to multiple calls of Pick in the common start-up | 
					
						
							|  |  |  | 	// case where we wait for ready and then perform an RPC.  If the picker is | 
					
						
							|  |  |  | 	// updated later, we could call the "connecting" picker when the state is | 
					
						
							|  |  |  | 	// updated, and then call the "ready" picker after the picker gets updated. | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Note that there is no need to check if the balancer wrapper was closed, | 
					
						
							|  |  |  | 	// as we know the graceful switch LB policy will not call cc if it has been | 
					
						
							|  |  |  | 	// closed. | 
					
						
							|  |  |  | 	ccb.cc.pickerWrapper.updatePicker(s.Picker) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	ccb.cc.csMgr.updateState(s.ConnectivityState) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.cc.mu.RLock() | 
					
						
							|  |  |  | 	defer ccb.cc.mu.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ccb.mu.Lock() | 
					
						
							|  |  |  | 	if ccb.closed { | 
					
						
							|  |  |  | 		ccb.mu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	ccb.mu.Unlock() | 
					
						
							|  |  |  | 	ccb.cc.resolveNowLocked(o) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ccb *ccBalancerWrapper) Target() string { | 
					
						
							|  |  |  | 	return ccb.cc.target | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // acBalancerWrapper is a wrapper on top of ac for balancers. | 
					
						
							|  |  |  | // It implements balancer.SubConn interface. | 
					
						
							|  |  |  | type acBalancerWrapper struct { | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	internal.EnforceSubConnEmbedding | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | 	ac            *addrConn          // read-only | 
					
						
							|  |  |  | 	ccb           *ccBalancerWrapper // read-only | 
					
						
							|  |  |  | 	stateListener func(balancer.SubConnState) | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	producersMu sync.Mutex | 
					
						
							|  |  |  | 	producers   map[balancer.ProducerBuilder]*refCountedProducer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Access to healthData is protected by healthMu. | 
					
						
							|  |  |  | 	healthMu sync.Mutex | 
					
						
							|  |  |  | 	// healthData is stored as a pointer to detect when the health listener is | 
					
						
							|  |  |  | 	// dropped or updated. This is required as closures can't be compared for | 
					
						
							|  |  |  | 	// equality. | 
					
						
							|  |  |  | 	healthData *healthData | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // healthData holds data related to health state reporting. | 
					
						
							|  |  |  | type healthData struct { | 
					
						
							|  |  |  | 	// connectivityState stores the most recent connectivity state delivered | 
					
						
							|  |  |  | 	// to the LB policy. This is stored to avoid sending updates when the | 
					
						
							|  |  |  | 	// SubConn has already exited connectivity state READY. | 
					
						
							|  |  |  | 	connectivityState connectivity.State | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newHealthData(s connectivity.State) *healthData { | 
					
						
							|  |  |  | 	return &healthData{connectivityState: s} | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | // updateState is invoked by grpc to push a subConn state update to the | 
					
						
							|  |  |  | // underlying balancer. | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) { | 
					
						
							|  |  |  | 	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		if ctx.Err() != nil || acbw.ccb.balancer == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		// Invalidate all producers on any state change. | 
					
						
							|  |  |  | 		acbw.closeProducers() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 		// Even though it is optional for balancers, gracefulswitch ensures | 
					
						
							|  |  |  | 		// opts.StateListener is set, so this cannot ever be nil. | 
					
						
							|  |  |  | 		// TODO: delete this comment when UpdateSubConnState is removed. | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 		scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err} | 
					
						
							|  |  |  | 		if s == connectivity.Ready { | 
					
						
							|  |  |  | 			setConnectedAddress(&scs, curAddr) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		// Invalidate the health listener by updating the healthData. | 
					
						
							|  |  |  | 		acbw.healthMu.Lock() | 
					
						
							|  |  |  | 		// A race may occur if a health listener is registered soon after the | 
					
						
							|  |  |  | 		// connectivity state is set but before the stateListener is called. | 
					
						
							|  |  |  | 		// Two cases may arise: | 
					
						
							|  |  |  | 		// 1. The new state is not READY: RegisterHealthListener has checks to | 
					
						
							|  |  |  | 		//    ensure no updates are sent when the connectivity state is not | 
					
						
							|  |  |  | 		//    READY. | 
					
						
							|  |  |  | 		// 2. The new state is READY: This means that the old state wasn't Ready. | 
					
						
							|  |  |  | 		//    The RegisterHealthListener API mentions that a health listener | 
					
						
							|  |  |  | 		//    must not be registered when a SubConn is not ready to avoid such | 
					
						
							|  |  |  | 		//    races. When this happens, the LB policy would get health updates | 
					
						
							|  |  |  | 		//    on the old listener. When the LB policy registers a new listener | 
					
						
							|  |  |  | 		//    on receiving the connectivity update, the health updates will be | 
					
						
							|  |  |  | 		//    sent to the new health listener. | 
					
						
							|  |  |  | 		acbw.healthData = newHealthData(scs.ConnectivityState) | 
					
						
							|  |  |  | 		acbw.healthMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-16 11:06:00 +02:00
										 |  |  | 		acbw.stateListener(scs) | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | func (acbw *acBalancerWrapper) String() string { | 
					
						
							| 
									
										
										
										
											2024-04-11 11:46:18 +02:00
										 |  |  | 	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID) | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { | 
					
						
							|  |  |  | 	acbw.ac.updateAddrs(addrs) | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (acbw *acBalancerWrapper) Connect() { | 
					
						
							|  |  |  | 	go acbw.ac.connect() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | func (acbw *acBalancerWrapper) Shutdown() { | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	acbw.closeProducers() | 
					
						
							| 
									
										
										
										
											2024-03-11 15:34:34 +01:00
										 |  |  | 	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain) | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | // NewStream begins a streaming RPC on the addrConn.  If the addrConn is not | 
					
						
							| 
									
										
										
										
											2023-09-07 13:20:37 +02:00
										 |  |  | // ready, blocks until it is or ctx expires.  Returns an error when the context | 
					
						
							|  |  |  | // expires or the addrConn is shut down. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	transport := acbw.ac.getReadyTransport() | 
					
						
							|  |  |  | 	if transport == nil { | 
					
						
							|  |  |  | 		return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Invoke performs a unary RPC.  If the addrConn is not ready, returns | 
					
						
							|  |  |  | // errSubConnNotReady. | 
					
						
							| 
									
										
										
										
											2023-09-18 13:47:28 +01:00
										 |  |  | func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error { | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if err := cs.SendMsg(args); err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return cs.RecvMsg(reply) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type refCountedProducer struct { | 
					
						
							|  |  |  | 	producer balancer.Producer | 
					
						
							|  |  |  | 	refs     int    // number of current refs to the producer | 
					
						
							|  |  |  | 	close    func() // underlying producer's close function | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 	acbw.producersMu.Lock() | 
					
						
							|  |  |  | 	defer acbw.producersMu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Look up existing producer from this builder. | 
					
						
							|  |  |  | 	pData := acbw.producers[pb] | 
					
						
							|  |  |  | 	if pData == nil { | 
					
						
							|  |  |  | 		// Not found; create a new one and add it to the producers map. | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		p, closeFn := pb.Build(acbw) | 
					
						
							|  |  |  | 		pData = &refCountedProducer{producer: p, close: closeFn} | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 		acbw.producers[pb] = pData | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Account for this new reference. | 
					
						
							|  |  |  | 	pData.refs++ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Return a cleanup function wrapped in a OnceFunc to remove this reference | 
					
						
							|  |  |  | 	// and delete the refCountedProducer from the map if the total reference | 
					
						
							|  |  |  | 	// count goes to zero. | 
					
						
							|  |  |  | 	unref := func() { | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		acbw.producersMu.Lock() | 
					
						
							|  |  |  | 		// If closeProducers has already closed this producer instance, refs is | 
					
						
							|  |  |  | 		// set to 0, so the check after decrementing will never pass, and the | 
					
						
							|  |  |  | 		// producer will not be double-closed. | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 		pData.refs-- | 
					
						
							|  |  |  | 		if pData.refs == 0 { | 
					
						
							|  |  |  | 			defer pData.close() // Run outside the acbw mutex | 
					
						
							|  |  |  | 			delete(acbw.producers, pb) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 		acbw.producersMu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-05-09 19:19:48 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return pData.producer, grpcsync.OnceFunc(unref) | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2025-02-06 12:14:37 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | func (acbw *acBalancerWrapper) closeProducers() { | 
					
						
							|  |  |  | 	acbw.producersMu.Lock() | 
					
						
							|  |  |  | 	defer acbw.producersMu.Unlock() | 
					
						
							|  |  |  | 	for pb, pData := range acbw.producers { | 
					
						
							|  |  |  | 		pData.refs = 0 | 
					
						
							|  |  |  | 		pData.close() | 
					
						
							|  |  |  | 		delete(acbw.producers, pb) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RegisterHealthListener accepts a health listener from the LB policy. It sends | 
					
						
							|  |  |  | // updates to the health listener as long as the SubConn's connectivity state | 
					
						
							|  |  |  | // doesn't change and a new health listener is not registered. To invalidate | 
					
						
							|  |  |  | // the currently registered health listener, acbw updates the healthData. If a | 
					
						
							|  |  |  | // nil listener is registered, the active health listener is dropped. | 
					
						
							|  |  |  | func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { | 
					
						
							|  |  |  | 	acbw.healthMu.Lock() | 
					
						
							|  |  |  | 	defer acbw.healthMu.Unlock() | 
					
						
							|  |  |  | 	// listeners should not be registered when the connectivity state | 
					
						
							|  |  |  | 	// isn't Ready. This may happen when the balancer registers a listener | 
					
						
							|  |  |  | 	// after the connectivityState is updated, but before it is notified | 
					
						
							|  |  |  | 	// of the update. | 
					
						
							|  |  |  | 	if acbw.healthData.connectivityState != connectivity.Ready { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Replace the health data to stop sending updates to any previously | 
					
						
							|  |  |  | 	// registered health listeners. | 
					
						
							|  |  |  | 	hd := newHealthData(connectivity.Ready) | 
					
						
							|  |  |  | 	acbw.healthData = hd | 
					
						
							|  |  |  | 	if listener == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { | 
					
						
							|  |  |  | 		if ctx.Err() != nil || acbw.ccb.balancer == nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		// Don't send updates if a new listener is registered. | 
					
						
							|  |  |  | 		acbw.healthMu.Lock() | 
					
						
							|  |  |  | 		defer acbw.healthMu.Unlock() | 
					
						
							|  |  |  | 		curHD := acbw.healthData | 
					
						
							|  |  |  | 		if curHD != hd { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } |