Skip to content

Commit

Permalink
Merge branch 'main' into promremotewrite-downscale-histograms-17565
Browse files Browse the repository at this point in the history
  • Loading branch information
krajorama committed Jul 14, 2023
2 parents 08f6cf4 + e426c2d commit d6499e6
Show file tree
Hide file tree
Showing 26 changed files with 1,341 additions and 153 deletions.
20 changes: 20 additions & 0 deletions .chloggen/k8s-cluster-receiver-jobs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Refactor k8s.job metrics to use mdatagen"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [10553]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: ""
17 changes: 17 additions & 0 deletions .chloggen/support-client-info-metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enables processor to extract metadata from client.Info

# One or more tracking issues related to the change
issues: [20913]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Enables processor to perform context based routing for payloads on the http server of otlp receiver
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
cache: false
- name: Cache Go
id: go-mod-cache
timeout-minutes: 5
timeout-minutes: 15
uses: actions/cache@v3
with:
path: |
Expand Down
2 changes: 1 addition & 1 deletion cmd/mdatagen/statusdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Status struct {
Warnings []string `mapstructure:"warnings"`
}

func (s Status) SortedDistributions() []string {
func (s *Status) SortedDistributions() []string {
sorted := s.Distributions
sort.Slice(sorted, func(i, j int) bool {
if s.Distributions[i] == "core" {
Expand Down
7 changes: 3 additions & 4 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ Each association is specified as a list of sources of associations. A source is
In order to get an association applied, all the sources specified need to match.

Each sources rule is specified as a pair of `from` (representing the rule type) and `name` (representing the attribute name if `from` is set to `resource_attribute`).
Following rule types are available:
The following rule types are available:

**from: "connection"** - takes the IP attribute from connection context (if available)
**from: "resource_attribute"** - allows to specify the attribute name to lookup up in the list of attributes of the received Resource.
Semantic convention should be used for naming.
- `connection`: Takes the IP attribute from connection context (if available). In this case the processor must appear before any batching or tail sampling, which remove this information.
- `resource_attribute`: Allows specifying the attribute name to lookup in the list of attributes of the received Resource. Semantic convention should be used for naming.

Pod association configuration.

Expand Down
1 change: 1 addition & 0 deletions processor/routingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ It is also possible to mix both the conventional routing configuration and the r

- [OTTL] statements can be applied only to resource attributes.
- Currently, it is not possible to specify the boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information.
- If data is received on OTLP http server, `include_metadata` must be set to true in order to use context based routing.
- Supported [OTTL] functions:
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
Expand Down
33 changes: 27 additions & 6 deletions processor/routingprocessor/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"strings"

"go.opentelemetry.io/collector/client"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)
Expand All @@ -32,14 +33,12 @@ func (e extractor) extractFromContext(ctx context.Context) string {
// right now, we only support looking up attributes from requests that have
// gone through the gRPC server in that case, it will add the HTTP headers
// as context metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
values, ok := e.extractFromGRPCContext(ctx)

// we have gRPC metadata in the context but does it have our key?
values, ok := md[strings.ToLower(e.fromAttr)]
if !ok {
values = e.extractFromHTTPContext(ctx)
}
if len(values) == 0 {
return ""
}

Expand All @@ -52,3 +51,25 @@ func (e extractor) extractFromContext(ctx context.Context) string {

return values[0]
}

func (e extractor) extractFromGRPCContext(ctx context.Context) ([]string, bool) {

md, ok := metadata.FromIncomingContext(ctx)

if !ok {
return nil, false
}

values, ok := md[strings.ToLower(e.fromAttr)]
if !ok {
return nil, false
}
return values, true
}

func (e extractor) extractFromHTTPContext(ctx context.Context) []string {
info := client.FromContext(ctx)
md := info.Metadata
values := md.Get(e.fromAttr)
return values
}
40 changes: 40 additions & 0 deletions processor/routingprocessor/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/client"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -55,6 +56,45 @@ func TestExtractorForTraces_FromContext(t *testing.T) {
fromAttr: "X-Tenant",
expectedValue: "globex",
},
{
name: "value from existing HTTP attribute",
ctxFunc: func() context.Context {
return client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"acme"},
})})
},
fromAttr: "X-Tenant",
expectedValue: "acme",
},
{
name: "value from existing HTTP attribute: case insensitive",
ctxFunc: func() context.Context {
return client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"acme"},
})})
},
fromAttr: "x-tenant",
expectedValue: "acme",
},
{
name: "no values from empty context",
ctxFunc: context.Background,
fromAttr: "X-Tenant",
expectedValue: "",
},
{
name: "no values from existing HTTP attribute",
ctxFunc: func() context.Context {
return client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {""},
})})
},
fromAttr: "X-Tenant",
expectedValue: "",
},
}

for _, tc := range testcases {
Expand Down
2 changes: 1 addition & 1 deletion processor/routingprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.81.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector v0.81.0
go.opentelemetry.io/collector/component v0.81.0
go.opentelemetry.io/collector/config/configgrpc v0.81.0
go.opentelemetry.io/collector/confmap v0.81.0
Expand Down Expand Up @@ -47,7 +48,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.81.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.81.0 // indirect
go.opentelemetry.io/collector/config/configcompression v0.81.0 // indirect
go.opentelemetry.io/collector/config/confignet v0.81.0 // indirect
Expand Down
36 changes: 34 additions & 2 deletions processor/routingprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "acme")

t.Run("non default route is properly used", func(t *testing.T) {
t.Run("grpc metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeLogs(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "acme",
Expand All @@ -79,7 +80,7 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
)
})

t.Run("default route is taken when no matching route can be found", func(t *testing.T) {
t.Run("grpc metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeLogs(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "some-custom-value1",
Expand All @@ -93,6 +94,37 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
"log should not be routed to non default exporter",
)
})

t.Run("client.Info metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeLogs(
client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"acme"},
})}),
l,
))
assert.Len(t, defaultExp.AllLogs(), 1,
"log should not be routed to default exporter",
)
assert.Len(t, lExp.AllLogs(), 2,
"log should be routed to non default exporter",
)
})

t.Run("client.Info metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeLogs(client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"some-custom-value1"},
})}),
l,
))
assert.Len(t, defaultExp.AllLogs(), 2,
"log should be routed to default exporter",
)
assert.Len(t, lExp.AllLogs(), 2,
"log should not be routed to non default exporter",
)
})
}

func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
Expand Down
38 changes: 36 additions & 2 deletions processor/routingprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -124,7 +125,7 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
rm := m.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("X-Tenant", "acme")

t.Run("non default route is properly used", func(t *testing.T) {
t.Run("grpc metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeMetrics(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "acme",
Expand All @@ -139,7 +140,7 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
)
})

t.Run("default route is taken when no matching route can be found", func(t *testing.T) {
t.Run("grpc metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeMetrics(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "some-custom-value1",
Expand All @@ -153,6 +154,39 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
"metric should not be routed to non default exporter",
)
})

t.Run("client.Info metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeMetrics(
client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"acme"},
})}),
m,
))
assert.Len(t, defaultExp.AllMetrics(), 1,
"metric should not be routed to default exporter",
)
assert.Len(t, mExp.AllMetrics(), 2,
"metric should be routed to non default exporter",
)
})

t.Run("client.Info metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeMetrics(
client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"some-custom-value1"},
})}),
m,
))
assert.Len(t, defaultExp.AllMetrics(), 2,
"metric should be routed to default exporter",
)
assert.Len(t, mExp.AllMetrics(), 2,
"metric should not be routed to non default exporter",
)
})

}

func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) {
Expand Down
37 changes: 35 additions & 2 deletions processor/routingprocessor/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
rs := tr.ResourceSpans().AppendEmpty()
rs.Resource().Attributes().PutStr("X-Tenant", "acme")

t.Run("non default route is properly used", func(t *testing.T) {
t.Run("grpc metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeTraces(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "acme",
Expand All @@ -183,7 +184,7 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
)
})

t.Run("default route is taken when no matching route can be found", func(t *testing.T) {
t.Run("grpc metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeTraces(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "some-custom-value1",
Expand All @@ -197,6 +198,38 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
"trace should not be routed to non default exporter",
)
})

t.Run("client.Info metadata: non default route is properly used", func(t *testing.T) {
assert.NoError(t, exp.ConsumeTraces(
client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"acme"},
})}),
tr,
))
assert.Len(t, defaultExp.AllTraces(), 1,
"trace should not be routed to default exporter",
)
assert.Len(t, tExp.AllTraces(), 2,
"trace should be routed to non default exporter",
)
})

t.Run("client.Info metadata: default route is taken when no matching route can be found", func(t *testing.T) {
assert.NoError(t, exp.ConsumeTraces(
client.NewContext(context.Background(),
client.Info{Metadata: client.NewMetadata(map[string][]string{
"X-Tenant": {"some-custom-value1"},
})}),
tr,
))
assert.Len(t, defaultExp.AllTraces(), 2,
"trace should be routed to default exporter",
)
assert.Len(t, tExp.AllTraces(), 2,
"trace should not be routed to non default exporter",
)
})
}

func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
Expand Down
Loading

0 comments on commit d6499e6

Please sign in to comment.