Skip to content

Commit

Permalink
Test various cancellation behaviour
Browse files Browse the repository at this point in the history
We should pin down the behaviour around clients going away, suspensions,
and cancellations. Also add new behaviour to panic proactively if the
client has gone away when starting to write a new journal entry.
  • Loading branch information
jackkleeman committed Aug 8, 2024
1 parent adafb6e commit 44a16f6
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 10 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
6 changes: 6 additions & 0 deletions internal/state/completion.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state

import (
"context"
"log/slog"

"github.com/restatedev/sdk-go/internal/log"
Expand Down Expand Up @@ -30,6 +31,11 @@ func (m *Machine) ackable(entryIndex uint32) wire.AckableMessage {
}

func (m *Machine) Write(message wire.Message) {
if m.ctx.Err() != nil {
// the main context being cancelled means the client is no longer interested in our response
// and so creating new entries is pointless and we should shut down the state machine.
panic(m.newClientGoneAway(context.Cause(m.ctx)))
}
if message, ok := message.(wire.CompleteableMessage); ok && !message.Completed() {
m.pendingMutex.Lock()
m.pendingCompletions[m.entryIndex] = message
Expand Down
27 changes: 25 additions & 2 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type Machine struct {
suspend func(error)

handler restate.Handler
protocol *wire.Protocol
protocol wire.Protocol

// state
key string
Expand Down Expand Up @@ -432,10 +432,17 @@ The journal entry at position %d was:
m.log.LogAttrs(m.ctx, slog.LevelError, "Error sending failure message", log.Error(typ.err))
}

return
case *clientGoneAway:
m.log.LogAttrs(m.ctx, slog.LevelWarn, "Cancelling invocation as the incoming request context was cancelled", log.Error(typ.err))
return
case *wire.SuspensionPanic:
if m.ctx.Err() != nil {
m.log.WarnContext(m.ctx, "Cancelling invocation as the incoming request was cancelled")
// Special case: awaiting a pre-existing sleep or awakeable doesn't create a new entry,
// so it doesn't hit the clientGoneAway code path; instead it just looks like a suspension.
// We therefore differentiate between the causes here: if the main context is cancelled,
// this isn't a suspension.
m.log.LogAttrs(m.ctx, slog.LevelWarn, "Cancelling invocation as the incoming request context was cancelled", log.Error(typ.Err))
return
}
if stderrors.Is(typ.Err, io.EOF) {
Expand Down Expand Up @@ -637,6 +644,12 @@ func replayOrNew[M wire.Message, O any](
}
defer m.entryMutex.Unlock()

if m.ctx.Err() != nil {
// the main context being cancelled means the client is no longer interested in our response
// and so creating new entries is pointless and we should shut down the state machine.
panic(m.newClientGoneAway(context.Cause(m.ctx)))
}

if m.failure != nil {
// maybe the user will try to catch our panics, but we will just keep producing them
panic(m.failure)
Expand Down Expand Up @@ -676,3 +689,13 @@ func (m *Machine) newConcurrentContextUse(entry wire.Type) *concurrentContextUse
m.failure = c
return c
}

// clientGoneAway is the panic value raised when the machine's main context has
// been cancelled, i.e. the client is no longer interested in the response.
type clientGoneAway struct {
// err is the error supplied when the value was created; the visible call
// sites pass context.Cause of the machine's context.
err error
}

// newClientGoneAway builds a clientGoneAway carrying err, stores it on the
// machine as its current failure, and returns that same value so the caller
// can panic with it.
func (m *Machine) newClientGoneAway(err error) *clientGoneAway {
	gone := &clientGoneAway{err: err}
	// Remember the failure on the machine so it stays set after this call.
	m.failure = gone
	return gone
}
Loading

0 comments on commit 44a16f6

Please sign in to comment.