Skip to content

Commit

Permalink
Make InstanceID immutable
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Jul 1, 2024
1 parent a082f2e commit 26d02fb
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 49 deletions.
25 changes: 25 additions & 0 deletions .chloggen/immutable-instance-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make InstanceID immutable. This hides previously exported fields, making it a breaking change.

# One or more tracking issues or pull requests related to the change
issues: [10494]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
53 changes: 49 additions & 4 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,54 @@ func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config {
return f()
}

// InstanceID uniquely identifies a component instance
// InstanceID uniquely identifies a component instance.
type InstanceID struct {
ID ID
Kind Kind
PipelineIDs map[ID]struct{}
componentID ID
kind Kind
pipelineIDs map[ID]struct{}
}

// ComponentID returns the ComponentID associated with this instance.
func (id InstanceID) ComponentID() ID {
return id.componentID
}

// Kind returns the component Kind associated with this instance.
func (id InstanceID) Kind() Kind {
return id.kind
}

// PipelineIDs returns a set of PipelineIDs associated with this instance.
func (id InstanceID) PipelineIDs() map[ID]struct{} {
return id.pipelineIDs
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID ID, kind Kind, pipelineIDs ...ID) *InstanceID {
instanceID := InstanceID{
componentID: componentID,
kind: kind,
pipelineIDs: make(map[ID]struct{}, len(pipelineIDs)),
}
for _, pid := range pipelineIDs {
instanceID.pipelineIDs[pid] = struct{}{}
}
return &instanceID
}

// InstanceIDWithPipelines derives a new InstanceID from id with additional
// pipelineIDs added to it.
func InstanceIDWithPipelines(id *InstanceID, pipelineIDs ...ID) *InstanceID {
instanceID := InstanceID{
componentID: id.ComponentID(),
kind: id.Kind(),
pipelineIDs: make(map[ID]struct{}, len(id.PipelineIDs())+len(pipelineIDs)),
}
for pid := range id.PipelineIDs() {
instanceID.pipelineIDs[pid] = struct{}{}
}
for _, pid := range pipelineIDs {
instanceID.pipelineIDs[pid] = struct{}{}
}
return &instanceID
}
24 changes: 24 additions & 0 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,27 @@ func TestStabilityLevelString(t *testing.T) {
assert.EqualValues(t, "Stable", StabilityLevelStable.String())
assert.EqualValues(t, "", StabilityLevel(100).String())
}

func TestInstanceID(t *testing.T) {
traces := MustNewID("traces")
metrics := MustNewID("metrics")
logs := MustNewID("logs")
receiver := MustNewID("receiver")

id1 := NewInstanceID(receiver, KindReceiver, traces)
id2 := InstanceIDWithPipelines(id1, metrics, logs)

assert.Equal(t, receiver, id1.ComponentID())
assert.Equal(t, KindReceiver, id1.Kind())
assert.Equal(t, map[ID]struct{}{
traces: {},
}, id1.pipelineIDs)

assert.Equal(t, receiver, id2.ComponentID())
assert.Equal(t, KindReceiver, id2.Kind())
assert.Equal(t, map[ID]struct{}{
traces: {},
metrics: {},
logs: {},
}, id2.pipelineIDs)
}
4 changes: 2 additions & 2 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestComponentStatusWatcher(t *testing.T) {
changedComponents := map[*component.InstanceID][]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) {
if source.ID.Type() != unhealthyProcessorFactory.Type() {
if source.ComponentID().Type() != unhealthyProcessorFactory.Type() {
return
}
mux.Lock()
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestComponentStatusWatcher(t *testing.T) {

for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ComponentID())
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
Expand Down
5 changes: 1 addition & 4 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
extensionIDs: make([]component.ID, 0, len(cfg)),
}
for _, extID := range cfg {
instanceID := &component.InstanceID{
ID: extID,
Kind: component.KindExtension,
}
instanceID := component.NewInstanceID(extID, component.KindExtension)
extSet := extension.Settings{
ID: extID,
TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID),
Expand Down
53 changes: 20 additions & 33 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,67 +193,54 @@ func (g *Graph) createNodes(set Settings) error {
func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID.Type(), recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[node.ID()]
g.instanceIDs[node.ID()] = component.InstanceIDWithPipelines(instanceID, pipelineID)
return node.(*receiverNode)
}
g.componentGraph.AddNode(rcvrNode)
g.instanceIDs[rcvrNode.ID()] = &component.InstanceID{
ID: recvID,
Kind: component.KindReceiver,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[rcvrNode.ID()] = component.NewInstanceID(
recvID, component.KindReceiver, pipelineID,
)
return rcvrNode
}

func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
g.instanceIDs[procNode.ID()] = &component.InstanceID{
ID: procID,
Kind: component.KindProcessor,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[procNode.ID()] = component.NewInstanceID(
procID, component.KindProcessor, pipelineID,
)
return procNode
}

func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID.Type(), exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[expNode.ID()]
g.instanceIDs[expNode.ID()] = component.InstanceIDWithPipelines(instanceID, pipelineID)
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
g.instanceIDs[expNode.ID()] = &component.InstanceID{
ID: expNode.componentID,
Kind: component.KindExporter,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[expNode.ID()] = component.NewInstanceID(
expNode.componentID, component.KindExporter, pipelineID,
)
return expNode
}

func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
instanceID := g.instanceIDs[connNode.ID()]
instanceID.PipelineIDs[exprPipelineID] = struct{}{}
instanceID.PipelineIDs[rcvrPipelineID] = struct{}{}
g.instanceIDs[connNode.ID()] = component.InstanceIDWithPipelines(
instanceID, exprPipelineID, rcvrPipelineID,
)
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
g.instanceIDs[connNode.ID()] = &component.InstanceID{
ID: connNode.componentID,
Kind: component.KindConnector,
PipelineIDs: map[component.ID]struct{}{
exprPipelineID: {},
rcvrPipelineID: {},
},
}

g.instanceIDs[connNode.ID()] = component.NewInstanceID(
connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
)
return connNode
}

Expand Down
12 changes: 6 additions & 6 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,12 +2215,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError}

instanceIDs := map[*testNode]*component.InstanceID{
rNoErr: {ID: rNoErr.id},
rStErr: {ID: rStErr.id},
rSdErr: {ID: rSdErr.id},
eNoErr: {ID: eNoErr.id},
eStErr: {ID: eStErr.id},
eSdErr: {ID: eSdErr.id},
rNoErr: component.NewInstanceID(rNoErr.id, component.KindReceiver),
rStErr: component.NewInstanceID(rStErr.id, component.KindReceiver),
rSdErr: component.NewInstanceID(rSdErr.id, component.KindReceiver),
eNoErr: component.NewInstanceID(eNoErr.id, component.KindExporter),
eStErr: component.NewInstanceID(eStErr.id, component.KindExporter),
eSdErr: component.NewInstanceID(eSdErr.id, component.KindExporter),
}

// compare two maps of status events ignoring timestamp
Expand Down

0 comments on commit 26d02fb

Please sign in to comment.