Skip to content

Commit

Permalink
Antispam by sourcename (#682)
Browse files Browse the repository at this point in the history
* check antispam exceptions by source name

* add test
  • Loading branch information
kirillov6 authored Oct 4, 2024
1 parent dd71bb8 commit e02c4a6
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 26 deletions.
10 changes: 5 additions & 5 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (

"github.com/bitly/go-simplejson"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/cfg/matchrule"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/pipeline/antispam"
"github.com/ozontech/file.d/pipeline/doif"
)

func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
capacity := pipeline.DefaultCapacity
antispamThreshold := 0
antispamField := ""
var antispamExceptions matchrule.RuleSets
var antispamExceptions antispam.Exceptions
avgInputEventSize := pipeline.DefaultAvgInputEventSize
maxInputEventSize := pipeline.DefaultMaxInputEventSize
streamField := pipeline.DefaultStreamField
Expand Down Expand Up @@ -86,7 +86,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
antispamField = settings.Get("antispam_field").MustString()

var err error
antispamExceptions, err = extractExceptions(settings)
antispamExceptions, err = extractAntispamExceptions(settings)
if err != nil {
logger.Fatalf("extract exceptions: %s", err)
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
}
}

func extractExceptions(settings *simplejson.Json) (matchrule.RuleSets, error) {
func extractAntispamExceptions(settings *simplejson.Json) (antispam.Exceptions, error) {
raw, err := settings.Get("antispam_exceptions").MarshalJSON()
if err != nil {
return nil, err
Expand All @@ -130,7 +130,7 @@ func extractExceptions(settings *simplejson.Json) (matchrule.RuleSets, error) {
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()

var exceptions matchrule.RuleSets
var exceptions antispam.Exceptions
if err := dec.Decode(&exceptions); err != nil {
return nil, err
}
Expand Down
23 changes: 20 additions & 3 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Antispammer struct {
maintenanceInterval time.Duration
mu sync.RWMutex
sources map[any]source
exceptions matchrule.RuleSets
exceptions Exceptions

logger *zap.Logger

Expand All @@ -44,7 +44,7 @@ type Options struct {
Threshold int
Field string
UnbanIterations int
Exceptions matchrule.RuleSets
Exceptions Exceptions

Logger *zap.Logger
MetricsController *metric.Ctl
Expand Down Expand Up @@ -90,7 +90,11 @@ func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte

for i := 0; i < len(a.exceptions); i++ {
e := &a.exceptions[i]
if e.Match(event) {
checkData := event
if e.CheckSourceName {
checkData = []byte(name)
}
if e.Match(checkData) {
if e.Name != "" {
a.exceptionMetric.WithLabelValues(e.Name).Inc()
}
Expand Down Expand Up @@ -204,3 +208,16 @@ func (a *Antispammer) Dump() string {

return out
}

// Exception is a single antispam exception: a match rule set plus a flag
// selecting which data the rule set is evaluated against.
type Exception struct {
// RuleSet is embedded, so its fields (e.g. Name) and methods (Match,
// Prepare) are promoted onto Exception.
matchrule.RuleSet
// CheckSourceName, when true, makes the antispammer match the rule set
// against the source name instead of the raw event bytes.
CheckSourceName bool `json:"check_source_name"`
}

// Exceptions is an ordered list of antispam exceptions.
type Exceptions []Exception

// Prepare calls Prepare on every exception in the list, in order.
// Elements are addressed through the backing array, so each one is
// prepared in place rather than on a copy.
func (e Exceptions) Prepare() {
	for idx := 0; idx < len(e); idx++ {
		exc := &e[idx]
		exc.Prepare()
	}
}
114 changes: 98 additions & 16 deletions pipeline/antispam/antispammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ import (
"testing"
"time"

"github.com/ozontech/file.d/cfg/matchrule"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)

func TestAntispam(t *testing.T) {
r := require.New(t)

threshold := 5
unbanIterations := 2
maintenanceInterval := time.Second * 1
func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Duration) *Antispammer {
holder := metric.NewHolder(time.Minute)
antispamer := NewAntispammer(&Options{
return NewAntispammer(&Options{
MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
MetricHolder: holder,
})
}

func TestAntispam(t *testing.T) {
r := require.New(t)

threshold := 5
unbanIterations := 2
maintenanceInterval := time.Second * 1

antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval)

startTime := time.Now()
checkSpam := func(i int) bool {
Expand Down Expand Up @@ -53,15 +60,8 @@ func TestAntispamAfterRestart(t *testing.T) {
threshold := 5
unbanIterations := 2
maintenanceInterval := time.Second * 1
holder := metric.NewHolder(time.Minute)
antispamer := NewAntispammer(&Options{
MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
MetricHolder: holder,
})

antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval)

startTime := time.Now()
checkSpam := func(i int) bool {
Expand All @@ -77,3 +77,85 @@ func TestAntispamAfterRestart(t *testing.T) {
result := checkSpam(threshold)
r.False(result)
}

// TestAntispamExceptions feeds a fixed sequence of (source name, event)
// pairs through IsSpam and, after every call, checks the per-exception
// metric counters. Two exceptions are configured: one matched against the
// event body and one matched against the source name (CheckSourceName).
// The counters are cumulative, so the order of the cases matters.
func TestAntispamExceptions(t *testing.T) {
	const (
		eventRuleset  = "test_event"
		sourceRuleset = "test_sourcename"
	)

	r := require.New(t)
	now := time.Now()

	// threshold = 1, unbanIterations = 2, maintenanceInterval = 1s
	antispamer := newAntispammer(1, 2, time.Second)

	// Exception evaluated against the event bytes.
	byEvent := Exception{
		RuleSet: matchrule.RuleSet{
			Name: eventRuleset,
			Cond: matchrule.CondOr,
			Rules: []matchrule.Rule{
				{
					Mode: matchrule.ModePrefix,
					Values: []string{
						`{"level":"debug"`,
						`{"level":"info"`,
					},
				},
				{
					Mode:   matchrule.ModeContains,
					Values: []string{"test_event"},
				},
			},
		},
	}

	// Exception evaluated against the source name.
	bySource := Exception{
		CheckSourceName: true,
		RuleSet: matchrule.RuleSet{
			Name: sourceRuleset,
			Cond: matchrule.CondAnd,
			Rules: []matchrule.Rule{
				{
					Mode:   matchrule.ModeContains,
					Values: []string{"my_source1", "my_source2"},
				},
			},
		},
	}

	antispamer.exceptions = Exceptions{byEvent, bySource}
	antispamer.exceptions.Prepare()

	// Each case lists the expected cumulative counter values after its
	// IsSpam call.
	cases := []struct {
		source     string
		event      string
		wantEvent  float64
		wantSource float64
	}{
		{"test", `{"level":"info","message":test"}`, 1, 0},
		{"test", `{"level":"error","message":test_event123"}`, 2, 0},
		{"my_source2", `{"level":"error","message":test"}`, 2, 1},
		{"my_source1", `{"level":"debug","message":test"}`, 3, 1},
		{"test", `{"level":"error","message":test"}`, 3, 1},
	}

	for _, tc := range cases {
		antispamer.IsSpam(1, tc.source, true, []byte(tc.event), now)

		gotEvent := testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(eventRuleset))
		r.Equal(tc.wantEvent, gotEvent)

		gotSource := testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(sourceRuleset))
		r.Equal(tc.wantSource, gotSource)
	}
}
3 changes: 1 addition & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/ozontech/file.d/cfg/matchrule"
"github.com/ozontech/file.d/decoder"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
Expand Down Expand Up @@ -135,7 +134,7 @@ type Settings struct {
EventTimeout time.Duration
AntispamThreshold int
AntispamField string
AntispamExceptions matchrule.RuleSets
AntispamExceptions antispam.Exceptions
AvgEventSize int
MaxEventSize int
StreamField string
Expand Down

0 comments on commit e02c4a6

Please sign in to comment.