Skip to content

Commit

Permalink
Merge pull request #55 from runreveal/alan/timeouts
Browse files Browse the repository at this point in the history
Add flush timeout & watchdog timer
  • Loading branch information
abraithwaite authored Sep 4, 2024
2 parents f6ee4a5 + 6a43bdf commit b1176c0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
cache: true

- name: Start containers
run: docker-compose -f "docker-compose.yml" up -d
run: docker compose -f "docker-compose.yml" up -d

- name: Test
run: make test
Expand All @@ -38,5 +38,5 @@ jobs:
run: make lint

- name: Stop containers
run: docker-compose -f "docker-compose.yml" down -v
run: docker compose -f "docker-compose.yml" down -v
if: always()
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dist:

.PHONY: compose
compose:
docker-compose up -d
docker compose up -d

.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
Expand Down
88 changes: 69 additions & 19 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
"github.com/segmentio/ksuid"
)

// Flusher is the core interface that the user of this package must implement
// to get the batching functionality.
// It takes a slice of messages and returns an error if the flush fails. It's
// expected to be run synchronously and only return once the flush is complete.
// The flusher MUST respond to the context being canceled and return an error
// if the context is canceled. If no other error occurred, then return the
// context error.
type Flusher[T any] interface {
// Flush delivers one slice of messages and reports whether delivery failed.
// It must honor cancellation of the supplied context as described above.
Flush(context.Context, []kawa.Message[T]) error
}
Expand Down Expand Up @@ -42,12 +49,14 @@ func (ef ErrorFunc[T]) HandleError(c context.Context, err error, msgs []kawa.Mes
// deadlock as the internal channel being written to by `Send` will not be
// getting read.
type Destination[T any] struct {
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flushcan map[string]context.CancelFunc
stopTimeout time.Duration
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flushcan map[string]context.CancelFunc
flushTimeout time.Duration
stopTimeout time.Duration
watchdogTimeout time.Duration

errorHandler ErrorHandler[T]
flusherr chan error
Expand All @@ -65,8 +74,10 @@ type OptFunc func(*Opts)
// Opts holds the settings that NewDestination copies into a Destination.
// Values are normally set through the option constructors (FlushFrequency,
// FlushParallelism, FlushTimeout, WatchdogTimeout, StopTimeout, ...).
type Opts struct {
// FlushLength is stored as the destination's flush length.
// NOTE(review): presumably the number of buffered messages that triggers a
// flush - confirm against Run.
FlushLength int
// FlushFrequency is the delay Run passes to time.AfterFunc when it arms the
// per-batch timer.
// NOTE(review): presumably the longest a partial batch waits before being
// flushed - confirm against Run.
FlushFrequency time.Duration
// FlushTimeout, when greater than zero, is applied with context.WithTimeout
// to the context handed to each Flusher.Flush call. Zero applies no timeout;
// NewDestination clamps negative values to zero.
FlushTimeout time.Duration
// FlushParallelism is the buffer size of the flush queue and flush error
// channels created by NewDestination.
// NOTE(review): presumably the maximum number of concurrent flushes - confirm.
FlushParallelism int
// StopTimeout is stored on the destination; NewDestination clamps negative
// values to zero.
// NOTE(review): presumably how long Run waits for in-flight flushes after
// its context is canceled - confirm against Run.
StopTimeout time.Duration
// WatchdogTimeout, when greater than zero, makes Run start a timer that
// causes Run to return an error if it fires. Run resets the timer when it
// arms the flush-frequency timer for an incoming message. Zero disables the
// watchdog; NewDestination clamps negative values to zero.
WatchdogTimeout time.Duration
}

func FlushFrequency(d time.Duration) func(*Opts) {
Expand All @@ -87,6 +98,18 @@ func FlushParallelism(n int) func(*Opts) {
}
}

// FlushTimeout returns an option that sets Opts.FlushTimeout to d.
//
// A positive value bounds every Flusher.Flush call with a context deadline of
// that length. Zero leaves the flush context without a deadline, and
// NewDestination treats negative values as zero.
func FlushTimeout(d time.Duration) func(*Opts) {
	apply := func(cfg *Opts) {
		cfg.FlushTimeout = d
	}
	return apply
}

// WatchdogTimeout returns an option that sets Opts.WatchdogTimeout to d.
//
// A positive value makes Run start a watchdog timer and return an error if
// that timer fires. Zero disables the watchdog, and NewDestination treats
// negative values as zero.
func WatchdogTimeout(d time.Duration) func(*Opts) {
	apply := func(cfg *Opts) {
		cfg.WatchdogTimeout = d
	}
	return apply
}

func StopTimeout(d time.Duration) func(*Opts) {
return func(opts *Opts) {
opts.StopTimeout = d
Expand Down Expand Up @@ -124,21 +147,30 @@ func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *De
if cfg.StopTimeout < 0 {
cfg.StopTimeout = 0
}
if cfg.WatchdogTimeout < 0 {
cfg.WatchdogTimeout = 0
}
if cfg.FlushTimeout < 0 {
cfg.FlushTimeout = 0
}

return &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan struct{}, cfg.FlushParallelism),
flusher: f,
flushcan: make(map[string]context.CancelFunc),
flushfreq: cfg.FlushFrequency,
stopTimeout: cfg.StopTimeout,
d := &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan struct{}, cfg.FlushParallelism),
flusher: f,
flushcan: make(map[string]context.CancelFunc),
flushfreq: cfg.FlushFrequency,
flushTimeout: cfg.FlushTimeout,
stopTimeout: cfg.StopTimeout,
watchdogTimeout: cfg.WatchdogTimeout,

errorHandler: e,
flusherr: make(chan error, cfg.FlushParallelism),

messages: make(chan msgAck[T]),
}

return d
}

type msgAck[T any] struct {
Expand Down Expand Up @@ -188,10 +220,20 @@ func (d *Destination[T]) Run(ctx context.Context) error {
}
d.syncMu.Unlock()

var wdChan <-chan time.Time
var wdTimer *time.Timer
if d.watchdogTimeout > 0 {
wdTimer = time.NewTimer(d.watchdogTimeout)
wdChan = wdTimer.C
}

var err error
loop:
for {
select {
case <-wdChan:
return errDeadlock

case msg := <-d.messages: // Here
d.count++
if setTimer {
Expand All @@ -200,6 +242,14 @@ loop:
time.AfterFunc(d.flushfreq, func() {
epochC <- epc // Here
})

if wdTimer != nil {
if !wdTimer.Stop() {
<-wdTimer.C
}
wdTimer.Reset(d.watchdogTimeout)
}

setTimer = false
}
d.buf = append(d.buf, msg)
Expand Down Expand Up @@ -246,7 +296,7 @@ loop:
return errDeadlock
}

var errDeadlock = errors.New("batcher: flushes timed out waiting for completion after context stopped.")
var errDeadlock = errors.New("batcher: flushes timed out")

func (d *Destination[T]) flush(ctx context.Context) {
// We make a new context here so that we can cancel the flush if the parent
Expand Down Expand Up @@ -292,11 +342,11 @@ func (d *Destination[T]) flush(ctx context.Context) {
}

func (d *Destination[T]) doflush(ctx context.Context, msgs []kawa.Message[T], acks []func()) {
// This not ideal.
// kawaMsgs := make([]kawa.Message[T], 0, len(msgs))
// for _, m := range msgs {
// kawaMsgs = append(kawaMsgs, m.msg)
// }
if d.flushTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.flushTimeout)
defer cancel()
}

err := d.flusher.Flush(ctx, msgs)
if err != nil {
Expand Down

0 comments on commit b1176c0

Please sign in to comment.