Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[component] Try adding PipelineID #10947

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,8 @@ and hope to make a v1.0.0 release soon.
- `config`: Deprecate multiple types and funcs in `config` package (#6422)
- config.ComponentID => component.ID
- config.Type => component.Type
- config.DataType => component.DataType
- config.[Traces|Metrics|Logs]DataType => component.DataType[Traces|Metrics|Logs]
- config.DataType => component.Signal
- config.[Traces|Metrics|Logs]DataType => component.Signal[Traces|Metrics|Logs]
- config.Receiver => component.ReceiverConfig
- config.UnmarshalReceiver => component.UnmarshalReceiverConfig
- config.Processor => component.ProcessorConfig
Expand Down
18 changes: 9 additions & 9 deletions cmd/mdatagen/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateLogsToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -381,7 +381,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateLogsToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -390,7 +390,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateLogsToTraces(ctx, set, cfg, router)
},
},
Expand All @@ -399,7 +399,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateMetricsToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -408,7 +408,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateMetricsToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -417,7 +417,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateMetricsToTraces(ctx, set, cfg, router)
},
},
Expand All @@ -426,7 +426,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateTracesToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -435,7 +435,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateTracesToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -444,7 +444,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateTracesToTraces(ctx, set, cfg, router)
},
},
Expand Down
9 changes: 2 additions & 7 deletions component/componentprofiles/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ package componentprofiles // import "go.opentelemetry.io/collector/component/com

import "go.opentelemetry.io/collector/component"

func mustNewDataType(strType string) component.DataType {
return component.MustNewType(strType)
}

var (
// DataTypeProfiles is the data type tag for profiles.
DataTypeProfiles = mustNewDataType("profiles")
const (
SignalProfiles = component.Signal("profiles")
)
10 changes: 5 additions & 5 deletions component/componentstatus/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type InstanceID struct {
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.ID) *InstanceID {
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.PipelineID) *InstanceID {
instanceID := &InstanceID{
componentID: componentID,
kind: kind,
Expand All @@ -47,14 +47,14 @@ func (id *InstanceID) Kind() component.Kind {

// AllPipelineIDs calls f for each pipeline this instance is associated with. If
// f returns false it will stop iteration.
func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {
func (id *InstanceID) AllPipelineIDs(f func(component.PipelineID) bool) {
var bs []byte
for _, b := range []byte(id.pipelineIDs) {
if b != pipelineDelim {
bs = append(bs, b)
continue
}
pipelineID := component.ID{}
pipelineID := component.PipelineID{}
err := pipelineID.UnmarshalText(bs)
bs = bs[:0]
if err != nil {
Expand All @@ -68,7 +68,7 @@ func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {

// WithPipelines returns a new InstanceID updated to include the given
// pipelineIDs.
func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
func (id *InstanceID) WithPipelines(pipelineIDs ...component.PipelineID) *InstanceID {
instanceID := &InstanceID{
componentID: id.componentID,
kind: id.kind,
Expand All @@ -78,7 +78,7 @@ func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
return instanceID
}

func (id *InstanceID) addPipelines(pipelineIDs []component.ID) {
func (id *InstanceID) addPipelines(pipelineIDs []component.PipelineID) {
delim := string(pipelineDelim)
strIDs := strings.Split(id.pipelineIDs, delim)
for _, pID := range pipelineIDs {
Expand Down
32 changes: 16 additions & 16 deletions component/componentstatus/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (

func TestInstanceID(t *testing.T) {
traces := component.MustNewID("traces")
tracesA := component.MustNewIDWithName("traces", "a")
tracesB := component.MustNewIDWithName("traces", "b")
tracesC := component.MustNewIDWithName("traces", "c")
tracesA := component.NewPipelineIDWithName("traces", "a")
tracesB := component.NewPipelineIDWithName("traces", "b")
tracesC := component.NewPipelineIDWithName("traces", "c")

idTracesA := NewInstanceID(traces, component.KindReceiver, tracesA)
idTracesAll := NewInstanceID(traces, component.KindReceiver, tracesA, tracesB, tracesC)
assert.NotEqual(t, idTracesA, idTracesAll)

assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.ID) {
var pipelineIDs []component.ID
instanceID.AllPipelineIDs(func(id component.ID) bool {
assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.PipelineID) {
var pipelineIDs []component.PipelineID
instanceID.AllPipelineIDs(func(id component.PipelineID) bool {
pipelineIDs = append(pipelineIDs, id)
return true
})
Expand All @@ -34,31 +34,31 @@ func TestInstanceID(t *testing.T) {
name string
id1 *InstanceID
id2 *InstanceID
pipelineIDs []component.ID
pipelineIDs []component.PipelineID
}{
{
name: "equal instances",
id1: idTracesA,
id2: NewInstanceID(traces, component.KindReceiver, tracesA),
pipelineIDs: []component.ID{tracesA},
pipelineIDs: []component.PipelineID{tracesA},
},
{
name: "equal instances - out of order",
id1: idTracesAll,
id2: NewInstanceID(traces, component.KindReceiver, tracesC, tracesB, tracesA),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesB, tracesC),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines - out of order",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesC, tracesB),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -73,20 +73,20 @@ func TestAllPipelineIDs(t *testing.T) {
instanceID := NewInstanceID(
component.MustNewID("traces"),
component.KindReceiver,
component.MustNewIDWithName("traces", "a"),
component.MustNewIDWithName("traces", "b"),
component.MustNewIDWithName("traces", "c"),
component.NewPipelineIDWithName("traces", "a"),
component.NewPipelineIDWithName("traces", "b"),
component.NewPipelineIDWithName("traces", "c"),
)

count := 0
instanceID.AllPipelineIDs(func(component.ID) bool {
instanceID.AllPipelineIDs(func(component.PipelineID) bool {
count++
return true
})
assert.Equal(t, 3, count)

count = 0
instanceID.AllPipelineIDs(func(component.ID) bool {
instanceID.AllPipelineIDs(func(component.PipelineID) bool {
count++
return false
})
Expand Down
45 changes: 45 additions & 0 deletions component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package component // import "go.opentelemetry.io/collector/component"

import (
"errors"
"fmt"
"reflect"
"regexp"
Expand Down Expand Up @@ -147,6 +148,8 @@

// DataType is a special Type that represents the data types supported by the collector. We currently support
// collecting metrics, traces and logs, this can expand in the future.
//
// Deprecated: [v0.108.0] Use Signal instead
type DataType = Type

func mustNewDataType(strType string) DataType {
Expand All @@ -156,12 +159,18 @@
// Currently supported data types. Add new data types here when new types are supported in the future.
var (
// DataTypeTraces is the data type tag for traces.
//
// Deprecated: [v0.108.0] Use SignalTraces instead
DataTypeTraces = mustNewDataType("traces")

// DataTypeMetrics is the data type tag for metrics.
//
// Deprecated: [v0.108.0] Use SignalMetrics instead
DataTypeMetrics = mustNewDataType("metrics")

// DataTypeLogs is the data type tag for logs.
//
// Deprecated: [v0.108.0] Use SignalLogs instead
DataTypeLogs = mustNewDataType("logs")
)

Expand All @@ -179,3 +188,39 @@
}
return nil
}

type Signal string

const (
SignalTraces Signal = "traces"
SignalMetrics Signal = "metrics"
SignalLogs Signal = "logs"
)

func (s Signal) String() string {
return string(s)

Check warning on line 201 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

func (s Signal) MarshalText() (text []byte, err error) {
return []byte(s), nil

Check warning on line 205 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}

func (s *Signal) UnmarshalText(text []byte) error {
if len(text) == 0 {
return errors.New("id must not be empty")

Check warning on line 210 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L208-L210

Added lines #L208 - L210 were not covered by tests
}
strText := string(text)
switch strText {
case "traces":
*s = SignalTraces
return nil
case "metrics":
*s = SignalMetrics
return nil
case "logs":
*s = SignalLogs
return nil
default:
return fmt.Errorf("invalid signal %q", strText)

Check warning on line 224 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L212-L224

Added lines #L212 - L224 were not covered by tests
}
}
71 changes: 71 additions & 0 deletions component/identifiable.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,74 @@

return id.typeVal.String() + typeAndNameSeparator + id.nameVal
}

type PipelineID struct {
signal Signal `mapstructure:"-"`
nameVal string `mapstructure:"-"`
}

func NewPipelineID(signal Signal) PipelineID {
return PipelineID{signal: signal}

Check warning on line 114 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L113-L114

Added lines #L113 - L114 were not covered by tests
}

func NewPipelineIDWithName(signal Signal, nameVal string) PipelineID {
return PipelineID{signal: signal, nameVal: nameVal}

Check warning on line 118 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

func (p PipelineID) Signal() Signal {
return p.signal

Check warning on line 122 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}

func (p PipelineID) Name() string {
return p.nameVal

Check warning on line 126 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}

func (p PipelineID) MarshalText() (text []byte, err error) {
return []byte(p.String()), nil

Check warning on line 130 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (p *PipelineID) UnmarshalText(text []byte) error {
pipelineStr := string(text)
items := strings.SplitN(pipelineStr, typeAndNameSeparator, 2)
var signalStr, nameStr string
if len(items) >= 1 {
signalStr = strings.TrimSpace(items[0])

Check warning on line 139 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L134-L139

Added lines #L134 - L139 were not covered by tests
}

if len(items) == 1 && signalStr == "" {
return errors.New("id must not be empty")

Check warning on line 143 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}

if signalStr == "" {
return fmt.Errorf("in %q id: the part before %s should not be empty", pipelineStr, typeAndNameSeparator)

Check warning on line 147 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L146-L147

Added lines #L146 - L147 were not covered by tests
}

err := p.signal.UnmarshalText([]byte(signalStr))
if err != nil {
return err

Check warning on line 152 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L150-L152

Added lines #L150 - L152 were not covered by tests
}

if len(items) > 1 {

Check warning on line 155 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L155

Added line #L155 was not covered by tests
// "name" part is present.
nameStr = strings.TrimSpace(items[1])
if nameStr == "" {
return fmt.Errorf("in %q id: the part after %s should not be empty", pipelineStr, typeAndNameSeparator)

Check warning on line 159 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L157-L159

Added lines #L157 - L159 were not covered by tests
}
if err := validateName(nameStr); err != nil {
return fmt.Errorf("in %q id: %w", nameStr, err)

Check warning on line 162 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L161-L162

Added lines #L161 - L162 were not covered by tests
}
}
p.nameVal = nameStr

Check warning on line 165 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L165

Added line #L165 was not covered by tests

return nil

Check warning on line 167 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L167

Added line #L167 was not covered by tests
}

// String returns the ID string representation as "type[/name]" format.
func (p PipelineID) String() string {
if p.nameVal == "" {
return p.signal.String()

Check warning on line 173 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L171-L173

Added lines #L171 - L173 were not covered by tests
}

return p.signal.String() + typeAndNameSeparator + p.nameVal

Check warning on line 176 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L176

Added line #L176 was not covered by tests
}
Loading
Loading