Skip to content

Commit

Permalink
Start adding PipelineID
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Sep 9, 2024
1 parent 6928951 commit 0d37132
Show file tree
Hide file tree
Showing 62 changed files with 803 additions and 692 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,8 @@ and hope to make a v1.0.0 release soon.
- `config`: Deprecate multiple types and funcs in `config` package (#6422)
- config.ComponentID => component.ID
- config.Type => component.Type
- config.DataType => component.DataType
- config.[Traces|Metrics|Logs]DataType => component.DataType[Traces|Metrics|Logs]
- config.DataType => component.Signal
- config.[Traces|Metrics|Logs]DataType => component.Signal[Traces|Metrics|Logs]
- config.Receiver => component.ReceiverConfig
- config.UnmarshalReceiver => component.UnmarshalReceiverConfig
- config.Processor => component.ProcessorConfig
Expand Down
18 changes: 9 additions & 9 deletions cmd/mdatagen/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateLogsToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -381,7 +381,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateLogsToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -390,7 +390,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "logs_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateLogsToTraces(ctx, set, cfg, router)
},
},
Expand All @@ -399,7 +399,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateMetricsToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -408,7 +408,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateMetricsToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -417,7 +417,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "metrics_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateMetricsToTraces(ctx, set, cfg, router)
},
},
Expand All @@ -426,7 +426,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_logs",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewLogsRouter(map[component.ID]consumer.Logs{component.NewID(component.DataTypeLogs): consumertest.NewNop()})
router := connector.NewLogsRouter(map[component.PipelineID]consumer.Logs{component.NewPipelineID(component.SignalLogs): consumertest.NewNop()})
return factory.CreateTracesToLogs(ctx, set, cfg, router)
},
},
Expand All @@ -435,7 +435,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_metrics",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{component.NewID(component.DataTypeMetrics): consumertest.NewNop()})
router := connector.NewMetricsRouter(map[component.PipelineID]consumer.Metrics{component.NewPipelineID(component.SignalMetrics): consumertest.NewNop()})
return factory.CreateTracesToMetrics(ctx, set, cfg, router)
},
},
Expand All @@ -444,7 +444,7 @@ func TestComponentLifecycle(t *testing.T) {
{
name: "traces_to_traces",
createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) {
router := connector.NewTracesRouter(map[component.ID]consumer.Traces{component.NewID(component.DataTypeTraces): consumertest.NewNop()})
router := connector.NewTracesRouter(map[component.PipelineID]consumer.Traces{component.NewPipelineID(component.SignalTraces): consumertest.NewNop()})
return factory.CreateTracesToTraces(ctx, set, cfg, router)
},
},
Expand Down
9 changes: 2 additions & 7 deletions component/componentprofiles/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ package componentprofiles // import "go.opentelemetry.io/collector/component/com

import "go.opentelemetry.io/collector/component"

func mustNewDataType(strType string) component.DataType {
return component.MustNewType(strType)
}

var (
// DataTypeProfiles is the data type tag for profiles.
DataTypeProfiles = mustNewDataType("profiles")
const (
SignalProfiles = component.Signal("profiles")
)
10 changes: 5 additions & 5 deletions component/componentstatus/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type InstanceID struct {
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.ID) *InstanceID {
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.PipelineID) *InstanceID {
instanceID := &InstanceID{
componentID: componentID,
kind: kind,
Expand All @@ -47,14 +47,14 @@ func (id *InstanceID) Kind() component.Kind {

// AllPipelineIDs calls f for each pipeline this instance is associated with. If
// f returns false it will stop iteration.
func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {
func (id *InstanceID) AllPipelineIDs(f func(component.PipelineID) bool) {
var bs []byte
for _, b := range []byte(id.pipelineIDs) {
if b != pipelineDelim {
bs = append(bs, b)
continue
}
pipelineID := component.ID{}
pipelineID := component.PipelineID{}
err := pipelineID.UnmarshalText(bs)
bs = bs[:0]
if err != nil {
Expand All @@ -68,7 +68,7 @@ func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {

// WithPipelines returns a new InstanceID updated to include the given
// pipelineIDs.
func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
func (id *InstanceID) WithPipelines(pipelineIDs ...component.PipelineID) *InstanceID {
instanceID := &InstanceID{
componentID: id.componentID,
kind: id.kind,
Expand All @@ -78,7 +78,7 @@ func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
return instanceID
}

func (id *InstanceID) addPipelines(pipelineIDs []component.ID) {
func (id *InstanceID) addPipelines(pipelineIDs []component.PipelineID) {
delim := string(pipelineDelim)
strIDs := strings.Split(id.pipelineIDs, delim)
for _, pID := range pipelineIDs {
Expand Down
32 changes: 16 additions & 16 deletions component/componentstatus/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (

func TestInstanceID(t *testing.T) {
traces := component.MustNewID("traces")
tracesA := component.MustNewIDWithName("traces", "a")
tracesB := component.MustNewIDWithName("traces", "b")
tracesC := component.MustNewIDWithName("traces", "c")
tracesA := component.NewPipelineIDWithName("traces", "a")
tracesB := component.NewPipelineIDWithName("traces", "b")
tracesC := component.NewPipelineIDWithName("traces", "c")

idTracesA := NewInstanceID(traces, component.KindReceiver, tracesA)
idTracesAll := NewInstanceID(traces, component.KindReceiver, tracesA, tracesB, tracesC)
assert.NotEqual(t, idTracesA, idTracesAll)

assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.ID) {
var pipelineIDs []component.ID
instanceID.AllPipelineIDs(func(id component.ID) bool {
assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.PipelineID) {
var pipelineIDs []component.PipelineID
instanceID.AllPipelineIDs(func(id component.PipelineID) bool {
pipelineIDs = append(pipelineIDs, id)
return true
})
Expand All @@ -34,31 +34,31 @@ func TestInstanceID(t *testing.T) {
name string
id1 *InstanceID
id2 *InstanceID
pipelineIDs []component.ID
pipelineIDs []component.PipelineID
}{
{
name: "equal instances",
id1: idTracesA,
id2: NewInstanceID(traces, component.KindReceiver, tracesA),
pipelineIDs: []component.ID{tracesA},
pipelineIDs: []component.PipelineID{tracesA},
},
{
name: "equal instances - out of order",
id1: idTracesAll,
id2: NewInstanceID(traces, component.KindReceiver, tracesC, tracesB, tracesA),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesB, tracesC),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines - out of order",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesC, tracesB),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
pipelineIDs: []component.PipelineID{tracesA, tracesB, tracesC},
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -73,20 +73,20 @@ func TestAllPipelineIDs(t *testing.T) {
instanceID := NewInstanceID(
component.MustNewID("traces"),
component.KindReceiver,
component.MustNewIDWithName("traces", "a"),
component.MustNewIDWithName("traces", "b"),
component.MustNewIDWithName("traces", "c"),
component.NewPipelineIDWithName("traces", "a"),
component.NewPipelineIDWithName("traces", "b"),
component.NewPipelineIDWithName("traces", "c"),
)

count := 0
instanceID.AllPipelineIDs(func(component.ID) bool {
instanceID.AllPipelineIDs(func(component.PipelineID) bool {
count++
return true
})
assert.Equal(t, 3, count)

count = 0
instanceID.AllPipelineIDs(func(component.ID) bool {
instanceID.AllPipelineIDs(func(component.PipelineID) bool {
count++
return false
})
Expand Down
45 changes: 45 additions & 0 deletions component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package component // import "go.opentelemetry.io/collector/component"

import (
"errors"
"fmt"
"reflect"
"regexp"
Expand Down Expand Up @@ -147,6 +148,8 @@ func MustNewType(strType string) Type {

// DataType is a special Type that represents the data types supported by the collector. We currently support
// collecting metrics, traces and logs, this can expand in the future.
//
// Deprecated: [v0.108.0] Use Signal instead
type DataType = Type

func mustNewDataType(strType string) DataType {
Expand All @@ -156,12 +159,18 @@ func mustNewDataType(strType string) DataType {
// Currently supported data types. Add new data types here when new types are supported in the future.
var (
// DataTypeTraces is the data type tag for traces.
//
// Deprecated: [v0.108.0] Use SignalTraces instead
DataTypeTraces = mustNewDataType("traces")

// DataTypeMetrics is the data type tag for metrics.
//
// Deprecated: [v0.108.0] Use SignalMetrics instead
DataTypeMetrics = mustNewDataType("metrics")

// DataTypeLogs is the data type tag for logs.
//
// Deprecated: [v0.108.0] Use SignalLogs instead
DataTypeLogs = mustNewDataType("logs")
)

Expand All @@ -179,3 +188,39 @@ func validateName(nameStr string) error {
}
return nil
}

type Signal string

const (
SignalTraces Signal = "traces"
SignalMetrics Signal = "metrics"
SignalLogs Signal = "logs"
)

func (s Signal) String() string {
return string(s)

Check warning on line 201 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

func (s Signal) MarshalText() (text []byte, err error) {
return []byte(s), nil

Check warning on line 205 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}

func (s *Signal) UnmarshalText(text []byte) error {
if len(text) == 0 {
return errors.New("id must not be empty")

Check warning on line 210 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L208-L210

Added lines #L208 - L210 were not covered by tests
}
strText := string(text)
switch strText {
case "traces":
*s = SignalTraces
return nil
case "metrics":
*s = SignalMetrics
return nil
case "logs":
*s = SignalLogs
return nil
default:
return fmt.Errorf("invalid signal %q", strText)

Check warning on line 224 in component/config.go

View check run for this annotation

Codecov / codecov/patch

component/config.go#L212-L224

Added lines #L212 - L224 were not covered by tests
}
}
71 changes: 71 additions & 0 deletions component/identifiable.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,74 @@ func (id ID) String() string {

return id.typeVal.String() + typeAndNameSeparator + id.nameVal
}

type PipelineID struct {
signal Signal `mapstructure:"-"`
nameVal string `mapstructure:"-"`
}

func NewPipelineID(signal Signal) PipelineID {
return PipelineID{signal: signal}

Check warning on line 114 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L113-L114

Added lines #L113 - L114 were not covered by tests
}

func NewPipelineIDWithName(signal Signal, nameVal string) PipelineID {
return PipelineID{signal: signal, nameVal: nameVal}

Check warning on line 118 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

func (p PipelineID) Signal() Signal {
return p.signal

Check warning on line 122 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}

func (p PipelineID) Name() string {
return p.nameVal

Check warning on line 126 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}

func (p PipelineID) MarshalText() (text []byte, err error) {
return []byte(p.String()), nil

Check warning on line 130 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (p *PipelineID) UnmarshalText(text []byte) error {
pipelineStr := string(text)
items := strings.SplitN(pipelineStr, typeAndNameSeparator, 2)
var signalStr, nameStr string
if len(items) >= 1 {
signalStr = strings.TrimSpace(items[0])

Check warning on line 139 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L134-L139

Added lines #L134 - L139 were not covered by tests
}

if len(items) == 1 && signalStr == "" {
return errors.New("id must not be empty")

Check warning on line 143 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}

if signalStr == "" {
return fmt.Errorf("in %q id: the part before %s should not be empty", pipelineStr, typeAndNameSeparator)

Check warning on line 147 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L146-L147

Added lines #L146 - L147 were not covered by tests
}

err := p.signal.UnmarshalText([]byte(signalStr))
if err != nil {
return err

Check warning on line 152 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L150-L152

Added lines #L150 - L152 were not covered by tests
}

if len(items) > 1 {

Check warning on line 155 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L155

Added line #L155 was not covered by tests
// "name" part is present.
nameStr = strings.TrimSpace(items[1])
if nameStr == "" {
return fmt.Errorf("in %q id: the part after %s should not be empty", pipelineStr, typeAndNameSeparator)

Check warning on line 159 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L157-L159

Added lines #L157 - L159 were not covered by tests
}
if err := validateName(nameStr); err != nil {
return fmt.Errorf("in %q id: %w", nameStr, err)

Check warning on line 162 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L161-L162

Added lines #L161 - L162 were not covered by tests
}
}
p.nameVal = nameStr

Check warning on line 165 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L165

Added line #L165 was not covered by tests

return nil

Check warning on line 167 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L167

Added line #L167 was not covered by tests
}

// String returns the ID string representation as "type[/name]" format.
func (p PipelineID) String() string {
if p.nameVal == "" {
return p.signal.String()

Check warning on line 173 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L171-L173

Added lines #L171 - L173 were not covered by tests
}

return p.signal.String() + typeAndNameSeparator + p.nameVal

Check warning on line 176 in component/identifiable.go

View check run for this annotation

Codecov / codecov/patch

component/identifiable.go#L176

Added line #L176 was not covered by tests
}
Loading

0 comments on commit 0d37132

Please sign in to comment.