Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Filter Processor #1220

Merged
merged 17 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions processors/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv
proc.logger = logger

// First get the configuration from the string
var pCfg processors.BlockProcessorConfig
var pCfg Config
err := yaml.Unmarshal([]byte(cfg), &pCfg)
if err != nil {
return fmt.Errorf("blockprocessor init error: %w", err)
Expand Down Expand Up @@ -184,7 +184,7 @@ func MakeBlockProcessorWithLedger(logger *log.Logger, l *ledger.Ledger, handler
}

// MakeBlockProcessorWithLedgerInit creates a block processor and initializes the ledger.
func MakeBlockProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis *bookkeeping.Genesis, config processors.BlockProcessorConfig, handler func(block *ledgercore.ValidatedBlock) error) (BlockProcessor, error) {
func MakeBlockProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis *bookkeeping.Genesis, config Config, handler func(block *ledgercore.ValidatedBlock) error) (BlockProcessor, error) {
err := InitializeLedger(ctx, logger, nextDbRound, *genesis, &config)
if err != nil {
return nil, fmt.Errorf("MakeBlockProcessorWithLedgerInit() err: %w", err)
Expand Down
3 changes: 1 addition & 2 deletions processors/blockprocessor/block_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package blockprocessor_test
import (
"context"
"fmt"
"github.com/algorand/indexer/processors"
"testing"

test2 "github.com/sirupsen/logrus/hooks/test"
Expand Down Expand Up @@ -166,7 +165,7 @@ func TestMakeProcessorWithLedgerInit_CatchpointErrors(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
config := processors.BlockProcessorConfig{Catchpoint: tc.catchpoint}
config := blockprocessor.Config{Catchpoint: tc.catchpoint}
_, err := blockprocessor.MakeBlockProcessorWithLedgerInit(
context.Background(),
logger,
Expand Down
6 changes: 3 additions & 3 deletions processors/config.go → processors/blockprocessor/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package processors
package blockprocessor

// BlockProcessorConfig configuration for a block processor
type BlockProcessorConfig struct {
// Config configuration for a block processor
type Config struct {
// Catchpoint to initialize the local ledger to
Catchpoint string `yaml:"catchpoint"`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package processors
package blockprocessor

import (
"github.com/stretchr/testify/require"
Expand All @@ -16,7 +16,7 @@ func TestConfigDeserialize(t *testing.T) {
algod-addr: "algod_addr"
`

var processorConfig BlockProcessorConfig
var processorConfig Config
err := yaml.Unmarshal([]byte(configStr), &processorConfig)
require.Nil(t, err)
require.Equal(t, processorConfig.Catchpoint, "acatch")
Expand Down
7 changes: 3 additions & 4 deletions processors/blockprocessor/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/processors"
"github.com/algorand/indexer/processors/blockprocessor/internal"
)

// InitializeLedger will initialize a ledger to the directory given by the
// IndexerDbOpts.
// nextRound - next round to process after initializing.
// catchpoint - if provided, attempt to use fast catchup.
func InitializeLedger(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis bookkeeping.Genesis, config *processors.BlockProcessorConfig) error {
func InitializeLedger(ctx context.Context, logger *log.Logger, nextDbRound uint64, genesis bookkeeping.Genesis, config *Config) error {
if nextDbRound > 0 {
if config.Catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(config.Catchpoint)
Expand Down Expand Up @@ -59,7 +58,7 @@ func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchp

// InitializeLedgerSimple initializes a ledger with the block processor by
// sending it one block at a time and letting it update the ledger as usual.
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, genesis *bookkeeping.Genesis, config *processors.BlockProcessorConfig) error {
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, genesis *bookkeeping.Genesis, config *Config) error {
ctx, cf := context.WithCancel(ctx)
defer cf()
var bot fetcher.Fetcher
Expand Down Expand Up @@ -138,7 +137,7 @@ func handleBlock(logger *log.Logger, block *rpcs.EncodedBlockCert, procHandler f
return nil
}

func getFetcher(logger *log.Logger, config *processors.BlockProcessorConfig) (fetcher.Fetcher, error) {
func getFetcher(logger *log.Logger, config *Config) (fetcher.Fetcher, error) {
var err error
var bot fetcher.Fetcher
if config.AlgodDataDir != "" {
Expand Down
3 changes: 1 addition & 2 deletions processors/blockprocessor/initialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/processors"
"github.com/algorand/indexer/processors/blockprocessor/internal"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/test"
Expand Down Expand Up @@ -61,7 +60,7 @@ func TestRunMigration(t *testing.T) {

dname, err := os.MkdirTemp("", "indexer")
defer os.RemoveAll(dname)
config := processors.BlockProcessorConfig{
config := Config{
IndexerDatadir: dname,
AlgodAddr: "localhost",
AlgodToken: "AAAAA",
Expand Down
18 changes: 18 additions & 0 deletions processors/filterprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package filterprocessor

import "github.com/algorand/indexer/processors/filterprocessor/expression"

// SubConfig is the configuration needed for each additional filter
type SubConfig struct {
// The tag of the struct to analyze
FilterTag string `yaml:"tag"`
// The type of expression to search for "const" or "regex"
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like const this was renamed. Also, since you have comments anyway, could you add the variable names so that they matches the godoc format?

ExpressionType expression.Type `yaml:"expression-type"`
// The expression to search
Expression string `yaml:"expression"`
}

// Config configuration for the filter processor
type Config struct {
Filters []map[string][]SubConfig `yaml:"filters"`
}
65 changes: 65 additions & 0 deletions processors/filterprocessor/expression/expression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package expression

import (
"fmt"
"regexp"
)

// Type is the type of the filter (i.e. const, regex, etc)
type Type string
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved

const (
// ConstFilter a filter that looks at a constant string in its entirety
ConstFilter Type = "const"
// RegexFilter a filter that applies regex rules to the matching
RegexFilter Type = "regex"
)

// TypeToFunctionMap maps the expression-type with the needed function for the expression.
// For instance the const or regex expression-type might need the String() function
// Can't make this consts because there are no constant maps in go...
var TypeToFunctionMap = map[Type]string{
ConstFilter: "String",
RegexFilter: "String",
}

// Interface the expression interface
type Interface interface {
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved
Search(input interface{}) bool
}

type regexExpression struct {
Regex *regexp.Regexp
}

func (e regexExpression) Search(input interface{}) bool {
return e.Regex.MatchString(input.(string))
}

// MakeExpression creates an expression based on an expression type
func MakeExpression(expressionType Type, expressionSearchStr string) (*Interface, error) {
switch expressionType {
case ConstFilter:
{
r, err := regexp.Compile("^" + expressionSearchStr + "$")
if err != nil {
return nil, err
}

var exp Interface = regexExpression{Regex: r}
return &exp, nil
}
case RegexFilter:
{
r, err := regexp.Compile(expressionSearchStr)
if err != nil {
return nil, err
}

var exp Interface = regexExpression{Regex: r}
return &exp, nil
}
default:
return nil, fmt.Errorf("unknown expression type: %s", expressionType)
}
}
72 changes: 72 additions & 0 deletions processors/filterprocessor/fields/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package fields

import (
"fmt"

"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/indexer/data"
)

// Operation an operation like "some" or "all" for boolean logic
type Operation string

const someFieldOperation Operation = "some"
const allFieldOperation Operation = "all"
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved

// ValidFieldOperation returns true if the input is a valid operation
func ValidFieldOperation(input string) bool {
if input != string(someFieldOperation) && input != string(allFieldOperation) {
return false
}

return true
}

// Filter an object that combines field searches with a boolean operator
type Filter struct {
Op Operation
Searchers []*Searcher
}

// SearchAndFilter searches through the block data and applies the operation to the results
func (f Filter) SearchAndFilter(input data.BlockData) (data.BlockData, error) {

var newPayset []transactions.SignedTxnInBlock
switch f.Op {
case someFieldOperation:
for _, txn := range input.Payset {
for _, fs := range f.Searchers {
if fs.Search(txn) {
newPayset = append(newPayset, txn)
break
}
}
}

break
case allFieldOperation:
for _, txn := range input.Payset {

allTrue := true
for _, fs := range f.Searchers {
if !fs.Search(txn) {
allTrue = false
break
}
}

if allTrue {
newPayset = append(newPayset, txn)
}

}
break
default:
return data.BlockData{}, fmt.Errorf("unknown operation: %s", f.Op)
}

input.Payset = newPayset

return input, nil

}
78 changes: 78 additions & 0 deletions processors/filterprocessor/fields/searcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fields

import (
"fmt"
"reflect"
"strings"

"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/indexer/processors/filterprocessor/expression"
)

// Searcher searches the struct with an expression and method to call
type Searcher struct {
Exp *expression.Interface
Tag string
MethodToCall string
}

// Search returns true if block contains the expression
func (f Searcher) Search(input transactions.SignedTxnInBlock) bool {

e := reflect.ValueOf(&input).Elem()

var field string
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved

for _, field = range strings.Split(f.Tag, ".") {
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved
e = e.FieldByName(field)
}

toSearch := e.MethodByName(f.MethodToCall).Call([]reflect.Value{})[0].Interface()

if (*f.Exp).Search(toSearch) {
return true
}

return false
}

// checks that the supplied tag exists in the struct and recovers from any panics
func checkTagExistsAndHasCorrectFunction(expressionType expression.Type, tag string) (outError error) {
var field string
defer func() {
if r := recover(); r != nil {
outError = fmt.Errorf("error occured regarding tag %s. last searched field was: %s - %v", tag, field, r)
}
}()

e := reflect.ValueOf(&transactions.SignedTxnInBlock{}).Elem()

for _, field = range strings.Split(tag, ".") {
e = e.FieldByName(field)
if !e.IsValid() {
return fmt.Errorf("%s does not exist in transactions.SignedTxnInBlock struct. last searched field was: %s", tag, field)
}
}

method, ok := expression.TypeToFunctionMap[expressionType]

if !ok {
return fmt.Errorf("expression type (%s) is not supported. tag value: %s", expressionType, tag)
}

if !e.MethodByName(method).IsValid() {
return fmt.Errorf("variable referenced by tag %s does not contain the needed method: %s", tag, method)
}

return nil
}

// MakeFieldSearcher will check that the field exists and that it contains the necessary "conversion" function
func MakeFieldSearcher(e *expression.Interface, expressionType expression.Type, tag string) (*Searcher, error) {

if err := checkTagExistsAndHasCorrectFunction(expressionType, tag); err != nil {
return nil, err
}

return &Searcher{Exp: e, Tag: tag, MethodToCall: expression.TypeToFunctionMap[expressionType]}, nil
}
21 changes: 21 additions & 0 deletions processors/filterprocessor/fields/searcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package fields

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestCheckTagExistsAndHasCorrectFunction tests that the check tag exists and function relation works
func TestCheckTagExistsAndHasCorrectFunction(t *testing.T) {
// check that something that doesn't exist throws an error
err := checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.LoreumIpsum.SDF")
assert.ErrorContains(t, err, "does not exist in transactions")

err = checkTagExistsAndHasCorrectFunction("const", "LoreumIpsum")
assert.ErrorContains(t, err, "does not exist in transactions")

// Fee does not have a "String" Function so we cant use const with it.
err = checkTagExistsAndHasCorrectFunction("const", "SignedTxnWithAD.SignedTxn.Txn.Header.Fee")
assert.ErrorContains(t, err, "does not contain the needed method")
}
Loading