Skip to content

Commit

Permalink
Merge branch 'main' into kafkaexporter_zipkin_encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroliakh authored Aug 18, 2023
2 parents bc96361 + 06f1f57 commit 1a65fea
Show file tree
Hide file tree
Showing 48 changed files with 1,113 additions and 110 deletions.
9 changes: 9 additions & 0 deletions .chloggen/opensearch-exporter-implmentation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
change_type: enhancement

component: opensearchexporter

note: implement [OpenSearch](https://opensearch.org/) exporter.

issues: [23611]

subtext:
8 changes: 4 additions & 4 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/metricstransformprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down Expand Up @@ -208,9 +208,9 @@ receiver/influxdbreceiver/ @open-telemetry/collect
receiver/jaegerreceiver/ @open-telemetry/collector-contrib-approvers @jpkrohling
receiver/jmxreceiver/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
receiver/journaldreceiver/ @open-telemetry/collector-contrib-approvers @sumo-drosiek @djaglowski
receiver/k8sclusterreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/k8seventsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/k8sobjectsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @hvaghani221
receiver/k8sclusterreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @TylerHelmuth
receiver/k8seventsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @TylerHelmuth
receiver/k8sobjectsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @hvaghani221 @TylerHelmuth
receiver/kafkametricsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/kafkareceiver/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
receiver/kubeletstatsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ func doubleSummaryMetricsToLogs(name string, data pmetric.SummaryDataPointSlice,
}

func metricDataToLogServiceData(md pmetric.Metric, defaultLabels KeyValues) (logs []*sls.Log) {
//exhaustive:enforce
switch md.Type() {
case pmetric.MetricTypeEmpty:
case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram:
break
case pmetric.MetricTypeGauge:
return numberMetricsToLogs(md.Name(), md.Gauge().DataPoints(), defaultLabels)
Expand Down
5 changes: 4 additions & 1 deletion exporter/cassandraexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
logAttr := attributesToMap(r.Attributes().AsRaw())
bodyByte, _ := json.Marshal(r.Body().AsRaw())
bodyByte, err := json.Marshal(r.Body().AsRaw())
if err != nil {
return err
}

insertLogError := e.client.Query(fmt.Sprintf(insertLogTableSQL, e.cfg.Keyspace, e.cfg.LogsTable),
r.Timestamp().AsTime(),
Expand Down
4 changes: 2 additions & 2 deletions exporter/fileexporter/buffered_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestBufferedWrites(t *testing.T) {
}

var (
benchmarkErr error
errBenchmark error
)

func BenchmarkWriter(b *testing.B) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func BenchmarkWriter(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err = w.Write(payload)
}
benchmarkErr = multierr.Combine(err, w.Close())
errBenchmark = multierr.Combine(err, w.Close())
})
}
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/opensearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ The documents are sent using [observability catalog](https://github.com/opensear

### HTTP Connection Options
OpenSearch export supports standard (HTTP client settings](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp#client-configuration).
- `endpoint` (required) `<url>:<port>` of OpenSearch node to send data to.
- `http.endpoint` (required) `<url>:<port>` of OpenSearch node to send data to.

### TLS settings
Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/Client Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#client-configuration).

### Retry Options
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)

### Timeout Options
- `timeout` : See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
## Example

```yaml
Expand All @@ -36,9 +38,10 @@ extensions:

exporters:
opensearch/trace:
endpoint: https://opensearch.example.com:9200
auth:
authenticator: basicauth/client
http:
endpoint: https://opensearch.example.com:9200
auth:
authenticator: basicauth/client
# ······
service:
pipelines:
Expand Down
31 changes: 25 additions & 6 deletions exporter/opensearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,43 @@ import (

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
)

const (
// defaultNamespace value is used as ssoTracesExporter.Namespace when component.Config.Namespace is not set.
defaultNamespace = "namespace"

// defaultDataset value is used as ssoTracesExporter.Dataset when component.Config.Dataset is not set.
defaultDataset = "default"
)

// Config defines configuration for OpenSearch exporter.
type Config struct {
confighttp.HTTPClientSettings `mapstructure:",squash"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
Namespace string `mapstructure:"namespace"`
Dataset string `mapstructure:"dataset"`
confighttp.HTTPClientSettings `mapstructure:"http"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
Namespace string `mapstructure:"namespace"`
Dataset string `mapstructure:"dataset"`
}

var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
errDatasetNoValue = errors.New("dataset must be specified")
errNamespaceNoValue = errors.New("namespace must be specified")
)

// Validate validates the opensearch server configuration.
func (cfg *Config) Validate() error {
var multiErr []error
if len(cfg.Endpoint) == 0 {
return errConfigNoEndpoint
multiErr = append(multiErr, errConfigNoEndpoint)
}
if len(cfg.Dataset) == 0 {
multiErr = append(multiErr, errDatasetNoValue)
}
if len(cfg.Namespace) == 0 {
multiErr = append(multiErr, errNamespaceNoValue)
}
return nil
return multierr.Combine(multiErr...)
}
42 changes: 38 additions & 4 deletions exporter/opensearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

defaultCfg := newDefaultConfig()
defaultCfg.(*Config).Endpoint = "https://opensearch.example.com:9200"
sampleEndpoint := "https://opensearch.example.com:9200"
sampleCfg := withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
})
maxIdleConns := 100
idleConnTimeout := 90 * time.Second

Expand All @@ -36,7 +38,7 @@ func TestLoadConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(typeStr, ""),
expected: defaultCfg,
expected: sampleCfg,
configValidateAssert: assert.NoError,
},
{
Expand All @@ -45,7 +47,7 @@ func TestLoadConfig(t *testing.T) {
Dataset: "ngnix",
Namespace: "eu",
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "https://opensearch.example.com:9200",
Endpoint: sampleEndpoint,
Timeout: 2 * time.Minute,
Headers: map[string]configopaque.String{
"myheader": "test",
Expand All @@ -65,6 +67,28 @@ func TestLoadConfig(t *testing.T) {
},
configValidateAssert: assert.NoError,
},
{
id: component.NewIDWithName(typeStr, "empty_dataset"),
expected: withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.Dataset = ""
config.Namespace = "eu"
}),
configValidateAssert: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.ErrorContains(t, err, errDatasetNoValue.Error())
},
},
{
id: component.NewIDWithName(typeStr, "empty_namespace"),
expected: withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.Dataset = "ngnix"
config.Namespace = ""
}),
configValidateAssert: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.ErrorContains(t, err, errNamespaceNoValue.Error())
},
},
}

for _, tt := range tests {
Expand All @@ -82,3 +106,13 @@ func TestLoadConfig(t *testing.T) {
})
}
}

// withDefaultConfig create a new default configuration
// and applies provided functions to it.
func withDefaultConfig(fns ...func(*Config)) *Config {
cfg := newDefaultConfig().(*Config)
for _, fn := range fns {
fn(cfg)
}
return cfg
}
23 changes: 14 additions & 9 deletions exporter/opensearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
Expand All @@ -32,20 +32,25 @@ func NewFactory() exporter.Factory {
func newDefaultConfig() component.Config {
return &Config{
HTTPClientSettings: confighttp.NewDefaultHTTPClientSettings(),
Namespace: "namespace",
Dataset: "default",
Namespace: defaultNamespace,
Dataset: defaultDataset,
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
}
}

func createTracesExporter(ctx context.Context,
set exporter.CreateSettings,
cfg component.Config) (exporter.Traces, error) {
c := cfg.(*Config)
te, e := newSSOTracesExporter(c, set)
if e != nil {
return nil, e
}

return exporterhelper.NewTracesExporter(ctx, set, cfg, func(ctx context.Context, ld ptrace.Traces) error {
return nil
},
exporterhelper.WithShutdown(func(ctx context.Context) error {
return nil
}))
return exporterhelper.NewTracesExporter(ctx, set, cfg,
te.pushTraceData,
exporterhelper.WithStart(te.Start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithRetry(c.RetrySettings),
exporterhelper.WithTimeout(c.TimeoutSettings))
}
50 changes: 50 additions & 0 deletions exporter/opensearchexporter/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package opensearchexporter

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestFactory_CreateMetricsExporter_Fail(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
params := exportertest.NewNopCreateSettings()
_, err := factory.CreateMetricsExporter(context.Background(), params, cfg)
require.Error(t, err, "expected an error when creating a traces exporter")
}

func TestFactory_CreateTracesExporter_Fail(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
params := exportertest.NewNopCreateSettings()
_, err := factory.CreateTracesExporter(context.Background(), params, cfg)
require.Error(t, err, "expected an error when creating a traces exporter")
}

func TestFactory_CreateTracesExporter(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://opensearch.example.com:9200"
})
params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
require.NoError(t, err)
require.NotNil(t, exporter)

require.NoError(t, exporter.Shutdown(context.TODO()))
}
9 changes: 5 additions & 4 deletions exporter/opensearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opense
go 1.20

require (
github.com/opensearch-project/opensearch-go/v2 v2.3.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.83.0
go.opentelemetry.io/collector/config/configauth v0.83.0
go.opentelemetry.io/collector/config/confighttp v0.83.0
go.opentelemetry.io/collector/config/configopaque v0.83.0
go.opentelemetry.io/collector/confmap v0.83.0
go.opentelemetry.io/collector/consumer v0.83.0
go.opentelemetry.io/collector/exporter v0.83.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.25.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -40,7 +45,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.83.0 // indirect
go.opentelemetry.io/collector/config/configtls v0.83.0 // indirect
go.opentelemetry.io/collector/config/internal v0.83.0 // indirect
go.opentelemetry.io/collector/consumer v0.83.0 // indirect
go.opentelemetry.io/collector/extension v0.83.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.83.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect
Expand All @@ -50,13 +54,10 @@ require (
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 1a65fea

Please sign in to comment.