From 5595bcbb15aa8832920d51fa59ef49dbcfae459e Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Tue, 13 Sep 2022 09:22:56 -0400 Subject: [PATCH 01/17] Initial Filter Processor Resolves #1100 Adds dynamic filter processor and initial tests --- processors/config.go | 15 + .../filterprocessor/filter_processor.go | 305 ++++++++++++++++++ .../filterprocessor/filter_processor_test.go | 101 ++++++ 3 files changed, 421 insertions(+) create mode 100644 processors/filterprocessor/filter_processor.go create mode 100644 processors/filterprocessor/filter_processor_test.go diff --git a/processors/config.go b/processors/config.go index 3aae63221..950d6bc9c 100644 --- a/processors/config.go +++ b/processors/config.go @@ -10,3 +10,18 @@ type BlockProcessorConfig struct { AlgodToken string `yaml:"algod-token"` AlgodAddr string `yaml:"algod-addr"` } + +// FilterProcessorSubConfig is the configuration needed for each additional filter +type FilterProcessorSubConfig struct { + // The tag of the struct to analyze + FilterTag string `yaml:"tag"` + // The type of expression to search for "const" or "regex" + ExpressionType string `yaml:"expression-type"` + // The expression to search + Expression string `yaml:"expression"` +} + +// FilterProcessorConfig configuration for the filter processor +type FilterProcessorConfig struct { + Filters []map[string][]FilterProcessorSubConfig `yaml:"filters"` +} diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go new file mode 100644 index 000000000..d01fe329f --- /dev/null +++ b/processors/filterprocessor/filter_processor.go @@ -0,0 +1,305 @@ +package filterprocessor + +import ( + "context" + "fmt" + "github.com/algorand/go-algorand/data/transactions" + "gopkg.in/yaml.v3" + "reflect" + "regexp" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/algorand/indexer/data" + "github.com/algorand/indexer/plugins" + "github.com/algorand/indexer/processors" +) + +const implementationName = 
"filter_processor" + +// package-wide init function +func init() { + processors.RegisterProcessor(implementationName, &Constructor{}) +} + +// Constructor is the ProcessorConstructor implementation for the "address_filter" processor +type Constructor struct{} + +// New initializes a blockProcessorConstructor +func (c *Constructor) New() processors.Processor { + return &FilterProcessor{} +} + +type expression interface { + Search(input interface{}) bool +} + +type regexExpression struct { + Regex *regexp.Regexp +} + +func (e regexExpression) Search(input interface{}) bool { + return e.Regex.MatchString(input.(string)) +} + +const constantExpressionStr = "const" +const regexExpressionStr = "regex" + +func makeExpression(expressionType string, expressionSearchStr string) (*expression, error) { + switch expressionType { + case constantExpressionStr: + { + r, err := regexp.Compile("^" + expressionSearchStr + "$") + if err != nil { + return nil, err + } + + var exp expression = regexExpression{Regex: r} + return &exp, nil + } + case regexExpressionStr: + { + r, err := regexp.Compile(expressionSearchStr) + if err != nil { + return nil, err + } + + var exp expression = regexExpression{Regex: r} + return &exp, nil + } + default: + return nil, fmt.Errorf("unknown expression type: %s", expressionType) + } +} + +type fieldOperation string + +const someFieldOperation fieldOperation = "some" +const allFieldOperation fieldOperation = "all" + +type fieldSearcher struct { + Exp *expression + Tag string + MethodToCall string +} + +// Search returns true if block contains the expression +func (f fieldSearcher) Search(input transactions.SignedTxnInBlock) bool { + + e := reflect.ValueOf(&input).Elem() + + var field string + + for _, field = range strings.Split(f.Tag, ".") { + e = e.FieldByName(field) + } + + toSearch := e.MethodByName(f.MethodToCall).Call([]reflect.Value{})[0].Interface() + + if (*f.Exp).Search(toSearch) { + return true + } + + return false +} + +// This maps the 
expression-type with the needed function for the expression. +// For instance the const or regex expression-type might need the String() function +// Can't make this consts because there are no constant maps in go... +var expressionTypeToFunctionMap = map[string]string{ + constantExpressionStr: "String", + regexExpressionStr: "String", +} + +// checks that the supplied tag exists in the struct and recovers from any panics +func checkTagExistsAndHasCorrectFunction(expressionType string, tag string) (outError error) { + var field string + defer func() { + if r := recover(); r != nil { + outError = fmt.Errorf("error occured regarding tag %s. last searched field was: %s - %v", tag, field, r) + } + }() + + e := reflect.ValueOf(&transactions.SignedTxnInBlock{}).Elem() + + for _, field = range strings.Split(tag, ".") { + e = e.FieldByName(field) + if !e.IsValid() { + return fmt.Errorf("%s does not exist in transactions.SignedTxnInBlock struct. last searched field was: %s", tag, field) + } + } + + method, ok := expressionTypeToFunctionMap[expressionType] + + if !ok { + return fmt.Errorf("expression type (%s) is not supported. 
tag value: %s", expressionType, tag) + } + + if !e.MethodByName(method).IsValid() { + return fmt.Errorf("variable referenced by tag %s does not contain the needed method: %s", tag, method) + } + + return nil +} + +// makeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function +func makeFieldSearcher(e *expression, expressionType string, tag string) (*fieldSearcher, error) { + + if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil { + return nil, err + } + + return &fieldSearcher{Exp: e, Tag: tag, MethodToCall: expressionTypeToFunctionMap[expressionType]}, nil +} + +type fieldFilter struct { + Op fieldOperation + Searchers []*fieldSearcher +} + +func (f fieldFilter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { + + var newPayset []transactions.SignedTxnInBlock + switch f.Op { + case someFieldOperation: + for _, txn := range input.Payset { + for _, fs := range f.Searchers { + if fs.Search(txn) { + newPayset = append(newPayset, txn) + break + } + } + } + + break + case allFieldOperation: + for _, txn := range input.Payset { + + allTrue := true + for _, fs := range f.Searchers { + if !fs.Search(txn) { + allTrue = false + break + } + } + + if allTrue { + newPayset = append(newPayset, txn) + } + + } + break + default: + return data.BlockData{}, fmt.Errorf("unknown operation: %s", f.Op) + } + + input.Payset = newPayset + + return input, nil + +} + +// FilterProcessor filters transactions by a variety of means +type FilterProcessor struct { + FieldFilters []fieldFilter + + logger *log.Logger + cfg plugins.PluginConfig + ctx context.Context +} + +// Metadata returns metadata +func (a *FilterProcessor) Metadata() processors.ProcessorMetadata { + return processors.MakeProcessorMetadata(implementationName, "FilterProcessor Filter Processor", false) +} + +// Config returns the config +func (a *FilterProcessor) Config() plugins.PluginConfig { + return a.cfg +} + +// Init initializes the 
filter processor +func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plugins.PluginConfig, logger *log.Logger) error { + a.logger = logger + a.cfg = cfg + a.ctx = ctx + + // First get the configuration from the string + pCfg := processors.FilterProcessorConfig{} + + err := yaml.Unmarshal([]byte(cfg), &pCfg) + if err != nil { + return fmt.Errorf("filter processor init error: %w", err) + } + + // configMaps is the "- some: ...." portion of the filter config + for _, configMaps := range pCfg.Filters { + + // We only want one key in the map (i.e. either "some" or "all"). The reason we use a list is that want + // to maintain ordering of the filters and a straight up map doesn't do that. + if len(configMaps) != 1 { + return fmt.Errorf("filter processor Init(): illegal filter tag formation. tag length was: %d", len(configMaps)) + } + + for key, subConfigs := range configMaps { + + if key != string(someFieldOperation) && key != string(allFieldOperation) { + return fmt.Errorf("filter processor Init(): filter key was not a valid value: %s", key) + } + + var searcherList []*fieldSearcher + + for _, subConfig := range subConfigs { + + exp, err := makeExpression(subConfig.ExpressionType, subConfig.Expression) + if err != nil { + return fmt.Errorf("filter processor Init(): could not make expression with string %s for filter tag %s - %w", subConfig.Expression, subConfig.FilterTag, err) + } + + searcher, err := makeFieldSearcher(exp, subConfig.ExpressionType, subConfig.FilterTag) + if err != nil { + return fmt.Errorf("filter processor Init(): %w", err) + } + + searcherList = append(searcherList, searcher) + } + + ff := fieldFilter{ + Op: fieldOperation(key), + Searchers: searcherList, + } + + a.FieldFilters = append(a.FieldFilters, ff) + + } + } + + return nil + +} + +// Close a no-op for this processor +func (a *FilterProcessor) Close() error { + return nil +} + +// Process processes the input data +func (a *FilterProcessor) Process(input 
data.BlockData) (data.BlockData, error) { + + var err error + + for _, searcher := range a.FieldFilters { + input, err = searcher.SearchAndFilter(input) + if err != nil { + return data.BlockData{}, err + } + } + + return input, err +} + +// OnComplete a no-op for this processor +func (a *FilterProcessor) OnComplete(input data.BlockData) error { + return nil +} diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go new file mode 100644 index 000000000..e7d862c10 --- /dev/null +++ b/processors/filterprocessor/filter_processor_test.go @@ -0,0 +1,101 @@ +package filterprocessor + +import ( + "context" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/conduit" + "github.com/algorand/indexer/data" + "github.com/algorand/indexer/plugins" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/algorand/indexer/processors" +) + +func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { + // check that something that doesn't exist throws an error + err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "does not exist in transactions") + + err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "does not exist in transactions") + + // Fee does not have a "String" Function so we cant use const with it. 
+ err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "does not contain the needed method") +} + +// TestFilterProcessor_Init tests initialization of the filter processor +func TestFilterProcessor_Init(t *testing.T) { + + sampleAddr1 := basics.Address{1} + sampleAddr2 := basics.Address{2} + sampleAddr3 := basics.Address{3} + + sampleCfgStr := `--- +filters: + - some: + - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver + expression-type: regex + expression: "` + sampleAddr1.String() + `" + - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver + expression-type: const + expression: "` + sampleAddr2.String() + `" +` + + fpBuilder, err := processors.ProcessorBuilderByName(implementationName) + assert.Nil(t, err) + + fp := fpBuilder.New() + err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(sampleCfgStr), logrus.New()) + assert.Nil(t, err) + + bd := data.BlockData{} + bd.Payset = append(bd.Payset, + + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr1, + }, + }, + }, + }, + }, + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr2, + }, + }, + }, + }, + }, + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr3, + }, + }, + }, + }, + }, + ) + + output, err := fp.Process(bd) + assert.Nil(t, err) + assert.Equal(t, len(output.Payset), 2) + +} From 048156bdb461ea921124d5536c6db6a7d45cb810 Mon Sep 17 00:00:00 
2001 From: Stephen Akiki Date: Tue, 13 Sep 2022 09:27:46 -0400 Subject: [PATCH 02/17] Nits --- processors/filterprocessor/filter_processor.go | 2 +- processors/filterprocessor/filter_processor_test.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index d01fe329f..f672c07fb 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -3,7 +3,6 @@ package filterprocessor import ( "context" "fmt" - "github.com/algorand/go-algorand/data/transactions" "gopkg.in/yaml.v3" "reflect" "regexp" @@ -11,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" + "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/algorand/indexer/processors" diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index e7d862c10..eb1625105 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -2,16 +2,16 @@ package filterprocessor import ( "context" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/transactions" - "github.com/algorand/indexer/conduit" - "github.com/algorand/indexer/data" - "github.com/algorand/indexer/plugins" "testing" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/conduit" + "github.com/algorand/indexer/data" + "github.com/algorand/indexer/plugins" "github.com/algorand/indexer/processors" ) From 26b1467e9682f6deaab9b0f8cb2296db229a3eff Mon Sep 17 00:00:00 2001 From: AlgoStephenAkiki <85183435+AlgoStephenAkiki@users.noreply.github.com> Date: Wed, 14 Sep 2022 09:20:33 -0400 Subject: [PATCH 03/17] 
Update processors/filterprocessor/filter_processor.go Co-authored-by: Eric Warehime --- processors/filterprocessor/filter_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index f672c07fb..dcaf16f71 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -26,7 +26,7 @@ func init() { // Constructor is the ProcessorConstructor implementation for the "address_filter" processor type Constructor struct{} -// New initializes a blockProcessorConstructor +// New initializes a FilterProcessor func (c *Constructor) New() processors.Processor { return &FilterProcessor{} } From 05b818f2f0624c09666e5379c1c78c36169b0e8c Mon Sep 17 00:00:00 2001 From: AlgoStephenAkiki <85183435+AlgoStephenAkiki@users.noreply.github.com> Date: Wed, 14 Sep 2022 09:20:42 -0400 Subject: [PATCH 04/17] Update processors/filterprocessor/filter_processor_test.go Co-authored-by: Eric Warehime --- processors/filterprocessor/filter_processor_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index eb1625105..cee73664e 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -22,8 +22,7 @@ func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { assert.Contains(t, err.Error(), "does not exist in transactions") err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") - assert.NotNil(t, err) - assert.Contains(t, err.Error(), "does not exist in transactions") + assert.ErrorContains(t, err, "does not exist in transactions") // Fee does not have a "String" Function so we cant use const with it. 
err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") From 19dbbd5cf56059f25326eae84ccb5dae0ee11437 Mon Sep 17 00:00:00 2001 From: AlgoStephenAkiki <85183435+AlgoStephenAkiki@users.noreply.github.com> Date: Wed, 14 Sep 2022 09:20:55 -0400 Subject: [PATCH 05/17] Update processors/filterprocessor/filter_processor_test.go Co-authored-by: Eric Warehime --- processors/filterprocessor/filter_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index cee73664e..cae1b196d 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -49,7 +49,7 @@ filters: ` fpBuilder, err := processors.ProcessorBuilderByName(implementationName) - assert.Nil(t, err) + assert.NoError(t, err) fp := fpBuilder.New() err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(sampleCfgStr), logrus.New()) From dc101adb0e05650670027f9a7a7a5ca4dc013739 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 09:26:07 -0400 Subject: [PATCH 06/17] PR comments --- processors/filterprocessor/filter_processor.go | 5 +++-- processors/filterprocessor/filter_processor_test.go | 10 ++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index dcaf16f71..8fb3c0ef5 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -3,14 +3,15 @@ package filterprocessor import ( "context" "fmt" - "gopkg.in/yaml.v3" "reflect" "regexp" "strings" log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/algorand/indexer/processors" @@ -23,7 
+24,7 @@ func init() { processors.RegisterProcessor(implementationName, &Constructor{}) } -// Constructor is the ProcessorConstructor implementation for the "address_filter" processor +// Constructor is the ProcessorConstructor implementation for the "filter_processor" processor type Constructor struct{} // New initializes a FilterProcessor diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index cae1b196d..092254988 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -18,16 +18,14 @@ import ( func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { // check that something that doesn't exist throws an error err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") - assert.NotNil(t, err) - assert.Contains(t, err.Error(), "does not exist in transactions") + assert.ErrorContains(t, err, "does not exist in transactions") err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") assert.ErrorContains(t, err, "does not exist in transactions") // Fee does not have a "String" Function so we cant use const with it. 
err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") - assert.NotNil(t, err) - assert.Contains(t, err.Error(), "does not contain the needed method") + assert.ErrorContains(t, err, "does not contain the needed method") } // TestFilterProcessor_Init tests initialization of the filter processor @@ -53,7 +51,7 @@ filters: fp := fpBuilder.New() err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(sampleCfgStr), logrus.New()) - assert.Nil(t, err) + assert.NoError(t, err) bd := data.BlockData{} bd.Payset = append(bd.Payset, @@ -94,7 +92,7 @@ filters: ) output, err := fp.Process(bd) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, len(output.Payset), 2) } From b585ad21c551ee8a7554636cc0088f60cfe8030d Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 10:05:17 -0400 Subject: [PATCH 07/17] Pr comments --- processors/blockprocessor/block_processor.go | 4 +-- .../blockprocessor/block_processor_test.go | 3 +-- processors/blockprocessor/config.go | 12 +++++++++ .../{ => blockprocessor}/config_test.go | 4 +-- processors/blockprocessor/initialize.go | 7 +++-- processors/blockprocessor/initialize_test.go | 3 +-- processors/config.go | 27 ------------------- processors/filterprocessor/config.go | 26 ++++++++++++++++++ .../filterprocessor/filter_processor.go | 21 +++++++-------- 9 files changed, 56 insertions(+), 51 deletions(-) create mode 100644 processors/blockprocessor/config.go rename processors/{ => blockprocessor}/config_test.go (92%) delete mode 100644 processors/config.go create mode 100644 processors/filterprocessor/config.go diff --git a/processors/blockprocessor/block_processor.go b/processors/blockprocessor/block_processor.go index a63be3403..f9b83ea4d 100644 --- a/processors/blockprocessor/block_processor.go +++ b/processors/blockprocessor/block_processor.go @@ -74,7 +74,7 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv 
proc.logger = logger // First get the configuration from the string - var pCfg processors.BlockProcessorConfig + var pCfg Config err := yaml.Unmarshal([]byte(cfg), &pCfg) if err != nil { return fmt.Errorf("blockprocessor init error: %w", err) @@ -184,7 +184,7 @@ func MakeBlockProcessorWithLedger(logger *log.Logger, l *ledger.Ledger, handler } // MakeBlockProcessorWithLedgerInit creates a block processor and initializes the ledger. -func MakeBlockProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis *bookkeeping.Genesis, config processors.BlockProcessorConfig, handler func(block *ledgercore.ValidatedBlock) error) (BlockProcessor, error) { +func MakeBlockProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis *bookkeeping.Genesis, config Config, handler func(block *ledgercore.ValidatedBlock) error) (BlockProcessor, error) { err := InitializeLedger(ctx, logger, nextDbRound, *genesis, &config) if err != nil { return nil, fmt.Errorf("MakeBlockProcessorWithLedgerInit() err: %w", err) diff --git a/processors/blockprocessor/block_processor_test.go b/processors/blockprocessor/block_processor_test.go index e880244b3..24d6f5a87 100644 --- a/processors/blockprocessor/block_processor_test.go +++ b/processors/blockprocessor/block_processor_test.go @@ -3,7 +3,6 @@ package blockprocessor_test import ( "context" "fmt" - "github.com/algorand/indexer/processors" "testing" test2 "github.com/sirupsen/logrus/hooks/test" @@ -166,7 +165,7 @@ func TestMakeProcessorWithLedgerInit_CatchpointErrors(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - config := processors.BlockProcessorConfig{Catchpoint: tc.catchpoint} + config := blockprocessor.Config{Catchpoint: tc.catchpoint} _, err := blockprocessor.MakeBlockProcessorWithLedgerInit( context.Background(), logger, diff --git a/processors/blockprocessor/config.go b/processors/blockprocessor/config.go new file mode 100644 index 
000000000..387dec848 --- /dev/null +++ b/processors/blockprocessor/config.go @@ -0,0 +1,12 @@ +package blockprocessor + +// Config configuration for a block processor +type Config struct { + // Catchpoint to initialize the local ledger to + Catchpoint string `yaml:"catchpoint"` + + IndexerDatadir string `yaml:"indexer-data-dir"` + AlgodDataDir string `yaml:"algod-data-dir"` + AlgodToken string `yaml:"algod-token"` + AlgodAddr string `yaml:"algod-addr"` +} diff --git a/processors/config_test.go b/processors/blockprocessor/config_test.go similarity index 92% rename from processors/config_test.go rename to processors/blockprocessor/config_test.go index 82d58151f..2f5b5decd 100644 --- a/processors/config_test.go +++ b/processors/blockprocessor/config_test.go @@ -1,4 +1,4 @@ -package processors +package blockprocessor import ( "github.com/stretchr/testify/require" @@ -16,7 +16,7 @@ func TestConfigDeserialize(t *testing.T) { algod-addr: "algod_addr" ` - var processorConfig BlockProcessorConfig + var processorConfig Config err := yaml.Unmarshal([]byte(configStr), &processorConfig) require.Nil(t, err) require.Equal(t, processorConfig.Catchpoint, "acatch") diff --git a/processors/blockprocessor/initialize.go b/processors/blockprocessor/initialize.go index 0de745bbe..fd433ca8c 100644 --- a/processors/blockprocessor/initialize.go +++ b/processors/blockprocessor/initialize.go @@ -13,7 +13,6 @@ import ( "github.com/algorand/go-algorand/rpcs" "github.com/algorand/indexer/fetcher" - "github.com/algorand/indexer/processors" "github.com/algorand/indexer/processors/blockprocessor/internal" ) @@ -21,7 +20,7 @@ import ( // IndexerDbOpts. // nextRound - next round to process after initializing. // catchpoint - if provided, attempt to use fast catchup. 
-func InitializeLedger(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis bookkeeping.Genesis, config *processors.BlockProcessorConfig) error { +func InitializeLedger(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis bookkeeping.Genesis, config *Config) error { if nextDbRound > 0 { if config.Catchpoint != "" { round, _, err := ledgercore.ParseCatchpointLabel(config.Catchpoint) @@ -59,7 +58,7 @@ func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchp // InitializeLedgerSimple initializes a ledger with the block processor by // sending it one block at a time and letting it update the ledger as usual. -func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, genesis *bookkeeping.Genesis, config *processors.BlockProcessorConfig) error { +func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, genesis *bookkeeping.Genesis, config *Config) error { ctx, cf := context.WithCancel(ctx) defer cf() var bot fetcher.Fetcher @@ -138,7 +137,7 @@ func handleBlock(logger *log.Logger, block *rpcs.EncodedBlockCert, procHandler f return nil } -func getFetcher(logger *log.Logger, config *processors.BlockProcessorConfig) (fetcher.Fetcher, error) { +func getFetcher(logger *log.Logger, config *Config) (fetcher.Fetcher, error) { var err error var bot fetcher.Fetcher if config.AlgodDataDir != "" { diff --git a/processors/blockprocessor/initialize_test.go b/processors/blockprocessor/initialize_test.go index 29458c772..40dfbf3de 100644 --- a/processors/blockprocessor/initialize_test.go +++ b/processors/blockprocessor/initialize_test.go @@ -19,7 +19,6 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/rpcs" - "github.com/algorand/indexer/processors" "github.com/algorand/indexer/processors/blockprocessor/internal" "github.com/algorand/indexer/util" "github.com/algorand/indexer/util/test" @@ -61,7 +60,7 @@ func TestRunMigration(t *testing.T) { 
dname, err := os.MkdirTemp("", "indexer") defer os.RemoveAll(dname) - config := processors.BlockProcessorConfig{ + config := Config{ IndexerDatadir: dname, AlgodAddr: "localhost", AlgodToken: "AAAAA", diff --git a/processors/config.go b/processors/config.go deleted file mode 100644 index 950d6bc9c..000000000 --- a/processors/config.go +++ /dev/null @@ -1,27 +0,0 @@ -package processors - -// BlockProcessorConfig configuration for a block processor -type BlockProcessorConfig struct { - // Catchpoint to initialize the local ledger to - Catchpoint string `yaml:"catchpoint"` - - IndexerDatadir string `yaml:"indexer-data-dir"` - AlgodDataDir string `yaml:"algod-data-dir"` - AlgodToken string `yaml:"algod-token"` - AlgodAddr string `yaml:"algod-addr"` -} - -// FilterProcessorSubConfig is the configuration needed for each additional filter -type FilterProcessorSubConfig struct { - // The tag of the struct to analyze - FilterTag string `yaml:"tag"` - // The type of expression to search for "const" or "regex" - ExpressionType string `yaml:"expression-type"` - // The expression to search - Expression string `yaml:"expression"` -} - -// FilterProcessorConfig configuration for the filter processor -type FilterProcessorConfig struct { - Filters []map[string][]FilterProcessorSubConfig `yaml:"filters"` -} diff --git a/processors/filterprocessor/config.go b/processors/filterprocessor/config.go new file mode 100644 index 000000000..7038fc01a --- /dev/null +++ b/processors/filterprocessor/config.go @@ -0,0 +1,26 @@ +package filterprocessor + +// FilterExpressionType is the type of the filter (i.e. 
const, regex, etc) +type FilterExpressionType string + +const ( + // ConstFilter a filter that looks at a constant string in its entirety + ConstFilter FilterExpressionType = "const" + // RegexFilter a filter that applies regex rules to the matching + RegexFilter FilterExpressionType = "regex" +) + +// SubConfig is the configuration needed for each additional filter +type SubConfig struct { + // The tag of the struct to analyze + FilterTag string `yaml:"tag"` + // The type of expression to search for "const" or "regex" + ExpressionType FilterExpressionType `yaml:"expression-type"` + // The expression to search + Expression string `yaml:"expression"` +} + +// Config configuration for the filter processor +type Config struct { + Filters []map[string][]SubConfig `yaml:"filters"` +} diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index 8fb3c0ef5..5015bb000 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -44,12 +44,9 @@ func (e regexExpression) Search(input interface{}) bool { return e.Regex.MatchString(input.(string)) } -const constantExpressionStr = "const" -const regexExpressionStr = "regex" - -func makeExpression(expressionType string, expressionSearchStr string) (*expression, error) { +func makeExpression(expressionType FilterExpressionType, expressionSearchStr string) (*expression, error) { switch expressionType { - case constantExpressionStr: + case ConstFilter: { r, err := regexp.Compile("^" + expressionSearchStr + "$") if err != nil { @@ -59,7 +56,7 @@ func makeExpression(expressionType string, expressionSearchStr string) (*express var exp expression = regexExpression{Regex: r} return &exp, nil } - case regexExpressionStr: + case RegexFilter: { r, err := regexp.Compile(expressionSearchStr) if err != nil { @@ -108,13 +105,13 @@ func (f fieldSearcher) Search(input transactions.SignedTxnInBlock) bool { // This maps the expression-type 
with the needed function for the expression. // For instance the const or regex expression-type might need the String() function // Can't make this consts because there are no constant maps in go... -var expressionTypeToFunctionMap = map[string]string{ - constantExpressionStr: "String", - regexExpressionStr: "String", +var expressionTypeToFunctionMap = map[FilterExpressionType]string{ + ConstFilter: "String", + RegexFilter: "String", } // checks that the supplied tag exists in the struct and recovers from any panics -func checkTagExistsAndHasCorrectFunction(expressionType string, tag string) (outError error) { +func checkTagExistsAndHasCorrectFunction(expressionType FilterExpressionType, tag string) (outError error) { var field string defer func() { if r := recover(); r != nil { @@ -145,7 +142,7 @@ func checkTagExistsAndHasCorrectFunction(expressionType string, tag string) (out } // makeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function -func makeFieldSearcher(e *expression, expressionType string, tag string) (*fieldSearcher, error) { +func makeFieldSearcher(e *expression, expressionType FilterExpressionType, tag string) (*fieldSearcher, error) { if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil { return nil, err @@ -227,7 +224,7 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu a.ctx = ctx // First get the configuration from the string - pCfg := processors.FilterProcessorConfig{} + pCfg := Config{} err := yaml.Unmarshal([]byte(cfg), &pCfg) if err != nil { From 74743a3959a7fd4eb8c89995987b120df0ba1711 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 10:32:58 -0400 Subject: [PATCH 08/17] PR updates --- processors/filterprocessor/config.go | 12 +- .../filterprocessor/expression/expression.go | 65 ++++++ processors/filterprocessor/fields/filter.go | 72 +++++++ processors/filterprocessor/fields/searcher.go | 78 ++++++++ 
.../filterprocessor/fields/searcher_test.go | 21 ++ .../filterprocessor/filter_processor.go | 188 +----------------- .../filterprocessor/filter_processor_test.go | 13 -- 7 files changed, 247 insertions(+), 202 deletions(-) create mode 100644 processors/filterprocessor/expression/expression.go create mode 100644 processors/filterprocessor/fields/filter.go create mode 100644 processors/filterprocessor/fields/searcher.go create mode 100644 processors/filterprocessor/fields/searcher_test.go diff --git a/processors/filterprocessor/config.go b/processors/filterprocessor/config.go index 7038fc01a..ac126958f 100644 --- a/processors/filterprocessor/config.go +++ b/processors/filterprocessor/config.go @@ -1,21 +1,13 @@ package filterprocessor -// FilterExpressionType is the type of the filter (i.e. const, regex, etc) -type FilterExpressionType string - -const ( - // ConstFilter a filter that looks at a constant string in its entirety - ConstFilter FilterExpressionType = "const" - // RegexFilter a filter that applies regex rules to the matching - RegexFilter FilterExpressionType = "regex" -) +import "github.com/algorand/indexer/processors/filterprocessor/expression" // SubConfig is the configuration needed for each additional filter type SubConfig struct { // The tag of the struct to analyze FilterTag string `yaml:"tag"` // The type of expression to search for "const" or "regex" - ExpressionType FilterExpressionType `yaml:"expression-type"` + ExpressionType expression.Type `yaml:"expression-type"` // The expression to search Expression string `yaml:"expression"` } diff --git a/processors/filterprocessor/expression/expression.go b/processors/filterprocessor/expression/expression.go new file mode 100644 index 000000000..a1fdc7596 --- /dev/null +++ b/processors/filterprocessor/expression/expression.go @@ -0,0 +1,65 @@ +package expression + +import ( + "fmt" + "regexp" +) + +// Type is the type of the filter (i.e. 
const, regex, etc) +type Type string + +const ( + // ConstFilter a filter that looks at a constant string in its entirety + ConstFilter Type = "const" + // RegexFilter a filter that applies regex rules to the matching + RegexFilter Type = "regex" +) + +// ExpressionTypeToFunctionMap maps the expression-type with the needed function for the expression. +// For instance the const or regex expression-type might need the String() function +// Can't make this consts because there are no constant maps in go... +var ExpressionTypeToFunctionMap = map[Type]string{ + ConstFilter: "String", + RegexFilter: "String", +} + +// Interface the expression interface +type Interface interface { + Search(input interface{}) bool +} + +type regexExpression struct { + Regex *regexp.Regexp +} + +func (e regexExpression) Search(input interface{}) bool { + return e.Regex.MatchString(input.(string)) +} + +// MakeExpression creates an expression based on an expression type +func MakeExpression(expressionType Type, expressionSearchStr string) (*Interface, error) { + switch expressionType { + case ConstFilter: + { + r, err := regexp.Compile("^" + expressionSearchStr + "$") + if err != nil { + return nil, err + } + + var exp Interface = regexExpression{Regex: r} + return &exp, nil + } + case RegexFilter: + { + r, err := regexp.Compile(expressionSearchStr) + if err != nil { + return nil, err + } + + var exp Interface = regexExpression{Regex: r} + return &exp, nil + } + default: + return nil, fmt.Errorf("unknown expression type: %s", expressionType) + } +} diff --git a/processors/filterprocessor/fields/filter.go b/processors/filterprocessor/fields/filter.go new file mode 100644 index 000000000..65666b7c3 --- /dev/null +++ b/processors/filterprocessor/fields/filter.go @@ -0,0 +1,72 @@ +package fields + +import ( + "fmt" + + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/data" +) + +// Operation an operation like "some" or "all" for boolean logic +type Operation 
string + +const someFieldOperation Operation = "some" +const allFieldOperation Operation = "all" + +// ValidFieldOperation returns true if the input is a valid operation +func ValidFieldOperation(input string) bool { + if input != string(someFieldOperation) && input != string(allFieldOperation) { + return false + } + + return true +} + +// Filter an object that combines field searches with a boolean operator +type Filter struct { + Op Operation + Searchers []*Searcher +} + +// SearchAndFilter searches through the block data and applies the operation to the results +func (f Filter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { + + var newPayset []transactions.SignedTxnInBlock + switch f.Op { + case someFieldOperation: + for _, txn := range input.Payset { + for _, fs := range f.Searchers { + if fs.Search(txn) { + newPayset = append(newPayset, txn) + break + } + } + } + + break + case allFieldOperation: + for _, txn := range input.Payset { + + allTrue := true + for _, fs := range f.Searchers { + if !fs.Search(txn) { + allTrue = false + break + } + } + + if allTrue { + newPayset = append(newPayset, txn) + } + + } + break + default: + return data.BlockData{}, fmt.Errorf("unknown operation: %s", f.Op) + } + + input.Payset = newPayset + + return input, nil + +} diff --git a/processors/filterprocessor/fields/searcher.go b/processors/filterprocessor/fields/searcher.go new file mode 100644 index 000000000..47766c84e --- /dev/null +++ b/processors/filterprocessor/fields/searcher.go @@ -0,0 +1,78 @@ +package fields + +import ( + "fmt" + "reflect" + "strings" + + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/processors/filterprocessor/expression" +) + +// Searcher searches the struct with an expression and method to call +type Searcher struct { + Exp *expression.Interface + Tag string + MethodToCall string +} + +// Search returns true if block contains the expression +func (f Searcher) Search(input 
transactions.SignedTxnInBlock) bool { + + e := reflect.ValueOf(&input).Elem() + + var field string + + for _, field = range strings.Split(f.Tag, ".") { + e = e.FieldByName(field) + } + + toSearch := e.MethodByName(f.MethodToCall).Call([]reflect.Value{})[0].Interface() + + if (*f.Exp).Search(toSearch) { + return true + } + + return false +} + +// checks that the supplied tag exists in the struct and recovers from any panics +func checkTagExistsAndHasCorrectFunction(expressionType expression.Type, tag string) (outError error) { + var field string + defer func() { + if r := recover(); r != nil { + outError = fmt.Errorf("error occured regarding tag %s. last searched field was: %s - %v", tag, field, r) + } + }() + + e := reflect.ValueOf(&transactions.SignedTxnInBlock{}).Elem() + + for _, field = range strings.Split(tag, ".") { + e = e.FieldByName(field) + if !e.IsValid() { + return fmt.Errorf("%s does not exist in transactions.SignedTxnInBlock struct. last searched field was: %s", tag, field) + } + } + + method, ok := expression.ExpressionTypeToFunctionMap[expressionType] + + if !ok { + return fmt.Errorf("expression type (%s) is not supported. 
tag value: %s", expressionType, tag) + } + + if !e.MethodByName(method).IsValid() { + return fmt.Errorf("variable referenced by tag %s does not contain the needed method: %s", tag, method) + } + + return nil +} + +// MakeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function +func MakeFieldSearcher(e *expression.Interface, expressionType expression.Type, tag string) (*Searcher, error) { + + if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil { + return nil, err + } + + return &Searcher{Exp: e, Tag: tag, MethodToCall: expression.ExpressionTypeToFunctionMap[expressionType]}, nil +} diff --git a/processors/filterprocessor/fields/searcher_test.go b/processors/filterprocessor/fields/searcher_test.go new file mode 100644 index 000000000..4213e09dc --- /dev/null +++ b/processors/filterprocessor/fields/searcher_test.go @@ -0,0 +1,21 @@ +package fields + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestCheckTagExistsAndHasCorrectFunction tests that the check tag exists and function relation works +func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { + // check that something that doesn't exist throws an error + err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") + assert.ErrorContains(t, err, "does not exist in transactions") + + err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") + assert.ErrorContains(t, err, "does not exist in transactions") + + // Fee does not have a "String" Function so we cant use const with it. 
+ err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") + assert.ErrorContains(t, err, "does not contain the needed method") +} diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index 5015bb000..6b3915dbf 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -3,18 +3,14 @@ package filterprocessor import ( "context" "fmt" - "reflect" - "regexp" - "strings" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" - "github.com/algorand/go-algorand/data/transactions" - "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/algorand/indexer/processors" + "github.com/algorand/indexer/processors/filterprocessor/expression" + "github.com/algorand/indexer/processors/filterprocessor/fields" ) const implementationName = "filter_processor" @@ -32,175 +28,9 @@ func (c *Constructor) New() processors.Processor { return &FilterProcessor{} } -type expression interface { - Search(input interface{}) bool -} - -type regexExpression struct { - Regex *regexp.Regexp -} - -func (e regexExpression) Search(input interface{}) bool { - return e.Regex.MatchString(input.(string)) -} - -func makeExpression(expressionType FilterExpressionType, expressionSearchStr string) (*expression, error) { - switch expressionType { - case ConstFilter: - { - r, err := regexp.Compile("^" + expressionSearchStr + "$") - if err != nil { - return nil, err - } - - var exp expression = regexExpression{Regex: r} - return &exp, nil - } - case RegexFilter: - { - r, err := regexp.Compile(expressionSearchStr) - if err != nil { - return nil, err - } - - var exp expression = regexExpression{Regex: r} - return &exp, nil - } - default: - return nil, fmt.Errorf("unknown expression type: %s", expressionType) - } -} - -type fieldOperation string - -const someFieldOperation fieldOperation = "some" -const allFieldOperation fieldOperation = 
"all" - -type fieldSearcher struct { - Exp *expression - Tag string - MethodToCall string -} - -// Search returns true if block contains the expression -func (f fieldSearcher) Search(input transactions.SignedTxnInBlock) bool { - - e := reflect.ValueOf(&input).Elem() - - var field string - - for _, field = range strings.Split(f.Tag, ".") { - e = e.FieldByName(field) - } - - toSearch := e.MethodByName(f.MethodToCall).Call([]reflect.Value{})[0].Interface() - - if (*f.Exp).Search(toSearch) { - return true - } - - return false -} - -// This maps the expression-type with the needed function for the expression. -// For instance the const or regex expression-type might need the String() function -// Can't make this consts because there are no constant maps in go... -var expressionTypeToFunctionMap = map[FilterExpressionType]string{ - ConstFilter: "String", - RegexFilter: "String", -} - -// checks that the supplied tag exists in the struct and recovers from any panics -func checkTagExistsAndHasCorrectFunction(expressionType FilterExpressionType, tag string) (outError error) { - var field string - defer func() { - if r := recover(); r != nil { - outError = fmt.Errorf("error occured regarding tag %s. last searched field was: %s - %v", tag, field, r) - } - }() - - e := reflect.ValueOf(&transactions.SignedTxnInBlock{}).Elem() - - for _, field = range strings.Split(tag, ".") { - e = e.FieldByName(field) - if !e.IsValid() { - return fmt.Errorf("%s does not exist in transactions.SignedTxnInBlock struct. last searched field was: %s", tag, field) - } - } - - method, ok := expressionTypeToFunctionMap[expressionType] - - if !ok { - return fmt.Errorf("expression type (%s) is not supported. 
tag value: %s", expressionType, tag) - } - - if !e.MethodByName(method).IsValid() { - return fmt.Errorf("variable referenced by tag %s does not contain the needed method: %s", tag, method) - } - - return nil -} - -// makeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function -func makeFieldSearcher(e *expression, expressionType FilterExpressionType, tag string) (*fieldSearcher, error) { - - if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil { - return nil, err - } - - return &fieldSearcher{Exp: e, Tag: tag, MethodToCall: expressionTypeToFunctionMap[expressionType]}, nil -} - -type fieldFilter struct { - Op fieldOperation - Searchers []*fieldSearcher -} - -func (f fieldFilter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { - - var newPayset []transactions.SignedTxnInBlock - switch f.Op { - case someFieldOperation: - for _, txn := range input.Payset { - for _, fs := range f.Searchers { - if fs.Search(txn) { - newPayset = append(newPayset, txn) - break - } - } - } - - break - case allFieldOperation: - for _, txn := range input.Payset { - - allTrue := true - for _, fs := range f.Searchers { - if !fs.Search(txn) { - allTrue = false - break - } - } - - if allTrue { - newPayset = append(newPayset, txn) - } - - } - break - default: - return data.BlockData{}, fmt.Errorf("unknown operation: %s", f.Op) - } - - input.Payset = newPayset - - return input, nil - -} - // FilterProcessor filters transactions by a variety of means type FilterProcessor struct { - FieldFilters []fieldFilter + FieldFilters []fields.Filter logger *log.Logger cfg plugins.PluginConfig @@ -242,20 +72,20 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu for key, subConfigs := range configMaps { - if key != string(someFieldOperation) && key != string(allFieldOperation) { + if !fields.ValidFieldOperation(key) { return fmt.Errorf("filter processor Init(): filter key was not a valid 
value: %s", key) } - var searcherList []*fieldSearcher + var searcherList []*fields.Searcher for _, subConfig := range subConfigs { - exp, err := makeExpression(subConfig.ExpressionType, subConfig.Expression) + exp, err := expression.MakeExpression(subConfig.ExpressionType, subConfig.Expression) if err != nil { return fmt.Errorf("filter processor Init(): could not make expression with string %s for filter tag %s - %w", subConfig.Expression, subConfig.FilterTag, err) } - searcher, err := makeFieldSearcher(exp, subConfig.ExpressionType, subConfig.FilterTag) + searcher, err := fields.MakeFieldSearcher(exp, subConfig.ExpressionType, subConfig.FilterTag) if err != nil { return fmt.Errorf("filter processor Init(): %w", err) } @@ -263,8 +93,8 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu searcherList = append(searcherList, searcher) } - ff := fieldFilter{ - Op: fieldOperation(key), + ff := fields.Filter{ + Op: fields.Operation(key), Searchers: searcherList, } diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index 092254988..5e3be3334 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -15,19 +15,6 @@ import ( "github.com/algorand/indexer/processors" ) -func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { - // check that something that doesn't exist throws an error - err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") - assert.ErrorContains(t, err, "does not exist in transactions") - - err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") - assert.ErrorContains(t, err, "does not exist in transactions") - - // Fee does not have a "String" Function so we cant use const with it. 
- err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") - assert.ErrorContains(t, err, "does not contain the needed method") -} - // TestFilterProcessor_Init tests initialization of the filter processor func TestFilterProcessor_Init(t *testing.T) { From 297966386ee7670d9b61b375610587fc064ba3ed Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 10:35:19 -0400 Subject: [PATCH 09/17] PR updates --- processors/filterprocessor/expression/expression.go | 4 ++-- processors/filterprocessor/fields/searcher.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/processors/filterprocessor/expression/expression.go b/processors/filterprocessor/expression/expression.go index a1fdc7596..e246e938c 100644 --- a/processors/filterprocessor/expression/expression.go +++ b/processors/filterprocessor/expression/expression.go @@ -15,10 +15,10 @@ const ( RegexFilter Type = "regex" ) -// ExpressionTypeToFunctionMap maps the expression-type with the needed function for the expression. +// TypeToFunctionMap maps the expression-type with the needed function for the expression. // For instance the const or regex expression-type might need the String() function // Can't make this consts because there are no constant maps in go... -var ExpressionTypeToFunctionMap = map[Type]string{ +var TypeToFunctionMap = map[Type]string{ ConstFilter: "String", RegexFilter: "String", } diff --git a/processors/filterprocessor/fields/searcher.go b/processors/filterprocessor/fields/searcher.go index 47766c84e..a93038da5 100644 --- a/processors/filterprocessor/fields/searcher.go +++ b/processors/filterprocessor/fields/searcher.go @@ -54,7 +54,7 @@ func checkTagExistsAndHasCorrectFunction(expressionType expression.Type, tag str } } - method, ok := expression.ExpressionTypeToFunctionMap[expressionType] + method, ok := expression.TypeToFunctionMap[expressionType] if !ok { return fmt.Errorf("expression type (%s) is not supported. 
tag value: %s", expressionType, tag) @@ -74,5 +74,5 @@ func MakeFieldSearcher(e *expression.Interface, expressionType expression.Type, return nil, err } - return &Searcher{Exp: e, Tag: tag, MethodToCall: expression.ExpressionTypeToFunctionMap[expressionType]}, nil + return &Searcher{Exp: e, Tag: tag, MethodToCall: expression.TypeToFunctionMap[expressionType]}, nil } From b382bbc6807bb5443392bcd0f3e3dcc80244ca68 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 10:36:11 -0400 Subject: [PATCH 10/17] Pr updates --- processors/filterprocessor/filter_processor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index 6b3915dbf..7afba55d7 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -3,6 +3,7 @@ package filterprocessor import ( "context" "fmt" + log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" From 092adcb28e90b896ec95990427ea9985fbc08dd7 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Wed, 14 Sep 2022 10:52:48 -0400 Subject: [PATCH 11/17] PR updates --- processors/filterprocessor/config.go | 2 +- .../filterprocessor/expression/expression.go | 20 +++++++++---------- processors/filterprocessor/fields/searcher.go | 6 +++--- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/processors/filterprocessor/config.go b/processors/filterprocessor/config.go index ac126958f..26db01d7c 100644 --- a/processors/filterprocessor/config.go +++ b/processors/filterprocessor/config.go @@ -7,7 +7,7 @@ type SubConfig struct { // The tag of the struct to analyze FilterTag string `yaml:"tag"` // The type of expression to search for "const" or "regex" - ExpressionType expression.Type `yaml:"expression-type"` + ExpressionType expression.FilterType `yaml:"expression-type"` // The expression to search Expression string `yaml:"expression"` } diff --git 
a/processors/filterprocessor/expression/expression.go b/processors/filterprocessor/expression/expression.go index e246e938c..ef153a0ad 100644 --- a/processors/filterprocessor/expression/expression.go +++ b/processors/filterprocessor/expression/expression.go @@ -5,26 +5,26 @@ import ( "regexp" ) -// Type is the type of the filter (i.e. const, regex, etc) -type Type string +// FilterType is the type of the filter (i.e. const, regex, etc) +type FilterType string const ( // ConstFilter a filter that looks at a constant string in its entirety - ConstFilter Type = "const" + ConstFilter FilterType = "const" // RegexFilter a filter that applies regex rules to the matching - RegexFilter Type = "regex" + RegexFilter FilterType = "regex" ) // TypeToFunctionMap maps the expression-type with the needed function for the expression. // For instance the const or regex expression-type might need the String() function // Can't make this consts because there are no constant maps in go... -var TypeToFunctionMap = map[Type]string{ +var TypeToFunctionMap = map[FilterType]string{ ConstFilter: "String", RegexFilter: "String", } -// Interface the expression interface -type Interface interface { +// Expression the expression interface +type Expression interface { Search(input interface{}) bool } @@ -37,7 +37,7 @@ func (e regexExpression) Search(input interface{}) bool { } // MakeExpression creates an expression based on an expression type -func MakeExpression(expressionType Type, expressionSearchStr string) (*Interface, error) { +func MakeExpression(expressionType FilterType, expressionSearchStr string) (*Expression, error) { switch expressionType { case ConstFilter: { @@ -46,7 +46,7 @@ func MakeExpression(expressionType Type, expressionSearchStr string) (*Interface return nil, err } - var exp Interface = regexExpression{Regex: r} + var exp Expression = regexExpression{Regex: r} return &exp, nil } case RegexFilter: @@ -56,7 +56,7 @@ func MakeExpression(expressionType Type, 
expressionSearchStr string) (*Interface return nil, err } - var exp Interface = regexExpression{Regex: r} + var exp Expression = regexExpression{Regex: r} return &exp, nil } default: diff --git a/processors/filterprocessor/fields/searcher.go b/processors/filterprocessor/fields/searcher.go index a93038da5..44f4b6ec3 100644 --- a/processors/filterprocessor/fields/searcher.go +++ b/processors/filterprocessor/fields/searcher.go @@ -11,7 +11,7 @@ import ( // Searcher searches the struct with an expression and method to call type Searcher struct { - Exp *expression.Interface + Exp *expression.Expression Tag string MethodToCall string } @@ -37,7 +37,7 @@ func (f Searcher) Search(input transactions.SignedTxnInBlock) bool { } // checks that the supplied tag exists in the struct and recovers from any panics -func checkTagExistsAndHasCorrectFunction(expressionType expression.Type, tag string) (outError error) { +func checkTagExistsAndHasCorrectFunction(expressionType expression.FilterType, tag string) (outError error) { var field string defer func() { if r := recover(); r != nil { @@ -68,7 +68,7 @@ func checkTagExistsAndHasCorrectFunction(expressionType expression.Type, tag str } // MakeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function -func MakeFieldSearcher(e *expression.Interface, expressionType expression.Type, tag string) (*Searcher, error) { +func MakeFieldSearcher(e *expression.Expression, expressionType expression.FilterType, tag string) (*Searcher, error) { if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil { return nil, err From 9ed2a7853de9c04a2fe6a1ac126bd91079aab41c Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Thu, 15 Sep 2022 08:21:13 -0400 Subject: [PATCH 12/17] PR updates --- .../filterprocessor/filter_processor_test.go | 191 +++++++++++++++++- 1 file changed, 189 insertions(+), 2 deletions(-) diff --git a/processors/filterprocessor/filter_processor_test.go 
b/processors/filterprocessor/filter_processor_test.go index 5e3be3334..8cfab6aae 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -15,7 +15,193 @@ import ( "github.com/algorand/indexer/processors" ) -// TestFilterProcessor_Init tests initialization of the filter processor +// TestFilterProcessor_Init_Multi tests initialization of the filter processor with the "all" and "some" filter types +func TestFilterProcessor_Init_Multi(t *testing.T) { + + sampleAddr1 := basics.Address{1} + sampleAddr2 := basics.Address{2} + sampleAddr3 := basics.Address{3} + + sampleCfgStr := `--- +filters: + - some: + - tag: SignedTxnWithAD.SignedTxn.AuthAddr + expression-type: const + expression: "` + sampleAddr1.String() + `" + - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetSender + expression-type: regex + expression: "` + sampleAddr3.String() + `" + - all: + - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver + expression-type: regex + expression: "` + sampleAddr2.String() + `" + - tag: SignedTxnWithAD.SignedTxn.Txn.Header.Sender + expression-type: const + expression: "` + sampleAddr2.String() + `" + - some: + - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetCloseTo + expression-type: const + expression: "` + sampleAddr2.String() + `" + - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetReceiver + expression-type: regex + expression: "` + sampleAddr2.String() + `" +` + + fpBuilder, err := processors.ProcessorBuilderByName(implementationName) + assert.NoError(t, err) + + fp := fpBuilder.New() + err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(sampleCfgStr), logrus.New()) + assert.NoError(t, err) + + bd := data.BlockData{} + bd.Payset = append(bd.Payset, + + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + AuthAddr: sampleAddr1, + }, + }, + }, + 
transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr2, + }, + Header: transactions.Header{ + Sender: sampleAddr2, + }, + AssetTransferTxnFields: transactions.AssetTransferTxnFields{ + AssetCloseTo: sampleAddr2, + }, + }, + }, + }, + }, + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + AssetTransferTxnFields: transactions.AssetTransferTxnFields{ + AssetSender: sampleAddr3, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr3, + }, + }, + }, + }, + }, + // The one transaction that will be allowed through + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr2, + }, + Header: transactions.Header{ + Sender: sampleAddr2, + }, + AssetTransferTxnFields: transactions.AssetTransferTxnFields{ + AssetSender: sampleAddr3, + AssetCloseTo: sampleAddr2, + AssetReceiver: sampleAddr2, + }, + }, + }, + }, + }, + ) + + output, err := fp.Process(bd) + assert.NoError(t, err) + assert.Equal(t, len(output.Payset), 1) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver, sampleAddr2) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.Header.Sender, sampleAddr2) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetSender, sampleAddr3) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetCloseTo, sampleAddr2) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetReceiver, sampleAddr2) + +} + +// TestFilterProcessor_Init_All tests initialization of the filter 
processor with the "all" filter type +func TestFilterProcessor_Init_All(t *testing.T) { + + sampleAddr1 := basics.Address{1} + sampleAddr2 := basics.Address{2} + sampleAddr3 := basics.Address{3} + + sampleCfgStr := `--- +filters: + - all: + - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver + expression-type: regex + expression: "` + sampleAddr2.String() + `" + - tag: SignedTxnWithAD.SignedTxn.Txn.Header.Sender + expression-type: const + expression: "` + sampleAddr2.String() + `" +` + + fpBuilder, err := processors.ProcessorBuilderByName(implementationName) + assert.NoError(t, err) + + fp := fpBuilder.New() + err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(sampleCfgStr), logrus.New()) + assert.NoError(t, err) + + bd := data.BlockData{} + bd.Payset = append(bd.Payset, + + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr1, + }, + }, + }, + }, + }, + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr2, + }, + Header: transactions.Header{ + Sender: sampleAddr2, + }, + }, + }, + }, + }, + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: sampleAddr3, + }, + }, + }, + }, + }, + ) + + output, err := fp.Process(bd) + assert.NoError(t, err) + assert.Equal(t, len(output.Payset), 1) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver, sampleAddr2) + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.Header.Sender, sampleAddr2) +} + +// TestFilterProcessor_Init_Some tests 
initialization of the filter processor with the "some" filter type func TestFilterProcessor_Init(t *testing.T) { sampleAddr1 := basics.Address{1} @@ -81,5 +267,6 @@ filters: output, err := fp.Process(bd) assert.NoError(t, err) assert.Equal(t, len(output.Payset), 2) - + assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver, sampleAddr1) + assert.Equal(t, output.Payset[1].SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver, sampleAddr2) } From 1f381babf6ac918b0ee1569f91e35b82da6def15 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Thu, 15 Sep 2022 08:23:35 -0400 Subject: [PATCH 13/17] PR updates --- processors/filterprocessor/fields/searcher.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/processors/filterprocessor/fields/searcher.go b/processors/filterprocessor/fields/searcher.go index 44f4b6ec3..f1db42a3c 100644 --- a/processors/filterprocessor/fields/searcher.go +++ b/processors/filterprocessor/fields/searcher.go @@ -21,9 +21,7 @@ func (f Searcher) Search(input transactions.SignedTxnInBlock) bool { e := reflect.ValueOf(&input).Elem() - var field string - - for _, field = range strings.Split(f.Tag, ".") { + for _, field := range strings.Split(f.Tag, ".") { e = e.FieldByName(field) } From 9363ada1408761b139dda24e55181864e477c3e1 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Thu, 15 Sep 2022 10:10:47 -0400 Subject: [PATCH 14/17] Changed some to any and const to exact --- .../filterprocessor/expression/expression.go | 12 +++++------ processors/filterprocessor/fields/filter.go | 8 ++++---- .../filterprocessor/filter_processor.go | 4 ++-- .../filterprocessor/filter_processor_test.go | 20 +++++++++---------- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/processors/filterprocessor/expression/expression.go b/processors/filterprocessor/expression/expression.go index ef153a0ad..861926035 100644 --- a/processors/filterprocessor/expression/expression.go +++ 
b/processors/filterprocessor/expression/expression.go @@ -9,17 +9,17 @@ import ( type FilterType string const ( - // ConstFilter a filter that looks at a constant string in its entirety - ConstFilter FilterType = "const" + // ExactFilter a filter that looks at a constant string in its entirety + ExactFilter FilterType = "exact" // RegexFilter a filter that applies regex rules to the matching RegexFilter FilterType = "regex" ) // TypeToFunctionMap maps the expression-type with the needed function for the expression. -// For instance the const or regex expression-type might need the String() function -// Can't make this consts because there are no constant maps in go... +// For instance the exact or regex expression-type might need the String() function +// Can't make this const because there are no constant maps in go... var TypeToFunctionMap = map[FilterType]string{ - ConstFilter: "String", + ExactFilter: "String", RegexFilter: "String", } @@ -39,7 +39,7 @@ func (e regexExpression) Search(input interface{}) bool { // MakeExpression creates an expression based on an expression type func MakeExpression(expressionType FilterType, expressionSearchStr string) (*Expression, error) { switch expressionType { - case ConstFilter: + case ExactFilter: { r, err := regexp.Compile("^" + expressionSearchStr + "$") if err != nil { diff --git a/processors/filterprocessor/fields/filter.go b/processors/filterprocessor/fields/filter.go index 65666b7c3..80040c65e 100644 --- a/processors/filterprocessor/fields/filter.go +++ b/processors/filterprocessor/fields/filter.go @@ -7,15 +7,15 @@ import ( "github.com/algorand/indexer/data" ) -// Operation an operation like "some" or "all" for boolean logic +// Operation an operation like "any" or "all" for boolean logic type Operation string -const someFieldOperation Operation = "some" +const anyFieldOperation Operation = "any" const allFieldOperation Operation = "all" // ValidFieldOperation returns true if the input is a valid operation func 
ValidFieldOperation(input string) bool { - if input != string(someFieldOperation) && input != string(allFieldOperation) { + if input != string(anyFieldOperation) && input != string(allFieldOperation) { return false } @@ -33,7 +33,7 @@ func (f Filter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { var newPayset []transactions.SignedTxnInBlock switch f.Op { - case someFieldOperation: + case anyFieldOperation: for _, txn := range input.Payset { for _, fs := range f.Searchers { if fs.Search(txn) { diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index 7afba55d7..588e60e5b 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -62,10 +62,10 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu return fmt.Errorf("filter processor init error: %w", err) } - // configMaps is the "- some: ...." portion of the filter config + // configMaps is the "- any: ...." portion of the filter config for _, configMaps := range pCfg.Filters { - // We only want one key in the map (i.e. either "some" or "all"). The reason we use a list is that want + // We only want one key in the map (i.e. either "any" or "all"). The reason we use a list is that want // to maintain ordering of the filters and a straight up map doesn't do that. if len(configMaps) != 1 { return fmt.Errorf("filter processor Init(): illegal filter tag formation. 
tag length was: %d", len(configMaps)) diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index 8cfab6aae..f52cb8bd3 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -15,7 +15,7 @@ import ( "github.com/algorand/indexer/processors" ) -// TestFilterProcessor_Init_Multi tests initialization of the filter processor with the "all" and "some" filter types +// TestFilterProcessor_Init_Multi tests initialization of the filter processor with the "all" and "any" filter types func TestFilterProcessor_Init_Multi(t *testing.T) { sampleAddr1 := basics.Address{1} @@ -24,9 +24,9 @@ func TestFilterProcessor_Init_Multi(t *testing.T) { sampleCfgStr := `--- filters: - - some: + - any: - tag: SignedTxnWithAD.SignedTxn.AuthAddr - expression-type: const + expression-type: exact expression: "` + sampleAddr1.String() + `" - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetSender expression-type: regex @@ -36,11 +36,11 @@ filters: expression-type: regex expression: "` + sampleAddr2.String() + `" - tag: SignedTxnWithAD.SignedTxn.Txn.Header.Sender - expression-type: const + expression-type: exact expression: "` + sampleAddr2.String() + `" - - some: + - any: - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetCloseTo - expression-type: const + expression-type: exact expression: "` + sampleAddr2.String() + `" - tag: SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetReceiver expression-type: regex @@ -142,7 +142,7 @@ filters: expression-type: regex expression: "` + sampleAddr2.String() + `" - tag: SignedTxnWithAD.SignedTxn.Txn.Header.Sender - expression-type: const + expression-type: exact expression: "` + sampleAddr2.String() + `" ` @@ -201,7 +201,7 @@ filters: assert.Equal(t, output.Payset[0].SignedTxnWithAD.SignedTxn.Txn.Header.Sender, sampleAddr2) } -// TestFilterProcessor_Init_Some tests initialization of the 
filter processor with the "some" filter type +// TestFilterProcessor_Init_Some tests initialization of the filter processor with the "any" filter type func TestFilterProcessor_Init(t *testing.T) { sampleAddr1 := basics.Address{1} @@ -210,12 +210,12 @@ func TestFilterProcessor_Init(t *testing.T) { sampleCfgStr := `--- filters: - - some: + - any: - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver expression-type: regex expression: "` + sampleAddr1.String() + `" - tag: SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver - expression-type: const + expression-type: exact expression: "` + sampleAddr2.String() + `" ` From 0fdcf5035e651fd933fe4321914fea7815afb204 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Thu, 15 Sep 2022 12:14:22 -0400 Subject: [PATCH 15/17] Test failure fix --- processors/filterprocessor/fields/searcher_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processors/filterprocessor/fields/searcher_test.go b/processors/filterprocessor/fields/searcher_test.go index 4213e09dc..e955cb9aa 100644 --- a/processors/filterprocessor/fields/searcher_test.go +++ b/processors/filterprocessor/fields/searcher_test.go @@ -9,13 +9,13 @@ import ( // TestCheckTagExistsAndHasCorrectFunction tests that the check tag exists and function relation works func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { // check that something that doesn't exist throws an error - err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") + err := checkTagExistsAndHasCorrectFunction("exact", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF") assert.ErrorContains(t, err, "does not exist in transactions") - err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum") + err = checkTagExistsAndHasCorrectFunction("exact", "LoreumIpsum") assert.ErrorContains(t, err, "does not exist in transactions") // Fee does not have a "String" Function so we cant use const 
with it. - err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") + err = checkTagExistsAndHasCorrectFunction("exact", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") assert.ErrorContains(t, err, "does not contain the needed method") } From f2aca3614b661201bf9288b473bf6f11294edf62 Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Fri, 16 Sep 2022 11:08:00 -0400 Subject: [PATCH 16/17] Added more unit tests --- processors/filterprocessor/fields/filter.go | 4 +- processors/filterprocessor/fields/searcher.go | 11 ++- .../filterprocessor/fields/searcher_test.go | 77 ++++++++++++++++++- .../filterprocessor/filter_processor.go | 4 +- .../filterprocessor/filter_processor_test.go | 60 +++++++++++++++ 5 files changed, 148 insertions(+), 8 deletions(-) diff --git a/processors/filterprocessor/fields/filter.go b/processors/filterprocessor/fields/filter.go index 80040c65e..3f6869525 100644 --- a/processors/filterprocessor/fields/filter.go +++ b/processors/filterprocessor/fields/filter.go @@ -36,7 +36,7 @@ func (f Filter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { case anyFieldOperation: for _, txn := range input.Payset { for _, fs := range f.Searchers { - if fs.Search(txn) { + if fs.search(txn) { newPayset = append(newPayset, txn) break } @@ -49,7 +49,7 @@ func (f Filter) SearchAndFilter(input data.BlockData) (data.BlockData, error) { allTrue := true for _, fs := range f.Searchers { - if !fs.Search(txn) { + if !fs.search(txn) { allTrue = false break } diff --git a/processors/filterprocessor/fields/searcher.go b/processors/filterprocessor/fields/searcher.go index f1db42a3c..38ff05ada 100644 --- a/processors/filterprocessor/fields/searcher.go +++ b/processors/filterprocessor/fields/searcher.go @@ -16,9 +16,10 @@ type Searcher struct { MethodToCall string } -// Search returns true if block contains the expression -func (f Searcher) Search(input transactions.SignedTxnInBlock) bool { - +// This function is ONLY to be used by 
the filter.field function. +// The reason is that, without validation of the tag (which is provided by +// MakeFieldSearcher), this can panic +func (f Searcher) search(input transactions.SignedTxnInBlock) bool { e := reflect.ValueOf(&input).Elem() for _, field := range strings.Split(f.Tag, ".") { @@ -38,6 +39,10 @@ func (f Searcher) Search(input transactions.SignedTxnInBlock) bool { func checkTagExistsAndHasCorrectFunction(expressionType expression.FilterType, tag string) (outError error) { var field string defer func() { + // This deferred function is a belt-and-suspenders measure. We check every reflected + // evaluation's IsValid() function to make sure not to operate on a zero value. Therefore we can't + // actually reach inside the if conditional unless we intentionally panic. + // However, having this function gives additional safety to a critical function if r := recover(); r != nil { outError = fmt.Errorf("error occured regarding tag %s. last searched field was: %s - %v", tag, field, r) } diff --git a/processors/filterprocessor/fields/searcher_test.go b/processors/filterprocessor/fields/searcher_test.go index e955cb9aa..593c4068e 100644 --- a/processors/filterprocessor/fields/searcher_test.go +++ b/processors/filterprocessor/fields/searcher_test.go @@ -4,8 +4,76 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/indexer/processors/filterprocessor/expression" ) +// TestInternalSearch tests the internal search functionality +func TestInternalSearch(t *testing.T) { + + defer func() { + // Since this function should only be called after validation is performed, + // this recovery function lets us recover if the schema changes in any way in the future + if r := recover(); r != nil { + assert.True(t, false) + } + }() + + address1 := basics.Address{1} + address2 := basics.Address{2} + + var expressionType 
expression.FilterType = "exact" + tag := "SignedTxnWithAD.SignedTxn.AuthAddr" + exp, err := expression.MakeExpression(expressionType, address1.String()) + assert.NoError(t, err) + searcher, err := MakeFieldSearcher(exp, expressionType, tag) + + result := searcher.search( + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + AuthAddr: address1, + }, + }, + }, + ) + + assert.True(t, result) + + result = searcher.search( + transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + AuthAddr: address2, + }, + }, + }, + ) + + assert.False(t, result) +} + +// TestMakeFieldSearcher tests making a field searcher is valid +func TestMakeFieldSearcher(t *testing.T) { + var expressionType expression.FilterType = "exact" + tag := "SignedTxnWithAD.SignedTxn.AuthAddr" + sampleExpressionStr := "sample" + exp, err := expression.MakeExpression(expressionType, sampleExpressionStr) + assert.NoError(t, err) + searcher, err := MakeFieldSearcher(exp, expressionType, tag) + assert.NoError(t, err) + assert.NotNil(t, searcher) + assert.Equal(t, searcher.Tag, tag) + assert.Equal(t, searcher.MethodToCall, expression.TypeToFunctionMap[expressionType]) + + searcher, err = MakeFieldSearcher(exp, "made-up-expression-type", sampleExpressionStr) + assert.Error(t, err) + assert.Nil(t, searcher) + +} + // TestCheckTagExistsAndHasCorrectFunction tests that the check tag exists and function relation works func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { // check that something that doesn't exist throws an error @@ -15,7 +83,14 @@ func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) { err = checkTagExistsAndHasCorrectFunction("exact", "LoreumIpsum") assert.ErrorContains(t, err, "does not exist in transactions") - // Fee does not have a "String" Function so we cant use const with it. + // Fee does not have a "String" Function so we cant use exact with it. 
err = checkTagExistsAndHasCorrectFunction("exact", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee") assert.ErrorContains(t, err, "does not contain the needed method") + + // a made up expression type should throw an error + err = checkTagExistsAndHasCorrectFunction("made-up-expression-type", "SignedTxnWithAD.SignedTxn.AuthAddr") + assert.ErrorContains(t, err, "is not supported") + + err = checkTagExistsAndHasCorrectFunction("exact", "SignedTxnWithAD.SignedTxn.AuthAddr") + assert.NoError(t, err) } diff --git a/processors/filterprocessor/filter_processor.go b/processors/filterprocessor/filter_processor.go index 588e60e5b..bd38a5ab5 100644 --- a/processors/filterprocessor/filter_processor.go +++ b/processors/filterprocessor/filter_processor.go @@ -66,7 +66,7 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu for _, configMaps := range pCfg.Filters { // We only want one key in the map (i.e. either "any" or "all"). The reason we use a list is that want - // to maintain ordering of the filters and a straight up map doesn't do that. + // to maintain ordering of the filters and a straight-up map doesn't do that. if len(configMaps) != 1 { return fmt.Errorf("filter processor Init(): illegal filter tag formation. 
tag length was: %d", len(configMaps)) } @@ -88,7 +88,7 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu searcher, err := fields.MakeFieldSearcher(exp, subConfig.ExpressionType, subConfig.FilterTag) if err != nil { - return fmt.Errorf("filter processor Init(): %w", err) + return fmt.Errorf("filter processor Init(): error making field searcher - %w", err) } searcherList = append(searcherList, searcher) diff --git a/processors/filterprocessor/filter_processor_test.go b/processors/filterprocessor/filter_processor_test.go index f52cb8bd3..e24a44e6c 100644 --- a/processors/filterprocessor/filter_processor_test.go +++ b/processors/filterprocessor/filter_processor_test.go @@ -15,6 +15,66 @@ import ( "github.com/algorand/indexer/processors" ) +// TestFilterProcessor_VariousErrorPathsOnInit tests the various error paths in the filter processor init function +func TestFilterProcessor_VariousErrorPathsOnInit(t *testing.T) { + tests := []struct { + name string + sampleCfgStr string + errorContainsStr string + }{ + + {"MakeExpressionError", `--- +filters: + - any: + - tag: DoesNot.ExistIn.Struct + expression-type: exact + expression: "sample" +`, "error making field searcher"}, + + {"MakeExpressionError", `--- +filters: + - any: + - tag: SignedTxnWithAD.SignedTxn.AuthAddr + expression-type: wrong-expression-type + expression: "sample" +`, "could not make expression with string"}, + + {"CorrectFilterType", `--- +filters: + - wrong-filter-type: + - tag: SignedTxnWithAD.SignedTxn.AuthAddr + expression-type: exact + expression: "sample" + +`, "filter key was not a valid value"}, + + {"FilterTagFormation", `--- +filters: + - any: + - tag: SignedTxnWithAD.SignedTxn.AuthAddr + expression-type: exact + expression: "sample" + all: + - tag: SignedTxnWithAD.SignedTxn.AuthAddr + expression-type: exact + expression: "sample" + + +`, "illegal filter tag formation"}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + fpBuilder, err 
:= processors.ProcessorBuilderByName(implementationName) + assert.NoError(t, err) + + fp := fpBuilder.New() + err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.PluginConfig(test.sampleCfgStr), logrus.New()) + assert.ErrorContains(t, err, test.errorContainsStr) + }) + } +} + // TestFilterProcessor_Init_Multi tests initialization of the filter processor with the "all" and "any" filter types func TestFilterProcessor_Init_Multi(t *testing.T) { From 6c18d483ffb1611f4756ba5e23fe9a446f69c55d Mon Sep 17 00:00:00 2001 From: Stephen Akiki Date: Fri, 16 Sep 2022 11:12:50 -0400 Subject: [PATCH 17/17] Config comments --- processors/filterprocessor/config.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/processors/filterprocessor/config.go b/processors/filterprocessor/config.go index 26db01d7c..63957d61a 100644 --- a/processors/filterprocessor/config.go +++ b/processors/filterprocessor/config.go @@ -4,15 +4,16 @@ import "github.com/algorand/indexer/processors/filterprocessor/expression" // SubConfig is the configuration needed for each additional filter type SubConfig struct { - // The tag of the struct to analyze + // FilterTag the tag of the struct to analyze FilterTag string `yaml:"tag"` - // The type of expression to search for "const" or "regex" + // ExpressionType the type of expression to search for (i.e. "exact" or "regex") ExpressionType expression.FilterType `yaml:"expression-type"` - // The expression to search + // Expression the expression to search Expression string `yaml:"expression"` } // Config configuration for the filter processor type Config struct { + // Filters is a list of maps, each keyed by an operation string whose value is the list of SubConfig objects for that operation Filters []map[string][]SubConfig `yaml:"filters"` }