Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make configsource component visible #2948

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package configsource is an experimental package that defines the interface of
// "configuration sources," e.g., Vault, ZooKeeper, etcd2, and others. Configuration
// sources retrieve values from their respective storages. A configuration parser/loader
// can inject these values in the configuration data used by the collector.
// ATTENTION: the package is still experimental and subject to changes without advanced notice.
package configsource

import (
Expand Down
15 changes: 8 additions & 7 deletions config/internal/configsource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"gopkg.in/yaml.v2"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
)

Expand Down Expand Up @@ -155,10 +156,10 @@ type (
type Manager struct {
// configSources is map from ConfigSource names (as defined in the configuration)
// and the respective instances.
configSources map[string]ConfigSource
configSources map[string]configsource.ConfigSource
// sessions track all the Session objects used to retrieve values to be injected
// into the configuration.
sessions map[string]Session
sessions map[string]configsource.Session
// watchers keeps track of all WatchForUpdate functions for retrieved values.
watchers []func() error
// watchersWG is used to ensure that Close waits for all WatchForUpdate calls
Expand All @@ -180,7 +181,7 @@ func NewManager(_ *config.Parser) (*Manager, error) {

return &Manager{
// TODO: Temporarily tests should set their config sources per their needs.
sessions: make(map[string]Session),
sessions: make(map[string]configsource.Session),
watchingCh: make(chan struct{}),
closeCh: make(chan struct{}),
}, nil
Expand Down Expand Up @@ -227,11 +228,11 @@ func (m *Manager) WatchForUpdate() error {

err := watcherFn()
switch {
case errors.Is(err, ErrWatcherNotSupported):
case errors.Is(err, configsource.ErrWatcherNotSupported):
// The watcher for the retrieved value is not supported, nothing to
// do, just exit from the goroutine.
return
case errors.Is(err, ErrSessionClosed):
case errors.Is(err, configsource.ErrSessionClosed):
// The Session from which this watcher was retrieved is being closed.
// There is no error to report, just exit from the goroutine.
return
Expand All @@ -258,7 +259,7 @@ func (m *Manager) WatchForUpdate() error {
return err
case <-m.closeCh:
// This covers the case that all watchers returned ErrWatcherNotSupported.
return ErrSessionClosed
return configsource.ErrSessionClosed
}
}

Expand Down Expand Up @@ -325,7 +326,7 @@ func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (in

// expandConfigSource retrieve data from the specified config source and injects them into
// the configuration. The Manager tracks sessions and watcher objects as needed.
func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc ConfigSource, s string) (interface{}, error) {
func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.ConfigSource, s string) (interface{}, error) {
cfgSrcName, selector, params, err := parseCfgSrc(s)
if err != nil {
return nil, err
Expand Down
49 changes: 25 additions & 24 deletions config/internal/configsource/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/experimental/configsource"
)

func TestConfigSourceManager_Simple(t *testing.T) {
ctx := context.Background()
manager, err := NewManager(nil)
require.NoError(t, err)
manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ValueMap: map[string]valueEntry{
"test_selector": {Value: "test_value"},
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestConfigSourceManager_Simple(t *testing.T) {
manager.WaitForWatcher()
assert.NoError(t, manager.Close(ctx))
<-doneCh
assert.ErrorIs(t, errWatcher, ErrSessionClosed)
assert.ErrorIs(t, errWatcher, configsource.ErrSessionClosed)
}

func TestConfigSourceManager_ResolveErrors(t *testing.T) {
Expand All @@ -79,14 +80,14 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
tests := []struct {
name string
config map[string]interface{}
configSourceMap map[string]ConfigSource
configSourceMap map[string]configsource.ConfigSource
}{
{
name: "incorrect_cfgsrc_ref",
config: map[string]interface{}{
"cfgsrc": "$tstcfgsrc:selector?{invalid}",
},
configSourceMap: map[string]ConfigSource{
configSourceMap: map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{},
},
},
Expand All @@ -95,7 +96,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
config: map[string]interface{}{
"cfgsrc": "$tstcfgsrc:selector",
},
configSourceMap: map[string]ConfigSource{
configSourceMap: map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{ErrOnNewSession: testErr},
},
},
Expand All @@ -104,7 +105,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
config: map[string]interface{}{
"cfgsrc": "$tstcfgsrc:selector",
},
configSourceMap: map[string]ConfigSource{
configSourceMap: map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{ErrOnRetrieve: testErr},
},
},
Expand All @@ -113,7 +114,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
config: map[string]interface{}{
"cfgsrc": "$tstcfgsrc:selector",
},
configSourceMap: map[string]ConfigSource{
configSourceMap: map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ErrOnRetrieveEnd: testErr,
ValueMap: map[string]valueEntry{
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestConfigSourceManager_ArraysAndMaps(t *testing.T) {
ctx := context.Background()
manager, err := NewManager(nil)
require.NoError(t, err)
manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ValueMap: map[string]valueEntry{
"elem0": {Value: "elem0_value"},
Expand Down Expand Up @@ -200,7 +201,7 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) {

manager, err := NewManager(nil)
require.NoError(t, err)
manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &tstCfgSrc,
}

Expand All @@ -224,7 +225,7 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) {
require.NoError(t, err)

watchForUpdateCh := make(chan error, 1)
manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ValueMap: map[string]valueEntry{
"test_selector": {
Expand Down Expand Up @@ -255,10 +256,10 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) {
}()

manager.WaitForWatcher()
watchForUpdateCh <- ErrValueUpdated
watchForUpdateCh <- configsource.ErrValueUpdated

<-doneCh
assert.ErrorIs(t, errWatcher, ErrValueUpdated)
assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated)
assert.NoError(t, manager.Close(ctx))
}

Expand All @@ -275,11 +276,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) {
case errFromWatchForUpdate := <-watchForUpdateCh:
return errFromWatchForUpdate
case <-watchDoneCh:
return ErrSessionClosed
return configsource.ErrSessionClosed
}
}

manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ValueMap: map[string]valueEntry{
"test_selector": {
Expand Down Expand Up @@ -313,11 +314,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) {
manager.WaitForWatcher()

for i := 0; i < watchForUpdateChSize; i++ {
watchForUpdateCh <- ErrValueUpdated
watchForUpdateCh <- configsource.ErrValueUpdated
}

<-doneCh
assert.ErrorIs(t, errWatcher, ErrValueUpdated)
assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated)
close(watchForUpdateCh)
assert.NoError(t, manager.Close(ctx))
}
Expand Down Expand Up @@ -345,7 +346,7 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) {

manager, err := NewManager(nil)
require.NoError(t, err)
manager.configSources = map[string]ConfigSource{
manager.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &tstCfgSrc,
}

Expand All @@ -367,7 +368,7 @@ func TestManager_expandString(t *testing.T) {
ctx := context.Background()
csp, err := NewManager(nil)
require.NoError(t, err)
csp.configSources = map[string]ConfigSource{
csp.configSources = map[string]configsource.ConfigSource{
"tstcfgsrc": &testConfigSource{
ValueMap: map[string]valueEntry{
"str_key": {Value: "test_value"},
Expand Down Expand Up @@ -575,17 +576,17 @@ type valueEntry struct {
WatchForUpdateFn func() error
}

var _ (ConfigSource) = (*testConfigSource)(nil)
var _ (Session) = (*testConfigSource)(nil)
var _ (configsource.ConfigSource) = (*testConfigSource)(nil)
var _ (configsource.Session) = (*testConfigSource)(nil)

func (t *testConfigSource) NewSession(context.Context) (Session, error) {
func (t *testConfigSource) NewSession(context.Context) (configsource.Session, error) {
if t.ErrOnNewSession != nil {
return nil, t.ErrOnNewSession
}
return t, nil
}

func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (Retrieved, error) {
func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (configsource.Retrieved, error) {
if t.OnRetrieve != nil {
if err := t.OnRetrieve(ctx, selector, params); err != nil {
return nil, err
Expand All @@ -602,7 +603,7 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params
}

watchForUpdateFn := func() error {
return ErrWatcherNotSupported
return configsource.ErrWatcherNotSupported
}

if entry.WatchForUpdateFn != nil {
Expand All @@ -628,7 +629,7 @@ type retrieved struct {
watchForUpdateFn func() error
}

var _ (Retrieved) = (*retrieved)(nil)
var _ (configsource.Retrieved) = (*retrieved)(nil)

func (r *retrieved) Value() interface{} {
return r.value
Expand Down