[chore] Bump all otel deps (#3241)

This commit is contained in:
tobi 2024-08-26 18:05:54 +02:00 committed by GitHub
commit 28d57d1f13
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
193 changed files with 13714 additions and 2346 deletions

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
@ -50,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
ReservoirFunc func() exemplar.FilteredReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@ -61,12 +50,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
return exemplar.Drop[N]
return exemplar.Drop
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
@ -85,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
}
// LastValue returns a last-value aggregate function input and output.
//
// The Builder.Temporality is ignored and delta is use always.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}
return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
// reuse of the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
lv.computeAggregation(&gData.DataPoints)
*dest = gData
return len(gData.DataPoints)
// PrecomputedLastValue returns a last-value aggregate function input and
// output. The aggregation returned from the returned ComputeAggregation
// function will always only return values from the previous collection cycle.
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
// Package aggregate provides aggregate types used compute aggregations and
// cycle the state of metric measurements made by the SDK. These types and

View file

@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
var exemplarPool = sync.Pool{
New: func() any { return new([]exemplar.Exemplar) },
}
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
defer func() {
*dest = (*dest)[:0]
exemplarPool.Put(dest)
}()
*dest = reset(*dest, len(*out), cap(*out))
f(dest)
*out = reset(*out, len(*dest), cap(*dest))
for i, e := range *dest {
(*out)[i].FilteredAttributes = e.FilteredAttributes
(*out)[i].Time = e.Time
(*out)[i].SpanID = e.SpanID
(*out)[i].TraceID = e.TraceID
switch e.Value.Type() {
case exemplar.Int64ValueType:
(*out)[i].Value = N(e.Value.Int64())
case exemplar.Float64ValueType:
(*out)[i].Value = N(e.Value.Float64())
}
}
}

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
@ -41,7 +30,8 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
res exemplar.Reservoir[N]
attrs attribute.Set
res exemplar.FilteredReservoir[N]
count uint64
min N
@ -52,14 +42,14 @@ type expoHistogramDataPoint[N int64 | float64] struct {
noMinMax bool
noSum bool
scale int
scale int32
posBuckets expoBuckets
negBuckets expoBuckets
zeroCount uint64
}
func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
f := math.MaxFloat64
max := N(f) // if N is int64, max will overflow to -9223372036854775808
min := N(-f)
@ -68,6 +58,7 @@ func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMa
min = N(minInt64)
}
return &expoHistogramDataPoint[N]{
attrs: attrs,
min: max,
max: min,
maxSize: maxSize,
@ -128,11 +119,13 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
}
// getBin returns the bin v should be recorded into.
func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
frac, exp := math.Frexp(v)
func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
frac, expInt := math.Frexp(v)
// 11-bit exponential.
exp := int32(expInt) // nolint: gosec
if p.scale <= 0 {
// Because of the choice of fraction is always 1 power of two higher than we want.
correction := 1
var correction int32 = 1
if frac == .5 {
// If v is an exact power of two the frac will be .5 and the exp
// will be one higher than we want.
@ -140,7 +133,7 @@ func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
}
return (exp - correction) >> (-p.scale)
}
return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1
}
// scaleFactors are constants used in calculating the logarithm index. They are
@ -171,20 +164,20 @@ var scaleFactors = [21]float64{
// scaleChange returns the magnitude of the scale change needed to fit bin in
// the bucket. If no scale change is needed 0 is returned.
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
if length == 0 {
// No need to rescale if there are no buckets.
return 0
}
low := startBin
high := bin
low := int(startBin)
high := int(bin)
if startBin >= bin {
low = bin
high = startBin + length - 1
low = int(bin)
high = int(startBin) + length - 1
}
count := 0
var count int32
for high-low >= p.maxSize {
low = low >> 1
high = high >> 1
@ -198,39 +191,39 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int
startBin int32
counts []uint64
}
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int) {
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.startBin = bin
return
}
endBin := b.startBin + len(b.counts) - 1
endBin := int(b.startBin) + len(b.counts) - 1
// if the new bin is inside the current range
if bin >= b.startBin && bin <= endBin {
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
return
}
// if the new bin is before the current start add spaces to the counts
if bin < b.startBin {
origLen := len(b.counts)
newLength := endBin - bin + 1
newLength := endBin - int(bin) + 1
shift := b.startBin - bin
if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
}
copy(b.counts[shift:origLen+shift], b.counts[:])
copy(b.counts[shift:origLen+int(shift)], b.counts[:])
b.counts = b.counts[:newLength]
for i := 1; i < shift; i++ {
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
}
b.startBin = bin
@ -238,17 +231,17 @@ func (b *expoBuckets) record(bin int) {
return
}
// if the new is after the end add spaces to the end
if bin > endBin {
if bin-b.startBin < cap(b.counts) {
if int(bin) > endBin {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - b.startBin; i < len(b.counts); i++ {
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
}
b.counts[bin-b.startBin] = 1
return
}
end := make([]uint64, bin-b.startBin-len(b.counts)+1)
end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
}
@ -256,7 +249,7 @@ func (b *expoBuckets) record(bin int) {
// downscale shrinks a bucket by a factor of 2*s. It will sum counts into the
// correct lower resolution bucket.
func (b *expoBuckets) downscale(delta int) {
func (b *expoBuckets) downscale(delta int32) {
// Example
// delta = 2
// Original offset: -6
@ -271,19 +264,19 @@ func (b *expoBuckets) downscale(delta int) {
return
}
steps := 1 << delta
steps := int32(1) << delta
offset := b.startBin % steps
offset = (offset + steps) % steps // to make offset positive
for i := 1; i < len(b.counts); i++ {
idx := i + offset
if idx%steps == 0 {
b.counts[idx/steps] = b.counts[i]
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
continue
}
b.counts[idx/steps] += b.counts[i]
b.counts[idx/int(steps)] += b.counts[i]
}
lastIdx := (len(b.counts) - 1 + offset) / steps
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
b.counts = b.counts[:lastIdx+1]
b.startBin = b.startBin >> delta
}
@ -291,16 +284,16 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),
maxScale: maxScale,
newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
start: now(),
}
@ -312,11 +305,11 @@ type expoHistogram[N int64 | float64] struct {
noSum bool
noMinMax bool
maxSize int
maxScale int
maxScale int32
newRes func() exemplar.Reservoir[N]
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Set]*expoHistogramDataPoint[N]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
start time.Time
@ -328,21 +321,19 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
return
}
t := now()
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
attr := e.limit.Attributes(fltrAttr, e.values)
v, ok := e.values[attr]
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes()
e.values[attr] = v
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, value, droppedAttr)
v.res.Offer(ctx, value, droppedAttr)
}
func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
@ -360,36 +351,38 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts := reset(h.DataPoints, n, n)
var i int
for a, b := range e.values {
hDPts[i].Attributes = a
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Scale = int32(b.scale)
hDPts[i].ZeroCount = b.zeroCount
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = b.sum
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
b.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
delete(e.values, a)
i++
}
// Unused attribute sets do not report.
clear(e.values)
e.start = t
h.DataPoints = hDPts
*dest = h
@ -411,32 +404,32 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts := reset(h.DataPoints, n, n)
var i int
for a, b := range e.values {
hDPts[i].Attributes = a
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Scale = int32(b.scale)
hDPts[i].ZeroCount = b.zeroCount
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = b.sum
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
b.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there

View file

@ -1,21 +1,11 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"slices"
"sort"
"sync"
"time"
@ -26,7 +16,8 @@ import (
)
type buckets[N int64 | float64] struct {
res exemplar.Reservoir[N]
attrs attribute.Set
res exemplar.FilteredReservoir[N]
counts []uint64
count uint64
@ -35,8 +26,8 @@ type buckets[N int64 | float64] struct {
}
// newBuckets returns buckets with n bins.
func newBuckets[N int64 | float64](n int) *buckets[N] {
return &buckets[N]{counts: make([]uint64, n)}
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
}
func (b *buckets[N]) sum(value N) { b.total += value }
@ -57,26 +48,25 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64
newRes func() exemplar.Reservoir[N]
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Set]*buckets[N]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := make([]float64, len(bounds))
copy(b, bounds)
sort.Float64s(b)
b := slices.Clone(bounds)
slices.Sort(b)
return &histValues[N]{
noSum: noSum,
bounds: b,
newRes: r,
limit: newLimiter[*buckets[N]](limit),
values: make(map[attribute.Set]*buckets[N]),
values: make(map[attribute.Distinct]*buckets[N]),
}
}
@ -90,13 +80,11 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
t := now()
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
b, ok := s.values[attr]
b, ok := s.values[attr.Equivalent()]
if !ok {
// N+1 buckets. For example:
//
@ -105,23 +93,23 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](len(s.bounds) + 1)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes()
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
s.values[attr] = b
s.values[attr.Equivalent()] = b
}
b.bin(idx, value)
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, value, droppedAttr)
b.res.Offer(ctx, value, droppedAttr)
}
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
@ -150,36 +138,35 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for a, b := range s.values {
hDPts[i].Attributes = a
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = b.counts
hDPts[i].BucketCounts = val.counts
if !s.noSum {
hDPts[i].Sum = b.total
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
b.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
// Unused attribute sets do not report.
delete(s.values, a)
i++
}
// Unused attribute sets do not report.
clear(s.values)
// The delta collection cycle resets.
s.start = t
@ -201,39 +188,36 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for a, b := range s.values {
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
//
// TODO (#3047): Making copies for bounds and counts incurs a large
// memory allocation footprint. Alternatives should be explored.
counts := make([]uint64, len(b.counts))
copy(counts, b.counts)
hDPts[i].Attributes = a
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = counts
hDPts[i].BucketCounts = slices.Clone(val.counts)
if !s.noSum {
hDPts[i].Sum = b.total
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
b.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
@ -26,16 +15,17 @@ import (
// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
timestamp time.Time
value N
res exemplar.Reservoir[N]
attrs attribute.Set
value N
res exemplar.FilteredReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Set]datapoint[N]),
values: make(map[attribute.Distinct]datapoint[N]),
start: now(),
}
}
@ -43,47 +33,130 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N])
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.Reservoir[N]
newRes func() exemplar.FilteredReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Set]datapoint[N]
values map[attribute.Distinct]datapoint[N]
start time.Time
}
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()
s.Lock()
defer s.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr]
d, ok := s.values[attr.Equivalent()]
if !ok {
d.res = s.newRes()
}
d.timestamp = t
d.attrs = attr
d.value = value
d.res.Offer(ctx, t, value, droppedAttr)
d.res.Offer(ctx, value, droppedAttr)
s.values[attr] = d
s.values[attr.Equivalent()] = d
}
func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData
return n
}
// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)
var i int
for a, v := range s.values {
(*dest)[i].Attributes = a
// The event time is the only meaningful timestamp, StartTime is
// ignored.
(*dest)[i].Time = v.timestamp
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = v.value
v.res.Collect(&(*dest)[i].Exemplars)
// Do not report stale values.
delete(s.values, a)
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
}
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}
// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
}
func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData
return n
}

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
@ -41,9 +30,9 @@ func newLimiter[V any](aggregation int) limiter[V] {
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attr is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
if l.aggLimit > 0 {
_, exists := measurements[attrs]
_, exists := measurements[attrs.Equivalent()]
if !exists && len(measurements) >= l.aggLimit-1 {
return overflowSet
}

View file

@ -1,16 +1,5 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
@ -25,48 +14,48 @@ import (
)
type sumValue[N int64 | float64] struct {
n N
res exemplar.Reservoir[N]
n N
res exemplar.FilteredReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.Reservoir[N]
newRes func() exemplar.FilteredReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Set]sumValue[N]
values map[attribute.Distinct]sumValue[N]
}
func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
values: make(map[attribute.Set]sumValue[N]),
values: make(map[attribute.Distinct]sumValue[N]),
}
}
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()
s.Lock()
defer s.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
v, ok := s.values[attr]
v, ok := s.values[attr.Equivalent()]
if !ok {
v.res = s.newRes()
}
v.attrs = attr
v.n += value
v.res.Offer(ctx, t, value, droppedAttr)
v.res.Offer(ctx, value, droppedAttr)
s.values[attr] = v
s.values[attr.Equivalent()] = v
}
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@ -98,16 +87,16 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
dPts := reset(sData.DataPoints, n, n)
var i int
for attr, val := range s.values {
dPts[i].Attributes = attr
for _, val := range s.values {
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
val.res.Collect(&dPts[i].Exemplars)
// Do not report stale values.
delete(s.values, attr)
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
i++
}
// Do not report stale values.
clear(s.values)
// The delta collection cycle resets.
s.start = t
@ -133,12 +122,12 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
dPts := reset(sData.DataPoints, n, n)
var i int
for attr, value := range s.values {
dPts[i].Attributes = attr
for _, value := range s.values {
dPts[i].Attributes = value.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = value.n
value.res.Collect(&dPts[i].Exemplars)
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
@ -155,7 +144,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observatrions as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@ -170,12 +159,12 @@ type precomputedSum[N int64 | float64] struct {
monotonic bool
start time.Time
reported map[attribute.Set]N
reported map[attribute.Distinct]N
}
func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
t := now()
newReported := make(map[attribute.Set]N)
newReported := make(map[attribute.Distinct]N)
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
@ -190,21 +179,20 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
dPts := reset(sData.DataPoints, n, n)
var i int
for attr, value := range s.values {
delta := value.n - s.reported[attr]
for key, value := range s.values {
delta := value.n - s.reported[key]
dPts[i].Attributes = attr
dPts[i].Attributes = value.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = delta
value.res.Collect(&dPts[i].Exemplars)
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
newReported[attr] = value.n
// Unused attribute sets do not report.
delete(s.values, attr)
newReported[key] = value.n
i++
}
// Unused attribute sets are forgotten.
// Unused attribute sets do not report.
clear(s.values)
s.reported = newReported
// The delta collection cycle resets.
s.start = t
@ -231,17 +219,17 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
dPts := reset(sData.DataPoints, n, n)
var i int
for attr, val := range s.values {
dPts[i].Attributes = attr
for _, val := range s.values {
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
val.res.Collect(&dPts[i].Exemplars)
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
// Unused attribute sets do not report.
delete(s.values, attr)
i++
}
// Unused attribute sets do not report.
clear(s.values)
sData.DataPoints = dPts
*dest = sData