diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 09d3a10bf73..0c9d5c0071b 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -16,6 +16,7 @@ package cassandra import ( + "errors" "flag" "github.com/spf13/viper" @@ -26,6 +27,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra/config" cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -102,7 +104,11 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil + options, err := writerOptions(f.Options) + if err != nil { + return nil, err + } + return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...), nil } // CreateDependencyReader implements storage.Factory @@ -124,5 +130,26 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if f.archiveSession == nil { return nil, storage.ErrArchiveStorageNotConfigured } - return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger), nil + options, err := writerOptions(f.Options) + if err != nil { + return nil, err + } + return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...), nil +} + +func writerOptions(opts *Options) ([]cSpanStore.Option, error) { + tagIndexBlacklist := opts.TagIndexBlacklist() + tagIndexWhitelist := opts.TagIndexWhitelist() + + if len(tagIndexBlacklist) > 0 && len(tagIndexWhitelist) > 0 { + return nil, errors.New("only one of TagIndexBlacklist and TagIndexWhitelist can be specified") + } + + var options []cSpanStore.Option + if len(tagIndexBlacklist) > 0 { + options = append(options, cSpanStore.TagFilter(dbmodel.NewBlacklistFilter(tagIndexBlacklist))) + } else if len(tagIndexWhitelist) > 0 { + options = append(options, cSpanStore.TagFilter(dbmodel.NewWhitelistFilter(tagIndexWhitelist))) + } + return options, nil } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index d301cc31d3a..a031334fa66 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -100,3 +100,68 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() assert.NoError(t, err) } + +func TestExclusiveWhitelistBlacklist(t *testing.T) { + logger, logBuf := testutils.NewLogger() + f := NewFactory() + v, command := config.Viperize(f.AddFlags) + command.ParseFlags([]string{"--cassandra-archive.enabled=true", + "--cassandra.enable-dependencies-v2=true", + "--cassandra.tag-index-whitelist=a,b,c", + "--cassandra.tag-index-blacklist=a,b,c"}) + f.InitFromViper(v) + + // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, + // so we override it with a mock. + f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + + var ( + session = &mocks.Session{} + query = &mocks.Query{} + ) + session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + query.On("Exec").Return(nil) + f.primaryConfig = newMockSessionBuilder(session, nil) + f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + + f.archiveConfig = nil + assert.NoError(t, f.Initialize(metrics.NullFactory, logger)) + assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + + _, err := f.CreateSpanWriter() + assert.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") + + f.archiveConfig = &mockSessionBuilder{} + assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + + _, err = f.CreateArchiveSpanWriter() + assert.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") +} + +func TestWriterOptions(t *testing.T) { + opts := NewOptions("cassandra") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{"--cassandra.tag-index-whitelist=a,b,c"}) + opts.InitFromViper(v) + + options, _ := writerOptions(opts) + assert.Len(t, options, 1) + + opts = NewOptions("cassandra") + v, command = config.Viperize(opts.AddFlags) + command.ParseFlags([]string{"--cassandra.tag-index-blacklist=a,b,c"}) + opts.InitFromViper(v) + + options, _ = writerOptions(opts) + assert.Len(t, options, 1) + + opts = NewOptions("cassandra") + v, command = config.Viperize(opts.AddFlags) + command.ParseFlags([]string{""}) + opts.InitFromViper(v) + + options, _ = writerOptions(opts) + assert.Len(t, options, 0) +} diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index fcf5383c568..9207c774b40 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -53,6 +53,8 @@ const ( // common storage settings suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl" + suffixTagIndexBlacklist = ".tag-index-blacklist" + suffixTagIndexWhitelist = ".tag-index-whitelist" ) // Options contains various type of Cassandra configs and provides the ability @@ -62,6 +64,8 @@ type Options struct { primary *namespaceConfig others map[string]*namespaceConfig SpanStoreWriteCacheTTL time.Duration + tagIndexBlacklist string + tagIndexWhitelist string } // the Servers field in config.Configuration is a list, which we cannot represent with flags. @@ -116,6 +120,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") + flagSet.String( + opt.primary.namespace+suffixTagIndexBlacklist, + opt.tagIndexBlacklist, + "The comma-separated list of span tags to blacklist from being indexed. All other tags will be indexed. Mutually exclusive with the whitelist option.") + flagSet.String( + opt.primary.namespace+suffixTagIndexWhitelist, + opt.tagIndexWhitelist, + "The comma-separated list of span tags to whitelist for being indexed. All other tags will not be indexed. Mutually exclusive with the blacklist option.") + } func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { @@ -222,6 +235,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) { cfg.initFromViper(v) } opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL) + opt.tagIndexBlacklist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexBlacklist)) + opt.tagIndexWhitelist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexWhitelist)) } func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { @@ -276,6 +291,24 @@ func (opt *Options) Get(namespace string) *config.Configuration { return &nsCfg.Configuration } +// TagIndexBlacklist returns the list of blacklisted tags +func (opt *Options) TagIndexBlacklist() []string { + if len(opt.tagIndexBlacklist) > 0 { + return strings.Split(opt.tagIndexBlacklist, ",") + } + + return nil +} + +// TagIndexWhitelist returns the list of whitelisted tags +func (opt *Options) TagIndexWhitelist() []string { + if len(opt.tagIndexWhitelist) > 0 { + return strings.Split(opt.tagIndexWhitelist, ",") + } + + return nil +} + // stripWhiteSpace removes all whitespace characters from a string func stripWhiteSpace(str string) string { return strings.Replace(str, " ", "", -1) diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index aca6b020146..ffcdac1e87c 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -60,6 +60,8 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.consistency=ONE", "--cas.proto-version=3", "--cas.socket-keep-alive=42s", + "--cas.tag-index-blacklist=blerg, blarg,blorg ", + "--cas.tag-index-whitelist=flerg, flarg,florg ", // enable aux with a couple overrides "--cas-aux.enabled=true", "--cas-aux.keyspace=jaeger-archive", @@ -74,6 +76,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, "ONE", primary.Consistency) assert.Equal(t, false, primary.EnableDependenciesV2) + assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist()) + assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist()) aux := opts.Get("cas-aux") require.NotNil(t, aux) @@ -89,3 +93,13 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, 42*time.Second, aux.SocketKeepAlive) assert.Equal(t, true, aux.EnableDependenciesV2) } + +func TestEmptyBlackWhiteLists(t *testing.T) { + opts := NewOptions("cas") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{}) + opts.InitFromViper(v) + + assert.Len(t, opts.TagIndexBlacklist(), 0) + assert.Len(t, opts.TagIndexWhitelist(), 0) +} diff --git a/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match.go b/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match.go new file mode 100644 index 00000000000..51917a9bff4 --- /dev/null +++ b/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match.go @@ -0,0 +1,74 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "github.com/jaegertracing/jaeger/model" +) + +// ExactMatchTagFilter filters out all tags in its tags slice +type ExactMatchTagFilter struct { + tags map[string]struct{} + dropMatches bool +} + +// newExactMatchTagFilter creates a ExactMatchTagFilter with the provided tags. Passing +// dropMatches true will exhibit blacklist behavior. Passing dropMatches false +// will exhibit whitelist behavior. +func newExactMatchTagFilter(tags []string, dropMatches bool) ExactMatchTagFilter { + mapTags := make(map[string]struct{}) + for _, t := range tags { + mapTags[t] = struct{}{} + } + return ExactMatchTagFilter{ + tags: mapTags, + dropMatches: dropMatches, + } +} + +// NewBlacklistFilter is a convenience method for creating a blacklist ExactMatchTagFilter +func NewBlacklistFilter(tags []string) ExactMatchTagFilter { + return newExactMatchTagFilter(tags, true) +} + +// NewWhitelistFilter is a convenience method for creating a whitelist ExactMatchTagFilter +func NewWhitelistFilter(tags []string) ExactMatchTagFilter { + return newExactMatchTagFilter(tags, false) +} + +// FilterProcessTags implements TagFilter +func (tf ExactMatchTagFilter) FilterProcessTags(span *model.Span, processTags model.KeyValues) model.KeyValues { + return tf.filter(processTags) +} + +// FilterTags implements TagFilter +func (tf ExactMatchTagFilter) FilterTags(span *model.Span, tags model.KeyValues) model.KeyValues { + return tf.filter(tags) +} + +// FilterLogFields implements TagFilter +func (tf ExactMatchTagFilter) FilterLogFields(span *model.Span, logFields model.KeyValues) model.KeyValues { + return tf.filter(logFields) +} + +func (tf ExactMatchTagFilter) filter(tags model.KeyValues) model.KeyValues { + var filteredTags model.KeyValues + for _, t := range tags { + if _, ok := tf.tags[t.Key]; ok == !tf.dropMatches { + filteredTags = append(filteredTags, t) + } + } + return filteredTags +} diff --git a/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go b/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go new file mode 100644 index 00000000000..c90d38f56c1 --- /dev/null +++ b/plugin/storage/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/model" +) + +func TestBlacklistFilter(t *testing.T) { + tt := []struct { + input []string + filter []string + expected []string + }{ + { + input: []string{"a", "b", "c"}, + filter: []string{"a"}, + expected: []string{"b", "c"}, + }, + { + input: []string{"a", "b", "c"}, + filter: []string{"A"}, + expected: []string{"a", "b", "c"}, + }, + } + + for _, test := range tt { + var inputKVs model.KeyValues + for _, i := range test.input { + inputKVs = append(inputKVs, model.String(i, "")) + } + var expectedKVs model.KeyValues + for _, e := range test.expected { + expectedKVs = append(expectedKVs, model.String(e, "")) + } + expectedKVs.Sort() + + tf := NewBlacklistFilter(test.filter) + actualKVs := tf.filter(inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterLogFields(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterProcessTags(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterTags(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + } +} + +func TestWhitelistFilter(t *testing.T) { + tt := []struct { + input []string + filter []string + expected []string + }{ + { + input: []string{"a", "b", "c"}, + filter: []string{"a"}, + expected: []string{"a"}, + }, + { + input: []string{"a", "b", "c"}, + filter: []string{"A"}, + expected: []string{}, + }, + } + + for _, test := range tt { + var inputKVs model.KeyValues + for _, i := range test.input { + inputKVs = append(inputKVs, model.String(i, "")) + } + var expectedKVs model.KeyValues + for _, e := range test.expected { + expectedKVs = append(expectedKVs, model.String(e, "")) + } + expectedKVs.Sort() + + tf := NewWhitelistFilter(test.filter) + actualKVs := tf.filter(inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterLogFields(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterProcessTags(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + + actualKVs = tf.FilterTags(nil, inputKVs) + actualKVs.Sort() + assert.Equal(t, actualKVs, expectedKVs) + } +}