Skip to content

Commit

Permalink
[Ingest Manager] Fixed nil pointer during unenroll (#23609)
Browse files Browse the repository at this point in the history
[Ingest Manager] Fixed nil pointer during unenroll (#23609)
  • Loading branch information
michalpristas authored Jan 21, 2021
1 parent f527935 commit 7bdbf28
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 5 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Fix Windows service installation script {pull}20203[20203]
- Fix timeout issue stopping service applications {pull}20256[20256]
- Fix incorrect hash when upgrading agent {pull}22322[22322]
- Fixed nil pointer during unenroll {pull}23609[23609]

==== New features

Expand Down
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/fleet_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -58,27 +59,32 @@ func (f *actionAcker) Ack(ctx context.Context, action fleetapi.Action) error {
return errors.New(err, fmt.Sprintf("acknowledge action '%s' for elastic-agent '%s' failed", action.ID(), agentID), errors.TypeNetwork)
}

f.log.Debugf("action with id '%s' was just acknowledged", action.ID())

return nil
}

func (f *actionAcker) AckBatch(ctx context.Context, actions []fleetapi.Action) error {
// checkin
agentID := f.agentInfo.AgentID()
events := make([]fleetapi.AckEvent, 0, len(actions))
ids := make([]string, 0, len(actions))
for _, action := range actions {
events = append(events, constructEvent(action, agentID))
ids = append(ids, action.ID())
}

cmd := fleetapi.NewAckCmd(f.agentInfo, f.client)
req := &fleetapi.AckRequest{
Events: events,
}

f.log.Debugf("%d actions with ids '%s' acknowledging", len(ids), strings.Join(ids, ","))

_, err := cmd.Execute(ctx, req)
if err != nil {
return errors.New(err, fmt.Sprintf("acknowledge %d actions '%v' for elastic-agent '%s' failed", len(actions), actions, agentID), errors.TypeNetwork)
}

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/lazy_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

Expand All @@ -19,19 +20,22 @@ type ackForcer interface {
}

type lazyAcker struct {
log *logger.Logger
acker batchAcker
queue []fleetapi.Action
}

func newLazyAcker(baseAcker batchAcker) *lazyAcker {
func newLazyAcker(baseAcker batchAcker, log *logger.Logger) *lazyAcker {
return &lazyAcker{
acker: baseAcker,
queue: make([]fleetapi.Action, 0),
log: log,
}
}

func (f *lazyAcker) Ack(ctx context.Context, action fleetapi.Action) error {
f.queue = append(f.queue, action)
f.log.Debugf("appending action with id '%s' to the queue", action.ID())

if _, isAckForced := action.(ackForcer); isAckForced {
return f.Commit(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLazyAcker(t *testing.T) {
t.Fatal(err)
}

lacker := newLazyAcker(acker)
lacker := newLazyAcker(acker, log)

if acker == nil {
t.Fatal("acker not initialized")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newManaged(
return nil, err
}

batchedAcker := newLazyAcker(acker)
batchedAcker := newLazyAcker(acker, log)

// Create the state store that will persist the last good policy change on disk.
stateStore, err := newStateStoreWithMigration(log, info.AgentActionStoreFile(), info.AgentStateStoreFile())
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (s *stateStore) Save() error {
if apc, ok := s.state.action.(*fleetapi.ActionPolicyChange); ok {
serialize.Action = &actionSerializer{apc.ActionID, apc.ActionType, apc.Policy, nil}
} else if aun, ok := s.state.action.(*fleetapi.ActionUnenroll); ok {
serialize.Action = &actionSerializer{apc.ActionID, apc.ActionType, nil, &aun.IsDetected}
serialize.Action = &actionSerializer{aun.ActionID, aun.ActionType, nil, &aun.IsDetected}
} else {
return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.state.action)
}
Expand Down
30 changes: 30 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,36 @@ func runTestStateStore(t *testing.T, ackToken string) {
require.Equal(t, ackToken, store.AckToken())
}))

t.Run("can save to disk unenroll action type",
withFile(func(t *testing.T, file string) {
action := &fleetapi.ActionUnenroll{
ActionID: "abc123",
ActionType: "UNENROLL",
}

s := storage.NewDiskStore(file)
store, err := newStateStore(log, s)
require.NoError(t, err)

require.Equal(t, 0, len(store.Actions()))
store.Add(action)
store.SetAckToken(ackToken)
err = store.Save()
require.NoError(t, err)
require.Equal(t, 1, len(store.Actions()))
require.Equal(t, ackToken, store.AckToken())

s = storage.NewDiskStore(file)
store1, err := newStateStore(log, s)
require.NoError(t, err)

actions := store1.Actions()
require.Equal(t, 1, len(actions))

require.Equal(t, action, actions[0])
require.Equal(t, ackToken, store.AckToken())
}))

t.Run("when we ACK we save to disk",
withFile(func(t *testing.T, file string) {
ActionPolicyChange := &fleetapi.ActionPolicyChange{
Expand Down

0 comments on commit 7bdbf28

Please sign in to comment.