Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/resourcedetection] support http detector and periodically do detect action #26

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions processor/resourcedetectionprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"

import (
"time"

"go.opentelemetry.io/collector/config/confighttp"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs"
Expand Down Expand Up @@ -41,6 +45,9 @@ type Config struct {
// If a supplied attribute is not a valid attribute of a supplied detector it will be ignored.
// Deprecated: Please use detector's resource_attributes config instead
Attributes []string `mapstructure:"attributes"`

// interval of detect action
DetectInterval time.Duration `mapstructure:"detect_interval"`
}

// DetectorConfig contains user-specified configurations unique to all individual detectors
Expand Down Expand Up @@ -86,6 +93,9 @@ type DetectorConfig struct {

// K8SNode contains user-specified configurations for the K8SNode detector
K8SNodeConfig k8snode.Config `mapstructure:"k8snode"`

// Http contains user-specified configurations for the Http detector
HttpConfig http.Config `mapstructure:"http"`
}

func detectorCreateDefaultConfig() DetectorConfig {
Expand All @@ -104,6 +114,7 @@ func detectorCreateDefaultConfig() DetectorConfig {
SystemConfig: system.CreateDefaultConfig(),
OpenShiftConfig: openshift.CreateDefaultConfig(),
K8SNodeConfig: k8snode.CreateDefaultConfig(),
HttpConfig: http.CreateDefaultConfig(),
}
}

Expand Down Expand Up @@ -137,6 +148,8 @@ func (d *DetectorConfig) GetConfigFromType(detectorType internal.DetectorType) i
return d.OpenShiftConfig
case k8snode.TypeStr:
return d.K8SNodeConfig
case http.TypeStr:
return d.HttpConfig
default:
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -63,6 +65,7 @@ func NewFactory() processor.Factory {
system.TypeStr: system.NewDetector,
openshift.TypeStr: openshift.NewDetector,
k8snode.TypeStr: k8snode.NewDetector,
http.TypeStr: http.NewDetector,
})

f := &factory{
Expand Down Expand Up @@ -90,6 +93,7 @@ func createDefaultConfig() component.Config {
Override: true,
Attributes: nil,
DetectorConfig: detectorCreateDefaultConfig(),
DetectInterval: time.Minute * 10,
// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
// Set the default value of 'hostname_source' here instead of 'system' detector
}
Expand Down Expand Up @@ -182,6 +186,7 @@ func (f *factory) getResourceDetectionProcessor(
override: oCfg.Override,
httpClientSettings: oCfg.ClientConfig,
telemetrySettings: params.TelemetrySettings,
detectInterval: oCfg.DetectInterval,
}, nil
}

Expand Down
24 changes: 24 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"
import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
)

type Config struct {
// APIKey is the authentication token
APIKey configopaque.String `mapstructure:"api_key"`

// API URL to use
APIURL string `mapstructure:"api_url"`

confighttp.ClientConfig `mapstructure:",squash"`
}

func CreateDefaultConfig() Config {
return Config{
APIURL: "http://localhost:8080",
}
}
98 changes: 98 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"
import (
"context"
"fmt"
"io"
"net/http"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
conventions "go.opentelemetry.io/collector/semconv/v1.22.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)

const (
// TypeStr is type of detector.
TypeStr = "http"
)

type resourceAttribute struct {
Key string
Value string
}

var _ internal.Detector = (*detector)(nil)

type detector struct {
logger *zap.Logger
//
set component.TelemetrySettings
interval time.Duration
client *http.Client
apiURL string
apiKey configopaque.String
requestIntervalTicker *time.Ticker
}

// NewDetector returns a detector which can detect resource attributes on Heroku
func NewDetector(set processor.Settings, dcfg internal.DetectorConfig) (internal.Detector, error) {
cfg := dcfg.(Config)

if cfg.APIURL == "" {
return nil, fmt.Errorf("apiUrl could not be empty")
}

return &detector{
apiKey: cfg.APIKey,
apiURL: cfg.APIURL,
logger: set.Logger,
// TODO: interval request
interval: time.Second * 5,
}, nil
}

// Detect detects http response metadata and returns a resource with the available ones
func (d detector) Detect(ctx context.Context) (resource pcommon.Resource, schemaURL string, err error) {
res := pcommon.NewResource()

detectedResources := d.requestResourceAttributes()

for _, resAttr := range detectedResources {
res.Attributes().PutStr(resAttr.Key, resAttr.Value)
}

return res, conventions.SchemaURL, nil
}

func (d detector) requestResourceAttributes() []resourceAttribute {
var resources []resourceAttribute
resp, err := http.Get(d.apiURL)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

err = jsoniter.Unmarshal(body, &resources)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

return resources
}
40 changes: 40 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package http

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/processor/processortest"
conventions "go.opentelemetry.io/collector/semconv/v1.22.0"
)

type mockMetadata struct {
mock.Mock
}

func TestDetect(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()
handler = func(w http.ResponseWriter, r *http.Request) {
outPut := `[{"key":"attributes_1","value":"foo"},{"key":"attributes_2","value":"bar"}]`
_, _ = w.Write([]byte(outPut))
}
defaultCfg := CreateDefaultConfig()
defaultCfg.APIURL = ts.URL

d, err := NewDetector(processortest.NewNopSettings(), defaultCfg)
require.NoError(t, err)
res, schemaURL, err := d.Detect(context.Background())
require.NoError(t, err)
require.Equal(t, 2, res.Attributes().Len())
require.NotNil(t, res)
assert.Equal(t, conventions.SchemaURL, schemaURL)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: resourcedetectionprocessor/http

parent: resourcedetection

status:
class: pkg
codeowners:
active: [JaredTan95]
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,9 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesTo
}

func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) {
p.once.Do(func() {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)
})
ctx, cancel := context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)

return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentele

import (
"context"
"net/http"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
Expand All @@ -23,15 +27,28 @@ type resourceDetectionProcessor struct {
override bool
httpClientSettings confighttp.ClientConfig
telemetrySettings component.TelemetrySettings
detectInterval time.Duration
}

// Start is invoked during service startup.
func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component.Host) error {
client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings)
ctx = internal.ContextWithClient(ctx, client)
go rdp.tickerDetect(ctx, client)
return nil
}

func (rdp *resourceDetectionProcessor) tickerDetect(ctx context.Context, client *http.Client) {
intervalTicker := time.NewTicker(rdp.detectInterval)
defer intervalTicker.Stop()

var err error
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client)
return err
for range intervalTicker.C {
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client)
if err != nil {
rdp.telemetrySettings.Logger.Error("failed to retrieve resource from provider: %v", zap.Error(err))
}
}
}

// processTraces implements the ProcessTracesFunc type.
Expand Down
62 changes: 62 additions & 0 deletions processor/resourcedetectionprocessor/testdata/otel-col-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317

exporters:
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200
otlp/jaeger:
endpoint: localhost:14317
tls:
insecure: true
retry_on_failure:
enabled: true
max_elapsed_time: 500s
sending_queue:
enabled: true

processors:
batch:
resourcedetection/sgm:
detectors: [env,http]
timeout: 2s
override: false
detect_interval: 10s
http:
api_url: https://apifoxmock.com/m1/3552517-2307922-default/mock/auto-tagging
resourcedetection/system:
detectors: [env, system]
timeout: 2s
override: false
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
os.type:
enabled: true
attributes: ["a", "b"]

extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679

service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch,resourcedetection/sgm]
exporters: [debug,otlp/jaeger]
metrics:
receivers: [otlp]
processors: [batch,resourcedetection/sgm]
exporters: [debug]
Loading