[bugfix] Downstep otel to fix freebsd compile issue (#1773)

https://github.com/open-telemetry/opentelemetry-go/issues/4076
This commit is contained in:
tobi 2023-05-12 14:55:18 +02:00 committed by GitHub
commit b47661f033
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
70 changed files with 1097 additions and 3138 deletions

View file

@ -91,7 +91,7 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
// If the exporter is nil, the span processor will preform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)

View file

@ -75,9 +75,8 @@ func (cfg tracerProviderConfig) MarshalLog() interface{} {
type TracerProvider struct {
mu sync.Mutex
namedTracer map[instrumentation.Scope]*tracer
spanProcessors atomic.Pointer[spanProcessorStates]
isShutdown atomic.Bool
spanProcessors atomic.Value
isShutdown bool
// These fields are not protected by the lock mu. They are assumed to be
// immutable after creation of the TracerProvider.
@ -120,11 +119,11 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
}
global.Info("TracerProvider created", "config", o)
spss := make(spanProcessorStates, 0, len(o.processors))
spss := spanProcessorStates{}
for _, sp := range o.processors {
spss = append(spss, newSpanProcessorState(sp))
}
tp.spanProcessors.Store(&spss)
tp.spanProcessors.Store(spss)
return tp
}
@ -137,11 +136,10 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
//
// This method is safe to be called concurrently.
func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer {
// This check happens before the mutex is acquired to avoid deadlocking if Tracer() is called from within Shutdown().
if p.isShutdown.Load() {
return trace.NewNoopTracerProvider().Tracer(name, opts...)
}
c := trace.NewTracerConfig(opts...)
p.mu.Lock()
defer p.mu.Unlock()
if name == "" {
name = defaultTracerName
}
@ -150,74 +148,44 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
Version: c.InstrumentationVersion(),
SchemaURL: c.SchemaURL(),
}
t, ok := func() (trace.Tracer, bool) {
p.mu.Lock()
defer p.mu.Unlock()
// Must check the flag after acquiring the mutex to avoid returning a valid tracer if Shutdown() ran
// after the first check above but before we acquired the mutex.
if p.isShutdown.Load() {
return trace.NewNoopTracerProvider().Tracer(name, opts...), true
}
t, ok := p.namedTracer[is]
if !ok {
t = &tracer{
provider: p,
instrumentationScope: is,
}
p.namedTracer[is] = t
}
return t, ok
}()
t, ok := p.namedTracer[is]
if !ok {
// This code is outside the mutex to not hold the lock while calling third party logging code:
// - That code may do slow things like I/O, which would prolong the duration the lock is held,
// slowing down all tracing consumers.
// - Logging code may be instrumented with tracing and deadlock because it could try
// acquiring the same non-reentrant mutex.
global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL)
t = &tracer{
provider: p,
instrumentationScope: is,
}
p.namedTracer[is] = t
global.Info("Tracer created", "name", name, "version", c.InstrumentationVersion(), "schemaURL", c.SchemaURL())
}
return t
}
// RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors.
func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
// This check prevents calls during a shutdown.
if p.isShutdown.Load() {
return
}
p.mu.Lock()
defer p.mu.Unlock()
// This check prevents calls after a shutdown.
if p.isShutdown.Load() {
if p.isShutdown {
return
}
current := p.getSpanProcessors()
newSPS := make(spanProcessorStates, 0, len(current)+1)
newSPS = append(newSPS, current...)
newSPS := spanProcessorStates{}
newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...)
newSPS = append(newSPS, newSpanProcessorState(sp))
p.spanProcessors.Store(&newSPS)
p.spanProcessors.Store(newSPS)
}
// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors.
func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
// This check prevents calls during a shutdown.
if p.isShutdown.Load() {
return
}
p.mu.Lock()
defer p.mu.Unlock()
// This check prevents calls after a shutdown.
if p.isShutdown.Load() {
if p.isShutdown {
return
}
old := p.getSpanProcessors()
old := p.spanProcessors.Load().(spanProcessorStates)
if len(old) == 0 {
return
}
spss := make(spanProcessorStates, len(old))
copy(spss, old)
spss := spanProcessorStates{}
spss = append(spss, old...)
// stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState
@ -241,13 +209,13 @@ func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
spss[len(spss)-1] = nil
spss = spss[:len(spss)-1]
p.spanProcessors.Store(&spss)
p.spanProcessors.Store(spss)
}
// ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss := p.getSpanProcessors()
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}
@ -268,21 +236,18 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {
// Shutdown shuts down TracerProvider. All registered span processors are shut down
// in the order they were registered and any held computational resources are released.
// After Shutdown is called, all methods are no-ops.
func (p *TracerProvider) Shutdown(ctx context.Context) error {
// This check prevents deadlocks in case of recursive shutdown.
if p.isShutdown.Load() {
return nil
}
p.mu.Lock()
defer p.mu.Unlock()
// This check prevents calls after a shutdown has already been done concurrently.
if !p.isShutdown.CompareAndSwap(false, true) { // did toggle?
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}
p.mu.Lock()
defer p.mu.Unlock()
p.isShutdown = true
var retErr error
for _, sps := range p.getSpanProcessors() {
for _, sps := range spss {
select {
case <-ctx.Done():
return ctx.Err()
@ -302,14 +267,10 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
}
}
}
p.spanProcessors.Store(&spanProcessorStates{})
p.spanProcessors.Store(spanProcessorStates{})
return retErr
}
func (p *TracerProvider) getSpanProcessors() spanProcessorStates {
return *(p.spanProcessors.Load())
}
// TracerProviderOption configures a TracerProvider.
type TracerProviderOption interface {
apply(tracerProviderConfig) tracerProviderConfig

View file

@ -19,13 +19,12 @@ import (
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
)
// simpleSpanProcessor is a SpanProcessor that synchronously sends all
// completed Spans to a trace.Exporter immediately.
type simpleSpanProcessor struct {
exporterMu sync.Mutex
exporterMu sync.RWMutex
exporter SpanExporter
stopOnce sync.Once
}
@ -44,8 +43,6 @@ func NewSimpleSpanProcessor(exporter SpanExporter) SpanProcessor {
ssp := &simpleSpanProcessor{
exporter: exporter,
}
global.Warn("SimpleSpanProcessor is not recommended for production use, consider using BatchSpanProcessor instead.")
return ssp
}
@ -54,8 +51,8 @@ func (ssp *simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
// OnEnd immediately exports a ReadOnlySpan.
func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) {
ssp.exporterMu.Lock()
defer ssp.exporterMu.Unlock()
ssp.exporterMu.RLock()
defer ssp.exporterMu.RUnlock()
if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() {
if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil {

View file

@ -302,7 +302,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
// most a length of limit. Each string slice value is truncated in this fashion
// (the slice length itself is unaffected).
//
// No truncation is performed for a negative limit.
// No truncation is perfromed for a negative limit.
func truncateAttr(limit int, attr attribute.KeyValue) attribute.KeyValue {
if limit < 0 {
return attr
@ -410,7 +410,7 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
}
s.mu.Unlock()
sps := s.tracer.provider.getSpanProcessors()
sps := s.tracer.provider.spanProcessors.Load().(spanProcessorStates)
if len(sps) == 0 {
return
}

View file

@ -38,7 +38,7 @@ type SpanExporter interface {
// must never be done outside of a new major release.
// Shutdown notifies the exporter of a pending halt to operations. The
// exporter is expected to perform any cleanup or synchronization it
// exporter is expected to preform any cleanup or synchronization it
// requires while honoring all timeouts and cancellations contained in
// the passed context.
Shutdown(ctx context.Context) error

View file

@ -62,11 +62,11 @@ type SpanProcessor interface {
type spanProcessorState struct {
sp SpanProcessor
state sync.Once
state *sync.Once
}
func newSpanProcessorState(sp SpanProcessor) *spanProcessorState {
return &spanProcessorState{sp: sp}
return &spanProcessorState{sp: sp, state: &sync.Once{}}
}
type spanProcessorStates []*spanProcessorState

View file

@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS
s := tr.newSpan(ctx, name, &config)
if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() {
sps := tr.provider.getSpanProcessors()
sps := tr.provider.spanProcessors.Load().(spanProcessorStates)
for _, sp := range sps {
sp.sp.OnStart(ctx, rw)
}