Skip to content

Commit

Permalink
Make event acknowledgment asynchronous in shipper output (#32785)
Browse files Browse the repository at this point in the history
So we can keep publishing batches not blocking on a single batch to be acknowledged.
Also updated the config documentation.
  • Loading branch information
rdner authored and chrisberkhout committed Jun 1, 2023
1 parent c76e845 commit 6f655af
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 175 deletions.
7 changes: 7 additions & 0 deletions libbeat/outputs/shipper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ output.shipper:
timeout: 30
max_retries: 3
bulk_max_size: 50
ack_polling_interval: '5ms'
backoff:
init: 1
max: 60
Expand Down Expand Up @@ -61,6 +62,12 @@ Setting `bulk_max_size` to values less than or equal to 0 disables the
splitting of batches. When splitting is disabled, the queue decides on the
number of events to be contained in a batch.

### `ack_polling_interval`

The minimal interval for getting persisted index updates from the shipper server. Batches of events are acknowledged asynchronously in the background. If after the `ack_polling_interval` duration the persisted index value changed all batches pending acknowledgment will be checked against the new value and acknowledged if `persisted_index` >= `accepted_index`.

The default value is `5ms`, cannot be set to a value less then the default.

### `backoff.init`

The number of seconds to wait before trying to republish to the shipper
Expand Down
50 changes: 43 additions & 7 deletions libbeat/outputs/shipper/api/shipper_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package api

import (
context "context"
"errors"
"time"

pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
Expand All @@ -29,23 +31,29 @@ import (
func NewProducerMock(cap int) *ProducerMock {
id, _ := uuid.NewV4()
return &ProducerMock{
UUID: id.String(),
uuid: id.String(),
Q: make([]*messages.Event, 0, cap),
}
}

type ProducerMock struct {
pb.UnimplementedProducerServer
Q []*messages.Event
UUID string
Error error
Q []*messages.Event
uuid string
AcceptedCount uint32
persistedIndex uint64
Error error
}

func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishRequest) (*messages.PublishReply, error) {
if p.Error != nil {
return nil, p.Error
}

if r.Uuid != p.uuid {
return nil, errors.New("UUID does not match")
}

resp := &messages.PublishReply{}

for _, e := range r.Events {
Expand All @@ -55,16 +63,44 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq

p.Q = append(p.Q, e)
resp.AcceptedCount++
if resp.AcceptedCount == p.AcceptedCount {
break
}
}

resp.AcceptedIndex = uint64(len(p.Q))

return resp, nil
}

func (p *ProducerMock) Persist(count uint64) {
p.persistedIndex = count
}

func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error {
return producer.Send(&messages.PersistedIndexReply{
Uuid: p.UUID,
PersistedIndex: uint64(len(p.Q)),
err := producer.Send(&messages.PersistedIndexReply{
Uuid: p.uuid,
PersistedIndex: p.persistedIndex,
})
if err != nil {
return err
}

if !req.PollingInterval.IsValid() || req.PollingInterval.AsDuration() == 0 {
return nil
}

ticker := time.NewTicker(req.PollingInterval.AsDuration())
defer ticker.Stop()

for range ticker.C {
err = producer.Send(&messages.PersistedIndexReply{
Uuid: p.uuid,
PersistedIndex: p.persistedIndex,
})
if err != nil {
return err
}
}
return nil
}
9 changes: 5 additions & 4 deletions libbeat/outputs/shipper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type Config struct {
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
// BulkMaxSize max amount of events in a single batch
BulkMaxSize int `config:"bulk_max_size"`
// AckPollingInterval is an interval for polling the shipper server for persistence acknowledgement.
// The default/minimum 5 ms polling interval means we could publish at most 1000 ms / 5 ms = 200 batches/sec.
// With the default `bulk_max_size` of 50 events this would limit
// the default throughput per worker to 200 * 50 = 10000 events/sec.
// AckPollingInterval is a minimal interval for getting persisted index updates from the shipper server.
// Batches of events are acknowledged asynchronously in the background.
// If after the `AckPollingInterval` duration the persisted index value changed
// all batches pending acknowledgment will be checked against the new value
// and acknowledged if `persisted_index` >= `accepted_index`.
AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"`
// Backoff strategy for the shipper output
Backoff backoffConfig `config:"backoff"`
Expand Down
Loading

0 comments on commit 6f655af

Please sign in to comment.