[chore] update dependencies (#4196)

- go.opentelemetry.io/contrib/exporters/autoexport v0.60.0 -> v0.61.0
- go.opentelemetry.io/contrib/instrumentation/runtime v0.60.0 -> v0.61.0

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4196
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
Author: kim, committed by tobi
Date: 2025-05-26 16:13:55 +02:00
Commit: 143febb318
152 changed files with 2635 additions and 1688 deletions
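For context, the two bumped contrib modules cover exporter auto-configuration and Go runtime metrics. A minimal, hypothetical sketch of how they are typically wired up — this is not GoToSocial's actual setup, just illustrative usage of the bumped modules:

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/contrib/exporters/autoexport"
	"go.opentelemetry.io/contrib/instrumentation/runtime"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.Background()

	// autoexport picks a span exporter from the OTEL_* environment
	// variables (OTLP by default), so bumps here mostly track exporter fixes.
	exp, err := autoexport.NewSpanExporter(ctx)
	if err != nil {
		log.Fatal(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	defer func() { _ = tp.Shutdown(ctx) }()

	// The runtime module periodically records Go runtime metrics (GC,
	// goroutines, memory) against the global meter provider.
	if err := runtime.Start(); err != nil {
		log.Fatal(err)
	}
}
```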


@ -41,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) ba
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(),
subConns: resolver.NewAddressMapV2[balancer.SubConn](),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
@ -65,7 +65,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns *resolver.AddressMap
subConns *resolver.AddressMapV2[balancer.SubConn]
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
@ -100,7 +100,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := resolver.NewAddressMap()
addrsSet := resolver.NewAddressMapV2[any]()
for _, a := range s.ResolverState.Addresses {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
@ -122,8 +122,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
}
}
for _, a := range b.subConns.Keys() {
sci, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
sc, _ := b.subConns.Get(a)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
sc.Shutdown()
@ -173,8 +172,7 @@ func (b *baseBalancer) regeneratePicker() {
// Filter out all ready SCs from full subConn map.
for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
sc, _ := b.subConns.Get(addr)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
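The hunk above swaps resolver.NewAddressMap() for the generic resolver.NewAddressMapV2[balancer.SubConn](), removing the interface type assertions on Get. A minimal sketch of the generic API, based on the resolver/map.go changes further down (the int value type is purely illustrative):

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

func main() {
	// Generic map keyed by resolver.Address; the value type is fixed at
	// construction, so lookups no longer need a type assertion.
	m := resolver.NewAddressMapV2[int]()
	addr := resolver.Address{Addr: "10.0.0.1:443", ServerName: "backend"}

	m.Set(addr, 42)
	if v, ok := m.Get(addr); ok {
		fmt.Println(v + 1) // v is already an int: 43
	}

	// Keys/Values/Len/Delete mirror the old AddressMap API.
	fmt.Println(m.Len(), m.Keys(), len(m.Values()))
	m.Delete(addr)
}
```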


@ -73,7 +73,7 @@ func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilde
esOpts: esOpts,
childBuilder: childBuilder,
}
es.children.Store(resolver.NewEndpointMap())
es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
return es
}
@ -90,7 +90,7 @@ type endpointSharding struct {
// calls into a child. To avoid deadlocks, do not acquire childMu while
// holding mu.
childMu sync.Mutex
children atomic.Pointer[resolver.EndpointMap] // endpoint -> *balancerWrapper
children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]
// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
// calls (calls to children will each produce an update, only want one
@ -122,7 +122,7 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
var ret error
children := es.children.Load()
newChildren := resolver.NewEndpointMap()
newChildren := resolver.NewEndpointMap[*balancerWrapper]()
// Update/Create new children.
for _, endpoint := range state.ResolverState.Endpoints {
@ -131,9 +131,8 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
// update.
continue
}
var childBalancer *balancerWrapper
if val, ok := children.Get(endpoint); ok {
childBalancer = val.(*balancerWrapper)
childBalancer, ok := children.Get(endpoint)
if ok {
// Endpoint attributes may have changed, update the stored endpoint.
es.mu.Lock()
childBalancer.childState.Endpoint = endpoint
@ -166,7 +165,7 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
for _, e := range children.Keys() {
child, _ := children.Get(e)
if _, ok := newChildren.Get(e); !ok {
child.(*balancerWrapper).closeLocked()
child.closeLocked()
}
}
es.children.Store(newChildren)
@ -189,7 +188,7 @@ func (es *endpointSharding) ResolverError(err error) {
}()
children := es.children.Load()
for _, child := range children.Values() {
child.(*balancerWrapper).resolverErrorLocked(err)
child.resolverErrorLocked(err)
}
}
@ -202,7 +201,7 @@ func (es *endpointSharding) Close() {
defer es.childMu.Unlock()
children := es.children.Load()
for _, child := range children.Values() {
child.(*balancerWrapper).closeLocked()
child.closeLocked()
}
}
@ -222,8 +221,7 @@ func (es *endpointSharding) updateState() {
childStates := make([]ChildState, 0, children.Len())
for _, child := range children.Values() {
bw := child.(*balancerWrapper)
childState := bw.childState
childState := child.childState
childStates = append(childStates, childState)
childPicker := childState.State.Picker
switch childState.State.ConnectivityState {
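In this file the children map becomes atomic.Pointer[resolver.EndpointMap[*balancerWrapper]], a copy-on-write pattern: readers load the current generation lock-free, and updates build a fresh map before storing it. A sketch of the same pattern, with a string value type standing in for *balancerWrapper:

```go
package main

import (
	"fmt"
	"sync/atomic"

	"google.golang.org/grpc/resolver"
)

// children mirrors the copy-on-write idea from endpointsharding: the current
// generation of the map is read via atomic.Pointer, and updates swap in a
// freshly built map.
type children struct {
	m atomic.Pointer[resolver.EndpointMap[string]]
}

func main() {
	var c children
	c.m.Store(resolver.NewEndpointMap[string]())

	// Build a new generation and swap it in atomically.
	next := resolver.NewEndpointMap[string]()
	ep := resolver.Endpoint{Addresses: []resolver.Address{{Addr: "10.0.0.1:443"}}}
	next.Set(ep, "child-1")
	c.m.Store(next)

	// Readers see a consistent snapshot; values are typed, so no assertion.
	if v, ok := c.m.Load().Get(ep); ok {
		fmt.Println(v)
	}
}
```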


@ -122,7 +122,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
target: bo.Target.String(),
metricsRecorder: cc.MetricsRecorder(),
subConns: resolver.NewAddressMap(),
subConns: resolver.NewAddressMapV2[*scData](),
state: connectivity.Connecting,
cancelConnectionTimer: func() {},
}
@ -220,7 +220,7 @@ type pickfirstBalancer struct {
// updates.
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
subConns *resolver.AddressMapV2[*scData]
addressList addressList
firstPass bool
numTF int
@ -319,7 +319,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
prevAddr := b.addressList.currentAddress()
prevSCData, found := b.subConns.Get(prevAddr)
prevAddrsCount := b.addressList.size()
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
b.addressList.updateAddrs(newAddrs)
// If the previous ready SubConn exists in new address list,
@ -381,21 +381,21 @@ func (b *pickfirstBalancer) startFirstPassLocked() {
b.numTF = 0
// Reset the connection attempt record for existing SubConns.
for _, sd := range b.subConns.Values() {
sd.(*scData).connectionFailedInFirstPass = false
sd.connectionFailedInFirstPass = false
}
b.requestConnectionLocked()
}
func (b *pickfirstBalancer) closeSubConnsLocked() {
for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown()
sd.subConn.Shutdown()
}
b.subConns = resolver.NewAddressMap()
b.subConns = resolver.NewAddressMapV2[*scData]()
}
// deDupAddresses ensures that each address appears only once in the slice.
func deDupAddresses(addrs []resolver.Address) []resolver.Address {
seenAddrs := resolver.NewAddressMap()
seenAddrs := resolver.NewAddressMapV2[*scData]()
retAddrs := []resolver.Address{}
for _, addr := range addrs {
@ -481,7 +481,7 @@ func addressFamily(address string) ipAddrFamily {
// This ensures that the subchannel map accurately reflects the current set of
// addresses received from the name resolver.
func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
newAddrsMap := resolver.NewAddressMap()
newAddrsMap := resolver.NewAddressMapV2[bool]()
for _, addr := range newAddrs {
newAddrsMap.Set(addr, true)
}
@ -491,7 +491,7 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
continue
}
val, _ := b.subConns.Get(oldAddr)
val.(*scData).subConn.Shutdown()
val.subConn.Shutdown()
b.subConns.Delete(oldAddr)
}
}
@ -500,13 +500,12 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
// becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.cancelConnectionTimer()
for _, v := range b.subConns.Values() {
sd := v.(*scData)
for _, sd := range b.subConns.Values() {
if sd.subConn != selected.subConn {
sd.subConn.Shutdown()
}
}
b.subConns = resolver.NewAddressMap()
b.subConns = resolver.NewAddressMapV2[*scData]()
b.subConns.Set(selected.addr, selected)
}
@ -539,18 +538,17 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
b.subConns.Set(curAddr, sd)
}
scd := sd.(*scData)
switch scd.rawConnectivityState {
switch sd.rawConnectivityState {
case connectivity.Idle:
scd.subConn.Connect()
sd.subConn.Connect()
b.scheduleNextConnectionLocked()
return
case connectivity.TransientFailure:
// The SubConn is being re-used and failed during a previous pass
// over the addressList. It has not completed backoff yet.
// Mark it as having failed and try the next address.
scd.connectionFailedInFirstPass = true
lastErr = scd.lastErr
sd.connectionFailedInFirstPass = true
lastErr = sd.lastErr
continue
case connectivity.Connecting:
// Wait for the connection attempt to complete or the timer to fire
@ -558,7 +556,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
return
}
@ -753,8 +751,7 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
}
// Connect() has been called on all the SubConns. The first pass can be
// ended if all the SubConns have reported a failure.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
for _, sd := range b.subConns.Values() {
if !sd.connectionFailedInFirstPass {
return
}
@ -765,8 +762,7 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
Picker: &picker{err: lastErr},
})
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
for _, sd := range b.subConns.Values() {
if sd.rawConnectivityState == connectivity.Idle {
sd.subConn.Connect()
}
@ -927,6 +923,5 @@ func (al *addressList) hasNext() bool {
// fields that are meaningful to the SubConn.
func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
return a.Addr == b.Addr && a.ServerName == b.ServerName &&
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
a.Attributes.Equal(b.Attributes)
}


@ -18,7 +18,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.4
// protoc-gen-go v1.36.5
// protoc v5.27.1
// source: grpc/binlog/v1/binarylog.proto


@ -1231,8 +1231,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
case transport.GoAwayTooManyPings:
if r == transport.GoAwayTooManyPings {
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.keepaliveParams.Time {


@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.4
// protoc-gen-go v1.36.5
// protoc v5.27.1
// source: grpc/health/v1/health.proto
@ -178,6 +178,87 @@ func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
return HealthCheckResponse_UNKNOWN
}
type HealthListRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *HealthListRequest) Reset() {
*x = HealthListRequest{}
mi := &file_grpc_health_v1_health_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *HealthListRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthListRequest) ProtoMessage() {}
func (x *HealthListRequest) ProtoReflect() protoreflect.Message {
mi := &file_grpc_health_v1_health_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthListRequest.ProtoReflect.Descriptor instead.
func (*HealthListRequest) Descriptor() ([]byte, []int) {
return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{2}
}
type HealthListResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// statuses contains all the services and their respective status.
Statuses map[string]*HealthCheckResponse `protobuf:"bytes,1,rep,name=statuses,proto3" json:"statuses,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *HealthListResponse) Reset() {
*x = HealthListResponse{}
mi := &file_grpc_health_v1_health_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *HealthListResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthListResponse) ProtoMessage() {}
func (x *HealthListResponse) ProtoReflect() protoreflect.Message {
mi := &file_grpc_health_v1_health_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthListResponse.ProtoReflect.Descriptor instead.
func (*HealthListResponse) Descriptor() ([]byte, []int) {
return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{3}
}
func (x *HealthListResponse) GetStatuses() map[string]*HealthCheckResponse {
if x != nil {
return x.Statuses
}
return nil
}
var File_grpc_health_v1_health_proto protoreflect.FileDescriptor
var file_grpc_health_v1_health_proto_rawDesc = string([]byte{
@ -198,25 +279,44 @@ var file_grpc_health_v1_health_proto_rawDesc = string([]byte{
0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e,
0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f,
0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10,
0x03, 0x32, 0xae, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05,
0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
0x03, 0x22, 0x13, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x4c, 0x69, 0x73, 0x74, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xc4, 0x01, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4c, 0x0a,
0x08, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x30, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31,
0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72,
0x79, 0x52, 0x08, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x1a, 0x60, 0x0a, 0x0d, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x39,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e,
0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xfd, 0x01,
0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05, 0x43, 0x68, 0x65, 0x63,
0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e,
0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65,
0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63,
0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52,
0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43,
0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72,
0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61,
0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x30, 0x01, 0x42, 0x70, 0x0a, 0x11, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65,
0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x50,
0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x67,
0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x5f, 0x76, 0x31, 0xa2, 0x02, 0x0c, 0x47, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x56, 0x31, 0xaa, 0x02, 0x0e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x2e, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x04, 0x4c, 0x69,
0x73, 0x74, 0x12, 0x21, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x4c, 0x69, 0x73,
0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, 0x0a, 0x05, 0x57, 0x61, 0x74,
0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65,
0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68,
0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x70, 0x0a,
0x11, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e,
0x76, 0x31, 0x42, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50,
0x01, 0x5a, 0x2c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67,
0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x76, 0x31, 0xa2,
0x02, 0x0c, 0x47, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x56, 0x31, 0xaa, 0x02,
0x0e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x56, 0x31, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
})
var (
@ -232,23 +332,30 @@ func file_grpc_health_v1_health_proto_rawDescGZIP() []byte {
}
var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_grpc_health_v1_health_proto_goTypes = []any{
(HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus
(*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest
(*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse
(*HealthListRequest)(nil), // 3: grpc.health.v1.HealthListRequest
(*HealthListResponse)(nil), // 4: grpc.health.v1.HealthListResponse
nil, // 5: grpc.health.v1.HealthListResponse.StatusesEntry
}
var file_grpc_health_v1_health_proto_depIdxs = []int32{
0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus
1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest
1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest
2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse
2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
5, // 1: grpc.health.v1.HealthListResponse.statuses:type_name -> grpc.health.v1.HealthListResponse.StatusesEntry
2, // 2: grpc.health.v1.HealthListResponse.StatusesEntry.value:type_name -> grpc.health.v1.HealthCheckResponse
1, // 3: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest
3, // 4: grpc.health.v1.Health.List:input_type -> grpc.health.v1.HealthListRequest
1, // 5: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest
2, // 6: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse
4, // 7: grpc.health.v1.Health.List:output_type -> grpc.health.v1.HealthListResponse
2, // 8: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse
6, // [6:9] is the sub-list for method output_type
3, // [3:6] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
func init() { file_grpc_health_v1_health_proto_init() }
@ -262,7 +369,7 @@ func file_grpc_health_v1_health_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_grpc_health_v1_health_proto_rawDesc), len(file_grpc_health_v1_health_proto_rawDesc)),
NumEnums: 1,
NumMessages: 2,
NumMessages: 5,
NumExtensions: 0,
NumServices: 1,
},


@ -37,6 +37,7 @@ const _ = grpc.SupportPackageIsVersion9
const (
Health_Check_FullMethodName = "/grpc.health.v1.Health/Check"
Health_List_FullMethodName = "/grpc.health.v1.Health/List"
Health_Watch_FullMethodName = "/grpc.health.v1.Health/Watch"
)
@ -55,9 +56,19 @@ type HealthClient interface {
//
// Clients should set a deadline when calling Check, and can declare the
// server unhealthy if they do not receive a timely response.
//
// Check implementations should be idempotent and side effect free.
Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
// List provides a non-atomic snapshot of the health of all the available
// services.
//
// The server may respond with a RESOURCE_EXHAUSTED error if too many services
// exist.
//
// Clients should set a deadline when calling List, and can declare the server
// unhealthy if they do not receive a timely response.
//
// Clients should keep in mind that the list of health services exposed by an
// application can change over the lifetime of the process.
List(ctx context.Context, in *HealthListRequest, opts ...grpc.CallOption) (*HealthListResponse, error)
// Performs a watch for the serving status of the requested service.
// The server will immediately send back a message indicating the current
// serving status. It will then subsequently send a new message whenever
@ -94,6 +105,16 @@ func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts .
return out, nil
}
func (c *healthClient) List(ctx context.Context, in *HealthListRequest, opts ...grpc.CallOption) (*HealthListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthListResponse)
err := c.cc.Invoke(ctx, Health_List_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[HealthCheckResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, cOpts...)
@ -128,9 +149,19 @@ type HealthServer interface {
//
// Clients should set a deadline when calling Check, and can declare the
// server unhealthy if they do not receive a timely response.
//
// Check implementations should be idempotent and side effect free.
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
// List provides a non-atomic snapshot of the health of all the available
// services.
//
// The server may respond with a RESOURCE_EXHAUSTED error if too many services
// exist.
//
// Clients should set a deadline when calling List, and can declare the server
// unhealthy if they do not receive a timely response.
//
// Clients should keep in mind that the list of health services exposed by an
// application can change over the lifetime of the process.
List(context.Context, *HealthListRequest) (*HealthListResponse, error)
// Performs a watch for the serving status of the requested service.
// The server will immediately send back a message indicating the current
// serving status. It will then subsequently send a new message whenever
@ -159,6 +190,9 @@ type UnimplementedHealthServer struct{}
func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
}
func (UnimplementedHealthServer) List(context.Context, *HealthListRequest) (*HealthListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedHealthServer) Watch(*HealthCheckRequest, grpc.ServerStreamingServer[HealthCheckResponse]) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
@ -200,6 +234,24 @@ func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interf
return interceptor(ctx, in, info, handler)
}
func _Health_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HealthServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Health_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HealthServer).List(ctx, req.(*HealthListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(HealthCheckRequest)
if err := stream.RecvMsg(m); err != nil {
@ -222,6 +274,10 @@ var Health_ServiceDesc = grpc.ServiceDesc{
MethodName: "Check",
Handler: _Health_Check_Handler,
},
{
MethodName: "List",
Handler: _Health_List_Handler,
},
},
Streams: []grpc.StreamDesc{
{
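The regenerated health service now exposes a List RPC alongside Check and Watch. A hedged client-side sketch, assuming a server on localhost:50051 (placeholder) that registers the standard health service:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	conn, err := grpc.NewClient("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// List returns a non-atomic snapshot of every registered service, so a
	// deadline is recommended, as the generated comments note.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	resp, err := healthpb.NewHealthClient(conn).List(ctx, &healthpb.HealthListRequest{})
	if err != nil {
		log.Fatal(err) // may be RESOURCE_EXHAUSTED if too many services exist
	}
	for svc, st := range resp.GetStatuses() {
		fmt.Printf("%s: %s\n", svc, st.GetStatus())
	}
}
```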


@ -51,10 +51,24 @@ var (
// xDS server in the list of server configs will be used.
XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", true)
// NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used
// instead of the exiting pickfirst implementation. This can be enabled by
// instead of the exiting pickfirst implementation. This can be disabled by
// setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST"
// to "true".
NewPickFirstEnabled = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false)
// to "false".
NewPickFirstEnabled = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", true)
// XDSEndpointHashKeyBackwardCompat controls the parsing of the endpoint hash
// key from EDS LbEndpoint metadata. Endpoint hash keys can be disabled by
// setting "GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT" to "true". When the
// implementation of A76 is stable, we will flip the default value to false
// in a subsequent release. A final release will remove this environment
// variable, enabling the new behavior unconditionally.
XDSEndpointHashKeyBackwardCompat = boolFromEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", true)
// RingHashSetRequestHashKey is set if the ring hash balancer can get the
// request hash header by setting the "requestHashHeader" field, according
// to gRFC A76. It can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY" to "true".
RingHashSetRequestHashKey = boolFromEnv("GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY", false)
)
func boolFromEnv(envVar string, def bool) bool {
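Note the default flip above: the new pickfirst policy is now enabled unless GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST is set to "false", and these variables are read at package init, so they must already be in the process environment at startup. A simplified approximation of the boolFromEnv behaviour — not the exact upstream helper:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// boolFromEnv approximates grpc's internal/envconfig helper: an unset or
// unparsable variable yields the default, which this release flips to true
// for GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST.
func boolFromEnv(name string, def bool) bool {
	v, ok := os.LookupEnv(name)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	fmt.Println(boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", true))
}
```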


@ -259,6 +259,13 @@ var (
// SetBufferPoolingThresholdForTesting updates the buffer pooling threshold, for
// testing purposes.
SetBufferPoolingThresholdForTesting any // func(int)
// TimeAfterFunc is used to create timers. During tests the function is
// replaced to track allocated timers and fail the test if a timer isn't
// cancelled.
TimeAfterFunc = func(d time.Duration, f func()) Timer {
return time.AfterFunc(d, f)
}
)
// HealthChecker defines the signature of the client-side LB channel health
@ -300,3 +307,9 @@ type EnforceSubConnEmbedding interface {
type EnforceClientConnEmbedding interface {
enforceClientConnEmbedding()
}
// Timer is an interface to allow injecting different time.Timer implementations
// during tests.
type Timer interface {
Stop() bool
}
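internal.TimeAfterFunc and the Timer interface exist so tests can inject fake timers and assert that every timer is cancelled; the http2 server change further down uses this hook for RPC deadlines. A self-contained sketch of the injection pattern (the names here are hypothetical, not grpc's):

```go
package main

import (
	"fmt"
	"time"
)

// Timer mirrors the minimal interface added in grpc's internal package: the
// code under test only ever needs Stop.
type Timer interface {
	Stop() bool
}

// timeAfterFunc is the injection point; production code uses the real
// time.AfterFunc, while tests can swap in a fake that records timers.
var timeAfterFunc = func(d time.Duration, f func()) Timer {
	return time.AfterFunc(d, f)
}

type fakeTimer struct{ stopped bool }

func (t *fakeTimer) Stop() bool { t.stopped = true; return true }

func main() {
	// In a test, replace the hook and assert the timer gets cancelled.
	ft := &fakeTimer{}
	timeAfterFunc = func(time.Duration, func()) Timer { return ft }

	t := timeAfterFunc(time.Second, func() {})
	t.Stop()
	fmt.Println(ft.stopped) // true
}
```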


@ -97,13 +97,11 @@ func hasNotPrintable(msg string) bool {
return false
}
// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) :
//
// - key must contain one or more characters.
// - the characters in the key must be contained in [0-9 a-z _ - .].
// - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed.
// - the characters in the every value must be printable (in [%x20-%x7E]).
func ValidatePair(key string, vals ...string) error {
// ValidateKey validates a key with the following rules (pseudo-headers are
// skipped):
// - the key must contain one or more characters.
// - the characters in the key must be in [0-9 a-z _ - .].
func ValidateKey(key string) error {
// key should not be empty
if key == "" {
return fmt.Errorf("there is an empty key in the header")
@ -119,6 +117,20 @@ func ValidatePair(key string, vals ...string) error {
return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
}
}
return nil
}
// ValidatePair validates a key-value pair with the following rules
// (pseudo-header are skipped):
// - the key must contain one or more characters.
// - the characters in the key must be in [0-9 a-z _ - .].
// - if the key ends with a "-bin" suffix, no validation of the corresponding
// value is performed.
// - the characters in every value must be printable (in [%x20-%x7E]).
func ValidatePair(key string, vals ...string) error {
if err := ValidateKey(key); err != nil {
return err
}
if strings.HasSuffix(key, "-bin") {
return nil
}
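ValidatePair is split here so the key rules can be checked on their own via ValidateKey. Since both live in grpc's internal/metadata package, the following is an illustrative reimplementation of the documented rules rather than a call into the real functions (pseudo-header handling omitted):

```go
package main

import (
	"fmt"
	"strings"
)

// validKey checks the documented key rules: non-empty, characters limited to
// [0-9 a-z _ - .].
func validKey(key string) bool {
	if key == "" {
		return false
	}
	for _, r := range key {
		if !(r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '_' || r == '-' || r == '.') {
			return false
		}
	}
	return true
}

// validPair additionally checks that values are printable ASCII (0x20-0x7E),
// unless the key has the "-bin" suffix, whose values may be arbitrary bytes.
func validPair(key string, vals ...string) bool {
	if !validKey(key) {
		return false
	}
	if strings.HasSuffix(key, "-bin") {
		return true
	}
	for _, v := range vals {
		for _, r := range v {
			if r < 0x20 || r > 0x7e {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(validPair("user-agent", "grpc-go"))        // true
	fmt.Println(validPair("trace-bin", string([]byte{0}))) // true: -bin skips value checks
	fmt.Println(validPair("Bad Key", "x"))                 // false: uppercase and space
}
```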


@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@ -40,19 +42,26 @@ var (
// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// a intermediary between the gRPC ClientConn and the child resolvers.
// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
target resolver.Target // parsed target URI to be resolved
cc resolver.ClientConn // gRPC ClientConn
targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
proxyURL *url.URL // proxy URL, derived from proxy environment and target
target resolver.Target // parsed target URI to be resolved
cc resolver.ClientConn // gRPC ClientConn
proxyURL *url.URL // proxy URL, derived from proxy environment and target
// We do not hold both mu and childMu in the same goroutine. Avoid holding
// both locks when calling into the child, as the child resolver may
// synchronously callback into the channel.
mu sync.Mutex // protects all the fields below
targetResolverState *resolver.State // state of the target resolver
proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured
// childMu serializes calls into child resolvers. It also protects access to
// the following fields.
childMu sync.Mutex
targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
}
// nopResolver is a resolver that does nothing.
@ -62,8 +71,8 @@ func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (nopResolver) Close() {}
// proxyURLForTarget determines the proxy URL for the given address based on
// the environment. It can return the following:
// proxyURLForTarget determines the proxy URL for the given address based on the
// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
@ -82,7 +91,8 @@ func proxyURLForTarget(address string) (*url.URL, error) {
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
// scheme.
// scheme. It is lazily built when a target resolver update contains at least
// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
@ -91,8 +101,10 @@ func proxyURLForTarget(address string) (*url.URL, error) {
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
target: target,
cc: cc,
target: target,
cc: cc,
proxyResolver: nopResolver{},
targetResolver: nopResolver{},
}
var err error
@ -111,41 +123,34 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti
logger.Infof("Proxy URL detected : %s", r.proxyURL)
}
// Resolver updates from one child may trigger calls into the other. Block
// updates until the children are initialized.
r.childMu.Lock()
defer r.childMu.Unlock()
// When the scheme is 'dns' and target resolution on client is not enabled,
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
state := resolver.State{
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
}
r.targetResolverState = &state
} else {
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
}
if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err)
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver == nil {
r.targetResolver = nopResolver{}
}
if r.proxyResolver == nil {
r.proxyResolver = nopResolver{}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
return r, nil
}
// proxyURIResolver creates a resolver for resolving proxy URIs using the
// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and
// builds a resolver with a wrappingClientConn to capture resolved addresses.
// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
@ -165,11 +170,15 @@ func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resol
}
func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.ResolveNow(o)
r.proxyResolver.ResolveNow(o)
}
func (r *delegatingResolver) Close() {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.Close()
r.targetResolver = nil
@ -177,18 +186,43 @@ func (r *delegatingResolver) Close() {
r.proxyResolver = nil
}
// updateClientConnStateLocked creates a list of combined addresses by
// pairing each proxy address with every target address. For each pair, it
// generates a new [resolver.Address] using the proxy address, and adding the
// target address as the attribute along with user info. It returns nil if
// either resolver has not sent update even once and returns the error from
// ClientConn update once both resolvers have sent update atleast once.
func networkTypeFromAddr(addr resolver.Address) string {
networkType, ok := networktype.Get(addr)
if !ok {
networkType, _ = transport.ParseDialTarget(addr.Addr)
}
return networkType
}
func isTCPAddressPresent(state *resolver.State) bool {
for _, addr := range state.Addresses {
if networkType := networkTypeFromAddr(addr); networkType == "tcp" {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if networktype := networkTypeFromAddr(addr); networktype == "tcp" {
return true
}
}
}
return false
}
// updateClientConnStateLocked constructs a combined list of addresses by
// pairing each proxy address with every target address of type TCP. For each
// pair, it creates a new [resolver.Address] using the proxy address and
// attaches the corresponding target address and user info as attributes. Target
// addresses that are not of type TCP are appended to the list as-is. The
// function returns nil if either resolver has not yet provided an update, and
// returns the result of ClientConn.UpdateState once both resolvers have
// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}
curState := *r.targetResolverState
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
@ -206,24 +240,30 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
// Create a list of combined endpoints by pairing all proxy endpoints
// with every target endpoint. Each time, it constructs a new
// [resolver.Endpoint] using the all addresses from all the proxy endpoint
// and the target addresses from one endpoint. The target address and user
// information from the proxy URL are added as attributes to the proxy
// address.The resulting list of addresses is then grouped into endpoints,
// covering all combinations of proxy and target endpoints.
// For each target endpoint, construct a new [resolver.Endpoint] that
// includes all addresses from all proxy endpoints and the addresses from
// that target endpoint, preserving the number of target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
for _, proxyAddr := range r.proxyAddrs {
for _, targetAddr := range endpt.Addresses {
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
@ -234,8 +274,9 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
// resolvers have sent their updates, and curState has been updated with
// the combined addresses.
// resolvers have sent their updates, and curState has been updated with the
// combined addresses.
curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
@ -245,7 +286,8 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
// is a StateListener function of wrappingClientConn passed to the proxy resolver.
// is a StateListener function of wrappingClientConn passed to the proxy
// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
@ -253,8 +295,8 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
// We expect exactly one address per endpoint because the proxy
// resolver uses "dns" resolution.
// We expect exactly one address per endpoint because the proxy resolver
// uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
@ -267,20 +309,29 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro
err := r.updateClientConnStateLocked()
// Another possible approach was to block until updates are received from
// both resolvers. But this is not used because calling `New()` triggers
// `Build()` for the first resolver, which calls `UpdateState()`. And the
// `Build()` for the first resolver, which calls `UpdateState()`. And the
// second resolver hasn't sent an update yet, so it would cause `New()` to
// block indefinitely.
if err != nil {
r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.targetResolver != nil {
r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return err
}
// updateTargetResolverState updates the target resolver state by storing target
// addresses, endpoints, and service config, marking the resolver as ready, and
// triggering a state update if both resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It
// is a StateListener function of wrappingClientConn passed to the target resolver.
// updateTargetResolverState is the StateListener function provided to the
// target resolver via wrappingClientConn. It updates the resolver state and
// marks the target resolver as ready. If the update includes at least one TCP
// address and the proxy resolver has not yet been constructed, it initializes
// the proxy resolver. A combined state update is triggered once both resolvers
// are ready. If all addresses are non-TCP, it proceeds without waiting for the
// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
@ -289,9 +340,40 @@ func (r *delegatingResolver) updateTargetResolverState(state resolver.State) err
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
// If no addresses returned by resolver have network type as tcp , do not
// wait for proxy update.
if !isTCPAddressPresent(r.targetResolverState) {
return r.cc.UpdateState(*r.targetResolverState)
}
// The proxy resolver may be rebuilt multiple times, specifically each time
// the target resolver sends an update, even if the target resolver is built
// successfully but building the proxy resolver fails.
if len(r.proxyAddrs) == 0 {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if _, ok := r.proxyResolver.(nopResolver); !ok {
return
}
proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
if err != nil {
r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
return
}
r.proxyResolver = proxyResolver
}()
}
err := r.updateClientConnStateLocked()
if err != nil {
r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.proxyResolver != nil {
r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return nil
}
@ -311,7 +393,8 @@ func (wcc *wrappingClientConn) UpdateState(state resolver.State) error {
return wcc.stateListener(state)
}
// ReportError intercepts errors from the child resolvers and passes them to ClientConn.
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
@ -322,8 +405,8 @@ func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) {
wcc.UpdateState(resolver.State{Addresses: addrs})
}
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
// ParseServiceConfig parses the provided service config and returns an object
// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}
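The delegating resolver derives its proxy URL from the standard proxy environment variables and, with this change, only engages the proxy resolver when TCP addresses are present. As an approximation of how such a proxy URL is conventionally resolved (the upstream proxyURLForTarget may differ, e.g. in its localhost special case; the host below is a placeholder), one could use golang.org/x/net/http/httpproxy:

```go
package main

import (
	"fmt"
	"log"
	"net/url"

	"golang.org/x/net/http/httpproxy"
)

func main() {
	// Interpret HTTPS_PROXY / NO_PROXY for a target host the conventional way.
	cfg := httpproxy.FromEnvironment()
	proxyURL, err := cfg.ProxyFunc()(&url.URL{Scheme: "https", Host: "backend.example.com:443"})
	if err != nil {
		log.Fatal(err)
	}
	if proxyURL == nil {
		fmt.Println("no proxy configured (or excluded via NO_PROXY)")
		return
	}
	fmt.Println("CONNECT via", proxyURL)
}
```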


@ -59,7 +59,7 @@ func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
return b, err
}
// Close closes the stream and popagates err to any readers.
// Close closes the stream and propagates err to any readers.
func (s *ClientStream) Close(err error) {
var (
rst bool


@ -176,7 +176,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
return fn(ctx, address)
}
if !ok {
networkType, address = parseDialTarget(address)
networkType, address = ParseDialTarget(address)
}
if opts, present := proxyattributes.Get(addr); present {
return proxyDial(ctx, addr, grpcUA, opts)
@ -1242,7 +1242,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode = codes.DeadlineExceeded
}
}
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false)
}
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
@ -1390,8 +1391,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayNoReason
switch f.ErrCode {
case http2.ErrCodeEnhanceYourCalm:
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
if string(f.DebugData()) == "too_many_pings" {
t.goAwayReason = GoAwayTooManyPings
}


@ -35,6 +35,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
@ -598,6 +599,22 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
// Start a timer to close the stream on reaching the deadline.
if timeoutSet {
// We need to wait for s.cancel to be updated before calling
// t.closeStream to avoid data races.
cancelUpdated := make(chan struct{})
timer := internal.TimeAfterFunc(timeout, func() {
<-cancelUpdated
t.closeStream(s, true, http2.ErrCodeCancel, false)
})
oldCancel := s.cancel
s.cancel = func() {
oldCancel()
timer.Stop()
}
close(cancelUpdated)
}
t.mu.Unlock()
if channelz.IsOn() {
t.channelz.SocketMetrics.StreamsStarted.Add(1)
@ -1274,7 +1291,6 @@ func (t *http2Server) Close(err error) {
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
delete(t.activeStreams, s.id)
@ -1324,7 +1340,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.swapState(streamDone)
oldState := s.swapState(streamDone)
if oldState == streamDone {
return
}
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{
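The new deadline handling above wraps the stream's cancel function so that normal teardown also stops the timer, and gates the timer callback on cancelUpdated so it cannot race with that swap. A generic, runnable sketch of the same pattern — not the actual transport code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, oldCancel := context.WithCancel(context.Background())

	// The callback must not run before the wrapped cancel is installed, so
	// it waits on cancelUpdated first (mirroring the server hunk above).
	cancelUpdated := make(chan struct{})
	timer := time.AfterFunc(50*time.Millisecond, func() {
		<-cancelUpdated
		fmt.Println("deadline hit: closing the stream")
		oldCancel()
	})

	// Wrap cancel so a normal stream teardown also releases the timer.
	cancel := func() {
		oldCancel()
		timer.Stop()
	}
	close(cancelUpdated)

	cancel() // normal completion before the deadline: timer is stopped
	<-ctx.Done()
	fmt.Println(ctx.Err())
}
```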


@ -439,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool {
return pool
}
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
// ParseDialTarget returns the network and address to pass to dialer.
func ParseDialTarget(target string) (string, string) {
net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")


@ -35,8 +35,10 @@ type ServerStream struct {
*Stream // Embed for common stream functionality.
st internalServerTransport
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
cancel context.CancelFunc // invoked at the end of stream to cancel ctx.
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
// cancel is invoked at the end of stream to cancel ctx. It also stops the
// timer for monitoring the rpc deadline if configured.
cancel func()
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client.


@ -18,16 +18,28 @@
package resolver
type addressMapEntry struct {
import (
"encoding/base64"
"sort"
"strings"
)
type addressMapEntry[T any] struct {
addr Address
value any
value T
}
// AddressMap is a map of addresses to arbitrary values taking into account
// AddressMap is an AddressMapV2[any]. It will be deleted in an upcoming
// release of grpc-go.
//
// Deprecated: use the generic AddressMapV2 type instead.
type AddressMap = AddressMapV2[any]
// AddressMapV2 is a map of addresses to arbitrary values taking into account
// Attributes. BalancerAttributes are ignored, as are Metadata and Type.
// Multiple accesses may not be performed concurrently. Must be created via
// NewAddressMap; do not construct directly.
type AddressMap struct {
type AddressMapV2[T any] struct {
// The underlying map is keyed by an Address with fields that we don't care
// about being set to their zero values. The only fields that we care about
// are `Addr`, `ServerName` and `Attributes`. Since we need to be able to
@ -41,23 +53,30 @@ type AddressMap struct {
// The value type of the map contains a slice of addresses which match the key
// in their `Addr` and `ServerName` fields and contain the corresponding value
// associated with them.
m map[Address]addressMapEntryList
m map[Address]addressMapEntryList[T]
}
func toMapKey(addr *Address) Address {
return Address{Addr: addr.Addr, ServerName: addr.ServerName}
}
type addressMapEntryList []*addressMapEntry
type addressMapEntryList[T any] []*addressMapEntry[T]
// NewAddressMap creates a new AddressMap.
// NewAddressMap creates a new AddressMapV2[any].
//
// Deprecated: use the generic NewAddressMapV2 constructor instead.
func NewAddressMap() *AddressMap {
return &AddressMap{m: make(map[Address]addressMapEntryList)}
return NewAddressMapV2[any]()
}
// NewAddressMapV2 creates a new AddressMapV2.
func NewAddressMapV2[T any]() *AddressMapV2[T] {
return &AddressMapV2[T]{m: make(map[Address]addressMapEntryList[T])}
}
// find returns the index of addr in the addressMapEntry slice, or -1 if not
// present.
func (l addressMapEntryList) find(addr Address) int {
func (l addressMapEntryList[T]) find(addr Address) int {
for i, entry := range l {
// Attributes are the only thing to match on here, since `Addr` and
// `ServerName` are already equal.
@ -69,28 +88,28 @@ func (l addressMapEntryList) find(addr Address) int {
}
// Get returns the value for the address in the map, if present.
func (a *AddressMap) Get(addr Address) (value any, ok bool) {
func (a *AddressMapV2[T]) Get(addr Address) (value T, ok bool) {
addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 {
return entryList[entry].value, true
}
return nil, false
return value, false
}
// Set updates or adds the value to the address in the map.
func (a *AddressMap) Set(addr Address, value any) {
func (a *AddressMapV2[T]) Set(addr Address, value T) {
addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 {
entryList[entry].value = value
return
}
a.m[addrKey] = append(entryList, &addressMapEntry{addr: addr, value: value})
a.m[addrKey] = append(entryList, &addressMapEntry[T]{addr: addr, value: value})
}
// Delete removes addr from the map.
func (a *AddressMap) Delete(addr Address) {
func (a *AddressMapV2[T]) Delete(addr Address) {
addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
entry := entryList.find(addr)
@ -107,7 +126,7 @@ func (a *AddressMap) Delete(addr Address) {
}
// Len returns the number of entries in the map.
func (a *AddressMap) Len() int {
func (a *AddressMapV2[T]) Len() int {
ret := 0
for _, entryList := range a.m {
ret += len(entryList)
@ -116,7 +135,7 @@ func (a *AddressMap) Len() int {
}
// Keys returns a slice of all current map keys.
func (a *AddressMap) Keys() []Address {
func (a *AddressMapV2[T]) Keys() []Address {
ret := make([]Address, 0, a.Len())
for _, entryList := range a.m {
for _, entry := range entryList {
@ -127,8 +146,8 @@ func (a *AddressMap) Keys() []Address {
}
// Values returns a slice of all current map values.
func (a *AddressMap) Values() []any {
ret := make([]any, 0, a.Len())
func (a *AddressMapV2[T]) Values() []T {
ret := make([]T, 0, a.Len())
for _, entryList := range a.m {
for _, entry := range entryList {
ret = append(ret, entry.value)
@ -137,70 +156,65 @@ func (a *AddressMap) Values() []any {
return ret
}
type endpointNode struct {
addrs map[string]struct{}
}
// Equal returns whether the unordered set of addrs are the same between the
// endpoint nodes.
func (en *endpointNode) Equal(en2 *endpointNode) bool {
if len(en.addrs) != len(en2.addrs) {
return false
}
for addr := range en.addrs {
if _, ok := en2.addrs[addr]; !ok {
return false
}
}
return true
}
func toEndpointNode(endpoint Endpoint) endpointNode {
en := make(map[string]struct{})
for _, addr := range endpoint.Addresses {
en[addr.Addr] = struct{}{}
}
return endpointNode{
addrs: en,
}
}
type endpointMapKey string
// EndpointMap is a map of endpoints to arbitrary values keyed on only the
// unordered set of address strings within an endpoint. This map is not thread
// safe, thus it is unsafe to access concurrently. Must be created via
// NewEndpointMap; do not construct directly.
type EndpointMap struct {
endpoints map[*endpointNode]any
type EndpointMap[T any] struct {
endpoints map[endpointMapKey]endpointData[T]
}
type endpointData[T any] struct {
// decodedKey stores the original key to avoid decoding when iterating on
// EndpointMap keys.
decodedKey Endpoint
value T
}
// NewEndpointMap creates a new EndpointMap.
func NewEndpointMap() *EndpointMap {
return &EndpointMap{
endpoints: make(map[*endpointNode]any),
func NewEndpointMap[T any]() *EndpointMap[T] {
return &EndpointMap[T]{
endpoints: make(map[endpointMapKey]endpointData[T]),
}
}
// encodeEndpoint returns a string that uniquely identifies the unordered set of
// addresses within an endpoint.
func encodeEndpoint(e Endpoint) endpointMapKey {
addrs := make([]string, 0, len(e.Addresses))
// base64 encoding the address strings restricts the characters present
// within the strings, which allows the use of a delimiter without needing
// escape characters.
for _, addr := range e.Addresses {
addrs = append(addrs, base64.StdEncoding.EncodeToString([]byte(addr.Addr)))
}
sort.Strings(addrs)
// " " should not appear in base64 encoded strings.
return endpointMapKey(strings.Join(addrs, " "))
}
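// Illustrative sketch, not part of this change: the encoding above is
// order-insensitive, so endpoints listing the same addresses in a different
// order map to the same key. The address strings are invented.
func exampleEncodeEndpoint() {
	e1 := Endpoint{Addresses: []Address{{Addr: "a:1"}, {Addr: "b:2"}}}
	e2 := Endpoint{Addresses: []Address{{Addr: "b:2"}, {Addr: "a:1"}}}
	_ = encodeEndpoint(e1) == encodeEndpoint(e2) // true: both sort to the same base64 parts
}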
// Get returns the value for the endpoint in the map, if present.
func (em *EndpointMap) Get(e Endpoint) (value any, ok bool) {
en := toEndpointNode(e)
if endpoint := em.find(en); endpoint != nil {
return em.endpoints[endpoint], true
func (em *EndpointMap[T]) Get(e Endpoint) (value T, ok bool) {
val, found := em.endpoints[encodeEndpoint(e)]
if found {
return val.value, true
}
return nil, false
return value, false
}
// Set updates or adds the value for the endpoint in the map.
func (em *EndpointMap) Set(e Endpoint, value any) {
en := toEndpointNode(e)
if endpoint := em.find(en); endpoint != nil {
em.endpoints[endpoint] = value
return
func (em *EndpointMap[T]) Set(e Endpoint, value T) {
en := encodeEndpoint(e)
em.endpoints[en] = endpointData[T]{
decodedKey: Endpoint{Addresses: e.Addresses},
value: value,
}
em.endpoints[&en] = value
}
// Len returns the number of entries in the map.
func (em *EndpointMap) Len() int {
func (em *EndpointMap[T]) Len() int {
return len(em.endpoints)
}
@ -209,43 +223,25 @@ func (em *EndpointMap) Len() int {
// the unordered set of addresses. Thus, endpoint information returned is not
// the full endpoint data (drops duplicated addresses and attributes) but can be
// used for EndpointMap accesses.
func (em *EndpointMap) Keys() []Endpoint {
func (em *EndpointMap[T]) Keys() []Endpoint {
ret := make([]Endpoint, 0, len(em.endpoints))
for en := range em.endpoints {
var endpoint Endpoint
for addr := range en.addrs {
endpoint.Addresses = append(endpoint.Addresses, Address{Addr: addr})
}
ret = append(ret, endpoint)
for _, en := range em.endpoints {
ret = append(ret, en.decodedKey)
}
return ret
}
// Values returns a slice of all current map values.
func (em *EndpointMap) Values() []any {
ret := make([]any, 0, len(em.endpoints))
func (em *EndpointMap[T]) Values() []T {
ret := make([]T, 0, len(em.endpoints))
for _, val := range em.endpoints {
ret = append(ret, val)
ret = append(ret, val.value)
}
return ret
}
// find returns a pointer to the endpoint node in em if the endpoint node is
// already present. If not found, nil is returned. The comparisons are done on
// the unordered set of addresses within an endpoint.
func (em EndpointMap) find(e endpointNode) *endpointNode {
for endpoint := range em.endpoints {
if e.Equal(endpoint) {
return endpoint
}
}
return nil
}
// Delete removes the specified endpoint from the map.
func (em *EndpointMap) Delete(e Endpoint) {
en := toEndpointNode(e)
if entry := em.find(en); entry != nil {
delete(em.endpoints, entry)
}
func (em *EndpointMap[T]) Delete(e Endpoint) {
en := encodeEndpoint(e)
delete(em.endpoints, en)
}
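// Illustrative sketch, not part of this change: using the generic EndpointMap
// keyed on the encoded address set. Identifiers are the ones visible in this
// file; the string value type and the addresses are invented.
func exampleEndpointMap() {
	em := NewEndpointMap[string]()
	ep := Endpoint{Addresses: []Address{{Addr: "a:1"}, {Addr: "b:2"}}}
	em.Set(ep, "child-1")
	// A reordered copy of the same addresses hits the same entry.
	if v, ok := em.Get(Endpoint{Addresses: []Address{{Addr: "b:2"}, {Addr: "a:1"}}}); ok {
		_ = v // "child-1"
	}
	em.Delete(ep)
	_ = em.Len() // 0
}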

View file

@ -134,12 +134,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
return nil
}
if s.Endpoints == nil {
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
for _, a := range s.Addresses {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
s.Endpoints = append(s.Endpoints, ep)
}
s.Endpoints = addressesToEndpoints(s.Addresses)
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
@ -172,7 +167,11 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.cc.mu.Unlock()
return
}
s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
s := resolver.State{
Addresses: addrs,
ServiceConfig: ccr.curState.ServiceConfig,
Endpoints: addressesToEndpoints(addrs),
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
ccr.mu.Unlock()
@ -210,3 +209,13 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
}
channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}
func addressesToEndpoints(addrs []resolver.Address) []resolver.Endpoint {
endpoints := make([]resolver.Endpoint, 0, len(addrs))
for _, a := range addrs {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
endpoints = append(endpoints, ep)
}
return endpoints
}
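// Illustrative sketch, not part of this change: what the helper above yields.
// Each address becomes its own single-address endpoint, with the address's
// BalancerAttributes promoted to the endpoint's Attributes and cleared on the
// copied address. The address values are invented.
func exampleAddressesToEndpoints() {
	addrs := []resolver.Address{{Addr: "a:1"}, {Addr: "b:2"}}
	eps := addressesToEndpoints(addrs)
	_ = len(eps) // 2; each endpoint wraps exactly one address
}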

View file

@ -870,13 +870,19 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompress
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", err)
}
out, err := mem.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)), pool)
// Read at most one byte more than the limit from the decompressor so that
// exceeding the limit can be detected. If the limit is MaxInt64, it cannot
// be exceeded (and limit+1 would overflow), so apply no limit.
if limit := int64(maxReceiveMessageSize); limit < math.MaxInt64 {
dcReader = io.LimitReader(dcReader, limit+1)
}
out, err := mem.ReadAll(dcReader, pool)
if err != nil {
out.Free()
return nil, status.Errorf(codes.Internal, "grpc: failed to read decompressed data: %v", err)
}
if out.Len() == maxReceiveMessageSize && !atEOF(dcReader) {
if out.Len() > maxReceiveMessageSize {
out.Free()
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", maxReceiveMessageSize)
}
@ -885,12 +891,6 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompress
return nil, status.Errorf(codes.Internal, "grpc: no decompressor available for compressed payload")
}
// atEOF reads data from dcReader and returns true if zero bytes could be read and dcReader.Read returns EOF.
func atEOF(dcReader io.Reader) bool {
n, err := dcReader.Read(make([]byte, 1))
return n == 0 && err == io.EOF
}
type recvCompressor interface {
RecvCompress() string
}
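// Illustrative sketch, not part of this change: the overflow-detection pattern
// used in decompress above, in isolation. Capping the reader at limit+1 makes
// "more than limit bytes were produced" observable from the length read alone,
// with no trailing one-byte probe like the removed atEOF helper. The function
// name and parameters here are invented.
func exceedsLimit(r io.Reader, limit int64) (bool, error) {
	if limit < math.MaxInt64 {
		// Reading one extra byte is enough to tell whether the source held
		// more than limit bytes; MaxInt64+1 would overflow, so skip the cap.
		r = io.LimitReader(r, limit+1)
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return false, err
	}
	return int64(len(data)) > limit, nil
}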

View file

@ -36,7 +36,12 @@ type RPCStats interface {
IsClient() bool
}
// Begin contains stats when an RPC attempt begins.
// Begin contains stats for the start of an RPC attempt.
//
// - Server-side: Triggered after `InHeader`, as headers are processed
// before the RPC lifecycle begins.
// - Client-side: The first stats event recorded.
//
// FailFast is only valid if this Begin is from client side.
type Begin struct {
// Client is true if this Begin is from client side.
@ -69,7 +74,7 @@ func (*PickerUpdated) IsClient() bool { return true }
func (*PickerUpdated) isRPCStats() {}
// InPayload contains the information for an incoming payload.
// InPayload contains stats about an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Client bool
@ -98,7 +103,9 @@ func (s *InPayload) IsClient() bool { return s.Client }
func (s *InPayload) isRPCStats() {}
// InHeader contains stats when a header is received.
// InHeader contains stats about header reception.
//
// - Server-side: The first stats event after the RPC request is received.
type InHeader struct {
// Client is true if this InHeader is from client side.
Client bool
@ -123,7 +130,7 @@ func (s *InHeader) IsClient() bool { return s.Client }
func (s *InHeader) isRPCStats() {}
// InTrailer contains stats when a trailer is received.
// InTrailer contains stats about trailer reception.
type InTrailer struct {
// Client is true if this InTrailer is from client side.
Client bool
@ -139,7 +146,7 @@ func (s *InTrailer) IsClient() bool { return s.Client }
func (s *InTrailer) isRPCStats() {}
// OutPayload contains the information for an outgoing payload.
// OutPayload contains stats about an outgoing payload.
type OutPayload struct {
// Client is true if this OutPayload is from client side.
Client bool
@ -166,7 +173,10 @@ func (s *OutPayload) IsClient() bool { return s.Client }
func (s *OutPayload) isRPCStats() {}
// OutHeader contains stats when a header is sent.
// OutHeader contains stats about header transmission.
//
// - Client-side: Only occurs after `Begin`, as headers are always the first
// thing sent on a stream.
type OutHeader struct {
// Client is true if this OutHeader is from client side.
Client bool
@ -189,14 +199,15 @@ func (s *OutHeader) IsClient() bool { return s.Client }
func (s *OutHeader) isRPCStats() {}
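// Illustrative sketch, not part of this change: a minimal stats.Handler that
// records the event ordering documented above (server-side InHeader before
// Begin; client-side Begin before OutHeader). The orderLogger type is
// invented; the Handler methods and event types are the ones defined in this
// package, and "context" is assumed to be imported. It is not safe for
// concurrent use as written.
type orderLogger struct{ events []string }

func (l *orderLogger) TagRPC(ctx context.Context, _ *RPCTagInfo) context.Context { return ctx }

func (l *orderLogger) HandleRPC(_ context.Context, s RPCStats) {
	switch s.(type) {
	case *Begin:
		l.events = append(l.events, "Begin")
	case *InHeader:
		l.events = append(l.events, "InHeader")
	case *OutHeader:
		l.events = append(l.events, "OutHeader")
	case *End:
		l.events = append(l.events, "End")
	}
}

func (l *orderLogger) TagConn(ctx context.Context, _ *ConnTagInfo) context.Context { return ctx }

func (l *orderLogger) HandleConn(_ context.Context, _ ConnStats) {}

// A handler like this would typically be attached via grpc.StatsHandler(...)
// on the server or grpc.WithStatsHandler(...) on the client dial options.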
// OutTrailer contains stats when a trailer is sent.
// OutTrailer contains stats about trailer transmission.
type OutTrailer struct {
// Client is true if this OutTrailer is from client side.
Client bool
// WireLength is the wire length of trailer.
//
// Deprecated: This field is never set. The length is not known when this message is
// emitted because the trailer fields are compressed with hpack after that.
// Deprecated: This field is never set. The length is not known when this
// message is emitted because the trailer fields are compressed with hpack
// after that.
WireLength int
// Trailer contains the trailer metadata sent to the client. This
// field is only valid if this OutTrailer is from the server side.
@ -208,7 +219,7 @@ func (s *OutTrailer) IsClient() bool { return s.Client }
func (s *OutTrailer) isRPCStats() {}
// End contains stats when an RPC ends.
// End contains stats about RPC completion.
type End struct {
// Client is true if this End is from client side.
Client bool
@ -238,7 +249,7 @@ type ConnStats interface {
IsClient() bool
}
// ConnBegin contains the stats of a connection when it is established.
// ConnBegin contains stats about connection establishment.
type ConnBegin struct {
// Client is true if this ConnBegin is from client side.
Client bool
@ -249,7 +260,7 @@ func (s *ConnBegin) IsClient() bool { return s.Client }
func (s *ConnBegin) isConnStats() {}
// ConnEnd contains the stats of a connection when it ends.
// ConnEnd contains stats about connection termination.
type ConnEnd struct {
// Client is true if this ConnEnd is from client side.
Client bool

View file

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.71.0"
const Version = "1.72.1"