Skip to content

Commit

Permalink
Merge branch 'beats-otel-collector' into esotel-beats
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Sep 26, 2024
2 parents b92b041 + cc9b664 commit c2aa64b
Show file tree
Hide file tree
Showing 15 changed files with 19,102 additions and 11,900 deletions.
30,350 changes: 18,455 additions & 11,895 deletions NOTICE.txt

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ require (
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.110.0
go.opentelemetry.io/collector/receiver v0.110.0
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1
go.opentelemetry.io/collector/config/confighttp v0.110.0
go.opentelemetry.io/collector/config/configopaque v1.16.0
go.opentelemetry.io/collector/config/configtls v1.16.0
go.opentelemetry.io/collector/consumer v0.110.0
go.opentelemetry.io/collector/exporter v0.110.0
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

Expand Down Expand Up @@ -400,7 +406,6 @@ require (
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
Expand Down Expand Up @@ -460,6 +465,28 @@ require (
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/collector/client v1.16.0 // indirect
go.opentelemetry.io/collector/component v0.110.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.110.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.16.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.16.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect
go.opentelemetry.io/collector/config/internal v0.110.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect
go.opentelemetry.io/collector/extension v0.110.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.110.0 // indirect
go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect
go.opentelemetry.io/collector/pdata v1.16.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect
go.opentelemetry.io/collector/pipeline v0.110.0 // indirect
go.opentelemetry.io/collector/semconv v0.110.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/ratelimit v0.3.1 // indirect
Expand All @@ -468,6 +495,7 @@ require (
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,8 @@ go.opentelemetry.io/collector/internal/globalsignal v0.110.0 h1:S6bfFEiek8vJeXAb
go.opentelemetry.io/collector/internal/globalsignal v0.110.0/go.mod h1:GqMXodPWOxK5uqpX8MaMXC2389y2XJTa5nPwf8FYDK8=
go.opentelemetry.io/collector/otelcol v0.110.0 h1:nZjsCSQ6StXaJUxyFBH+VC35BQJ+Dp0sU5l7UP72bjs=
go.opentelemetry.io/collector/otelcol v0.110.0/go.mod h1:x7KFlMkEirBPp9P9oisM05oLztSqVFps9D7jPDuqr8A=
go.opentelemetry.io/collector/internal/globalsignal v0.110.0 h1:S6bfFEiek8vJeXAbciWS7W8UR6ZrVJB3ftNyFTMHQaY=
go.opentelemetry.io/collector/internal/globalsignal v0.110.0/go.mod h1:GqMXodPWOxK5uqpX8MaMXC2389y2XJTa5nPwf8FYDK8=
go.opentelemetry.io/collector/pdata v1.16.0 h1:g02K8jlRnmQ7TQDuXpdgVL6vIxIVqr5Gbb1qIR27rto=
go.opentelemetry.io/collector/pdata v1.16.0/go.mod h1:YZZJIt2ehxosYf/Y1pbvexjNWsIGNNrzzlCTO9jC1F4=
go.opentelemetry.io/collector/pdata/pprofile v0.110.0 h1:DknuOGOdjYIzVnromhVcW5rWyyjPahf65UAfgXz1xfo=
Expand Down Expand Up @@ -1923,6 +1925,12 @@ go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0 h1:kn1BudCgwtE7PxL
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0/go.mod h1:ljkUDtAMdleoi9tIG1R6dJUpVwDcYjw3J2Q6Q/SuiC0=
go.opentelemetry.io/otel/log v0.6.0 h1:nH66tr+dmEgW5y+F9LanGJUBYPrRgP4g2EkmPE3LeK8=
go.opentelemetry.io/otel/log v0.6.0/go.mod h1:KdySypjQHhP069JX0z/t26VHwa8vSwzgaKmXtIB3fJM=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk=
go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w=
go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ=
go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE=
Expand Down Expand Up @@ -2542,6 +2550,10 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f h1:b1Ln/PG8orm0SsBbHZWke8dDp2lrCD4jSmfglFpTZbk=
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:AHT0dDg3SoMOgZGnZk29b5xTbPHMoEC8qthmBLJCpys=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
Expand Down
133 changes: 133 additions & 0 deletions libbeat/outputs/elasticsearch/config_otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/exporterbatcher"

"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"
)

// ToOtelConfig converts a Beat config into an OTel elasticsearch exporter config
func ToOtelConfig(beatCfg *config.C) (*elasticsearchexporter.Config, error) {
// Handle cloud.id the same way Beats does, this will also handle
// extracting the Kibana URL (which is required to handle ILM on
// Beats side (currently not supported by ES OTel exporter).
if err := cloudid.OverwriteSettings(beatCfg); err != nil {
return nil, fmt.Errorf("cannot read cloudid: %w", err)
}

esRawCfg, err := beatCfg.Child("output.elasticsearch", -1)
if err != nil {
return nil, fmt.Errorf("could not parse Elasticsearch output configuration: %w", err)
}
escfg := defaultConfig
if err := esRawCfg.Unpack(&escfg); err != nil {
return nil, err
}

esToOTelOptions := struct {
Index string `config:"index"`
Pipeline string `config:"pipeline"`
ProxyURL string `config:"proxy_url"`
Hosts []string `config:"hosts" validate:"required"`
}{}

if err := esRawCfg.Unpack(&esToOTelOptions); err != nil {
return nil, fmt.Errorf("cannot parse Elasticsearch config: %w", err)
}

hosts := []string{}
for _, h := range esToOTelOptions.Hosts {
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
if err != nil {
return nil, fmt.Errorf("cannot generate ES URL from host %q", err)
}
hosts = append(hosts, esURL)
}

// The workers config is can be configured using two keys, so we leverage
// the already existing code to handle it by using `output.HostWorkerCfg`.
workersCfg := outputs.HostWorkerCfg{}
if err := esRawCfg.Unpack(&workersCfg); err != nil {
return nil, fmt.Errorf("cannot read worker/workers from Elasticsearch config: %w", err)
}

headers := make(map[string]configopaque.String, len(escfg.Headers))
for k, v := range escfg.Headers {
headers[k] = configopaque.String(v)
}

otelTLSConfg, err := outputs.TLSCommonToOtel(escfg.Transport.TLS)
if err != nil {
return nil, fmt.Errorf("cannot convert SSL config into OTel: %w", err)
}

otelcfg := elasticsearchexporter.Config{
LogsIndex: esToOTelOptions.Index, // index
Pipeline: esToOTelOptions.Pipeline, // pipeline
Endpoints: hosts, // hosts, protocol, path, port
NumWorkers: workersCfg.NumWorkers(), // worker/workers

Authentication: elasticsearchexporter.AuthenticationSettings{
User: escfg.Username, // username
Password: configopaque.String(escfg.Password), // password
APIKey: configopaque.String(escfg.APIKey), //api_key
},

// HTTP Client configuration
ClientConfig: confighttp.ClientConfig{
ProxyURL: esToOTelOptions.ProxyURL, // proxy_url
Headers: headers, // headers
Timeout: escfg.Transport.Timeout, // timeout
IdleConnTimeout: &escfg.Transport.IdleConnTimeout, // idle_connection_connection_timeout
TLSSetting: otelTLSConfg,
},

// Backoff settings
Retry: elasticsearchexporter.RetrySettings{
Enabled: true,
InitialInterval: escfg.Backoff.Init, // backoff.init
MaxInterval: escfg.Backoff.Max, // backoff.max
},

// Batching configuration
Batcher: elasticsearchexporter.BatcherConfig{
Enabled: ptr(true),
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: escfg.BulkMaxSize, // bulk_max_size
},
},
}

return &otelcfg, nil
}

func ptr[T any](v T) *T {
var p T
p = v
return &p
}
124 changes: 124 additions & 0 deletions libbeat/outputs/elasticsearch/config_otel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
_ "embed"
"strings"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/config"
)

//go:embed testdata/filebeat.yml
var beatYAMLCfg string

//go:embed testdata/certs/client.crt
var clientCertPem string

//go:embed testdata/expectedCaPem.crt
var wantCAPem string

// Generating certs:
// Root CA: openssl req -x509 -ca -sha256 -days 1825 -newkey rsa:2048 -keyout rootCA.key -out rootCA.crt -passout pass:changeme
// Server Cert: openssl req -newkey rsa:2048 -keyout server.key -x509 -days 3650 -out server.crt -passout pass:changeme
// Client Cert: openssl req -newkey rsa:2048 -keyout client.key -x509 -days 3650 -out client.crt -passout pass:changeme -extensions usr_cert
func TestToOtelConfig(t *testing.T) {
beatCfg := config.MustNewConfigFrom(beatYAMLCfg)

otelCfg, err := ToOtelConfig(beatCfg)
if err != nil {
t.Fatalf("could not convert Beat config to OTel elasicsearch exporter: %s", err)
}

if otelCfg.Endpoint != "" {
t.Errorf("OTel endpoint must be emtpy got %s", otelCfg.Endpoint)
}

expectedHost := "https://es-hostname.elastic.co:443"
if len(otelCfg.Endpoints) != 1 || otelCfg.Endpoints[0] != expectedHost {
t.Errorf("OTel endpoints must contain only %q, got %q", expectedHost, otelCfg.Endpoints)
}

if got, want := otelCfg.Authentication.User, "elastic-cloud"; got != want {
t.Errorf("expecting User %q, got %q", want, got)
}

if got, want := string(otelCfg.Authentication.Password), "password"; got != want {
t.Errorf("expecting password to be '%s', got '%s' instead", want, got)
}

if got, want := string(otelCfg.Authentication.APIKey), "secret key"; got != want {
t.Errorf("expecting api_key to be '%s', got '%s' instead", want, got)
}

if got, want := otelCfg.LogsIndex, "some-index"; got != want {
t.Errorf("expecting logs index to be '%s', got '%s' instead", want, got)
}

if got, want := otelCfg.Pipeline, "some-ingest-pipeline"; got != want {
t.Errorf("expecting pipeline to be '%s', got '%s' instead", want, got)
}

if got, want := otelCfg.ClientConfig.ProxyURL, "https://proxy.url"; got != want {
t.Errorf("expecting proxy URL to be '%s', got '%s' instead", want, got)
}

if got, want := string(otelCfg.ClientConfig.TLSSetting.CertPem), clientCertPem; got != want {
t.Errorf("expecting client certificate %q got %q", want, got)
}

gotCAPem := strings.TrimSpace(string(otelCfg.ClientConfig.TLSSetting.CAPem))
wantCAPem = strings.TrimSpace(wantCAPem)
if gotCAPem != wantCAPem {
t.Errorf("expecting CA PEM:\n%s\ngot:\n%s", wantCAPem, gotCAPem)
}

if !*otelCfg.Batcher.Enabled {
t.Error("expecting batcher.enabled to be true")
}

if got, want := otelCfg.Batcher.MaxSizeItems, 42; got != want {
t.Errorf("expecting batcher.max_size_items = %d got %d", want, got)
}

if !otelCfg.Retry.Enabled {
t.Error("expecting retyr.enabled to be true")
}

if got, want := otelCfg.Retry.InitialInterval, time.Second*42; got != want {
t.Errorf("expecting retry.initial_interval '%s', got '%s'", got, want)
}

if got, want := otelCfg.NumWorkers, 30; got != want {
t.Errorf("expecting num_workers %d got %d", want, got)
}

headers := map[string]string{
"X-Header-1": "foo",
"X-Bar-Header": "bar",
}

for k, v := range headers {
gotV := string(otelCfg.Headers[k])
if gotV != v {
t.Errorf("expecting header[%s]='%s', got '%s", k, v, gotV)
}
}
}
23 changes: 23 additions & 0 deletions libbeat/outputs/elasticsearch/testdata/certs/client.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID2jCCAsKgAwIBAgIUXSGhi1rVH7ftDmJ6TlavLsY/74MwDQYJKoZIhvcNAQEL
BQAwgY8xCzAJBgNVBAYTAlVTMRAwDgYDVQQIDAdGbG9yaWRhMRAwDgYDVQQHDAdP
cmxhbmRvMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxFzAVBgNV
BAMMDkVsYXN0aWMgQ2xpZW50MSAwHgYJKoZIhvcNAQkBFhFjbGllbnRAZWxhc3Rp
Yy5jbzAeFw0yNDA5MjUxOTAzNDZaFw0zNDA5MjMxOTAzNDZaMIGPMQswCQYDVQQG
EwJVUzEQMA4GA1UECAwHRmxvcmlkYTEQMA4GA1UEBwwHT3JsYW5kbzEhMB8GA1UE
CgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRcwFQYDVQQDDA5FbGFzdGljIENs
aWVudDEgMB4GCSqGSIb3DQEJARYRY2xpZW50QGVsYXN0aWMuY28wggEiMA0GCSqG
SIb3DQEBAQUAA4IBDwAwggEKAoIBAQDAVGPlx4U2BpWfQlyMNraLMjdJAo4PjO2G
rDbwg2cAO4QFbMECEiNHakvuJ3zVVDO+HsBdkLWr8nO4iXmZDokfDrOrANJuqq16
p022soC8pJQz9uIBWTnxDGd/wdofi4H+V5uaMhw961sgB7GREyBWNRBQzhcFyQEP
XkR1/G52PcuzM5H9cnOSy7jc62g8Pkk8c2eZu3ADmvgWSH0b5pFUIvKsq068QjKP
qoHYXn38d/SSeCX57tzKsj+mBzp0cr1f9jeXmKeu68wPYG14aj9WmmY6ICAPvqPF
BKNLhXn2xPlZzv93zjiUR5bnitenVxvsmwjn5XvlgH56/fh3Y3aPAgMBAAGjLDAq
MAkGA1UdEwQCMAAwHQYDVR0OBBYEFJlsmqi3qid9YoWj4N7GQvAywRzbMA0GCSqG
SIb3DQEBCwUAA4IBAQCAtBwyiRYAGeAcN/UuMEcnMXP8QNrnCO/unoCbyFsByFQT
TcwMrS441hGPp/cAa8Fx0cP+oqrO99G1YHCzhprYVqIi/W9MsvRnR7Nh8SSS2/ld
0Gv9g+DU89NMzE5hlMCt5V0ydKbRj+ChKDsKlgQSopbrArjxHQv4Hb234HSZAR5N
OkJ1rNCF7wMD+xlNzEWZAHl7qjHuG8C4xWP207dXGYuY3064rBqv9hypLxj7RuZn
qesVBabxXBCL6Y1foh5OLLHyEWw28yfK/PnVdqU0lLrBhW9VJ6mQ9XCwZxf/tlSk
B3FafTQk4ZtU+4bVJuiAiQI7DeqpIFU6Lczds2gG
-----END CERTIFICATE-----
Loading

0 comments on commit c2aa64b

Please sign in to comment.