Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Aggregator interface to aggregator package #2444

Merged
merged 2 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion exporters/otlp/otlpmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand Down Expand Up @@ -682,7 +683,7 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.
desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind)
labs := attribute.NewSet(lcopy...)

var agg, ckpt export.Aggregator
var agg, ckpt aggregator.Aggregator
if r.iKind.Adding() {
sums := sum.New(2)
agg, ckpt = &sums[0], &sums[1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric/metrictest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand Down Expand Up @@ -241,10 +242,10 @@ func (t *testAgg) Aggregation() aggregation.Aggregation {
func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error {
return nil
}
func (t *testAgg) SynchronizedMove(destination export.Aggregator, descriptor *sdkapi.Descriptor) error {
func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil
}
func (t *testAgg) Merge(aggregator export.Aggregator, descriptor *sdkapi.Descriptor) error {
func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil
}

Expand All @@ -270,7 +271,7 @@ func (te *testErrSum) Kind() aggregation.Kind {
return aggregation.SumKind
}

var _ export.Aggregator = &testAgg{}
var _ aggregator.Aggregator = &testAgg{}
var _ aggregation.Aggregation = &testAgg{}
var _ aggregation.Sum = &testErrSum{}
var _ aggregation.LastValue = &testErrLastValue{}
Expand Down
5 changes: 3 additions & 2 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type Accumulation = export.Accumulation

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type Aggregator = export.Aggregator
// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/aggregator"
type Aggregator = aggregator.Aggregator

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type AggregatorSelector = export.AggregatorSelector
Expand Down
66 changes: 64 additions & 2 deletions sdk/metric/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,81 @@
package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator"

import (
"context"
"fmt"
"math"

"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Counter
// instruments commonly use a simple Sum aggregator, but for the
// distribution instruments (Histogram, GaugeObserver) there are a
// number of possible aggregators with different cost and accuracy
// tradeoffs.
//
// Note that any Aggregator may be attached to any instrument--this is
// the result of the OpenTelemetry API/SDK separation. It is possible
// to attach a Sum aggregator to a Histogram instrument.
type Aggregator interface {
// Aggregation returns an Aggregation interface to access the
// current state of this Aggregator. The caller is
// responsible for synchronization and must not call any the
// other methods in this interface concurrently while using
// the Aggregation.
Aggregation() aggregation.Aggregation

// Update receives a new measured value and incorporates it
// into the aggregation. Update() calls may be called
// concurrently.
//
// Descriptor.NumberKind() should be consulted to determine
// whether the provided number is an int64 or float64.
//
// The Context argument comes from user-level code and could be
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error

// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
// After saving a synchronized copy, the Aggregator can be converted
// into one or more of the interfaces in the `aggregation` sub-package,
// according to kind of Aggregator that was selected.
//
// This method will return an InconsistentAggregatorError if
// this Aggregator cannot be copied into the destination due
// to an incompatible type.
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error

// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}

// NewInconsistentAggregatorError formats an error describing an attempt to
// Checkpoint or Merge different-type aggregators. The result can be unwrapped as
// an ErrInconsistentType.
func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error {
func NewInconsistentAggregatorError(a1, a2 Aggregator) error {
return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2)
}

Expand Down
13 changes: 6 additions & 7 deletions sdk/metric/aggregator/aggregatortest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand All @@ -44,7 +43,7 @@ type Profile struct {
type NoopAggregator struct{}
type NoopAggregation struct{}

var _ export.Aggregator = NoopAggregator{}
var _ aggregator.Aggregator = NoopAggregator{}
var _ aggregation.Aggregation = NoopAggregation{}

func newProfiles() []Profile {
Expand Down Expand Up @@ -150,7 +149,7 @@ func (n *Numbers) Points() []number.Number {
}

// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator.
func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
ctx := context.Background()

// Note: Aggregator tests are written assuming that the SDK
Expand All @@ -166,7 +165,7 @@ func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, de
}
}

func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) {
func CheckedMerge(t *testing.T, aggInto, aggFrom aggregator.Aggregator, descriptor *sdkapi.Descriptor) {
if err := aggInto.Merge(aggFrom, descriptor); err != nil {
t.Error("Unexpected Merge failure", err)
}
Expand All @@ -184,15 +183,15 @@ func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor)
return nil
}

func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error {
func (NoopAggregator) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil
}

func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error {
func (NoopAggregator) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil
}

func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) {
func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) aggregator.Aggregator) {
t.Run("reset on nil", func(t *testing.T) {
// Ensures that SynchronizedMove(nil, descriptor) discards and
// resets the aggregator.
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand Down Expand Up @@ -97,7 +96,7 @@ var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
return
}(defaultFloat64ExplicitBoundaries)

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}
var _ aggregation.Count = &Aggregator{}
var _ aggregation.Histogram = &Aggregator{}
Expand Down Expand Up @@ -174,7 +173,7 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)

if oa != nil && o == nil {
Expand Down Expand Up @@ -254,7 +253,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap
}

// Merge combines two histograms that have the same buckets into a single one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/export"
)

const count = 100
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.HistogramInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0]
},
)
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand Down Expand Up @@ -51,7 +50,7 @@ type (
}
)

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.LastValue = &Aggregator{}

// An unset lastValue has zero timestamp and zero value.
Expand Down Expand Up @@ -92,7 +91,7 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) {
}

// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
func (g *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
Expand All @@ -117,7 +116,7 @@ func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap

// Merge combines state from two aggregators. The most-recently set
// value is chosen.
func (g *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (g *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

const count = 100

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}

// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.GaugeObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0]
},
)
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/sum/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand All @@ -31,7 +30,7 @@ type Aggregator struct {
value number.Number
}

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}

// New returns a new counter aggregator implemented by atomic
Expand Down Expand Up @@ -59,7 +58,7 @@ func (c *Aggregator) Sum() (number.Number, error) {

// SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
c.value.SetRawAtomic(0)
return nil
Expand All @@ -79,7 +78,7 @@ func (c *Aggregator) Update(_ context.Context, num number.Number, desc *sdkapi.D
}

// Merge combines two counters by adding their sums.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/sum/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
)

const count = 100
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.CounterObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0]
},
)
Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/sdkapi"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
Expand Down Expand Up @@ -72,7 +73,7 @@ type testSelector struct {
newAggCount int
}

func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
ts.newAggCount += len(aggPtrs)
processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
}
Expand Down
Loading