mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 20:22:25 -06:00 
			
		
		
		
	- codeberg.org/gruf/go-ffmpreg: v0.6.9 -> v0.6.10
- github.com/ncruces/go-sqlite3: v0.27.1 -> v0.28.0
- github.com/stretchr/testify: v1.10.0 -> v1.11.1
- github.com/tdewolff/minify/v2: v2.23.11 -> v2.24.2
- go.opentelemetry.io/otel{,/*}: v1.37.0 -> v1.38.0
- go.opentelemetry.io/contrib/*: v0.62.0 -> v0.63.0
Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4406
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
		
	
			
		
			
				
	
	
		
			389 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			389 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2024 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
// Package endpointsharding implements a load balancing policy that manages
 | 
						|
// homogeneous child policies each owning a single endpoint.
 | 
						|
//
 | 
						|
// # Experimental
 | 
						|
//
 | 
						|
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
package endpointsharding
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	rand "math/rand/v2"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
 | 
						|
	"google.golang.org/grpc/balancer"
 | 
						|
	"google.golang.org/grpc/balancer/base"
 | 
						|
	"google.golang.org/grpc/connectivity"
 | 
						|
	"google.golang.org/grpc/resolver"
 | 
						|
)
 | 
						|
 | 
						|
// randIntN is a package-level alias for rand.IntN; rotateEndpoints and
// updateState call it instead of calling rand.IntN directly.
// NOTE(review): presumably a variable so tests can substitute a deterministic
// source — confirm.
var randIntN = rand.IntN
 | 
						|
 | 
						|
// ChildState is the balancer state of a child along with the endpoint which
 | 
						|
// identifies the child balancer.
 | 
						|
type ChildState struct {
 | 
						|
	Endpoint resolver.Endpoint
 | 
						|
	State    balancer.State
 | 
						|
 | 
						|
	// Balancer exposes only the ExitIdler interface of the child LB policy.
 | 
						|
	// Other methods of the child policy are called only by endpointsharding.
 | 
						|
	Balancer ExitIdler
 | 
						|
}
 | 
						|
 | 
						|
// ExitIdler is the narrow view of a child balancer handed out through
// ChildState: it gives access to the child's ExitIdle method and nothing else.
type ExitIdler interface {
	// ExitIdle asks the LB policy to reconnect to backends / leave the IDLE
	// state, if appropriate and possible. Note that SubConns that enter the
	// IDLE state will not reconnect until SubConn.Connect is called.
	ExitIdle()
}
 | 
						|
 | 
						|
// Options configures the behaviour of the endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect, when true, lets child balancers that report IDLE
	// stay in that state until they are explicitly triggered to exit it via
	// the ChildState obtained from the endpointsharding picker. When false,
	// the endpointsharding balancer automatically calls ExitIdle on children
	// that report IDLE.
	DisableAutoReconnect bool
}
 | 
						|
 | 
						|
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
 | 
						|
// type as the balancer.Builder.Build method.
 | 
						|
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
 | 
						|
 | 
						|
// NewBalancer returns a load balancing policy that manages homogeneous child
 | 
						|
// policies each owning a single endpoint. The endpointsharding balancer
 | 
						|
// forwards the LoadBalancingConfig in ClientConn state updates to its children.
 | 
						|
func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
 | 
						|
	es := &endpointSharding{
 | 
						|
		cc:           cc,
 | 
						|
		bOpts:        opts,
 | 
						|
		esOpts:       esOpts,
 | 
						|
		childBuilder: childBuilder,
 | 
						|
	}
 | 
						|
	es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
 | 
						|
	return es
 | 
						|
}
 | 
						|
 | 
						|
// endpointSharding is a balancer that wraps child balancers. It creates a child
 | 
						|
// balancer with child config for every unique Endpoint received. It updates the
 | 
						|
// child states on any update from parent or child.
 | 
						|
type endpointSharding struct {
 | 
						|
	cc           balancer.ClientConn
 | 
						|
	bOpts        balancer.BuildOptions
 | 
						|
	esOpts       Options
 | 
						|
	childBuilder ChildBuilderFunc
 | 
						|
 | 
						|
	// childMu synchronizes calls to any single child. It must be held for all
 | 
						|
	// calls into a child. To avoid deadlocks, do not acquire childMu while
 | 
						|
	// holding mu.
 | 
						|
	childMu  sync.Mutex
 | 
						|
	children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]
 | 
						|
 | 
						|
	// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
 | 
						|
	// calls (calls to children will each produce an update, only want one
 | 
						|
	// update).
 | 
						|
	inhibitChildUpdates atomic.Bool
 | 
						|
 | 
						|
	// mu synchronizes access to the state stored in balancerWrappers in the
 | 
						|
	// children field. mu must not be held during calls into a child since
 | 
						|
	// synchronous calls back from the child may require taking mu, causing a
 | 
						|
	// deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
 | 
						|
	mu sync.Mutex
 | 
						|
}
 | 
						|
 | 
						|
// rotateEndpoints returns a slice of all the input endpoints rotated a random
 | 
						|
// amount.
 | 
						|
func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
 | 
						|
	les := len(es)
 | 
						|
	if les == 0 {
 | 
						|
		return es
 | 
						|
	}
 | 
						|
	r := randIntN(les)
 | 
						|
	// Make a copy to avoid mutating data beyond the end of es.
 | 
						|
	ret := make([]resolver.Endpoint, les)
 | 
						|
	copy(ret, es[r:])
 | 
						|
	copy(ret[les-r:], es[:r])
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// UpdateClientConnState creates a child for new endpoints and deletes children
 | 
						|
// for endpoints that are no longer present. It also updates all the children,
 | 
						|
// and sends a single synchronous update of the childrens' aggregated state at
 | 
						|
// the end of the UpdateClientConnState operation. If any endpoint has no
 | 
						|
// addresses it will ignore that endpoint. Otherwise, returns first error found
 | 
						|
// from a child, but fully processes the new update.
 | 
						|
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
 | 
						|
	es.childMu.Lock()
 | 
						|
	defer es.childMu.Unlock()
 | 
						|
 | 
						|
	es.inhibitChildUpdates.Store(true)
 | 
						|
	defer func() {
 | 
						|
		es.inhibitChildUpdates.Store(false)
 | 
						|
		es.updateState()
 | 
						|
	}()
 | 
						|
	var ret error
 | 
						|
 | 
						|
	children := es.children.Load()
 | 
						|
	newChildren := resolver.NewEndpointMap[*balancerWrapper]()
 | 
						|
 | 
						|
	// Update/Create new children.
 | 
						|
	for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
 | 
						|
		if _, ok := newChildren.Get(endpoint); ok {
 | 
						|
			// Endpoint child was already created, continue to avoid duplicate
 | 
						|
			// update.
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		childBalancer, ok := children.Get(endpoint)
 | 
						|
		if ok {
 | 
						|
			// Endpoint attributes may have changed, update the stored endpoint.
 | 
						|
			es.mu.Lock()
 | 
						|
			childBalancer.childState.Endpoint = endpoint
 | 
						|
			es.mu.Unlock()
 | 
						|
		} else {
 | 
						|
			childBalancer = &balancerWrapper{
 | 
						|
				childState: ChildState{Endpoint: endpoint},
 | 
						|
				ClientConn: es.cc,
 | 
						|
				es:         es,
 | 
						|
			}
 | 
						|
			childBalancer.childState.Balancer = childBalancer
 | 
						|
			childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
 | 
						|
		}
 | 
						|
		newChildren.Set(endpoint, childBalancer)
 | 
						|
		if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
 | 
						|
			BalancerConfig: state.BalancerConfig,
 | 
						|
			ResolverState: resolver.State{
 | 
						|
				Endpoints:  []resolver.Endpoint{endpoint},
 | 
						|
				Attributes: state.ResolverState.Attributes,
 | 
						|
			},
 | 
						|
		}); err != nil && ret == nil {
 | 
						|
			// Return first error found, and always commit full processing of
 | 
						|
			// updating children. If desired to process more specific errors
 | 
						|
			// across all endpoints, caller should make these specific
 | 
						|
			// validations, this is a current limitation for simplicity sake.
 | 
						|
			ret = err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Delete old children that are no longer present.
 | 
						|
	for _, e := range children.Keys() {
 | 
						|
		child, _ := children.Get(e)
 | 
						|
		if _, ok := newChildren.Get(e); !ok {
 | 
						|
			child.closeLocked()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	es.children.Store(newChildren)
 | 
						|
	if newChildren.Len() == 0 {
 | 
						|
		return balancer.ErrBadResolverState
 | 
						|
	}
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// ResolverError forwards the resolver error to all of the endpointSharding's
 | 
						|
// children and sends a single synchronous update of the childStates at the end
 | 
						|
// of the ResolverError operation.
 | 
						|
func (es *endpointSharding) ResolverError(err error) {
 | 
						|
	es.childMu.Lock()
 | 
						|
	defer es.childMu.Unlock()
 | 
						|
	es.inhibitChildUpdates.Store(true)
 | 
						|
	defer func() {
 | 
						|
		es.inhibitChildUpdates.Store(false)
 | 
						|
		es.updateState()
 | 
						|
	}()
 | 
						|
	children := es.children.Load()
 | 
						|
	for _, child := range children.Values() {
 | 
						|
		child.resolverErrorLocked(err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
 | 
						|
	// UpdateSubConnState is deprecated.
 | 
						|
}
 | 
						|
 | 
						|
func (es *endpointSharding) Close() {
 | 
						|
	es.childMu.Lock()
 | 
						|
	defer es.childMu.Unlock()
 | 
						|
	children := es.children.Load()
 | 
						|
	for _, child := range children.Values() {
 | 
						|
		child.closeLocked()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (es *endpointSharding) ExitIdle() {
 | 
						|
	es.childMu.Lock()
 | 
						|
	defer es.childMu.Unlock()
 | 
						|
	for _, bw := range es.children.Load().Values() {
 | 
						|
		if !bw.isClosed {
 | 
						|
			bw.child.ExitIdle()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// updateState updates this component's state. It sends the aggregated state,
 | 
						|
// and a picker with round robin behavior with all the child states present if
 | 
						|
// needed.
 | 
						|
func (es *endpointSharding) updateState() {
 | 
						|
	if es.inhibitChildUpdates.Load() {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
 | 
						|
 | 
						|
	es.mu.Lock()
 | 
						|
	defer es.mu.Unlock()
 | 
						|
 | 
						|
	children := es.children.Load()
 | 
						|
	childStates := make([]ChildState, 0, children.Len())
 | 
						|
 | 
						|
	for _, child := range children.Values() {
 | 
						|
		childState := child.childState
 | 
						|
		childStates = append(childStates, childState)
 | 
						|
		childPicker := childState.State.Picker
 | 
						|
		switch childState.State.ConnectivityState {
 | 
						|
		case connectivity.Ready:
 | 
						|
			readyPickers = append(readyPickers, childPicker)
 | 
						|
		case connectivity.Connecting:
 | 
						|
			connectingPickers = append(connectingPickers, childPicker)
 | 
						|
		case connectivity.Idle:
 | 
						|
			idlePickers = append(idlePickers, childPicker)
 | 
						|
		case connectivity.TransientFailure:
 | 
						|
			transientFailurePickers = append(transientFailurePickers, childPicker)
 | 
						|
			// connectivity.Shutdown shouldn't appear.
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Construct the round robin picker based off the aggregated state. Whatever
 | 
						|
	// the aggregated state, use the pickers present that are currently in that
 | 
						|
	// state only.
 | 
						|
	var aggState connectivity.State
 | 
						|
	var pickers []balancer.Picker
 | 
						|
	if len(readyPickers) >= 1 {
 | 
						|
		aggState = connectivity.Ready
 | 
						|
		pickers = readyPickers
 | 
						|
	} else if len(connectingPickers) >= 1 {
 | 
						|
		aggState = connectivity.Connecting
 | 
						|
		pickers = connectingPickers
 | 
						|
	} else if len(idlePickers) >= 1 {
 | 
						|
		aggState = connectivity.Idle
 | 
						|
		pickers = idlePickers
 | 
						|
	} else if len(transientFailurePickers) >= 1 {
 | 
						|
		aggState = connectivity.TransientFailure
 | 
						|
		pickers = transientFailurePickers
 | 
						|
	} else {
 | 
						|
		aggState = connectivity.TransientFailure
 | 
						|
		pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
 | 
						|
	} // No children (resolver error before valid update).
 | 
						|
	p := &pickerWithChildStates{
 | 
						|
		pickers:     pickers,
 | 
						|
		childStates: childStates,
 | 
						|
		next:        uint32(randIntN(len(pickers))),
 | 
						|
	}
 | 
						|
	es.cc.UpdateState(balancer.State{
 | 
						|
		ConnectivityState: aggState,
 | 
						|
		Picker:            p,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// pickerWithChildStates delegates to the pickers it holds in a round robin
 | 
						|
// fashion. It also contains the childStates of all the endpointSharding's
 | 
						|
// children.
 | 
						|
type pickerWithChildStates struct {
 | 
						|
	pickers     []balancer.Picker
 | 
						|
	childStates []ChildState
 | 
						|
	next        uint32
 | 
						|
}
 | 
						|
 | 
						|
func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
 | 
						|
	nextIndex := atomic.AddUint32(&p.next, 1)
 | 
						|
	picker := p.pickers[nextIndex%uint32(len(p.pickers))]
 | 
						|
	return picker.Pick(info)
 | 
						|
}
 | 
						|
 | 
						|
// ChildStatesFromPicker returns the state of all the children managed by the
 | 
						|
// endpoint sharding balancer that created this picker.
 | 
						|
func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
 | 
						|
	p, ok := picker.(*pickerWithChildStates)
 | 
						|
	if !ok {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return p.childStates
 | 
						|
}
 | 
						|
 | 
						|
// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
 | 
						|
// endpoint, and persists recent child balancer state.
 | 
						|
type balancerWrapper struct {
 | 
						|
	// The following fields are initialized at build time and read-only after
 | 
						|
	// that and therefore do not need to be guarded by a mutex.
 | 
						|
 | 
						|
	// child contains the wrapped balancer. Access its methods only through
 | 
						|
	// methods on balancerWrapper to ensure proper synchronization
 | 
						|
	child               balancer.Balancer
 | 
						|
	balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
 | 
						|
 | 
						|
	es *endpointSharding
 | 
						|
 | 
						|
	// Access to the following fields is guarded by es.mu.
 | 
						|
 | 
						|
	childState ChildState
 | 
						|
	isClosed   bool
 | 
						|
}
 | 
						|
 | 
						|
func (bw *balancerWrapper) UpdateState(state balancer.State) {
 | 
						|
	bw.es.mu.Lock()
 | 
						|
	bw.childState.State = state
 | 
						|
	bw.es.mu.Unlock()
 | 
						|
	if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
 | 
						|
		bw.ExitIdle()
 | 
						|
	}
 | 
						|
	bw.es.updateState()
 | 
						|
}
 | 
						|
 | 
						|
// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
 | 
						|
// avoid deadlocks due to synchronous balancer state updates.
 | 
						|
func (bw *balancerWrapper) ExitIdle() {
 | 
						|
	go func() {
 | 
						|
		bw.es.childMu.Lock()
 | 
						|
		if !bw.isClosed {
 | 
						|
			bw.child.ExitIdle()
 | 
						|
		}
 | 
						|
		bw.es.childMu.Unlock()
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
// updateClientConnStateLocked delivers the ClientConnState to the child
 | 
						|
// balancer. Callers must hold the child mutex of the parent endpointsharding
 | 
						|
// balancer.
 | 
						|
func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
 | 
						|
	return bw.child.UpdateClientConnState(ccs)
 | 
						|
}
 | 
						|
 | 
						|
// closeLocked closes the child balancer. Callers must hold the child mutext of
 | 
						|
// the parent endpointsharding balancer.
 | 
						|
func (bw *balancerWrapper) closeLocked() {
 | 
						|
	bw.child.Close()
 | 
						|
	bw.isClosed = true
 | 
						|
}
 | 
						|
 | 
						|
func (bw *balancerWrapper) resolverErrorLocked(err error) {
 | 
						|
	bw.child.ResolverError(err)
 | 
						|
}
 |