Skip to content

Commit

Permalink
Add options to blacklist/whitelist span tags for indexing in Cassandra (
Browse files Browse the repository at this point in the history
#1904)

* Added configuration and CLI support

Signed-off-by: Joe Elliott <[email protected]>

* Added filters

Signed-off-by: Joe Elliott <[email protected]>

* Create White and Blacklist filters based on options

Signed-off-by: Joe Elliott <[email protected]>

* make fmt

Signed-off-by: Joe Elliott <[email protected]>

* Expanded filter tests

Signed-off-by: Joe Elliott <[email protected]>

* Expanded factory tests

Signed-off-by: Joe Elliott <[email protected]>

* Moved tag filters out of cassandra connection config

Signed-off-by: Joe Elliott <[email protected]>

* Addressed review items

Signed-off-by: Joe Elliott <[email protected]>

* Combined white and blacklist filters

Signed-off-by: Joe Elliott <[email protected]>

* Added exact match tests

Signed-off-by: Joe Elliott <[email protected]>

* processTags => tags

Signed-off-by: Joe Elliott <[email protected]>

* Create exact match tag filters

Signed-off-by: Joe Elliott <[email protected]>

* Better var name

Signed-off-by: Joe Elliott <[email protected]>

* Reverted submodule

Signed-off-by: Joe Elliott <[email protected]>

* Created convenience factory methods for white/blacklist

Signed-off-by: Joe Elliott <[email protected]>

* Hid original factory method

Signed-off-by: Joe Elliott <[email protected]>

* Clarified tests

Signed-off-by: Joe Elliott <[email protected]>

* Fixed submodule?

Signed-off-by: Joe Elliott <[email protected]>

* Removed blank line

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored and yurishkuro committed Nov 9, 2019
1 parent 2092718 commit c8f52a9
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 2 deletions.
31 changes: 29 additions & 2 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package cassandra

import (
"errors"
"flag"

"github.com/spf13/viper"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -102,7 +104,11 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
options, err := writerOptions(f.Options)
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...), nil
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -124,5 +130,26 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger), nil
options, err := writerOptions(f.Options)
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...), nil
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
tagIndexBlacklist := opts.TagIndexBlacklist()
tagIndexWhitelist := opts.TagIndexWhitelist()

if len(tagIndexBlacklist) > 0 && len(tagIndexWhitelist) > 0 {
return nil, errors.New("only one of TagIndexBlacklist and TagIndexWhitelist can be specified")
}

var options []cSpanStore.Option
if len(tagIndexBlacklist) > 0 {
options = append(options, cSpanStore.TagFilter(dbmodel.NewBlacklistFilter(tagIndexBlacklist)))
} else if len(tagIndexWhitelist) > 0 {
options = append(options, cSpanStore.TagFilter(dbmodel.NewWhitelistFilter(tagIndexWhitelist)))
}
return options, nil
}
65 changes: 65 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,68 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}

func TestExclusiveWhitelistBlacklist(t *testing.T) {
logger, logBuf := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--cassandra-archive.enabled=true",
"--cassandra.enable-dependencies-v2=true",
"--cassandra.tag-index-whitelist=a,b,c",
"--cassandra.tag-index-blacklist=a,b,c"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
// so we override it with a mock.
f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

var (
session = &mocks.Session{}
query = &mocks.Query{}
)
session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
query.On("Exec").Return(nil)
f.primaryConfig = newMockSessionBuilder(session, nil)
f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, logger))
assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping")

_, err := f.CreateSpanWriter()
assert.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified")

f.archiveConfig = &mockSessionBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanWriter()
assert.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified")
}

func TestWriterOptions(t *testing.T) {
opts := NewOptions("cassandra")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.tag-index-whitelist=a,b,c"})
opts.InitFromViper(v)

options, _ := writerOptions(opts)
assert.Len(t, options, 1)

opts = NewOptions("cassandra")
v, command = config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.tag-index-blacklist=a,b,c"})
opts.InitFromViper(v)

options, _ = writerOptions(opts)
assert.Len(t, options, 1)

opts = NewOptions("cassandra")
v, command = config.Viperize(opts.AddFlags)
command.ParseFlags([]string{""})
opts.InitFromViper(v)

options, _ = writerOptions(opts)
assert.Len(t, options, 0)
}
33 changes: 33 additions & 0 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixTagIndexBlacklist = ".tag-index-blacklist"
suffixTagIndexWhitelist = ".tag-index-whitelist"
)

// Options contains various type of Cassandra configs and provides the ability
Expand All @@ -62,6 +64,8 @@ type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
tagIndexBlacklist string
tagIndexWhitelist string
}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand Down Expand Up @@ -116,6 +120,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.String(
opt.primary.namespace+suffixTagIndexBlacklist,
opt.tagIndexBlacklist,
"The comma-separated list of span tags to blacklist from being indexed. All other tags will be indexed. Mutually exclusive with the whitelist option.")
flagSet.String(
opt.primary.namespace+suffixTagIndexWhitelist,
opt.tagIndexWhitelist,
"The comma-separated list of span tags to whitelist for being indexed. All other tags will not be indexed. Mutually exclusive with the blacklist option.")

}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -222,6 +235,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
cfg.initFromViper(v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.tagIndexBlacklist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexBlacklist))
opt.tagIndexWhitelist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexWhitelist))
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
Expand Down Expand Up @@ -276,6 +291,24 @@ func (opt *Options) Get(namespace string) *config.Configuration {
return &nsCfg.Configuration
}

// TagIndexBlacklist returns the list of blacklisted tags
func (opt *Options) TagIndexBlacklist() []string {
if len(opt.tagIndexBlacklist) > 0 {
return strings.Split(opt.tagIndexBlacklist, ",")
}

return nil
}

// TagIndexWhitelist returns the list of whitelisted tags
func (opt *Options) TagIndexWhitelist() []string {
if len(opt.tagIndexWhitelist) > 0 {
return strings.Split(opt.tagIndexWhitelist, ",")
}

return nil
}

// stripWhiteSpace removes all whitespace characters from a string
func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
Expand Down
14 changes: 14 additions & 0 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.consistency=ONE",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
"--cas.tag-index-blacklist=blerg, blarg,blorg ",
"--cas.tag-index-whitelist=flerg, flarg,florg ",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
Expand All @@ -74,6 +76,8 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, "ONE", primary.Consistency)
assert.Equal(t, false, primary.EnableDependenciesV2)
assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist())
assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist())

aux := opts.Get("cas-aux")
require.NotNil(t, aux)
Expand All @@ -89,3 +93,13 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)
assert.Equal(t, true, aux.EnableDependenciesV2)
}

func TestEmptyBlackWhiteLists(t *testing.T) {
opts := NewOptions("cas")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{})
opts.InitFromViper(v)

assert.Len(t, opts.TagIndexBlacklist(), 0)
assert.Len(t, opts.TagIndexWhitelist(), 0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"github.com/jaegertracing/jaeger/model"
)

// ExactMatchTagFilter filters out all tags in its tags slice
type ExactMatchTagFilter struct {
tags map[string]struct{}
dropMatches bool
}

// newExactMatchTagFilter creates a ExactMatchTagFilter with the provided tags. Passing
// dropMatches true will exhibit blacklist behavior. Passing dropMatches false
// will exhibit whitelist behavior.
func newExactMatchTagFilter(tags []string, dropMatches bool) ExactMatchTagFilter {
mapTags := make(map[string]struct{})
for _, t := range tags {
mapTags[t] = struct{}{}
}
return ExactMatchTagFilter{
tags: mapTags,
dropMatches: dropMatches,
}
}

// NewBlacklistFilter is a convenience method for creating a blacklist ExactMatchTagFilter
func NewBlacklistFilter(tags []string) ExactMatchTagFilter {
return newExactMatchTagFilter(tags, true)
}

// NewWhitelistFilter is a convenience method for creating a whitelist ExactMatchTagFilter
func NewWhitelistFilter(tags []string) ExactMatchTagFilter {
return newExactMatchTagFilter(tags, false)
}

// FilterProcessTags implements TagFilter
func (tf ExactMatchTagFilter) FilterProcessTags(span *model.Span, processTags model.KeyValues) model.KeyValues {
return tf.filter(processTags)
}

// FilterTags implements TagFilter
func (tf ExactMatchTagFilter) FilterTags(span *model.Span, tags model.KeyValues) model.KeyValues {
return tf.filter(tags)
}

// FilterLogFields implements TagFilter
func (tf ExactMatchTagFilter) FilterLogFields(span *model.Span, logFields model.KeyValues) model.KeyValues {
return tf.filter(logFields)
}

func (tf ExactMatchTagFilter) filter(tags model.KeyValues) model.KeyValues {
var filteredTags model.KeyValues
for _, t := range tags {
if _, ok := tf.tags[t.Key]; ok == !tf.dropMatches {
filteredTags = append(filteredTags, t)
}
}
return filteredTags
}
Loading

0 comments on commit c8f52a9

Please sign in to comment.