Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 19, 2023
1 parent 4251d26 commit 60a095d
Show file tree
Hide file tree
Showing 4 changed files with 403 additions and 57 deletions.
42 changes: 20 additions & 22 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,27 +129,21 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {

func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore, context.CancelFunc) {
s, ok := stores[id]
if ok {
// We may have already constructed the store with
// a get or a delete config, so set the TTL, cap
// and effort if we have a put config. If another
// put config has already been included, we ignore
// the put options now.
s.setPutOptions(cfg)
return s, noop
if !ok {
s = newMemStore(cfg, id)
stores[s.id] = s
}
s = newMemStore(cfg)
stores[id] = s

// We may have already constructed the store with
// a get or a delete config, so set the TTL, cap
// and effort if we have a put config. If another
// put config has already been included, we ignore
// the put options now.
s.setPutOptions(cfg)

return s, func() {
// TODO: Consider making this reference counted.
// Currently, what we have is essentially an
// ownership model, where the put operation is
// owner. This could conceivably be problematic
// if a processor were shared between different
// inputs and the put is closed due to a config
// change.
storeMu.Lock()
delete(stores, id)
s.close(stores)
storeMu.Unlock()
}
}
Expand Down Expand Up @@ -260,12 +254,16 @@ func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
if m, ok := meta.(map[string]interface{}); ok {
meta = mapstr.M(m)
}
// ... and write it into the cloned event.
result = event.Clone()
if _, err = result.PutValue(dst, meta); err != nil {
// ... and write it into the event.
// The implementation of PutValue currently leaves event
// essentially unchanged in the case of an error (in the
// case of an @metadata field there may be a mutation,
// but at most this will be the addition of a Meta field
// value to event). None of this is documented.
if _, err = event.PutValue(dst, meta); err != nil {
return nil, err
}
return result, nil
return event, nil
}

// deleteFor deletes the configured value from the cache based on the value of
Expand Down
285 changes: 271 additions & 14 deletions libbeat/processors/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cache

import (
"errors"
"io"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -297,6 +296,263 @@ var cacheTests = []struct {
},
},
},
{
name: "put_and_get_value_with_get_error_no_overwrite",
configs: []testConfig{
{
when: func(e mapstr.M) bool {
return e["put"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"put": mapstr.M{
"key_field": "crowdstrike.aid",
"value_field": "crowdstrike.metadata",
"ttl": "168h",
},
},
},
{
when: func(e mapstr.M) bool {
return e["get"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"get": mapstr.M{
"key_field": "crowdstrike.aid",
"target_field": "crowdstrike.metadata_new",
},
},
},
},
wantInitErr: nil,
steps: []cacheTestStep{
{
event: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
want: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: nil,
},
{
event: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": mapstr.M{
"someone_is_already_here": mapstr.M{
"another_key": "value",
},
},
},
},
want: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": mapstr.M{
"someone_is_already_here": mapstr.M{
"another_key": "value",
},
},
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"),
},
},
},
{
name: "put_and_get_value_allow_overwrite",
configs: []testConfig{
{
when: func(e mapstr.M) bool {
return e["put"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"put": mapstr.M{
"key_field": "crowdstrike.aid",
"value_field": "crowdstrike.metadata",
"ttl": "168h",
},
},
},
{
when: func(e mapstr.M) bool {
return e["get"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"overwrite_keys": true,
"get": mapstr.M{
"key_field": "crowdstrike.aid",
"target_field": "crowdstrike.metadata_new",
},
},
},
},
wantInitErr: nil,
steps: []cacheTestStep{
{
event: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
want: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: nil,
},
{
event: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": mapstr.M{
"someone_is_already_here": mapstr.M{
"another_key": "value",
},
},
},
},
want: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": "metadata_value",
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: nil,
},
},
},
{
name: "put_and_get_value_allow_overwrite_but_get_error",
configs: []testConfig{
{
when: func(e mapstr.M) bool {
return e["put"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"put": mapstr.M{
"key_field": "crowdstrike.aid",
"value_field": "crowdstrike.metadata",
"ttl": "168h",
},
},
},
{
when: func(e mapstr.M) bool {
return e["get"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"overwrite_keys": true,
"get": mapstr.M{
"key_field": "crowdstrike.aid",
"target_field": "crowdstrike.metadata_new.child",
},
},
},
},
wantInitErr: nil,
steps: []cacheTestStep{
{
event: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
want: mapstr.M{
"put": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata": "metadata_value",
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: nil,
},
{
event: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": "someone_is_already_here",
},
},
want: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
"metadata_new": "someone_is_already_here",
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: expected map but type is string"),
},
},
},
}

type testConfig struct {
Expand All @@ -309,7 +565,7 @@ func TestCache(t *testing.T) {
for _, test := range cacheTests {
t.Run(test.name, func(t *testing.T) {
var processors []beat.Processor
for _, cfg := range test.configs {
for i, cfg := range test.configs {
config, err := conf.NewConfigFrom(cfg.cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -322,7 +578,20 @@ func TestCache(t *testing.T) {
if err != nil {
return
}

t.Log(p)
c, ok := p.(*cache)
if !ok {
t.Fatalf("processor %d is not an *cache", i)
}

defer func() {
err := c.Close()
if err != nil {
t.Errorf("unexpected error from c.Close(): %v", err)
}
}()

processors = append(processors, p)
}

Expand Down Expand Up @@ -351,18 +620,6 @@ func TestCache(t *testing.T) {
}
}
}

for i, p := range processors {
p, ok := p.(io.Closer)
if !ok {
t.Errorf("processor %d is not an io.Closer", i)
continue
}
err := p.Close()
if err != nil {
t.Errorf("unexpected error from p.Close(): %v", err)
}
}
})
}
}
Loading

0 comments on commit 60a095d

Please sign in to comment.