Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GODT-2500): Add panic handlers everywhere. #332

Merged
merged 2 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion queue/bool.go → async/bool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import "sync/atomic"

Expand Down
65 changes: 65 additions & 0 deletions async/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package async

import (
"context"
"sync"
)

// Abortable collects groups of functions that can be aborted by calling Abort.
type Abortable struct {
abortFunc []context.CancelFunc
abortLock sync.RWMutex
}

func (a *Abortable) Do(ctx context.Context, fn func(context.Context)) {
fn(a.newCancelCtx(ctx))
}

func (a *Abortable) Abort() {
a.abortLock.RLock()
defer a.abortLock.RUnlock()

for _, fn := range a.abortFunc {
fn()
}
}

func (a *Abortable) newCancelCtx(ctx context.Context) context.Context {
a.abortLock.Lock()
defer a.abortLock.Unlock()

ctx, cancel := context.WithCancel(ctx)

a.abortFunc = append(a.abortFunc, cancel)

return ctx
}

// RangeContext iterates over the given channel until the context is canceled or the
// channel is closed.
func RangeContext[T any](ctx context.Context, ch <-chan T, fn func(T)) {
for {
select {
case v, ok := <-ch:
if !ok {
return
}

fn(v)

case <-ctx.Done():
return
}
}
}

// ForwardContext forwards all values from the src channel to the dst channel until the
// context is canceled or the src channel is closed.
func ForwardContext[T any](ctx context.Context, dst chan<- T, src <-chan T) {
RangeContext(ctx, src, func(v T) {
select {
case dst <- v:
case <-ctx.Done():
}
})
}
214 changes: 214 additions & 0 deletions async/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package async

import (
"context"
"math/rand"
"sync"
"time"
)

// Group is forked and improved version of "github.com/bradenaw/juniper/xsync.Group".
//
// It manages a group of goroutines. The main change to original is posibility
// to wait passed function to finish without canceling it's context and adding
// PanicHandler.
type Group struct {
baseCtx context.Context
ctx context.Context
jobCtx context.Context
cancel context.CancelFunc
finish context.CancelFunc
wg sync.WaitGroup

panicHandler PanicHandler
}

// NewGroup returns a Group ready for use. The context passed to any of the f functions will be a
// descendant of ctx.
func NewGroup(ctx context.Context, panicHandler PanicHandler) *Group {
bgCtx, cancel := context.WithCancel(ctx)
jobCtx, finish := context.WithCancel(ctx)

return &Group{
baseCtx: ctx,
ctx: bgCtx,
jobCtx: jobCtx,
cancel: cancel,
finish: finish,
panicHandler: panicHandler,
}
}

// Once calls f once from another goroutine.
func (g *Group) Once(f func(ctx context.Context)) {
g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

f(g.ctx)
g.wg.Done()
}()
}

// jitterDuration returns a random duration in [d - jitter, d + jitter].
func jitterDuration(d time.Duration, jitter time.Duration) time.Duration {
return d + time.Duration(float64(jitter)*((rand.Float64()*2)-1)) //nolint:gosec
}

// Periodic spawns a goroutine that calls f once per interval +/- jitter.
func (g *Group) Periodic(
interval time.Duration,
jitter time.Duration,
f func(ctx context.Context),
) {
g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

t := time.NewTimer(jitterDuration(interval, jitter))
defer t.Stop()

for {
if g.ctx.Err() != nil {
return
}

select {
case <-g.jobCtx.Done():
return
case <-t.C:
}

t.Reset(jitterDuration(interval, jitter))
f(g.ctx)
}
}()
}

// Trigger spawns a goroutine which calls f whenever the returned function is called. If f is
// already running when triggered, f will run again immediately when it finishes.
func (g *Group) Trigger(f func(ctx context.Context)) func() {
c := make(chan struct{}, 1)

g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

for {
if g.ctx.Err() != nil {
return
}
select {
case <-g.jobCtx.Done():
return
case <-c:
}
f(g.ctx)
}
}()

return func() {
select {
case c <- struct{}{}:
default:
}
}
}

// PeriodicOrTrigger spawns a goroutine which calls f whenever the returned function is called. If
// f is already running when triggered, f will run again immediately when it finishes. Also calls f
// when it has been interval+/-jitter since the last trigger.
func (g *Group) PeriodicOrTrigger(
interval time.Duration,
jitter time.Duration,
f func(ctx context.Context),
) func() {
c := make(chan struct{}, 1)

g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

t := time.NewTimer(jitterDuration(interval, jitter))
defer t.Stop()

for {
if g.ctx.Err() != nil {
return
}
select {
case <-g.jobCtx.Done():
return
case <-t.C:
t.Reset(jitterDuration(interval, jitter))
case <-c:
if !t.Stop() {
<-t.C
}

t.Reset(jitterDuration(interval, jitter))
}
f(g.ctx)
}
}()

return func() {
select {
case c <- struct{}{}:
default:
}
}
}

func (g *Group) resetCtx() {
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
g.ctx, g.cancel = context.WithCancel(g.baseCtx)
}

// Cancel is send to all of the spawn goroutines and ends periodic
// or trigger routines.
func (g *Group) Cancel() {
g.cancel()
g.finish()
g.resetCtx()
}

// Finish will ends all periodic or polls routines. It will let
// currently running functions to finish (cancel is not sent).
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) Finish() {
g.finish()
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
}

// CancelAndWait cancels the context passed to any of the spawned goroutines and waits for all spawned
// goroutines to exit.
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) CancelAndWait() {
g.finish()
g.cancel()
g.wg.Wait()
g.resetCtx()
}

// WaitToFinish will ends all periodic or polls routines. It will wait for
// currently running functions to finish (cancel is not sent).
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) WaitToFinish() {
g.finish()
g.wg.Wait()
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
}
15 changes: 15 additions & 0 deletions async/panic_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package async

type PanicHandler interface {
HandlePanic()
}

type NoopPanicHandler struct{}

func (n NoopPanicHandler) HandlePanic() {}

func HandlePanic(panicHandler PanicHandler) {
if panicHandler != nil {
panicHandler.HandlePanic()
}
}
6 changes: 3 additions & 3 deletions queue/queued_channel.go → async/queued_channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import (
"context"
Expand All @@ -18,7 +18,7 @@ type QueuedChannel[T any] struct {
closed atomicBool // Should use atomic.Bool once we use Go 1.19!
}

func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T] {
func NewQueuedChannel[T any](chanBufferSize, queueCapacity int, panicHandler PanicHandler) *QueuedChannel[T] {
queue := &QueuedChannel[T]{
ch: make(chan T, chanBufferSize),
stopCh: make(chan struct{}),
Expand All @@ -30,7 +30,7 @@ func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T
queue.closed.store(false)

// Start the queue consumer.
logging.GoAnnotated(context.Background(), func(ctx context.Context) {
logging.GoAnnotated(context.Background(), panicHandler, func(ctx context.Context) {
defer close(queue.ch)

for {
Expand Down
6 changes: 3 additions & 3 deletions queue/queued_channel_test.go → async/queued_channel_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import (
"testing"
Expand All @@ -11,7 +11,7 @@ func TestQueuedChannel(t *testing.T) {
defer goleak.VerifyNone(t)

// Create a new queued channel.
queue := NewQueuedChannel[int](3, 3)
queue := NewQueuedChannel[int](3, 3, NoopPanicHandler{})

// Push some items to the queue.
require.True(t, queue.Enqueue(1, 2, 3))
Expand Down Expand Up @@ -43,7 +43,7 @@ func TestQueuedChannelDoesNotLeakIfThereAreNoReadersOnCloseAndDiscard(t *testing
defer goleak.VerifyNone(t)

// Create a new queued channel.
queue := NewQueuedChannel[int](1, 3)
queue := NewQueuedChannel[int](1, 3, NoopPanicHandler{})

// Push some items to the queue.
require.True(t, queue.Enqueue(1, 2, 3))
Expand Down
27 changes: 27 additions & 0 deletions async/wait_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package async

import "sync"

type WaitGroup struct {
wg sync.WaitGroup
panicHandler PanicHandler
}

func MakeWaitGroup(panicHandler PanicHandler) WaitGroup {
return WaitGroup{panicHandler: panicHandler}
}

func (wg *WaitGroup) Go(f func()) {
wg.wg.Add(1)

go func() {
defer HandlePanic(wg.panicHandler)

defer wg.wg.Done()
f()
}()
}

func (wg *WaitGroup) Wait() {
wg.wg.Wait()
}
Loading