Skip to content

Commit

Permalink
Use panics for serialisation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Aug 20, 2024
1 parent 841b7c3 commit d6322af
Show file tree
Hide file tree
Showing 20 changed files with 141 additions and 174 deletions.
11 changes: 6 additions & 5 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Context interface {
Awakeable(options ...options.AwakeableOption) Awakeable
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// resolved with a particular value.
ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption) error
ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption)
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// rejected with a particular error.
RejectAwakeable(id string, reason error)
Expand Down Expand Up @@ -81,11 +81,11 @@ type Awakeable interface {
// CallClient represents all the different ways you can invoke a particular service/key/method tuple.
type CallClient interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input any) (ResponseFuture, error)
RequestFuture(input any) ResponseFuture
// Request makes a call and blocks on getting the response which is stored in output
Request(input any, output any) error
// Send makes a one-way call which is executed in the background
Send(input any, delay time.Duration) error
Send(input any, delay time.Duration)
}

// ResponseFuture is a handle on a potentially not-yet completed outbound call.
Expand Down Expand Up @@ -166,7 +166,8 @@ type ObjectSharedContext interface {
// KeyValueReader is the set of read-only methods which can be used in all Virtual Object handlers.
type KeyValueReader interface {
// Get gets value associated with key and stores it in value
// If key does not exist, this function returns ErrKeyNotFound
// If key does not exist, this function returns [ErrKeyNotFound]
// If the invocation was cancelled while obtaining the state, a cancellation error is returned
// Note: Use GetAs generic helper function to avoid passing in a value pointer
Get(key string, value any, options ...options.GetOption) error
// Keys returns a list of all associated key
Expand All @@ -179,7 +180,7 @@ type KeyValueReader interface {
// KeyValueWriter is the set of mutating methods which can be used in exclusive-mode Virtual Object handlers.
type KeyValueWriter interface {
// Set sets a value against a key, using the provided codec (defaults to JSON)
Set(key string, value any, options ...options.SetOption) error
Set(key string, value any, options ...options.SetOption)
// Clear deletes a key
Clear(key string)
// ClearAll drops all stored state associated with key
Expand Down
12 changes: 3 additions & 9 deletions examples/codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,10 @@ func (c counter) Add(ctx restate.ObjectContext, req *helloworld.AddRequest) (*he
}

count += req.Delta
if err := ctx.Set("counter", count); err != nil {
return nil, err
}
ctx.Set("counter", count)

for _, awakeableID := range watchers {
if err := ctx.ResolveAwakeable(awakeableID, count); err != nil {
return nil, err
}
ctx.ResolveAwakeable(awakeableID, count)
}
ctx.Clear("watchers")

Expand All @@ -74,9 +70,7 @@ func (c counter) AddWatcher(ctx restate.ObjectContext, req *helloworld.AddWatche
return nil, err
}
watchers = append(watchers, req.AwakeableId)
if err := ctx.Set("watchers", watchers); err != nil {
return nil, err
}
ctx.Set("watchers", watchers)
return &helloworld.AddWatcherResponse{}, nil
}

Expand Down
13 changes: 5 additions & 8 deletions examples/ticketreservation/ticket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool
}

if status == TicketAvailable {
return true, ctx.Set("status", TicketReserved)
ctx.Set("status", TicketReserved)
return true, nil
}

return false, nil
Expand Down Expand Up @@ -59,7 +60,8 @@ func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (v
}

if status == TicketReserved {
return void, ctx.Set("status", TicketSold)
ctx.Set("status", TicketSold)
return void, nil
}

return void, nil
Expand All @@ -69,10 +71,5 @@ func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void)
ticketId := ctx.Key()
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return status, err
}

return status, nil
return restate.GetAs[TicketStatus](ctx, "status")
}
26 changes: 7 additions & 19 deletions examples/ticketreservation/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,14 @@ func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (boo

// add ticket to list of tickets
tickets, err := restate.GetAs[[]string](ctx, "tickets")

if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return false, err
}

tickets = append(tickets, ticketId)

if err := ctx.Set("tickets", tickets); err != nil {
return false, err
}

if err := ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute); err != nil {
return false, err
}
ctx.Set("tickets", tickets)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute)

return true, nil
}
Expand All @@ -66,11 +60,10 @@ func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (
return void, nil
}

if err := ctx.Set("tickets", tickets); err != nil {
return void, err
}
ctx.Set("tickets", tickets)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)

return void, ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)
return void, nil
}

func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
Expand All @@ -88,11 +81,8 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,

timeout := ctx.After(time.Minute)

request, err := restate.CallAs[PaymentResponse](ctx.Object(CheckoutServiceName, "", "Payment")).
request := restate.CallAs[PaymentResponse](ctx.Object(CheckoutServiceName, "", "Payment")).
RequestFuture(PaymentRequest{UserID: userId, Tickets: tickets})
if err != nil {
return false, err
}

// race between the request and the timeout
switch ctx.Select(timeout, request).Select() {
Expand All @@ -113,9 +103,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,

for _, ticket := range tickets {
call := ctx.Object(TicketServiceName, ticket, "MarkAsSold")
if err := call.Send(nil, 0); err != nil {
return false, err
}
call.Send(nil, 0)
}

ctx.Clear("tickets")
Expand Down
25 changes: 10 additions & 15 deletions facilitators.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (

// GetAs get the value for a key, returning a typed response instead of accepting a pointer.
// If there is no associated value with key, [ErrKeyNotFound] is returned
// If the invocation was cancelled while obtaining the state, a cancellation error is returned, however this
// can currently only occur if RESTATE_WORKER__INVOKER__DISABLE_EAGER_STATE is set to true (default false).
// If this flag is not true, err will always be ErrKeyNotFound or nil.
func GetAs[T any](ctx ObjectSharedContext, key string, options ...options.GetOption) (output T, err error) {
err = ctx.Get(key, &output, options...)
return
Expand Down Expand Up @@ -51,11 +54,11 @@ func AwakeableAs[T any](ctx Context, options ...options.AwakeableOption) TypedAw
// TypedCallClient is an extension of [CallClient] which deals in typed values
type TypedCallClient[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I) (TypedResponseFuture[O], error)
RequestFuture(input I) TypedResponseFuture[O]
// Request makes a call and blocks on getting the response
Request(input I) (O, error)
// Send makes a one-way call which is executed in the background
Send(input I, delay time.Duration) error
Send(input I, delay time.Duration)
}

type typedCallClient[I any, O any] struct {
Expand All @@ -70,24 +73,16 @@ func NewTypedCallClient[I any, O any](client CallClient) TypedCallClient[I, O] {
}

func (t typedCallClient[I, O]) Request(input I) (output O, err error) {
fut, err := t.inner.RequestFuture(input)
if err != nil {
return output, err
}
err = fut.Response(&output)
err = t.inner.RequestFuture(input).Response(&output)
return
}

func (t typedCallClient[I, O]) RequestFuture(input I) (TypedResponseFuture[O], error) {
fut, err := t.inner.RequestFuture(input)
if err != nil {
return nil, err
}
return typedResponseFuture[O]{fut}, nil
func (t typedCallClient[I, O]) RequestFuture(input I) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input)}
}

func (t typedCallClient[I, O]) Send(input I, delay time.Duration) error {
return t.inner.Send(input, delay)
func (t typedCallClient[I, O]) Send(input I, delay time.Duration) {
t.inner.Send(input, delay)
}

// TypedResponseFuture is an extension of [ResponseFuture] which returns typed responses instead of accepting a pointer
Expand Down
6 changes: 4 additions & 2 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (h *serviceHandler[I, O]) Call(ctx Context, bytes []byte) ([]byte, error) {

bytes, err = encoding.Marshal(h.options.Codec, output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
// we don't use a terminal error here as this is hot-fixable by changing the return type
return nil, fmt.Errorf("failed to serialize output: %w", err)
}

return bytes, nil
Expand Down Expand Up @@ -170,7 +171,8 @@ func (h *objectHandler[I, O]) Call(ctx ObjectContext, bytes []byte) ([]byte, err

bytes, err = encoding.Marshal(h.options.Codec, output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
// we don't use a terminal error here as this is hot-fixable by changing the return type
return nil, fmt.Errorf("failed to serialize output: %w", err)
}

return bytes, nil
Expand Down
23 changes: 10 additions & 13 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/encoding"
protocol "github.com/restatedev/sdk-go/generated/dev/restate/service"
"github.com/restatedev/sdk-go/internal/errors"
"github.com/restatedev/sdk-go/internal/futures"
"github.com/restatedev/sdk-go/internal/options"
"github.com/restatedev/sdk-go/internal/wire"
Expand All @@ -25,22 +24,24 @@ type serviceCall struct {
}

// RequestFuture makes a call and returns a handle on the response
func (c *serviceCall) RequestFuture(input any) (restate.ResponseFuture, error) {
func (c *serviceCall) RequestFuture(input any) restate.ResponseFuture {
bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
return nil, errors.NewTerminalError(fmt.Errorf("failed to marshal RequestFuture input: %w", err))
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal RequestFuture input: %w", err)))
}

entry, entryIndex := c.machine.doCall(c.service, c.key, c.method, c.options.Headers, bytes)

return decodingResponseFuture{
futures.NewResponseFuture(c.machine.suspensionCtx, entry, entryIndex, func(err error) any { return c.machine.newProtocolViolation(entry, err) }),
c.machine,
c.options,
}, nil
}
}

type decodingResponseFuture struct {
*futures.ResponseFuture
machine *Machine
options options.CallOptions
}

Expand All @@ -51,29 +52,25 @@ func (d decodingResponseFuture) Response(output any) (err error) {
}

if err := encoding.Unmarshal(d.options.Codec, bytes, output); err != nil {
return errors.NewTerminalError(fmt.Errorf("failed to unmarshal Call response into output: %w", err))
panic(d.machine.newCodecFailure(fmt.Errorf("failed to unmarshal Call response into output: %w", err)))
}

return nil
}

// Request makes a call and blocks on the response
func (c *serviceCall) Request(input any, output any) error {
fut, err := c.RequestFuture(input)
if err != nil {
return err
}
return fut.Response(output)
return c.RequestFuture(input).Response(output)
}

// Send runs a call in the background after delay duration
func (c *serviceCall) Send(input any, delay time.Duration) error {
func (c *serviceCall) Send(input any, delay time.Duration) {
bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
return errors.NewTerminalError(fmt.Errorf("failed to marshal Send input: %w", err))
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal Send input: %w", err)))
}
c.machine.sendCall(c.service, c.key, c.method, c.options.Headers, bytes, delay)
return nil
return
}

func (m *Machine) doCall(service, key, method string, headersMap map[string]string, params []byte) (*wire.CallEntryMessage, uint32) {
Expand Down
Loading

0 comments on commit d6322af

Please sign in to comment.