Skip to content

Commit

Permalink
Split connection settings by types (#82)
Browse files Browse the repository at this point in the history
This changes splits connection settings by the type of the connection so that
each type has a corresponding message that records the appropriate data.

Implements spec change open-telemetry/opamp-spec#87
  • Loading branch information
tigrannajaryan committed May 26, 2022
1 parent afb132b commit 0403a82
Show file tree
Hide file tree
Showing 9 changed files with 789 additions and 661 deletions.
18 changes: 9 additions & 9 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,11 +618,11 @@ func TestAgentIdentification(t *testing.T) {
func TestConnectionSettings(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
hash := []byte{1, 2, 3}
opampSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://opamp.com"}
metricsSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://metrics.com"}
tracesSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://traces.com"}
logsSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://logs.com"}
otherSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://other.com"}
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
metricsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://metrics.com"}
tracesSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://traces.com"}
logsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://logs.com"}
otherSettings := &protobufs.OtherConnectionSettings{DestinationEndpoint: "http://other.com"}

var rcvStatus int64
// Start a Server.
Expand All @@ -638,7 +638,7 @@ func TestConnectionSettings(t *testing.T) {
OwnMetrics: metricsSettings,
OwnTraces: tracesSettings,
OwnLogs: logsSettings,
OtherConnections: map[string]*protobufs.ConnectionSettings{
OtherConnections: map[string]*protobufs.OtherConnectionSettings{
"other": otherSettings,
},
},
Expand All @@ -655,7 +655,7 @@ func TestConnectionSettings(t *testing.T) {
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.ConnectionSettings,
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&gotOpampSettings, 1)
Expand All @@ -664,7 +664,7 @@ func TestConnectionSettings(t *testing.T) {

OnOwnTelemetryConnectionSettingsFunc: func(
ctx context.Context, telemetryType types.OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error {
switch telemetryType {
case types.OwnMetrics:
Expand All @@ -680,7 +680,7 @@ func TestConnectionSettings(t *testing.T) {

OnOtherConnectionSettingsFunc: func(
ctx context.Context, name string,
settings *protobufs.ConnectionSettings,
settings *protobufs.OtherConnectionSettings,
) error {
assert.EqualValues(t, "other", name)
assert.True(t, proto.Equal(otherSettings, settings))
Expand Down
8 changes: 4 additions & 4 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ func (r *receivedProcessor) rcvConnectionSettings(ctx context.Context, settings

func (r *receivedProcessor) rcvOwnTelemetryConnectionSettings(
ctx context.Context,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
telemetryType types.OwnTelemetryType,
callback func(ctx context.Context, telemetryType types.OwnTelemetryType, settings *protobufs.ConnectionSettings) error,
callback func(ctx context.Context, telemetryType types.OwnTelemetryType, settings *protobufs.TelemetryConnectionSettings) error,
) {
if settings != nil {
callback(ctx, telemetryType, settings)
Expand All @@ -233,9 +233,9 @@ func (r *receivedProcessor) rcvOwnTelemetryConnectionSettings(

func (r *receivedProcessor) rcvOtherConnectionSettings(
ctx context.Context,
settings *protobufs.ConnectionSettings,
settings *protobufs.OtherConnectionSettings,
name string,
callback func(ctx context.Context, name string, settings *protobufs.ConnectionSettings) error,
callback func(ctx context.Context, name string, settings *protobufs.OtherConnectionSettings) error,
) {
if settings != nil {
callback(ctx, name, settings)
Expand Down
24 changes: 12 additions & 12 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ type Callbacks interface {
// See OnRemoteConfig for the behavior.
OnOpampConnectionSettings(
ctx context.Context,
settings *protobufs.ConnectionSettings,
settings *protobufs.OpAMPConnectionSettings,
) error

// OnOpampConnectionSettingsAccepted will be called after the settings are
// verified and accepted (OnOpampConnectionSettingsOffer and connection using
// new settings succeeds). The Agent should store the settings and use them
// in the future. Old connection settings should be forgotten.
OnOpampConnectionSettingsAccepted(
settings *protobufs.ConnectionSettings,
settings *protobufs.OpAMPConnectionSettings,
)

// OnOwnTelemetryConnectionSettings is called when the Agent receives a
Expand All @@ -116,7 +116,7 @@ type Callbacks interface {
OnOwnTelemetryConnectionSettings(
ctx context.Context,
telemetryType OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error

// OnOtherConnectionSettings is called when the Agent receives a
Expand All @@ -131,7 +131,7 @@ type Callbacks interface {
OnOtherConnectionSettings(
ctx context.Context,
name string,
certificate *protobufs.ConnectionSettings,
certificate *protobufs.OtherConnectionSettings,
) error

// OnPackagesAvailable is called when the Server has packages available which are
Expand Down Expand Up @@ -169,22 +169,22 @@ type CallbacksStruct struct {

OnOpampConnectionSettingsFunc func(
ctx context.Context,
settings *protobufs.ConnectionSettings,
settings *protobufs.OpAMPConnectionSettings,
) error
OnOpampConnectionSettingsAcceptedFunc func(
settings *protobufs.ConnectionSettings,
settings *protobufs.OpAMPConnectionSettings,
)

OnOwnTelemetryConnectionSettingsFunc func(
ctx context.Context,
telemetryType OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error

OnOtherConnectionSettingsFunc func(
ctx context.Context,
name string,
settings *protobufs.ConnectionSettings,
settings *protobufs.OtherConnectionSettings,
) error

OnPackagesAvailableFunc func(ctx context.Context, packages *protobufs.PackagesAvailable, syncer PackagesSyncer) error
Expand Down Expand Up @@ -237,23 +237,23 @@ func (c CallbacksStruct) GetEffectiveConfig(ctx context.Context) (*protobufs.Eff
}

func (c CallbacksStruct) OnOpampConnectionSettings(
ctx context.Context, settings *protobufs.ConnectionSettings,
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
if c.OnOpampConnectionSettingsFunc != nil {
return c.OnOpampConnectionSettingsFunc(ctx, settings)
}
return nil
}

func (c CallbacksStruct) OnOpampConnectionSettingsAccepted(settings *protobufs.ConnectionSettings) {
func (c CallbacksStruct) OnOpampConnectionSettingsAccepted(settings *protobufs.OpAMPConnectionSettings) {
if c.OnOpampConnectionSettingsAcceptedFunc != nil {
c.OnOpampConnectionSettingsAcceptedFunc(settings)
}
}

func (c CallbacksStruct) OnOwnTelemetryConnectionSettings(
ctx context.Context, telemetryType OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error {
if c.OnOwnTelemetryConnectionSettingsFunc != nil {
return c.OnOwnTelemetryConnectionSettingsFunc(ctx, telemetryType, settings)
Expand All @@ -262,7 +262,7 @@ func (c CallbacksStruct) OnOwnTelemetryConnectionSettings(
}

func (c CallbacksStruct) OnOtherConnectionSettings(
ctx context.Context, name string, settings *protobufs.ConnectionSettings,
ctx context.Context, name string, settings *protobufs.OtherConnectionSettings,
) error {
if c.OnOtherConnectionSettingsFunc != nil {
return c.OnOtherConnectionSettingsFunc(ctx, name, settings)
Expand Down
4 changes: 2 additions & 2 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (agent *Agent) onRemoteConfig(
func (agent *Agent) onOwnTelemetryConnectionSettings(
_ context.Context,
telemetryType types.OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error {
switch telemetryType {
case types.OwnMetrics:
Expand All @@ -243,7 +243,7 @@ func (agent *Agent) onAgentIdentificationFunc(
return nil
}

func (agent *Agent) initMeter(settings *protobufs.ConnectionSettings) {
func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) {
reporter, err := NewMetricReporter(agent.logger, settings, agent.agentType, agent.agentVersion, agent.instanceId)
if err != nil {
agent.logger.Errorf("Cannot collect metrics: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/examples/agent/agent/metricreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type MetricReporter struct {

func NewMetricReporter(
logger types.Logger,
dest *protobufs.ConnectionSettings,
dest *protobufs.TelemetryConnectionSettings,
agentType string,
agentVersion string,
instanceId ulid.ULID,
Expand Down
2 changes: 1 addition & 1 deletion internal/examples/server/data/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (agent *Agent) calcConnectionSettings(response *protobufs.ServerToAgent) {
response.ConnectionSettings = &protobufs.ConnectionSettingsOffers{
Hash: nil, // TODO: calc has from settings.
Opamp: nil,
OwnMetrics: &protobufs.ConnectionSettings{
OwnMetrics: &protobufs.TelemetryConnectionSettings{
// We just hard-code this to a port on a localhost on which we can
// run an Otel Collector for demo purposes. With real production
// servers this should likely point to an OTLP backend.
Expand Down
4 changes: 2 additions & 2 deletions internal/examples/supervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *Supervisor) onRemoteConfig(
func (s *Supervisor) onOwnTelemetryConnectionSettings(
ctx context.Context,
telemetryType types.OwnTelemetryType,
settings *protobufs.ConnectionSettings,
settings *protobufs.TelemetryConnectionSettings,
) error {
switch telemetryType {
case types.OwnMetrics:
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *Supervisor) onAgentIdentificationFunc(
return nil
}

func (s *Supervisor) setupOwnMetrics(ctx context.Context, settings *protobufs.ConnectionSettings) {
func (s *Supervisor) setupOwnMetrics(ctx context.Context, settings *protobufs.TelemetryConnectionSettings) {
var cfg string
if settings.DestinationEndpoint == "" {
// No destination. Disable metric collection.
Expand Down
117 changes: 64 additions & 53 deletions internal/proto/opamp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,56 @@ enum ServerCapabilities {
// Add new capabilities here, continuing with the least significant unused bit.
}

// The ConnectionSettings message is a collection of fields which comprise an
// The OpAMPConnectionSettings message is a collection of fields which comprise an
// offer from the Server to the Agent to use the specified settings for OpAMP
// connection.
message OpAMPConnectionSettings {
// OpAMP Server URL This MUST be a WebSocket or HTTP URL and MUST be non-empty, for
// example: "wss://example.com:4318/v1/opamp"
string destination_endpoint = 1;

// Optional headers to use when connecting. Typically used to set access tokens or
// other authorization headers. For HTTP-based protocols the Agent should
// set these in the request headers.
// For example:
// key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l".
Headers headers = 2;

// The Agent should use the offered certificate to connect to the destination
// from now on. If the Agent is able to validate and connect using the offered
// certificate the Agent SHOULD forget any previous client certificates
// for this connection.
// This field is optional: if omitted the client SHOULD NOT use a client-side certificate.
// This field can be used to perform a client certificate revocation/rotation.
TLSCertificate certificate = 3;
}

// The TelemetryConnectionSettings message is a collection of fields which comprise an
// offer from the Server to the Agent to use the specified settings for a network
// connection to report own telemetry.
message TelemetryConnectionSettings {
// The value MUST be a full URL an OTLP/HTTP/Protobuf receiver with path. Schema
// SHOULD begin with "https://", for example "https://example.com:4318/v1/metrics"
// The Agent MAY refuse to send the telemetry if the URL begins with "http://".
string destination_endpoint = 1;

// Optional headers to use when connecting. Typically used to set access tokens or
// other authorization headers. For HTTP-based protocols the Agent should
// set these in the request headers.
// For example:
// key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l".
Headers headers = 2;

// The Agent should use the offered certificate to connect to the destination
// from now on. If the Agent is able to validate and connect using the offered
// certificate the Agent SHOULD forget any previous client certificates
// for this connection.
// This field is optional: if omitted the client SHOULD NOT use a client-side certificate.
// This field can be used to perform a client certificate revocation/rotation.
TLSCertificate certificate = 3;
}

// The OtherConnectionSettings message is a collection of fields which comprise an
// offer from the Server to the Agent to use the specified settings for a network
// connection. It is not required that all fields in this message are specified.
// The Server may specify only some of the fields, in which case it means that
Expand All @@ -194,66 +243,28 @@ enum ServerCapabilities {
// field is not set (this is done to overcome the limitation of old protoc
// compilers don't generate methods that allow to check for the presence of
// the field.
message ConnectionSettings {
message OtherConnectionSettings {
// A URL, host:port or some other destination specifier.
//
// For OpAMP destination this MUST be a WebSocket URL and MUST be non-empty, for
// example: "wss://example.com:4318/v1/opamp"
//
// For own telemetry destination this MUST be the full HTTP URL to an
// OTLP/HTTP/Protobuf receiver. The value MUST be a full URL with path and schema
// and SHOULD begin with "https://", for example "https://example.com:4318/v1/metrics"
// The Agent MAY refuse to send the telemetry if the URL begins with "http://".
// The field is considered unset if (flags & DestinationEndpointSet)==0.
string destination_endpoint = 1;

// Headers to use when connecting. Typically used to set access tokens or
// Optional headers to use when connecting. Typically used to set access tokens or
// other authorization headers. For HTTP-based protocols the Agent should
// set these in the request headers.
// For example:
// key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l".
// if the field is unset then the Agent SHOULD continue using the headers
// that it currently has (if any).
Headers headers = 2;

// A URL, host:port or some other specifier of an intermediary proxy.
// Empty if no proxy is used.
//
// Example use case: if OpAMP proxy is also an OTLP intermediary Collector then
// the OpAMP proxy can direct the Agents that connect to it to also send Agents's
// OTLP metrics through its OTLP metrics pipeline.
// Can be used for example by Otel Helm chart with 2 stage-collection when Agents
// on K8s nodes are proxied through a standalone Collector.
//
// For example: "https://proxy.example.com:5678"
// The field is considered unset if (flags & ProxyEndpointSet)==0.
string proxy_endpoint = 3;

// Headers to use when connecting to a proxy. For HTTP-based protocols
// the Agent should set these in the request headers.
// If no proxy is used the Headers field must be present and must contain no headers.
// For example:
// key="Proxy-Authorization", value="Basic YWxhZGRpbjpvcGVuc2VzYW1l".
// if the field is unset then the Agent SHOULD continue using the proxy headers
// that it currently has (if any).
Headers proxy_headers = 4;

// The Agent should use the offered certificate to connect to the destination
// from now on. If the Agent is able to validate and connect using the offered
// certificate the Agent SHOULD forget any previous client certificates
// for this connection.
// This field is used to perform a client certificate revocation/rotation.
// if the field is unset then the Agent SHOULD continue using the certificate
// that it currently has (if any).
TLSCertificate certificate = 5;

enum Flags {
_ = 0;
DestinationEndpointSet = 0x01;
ProxyEndpointSet = 0x02;
}
// Bitfield of Flags.
Flags flags = 6;
// This field is optional: if omitted the client SHOULD NOT use a client-side certificate.
// This field can be used to perform a client certificate revocation/rotation.
TLSCertificate certificate = 3;

// Other connection settings. These are Agent-specific and are up to the Agent
// interpret.
map<string, string> other_settings = 4;
}

message Headers {
Expand Down Expand Up @@ -307,7 +318,7 @@ message ConnectionSettingsOffers {
// The Agent MUST verify the offered connection settings by actually connecting
// before accepting the setting to ensure it does not loose access to the OpAMP
// Server due to invalid settings.
ConnectionSettings opamp = 2;
OpAMPConnectionSettings opamp = 2;

// Settings to connect to an OTLP metrics backend to send Agent's own metrics to.
// If this field is not set then the Agent should assume that the settings
Expand All @@ -326,20 +337,20 @@ message ConnectionSettingsOffers {
//
// Process metrics MUST follow the conventions for processes:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/process-metrics.md
ConnectionSettings own_metrics = 3;
TelemetryConnectionSettings own_metrics = 3;

// Similar to own_metrics, but for traces.
ConnectionSettings own_traces = 4;
TelemetryConnectionSettings own_traces = 4;

// Similar to own_metrics, but for logs.
ConnectionSettings own_logs = 5;
TelemetryConnectionSettings own_logs = 5;

// Another set of connection settings, with a string name associated with each.
// How the Agent uses these is Agent-specific. Typically the name represents
// the name of the destination to connect to (as it is known to the Agent).
// If this field is not set then the Agent should assume that the other_connections
// settings are unchanged.
map<string,ConnectionSettings> other_connections = 6;
map<string,OtherConnectionSettings> other_connections = 6;
}

// List of packages that the Server offers to the Agent.
Expand Down
Loading

0 comments on commit 0403a82

Please sign in to comment.