Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Add pipeline module #11209

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/add-pipeline-module.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pipeline

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds new `pipeline` module to house the concept of pipeline ID and Signal.

# One or more tracking issues or pull requests related to the change
issues: [11209]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ check-contrib:
-replace go.opentelemetry.io/collector/extension/zpagesextension=$(CURDIR)/extension/zpagesextension \
-replace go.opentelemetry.io/collector/featuregate=$(CURDIR)/featuregate \
-replace go.opentelemetry.io/collector/internal/globalgates=$(CURDIR)/internal/globalgates \
-replace go.opentelemetry.io/collector/internal/globalsignal=$(CURDIR)/internal/globalsignal \
-replace go.opentelemetry.io/collector/otelcol=$(CURDIR)/otelcol \
-replace go.opentelemetry.io/collector/otelcol/otelcoltest=$(CURDIR)/otelcol/otelcoltest \
-replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata \
-replace go.opentelemetry.io/collector/pdata/testdata=$(CURDIR)/pdata/testdata \
-replace go.opentelemetry.io/collector/pdata/pprofile=$(CURDIR)/pdata/pprofile \
-replace go.opentelemetry.io/collector/pipeline=$(CURDIR)/pipeline \
-replace go.opentelemetry.io/collector/processor=$(CURDIR)/processor \
-replace go.opentelemetry.io/collector/processor/batchprocessor=$(CURDIR)/processor/batchprocessor \
-replace go.opentelemetry.io/collector/processor/memorylimiterprocessor=$(CURDIR)/processor/memorylimiterprocessor \
Expand Down Expand Up @@ -369,11 +371,13 @@ restore-contrib:
-dropreplace go.opentelemetry.io/collector/extension/zpagesextension \
-dropreplace go.opentelemetry.io/collector/featuregate \
-dropreplace go.opentelemetry.io/collector/internal/globalgates \
-dropreplace go.opentelemetry.io/collector/internal/globalsignal \
-dropreplace go.opentelemetry.io/collector/otelcol \
-dropreplace go.opentelemetry.io/collector/otelcol/otelcoltest \
-dropreplace go.opentelemetry.io/collector/pdata \
-dropreplace go.opentelemetry.io/collector/pdata/testdata \
-dropreplace go.opentelemetry.io/collector/pdata/pprofile \
-dropreplace go.opentelemetry.io/collector/pipeline \
-dropreplace go.opentelemetry.io/collector/processor \
-dropreplace go.opentelemetry.io/collector/processor/batchprocessor \
-dropreplace go.opentelemetry.io/collector/processor/memorylimiterprocessor \
Expand Down
1 change: 1 addition & 0 deletions internal/globalsignal/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
11 changes: 11 additions & 0 deletions internal/globalsignal/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module go.opentelemetry.io/collector/internal/globalsignal

go 1.22.0

require github.com/stretchr/testify v1.9.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions internal/globalsignal/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions internal/globalsignal/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal // import "go.opentelemetry.io/collector/internal/globalsignal"

import (
"fmt"
"regexp"
)

// Signal represents the signals supported by the collector.
type Signal struct {
name string
}

// String returns the string representation of the signal.
func (s Signal) String() string {
return s.name
}

// MarshalText marshals the Signal.
func (s Signal) MarshalText() (text []byte, err error) {
return []byte(s.name), nil
}

// signalRegex is used to validate the signal.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
var signalRegex = regexp.MustCompile(`^[a-z]{1,62}$`)

// NewSignal creates a Signal. It returns an error if the Signal is invalid.
// A Signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func NewSignal(signal string) (Signal, error) {
if len(signal) == 0 {
return Signal{}, fmt.Errorf("signal must not be empty")
}
if !signalRegex.MatchString(signal) {
return Signal{}, fmt.Errorf("invalid character(s) in type %q", signal)
}
return Signal{name: signal}, nil
}

// MustNewSignal creates a Signal. It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewSignal(signal string) Signal {
s, err := NewSignal(signal)
if err != nil {
panic(err)

Check warning on line 47 in internal/globalsignal/signal.go

View check run for this annotation

Codecov / codecov/patch

internal/globalsignal/signal.go#L47

Added line #L47 was not covered by tests
}
return s
}
41 changes: 41 additions & 0 deletions internal/globalsignal/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_NewSignal(t *testing.T) {
s, err := NewSignal("traces")
require.NoError(t, err)
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_NewSignal_Invalid(t *testing.T) {
_, err := NewSignal("")
require.Error(t, err)
_, err = NewSignal("TRACES")
require.Error(t, err)
}

func Test_MustNewSignal(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_Signal_String(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, "traces", s.String())
}

func Test_Signal_MarshalText(t *testing.T) {
s := MustNewSignal("traces")
b, err := s.MarshalText()
require.NoError(t, err)
assert.Equal(t, []byte("traces"), b)
}
1 change: 1 addition & 0 deletions pipeline/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../Makefile.Common
16 changes: 16 additions & 0 deletions pipeline/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module go.opentelemetry.io/collector/pipeline

go 1.22.0

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/internal/globalsignal v0.109.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal
10 changes: 10 additions & 0 deletions pipeline/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package pipeline // import "go.opentelemetry.io/collector/pipeline"
import (
"errors"
"fmt"
"regexp"
"strings"

"go.opentelemetry.io/collector/internal/globalsignal"
)

// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys.
const typeAndNameSeparator = "/"

// ID represents the identity for a pipeline. It combines two values:
// * signal - the Signal of the pipeline.
// * name - the name of that pipeline.
type ID struct {
signal Signal `mapstructure:"-"`
name string `mapstructure:"-"`
}

// NewID returns a new ID with the given Signal and empty name.
func NewID(signal Signal) ID {
return ID{signal: signal}
}

// MustNewID builds a Signal and returns a new ID with the given Signal and empty name.
// It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewID(signal string) ID {
return ID{signal: globalsignal.MustNewSignal(signal)}
}

// NewIDWithName returns a new ID with the given Signal and name.
func NewIDWithName(signal Signal, name string) ID {
return ID{signal: signal, name: name}
}

// MustNewIDWithName builds a Signal and returns a new ID with the given Signal and name.
// It panics if the Signal is invalid or name is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
// A name must consist of 1 to 1024 unicode characters excluding whitespace, control characters, and symbols.
func MustNewIDWithName(signal string, name string) ID {
id := ID{signal: globalsignal.MustNewSignal(signal)}
err := validateName(name)
if err != nil {
panic(err)

Check warning on line 50 in pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

pipeline/pipeline.go#L50

Added line #L50 was not covered by tests
}
id.name = name
return id
}

// Signal returns the Signal of the ID.
func (i ID) Signal() Signal {
return i.signal
}

// Name returns the name of the ID.
func (i ID) Name() string {
return i.name
}

// MarshalText implements the encoding.TextMarshaler interface.
// This marshals the Signal and name as one string in the config.
func (i ID) MarshalText() (text []byte, err error) {
return []byte(i.String()), nil
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (i *ID) UnmarshalText(text []byte) error {
idStr := string(text)
items := strings.SplitN(idStr, typeAndNameSeparator, 2)
var signalStr, nameStr string
if len(items) >= 1 {
signalStr = strings.TrimSpace(items[0])
}

if len(items) == 1 && signalStr == "" {
return errors.New("id must not be empty")
}

if signalStr == "" {
return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator)
}

if len(items) > 1 {
// "name" part is present.
nameStr = strings.TrimSpace(items[1])
if nameStr == "" {
return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator)
}
if err := validateName(nameStr); err != nil {
return fmt.Errorf("in %q id: %w", nameStr, err)
}
}

var err error
if i.signal, err = globalsignal.NewSignal(signalStr); err != nil {
return fmt.Errorf("in %q id: %w", idStr, err)
}
i.name = nameStr

return nil
}

// String returns the ID string representation as "signal[/name]" format.
func (i ID) String() string {
if i.name == "" {
return i.signal.String()
}

return i.signal.String() + typeAndNameSeparator + i.name
}

// nameRegexp is used to validate the name of an ID. A name can consist of
// 1 to 1024 unicode characters excluding whitespace, control characters, and
// symbols.
var nameRegexp = regexp.MustCompile(`^[^\pZ\pC\pS]+$`)

func validateName(nameStr string) error {
if len(nameStr) > 1024 {
return fmt.Errorf("name %q is longer than 1024 characters (%d characters)", nameStr, len(nameStr))
}
if !nameRegexp.MatchString(nameStr) {
return fmt.Errorf("invalid character(s) in name %q", nameStr)
}
return nil
}
Loading
Loading