[processor/schematransformerprocessor] Initial PR (open-telemetry#8371)
**Description:**
As part of the [accepted specification](https://github.com/open-telemetry/opentelemetry-specification/blob/e16e1a781bdf62a8fcfa1a12963844749de23e7a/specification/schemas/overview.md#collector-assisted-schema-transformation), the collector needs a processor that can transform telemetry between semantic convention versions. This is my initial draft; I wrote it before https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/8344/files shared their solution.

**Link to tracking Issue:**
open-telemetry#8495
Originally raised in the collector repository, but as far as I can tell it should be shifted to contrib.

**Testing:**
The processor currently performs no transformations; initial tests are set up within the components.

**Documentation:** 
Added a minimal README; it will be expanded once more work has been added to the collector.
MovieStoreGuy authored and djaglowski committed May 10, 2022
1 parent f8e8a0e commit c208b10
Showing 20 changed files with 2,029 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -100,6 +100,7 @@ processor/resourcedetectionprocessor/ @open-telemetry/collector-c
processor/resourceprocessor @open-telemetry/collector-contrib-approvers @dmitryax
processor/resourcedetectionprocessor/internal/azure @open-telemetry/collector-contrib-approvers @mx-psi
processor/routingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/schemaprocessor/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy
processor/spanmetricsprocessor/ @open-telemetry/collector-contrib-approvers @albertteoh
processor/spanprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken @pmm-sumo
processor/tailsamplingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,8 @@

### 🚀 New components 🚀

- `schemaprocessor`: Starting the initial work to allow translating from one semantic convention version to another (#8371)

### 💡 Enhancements 💡

- `k8sclusterreceiver`: Validate that k8s API supports a resource before setting up a watcher for it (#9523)
2 changes: 2 additions & 0 deletions cmd/configschema/go.mod
@@ -692,6 +692,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/reso

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor => ../../processor/routingprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor => ../../processor/schemaprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor => ../../processor/spanmetricsprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor => ../../processor/spanprocessor/
2 changes: 2 additions & 0 deletions go.mod
@@ -704,6 +704,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/tail

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ./processor/transformprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor => ./processor/schemaprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver => ./receiver/activedirectorydsreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => ./receiver/awscontainerinsightreceiver
378 changes: 378 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions processor/schemaprocessor/Makefile
@@ -0,0 +1 @@
include ../../Makefile.Common
54 changes: 54 additions & 0 deletions processor/schemaprocessor/README.md
@@ -0,0 +1,54 @@
# Schema Transformer Processor

🚧 _Currently under development, subject to change_ 🚧

Supported Pipelines: traces, metrics, logs

The _Schema Processor_ converts telemetry signals to the semantic convention version defined in its configuration.
It matches incoming signals against a set of target schema URLs.
On a match, the processor fetches the schema translation file referenced by the incoming signal (if it is not already cached) and applies the
transformations required to export the signal at the target semantic convention version.

Organisations and vendors can also publish their own semantic conventions for use by this processor;
be sure to follow the [schema overview](https://opentelemetry.io/docs/reference/specification/schemas/overview/) for all the details.

## Caching Schema Translation Files

To improve the efficiency of the processor, the `prefetch` option allows the processor to start downloading and preparing
the translation files needed for signals that match the listed schema URLs before those signals arrive.
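
As an illustration only, here is a minimal sketch of what such a prefetch cache could look like; the `translationCache` type, its `prefetch` method, and the injected fetch function are assumptions made for this example and are not part of the processor's implementation.

```go
package main

import (
    "fmt"
    "sync"
)

// translationCache is a hypothetical cache of schema translation files,
// keyed by schema URL. It does not reflect the processor's real internals.
type translationCache struct {
    mu    sync.Mutex
    files map[string][]byte // schema URL -> raw translation file
}

func newTranslationCache() *translationCache {
    return &translationCache{files: make(map[string][]byte)}
}

// prefetch downloads each schema translation file ahead of time so that
// matching signals do not block on a network fetch later.
func (c *translationCache) prefetch(urls []string, fetch func(string) ([]byte, error)) error {
    for _, u := range urls {
        body, err := fetch(u)
        if err != nil {
            return fmt.Errorf("prefetching %q: %w", u, err)
        }
        c.mu.Lock()
        c.files[u] = body
        c.mu.Unlock()
    }
    return nil
}

func main() {
    cache := newTranslationCache()
    // Fake fetcher for the example; a real one would perform an HTTP GET.
    _ = cache.prefetch([]string{"https://opentelemetry.io/schemas/1.9.0"}, func(u string) ([]byte, error) {
        return []byte("file_format: 1.0.0"), nil
    })
    fmt.Println(len(cache.files), "translation file(s) cached")
}
```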

## Schema Formats

A schema URL is made up of two parts, the _Schema Family_ and the _Schema Version_, and is broken down like so:

```text
| Schema URL |
| https://example.com/telemetry/schemas/ | | 1.0.1 |
| Schema Family | | Schema Version |
```

The final path segment of the schema URL _MUST_ be the schema version; the preceding portion of the URL is the _Schema Family_.
To read more about schema formats, see the [schema URL definition](https://opentelemetry.io/docs/reference/specification/schemas/overview/#schema-url).
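
As a rough sketch of that split, the helper below separates a schema URL into its family and version; `splitSchemaURL` is illustrative only and is not the processor's `schema.GetFamilyAndIdentifier` function.

```go
package main

import (
    "fmt"
    "strings"
)

// splitSchemaURL separates a schema URL into its Schema Family (everything
// before the final path segment) and its Schema Version (the final segment).
func splitSchemaURL(schemaURL string) (family, version string, err error) {
    i := strings.LastIndex(schemaURL, "/")
    if i < 0 || i == len(schemaURL)-1 {
        return "", "", fmt.Errorf("invalid schema URL %q", schemaURL)
    }
    return schemaURL[:i], schemaURL[i+1:], nil
}

func main() {
    family, version, _ := splitSchemaURL("https://example.com/telemetry/schemas/1.0.1")
    fmt.Println(family)  // https://example.com/telemetry/schemas
    fmt.Println(version) // 1.0.1
}
```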

## Target Schemas

Targets define the set of schema URLs, each with a schema identifier, that matching incoming schema URLs will be translated to.
When the processor matches a signal to a target, it translates the signal from its published schema version to the target identifier;
for example, using the configuration below, a signal published with the `https://opentelemetry.io/schemas/1.8.0` schema will be translated
by the collector to the `https://opentelemetry.io/schemas/1.6.1` schema.
Duplicate schema families are not allowed within the targets and will produce an error if detected.
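
The sketch below illustrates that matching behaviour under the assumption that targets are keyed by schema family; the `retarget` function and the map layout are hypothetical and are not the processor's code.

```go
package main

import (
    "fmt"
    "strings"
)

// retarget maps an incoming schema URL to the configured target version for
// its schema family, leaving URLs from unknown families untouched.
func retarget(incoming string, targetsByFamily map[string]string) string {
    i := strings.LastIndex(incoming, "/")
    if i < 0 {
        return incoming
    }
    family := incoming[:i]
    if version, ok := targetsByFamily[family]; ok {
        return family + "/" + version
    }
    return incoming
}

func main() {
    targets := map[string]string{"https://opentelemetry.io/schemas": "1.6.1"}
    fmt.Println(retarget("https://opentelemetry.io/schemas/1.8.0", targets))
    // Output: https://opentelemetry.io/schemas/1.6.1
}
```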


## Example

```yaml
processors:
  schema:
    prefetch:
      - https://opentelemetry.io/schemas/1.9.0
    targets:
      - https://opentelemetry.io/schemas/1.6.1
      - http://example.com/telemetry/schemas/1.0.1
```
For more complete examples, please refer to [config.yml](./testdata/config.yml).
74 changes: 74 additions & 0 deletions processor/schemaprocessor/config.go
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"

import (
    "errors"
    "fmt"

    "go.opentelemetry.io/collector/config"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/schema"
)

var (
    errRequiresTargets  = errors.New("requires schema targets")
    errDuplicateTargets = errors.New("duplicate targets detected")
)

// Config defines the user provided values for the Schema Processor
type Config struct {
    config.ProcessorSettings `mapstructure:",squash"`

    // Prefetch is a list of schema URLs that are downloaded
    // and cached at the start of the collector runtime
    // in order to avoid fetching data that later on could
    // block processing of signals. (Optional field)
    Prefetch []string `mapstructure:"prefetch"`

    // Targets define what schema families should be
    // translated to, allowing older and newer formats
    // to conform to the target schema identifier.
    Targets []string `mapstructure:"targets"`
}

func (c *Config) Validate() error {
    for _, schemaURL := range c.Prefetch {
        _, _, err := schema.GetFamilyAndIdentifier(schemaURL)
        if err != nil {
            return err
        }
    }
    // Not strictly needed since it would just pass on
    // any data that doesn't match targets, however defining
    // this processor with no targets is wasteful.
    if len(c.Targets) == 0 {
        return fmt.Errorf("no schema targets defined: %w", errRequiresTargets)
    }

    families := make(map[string]struct{})
    for _, target := range c.Targets {
        family, _, err := schema.GetFamilyAndIdentifier(target)
        if err != nil {
            return err
        }
        if _, exist := families[family]; exist {
            return errDuplicateTargets
        }
        families[family] = struct{}{}
    }

    return nil
}
98 changes: 98 additions & 0 deletions processor/schemaprocessor/config_test.go
@@ -0,0 +1,98 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor

import (
    "path"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/config"
    "go.opentelemetry.io/collector/service/servicetest"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/schema"
)

func TestLoadConfig(t *testing.T) {
    t.Parallel()

    factories, err := componenttest.NopFactories()
    require.NoError(t, err, "Must not error on creating Nop factories")

    factory := NewFactory()
    factories.Processors[typeStr] = factory

    cfg, err := servicetest.LoadConfigAndValidate(path.Join("testdata", "config.yml"), factories)
    require.NoError(t, err, "Must not error when loading configuration")

    pcfg := cfg.Processors[config.NewComponentIDWithName(typeStr, "with-all-options")]
    assert.Equal(t, pcfg, &Config{
        ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "with-all-options")),
        Prefetch: []string{
            "https://opentelemetry.io/schemas/1.9.0",
        },
        Targets: []string{
            "https://opentelemetry.io/schemas/1.4.2",
            "https://example.com/otel/schemas/1.2.0",
        },
    })
}

func TestConfigurationValidation(t *testing.T) {
    t.Parallel()

    tests := []struct {
        scenario    string
        target      []string
        expectError error
    }{
        {scenario: "No targets", target: nil, expectError: errRequiresTargets},
        {
            scenario:    "One target of incomplete schema family",
            target:      []string{"opentelemetry.io/schemas/1.0.0"},
            expectError: schema.ErrInvalidFamily,
        },
        {
            scenario:    "One target of incomplete schema identifier",
            target:      []string{"https://opentelemetry.io/schemas/1"},
            expectError: schema.ErrInvalidIdentifier,
        },
        {
            scenario: "Valid target(s)",
            target: []string{
                "https://opentelemetry.io/schemas/1.9.0",
            },
            expectError: nil,
        },
        {
            scenario: "Duplicate targets",
            target: []string{
                "https://opentelemetry.io/schemas/1.9.0",
                "https://opentelemetry.io/schemas/1.0.0",
            },
            expectError: errDuplicateTargets,
        },
    }

    for _, tc := range tests {
        cfg := &Config{
            Targets: tc.target,
        }

        assert.ErrorIs(t, cfg.Validate(), tc.expectError, tc.scenario)
    }
}
107 changes: 107 additions & 0 deletions processor/schemaprocessor/factory.go
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"

import (
    "context"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/config"
    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/processor/processorhelper"
)

const typeStr = "schema"

var processorCapabilities = consumer.Capabilities{MutatesData: true}

// factory will store any of the precompiled schemas in future
type factory struct{}

// newDefaultConfiguration returns the configuration for schema transformer processor
// with the default values being used throughout it
func newDefaultConfiguration() config.Processor {
    return &Config{
        ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
    }
}

func NewFactory() component.ProcessorFactory {
    f := &factory{}
    return component.NewProcessorFactory(
        typeStr,
        newDefaultConfiguration,
        component.WithLogsProcessor(f.createLogsProcessor),
        component.WithMetricsProcessor(f.createMetricsProcessor),
        component.WithTracesProcessor(f.createTracesProcessor),
    )
}

func (f factory) createLogsProcessor(
    ctx context.Context,
    set component.ProcessorCreateSettings,
    cfg config.Processor,
    next consumer.Logs,
) (component.LogsProcessor, error) {
    transformer, err := newTransformer(ctx, cfg, set)
    if err != nil {
        return nil, err
    }
    return processorhelper.NewLogsProcessor(
        cfg,
        next,
        transformer.processLogs,
        processorhelper.WithCapabilities(processorCapabilities),
        processorhelper.WithStart(transformer.start),
    )
}

func (f factory) createMetricsProcessor(
    ctx context.Context,
    set component.ProcessorCreateSettings,
    cfg config.Processor,
    next consumer.Metrics,
) (component.MetricsProcessor, error) {
    transformer, err := newTransformer(ctx, cfg, set)
    if err != nil {
        return nil, err
    }
    return processorhelper.NewMetricsProcessor(
        cfg,
        next,
        transformer.processMetrics,
        processorhelper.WithCapabilities(processorCapabilities),
        processorhelper.WithStart(transformer.start),
    )
}

func (f factory) createTracesProcessor(
    ctx context.Context,
    set component.ProcessorCreateSettings,
    cfg config.Processor,
    next consumer.Traces,
) (component.TracesProcessor, error) {
    transformer, err := newTransformer(ctx, cfg, set)
    if err != nil {
        return nil, err
    }
    return processorhelper.NewTracesProcessor(
        cfg,
        next,
        transformer.processTraces,
        processorhelper.WithCapabilities(processorCapabilities),
        processorhelper.WithStart(transformer.start),
    )
}
15 changes: 15 additions & 0 deletions processor/schemaprocessor/factory_test.go
@@ -0,0 +1,15 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor_test