From b3824867f0a912cf3d73a852ddcebd48515e905c Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 15 Apr 2021 16:09:30 -0700 Subject: [PATCH 1/2] Make configsource component experimental --- .../configsource/component.go | 4 ++ config/internal/configsource/manager.go | 15 +++--- config/internal/configsource/manager_test.go | 49 ++++++++++--------- 3 files changed, 37 insertions(+), 31 deletions(-) rename config/{internal => experimental}/configsource/component.go (95%) diff --git a/config/internal/configsource/component.go b/config/experimental/configsource/component.go similarity index 95% rename from config/internal/configsource/component.go rename to config/experimental/configsource/component.go index c65ea9bda5c..671c2f16c4f 100644 --- a/config/internal/configsource/component.go +++ b/config/experimental/configsource/component.go @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package configsource is an experimental package that define the interface +// of "configuration sources", e.g.: Vault, ZooKeeper, etcd, and others, that +// can be used to provide secrets or any other configuration data to be used by +// the collector. package configsource import ( diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index 9c3a7f27420..f62eb3df56c 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -27,6 +27,7 @@ import ( "gopkg.in/yaml.v2" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/experimental/configsource" "go.opentelemetry.io/collector/consumer/consumererror" ) @@ -155,10 +156,10 @@ type ( type Manager struct { // configSources is map from ConfigSource names (as defined in the configuration) // and the respective instances. - configSources map[string]ConfigSource + configSources map[string]configsource.ConfigSource // sessions track all the Session objects used to retrieve values to be injected // into the configuration. - sessions map[string]Session + sessions map[string]configsource.Session // watchers keeps track of all WatchForUpdate functions for retrieved values. watchers []func() error // watchersWG is used to ensure that Close waits for all WatchForUpdate calls @@ -180,7 +181,7 @@ func NewManager(_ *config.Parser) (*Manager, error) { return &Manager{ // TODO: Temporarily tests should set their config sources per their needs. - sessions: make(map[string]Session), + sessions: make(map[string]configsource.Session), watchingCh: make(chan struct{}), closeCh: make(chan struct{}), }, nil @@ -227,11 +228,11 @@ func (m *Manager) WatchForUpdate() error { err := watcherFn() switch { - case errors.Is(err, ErrWatcherNotSupported): + case errors.Is(err, configsource.ErrWatcherNotSupported): // The watcher for the retrieved value is not supported, nothing to // do, just exit from the goroutine. return - case errors.Is(err, ErrSessionClosed): + case errors.Is(err, configsource.ErrSessionClosed): // The Session from which this watcher was retrieved is being closed. // There is no error to report, just exit from the goroutine. return @@ -258,7 +259,7 @@ func (m *Manager) WatchForUpdate() error { return err case <-m.closeCh: // This covers the case that all watchers returned ErrWatcherNotSupported. - return ErrSessionClosed + return configsource.ErrSessionClosed } } @@ -325,7 +326,7 @@ func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (in // expandConfigSource retrieve data from the specified config source and injects them into // the configuration. The Manager tracks sessions and watcher objects as needed. -func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc ConfigSource, s string) (interface{}, error) { +func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.ConfigSource, s string) (interface{}, error) { cfgSrcName, selector, params, err := parseCfgSrc(s) if err != nil { return nil, err diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go index 2c1f010f912..9d701eccd56 100644 --- a/config/internal/configsource/manager_test.go +++ b/config/internal/configsource/manager_test.go @@ -26,13 +26,14 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/experimental/configsource" ) func TestConfigSourceManager_Simple(t *testing.T) { ctx := context.Background() manager, err := NewManager(nil) require.NoError(t, err) - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": {Value: "test_value"}, @@ -69,7 +70,7 @@ func TestConfigSourceManager_Simple(t *testing.T) { manager.WaitForWatcher() assert.NoError(t, manager.Close(ctx)) <-doneCh - assert.ErrorIs(t, errWatcher, ErrSessionClosed) + assert.ErrorIs(t, errWatcher, configsource.ErrSessionClosed) } func TestConfigSourceManager_ResolveErrors(t *testing.T) { @@ -79,14 +80,14 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) { tests := []struct { name string config map[string]interface{} - configSourceMap map[string]ConfigSource + configSourceMap map[string]configsource.ConfigSource }{ { name: "incorrect_cfgsrc_ref", config: map[string]interface{}{ "cfgsrc": "$tstcfgsrc:selector?{invalid}", }, - configSourceMap: map[string]ConfigSource{ + configSourceMap: map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{}, }, }, @@ -95,7 +96,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) { config: map[string]interface{}{ "cfgsrc": "$tstcfgsrc:selector", }, - configSourceMap: map[string]ConfigSource{ + configSourceMap: map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ErrOnNewSession: testErr}, }, }, @@ -104,7 +105,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) { config: map[string]interface{}{ "cfgsrc": "$tstcfgsrc:selector", }, - configSourceMap: map[string]ConfigSource{ + configSourceMap: map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ErrOnRetrieve: testErr}, }, }, @@ -113,7 +114,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) { config: map[string]interface{}{ "cfgsrc": "$tstcfgsrc:selector", }, - configSourceMap: map[string]ConfigSource{ + configSourceMap: map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ErrOnRetrieveEnd: testErr, ValueMap: map[string]valueEntry{ @@ -141,7 +142,7 @@ func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { ctx := context.Background() manager, err := NewManager(nil) require.NoError(t, err) - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "elem0": {Value: "elem0_value"}, @@ -200,7 +201,7 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) { manager, err := NewManager(nil) require.NoError(t, err) - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &tstCfgSrc, } @@ -224,7 +225,7 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) { require.NoError(t, err) watchForUpdateCh := make(chan error, 1) - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": { @@ -255,10 +256,10 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) { }() manager.WaitForWatcher() - watchForUpdateCh <- ErrValueUpdated + watchForUpdateCh <- configsource.ErrValueUpdated <-doneCh - assert.ErrorIs(t, errWatcher, ErrValueUpdated) + assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated) assert.NoError(t, manager.Close(ctx)) } @@ -275,11 +276,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { case errFromWatchForUpdate := <-watchForUpdateCh: return errFromWatchForUpdate case <-watchDoneCh: - return ErrSessionClosed + return configsource.ErrSessionClosed } } - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": { @@ -313,11 +314,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { manager.WaitForWatcher() for i := 0; i < watchForUpdateChSize; i++ { - watchForUpdateCh <- ErrValueUpdated + watchForUpdateCh <- configsource.ErrValueUpdated } <-doneCh - assert.ErrorIs(t, errWatcher, ErrValueUpdated) + assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated) close(watchForUpdateCh) assert.NoError(t, manager.Close(ctx)) } @@ -345,7 +346,7 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) { manager, err := NewManager(nil) require.NoError(t, err) - manager.configSources = map[string]ConfigSource{ + manager.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &tstCfgSrc, } @@ -367,7 +368,7 @@ func TestManager_expandString(t *testing.T) { ctx := context.Background() csp, err := NewManager(nil) require.NoError(t, err) - csp.configSources = map[string]ConfigSource{ + csp.configSources = map[string]configsource.ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "str_key": {Value: "test_value"}, @@ -575,17 +576,17 @@ type valueEntry struct { WatchForUpdateFn func() error } -var _ (ConfigSource) = (*testConfigSource)(nil) -var _ (Session) = (*testConfigSource)(nil) +var _ (configsource.ConfigSource) = (*testConfigSource)(nil) +var _ (configsource.Session) = (*testConfigSource)(nil) -func (t *testConfigSource) NewSession(context.Context) (Session, error) { +func (t *testConfigSource) NewSession(context.Context) (configsource.Session, error) { if t.ErrOnNewSession != nil { return nil, t.ErrOnNewSession } return t, nil } -func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (Retrieved, error) { +func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (configsource.Retrieved, error) { if t.OnRetrieve != nil { if err := t.OnRetrieve(ctx, selector, params); err != nil { return nil, err @@ -602,7 +603,7 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params } watchForUpdateFn := func() error { - return ErrWatcherNotSupported + return configsource.ErrWatcherNotSupported } if entry.WatchForUpdateFn != nil { @@ -628,7 +629,7 @@ type retrieved struct { watchForUpdateFn func() error } -var _ (Retrieved) = (*retrieved)(nil) +var _ (configsource.Retrieved) = (*retrieved)(nil) func (r *retrieved) Value() interface{} { return r.value From 8bb48c89a0e406789400c519357044f2b6265346 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 15 Apr 2021 16:26:36 -0700 Subject: [PATCH 2/2] Improve package description --- config/experimental/configsource/component.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/config/experimental/configsource/component.go b/config/experimental/configsource/component.go index 671c2f16c4f..8159d51e6c0 100644 --- a/config/experimental/configsource/component.go +++ b/config/experimental/configsource/component.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package configsource is an experimental package that define the interface -// of "configuration sources", e.g.: Vault, ZooKeeper, etcd, and others, that -// can be used to provide secrets or any other configuration data to be used by -// the collector. +// Package configsource is an experimental package that defines the interface of +// "configuration sources," e.g., Vault, ZooKeeper, etcd2, and others. Configuration +// sources retrieve values from their respective storages. A configuration parser/loader +// can inject these values in the configuration data used by the collector. +// ATTENTION: the package is still experimental and subject to changes without advanced notice. package configsource import (