Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative]: observe accumulation metrics #31363

Merged
merged 23 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: self-instrumentation to observe key metrics of the stream accumulation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 17 additions & 0 deletions processor/deltatocumulativeprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,21 @@ processors:
# upper limit of streams to track. new streams exceeding this limit
# will be dropped
[ max_streams: <int> | default = 0 (off) ]

```

There is no further configuration required. All delta samples are converted to cumulative.

## Troubleshooting

The following metrics are recorded when [telemetry is
enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry):

| Name | Description | Unit |
|------------------------------------------|---------------------------------------------------------------------------------------|---------------|
| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` |
andrzej-stencel marked this conversation as resolved.
Show resolved Hide resolved
| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` |
| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` |
| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` |
| `deltatocumulative.datapoints.dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | `{datapoint}` |
| `deltatocumulative.gaps.length` | Total length of all gaps in the streams, which occur e.g. due to lost in transit | `second` |
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg
return nil, fmt.Errorf("configuration parsing error")
}

return newProcessor(pcfg, set.Logger, next), nil
meter := metadata.Meter(set.TelemetrySettings)
return newProcessor(pcfg, set.Logger, meter, next), nil
}
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
go.opentelemetry.io/collector/consumer v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/pdata v1.3.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/processor v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -41,7 +42,6 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
Expand Down
19 changes: 18 additions & 1 deletion processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

// detect gaps
var gap error
if dp.StartTimestamp() > aggr.Timestamp() {
sh0rez marked this conversation as resolved.
Show resolved Hide resolved
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
}

res := aggr.Add(dp)
return a.Map.Store(id, res)
if err := a.Map.Store(id, res); err != nil {
return err
}
return gap
}

type ErrOlderStart struct {
Expand All @@ -65,3 +74,11 @@ type ErrOutOfOrder struct {
func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}

// ErrGap reports a gap in a stream: the incoming sample's start timestamp is
// later than the timestamp the aggregation had reached, i.e. data in between
// was expected but never received (e.g. lost in transit).
type ErrGap struct {
	From, To pcommon.Timestamp
}

// Error implements the error interface.
func (e ErrGap) Error() string {
	return fmt.Sprintf("gap in stream from %s to %s", e.From, e.To)
}
25 changes: 25 additions & 0 deletions processor/deltatocumulativeprocessor/internal/maybe/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package maybe

// Ptr is a pointer that is not guaranteed to point anywhere.
// Callers must go through Try() to reach the underlying value, which forces
// them to also check the ok flag.
// This gives a clear distinction between "not set" and "set to nil".
type Ptr[T any] struct {
	to *T
	ok bool
}

// None returns a Ptr that carries no value at all.
func None[T any]() Ptr[T] {
	return Ptr[T]{}
}

// Some wraps an existing pointer, which may itself be nil ("set to nil").
func Some[T any](ptr *T) Ptr[T] {
	return Ptr[T]{to: ptr, ok: true}
}

// Try returns the underlying pointer and whether it was ever set.
func (ptr Ptr[T]) Try() (_ *T, ok bool) {
	return ptr.to, ptr.ok
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type LimitMap[T any] struct {
}

func (m LimitMap[T]) Store(id identity.Stream, v T) error {
if m.Map.Len() < m.Max {
_, ok := m.Map.Load(id)
avail := m.Map.Len() < m.Max
if ok || avail {
return m.Map.Store(id, v)
}

Expand Down Expand Up @@ -57,7 +59,3 @@ type ErrEvicted struct {
// Error implements the error interface, reporting the limit error that was
// hit and the identity of the stream evicted to make room.
func (e ErrEvicted) Error() string {
	return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
}

func (e ErrEvicted) Unwrap() error {
return e.ErrLimit
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func TestLimit(t *testing.T) {
lim := streams.Limit(items, 10)

ids := make([]identity.Stream, 10)
dps := make([]data.Number, 10)

// write until limit must work
for i := 0; i < 10; i++ {
id, dp := sum.Stream()
ids[i] = id
dps[i] = dp
err := lim.Store(id, dp)
require.NoError(t, err)
}
Expand All @@ -40,6 +42,12 @@ func TestLimit(t *testing.T) {
require.True(t, streams.AtLimit(err))
}

// write to existing must work
{
err := lim.Store(ids[3], dps[3])
require.NoError(t, err)
}

// after removing one, must be accepted again
{
lim.Delete(ids[0])
Expand Down
199 changes: 199 additions & 0 deletions processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
sh0rez marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"errors"

"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

// Telemetry bundles the self-observability instruments of the processor.
type Telemetry struct {
	Metrics
}

// New creates all instruments on the given meter.
// It panics if any instrument fails to be created (see use).
func New(meter metric.Meter) Telemetry {
	return Telemetry{
		Metrics: metrics(meter),
	}
}

// Streams holds the instruments observing stream-tracking state.
type Streams struct {
	tracked metric.Int64UpDownCounter   // streams currently held
	limit   metric.Int64ObservableGauge // configured upper bound, see WithLimit
	evicted metric.Int64Counter         // streams removed to make room for newer ones
}

// Datapoints holds the instruments observing per-datapoint outcomes.
type Datapoints struct {
	total   metric.Int64Counter // all processed datapoints, successful or not
	dropped metric.Int64Counter // rejected datapoints, labeled with a 'reason' attribute
}

// Metrics is the full set of self-observability instruments recorded by this
// processor. Populate it with metrics().
type Metrics struct {
	streams Streams
	dps     Datapoints

	// total seconds of observed stream gaps, recorded in Faults.Store
	gaps metric.Int64Counter
}

// metrics creates every instrument on the given meter. Names are prefixed
// with the processor type (see use), and any creation failure panics, since
// this runs once at start-up where a bad instrument is a programming error.
func metrics(meter metric.Meter) Metrics {
	// instrument constructors, wrapped to prefix names and panic on error
	var (
		count  = use(meter.Int64Counter)
		updown = use(meter.Int64UpDownCounter)
		gauge  = use(meter.Int64ObservableGauge)
	)

	return Metrics{
		streams: Streams{
			tracked: updown("streams.tracked",
				metric.WithDescription("number of streams tracked"),
				metric.WithUnit("{stream}"),
			),
			limit: gauge("streams.limit",
				metric.WithDescription("upper limit of tracked streams"),
				metric.WithUnit("{stream}"),
			),
			evicted: count("streams.evicted",
				metric.WithDescription("number of streams evicted"),
				metric.WithUnit("{stream}"),
			),
		},
		dps: Datapoints{
			total: count("datapoints.processed",
				metric.WithDescription("number of datapoints processed"),
				metric.WithUnit("{datapoint}"),
			),
			dropped: count("datapoints.dropped",
				metric.WithDescription("number of dropped datapoints due to given 'reason'"),
				metric.WithUnit("{datapoint}"),
			),
		},
		gaps: count("gaps.length",
			metric.WithDescription("total duration where data was expected but not received"),
			metric.WithUnit("s"),
		),
	}
}

// WithLimit registers a callback that continuously observes the configured
// stream limit on the streams.limit gauge.
// It panics if registration fails; this is expected to run once at start-up.
// NOTE(review): the returned metric.Registration is discarded, so the
// callback can never be unregistered — confirm this matches the processor
// lifecycle.
func (m Metrics) WithLimit(meter metric.Meter, max int64) {
	then := metric.Callback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(m.streams.limit, max)
		return nil
	})
	_, err := meter.RegisterCallback(then, m.streams.limit)
	if err != nil {
		panic(err)
	}
}

// ObserveItems wraps a streams.Map so that every Store and Delete updates the
// datapoint and stream-count instruments.
func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] {
	return Items[T]{
		Map:     items,
		Metrics: metrics,
	}
}

// ObserveNonFatal wraps a streams.Map so that well-known, non-fatal Store
// errors are counted on the instruments and then suppressed instead of being
// returned to the caller.
func ObserveNonFatal[T any](items streams.Map[T], metrics *Metrics) Faults[T] {
	return Faults[T]{
		Map:     items,
		Metrics: metrics,
	}
}

// Items is a streams.Map that records datapoint and stream-count metrics on
// each operation. Construct it with ObserveItems.
type Items[T any] struct {
	streams.Map[T]
	*Metrics
}

// Store counts the datapoint as processed, stores it in the underlying map,
// and bumps the tracked-streams counter when this id was not seen before.
func (i Items[T]) Store(id streams.Ident, v T) error {
	inc(i.dps.total)

	// look up first so we know whether this id starts a new stream
	_, existed := i.Map.Load(id)
	if err := i.Map.Store(id, v); err != nil {
		return err
	}
	if !existed {
		inc(i.streams.tracked)
	}
	return nil
}

// Delete removes the stream from the underlying map and decrements the
// tracked-streams counter.
// NOTE(review): the counter is decremented even when id is not present in
// the map — confirm callers only delete streams they know are tracked.
func (i Items[T]) Delete(id streams.Ident) {
	dec(i.streams.tracked)
	i.Map.Delete(id)
}

// Faults is a streams.Map that counts well-known, non-fatal Store errors on
// the instruments and suppresses them. Construct it with ObserveNonFatal.
type Faults[T any] struct {
	streams.Map[T]
	*Metrics
}

// Store delegates to the underlying map and inspects the returned error.
// Well-known, non-fatal conditions are recorded on the instruments and then
// suppressed (nil is returned); anything else — including nil — is passed
// through to the caller unchanged.
func (f Faults[T]) Store(id streams.Ident, v T) error {
	var (
		olderStart delta.ErrOlderStart
		outOfOrder delta.ErrOutOfOrder
		gap        delta.ErrGap
		limit      streams.ErrLimit
		evict      streams.ErrEvicted
	)

	err := f.Map.Store(id, v)
	switch {
	case errors.As(err, &olderStart):
		inc(f.dps.dropped, reason("older-start"))
	case errors.As(err, &outOfOrder):
		inc(f.dps.dropped, reason("out-of-order"))
	case errors.As(err, &limit):
		inc(f.dps.dropped, reason("stream-limit"))
	case errors.As(err, &evict):
		// eviction made room for the datapoint, so it was not dropped
		inc(f.streams.evicted)
	case errors.As(err, &gap):
		// record gap duration in whole seconds (sub-second rest truncates)
		lost := gap.To.AsTime().Sub(gap.From.AsTime()).Seconds()
		f.gaps.Add(context.Background(), int64(lost))
	default:
		// nil or an unrecognized (fatal) error: hand back to the caller
		return err
	}

	return nil
}

// compile-time checks that both wrappers still satisfy streams.Map
var (
	_ streams.Map[any] = (*Items[any])(nil)
	_ streams.Map[any] = (*Faults[any])(nil)
)

// addable abstracts any instrument whose value can be adjusted by an int64
// delta, such as metric.Int64Counter and metric.Int64UpDownCounter.
type addable[Opts any] interface {
	Add(context.Context, int64, ...Opts)
}

// inc adds one to the given instrument.
func inc[A addable[O], O any](counter A, opts ...O) {
	counter.Add(context.Background(), 1, opts...)
}

// dec subtracts one from the given instrument.
func dec[A addable[O], O any](counter A, opts ...O) {
	counter.Add(context.Background(), -1, opts...)
}

// reason labels a drop with its cause, recorded as the 'reason' attribute on
// the datapoints.dropped counter.
func reason(reason string) metric.AddOption {
	return metric.WithAttributes(attribute.String("reason", reason))
}

// use partially applies an instrument constructor: the returned function
// prefixes the instrument name with the processor type and panics on
// creation failure. Panicking is acceptable here because instruments are
// created once at start-up, where a failure is a programming error.
func use[F func(string, ...O) (M, error), M any, O any](f F) func(string, ...O) M {
	return func(name string, opts ...O) M {
		name = processorhelper.BuildCustomMetricName(metadata.Type.String(), name)
		m, err := f(name, opts...)
		if err != nil {
			panic(err)
		}
		return m
	}
}
Loading
Loading