mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 05:52:25 -05:00 
			
		
		
		
	feat: initial tracing support (#1623)
This commit is contained in:
		
					parent
					
						
							
								878ed48de3
							
						
					
				
			
			
				commit
				
					
						6392e00653
					
				
			
		
					 472 changed files with 102600 additions and 12 deletions
				
			
		
							
								
								
									
										189
									
								
								vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										189
									
								
								vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							|  | @ -0,0 +1,189 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2018 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| // Package binarylog implementation binary logging as defined in | ||||
| // https://github.com/grpc/proposal/blob/master/A16-binary-logging.md. | ||||
| package binarylog | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 
 | ||||
| 	"google.golang.org/grpc/grpclog" | ||||
| 	"google.golang.org/grpc/internal/grpcutil" | ||||
| ) | ||||
| 
 | ||||
| // Logger is the global binary logger. It can be used to get binary logger for | ||||
| // each method. | ||||
| type Logger interface { | ||||
| 	GetMethodLogger(methodName string) MethodLogger | ||||
| } | ||||
| 
 | ||||
| // binLogger is the global binary logger for the binary. One of this should be | ||||
| // built at init time from the configuration (environment variable or flags). | ||||
| // | ||||
| // It is used to get a MethodLogger for each individual method. | ||||
| var binLogger Logger | ||||
| 
 | ||||
| var grpclogLogger = grpclog.Component("binarylog") | ||||
| 
 | ||||
| // SetLogger sets the binary logger. | ||||
| // | ||||
| // Only call this at init time. | ||||
| func SetLogger(l Logger) { | ||||
| 	binLogger = l | ||||
| } | ||||
| 
 | ||||
| // GetLogger gets the binary logger. | ||||
| // | ||||
| // Only call this at init time. | ||||
| func GetLogger() Logger { | ||||
| 	return binLogger | ||||
| } | ||||
| 
 | ||||
| // GetMethodLogger returns the MethodLogger for the given methodName. | ||||
| // | ||||
| // methodName should be in the format of "/service/method". | ||||
| // | ||||
| // Each MethodLogger returned by this method is a new instance. This is to | ||||
| // generate sequence id within the call. | ||||
| func GetMethodLogger(methodName string) MethodLogger { | ||||
| 	if binLogger == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return binLogger.GetMethodLogger(methodName) | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	const envStr = "GRPC_BINARY_LOG_FILTER" | ||||
| 	configStr := os.Getenv(envStr) | ||||
| 	binLogger = NewLoggerFromConfigString(configStr) | ||||
| } | ||||
| 
 | ||||
| // MethodLoggerConfig contains the setting for logging behavior of a method | ||||
| // logger. Currently, it contains the max length of header and message. | ||||
| type MethodLoggerConfig struct { | ||||
| 	// Max length of header and message. | ||||
| 	Header, Message uint64 | ||||
| } | ||||
| 
 | ||||
| // LoggerConfig contains the config for loggers to create method loggers. | ||||
| type LoggerConfig struct { | ||||
| 	All      *MethodLoggerConfig | ||||
| 	Services map[string]*MethodLoggerConfig | ||||
| 	Methods  map[string]*MethodLoggerConfig | ||||
| 
 | ||||
| 	Blacklist map[string]struct{} | ||||
| } | ||||
| 
 | ||||
| type logger struct { | ||||
| 	config LoggerConfig | ||||
| } | ||||
| 
 | ||||
| // NewLoggerFromConfig builds a logger with the given LoggerConfig. | ||||
| func NewLoggerFromConfig(config LoggerConfig) Logger { | ||||
| 	return &logger{config: config} | ||||
| } | ||||
| 
 | ||||
| // newEmptyLogger creates an empty logger. The map fields need to be filled in | ||||
| // using the set* functions. | ||||
| func newEmptyLogger() *logger { | ||||
| 	return &logger{} | ||||
| } | ||||
| 
 | ||||
| // Set method logger for "*". | ||||
| func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error { | ||||
| 	if l.config.All != nil { | ||||
| 		return fmt.Errorf("conflicting global rules found") | ||||
| 	} | ||||
| 	l.config.All = ml | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Set method logger for "service/*". | ||||
| // | ||||
| // New MethodLogger with same service overrides the old one. | ||||
| func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error { | ||||
| 	if _, ok := l.config.Services[service]; ok { | ||||
| 		return fmt.Errorf("conflicting service rules for service %v found", service) | ||||
| 	} | ||||
| 	if l.config.Services == nil { | ||||
| 		l.config.Services = make(map[string]*MethodLoggerConfig) | ||||
| 	} | ||||
| 	l.config.Services[service] = ml | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Set method logger for "service/method". | ||||
| // | ||||
| // New MethodLogger with same method overrides the old one. | ||||
| func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error { | ||||
| 	if _, ok := l.config.Blacklist[method]; ok { | ||||
| 		return fmt.Errorf("conflicting blacklist rules for method %v found", method) | ||||
| 	} | ||||
| 	if _, ok := l.config.Methods[method]; ok { | ||||
| 		return fmt.Errorf("conflicting method rules for method %v found", method) | ||||
| 	} | ||||
| 	if l.config.Methods == nil { | ||||
| 		l.config.Methods = make(map[string]*MethodLoggerConfig) | ||||
| 	} | ||||
| 	l.config.Methods[method] = ml | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Set blacklist method for "-service/method". | ||||
| func (l *logger) setBlacklist(method string) error { | ||||
| 	if _, ok := l.config.Blacklist[method]; ok { | ||||
| 		return fmt.Errorf("conflicting blacklist rules for method %v found", method) | ||||
| 	} | ||||
| 	if _, ok := l.config.Methods[method]; ok { | ||||
| 		return fmt.Errorf("conflicting method rules for method %v found", method) | ||||
| 	} | ||||
| 	if l.config.Blacklist == nil { | ||||
| 		l.config.Blacklist = make(map[string]struct{}) | ||||
| 	} | ||||
| 	l.config.Blacklist[method] = struct{}{} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // getMethodLogger returns the MethodLogger for the given methodName. | ||||
| // | ||||
| // methodName should be in the format of "/service/method". | ||||
| // | ||||
| // Each MethodLogger returned by this method is a new instance. This is to | ||||
| // generate sequence id within the call. | ||||
| func (l *logger) GetMethodLogger(methodName string) MethodLogger { | ||||
| 	s, m, err := grpcutil.ParseMethod(methodName) | ||||
| 	if err != nil { | ||||
| 		grpclogLogger.Infof("binarylogging: failed to parse %q: %v", methodName, err) | ||||
| 		return nil | ||||
| 	} | ||||
| 	if ml, ok := l.config.Methods[s+"/"+m]; ok { | ||||
| 		return NewTruncatingMethodLogger(ml.Header, ml.Message) | ||||
| 	} | ||||
| 	if _, ok := l.config.Blacklist[s+"/"+m]; ok { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if ml, ok := l.config.Services[s]; ok { | ||||
| 		return NewTruncatingMethodLogger(ml.Header, ml.Message) | ||||
| 	} | ||||
| 	if l.config.All == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message) | ||||
| } | ||||
							
								
								
									
										42
									
								
								vendor/google.golang.org/grpc/internal/binarylog/binarylog_testutil.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								vendor/google.golang.org/grpc/internal/binarylog/binarylog_testutil.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							|  | @ -0,0 +1,42 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2018 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| // This file contains exported variables/functions that are exported for testing | ||||
| // only. | ||||
| // | ||||
| // An ideal way for this would be to put those in a *_test.go but in binarylog | ||||
| // package. But this doesn't work with staticcheck with go module. Error was: | ||||
| // "MdToMetadataProto not declared by package binarylog". This could be caused | ||||
| // by the way staticcheck looks for files for a certain package, which doesn't | ||||
| // support *_test.go files. | ||||
| // | ||||
| // Move those to binary_test.go when staticcheck is fixed. | ||||
| 
 | ||||
| package binarylog | ||||
| 
 | ||||
| var ( | ||||
| 	// AllLogger is a logger that logs all headers/messages for all RPCs. It's | ||||
| 	// for testing only. | ||||
| 	AllLogger = NewLoggerFromConfigString("*") | ||||
| 	// MdToMetadataProto converts metadata to a binary logging proto message. | ||||
| 	// It's for testing only. | ||||
| 	MdToMetadataProto = mdToMetadataProto | ||||
| 	// AddrToProto converts an address to a binary logging proto message. It's | ||||
| 	// for testing only. | ||||
| 	AddrToProto = addrToProto | ||||
| ) | ||||
							
								
								
									
										208
									
								
								vendor/google.golang.org/grpc/internal/binarylog/env_config.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										208
									
								
								vendor/google.golang.org/grpc/internal/binarylog/env_config.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							|  | @ -0,0 +1,208 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2018 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| package binarylog | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"regexp" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| ) | ||||
| 
 | ||||
| // NewLoggerFromConfigString reads the string and build a logger. It can be used | ||||
| // to build a new logger and assign it to binarylog.Logger. | ||||
| // | ||||
| // Example filter config strings: | ||||
| //   - "" Nothing will be logged | ||||
| //   - "*" All headers and messages will be fully logged. | ||||
| //   - "*{h}" Only headers will be logged. | ||||
| //   - "*{m:256}" Only the first 256 bytes of each message will be logged. | ||||
| //   - "Foo/*" Logs every method in service Foo | ||||
| //   - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar | ||||
| //   - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method | ||||
| //     /Foo/Bar, logs all headers and messages in every other method in service | ||||
| //     Foo. | ||||
| // | ||||
| // If two configs exist for one certain method or service, the one specified | ||||
| // later overrides the previous config. | ||||
| func NewLoggerFromConfigString(s string) Logger { | ||||
| 	if s == "" { | ||||
| 		return nil | ||||
| 	} | ||||
| 	l := newEmptyLogger() | ||||
| 	methods := strings.Split(s, ",") | ||||
| 	for _, method := range methods { | ||||
| 		if err := l.fillMethodLoggerWithConfigString(method); err != nil { | ||||
| 			grpclogLogger.Warningf("failed to parse binary log config: %v", err) | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
| 	return l | ||||
| } | ||||
| 
 | ||||
| // fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds | ||||
| // it to the right map in the logger. | ||||
| func (l *logger) fillMethodLoggerWithConfigString(config string) error { | ||||
| 	// "" is invalid. | ||||
| 	if config == "" { | ||||
| 		return errors.New("empty string is not a valid method binary logging config") | ||||
| 	} | ||||
| 
 | ||||
| 	// "-service/method", blacklist, no * or {} allowed. | ||||
| 	if config[0] == '-' { | ||||
| 		s, m, suffix, err := parseMethodConfigAndSuffix(config[1:]) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("invalid config: %q, %v", config, err) | ||||
| 		} | ||||
| 		if m == "*" { | ||||
| 			return fmt.Errorf("invalid config: %q, %v", config, "* not allowed in blacklist config") | ||||
| 		} | ||||
| 		if suffix != "" { | ||||
| 			return fmt.Errorf("invalid config: %q, %v", config, "header/message limit not allowed in blacklist config") | ||||
| 		} | ||||
| 		if err := l.setBlacklist(s + "/" + m); err != nil { | ||||
| 			return fmt.Errorf("invalid config: %v", err) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// "*{h:256;m:256}" | ||||
| 	if config[0] == '*' { | ||||
| 		hdr, msg, err := parseHeaderMessageLengthConfig(config[1:]) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("invalid config: %q, %v", config, err) | ||||
| 		} | ||||
| 		if err := l.setDefaultMethodLogger(&MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { | ||||
| 			return fmt.Errorf("invalid config: %v", err) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	s, m, suffix, err := parseMethodConfigAndSuffix(config) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("invalid config: %q, %v", config, err) | ||||
| 	} | ||||
| 	hdr, msg, err := parseHeaderMessageLengthConfig(suffix) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("invalid header/message length config: %q, %v", suffix, err) | ||||
| 	} | ||||
| 	if m == "*" { | ||||
| 		if err := l.setServiceMethodLogger(s, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { | ||||
| 			return fmt.Errorf("invalid config: %v", err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		if err := l.setMethodMethodLogger(s+"/"+m, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { | ||||
| 			return fmt.Errorf("invalid config: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| 	// TODO: this const is only used by env_config now. But could be useful for | ||||
| 	// other config. Move to binarylog.go if necessary. | ||||
| 	maxUInt = ^uint64(0) | ||||
| 
 | ||||
| 	// For "p.s/m" plus any suffix. Suffix will be parsed again. See test for | ||||
| 	// expected output. | ||||
| 	longMethodConfigRegexpStr = `^([\w./]+)/((?:\w+)|[*])(.+)?$` | ||||
| 
 | ||||
| 	// For suffix from above, "{h:123,m:123}". See test for expected output. | ||||
| 	optionalLengthRegexpStr      = `(?::(\d+))?` // Optional ":123". | ||||
| 	headerConfigRegexpStr        = `^{h` + optionalLengthRegexpStr + `}$` | ||||
| 	messageConfigRegexpStr       = `^{m` + optionalLengthRegexpStr + `}$` | ||||
| 	headerMessageConfigRegexpStr = `^{h` + optionalLengthRegexpStr + `;m` + optionalLengthRegexpStr + `}$` | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	longMethodConfigRegexp    = regexp.MustCompile(longMethodConfigRegexpStr) | ||||
| 	headerConfigRegexp        = regexp.MustCompile(headerConfigRegexpStr) | ||||
| 	messageConfigRegexp       = regexp.MustCompile(messageConfigRegexpStr) | ||||
| 	headerMessageConfigRegexp = regexp.MustCompile(headerMessageConfigRegexpStr) | ||||
| ) | ||||
| 
 | ||||
| // Turn "service/method{h;m}" into "service", "method", "{h;m}". | ||||
| func parseMethodConfigAndSuffix(c string) (service, method, suffix string, _ error) { | ||||
| 	// Regexp result: | ||||
| 	// | ||||
| 	// in:  "p.s/m{h:123,m:123}", | ||||
| 	// out: []string{"p.s/m{h:123,m:123}", "p.s", "m", "{h:123,m:123}"}, | ||||
| 	match := longMethodConfigRegexp.FindStringSubmatch(c) | ||||
| 	if match == nil { | ||||
| 		return "", "", "", fmt.Errorf("%q contains invalid substring", c) | ||||
| 	} | ||||
| 	service = match[1] | ||||
| 	method = match[2] | ||||
| 	suffix = match[3] | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| // Turn "{h:123;m:345}" into 123, 345. | ||||
| // | ||||
| // Return maxUInt if length is unspecified. | ||||
| func parseHeaderMessageLengthConfig(c string) (hdrLenStr, msgLenStr uint64, err error) { | ||||
| 	if c == "" { | ||||
| 		return maxUInt, maxUInt, nil | ||||
| 	} | ||||
| 	// Header config only. | ||||
| 	if match := headerConfigRegexp.FindStringSubmatch(c); match != nil { | ||||
| 		if s := match[1]; s != "" { | ||||
| 			hdrLenStr, err = strconv.ParseUint(s, 10, 64) | ||||
| 			if err != nil { | ||||
| 				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) | ||||
| 			} | ||||
| 			return hdrLenStr, 0, nil | ||||
| 		} | ||||
| 		return maxUInt, 0, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Message config only. | ||||
| 	if match := messageConfigRegexp.FindStringSubmatch(c); match != nil { | ||||
| 		if s := match[1]; s != "" { | ||||
| 			msgLenStr, err = strconv.ParseUint(s, 10, 64) | ||||
| 			if err != nil { | ||||
| 				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) | ||||
| 			} | ||||
| 			return 0, msgLenStr, nil | ||||
| 		} | ||||
| 		return 0, maxUInt, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Header and message config both. | ||||
| 	if match := headerMessageConfigRegexp.FindStringSubmatch(c); match != nil { | ||||
| 		// Both hdr and msg are specified, but one or two of them might be empty. | ||||
| 		hdrLenStr = maxUInt | ||||
| 		msgLenStr = maxUInt | ||||
| 		if s := match[1]; s != "" { | ||||
| 			hdrLenStr, err = strconv.ParseUint(s, 10, 64) | ||||
| 			if err != nil { | ||||
| 				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) | ||||
| 			} | ||||
| 		} | ||||
| 		if s := match[2]; s != "" { | ||||
| 			msgLenStr, err = strconv.ParseUint(s, 10, 64) | ||||
| 			if err != nil { | ||||
| 				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) | ||||
| 			} | ||||
| 		} | ||||
| 		return hdrLenStr, msgLenStr, nil | ||||
| 	} | ||||
| 	return 0, 0, fmt.Errorf("%q contains invalid substring", c) | ||||
| } | ||||
							
								
								
									
										435
									
								
								vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										435
									
								
								vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							|  | @ -0,0 +1,435 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2018 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| package binarylog | ||||
| 
 | ||||
| import ( | ||||
| 	"net" | ||||
| 	"strings" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| 	"github.com/golang/protobuf/ptypes" | ||||
| 	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 	"google.golang.org/grpc/status" | ||||
| ) | ||||
| 
 | ||||
| type callIDGenerator struct { | ||||
| 	id uint64 | ||||
| } | ||||
| 
 | ||||
| func (g *callIDGenerator) next() uint64 { | ||||
| 	id := atomic.AddUint64(&g.id, 1) | ||||
| 	return id | ||||
| } | ||||
| 
 | ||||
| // reset is for testing only, and doesn't need to be thread safe. | ||||
| func (g *callIDGenerator) reset() { | ||||
| 	g.id = 0 | ||||
| } | ||||
| 
 | ||||
| var idGen callIDGenerator | ||||
| 
 | ||||
| // MethodLogger is the sub-logger for each method. | ||||
| type MethodLogger interface { | ||||
| 	Log(LogEntryConfig) | ||||
| } | ||||
| 
 | ||||
| // TruncatingMethodLogger is a method logger that truncates headers and messages | ||||
| // based on configured fields. | ||||
| type TruncatingMethodLogger struct { | ||||
| 	headerMaxLen, messageMaxLen uint64 | ||||
| 
 | ||||
| 	callID          uint64 | ||||
| 	idWithinCallGen *callIDGenerator | ||||
| 
 | ||||
| 	sink Sink // TODO(blog): make this plugable. | ||||
| } | ||||
| 
 | ||||
| // NewTruncatingMethodLogger returns a new truncating method logger. | ||||
| func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger { | ||||
| 	return &TruncatingMethodLogger{ | ||||
| 		headerMaxLen:  h, | ||||
| 		messageMaxLen: m, | ||||
| 
 | ||||
| 		callID:          idGen.next(), | ||||
| 		idWithinCallGen: &callIDGenerator{}, | ||||
| 
 | ||||
| 		sink: DefaultSink, // TODO(blog): make it plugable. | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Build is an internal only method for building the proto message out of the | ||||
| // input event. It's made public to enable other library to reuse as much logic | ||||
| // in TruncatingMethodLogger as possible. | ||||
| func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry { | ||||
| 	m := c.toProto() | ||||
| 	timestamp, _ := ptypes.TimestampProto(time.Now()) | ||||
| 	m.Timestamp = timestamp | ||||
| 	m.CallId = ml.callID | ||||
| 	m.SequenceIdWithinCall = ml.idWithinCallGen.next() | ||||
| 
 | ||||
| 	switch pay := m.Payload.(type) { | ||||
| 	case *binlogpb.GrpcLogEntry_ClientHeader: | ||||
| 		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) | ||||
| 	case *binlogpb.GrpcLogEntry_ServerHeader: | ||||
| 		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) | ||||
| 	case *binlogpb.GrpcLogEntry_Message: | ||||
| 		m.PayloadTruncated = ml.truncateMessage(pay.Message) | ||||
| 	} | ||||
| 	return m | ||||
| } | ||||
| 
 | ||||
| // Log creates a proto binary log entry, and logs it to the sink. | ||||
| func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) { | ||||
| 	ml.sink.Write(ml.Build(c)) | ||||
| } | ||||
| 
 | ||||
| func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) { | ||||
| 	if ml.headerMaxLen == maxUInt { | ||||
| 		return false | ||||
| 	} | ||||
| 	var ( | ||||
| 		bytesLimit = ml.headerMaxLen | ||||
| 		index      int | ||||
| 	) | ||||
| 	// At the end of the loop, index will be the first entry where the total | ||||
| 	// size is greater than the limit: | ||||
| 	// | ||||
| 	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. | ||||
| 	for ; index < len(mdPb.Entry); index++ { | ||||
| 		entry := mdPb.Entry[index] | ||||
| 		if entry.Key == "grpc-trace-bin" { | ||||
| 			// "grpc-trace-bin" is a special key. It's kept in the log entry, | ||||
| 			// but not counted towards the size limit. | ||||
| 			continue | ||||
| 		} | ||||
| 		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue())) | ||||
| 		if currentEntryLen > bytesLimit { | ||||
| 			break | ||||
| 		} | ||||
| 		bytesLimit -= currentEntryLen | ||||
| 	} | ||||
| 	truncated = index < len(mdPb.Entry) | ||||
| 	mdPb.Entry = mdPb.Entry[:index] | ||||
| 	return truncated | ||||
| } | ||||
| 
 | ||||
| func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) { | ||||
| 	if ml.messageMaxLen == maxUInt { | ||||
| 		return false | ||||
| 	} | ||||
| 	if ml.messageMaxLen >= uint64(len(msgPb.Data)) { | ||||
| 		return false | ||||
| 	} | ||||
| 	msgPb.Data = msgPb.Data[:ml.messageMaxLen] | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // LogEntryConfig represents the configuration for binary log entry. | ||||
| type LogEntryConfig interface { | ||||
| 	toProto() *binlogpb.GrpcLogEntry | ||||
| } | ||||
| 
 | ||||
| // ClientHeader configs the binary log entry to be a ClientHeader entry. | ||||
| type ClientHeader struct { | ||||
| 	OnClientSide bool | ||||
| 	Header       metadata.MD | ||||
| 	MethodName   string | ||||
| 	Authority    string | ||||
| 	Timeout      time.Duration | ||||
| 	// PeerAddr is required only when it's on server side. | ||||
| 	PeerAddr net.Addr | ||||
| } | ||||
| 
 | ||||
| func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	// This function doesn't need to set all the fields (e.g. seq ID). The Log | ||||
| 	// function will set the fields when necessary. | ||||
| 	clientHeader := &binlogpb.ClientHeader{ | ||||
| 		Metadata:   mdToMetadataProto(c.Header), | ||||
| 		MethodName: c.MethodName, | ||||
| 		Authority:  c.Authority, | ||||
| 	} | ||||
| 	if c.Timeout > 0 { | ||||
| 		clientHeader.Timeout = ptypes.DurationProto(c.Timeout) | ||||
| 	} | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, | ||||
| 		Payload: &binlogpb.GrpcLogEntry_ClientHeader{ | ||||
| 			ClientHeader: clientHeader, | ||||
| 		}, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	if c.PeerAddr != nil { | ||||
| 		ret.Peer = addrToProto(c.PeerAddr) | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // ServerHeader configs the binary log entry to be a ServerHeader entry. | ||||
| type ServerHeader struct { | ||||
| 	OnClientSide bool | ||||
| 	Header       metadata.MD | ||||
| 	// PeerAddr is required only when it's on client side. | ||||
| 	PeerAddr net.Addr | ||||
| } | ||||
| 
 | ||||
| func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, | ||||
| 		Payload: &binlogpb.GrpcLogEntry_ServerHeader{ | ||||
| 			ServerHeader: &binlogpb.ServerHeader{ | ||||
| 				Metadata: mdToMetadataProto(c.Header), | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	if c.PeerAddr != nil { | ||||
| 		ret.Peer = addrToProto(c.PeerAddr) | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // ClientMessage configs the binary log entry to be a ClientMessage entry. | ||||
| type ClientMessage struct { | ||||
| 	OnClientSide bool | ||||
| 	// Message can be a proto.Message or []byte. Other messages formats are not | ||||
| 	// supported. | ||||
| 	Message interface{} | ||||
| } | ||||
| 
 | ||||
| func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	var ( | ||||
| 		data []byte | ||||
| 		err  error | ||||
| 	) | ||||
| 	if m, ok := c.Message.(proto.Message); ok { | ||||
| 		data, err = proto.Marshal(m) | ||||
| 		if err != nil { | ||||
| 			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) | ||||
| 		} | ||||
| 	} else if b, ok := c.Message.([]byte); ok { | ||||
| 		data = b | ||||
| 	} else { | ||||
| 		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") | ||||
| 	} | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, | ||||
| 		Payload: &binlogpb.GrpcLogEntry_Message{ | ||||
| 			Message: &binlogpb.Message{ | ||||
| 				Length: uint32(len(data)), | ||||
| 				Data:   data, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // ServerMessage configs the binary log entry to be a ServerMessage entry. | ||||
| type ServerMessage struct { | ||||
| 	OnClientSide bool | ||||
| 	// Message can be a proto.Message or []byte. Other messages formats are not | ||||
| 	// supported. | ||||
| 	Message interface{} | ||||
| } | ||||
| 
 | ||||
| func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	var ( | ||||
| 		data []byte | ||||
| 		err  error | ||||
| 	) | ||||
| 	if m, ok := c.Message.(proto.Message); ok { | ||||
| 		data, err = proto.Marshal(m) | ||||
| 		if err != nil { | ||||
| 			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) | ||||
| 		} | ||||
| 	} else if b, ok := c.Message.([]byte); ok { | ||||
| 		data = b | ||||
| 	} else { | ||||
| 		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") | ||||
| 	} | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, | ||||
| 		Payload: &binlogpb.GrpcLogEntry_Message{ | ||||
| 			Message: &binlogpb.Message{ | ||||
| 				Length: uint32(len(data)), | ||||
| 				Data:   data, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. | ||||
| type ClientHalfClose struct { | ||||
| 	OnClientSide bool | ||||
| } | ||||
| 
 | ||||
| func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, | ||||
| 		Payload: nil, // No payload here. | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // ServerTrailer configs the binary log entry to be a ServerTrailer entry. | ||||
| type ServerTrailer struct { | ||||
| 	OnClientSide bool | ||||
| 	Trailer      metadata.MD | ||||
| 	// Err is the status error. | ||||
| 	Err error | ||||
| 	// PeerAddr is required only when it's on client side and the RPC is trailer | ||||
| 	// only. | ||||
| 	PeerAddr net.Addr | ||||
| } | ||||
| 
 | ||||
| func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	st, ok := status.FromError(c.Err) | ||||
| 	if !ok { | ||||
| 		grpclogLogger.Info("binarylogging: error in trailer is not a status error") | ||||
| 	} | ||||
| 	var ( | ||||
| 		detailsBytes []byte | ||||
| 		err          error | ||||
| 	) | ||||
| 	stProto := st.Proto() | ||||
| 	if stProto != nil && len(stProto.Details) != 0 { | ||||
| 		detailsBytes, err = proto.Marshal(stProto) | ||||
| 		if err != nil { | ||||
| 			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, | ||||
| 		Payload: &binlogpb.GrpcLogEntry_Trailer{ | ||||
| 			Trailer: &binlogpb.Trailer{ | ||||
| 				Metadata:      mdToMetadataProto(c.Trailer), | ||||
| 				StatusCode:    uint32(st.Code()), | ||||
| 				StatusMessage: st.Message(), | ||||
| 				StatusDetails: detailsBytes, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	if c.PeerAddr != nil { | ||||
| 		ret.Peer = addrToProto(c.PeerAddr) | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // Cancel configs the binary log entry to be a Cancel entry. | ||||
| type Cancel struct { | ||||
| 	OnClientSide bool | ||||
| } | ||||
| 
 | ||||
| func (c *Cancel) toProto() *binlogpb.GrpcLogEntry { | ||||
| 	ret := &binlogpb.GrpcLogEntry{ | ||||
| 		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL, | ||||
| 		Payload: nil, | ||||
| 	} | ||||
| 	if c.OnClientSide { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT | ||||
| 	} else { | ||||
| 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // metadataKeyOmit returns whether the metadata entry with this key should be | ||||
| // omitted. | ||||
| func metadataKeyOmit(key string) bool { | ||||
| 	switch key { | ||||
| 	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": | ||||
| 		return true | ||||
| 	case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. | ||||
| 		return false | ||||
| 	} | ||||
| 	return strings.HasPrefix(key, "grpc-") | ||||
| } | ||||
| 
 | ||||
| func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata { | ||||
| 	ret := &binlogpb.Metadata{} | ||||
| 	for k, vv := range md { | ||||
| 		if metadataKeyOmit(k) { | ||||
| 			continue | ||||
| 		} | ||||
| 		for _, v := range vv { | ||||
| 			ret.Entry = append(ret.Entry, | ||||
| 				&binlogpb.MetadataEntry{ | ||||
| 					Key:   k, | ||||
| 					Value: []byte(v), | ||||
| 				}, | ||||
| 			) | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| func addrToProto(addr net.Addr) *binlogpb.Address { | ||||
| 	ret := &binlogpb.Address{} | ||||
| 	switch a := addr.(type) { | ||||
| 	case *net.TCPAddr: | ||||
| 		if a.IP.To4() != nil { | ||||
| 			ret.Type = binlogpb.Address_TYPE_IPV4 | ||||
| 		} else if a.IP.To16() != nil { | ||||
| 			ret.Type = binlogpb.Address_TYPE_IPV6 | ||||
| 		} else { | ||||
| 			ret.Type = binlogpb.Address_TYPE_UNKNOWN | ||||
| 			// Do not set address and port fields. | ||||
| 			break | ||||
| 		} | ||||
| 		ret.Address = a.IP.String() | ||||
| 		ret.IpPort = uint32(a.Port) | ||||
| 	case *net.UnixAddr: | ||||
| 		ret.Type = binlogpb.Address_TYPE_UNIX | ||||
| 		ret.Address = a.String() | ||||
| 	default: | ||||
| 		ret.Type = binlogpb.Address_TYPE_UNKNOWN | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
							
								
								
									
										170
									
								
								vendor/google.golang.org/grpc/internal/binarylog/sink.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										170
									
								
								vendor/google.golang.org/grpc/internal/binarylog/sink.go
									
										
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							|  | @ -0,0 +1,170 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2018 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| package binarylog | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"encoding/binary" | ||||
| 	"io" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| 	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	// DefaultSink is the sink where the logs will be written to. It's exported | ||||
| 	// for the binarylog package to update. | ||||
| 	DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). | ||||
| ) | ||||
| 
 | ||||
| // Sink writes log entry into the binary log sink. | ||||
| // | ||||
| // sink is a copy of the exported binarylog.Sink, to avoid circular dependency. | ||||
| type Sink interface { | ||||
| 	// Write will be called to write the log entry into the sink. | ||||
| 	// | ||||
| 	// It should be thread-safe so it can be called in parallel. | ||||
| 	Write(*binlogpb.GrpcLogEntry) error | ||||
| 	// Close will be called when the Sink is replaced by a new Sink. | ||||
| 	Close() error | ||||
| } | ||||
| 
 | ||||
| type noopSink struct{} | ||||
| 
 | ||||
| func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil } | ||||
| func (ns *noopSink) Close() error                       { return nil } | ||||
| 
 | ||||
| // newWriterSink creates a binary log sink with the given writer. | ||||
| // | ||||
| // Write() marshals the proto message and writes it to the given writer. Each | ||||
| // message is prefixed with a 4 byte big endian unsigned integer as the length. | ||||
| // | ||||
| // No buffer is done, Close() doesn't try to close the writer. | ||||
| func newWriterSink(w io.Writer) Sink { | ||||
| 	return &writerSink{out: w} | ||||
| } | ||||
| 
 | ||||
| type writerSink struct { | ||||
| 	out io.Writer | ||||
| } | ||||
| 
 | ||||
| func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error { | ||||
| 	b, err := proto.Marshal(e) | ||||
| 	if err != nil { | ||||
| 		grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err) | ||||
| 		return err | ||||
| 	} | ||||
| 	hdr := make([]byte, 4) | ||||
| 	binary.BigEndian.PutUint32(hdr, uint32(len(b))) | ||||
| 	if _, err := ws.out.Write(hdr); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if _, err := ws.out.Write(b); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (ws *writerSink) Close() error { return nil } | ||||
| 
 | ||||
| type bufferedSink struct { | ||||
| 	mu             sync.Mutex | ||||
| 	closer         io.Closer | ||||
| 	out            Sink          // out is built on buf. | ||||
| 	buf            *bufio.Writer // buf is kept for flush. | ||||
| 	flusherStarted bool | ||||
| 
 | ||||
| 	writeTicker *time.Ticker | ||||
| 	done        chan struct{} | ||||
| } | ||||
| 
 | ||||
| func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error { | ||||
| 	fs.mu.Lock() | ||||
| 	defer fs.mu.Unlock() | ||||
| 	if !fs.flusherStarted { | ||||
| 		// Start the write loop when Write is called. | ||||
| 		fs.startFlushGoroutine() | ||||
| 		fs.flusherStarted = true | ||||
| 	} | ||||
| 	if err := fs.out.Write(e); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| 	bufFlushDuration = 60 * time.Second | ||||
| ) | ||||
| 
 | ||||
| func (fs *bufferedSink) startFlushGoroutine() { | ||||
| 	fs.writeTicker = time.NewTicker(bufFlushDuration) | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-fs.done: | ||||
| 				return | ||||
| 			case <-fs.writeTicker.C: | ||||
| 			} | ||||
| 			fs.mu.Lock() | ||||
| 			if err := fs.buf.Flush(); err != nil { | ||||
| 				grpclogLogger.Warningf("failed to flush to Sink: %v", err) | ||||
| 			} | ||||
| 			fs.mu.Unlock() | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func (fs *bufferedSink) Close() error { | ||||
| 	fs.mu.Lock() | ||||
| 	defer fs.mu.Unlock() | ||||
| 	if fs.writeTicker != nil { | ||||
| 		fs.writeTicker.Stop() | ||||
| 	} | ||||
| 	close(fs.done) | ||||
| 	if err := fs.buf.Flush(); err != nil { | ||||
| 		grpclogLogger.Warningf("failed to flush to Sink: %v", err) | ||||
| 	} | ||||
| 	if err := fs.closer.Close(); err != nil { | ||||
| 		grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err) | ||||
| 	} | ||||
| 	if err := fs.out.Close(); err != nil { | ||||
| 		grpclogLogger.Warningf("failed to close the Sink: %v", err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // NewBufferedSink creates a binary log sink with the given WriteCloser. | ||||
| // | ||||
| // Write() marshals the proto message and writes it to the given writer. Each | ||||
| // message is prefixed with a 4 byte big endian unsigned integer as the length. | ||||
| // | ||||
| // Content is kept in a buffer, and is flushed every 60 seconds. | ||||
| // | ||||
| // Close closes the WriteCloser. | ||||
| func NewBufferedSink(o io.WriteCloser) Sink { | ||||
| 	bufW := bufio.NewWriter(o) | ||||
| 	return &bufferedSink{ | ||||
| 		closer: o, | ||||
| 		out:    newWriterSink(bufW), | ||||
| 		buf:    bufW, | ||||
| 		done:   make(chan struct{}), | ||||
| 	} | ||||
| } | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue