Skip to content

Commit

Permalink
Replace Ordered with an iterator in export.Labels. (#567)
Browse files Browse the repository at this point in the history
* Do not expose a slice of labels in export.Record

This is really an inconvenient implementation detail leak - we may
want to store labels in a different way. Replace it with an iterator -
it does not force us to use slice of key values as a storage in the
long run.

* Add Len to LabelIterator

It may come in handy in several situations, where we don't have access
to export.Labels object, but only to the label iterator.

* Use reflect value label iterator for the fixed labels

* add reset operation to iterator

Makes my life easier when writing a benchmark. Might also be an
alternative to cloning the iterator.

* Add benchmarks for iterators

* Add import comment

* Add clone operation to label iterator

* Move iterator tests to a separate package

* Add tests for cloning iterators

* Pass label iterator to export labels

* Use non-addressable array reflect values

By not using the value created by `reflect.New()`, but rather by
`reflect.ValueOf()`, we get a non-addressable array in the value,
which does not infer an allocation cost when getting an element from
the array.

* Drop zero iterator

This can be substituted by a reflect value iterator that goes over a
value with a zero-sized array.

* Add a simple iterator that implements label iterator

In the long run this will completely replace the LabelIterator
interface.

* Replace reflect value iterator with simple iterator

* Pass label storage to new export labels, not label iterator

* Drop label iterator interface, rename storage iterator to label iterator

* Drop clone operation from iterator

It's a leftover from interface times and now it's pointless - the
iterator is a simple struct, so cloning it is a simple copy.

* Drop Reset from label iterator

The sole existence of Reset was actually for benchmarking convenience.
Now we can just copy the iterator cheaply, so a need for Reset is no
more.

* Drop noop iterator tests

* Move back iterator tests to export package

* Eagerly get the reflect value of ordered labels

So we won't get into problems when several goroutines want to iterate
the same labels at the same time. Not sure if this would be a big
deal, since every goroutine would compute the same reflect.Value, but
concurrent write to the same memory is bad anyway. And it doesn't cost
us any extra allocations anyway.

* Replace NewSliceLabelIterator() with a method of LabelSlice

* Add some documentation

* Documentation fixes
  • Loading branch information
krnowak authored Mar 19, 2020
1 parent d8682c1 commit a01f63b
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 123 deletions.
8 changes: 6 additions & 2 deletions exporters/metric/internal/statsd/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ type noTagsAdapter struct {
func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())

for _, tag := range rec.Labels().Ordered() {
iter := rec.Labels().Iter()
for iter.Next() {
tag := iter.Label()
_, _ = buf.WriteString(".")
_, _ = buf.WriteString(tag.Value.Emit())
}
Expand Down Expand Up @@ -294,7 +296,9 @@ func TestPacketSplit(t *testing.T) {
tcase.setup(func(nkeys int) {
labels := makeLabels(offset, nkeys)
offset += nkeys
expect := fmt.Sprint("counter:100|c", adapter.LabelEncoder.Encode(labels), "\n")
iter := export.LabelSlice(labels).Iter()
encoded := adapter.LabelEncoder.Encode(iter)
expect := fmt.Sprint("counter:100|c", encoded, "\n")
expected = append(expected, expect)
checkpointSet.AddCounter(&desc, 100, labels...)
})
Expand Down
8 changes: 4 additions & 4 deletions exporters/metric/internal/statsd/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"sync"

"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
)

Expand Down Expand Up @@ -52,14 +51,15 @@ func NewLabelEncoder() *LabelEncoder {
}

// Encode emits a string like "|#key1:value1,key2:value2".
func (e *LabelEncoder) Encode(labels []core.KeyValue) string {
func (e *LabelEncoder) Encode(iter export.LabelIterator) string {
buf := e.pool.Get().(*bytes.Buffer)
defer e.pool.Put(buf)
buf.Reset()

delimiter := "|#"

for _, kv := range labels {
for iter.Next() {
kv := iter.Label()
_, _ = buf.WriteString(delimiter)
_, _ = buf.WriteString(string(kv.Key))
_, _ = buf.WriteRune(':')
Expand All @@ -80,5 +80,5 @@ func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) {
return labels.Encoded(), false
}

return e.Encode(labels.Ordered()), true
return e.Encode(labels.Iter()), true
}
16 changes: 9 additions & 7 deletions exporters/metric/internal/statsd/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,23 @@ var testLabels = []core.KeyValue{
func TestLabelSyntax(t *testing.T) {
encoder := statsd.NewLabelEncoder()

require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(testLabels))
require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(export.LabelSlice(testLabels).Iter()))

require.Equal(t, `|#A:B`, encoder.Encode([]core.KeyValue{
kvs := []core.KeyValue{
key.New("A").String("B"),
}))
}
require.Equal(t, `|#A:B`, encoder.Encode(export.LabelSlice(kvs).Iter()))

require.Equal(t, "", encoder.Encode(nil))
require.Equal(t, "", encoder.Encode(export.LabelSlice(nil).Iter()))
}

func TestLabelForceEncode(t *testing.T) {
defaultLabelEncoder := sdk.NewDefaultLabelEncoder()
statsdLabelEncoder := statsd.NewLabelEncoder()

exportLabelsDefault := export.NewLabels(testLabels, defaultLabelEncoder.Encode(testLabels), defaultLabelEncoder)
exportLabelsStatsd := export.NewLabels(testLabels, statsdLabelEncoder.Encode(testLabels), statsdLabelEncoder)
ls := export.LabelSlice(testLabels)
exportLabelsDefault := export.NewLabels(ls, defaultLabelEncoder.Encode(ls.Iter()), defaultLabelEncoder)
exportLabelsStatsd := export.NewLabels(ls, statsdLabelEncoder.Encode(ls.Iter()), statsdLabelEncoder)

statsdEncoding := exportLabelsStatsd.Encoded()
require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded())
Expand All @@ -63,7 +65,7 @@ func TestLabelForceEncode(t *testing.T) {
require.False(t, repeat)

// Check that this works for an embedded implementation.
exportLabelsEmbed := export.NewLabels(testLabels, statsdEncoding, struct {
exportLabelsEmbed := export.NewLabels(export.LabelSlice(testLabels), statsdEncoding, struct {
*statsd.LabelEncoder
}{LabelEncoder: statsdLabelEncoder})

Expand Down
12 changes: 8 additions & 4 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,10 @@ func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func labelsKeys(labels export.Labels) []string {
keys := make([]string, 0, labels.Len())
for _, kv := range labels.Ordered() {
iter := labels.Iter()
keys := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Label()
keys = append(keys, sanitize(string(kv.Key)))
}
return keys
Expand All @@ -309,8 +311,10 @@ func labelsKeys(labels export.Labels) []string {
func labelValues(labels export.Labels) []string {
// TODO(paivagustavo): parse the labels.Encoded() instead of calling `Emit()` directly
// this would avoid unnecessary allocations.
values := make([]string, 0, labels.Len())
for _, label := range labels.Ordered() {
iter := labels.Iter()
values := make([]string, 0, iter.Len())
for iter.Next() {
label := iter.Label()
values = append(values, label.Value.Emit())
}
return values
Expand Down
8 changes: 5 additions & 3 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,16 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
}

specifiedKeyMap := make(map[core.Key]core.Value)
for _, kv := range record.Labels().Ordered() {
iter := record.Labels().Iter()
for iter.Next() {
kv := iter.Label()
specifiedKeyMap[kv.Key] = kv.Value
}

var materializedKeys []string

if labels := record.Labels(); labels.Len() > 0 {
materializedKeys = append(materializedKeys, labels.Encoded())
if iter.Len() > 0 {
materializedKeys = append(materializedKeys, record.Labels().Encoded())
}

for _, k := range desc.Keys() {
Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func (p *CheckpointSet) Reset() {
// If there is an existing record with the same descriptor and LabelSet
// the stored aggregator will be returned and should be merged.
func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) {
encoded := p.encoder.Encode(labels)
elabels := export.NewLabels(labels, encoded, p.encoder)
ls := export.LabelSlice(labels)
elabels := export.NewLabels(ls, p.encoder.Encode(ls.Iter()), p.encoder)

key := desc.Name() + "_" + elabels.Encoded()
if record, ok := p.records[key]; ok {
Expand Down
20 changes: 14 additions & 6 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func sum(desc *metric.Descriptor, labels export.Labels, a aggregator.Sum) (*metr
Name: desc.Name(),
Description: desc.Description(),
Unit: string(desc.Unit()),
Labels: stringKeyValues(labels.Ordered()),
Labels: stringKeyValues(labels.Iter()),
},
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func minMaxSumCount(desc *metric.Descriptor, labels export.Labels, a aggregator.
Description: desc.Description(),
Unit: string(desc.Unit()),
Type: metricpb.MetricDescriptor_SUMMARY,
Labels: stringKeyValues(labels.Ordered()),
Labels: stringKeyValues(labels.Iter()),
},
SummaryDataPoints: []*metricpb.SummaryDataPoint{
{
Expand All @@ -131,10 +131,18 @@ func minMaxSumCount(desc *metric.Descriptor, labels export.Labels, a aggregator.
}, nil
}

// stringKeyValues transforms a KeyValues into an OTLP StringKeyValues.
func stringKeyValues(kvs []core.KeyValue) []*commonpb.StringKeyValue {
result := make([]*commonpb.StringKeyValue, 0, len(kvs))
for _, kv := range kvs {
// stringKeyValues transforms a label iterator into an OTLP StringKeyValues.
func stringKeyValues(iter export.LabelIterator) []*commonpb.StringKeyValue {
l := iter.Len()
if l == 0 {
// TODO: That looks like a pointless allocation in case of
// no labels, but returning nil from this function makes
// the test fail.
return []*commonpb.StringKeyValue{}
}
result := make([]*commonpb.StringKeyValue, 0, l)
for iter.Next() {
kv := iter.Label()
result = append(result, &commonpb.StringKeyValue{
Key: string(kv.Key),
Value: kv.Value.Emit(),
Expand Down
13 changes: 7 additions & 6 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func TestStringKeyValues(t *testing.T) {
}

for _, test := range tests {
assert.Equal(t, test.expected, stringKeyValues(test.kvs))
iter := export.LabelSlice(test.kvs).Iter()
assert.Equal(t, test.expected, stringKeyValues(iter))
}
}

Expand Down Expand Up @@ -149,7 +150,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
metric.WithKeys(test.keys...),
metric.WithDescription(test.description),
metric.WithUnit(test.unit))
labels := export.NewLabels(test.labels, "", nil)
labels := export.NewLabels(export.LabelSlice(test.labels), "", nil)
got, err := minMaxSumCount(&desc, labels, mmsc)
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
Expand All @@ -159,7 +160,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {

func TestMinMaxSumCountDatapoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind)
labels := export.NewLabels([]core.KeyValue{}, "", nil)
labels := export.NewLabels(export.LabelSlice(nil), "", nil)
mmsc := minmaxsumcount.New(&desc)
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
Expand Down Expand Up @@ -250,7 +251,7 @@ func TestSumMetricDescriptor(t *testing.T) {
metric.WithDescription(test.description),
metric.WithUnit(test.unit),
)
labels := export.NewLabels(test.labels, "", nil)
labels := export.NewLabels(export.LabelSlice(test.labels), "", nil)
got, err := sum(&desc, labels, sumAgg.New())
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
Expand All @@ -260,7 +261,7 @@ func TestSumMetricDescriptor(t *testing.T) {

func TestSumInt64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind)
labels := export.NewLabels([]core.KeyValue{}, "", nil)
labels := export.NewLabels(export.LabelSlice(nil), "", nil)
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
Expand All @@ -274,7 +275,7 @@ func TestSumInt64DataPoints(t *testing.T) {

func TestSumFloat64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Float64NumberKind)
labels := export.NewLabels([]core.KeyValue{}, "", nil)
labels := export.NewLabels(export.LabelSlice(nil), "", nil)
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.NewFloat64Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
Expand Down
Loading

0 comments on commit a01f63b

Please sign in to comment.