[chore] Simplify the OTEL setup (#4110)

# Description

This simplifies our OTEL setup by:

* Getting rid of some deprecated things.
* Using `autoexport` and letting the exporters be configured via the standard `OTEL_` environment variables (see the sketch below).
* Removing all the unnecessary config options.
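
For illustration, a minimal sketch of the `autoexport` pattern this change adopts (the exact wiring in GoToSocial may differ):

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/contrib/exporters/autoexport"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.Background()

	// Exporter selection (OTLP over gRPC/HTTP, console, none, ...) is driven
	// by the OTEL_TRACES_EXPORTER and OTEL_EXPORTER_OTLP_* environment
	// variables rather than bespoke config options.
	exp, err := autoexport.NewSpanExporter(ctx)
	if err != nil {
		log.Fatal(err)
	}

	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	defer func() { _ = tp.Shutdown(ctx) }()
}
```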

## Checklist

Please put an x inside each checkbox to indicate that you've read and followed it: `[ ]` -> `[x]`

If this is a documentation change, only the first checkbox must be filled (you can delete the others if you want).

- [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md).
- [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat.
- [x] I/we have not leveraged AI to create the proposed changes.
- [x] I/we have performed a self-review of added code.
- [x] I/we have written code that is legible and maintainable by others.
- [ ] I/we have commented the added code, particularly in hard-to-understand areas.
- [x] I/we have made any necessary changes to documentation.
- [ ] I/we have added tests that cover new code.
- [x] I/we have run tests and they pass locally with the changes.
- [x] I/we have run `go fmt ./...` and `golangci-lint run`.

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4110
Reviewed-by: tobi <kipvandenbos@noreply.codeberg.org>
Co-authored-by: Daenney <daenney@noreply.codeberg.org>
Co-committed-by: Daenney <daenney@noreply.codeberg.org>
Commit ecbdc4227b by Daenney, committed by tobi on 2025-05-05 16:22:45 +00:00.
145 changed files with 21740 additions and 1319 deletions.

vendor/go.opentelemetry.io/otel/sdk/log/DESIGN.md (generated, vendored, new file)

@@ -0,0 +1,176 @@
# Logs SDK
## Abstract
`go.opentelemetry.io/otel/sdk/log` provides a Logs SDK compliant with the
[specification](https://opentelemetry.io/docs/specs/otel/logs/sdk/).
The prototype was created in
[#4955](https://github.com/open-telemetry/opentelemetry-go/pull/4955).
## Background
The goal is to design the exported API of the SDK to have low performance
overhead. Most importantly, the design should reduce the number of heap
allocations and even make a zero-allocation implementation possible.
Reducing the number of heap allocations lowers GC pressure, which can
produce some of the largest improvements in performance.[^1]
The main and recommended use case is to configure the SDK to use an OTLP
exporter with a batch processor.[^2] Therefore, the implementation aims to be
highly performant in this scenario. Some users that require high throughput may
also want to use e.g. a [user_events](https://docs.kernel.org/trace/user_events.html),
[LTTng](https://lttng.org/docs/v2.13/#doc-tracing-your-own-user-application)
or [ETW](https://learn.microsoft.com/en-us/windows/win32/etw/about-event-tracing)
exporter with a simple processor. Users may also want to use
[OTLP File](https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/)
or [Standard Output](https://opentelemetry.io/docs/specs/otel/logs/sdk_exporters/stdout/)
exporter in order to emit logs to standard output/error or files.
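
A short, hedged sketch of this recommended configuration (the exporter module
is listed under [Modules structure](#modules-structure) below):

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// setup wires the recommended production path: an OTLP exporter behind a
// batch processor.
func setup(ctx context.Context) (*sdklog.LoggerProvider, error) {
	exp, err := otlploggrpc.New(ctx)
	if err != nil {
		return nil, err
	}
	return sdklog.NewLoggerProvider(
		sdklog.WithProcessor(sdklog.NewBatchProcessor(exp)),
	), nil
}
```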
## Modules structure
The SDK is published as a single `go.opentelemetry.io/otel/sdk/log` Go module.
The exporters are going to be published as the following Go modules:
- `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`
- `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`
- `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`
## LoggerProvider
The [LoggerProvider](https://opentelemetry.io/docs/specs/otel/logs/sdk/#loggerprovider)
is implemented as `LoggerProvider` struct in [provider.go](provider.go).
## LogRecord limits
The [LogRecord limits](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecord-limits)
can be configured using the following options:
```go
func WithAttributeCountLimit(limit int) LoggerProviderOption
func WithAttributeValueLengthLimit(limit int) LoggerProviderOption
```
The limits can also be configured using the `OTEL_LOGRECORD_*` environment variables as
[defined by the specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#logrecord-limits).
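
For example, the limits can be set programmatically (a minimal sketch using a
`sdklog` alias for this package; the environment variables apply only when the
options are not passed):

```go
provider := sdklog.NewLoggerProvider(
	// Drop attributes beyond the 64th and truncate string values at 256.
	sdklog.WithAttributeCountLimit(64),
	sdklog.WithAttributeValueLengthLimit(256),
)
```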
### Processor
The [LogRecordProcessor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordprocessor)
is defined as `Processor` interface in [processor.go](processor.go).
The user sets processors for the `LoggerProvider` using
`func WithProcessor(processor Processor) LoggerProviderOption`.
The user can configure custom processors and decorate built-in processors.
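
A hedged sketch of such a decorator (`annotateProcessor` is illustrative and
not part of the SDK):

```go
package example

import (
	"context"

	logapi "go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// annotateProcessor stamps an attribute on every record and then delegates to
// the wrapped Processor. Shutdown and ForceFlush are inherited via embedding.
type annotateProcessor struct {
	sdklog.Processor
}

func (p annotateProcessor) OnEmit(ctx context.Context, r *sdklog.Record) error {
	r.AddAttributes(logapi.String("example.decorated", "true"))
	return p.Processor.OnEmit(ctx, r)
}
```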
The specification may add new operations to the
[LogRecordProcessor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordprocessor).
If it happens, [CONTRIBUTING.md](../../CONTRIBUTING.md#how-to-change-other-interfaces)
describes how the SDK can be extended in a backwards-compatible way.
### SimpleProcessor
The [Simple processor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#simple-processor)
is implemented as `SimpleProcessor` struct in [simple.go](simple.go).
### BatchProcessor
The [Batching processor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#batching-processor)
is implemented as `BatchProcessor` struct in [batch.go](batch.go).
The `Batcher` can also be configured using the `OTEL_BLRP_*` environment variables as
[defined by the specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#batch-logrecord-processor).
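
Equivalently, batching can be tuned programmatically (a sketch assuming an
`exp` exporter as in the earlier example and a `time` import; the `OTEL_BLRP_*`
variables are used only when the corresponding option is not passed):

```go
proc := sdklog.NewBatchProcessor(exp,
	sdklog.WithMaxQueueSize(4096),
	sdklog.WithExportInterval(5*time.Second),
	sdklog.WithExportMaxBatchSize(512),
)
provider := sdklog.NewLoggerProvider(sdklog.WithProcessor(proc))
```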
### Exporter
The [LogRecordExporter](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordexporter)
is defined as `Exporter` interface in [exporter.go](exporter.go).
The slice passed to `Export` must not be retained by the implementation
(as with, e.g., [`io.Writer`](https://pkg.go.dev/io#Writer))
so that the caller can reuse the passed slice
(e.g. using [`sync.Pool`](https://pkg.go.dev/sync#Pool))
to avoid heap allocations on each call.
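
A sketch of an `Exporter` honoring this rule (the `bufferingExporter` type is
hypothetical):

```go
package example

import (
	"context"

	sdklog "go.opentelemetry.io/otel/sdk/log"
)

type bufferingExporter struct {
	sdklog.Exporter
	pending []sdklog.Record
}

func (e *bufferingExporter) Export(ctx context.Context, records []sdklog.Record) error {
	// The caller may reuse the passed slice after Export returns, so retain
	// only deep copies made with Record.Clone.
	for _, r := range records {
		e.pending = append(e.pending, r.Clone())
	}
	return nil
}
```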
The specification may add new operations to the
[LogRecordExporter](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordexporter).
If it happens, [CONTRIBUTING.md](../../CONTRIBUTING.md#how-to-change-other-interfaces)
describes how the SDK can be extended in a backwards-compatible way.
### Record
The [ReadWriteLogRecord](https://opentelemetry.io/docs/specs/otel/logs/sdk/#readwritelogrecord)
is defined as `Record` struct in [record.go](record.go).
The `Record` is designed similarly to [`log.Record`](https://pkg.go.dev/go.opentelemetry.io/otel/log#Record)
in order to reduce the number of heap allocations when processing attributes.
The SDK does not have an additional definition of
[ReadableLogRecord](https://opentelemetry.io/docs/specs/otel/logs/sdk/#readablelogrecord)
as the specification does not say that the exporters must not be able to modify
the log records. It simply requires them to be able to read the log records.
Having fewer abstractions reduces the API surface and makes the design simpler.
## Benchmarking
The benchmarks are supposed to test end-to-end scenarios
and avoid I/O that could affect the stability of the results.
The benchmark results can be found in [the prototype](https://github.com/open-telemetry/opentelemetry-go/pull/4955).
## Rejected alternatives
### Represent both LogRecordProcessor and LogRecordExporter as Exporter
Because the [LogRecordProcessor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordprocessor)
and the [LogRecordExporter](https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordexporter)
abstractions are so similar, there was a proposal to unify them under a
single `Exporter` interface.[^3]
However, introducing a `Processor` interface makes it easier
to create custom processor decorators[^4]
and makes the design more aligned with the specification.
### Embed log.Record
Because [`Record`](#record) and [`log.Record`](https://pkg.go.dev/go.opentelemetry.io/otel/log#Record)
are very similar, there was a proposal to embed `log.Record` in the `Record` definition.
[`log.Record`](https://pkg.go.dev/go.opentelemetry.io/otel/log#Record)
supports only adding attributes.
In the SDK, we also need to be able to modify the attributes (e.g. remove them)
provided via the API.
Moreover, it is safer to have these abstractions decoupled.
For example, there may be a need for fields that can be set via the API but cannot be modified by the processors.
### Processor.OnEmit to accept Record values
There was a proposal to make the [Processor](#processor)'s `OnEmit`
accept a [Record](#record) value instead of a pointer, both to reduce allocations
and to have a design similar to [`slog.Handler`](https://pkg.go.dev/log/slog#Handler).
There have been long discussions within the OpenTelemetry Specification SIG[^5]
about whether such a design would comply with the specification. The summary
was that the current processor design flaws are present in other languages as
well. Therefore, it would be favorable to introduce new processing concepts
(e.g. chaining processors) in the specification that would coexist with the
current "mutable" processor design.
The performance disadvantages caused by using a pointer (which at the time of
writing causes an additional heap allocation) may be mitigated by future
versions of the Go compiler, thanks to improved escape analysis and
profile-guided optimization (PGO)[^6].
On the other hand, it is fine for [Processor](#processor)'s `Enabled` to accept
a [Record](#record) value, as processors should not mutate the passed
parameters.
[^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide)
[^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
[^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Exporter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
[^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067)
[^6]: [Profile-guided optimization](https://go.dev/doc/pgo)

vendor/go.opentelemetry.io/otel/sdk/log/LICENSE (generated, vendored, new file)

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

vendor/go.opentelemetry.io/otel/sdk/log/README.md (generated, vendored, new file)

@@ -0,0 +1,3 @@
# Log SDK
[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/log)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log)

vendor/go.opentelemetry.io/otel/sdk/log/batch.go (generated, vendored, new file)

@@ -0,0 +1,477 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"slices"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/internal/global"
)
const (
dfltMaxQSize = 2048
dfltExpInterval = time.Second
dfltExpTimeout = 30 * time.Second
dfltExpMaxBatchSize = 512
dfltExpBufferSize = 1
envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE"
envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY"
envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT"
envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
)
// Compile-time check BatchProcessor implements Processor.
var _ Processor = (*BatchProcessor)(nil)
// BatchProcessor is a processor that exports batches of log records.
//
// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor
// is shut down by default; no records will be batched or exported.
type BatchProcessor struct {
// The BatchProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
// entry point of log records is the OnEmit method. This method is designed
// to receive records as fast as possible while still honoring shutdown
// commands. All records received are enqueued to queue.
//
// In order to block OnEmit as little as possible, a separate "poll"
// goroutine is spawned at the creation of a BatchProcessor. This
// goroutine is responsible for batching the queue at regular polled
// intervals, or when it is directly signaled to.
//
// To keep the polling goroutine from backing up, all batches it makes are
// exported with a bufferedExporter. This exporter allows the poll
// goroutine to enqueue an export payload that will be handled in a
// separate goroutine dedicated to the export. This asynchronous behavior
// allows the poll goroutine to maintain accurate interval polling.
//
// ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__
// || || || || || ||
// || ********** || || || || ********** ||
// || Records=>* OnEmit * || || | - ticker || || * export * ||
// || ********** || || | - trigger || || ********** ||
// || || || || | || || || ||
// || || || || | || || || ||
// || __________\/___ || || |*********** || || ______/\_______ ||
// || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] ||
// || || || |*********** || || ||
// ||_____________________|| ||__________________|| ||____________________||
//
//
// The "release valve" in this processing is the record queue. This queue
// is a ring buffer. It will overwrite the oldest records first when writes
// to OnEmit are made faster than the queue can be flushed. If batches
// cannot be flushed to the export buffer, the records will remain in the
// queue.
// exporter is the bufferedExporter all batches are exported with.
exporter *bufferExporter
// q is the active queue of records that have not yet been exported.
q *queue
// batchSize is the minimum number of records needed before an export is
// triggered (unless the interval expires).
batchSize int
// pollTrigger triggers the poll goroutine to flush a batch from the queue.
// This is sent to when it is known that the queue contains at least one
// complete batch.
//
// When a send is made to the channel, the poll loop will be reset after
// the flush. If there are still enough records in the queue for another
// batch, the reset of the poll loop will automatically re-trigger itself.
// There is no need for the original sender to monitor and resend.
pollTrigger chan struct{}
// pollKill kills the poll goroutine. This is only expected to be closed
// once by the Shutdown method.
pollKill chan struct{}
// pollDone signals the poll goroutine has completed.
pollDone chan struct{}
// stopped holds the stopped state of the BatchProcessor.
stopped atomic.Bool
noCmp [0]func() //nolint: unused // This is indeed used.
}
// NewBatchProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called synchronously.
func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor {
cfg := newBatchConfig(opts)
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
}
// Order is important here. Wrap the timeoutExporter with the chunkExporter
// to ensure each export completes in timeout (instead of all chunked
// exports).
exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)
b := &BatchProcessor{
exporter: newBufferExporter(exporter, cfg.expBufferSize.Value),
q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
b.pollDone = b.poll(cfg.expInterval.Value)
return b
}
// poll spawns a goroutine to handle interval polling and batch exporting. The
// returned done chan is closed when the spawned goroutine completes.
func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
done = make(chan struct{})
ticker := time.NewTicker(interval)
// TODO: investigate using a sync.Pool instead of cloning.
buf := make([]Record, b.batchSize)
go func() {
defer close(done)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-b.pollTrigger:
ticker.Reset(interval)
case <-b.pollKill:
return
}
if d := b.q.Dropped(); d > 0 {
global.Warn("dropped log records", "dropped", d)
}
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
}
return ok
})
if qLen >= b.batchSize {
// There is another full batch ready. Immediately trigger
// another export attempt.
select {
case b.pollTrigger <- struct{}{}:
default:
// Another flush signal already received.
}
}
}
}()
return done
}
// OnEmit batches the provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
// The record is cloned so that changes done by subsequent processors
// are not going to lead to a data race.
if n := b.q.Enqueue(r.Clone()); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
// Flush chan full. The poll goroutine will handle this by
// re-sending any trigger until the queue has less than batchSize
// records.
}
}
return nil
}
// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
return nil
}
// Stop the poll goroutine.
close(b.pollKill)
select {
case <-b.pollDone:
case <-ctx.Done():
// Out of time.
return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
}
// Flush remaining queued before exporter shutdown.
err := b.exporter.Export(ctx, b.q.Flush())
return errors.Join(err, b.exporter.Shutdown(ctx))
}
var errPartialFlush = errors.New("partial flush: export buffer full")
// Used for testing.
var ctxErr = func(ctx context.Context) error {
return ctx.Err()
}
// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
if b.stopped.Load() || b.q == nil {
return nil
}
buf := make([]Record, b.q.cap)
notFlushed := func() bool {
var flushed bool
_ = b.q.TryDequeue(buf, func(r []Record) bool {
flushed = b.exporter.EnqueueExport(r)
return flushed
})
return !flushed
}
var err error
// For as long as ctx allows, try to make a single flush of the queue.
for notFlushed() {
// Use ctxErr instead of calling ctx.Err directly so we can test
// the partial error return.
if e := ctxErr(ctx); e != nil {
err = errors.Join(e, errPartialFlush)
break
}
}
return errors.Join(err, b.exporter.ForceFlush(ctx))
}
// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
sync.Mutex
dropped atomic.Uint64
cap, len int
read, write *ring
}
func newQueue(size int) *queue {
r := newRing(size)
return &queue{
cap: size,
read: r,
write: r,
}
}
// Dropped returns the number of Records dropped during enqueueing since the
// last time Dropped was called.
func (q *queue) Dropped() uint64 {
return q.dropped.Swap(0)
}
// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()
q.write.Value = r
q.write = q.write.Next()
q.len++
if q.len > q.cap {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
q.dropped.Add(1)
}
return q.len
}
// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue is returned.
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
q.Lock()
defer q.Unlock()
origRead := q.read
n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value
q.read = q.read.Next()
}
if write(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}
// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()
out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value
q.read = q.read.Next()
}
q.len = 0
return out
}
type batchConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
expTimeout setting[time.Duration]
expMaxBatchSize setting[int]
expBufferSize setting[int]
}
func newBatchConfig(options []BatchProcessorOption) batchConfig {
var c batchConfig
for _, o := range options {
c = o.apply(c)
}
c.maxQSize = c.maxQSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarMaxQSize),
clearLessThanOne[int](),
fallback[int](dfltMaxQSize),
)
c.expInterval = c.expInterval.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpInterval),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpInterval),
)
c.expTimeout = c.expTimeout.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpTimeout),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpTimeout),
)
c.expMaxBatchSize = c.expMaxBatchSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarExpMaxBatchSize),
clearLessThanOne[int](),
clampMax[int](c.maxQSize.Value),
fallback[int](dfltExpMaxBatchSize),
)
c.expBufferSize = c.expBufferSize.Resolve(
clearLessThanOne[int](),
fallback[int](dfltExpBufferSize),
)
return c
}
// BatchProcessorOption applies a configuration to a [BatchProcessor].
type BatchProcessorOption interface {
apply(batchConfig) batchConfig
}
type batchOptionFunc func(batchConfig) batchConfig
func (fn batchOptionFunc) apply(c batchConfig) batchConfig {
return fn(c)
}
// WithMaxQueueSize sets the maximum queue size used by the Batcher.
// After the size is reached log records are dropped.
//
// If the OTEL_BLRP_MAX_QUEUE_SIZE environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 2048 will be used.
// The default value is also used when the provided value is less than one.
func WithMaxQueueSize(size int) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.maxQSize = newSetting(size)
return cfg
})
}
// WithExportInterval sets the maximum duration between batched exports.
//
// If the OTEL_BLRP_SCHEDULE_DELAY environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 1s will be used.
// The default value is also used when the provided value is less than one.
func WithExportInterval(d time.Duration) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.expInterval = newSetting(d)
return cfg
})
}
// WithExportTimeout sets the duration after which a batched export is canceled.
//
// If the OTEL_BLRP_EXPORT_TIMEOUT environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 30s will be used.
// The default value is also used when the provided value is less than one.
func WithExportTimeout(d time.Duration) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.expTimeout = newSetting(d)
return cfg
})
}
// WithExportMaxBatchSize sets the maximum batch size of every export.
// A batch will be split into multiple exports to not exceed this size.
//
// If the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 512 will be used.
// The default value is also used when the provided value is less than one.
func WithExportMaxBatchSize(size int) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.expMaxBatchSize = newSetting(size)
return cfg
})
}
// WithExportBufferSize sets the batch buffer size.
// Batches will be temporarily kept in a memory buffer until they are exported.
//
// By default, a value of 1 will be used.
// The default value is also used when the provided value is less than one.
func WithExportBufferSize(size int) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.expBufferSize = newSetting(size)
return cfg
})
}

vendor/go.opentelemetry.io/otel/sdk/log/doc.go (generated, vendored, new file)

@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
/*
Package log provides the OpenTelemetry Logs SDK.
See https://opentelemetry.io/docs/concepts/signals/logs/ for information
about the concept of OpenTelemetry Logs and
https://opentelemetry.io/docs/concepts/components/ for more information
about OpenTelemetry SDKs.
The entry point for the log package is [NewLoggerProvider].
[LoggerProvider] is the object that all Bridge API calls use to create
Loggers, and ultimately emit log records.
Also, it is an object that should be used to
control the life-cycle (start, flush, and shutdown) of the Logs SDK.
A LoggerProvider needs to be configured to process the log records; this is
done by configuring it with a [Processor] implementation using [WithProcessor].
The log package provides the [BatchProcessor] and [SimpleProcessor]
that are configured with an [Exporter] implementation which
exports the log records to a given destination. See
[go.opentelemetry.io/otel/exporters] for exporters that can be used with these
Processors.
The data generated by a LoggerProvider needs to include information about its
origin. A LoggerProvider needs to be configured with a Resource, by using
[WithResource], to include this information. This Resource
should be used to describe the unique runtime environment instrumented code
is being run on. That way when multiple instances of the code are collected
at a single endpoint their origin is decipherable.
See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"

vendor/go.opentelemetry.io/otel/sdk/log/exporter.go (generated, vendored, new file)

@@ -0,0 +1,321 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
)
// Exporter handles the delivery of log records to external receivers.
type Exporter interface {
// Export transmits log records to a receiver.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// All retry logic must be contained in this function. The SDK does not
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// Implementations must not retain the records slice.
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
//
// Export should never be called concurrently with other Export calls.
// However, it may be called concurrently with other methods.
Export(ctx context.Context, records []Record) error
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
//
// Shutdown may be called concurrently with itself or with other methods.
Shutdown(ctx context.Context) error
// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// ForceFlush may be called concurrently with itself or with other methods.
ForceFlush(ctx context.Context) error
}
var defaultNoopExporter = &noopExporter{}
type noopExporter struct{}
func (noopExporter) Export(context.Context, []Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error { return nil }
func (noopExporter) ForceFlush(context.Context) error { return nil }
// chunkExporter wraps an Exporter's Export method so it is called with
// appropriately sized export payloads. Any payload larger than a defined size
// is chunked into smaller payloads and exported sequentially.
type chunkExporter struct {
Exporter
// size is the maximum batch size exported.
size int
}
// newChunkExporter wraps exporter. Calls to the Export will have their records
// payload chunked so they do not exceed size. If size is less than or equal
// to 0, exporter is returned directly.
func newChunkExporter(exporter Exporter, size int) Exporter {
if size <= 0 {
return exporter
}
return &chunkExporter{Exporter: exporter, size: size}
}
// Export exports records in chunks no larger than c.size.
func (c chunkExporter) Export(ctx context.Context, records []Record) error {
n := len(records)
for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) {
if err := c.Exporter.Export(ctx, records[i:j]); err != nil {
return err
}
}
return nil
}
// timeoutExporter wraps an Exporter and ensures any call to Export will have a
// timeout for the context.
type timeoutExporter struct {
Exporter
// timeout is the maximum time an export is attempted.
timeout time.Duration
}
// newTimeoutExporter wraps exporter with an Exporter that limits the context
// lifetime passed to Export to be timeout. If timeout is less than or equal to
// zero, exporter will be returned directly.
func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter {
if timeout <= 0 {
return exp
}
return &timeoutExporter{Exporter: exp, timeout: timeout}
}
// Export sets the timeout of ctx before calling the Exporter e wraps.
func (e *timeoutExporter) Export(ctx context.Context, records []Record) error {
ctx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
return e.Exporter.Export(ctx, records)
}
// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)
for data := range input {
data.DoExport(exporter.Export)
}
}()
return done
}
// exportData is data related to an export.
type exportData struct {
ctx context.Context
records []Record
// respCh is the channel any error returned from the export will be sent
// on. If this is nil and the export error is non-nil, the error will be
// passed to the OTel error handler.
respCh chan<- error
}
// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handler if it is not nil and respCh is nil or full.
func (e exportData) DoExport(exportFn func(context.Context, []Record) error) {
if len(e.records) == 0 {
e.respond(nil)
return
}
e.respond(exportFn(e.ctx, e.records))
}
func (e exportData) respond(err error) {
select {
case e.respCh <- err:
default:
// e.respCh is nil or busy, default to otel.Handler.
if err != nil {
otel.Handle(err)
}
}
}
// bufferExporter provides asynchronous and synchronous export functionality by
// buffering export requests.
type bufferExporter struct {
Exporter
input chan exportData
inputMu sync.Mutex
done chan struct{}
stopped atomic.Bool
}
// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
// If size is less than zero, zero will be used (i.e. only synchronous
// exporting will be supported).
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
if size < 0 {
size = 0
}
input := make(chan exportData, size)
return &bufferExporter{
Exporter: exporter,
input: input,
done: exportSync(input, exporter),
}
}
var errStopped = errors.New("exporter stopped")
func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
data := exportData{ctx, records, rCh}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return errStopped
}
select {
case e.input <- data:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// EnqueueExport enqueues an export of records in the context of ctx to be
// performed asynchronously. This will return true if the records are
// successfully enqueued (or the bufferExporter is shut down), false otherwise.
//
// The passed records are held after this call returns.
func (e *bufferExporter) EnqueueExport(records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true
}
data := exportData{ctx: context.Background(), records: records}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return true
}
select {
case e.input <- data:
return true
default:
return false
}
}
// Export synchronously exports records in the context of ctx. This will not
// return until the export has been completed.
func (e *bufferExporter) Export(ctx context.Context, records []Record) error {
if len(records) == 0 {
return nil
}
resp := make(chan error, 1)
err := e.enqueue(ctx, records, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return fmt.Errorf("%w: dropping %d records", err, len(records))
}
select {
case err := <-resp:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// ForceFlush flushes buffered exports. Any exports that are buffered
// are flushed before this returns.
func (e *bufferExporter) ForceFlush(ctx context.Context) error {
resp := make(chan error, 1)
err := e.enqueue(ctx, nil, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return err
}
select {
case <-resp:
case <-ctx.Done():
return ctx.Err()
}
return e.Exporter.ForceFlush(ctx)
}
// Shutdown shuts down e.
//
// Any buffered exports are flushed before this returns.
//
// All calls to EnqueueExport or Export will return nil without any export
// after this is called.
func (e *bufferExporter) Shutdown(ctx context.Context) error {
if e.stopped.Swap(true) {
return nil
}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// No more sends will be made.
close(e.input)
select {
case <-e.done:
case <-ctx.Done():
return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx))
}
return e.Exporter.Shutdown(ctx)
}

vendor/go.opentelemetry.io/otel/sdk/log/filter_processor.go (generated, vendored, new file)

@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)
// FilterProcessor is a [Processor] that knows, and can identify, what [Record]
// it will process or drop when it is passed to [Processor.OnEmit].
//
// This is useful for users that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// The SDK's Logger.Enabled returns false
// if all the registered Processors implement FilterProcessor
// and they all return false.
//
// Processor implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [Record] passed to [Processor.OnEmit];
// it is not expected that the caller of OnEmit will use the functionality
// from this interface prior to calling OnEmit.
//
// See [go.opentelemetry.io/contrib/processors/minsev] for an example use case.
// It provides a Processor used to filter out each [Record]
// that has a [log.Severity] below a threshold.
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and param.
//
// The passed param is likely to contain only partial record information
// (e.g. a param with only the Severity set).
// If a Processor needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and param, and will be false if the Logger will not
// emit. The returned value may be true or false in an indeterminate state.
// An implementation should default to returning true for an indeterminate
// state, but may return false if valid reasons in particular circumstances
// exist (e.g. performance, correctness).
//
// The param should not be held by the implementation. A copy should be
// made if the param needs to be held after the call returns.
//
// Implementations of this method need to be safe for a user to call
// concurrently.
Enabled(ctx context.Context, param EnabledParameters) bool
}
// EnabledParameters represents the payload for [FilterProcessor]'s Enabled method.
type EnabledParameters struct {
Resource resource.Resource
InstrumentationScope instrumentation.Scope
Severity log.Severity
}

vendor/go.opentelemetry.io/otel/sdk/log/logger.go (generated, vendored, new file)

@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/trace"
)
var now = time.Now
// Compile-time check logger implements log.Logger.
var _ log.Logger = (*logger)(nil)
type logger struct {
embedded.Logger
provider *LoggerProvider
instrumentationScope instrumentation.Scope
}
func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
return &logger{
provider: p,
instrumentationScope: scope,
}
}
func (l *logger) Emit(ctx context.Context, r log.Record) {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if err := p.OnEmit(ctx, &newRecord); err != nil {
otel.Handle(err)
}
}
}
// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process the given param for the provided context.
//
// If it is not possible to definitively determine whether the param will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
p := EnabledParameters{
Resource: *l.provider.resource,
InstrumentationScope: l.instrumentationScope,
Severity: param.Severity,
}
// If there are more Processors than FilterProcessors,
// which means not all Processors are FilterProcessors,
// we cannot be sure that all Processors will drop the record.
// Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, p, l.provider.fltrProcessors)
}
func anyEnabled(ctx context.Context, param EnabledParameters, fltrs []FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
return false
}
func (l *logger) newRecord(ctx context.Context, r log.Record) Record {
sc := trace.SpanContextFromContext(ctx)
newRecord := Record{
eventName: r.EventName(),
timestamp: r.Timestamp(),
observedTimestamp: r.ObservedTimestamp(),
severity: r.Severity(),
severityText: r.SeverityText(),
body: r.Body(),
traceID: sc.TraceID(),
spanID: sc.SpanID(),
traceFlags: sc.TraceFlags(),
resource: l.provider.resource,
scope: &l.instrumentationScope,
attributeValueLengthLimit: l.provider.attributeValueLengthLimit,
attributeCountLimit: l.provider.attributeCountLimit,
}
// This field SHOULD be set once the event is observed by OpenTelemetry.
if newRecord.observedTimestamp.IsZero() {
newRecord.observedTimestamp = now()
}
r.WalkAttributes(func(kv log.KeyValue) bool {
newRecord.AddAttributes(kv)
return true
})
return newRecord
}

vendor/go.opentelemetry.io/otel/sdk/log/processor.go (generated, vendored, new file)

@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
)
// Processor handles the processing of log records.
//
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [FilterProcessor] for information about how a Processor can support filtering.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
// OnEmit will be called independent of Enabled. Implementations need to
// validate the arguments themselves before processing.
//
// Implementations should not interrupt record processing
// if the context is canceled.
//
// All retry logic must be contained in this function. The SDK does not
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using WithProcessor.
// Implementations may synchronously modify the record so that the changes
// are visible in the next registered processor.
// Notice that Record is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the processor should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to OnEmit, Shutdown, or ForceFlush
// should perform no operation and return nil error.
Shutdown(ctx context.Context) error
// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
ForceFlush(ctx context.Context) error
}

vendor/go.opentelemetry.io/otel/sdk/log/provider.go (generated, vendored, new file)

@@ -0,0 +1,256 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)
const (
defaultAttrCntLim = 128
defaultAttrValLenLim = -1
envarAttrCntLim = "OTEL_LOGRECORD_ATTRIBUTE_COUNT_LIMIT"
envarAttrValLenLim = "OTEL_LOGRECORD_ATTRIBUTE_VALUE_LENGTH_LIMIT"
)
type providerConfig struct {
resource *resource.Resource
processors []Processor
fltrProcessors []FilterProcessor
attrCntLim setting[int]
attrValLenLim setting[int]
}
func newProviderConfig(opts []LoggerProviderOption) providerConfig {
var c providerConfig
for _, opt := range opts {
c = opt.apply(c)
}
if c.resource == nil {
c.resource = resource.Default()
}
c.attrCntLim = c.attrCntLim.Resolve(
getenv[int](envarAttrCntLim),
fallback[int](defaultAttrCntLim),
)
c.attrValLenLim = c.attrValLenLim.Resolve(
getenv[int](envarAttrValLenLim),
fallback[int](defaultAttrValLenLim),
)
return c
}
// LoggerProvider handles the creation and coordination of Loggers. All Loggers
// created by a LoggerProvider will be associated with the same Resource.
type LoggerProvider struct {
embedded.LoggerProvider
resource *resource.Resource
processors []Processor
fltrProcessors []FilterProcessor
attributeCountLimit int
attributeValueLengthLimit int
loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger
stopped atomic.Bool
noCmp [0]func() //nolint: unused // This is indeed used.
}
// Compile-time check LoggerProvider implements log.LoggerProvider.
var _ log.LoggerProvider = (*LoggerProvider)(nil)
// NewLoggerProvider returns a new and configured LoggerProvider.
//
// By default, the returned LoggerProvider is configured with the default
// Resource and no Processors. Processors cannot be added after a LoggerProvider is
// created. This means the returned LoggerProvider, if created with no
// Processors, will perform no operations.
func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
cfg := newProviderConfig(opts)
return &LoggerProvider{
resource: cfg.resource,
processors: cfg.processors,
fltrProcessors: cfg.fltrProcessors,
attributeCountLimit: cfg.attrCntLim.Value,
attributeValueLengthLimit: cfg.attrValLenLim.Value,
}
}
// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.
//
// This method can be called concurrently.
func (p *LoggerProvider) Logger(name string, opts ...log.LoggerOption) log.Logger {
if name == "" {
global.Warn("Invalid Logger name.", "name", name)
}
if p.stopped.Load() {
return noop.NewLoggerProvider().Logger(name, opts...)
}
cfg := log.NewLoggerConfig(opts...)
scope := instrumentation.Scope{
Name: name,
Version: cfg.InstrumentationVersion(),
SchemaURL: cfg.SchemaURL(),
Attributes: cfg.InstrumentationAttributes(),
}
p.loggersMu.Lock()
defer p.loggersMu.Unlock()
if p.loggers == nil {
l := newLogger(p, scope)
p.loggers = map[instrumentation.Scope]*logger{scope: l}
return l
}
l, ok := p.loggers[scope]
if !ok {
l = newLogger(p, scope)
p.loggers[scope] = l
}
return l
}
// Shutdown shuts down the provider and all processors.
//
// This method can be called concurrently.
func (p *LoggerProvider) Shutdown(ctx context.Context) error {
stopped := p.stopped.Swap(true)
if stopped {
return nil
}
var err error
for _, p := range p.processors {
err = errors.Join(err, p.Shutdown(ctx))
}
return err
}
// ForceFlush flushes all processors.
//
// This method can be called concurrently.
func (p *LoggerProvider) ForceFlush(ctx context.Context) error {
if p.stopped.Load() {
return nil
}
var err error
for _, p := range p.processors {
err = errors.Join(err, p.ForceFlush(ctx))
}
return err
}
// LoggerProviderOption applies a configuration option value to a LoggerProvider.
type LoggerProviderOption interface {
apply(providerConfig) providerConfig
}
type loggerProviderOptionFunc func(providerConfig) providerConfig
func (fn loggerProviderOptionFunc) apply(c providerConfig) providerConfig {
return fn(c)
}
// WithResource associates a Resource with a LoggerProvider. This Resource
// represents the entity producing telemetry and is associated with all Loggers
// the LoggerProvider will create.
//
// By default, if this Option is not used, the default Resource from the
// go.opentelemetry.io/otel/sdk/resource package will be used.
func WithResource(res *resource.Resource) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
var err error
cfg.resource, err = resource.Merge(resource.Environment(), res)
if err != nil {
otel.Handle(err)
}
return cfg
})
}
// WithProcessor associates a Processor with a LoggerProvider.
//
// By default, if this option is not used, the LoggerProvider will perform no
// operations; no data will be exported without a processor.
//
// The SDK invokes the processors sequentially in the same order as they were
// registered.
//
// For production, use [NewBatchProcessor] to batch log records before they are exported.
// For testing and debugging, use [NewSimpleProcessor] to synchronously export log records.
//
// See [FilterProcessor] for information about how a Processor can support filtering.
func WithProcessor(processor Processor) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
cfg.processors = append(cfg.processors, processor)
if f, ok := processor.(FilterProcessor); ok {
cfg.fltrProcessors = append(cfg.fltrProcessors, f)
}
return cfg
})
}
// WithAttributeCountLimit sets the maximum allowed log record attribute count.
// Any attribute added to a log record once this limit is reached will be dropped.
//
// Setting this to zero means no attributes will be recorded.
//
// Setting this to a negative value means no limit is applied.
//
// If the OTEL_LOGRECORD_ATTRIBUTE_COUNT_LIMIT environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 128 will be used.
func WithAttributeCountLimit(limit int) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
cfg.attrCntLim = newSetting(limit)
return cfg
})
}
// WithAttributeValueLengthLimit sets the maximum allowed attribute value length.
//
// This limit only applies to string and string slice attribute values.
// Any string longer than this value will be truncated to this length.
//
// Setting this to a negative value means no limit is applied.
//
// If the OTEL_LOGRECORD_ATTRIBUTE_VALUE_LENGTH_LIMIT environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, no limit (-1) will be used.
func WithAttributeValueLengthLimit(limit int) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
cfg.attrValLenLim = newSetting(limit)
return cfg
})
}
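
A minimal sketch of wiring these options together, assuming the stdoutlog exporter module; any Exporter implementation works in its place, and the scope name is hypothetical:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

func main() {
	ctx := context.Background()

	// Writes records to stdout; any Exporter implementation works here.
	exp, err := stdoutlog.New()
	if err != nil {
		panic(err)
	}

	// Batch records before export and cap each record at 64 attributes.
	// Without WithProcessor the provider performs no operations.
	provider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(sdklog.NewBatchProcessor(exp)),
		sdklog.WithAttributeCountLimit(64),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	logger := provider.Logger("example/scope") // hypothetical scope name
	_ = logger // emit records via the go.opentelemetry.io/otel/log API
}
```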

518
vendor/go.opentelemetry.io/otel/sdk/log/record.go generated vendored Normal file

@ -0,0 +1,518 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"slices"
"strings"
"sync"
"time"
"unicode/utf8"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)
// attributesInlineCount is the number of attributes that are efficiently
// stored in an array within a Record. This value is borrowed from slog which
// performed a quantitative survey of log library use and found this value to
// cover 95% of all use-cases (https://go.dev/blog/slog#performance).
const attributesInlineCount = 5
var logAttrDropped = sync.OnceFunc(func() {
global.Warn("limit reached: dropping log Record attributes")
})
// indexPool is a pool of index maps used for de-duplication.
var indexPool = sync.Pool{
New: func() any { return make(map[string]int) },
}
func getIndex() map[string]int {
return indexPool.Get().(map[string]int)
}
func putIndex(index map[string]int) {
clear(index)
indexPool.Put(index)
}
// Record is a log record emitted by the Logger.
// A log record with non-empty event name is interpreted as an event record.
//
// Do not create instances of Record on your own in production code.
// You can use [go.opentelemetry.io/otel/sdk/log/logtest.RecordFactory]
// for testing purposes.
type Record struct {
// Do not embed the log.Record. Attributes need to be overwrite-able and
// deep-copying needs to be possible.
eventName string
timestamp time.Time
observedTimestamp time.Time
severity log.Severity
severityText string
body log.Value
// The fields below are for optimizing the implementation of Attributes and
// AddAttributes. This design is borrowed from the slog Record type:
// https://cs.opensource.google/go/go/+/refs/tags/go1.22.0:src/log/slog/record.go;l=20
// Allocation optimization: an inline array sized to hold
// the majority of log calls (based on examination of open-source
// code). It holds the start of the list of attributes.
front [attributesInlineCount]log.KeyValue
// The number of attributes in front.
nFront int
// The list of attributes except for those in front.
// Invariants:
// - len(back) > 0 if nFront == len(front)
// - Unused array elements are zero-ed. Used to detect mistakes.
back []log.KeyValue
// dropped is the count of attributes that have been dropped when limits
// were reached.
dropped int
traceID trace.TraceID
spanID trace.SpanID
traceFlags trace.TraceFlags
// resource represents the entity that collected the log.
resource *resource.Resource
// scope is the Scope that the Logger was created with.
scope *instrumentation.Scope
attributeValueLengthLimit int
attributeCountLimit int
noCmp [0]func() //nolint: unused // This is indeed used.
}
func (r *Record) addDropped(n int) {
logAttrDropped()
r.dropped += n
}
func (r *Record) setDropped(n int) {
logAttrDropped()
r.dropped = n
}
// EventName returns the event name.
// A log record with non-empty event name is interpreted as an event record.
func (r *Record) EventName() string {
return r.eventName
}
// SetEventName sets the event name.
// A log record with non-empty event name is interpreted as an event record.
func (r *Record) SetEventName(s string) {
r.eventName = s
}
// Timestamp returns the time when the log record occurred.
func (r *Record) Timestamp() time.Time {
return r.timestamp
}
// SetTimestamp sets the time when the log record occurred.
func (r *Record) SetTimestamp(t time.Time) {
r.timestamp = t
}
// ObservedTimestamp returns the time when the log record was observed.
func (r *Record) ObservedTimestamp() time.Time {
return r.observedTimestamp
}
// SetObservedTimestamp sets the time when the log record was observed.
func (r *Record) SetObservedTimestamp(t time.Time) {
r.observedTimestamp = t
}
// Severity returns the severity of the log record.
func (r *Record) Severity() log.Severity {
return r.severity
}
// SetSeverity sets the severity level of the log record.
func (r *Record) SetSeverity(level log.Severity) {
r.severity = level
}
// SeverityText returns severity (also known as log level) text. This is the
// original string representation of the severity as it is known at the source.
func (r *Record) SeverityText() string {
return r.severityText
}
// SetSeverityText sets severity (also known as log level) text. This is the
// original string representation of the severity as it is known at the source.
func (r *Record) SetSeverityText(text string) {
r.severityText = text
}
// Body returns the body of the log record.
func (r *Record) Body() log.Value {
return r.body
}
// SetBody sets the body of the log record.
func (r *Record) SetBody(v log.Value) {
r.body = v
}
// WalkAttributes walks all attributes the log record holds by calling f on
// each [log.KeyValue] in the [Record]. Iteration stops if f returns false.
func (r *Record) WalkAttributes(f func(log.KeyValue) bool) {
for i := 0; i < r.nFront; i++ {
if !f(r.front[i]) {
return
}
}
for _, a := range r.back {
if !f(a) {
return
}
}
}
// AddAttributes adds attributes to the log record.
// Attributes in attrs will overwrite any attribute already added to r with the same key.
func (r *Record) AddAttributes(attrs ...log.KeyValue) {
n := r.AttributesLen()
if n == 0 {
// Avoid the more complex duplicate map lookups below.
var drop int
attrs, drop = dedup(attrs)
r.setDropped(drop)
attrs, drop = head(attrs, r.attributeCountLimit)
r.addDropped(drop)
r.addAttrs(attrs)
return
}
// Used to find duplicates between attrs and existing attributes in r.
rIndex := r.attrIndex()
defer putIndex(rIndex)
// Unique attrs that need to be added to r. This uses the same underlying
// array as attrs.
//
// Note, do not iterate attrs twice by just calling dedup(attrs) here.
unique := attrs[:0]
// Used to find duplicates within attrs itself. The index value is the
// index of the element in unique.
uIndex := getIndex()
defer putIndex(uIndex)
// Deduplicate attrs within the scope of all existing attributes.
for _, a := range attrs {
// Last-value-wins for any duplicates in attrs.
idx, found := uIndex[a.Key]
if found {
r.addDropped(1)
unique[idx] = a
continue
}
idx, found = rIndex[a.Key]
if found {
// New attrs overwrite any existing with the same key.
r.addDropped(1)
if idx < 0 {
r.front[-(idx + 1)] = a
} else {
r.back[idx] = a
}
} else {
// Unique attribute.
unique = append(unique, a)
uIndex[a.Key] = len(unique) - 1
}
}
attrs = unique
if r.attributeCountLimit > 0 && n+len(attrs) > r.attributeCountLimit {
// Truncate the now unique attributes to comply with limit.
//
// Do not use head(attrs, r.attributeCountLimit - n) here. If
// (r.attributeCountLimit - n) <= 0 attrs needs to be emptied.
last := max(0, r.attributeCountLimit-n)
r.addDropped(len(attrs) - last)
attrs = attrs[:last]
}
r.addAttrs(attrs)
}
// attrIndex returns an index map for all attributes in the Record r. The index
// maps the attribute key to the location where the attribute is stored. If the
// value is < 0 then -(value + 1) (e.g. -1 -> 0, -2 -> 1, -3 -> 2) represents
// the index in r.front. Otherwise, the value is the exact index of r.back.
//
// The returned index is taken from the indexPool. It is the caller's
// responsibility to return the index to that pool (putIndex) when done.
func (r *Record) attrIndex() map[string]int {
index := getIndex()
for i := 0; i < r.nFront; i++ {
key := r.front[i].Key
index[key] = -i - 1 // stored in front: negative index.
}
for i := 0; i < len(r.back); i++ {
key := r.back[i].Key
index[key] = i // stored in back: positive index.
}
return index
}
// addAttrs adds attrs to the Record r. This does not validate any limits or
// duplication of attributes, these tasks are left to the caller to handle
// prior to calling.
func (r *Record) addAttrs(attrs []log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
r.front[r.nFront] = r.applyAttrLimits(a)
r.nFront++
}
for j, a := range attrs[i:] {
attrs[i+j] = r.applyAttrLimits(a)
}
r.back = slices.Grow(r.back, len(attrs[i:]))
r.back = append(r.back, attrs[i:]...)
}
// SetAttributes sets (and overrides) the attributes of the log record.
func (r *Record) SetAttributes(attrs ...log.KeyValue) {
var drop int
attrs, drop = dedup(attrs)
r.setDropped(drop)
attrs, drop = head(attrs, r.attributeCountLimit)
r.addDropped(drop)
r.nFront = 0
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
r.front[r.nFront] = r.applyAttrLimits(a)
r.nFront++
}
r.back = slices.Clone(attrs[i:])
for i, a := range r.back {
r.back[i] = r.applyAttrLimits(a)
}
}
// head returns the first n values of kvs along with the number of elements
// dropped. If n is less than or equal to zero, kvs is returned unchanged
// with 0 dropped.
func head(kvs []log.KeyValue, n int) (out []log.KeyValue, dropped int) {
if n > 0 && len(kvs) > n {
return kvs[:n], len(kvs) - n
}
return kvs, 0
}
// dedup deduplicates kvs front-to-back with the last value saved.
func dedup(kvs []log.KeyValue) (unique []log.KeyValue, dropped int) {
index := getIndex()
defer putIndex(index)
unique = kvs[:0] // Use the same underlying array as kvs.
for _, a := range kvs {
idx, found := index[a.Key]
if found {
dropped++
unique[idx] = a
} else {
unique = append(unique, a)
index[a.Key] = len(unique) - 1
}
}
return unique, dropped
}
// AttributesLen returns the number of attributes in the log record.
func (r *Record) AttributesLen() int {
return r.nFront + len(r.back)
}
// DroppedAttributes returns the number of attributes dropped due to limits
// being reached.
func (r *Record) DroppedAttributes() int {
return r.dropped
}
// TraceID returns the trace ID or empty array.
func (r *Record) TraceID() trace.TraceID {
return r.traceID
}
// SetTraceID sets the trace ID.
func (r *Record) SetTraceID(id trace.TraceID) {
r.traceID = id
}
// SpanID returns the span ID or empty array.
func (r *Record) SpanID() trace.SpanID {
return r.spanID
}
// SetSpanID sets the span ID.
func (r *Record) SetSpanID(id trace.SpanID) {
r.spanID = id
}
// TraceFlags returns the trace flags.
func (r *Record) TraceFlags() trace.TraceFlags {
return r.traceFlags
}
// SetTraceFlags sets the trace flags.
func (r *Record) SetTraceFlags(flags trace.TraceFlags) {
r.traceFlags = flags
}
// Resource returns the entity that collected the log.
func (r *Record) Resource() resource.Resource {
if r.resource == nil {
return *resource.Empty()
}
return *r.resource
}
// InstrumentationScope returns the scope that the Logger was created with.
func (r *Record) InstrumentationScope() instrumentation.Scope {
if r.scope == nil {
return instrumentation.Scope{}
}
return *r.scope
}
// Clone returns a copy of the record with no shared state. The original record
// and the clone can both be modified without interfering with each other.
func (r *Record) Clone() Record {
res := *r
res.back = slices.Clone(r.back)
return res
}
func (r *Record) applyAttrLimits(attr log.KeyValue) log.KeyValue {
attr.Value = r.applyValueLimits(attr.Value)
return attr
}
func (r *Record) applyValueLimits(val log.Value) log.Value {
switch val.Kind() {
case log.KindString:
s := val.AsString()
if len(s) > r.attributeValueLengthLimit {
val = log.StringValue(truncate(r.attributeValueLengthLimit, s))
}
case log.KindSlice:
sl := val.AsSlice()
for i := range sl {
sl[i] = r.applyValueLimits(sl[i])
}
val = log.SliceValue(sl...)
case log.KindMap:
// Deduplicate then truncate. Do not do at the same time to avoid
// wasted truncation operations.
kvs, dropped := dedup(val.AsMap())
r.addDropped(dropped)
for i := range kvs {
kvs[i] = r.applyAttrLimits(kvs[i])
}
val = log.MapValue(kvs...)
}
return val
}
// truncate returns a truncated version of s such that it contains at most the
// limit number of characters. Truncation is applied by returning the limit
// number of valid characters contained in s.
//
// If limit is negative, it returns the original string.
//
// UTF-8 is supported. When truncating, all invalid characters are dropped
// before applying truncation.
//
// If s already contains at most the limit number of bytes, it is returned
// unchanged. No invalid characters are removed.
func truncate(limit int, s string) string {
// This prioritizes performance in the following order based on the most
// common expected use-cases.
//
// - Short values less than the default limit (128).
// - Strings with valid encodings that exceed the limit.
// - No limit.
// - Strings with invalid encodings that exceed the limit.
if limit < 0 || len(s) <= limit {
return s
}
// Optimistically, assume all valid UTF-8.
var b strings.Builder
count := 0
for i, c := range s {
if c != utf8.RuneError {
count++
if count > limit {
return s[:i]
}
continue
}
_, size := utf8.DecodeRuneInString(s[i:])
if size == 1 {
// Invalid encoding.
b.Grow(len(s) - 1)
_, _ = b.WriteString(s[:i])
s = s[i:]
break
}
}
// Fast-path, no invalid input.
if b.Cap() == 0 {
return s
}
// Truncate while validating UTF-8.
for i := 0; i < len(s) && count < limit; {
c := s[i]
if c < utf8.RuneSelf {
// Optimization for single byte runes (common case).
_ = b.WriteByte(c)
i++
count++
continue
}
_, size := utf8.DecodeRuneInString(s[i:])
if size == 1 {
// We checked for all 1-byte runes above, this is a RuneError.
i++
continue
}
_, _ = b.WriteString(s[i : i+size])
i += size
count++
}
return b.String()
}
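
Since head and dedup are unexported, the following sketch is written as if it lived in this package (log is already imported by this file); dedupSketch is a hypothetical helper illustrating the last-value-wins deduplication and count-limit behavior implemented above:

```go
// Same-package sketch (hypothetical helper): last-value-wins deduplication
// and the attribute count limit.
func dedupSketch() (attrs, dropped int) {
	r := Record{attributeCountLimit: 2, attributeValueLengthLimit: -1}
	r.AddAttributes(
		log.String("k1", "a"),
		log.String("k1", "b"), // duplicate key: overwrites "a", counted as dropped
		log.String("k2", "c"),
		log.String("k3", "d"), // exceeds the 2-attribute limit: dropped
	)
	return r.AttributesLen(), r.DroppedAttributes() // 2, 2
}
```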

82
vendor/go.opentelemetry.io/otel/sdk/log/ring.go generated vendored Normal file

@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package log // import "go.opentelemetry.io/otel/sdk/log"
// A ring is an element of a circular list, or ring. Rings do not have a
// beginning or end; a pointer to any ring element serves as reference to the
// entire ring. Empty rings are represented as nil ring pointers. The zero
// value for a ring is a one-element ring with a nil Value.
//
// This is copied from the "container/ring" package. It uses a Record type for
// Value instead of any to avoid allocations.
type ring struct {
next, prev *ring
Value Record
}
func (r *ring) init() *ring {
r.next = r
r.prev = r
return r
}
// Next returns the next ring element. r must not be empty.
func (r *ring) Next() *ring {
if r.next == nil {
return r.init()
}
return r.next
}
// Prev returns the previous ring element. r must not be empty.
func (r *ring) Prev() *ring {
if r.next == nil {
return r.init()
}
return r.prev
}
// newRing creates a ring of n elements.
func newRing(n int) *ring {
if n <= 0 {
return nil
}
r := new(ring)
p := r
for i := 1; i < n; i++ {
p.next = &ring{prev: p}
p = p.next
}
p.next = r
r.prev = p
return r
}
// Len computes the number of elements in ring r. It executes in time
// proportional to the number of elements.
func (r *ring) Len() int {
n := 0
if r != nil {
n = 1
for p := r.Next(); p != r; p = p.next {
n++
}
}
return n
}
// Do calls function f on each element of the ring, in forward order. The
// behavior of Do is undefined if f changes *r.
func (r *ring) Do(f func(Record)) {
if r != nil {
f(r.Value)
for p := r.Next(); p != r; p = p.next {
f(p.Value)
}
}
}
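
A brief same-package sketch (ringSketch is a hypothetical helper) of the newRing/Next/Do walk described above:

```go
// Fill a three-element ring and walk it with Do.
func ringSketch() int {
	r := newRing(3)
	p := r
	for i := 0; i < 3; i++ {
		p.Value = Record{} // store a Record in each element
		p = p.Next()
	}
	n := 0
	r.Do(func(Record) { n++ })
	return n // 3: Do visits every element exactly once, in forward order
}
```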

119
vendor/go.opentelemetry.io/otel/sdk/log/setting.go generated vendored Normal file

@ -0,0 +1,119 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"fmt"
"os"
"strconv"
"time"
"go.opentelemetry.io/otel"
)
// setting is a configuration setting value.
type setting[T any] struct {
Value T
Set bool
}
// newSetting returns a new [setting] with the value set.
func newSetting[T any](value T) setting[T] {
return setting[T]{Value: value, Set: true}
}
// resolver returns an updated setting after applying a resolution operation.
type resolver[T any] func(setting[T]) setting[T]
// Resolve returns a resolved version of s.
//
// It will apply all the passed fn in the order provided, chaining the
// returned setting into the next input. The setting s is used as the initial
// argument to the first fn.
//
// Each fn needs to validate if it should apply given the Set state of the
// setting. Resolve itself performs no checks on the Set state when chaining
// functions.
func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] {
for _, f := range fn {
s = f(s)
}
return s
}
// clampMax returns a resolver that will ensure a setting value is no greater
// than n. If it is, the value is set to n.
func clampMax[T ~int | ~int64](n T) resolver[T] {
return func(s setting[T]) setting[T] {
if s.Value > n {
s.Value = n
}
return s
}
}
// clearLessThanOne returns a resolver that will clear a setting value and
// change its set state to false if its value is less than 1.
func clearLessThanOne[T ~int | ~int64]() resolver[T] {
return func(s setting[T]) setting[T] {
if s.Value < 1 {
s.Value = 0
s.Set = false
}
return s
}
}
// getenv returns a resolver that will apply an integer environment variable
// value associated with key to a setting value.
//
// If the input setting to the resolver is set, the environment variable will
// not be applied.
//
// If the environment variable value associated with key is not an integer, an
// error will be sent to the OTel error handler and the setting will not be
// updated.
//
// If the setting value is a [time.Duration] type, the environment variable
// will be interpreted as a duration of milliseconds.
func getenv[T ~int | ~int64](key string) resolver[T] {
return func(s setting[T]) setting[T] {
if s.Set {
// Passed, valid options have precedence.
return s
}
if v := os.Getenv(key); v != "" {
n, err := strconv.Atoi(v)
if err != nil {
otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, v, err))
} else {
switch any(s.Value).(type) {
case time.Duration:
// OTel duration environment variables are in milliseconds.
s.Value = T(time.Duration(n) * time.Millisecond)
default:
s.Value = T(n)
}
s.Set = true
}
}
return s
}
}
// fallback returns a resolver that will set a setting value to val if it is
// not already set.
//
// This is usually passed at the end of a resolver chain to ensure a default is
// applied if the setting has not already been set.
func fallback[T any](val T) resolver[T] {
return func(s setting[T]) setting[T] {
if !s.Set {
s.Value = val
s.Set = true
}
return s
}
}
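
A same-package sketch (resolveCountLimit is a hypothetical helper) of how newProviderConfig chains these resolvers: an explicitly set option wins, then the environment variable, then the fallback default:

```go
func resolveCountLimit(opt setting[int]) int {
	s := opt.Resolve(
		getenv[int]("OTEL_LOGRECORD_ATTRIBUTE_COUNT_LIMIT"),
		fallback[int](128),
	)
	return s.Value
}

// resolveCountLimit(setting[int]{}) -> 128, unless the envar overrides it.
// resolveCountLimit(newSetting(64)) -> 64; the passed option has precedence.
```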

82
vendor/go.opentelemetry.io/otel/sdk/log/simple.go generated vendored Normal file

@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"sync"
)
// Compile-time check SimpleProcessor implements Processor.
var _ Processor = (*SimpleProcessor)(nil)
// SimpleProcessor is a processor that synchronously exports log records.
//
// Use [NewSimpleProcessor] to create a SimpleProcessor.
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter
noCmp [0]func() //nolint: unused // This is indeed used.
}
// NewSimpleProcessor returns a new [SimpleProcessor] that synchronously
// exports log records with the passed exporter.
//
// This Processor is not recommended for production use: its synchronous
// nature makes it suitable for testing, debugging, or demonstrating other
// features, but it can lead to slow performance and high computational
// overhead. For production environments, it is recommended to use
// [NewBatchProcessor] instead. However, there may be exceptions where certain
// [Exporter] implementations perform better with this Processor.
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
return &SimpleProcessor{exporter: exporter}
}
var simpleProcRecordsPool = sync.Pool{
New: func() any {
records := make([]Record, 1)
return &records
},
}
// OnEmit synchronously exports the provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
if s.exporter == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
records := simpleProcRecordsPool.Get().(*[]Record)
(*records)[0] = *r
defer func() {
simpleProcRecordsPool.Put(records)
}()
return s.exporter.Export(ctx, *records)
}
// Shutdown shuts down the exporter.
func (s *SimpleProcessor) Shutdown(ctx context.Context) error {
if s.exporter == nil {
return nil
}
return s.exporter.Shutdown(ctx)
}
// ForceFlush flushes the exporter.
func (s *SimpleProcessor) ForceFlush(ctx context.Context) error {
if s.exporter == nil {
return nil
}
return s.exporter.ForceFlush(ctx)
}
// SimpleProcessorOption applies a configuration to a [SimpleProcessor].
type SimpleProcessorOption interface {
apply()
}


@ -0,0 +1,3 @@
# SDK Trace test
[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/trace/tracetest)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace/tracetest)


@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package tracetest is a testing helper package for the SDK. Users can
// configure no-op or in-memory exporters to verify different SDK behaviors or
// custom instrumentation.
package tracetest // import "go.opentelemetry.io/otel/sdk/trace/tracetest"
import (
"context"
"sync"
"go.opentelemetry.io/otel/sdk/trace"
)
var _ trace.SpanExporter = (*NoopExporter)(nil)
// NewNoopExporter returns a new no-op exporter.
func NewNoopExporter() *NoopExporter {
return new(NoopExporter)
}
// NoopExporter is an exporter that drops all received spans and performs no
// action.
type NoopExporter struct{}
// ExportSpans handles export of spans by dropping them.
func (nsb *NoopExporter) ExportSpans(context.Context, []trace.ReadOnlySpan) error { return nil }
// Shutdown stops the exporter by doing nothing.
func (nsb *NoopExporter) Shutdown(context.Context) error { return nil }
var _ trace.SpanExporter = (*InMemoryExporter)(nil)
// NewInMemoryExporter returns a new InMemoryExporter.
func NewInMemoryExporter() *InMemoryExporter {
return new(InMemoryExporter)
}
// InMemoryExporter is an exporter that stores all received spans in-memory.
type InMemoryExporter struct {
mu sync.Mutex
ss SpanStubs
}
// ExportSpans handles export of spans by storing them in memory.
func (imsb *InMemoryExporter) ExportSpans(_ context.Context, spans []trace.ReadOnlySpan) error {
imsb.mu.Lock()
defer imsb.mu.Unlock()
imsb.ss = append(imsb.ss, SpanStubsFromReadOnlySpans(spans)...)
return nil
}
// Shutdown stops the exporter by clearing spans held in memory.
func (imsb *InMemoryExporter) Shutdown(context.Context) error {
imsb.Reset()
return nil
}
// Reset the current in-memory storage.
func (imsb *InMemoryExporter) Reset() {
imsb.mu.Lock()
defer imsb.mu.Unlock()
imsb.ss = nil
}
// GetSpans returns the current in-memory stored spans.
func (imsb *InMemoryExporter) GetSpans() SpanStubs {
imsb.mu.Lock()
defer imsb.mu.Unlock()
ret := make(SpanStubs, len(imsb.ss))
copy(ret, imsb.ss)
return ret
}
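
A sketch of how the in-memory exporter might back an assertion in a test (the test name and tracer scope are hypothetical):

```go
package example_test

import (
	"context"
	"testing"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestSpansExported(t *testing.T) {
	exp := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSyncer(exp), // synchronous export keeps the test deterministic
	)
	defer func() { _ = tp.Shutdown(context.Background()) }()

	_, span := tp.Tracer("test").Start(context.Background(), "operation")
	span.End()

	if got := len(exp.GetSpans()); got != 1 {
		t.Fatalf("expected 1 span, got %d", got)
	}
}
```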


@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package tracetest // import "go.opentelemetry.io/otel/sdk/trace/tracetest"
import (
"context"
"sync"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// SpanRecorder records started and ended spans.
type SpanRecorder struct {
startedMu sync.RWMutex
started []sdktrace.ReadWriteSpan
endedMu sync.RWMutex
ended []sdktrace.ReadOnlySpan
}
var _ sdktrace.SpanProcessor = (*SpanRecorder)(nil)
// NewSpanRecorder returns a new initialized SpanRecorder.
func NewSpanRecorder() *SpanRecorder {
return new(SpanRecorder)
}
// OnStart records started spans.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) OnStart(_ context.Context, s sdktrace.ReadWriteSpan) {
sr.startedMu.Lock()
defer sr.startedMu.Unlock()
sr.started = append(sr.started, s)
}
// OnEnd records completed spans.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) OnEnd(s sdktrace.ReadOnlySpan) {
sr.endedMu.Lock()
defer sr.endedMu.Unlock()
sr.ended = append(sr.ended, s)
}
// Shutdown does nothing.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) Shutdown(context.Context) error {
return nil
}
// ForceFlush does nothing.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) ForceFlush(context.Context) error {
return nil
}
// Started returns a copy of all started spans that have been recorded.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) Started() []sdktrace.ReadWriteSpan {
sr.startedMu.RLock()
defer sr.startedMu.RUnlock()
dst := make([]sdktrace.ReadWriteSpan, len(sr.started))
copy(dst, sr.started)
return dst
}
// Reset clears the recorded spans.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) Reset() {
sr.startedMu.Lock()
sr.endedMu.Lock()
defer sr.startedMu.Unlock()
defer sr.endedMu.Unlock()
sr.started = nil
sr.ended = nil
}
// Ended returns a copy of all ended spans that have been recorded.
//
// This method is safe to be called concurrently.
func (sr *SpanRecorder) Ended() []sdktrace.ReadOnlySpan {
sr.endedMu.RLock()
defer sr.endedMu.RUnlock()
dst := make([]sdktrace.ReadOnlySpan, len(sr.ended))
copy(dst, sr.ended)
return dst
}
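
A sketch of asserting on span lifecycle with the recorder (the test name is hypothetical):

```go
package example_test

import (
	"context"
	"testing"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestSpanLifecycle(t *testing.T) {
	sr := tracetest.NewSpanRecorder()
	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

	_, span := tp.Tracer("test").Start(context.Background(), "op")
	if len(sr.Started()) != 1 || len(sr.Ended()) != 0 {
		t.Fatal("span should be started but not yet ended")
	}

	span.End()
	if len(sr.Ended()) != 1 {
		t.Fatal("span should be recorded as ended")
	}
}
```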


@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package tracetest // import "go.opentelemetry.io/otel/sdk/trace/tracetest"
import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
// SpanStubs is a slice of SpanStub used for testing an SDK.
type SpanStubs []SpanStub
// SpanStubsFromReadOnlySpans returns SpanStubs populated from ro.
func SpanStubsFromReadOnlySpans(ro []tracesdk.ReadOnlySpan) SpanStubs {
if len(ro) == 0 {
return nil
}
s := make(SpanStubs, 0, len(ro))
for _, r := range ro {
s = append(s, SpanStubFromReadOnlySpan(r))
}
return s
}
// Snapshots returns s as a slice of ReadOnlySpans.
func (s SpanStubs) Snapshots() []tracesdk.ReadOnlySpan {
if len(s) == 0 {
return nil
}
ro := make([]tracesdk.ReadOnlySpan, len(s))
for i := 0; i < len(s); i++ {
ro[i] = s[i].Snapshot()
}
return ro
}
// SpanStub is a stand-in for a Span.
type SpanStub struct {
Name string
SpanContext trace.SpanContext
Parent trace.SpanContext
SpanKind trace.SpanKind
StartTime time.Time
EndTime time.Time
Attributes []attribute.KeyValue
Events []tracesdk.Event
Links []tracesdk.Link
Status tracesdk.Status
DroppedAttributes int
DroppedEvents int
DroppedLinks int
ChildSpanCount int
Resource *resource.Resource
InstrumentationScope instrumentation.Scope
// Deprecated: use InstrumentationScope instead.
InstrumentationLibrary instrumentation.Library //nolint:staticcheck // This field needs to be defined for backwards compatibility
}
// SpanStubFromReadOnlySpan returns a SpanStub populated from ro.
func SpanStubFromReadOnlySpan(ro tracesdk.ReadOnlySpan) SpanStub {
if ro == nil {
return SpanStub{}
}
return SpanStub{
Name: ro.Name(),
SpanContext: ro.SpanContext(),
Parent: ro.Parent(),
SpanKind: ro.SpanKind(),
StartTime: ro.StartTime(),
EndTime: ro.EndTime(),
Attributes: ro.Attributes(),
Events: ro.Events(),
Links: ro.Links(),
Status: ro.Status(),
DroppedAttributes: ro.DroppedAttributes(),
DroppedEvents: ro.DroppedEvents(),
DroppedLinks: ro.DroppedLinks(),
ChildSpanCount: ro.ChildSpanCount(),
Resource: ro.Resource(),
InstrumentationScope: ro.InstrumentationScope(),
InstrumentationLibrary: ro.InstrumentationScope(),
}
}
// Snapshot returns a read-only copy of the SpanStub.
func (s SpanStub) Snapshot() tracesdk.ReadOnlySpan {
scopeOrLibrary := s.InstrumentationScope
if scopeOrLibrary.Name == "" && scopeOrLibrary.Version == "" && scopeOrLibrary.SchemaURL == "" {
scopeOrLibrary = s.InstrumentationLibrary
}
return spanSnapshot{
name: s.Name,
spanContext: s.SpanContext,
parent: s.Parent,
spanKind: s.SpanKind,
startTime: s.StartTime,
endTime: s.EndTime,
attributes: s.Attributes,
events: s.Events,
links: s.Links,
status: s.Status,
droppedAttributes: s.DroppedAttributes,
droppedEvents: s.DroppedEvents,
droppedLinks: s.DroppedLinks,
childSpanCount: s.ChildSpanCount,
resource: s.Resource,
instrumentationScope: scopeOrLibrary,
}
}
type spanSnapshot struct {
// Embed the interface to implement the private method.
tracesdk.ReadOnlySpan
name string
spanContext trace.SpanContext
parent trace.SpanContext
spanKind trace.SpanKind
startTime time.Time
endTime time.Time
attributes []attribute.KeyValue
events []tracesdk.Event
links []tracesdk.Link
status tracesdk.Status
droppedAttributes int
droppedEvents int
droppedLinks int
childSpanCount int
resource *resource.Resource
instrumentationScope instrumentation.Scope
}
func (s spanSnapshot) Name() string { return s.name }
func (s spanSnapshot) SpanContext() trace.SpanContext { return s.spanContext }
func (s spanSnapshot) Parent() trace.SpanContext { return s.parent }
func (s spanSnapshot) SpanKind() trace.SpanKind { return s.spanKind }
func (s spanSnapshot) StartTime() time.Time { return s.startTime }
func (s spanSnapshot) EndTime() time.Time { return s.endTime }
func (s spanSnapshot) Attributes() []attribute.KeyValue { return s.attributes }
func (s spanSnapshot) Links() []tracesdk.Link { return s.links }
func (s spanSnapshot) Events() []tracesdk.Event { return s.events }
func (s spanSnapshot) Status() tracesdk.Status { return s.status }
func (s spanSnapshot) DroppedAttributes() int { return s.droppedAttributes }
func (s spanSnapshot) DroppedLinks() int { return s.droppedLinks }
func (s spanSnapshot) DroppedEvents() int { return s.droppedEvents }
func (s spanSnapshot) ChildSpanCount() int { return s.childSpanCount }
func (s spanSnapshot) Resource() *resource.Resource { return s.resource }
func (s spanSnapshot) InstrumentationScope() instrumentation.Scope {
return s.instrumentationScope
}
func (s spanSnapshot) InstrumentationLibrary() instrumentation.Library { //nolint:staticcheck // This method needs to be defined for backwards compatibility
return s.instrumentationScope
}
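
Finally, a sketch showing how a hand-built SpanStub can be snapshotted into a ReadOnlySpan and fed to an exporter under test (the test name is hypothetical):

```go
package example_test

import (
	"context"
	"testing"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestSnapshotRoundTrip(t *testing.T) {
	stub := tracetest.SpanStub{Name: "op"}

	// Snapshot turns the mutable stub into a ReadOnlySpan.
	ro := stub.Snapshot()

	exp := tracetest.NewInMemoryExporter()
	if err := exp.ExportSpans(context.Background(), []sdktrace.ReadOnlySpan{ro}); err != nil {
		t.Fatal(err)
	}
	if got := exp.GetSpans(); len(got) != 1 || got[0].Name != "op" {
		t.Fatalf("unexpected spans: %+v", got)
	}
}
```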