Skip to content

Commit

Permalink
Use panics for serialisation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Aug 20, 2024
1 parent 841b7c3 commit 5d32719
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 247 deletions.
12 changes: 6 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Context interface {
Awakeable(options ...options.AwakeableOption) Awakeable
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// resolved with a particular value.
ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption) error
ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption)
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// rejected with a particular error.
RejectAwakeable(id string, reason error)
Expand Down Expand Up @@ -81,11 +81,11 @@ type Awakeable interface {
// CallClient represents all the different ways you can invoke a particular service/key/method tuple.
type CallClient interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input any) (ResponseFuture, error)
RequestFuture(input any) ResponseFuture
// Request makes a call and blocks on getting the response which is stored in output
Request(input any, output any) error
// Send makes a one-way call which is executed in the background
Send(input any, delay time.Duration) error
Send(input any, delay time.Duration)
}

// ResponseFuture is a handle on a potentially not-yet completed outbound call.
Expand Down Expand Up @@ -166,9 +166,9 @@ type ObjectSharedContext interface {
// KeyValueReader is the set of read-only methods which can be used in all Virtual Object handlers.
type KeyValueReader interface {
// Get gets value associated with key and stores it in value
// If key does not exist, this function returns ErrKeyNotFound
// If key does not exist, this function returns false
// Note: Use GetAs generic helper function to avoid passing in a value pointer
Get(key string, value any, options ...options.GetOption) error
Get(key string, value any, options ...options.GetOption) bool
// Keys returns a list of all associated key
Keys() []string
// Key retrieves the key for this virtual object invocation. This is a no-op and is
Expand All @@ -179,7 +179,7 @@ type KeyValueReader interface {
// KeyValueWriter is the set of mutating methods which can be used in exclusive-mode Virtual Object handlers.
type KeyValueWriter interface {
// Set sets a value against a key, using the provided codec (defaults to JSON)
Set(key string, value any, options ...options.SetOption) error
Set(key string, value any, options ...options.SetOption)
// Clear deletes a key
Clear(key string)
// ClearAll drops all stored state associated with key
Expand Down
33 changes: 7 additions & 26 deletions examples/codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -34,49 +33,31 @@ type counter struct {
}

func (c counter) Add(ctx restate.ObjectContext, req *helloworld.AddRequest) (*helloworld.GetResponse, error) {
count, err := restate.GetAs[int64](ctx, "counter")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}
count, _ := restate.GetAs[int64](ctx, "counter")

watchers, err := restate.GetAs[[]string](ctx, "watchers")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}
watchers, _ := restate.GetAs[[]string](ctx, "watchers")

count += req.Delta
if err := ctx.Set("counter", count); err != nil {
return nil, err
}
ctx.Set("counter", count)

for _, awakeableID := range watchers {
if err := ctx.ResolveAwakeable(awakeableID, count); err != nil {
return nil, err
}
ctx.ResolveAwakeable(awakeableID, count)
}
ctx.Clear("watchers")

return &helloworld.GetResponse{Value: count}, nil
}

func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest) (*helloworld.GetResponse, error) {
count, err := restate.GetAs[int64](ctx, "counter")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}
count, _ := restate.GetAs[int64](ctx, "counter")

return &helloworld.GetResponse{Value: count}, nil
}

func (c counter) AddWatcher(ctx restate.ObjectContext, req *helloworld.AddWatcherRequest) (*helloworld.AddWatcherResponse, error) {
watchers, err := restate.GetAs[[]string](ctx, "watchers")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}
watchers, _ := restate.GetAs[[]string](ctx, "watchers")
watchers = append(watchers, req.AwakeableId)
if err := ctx.Set("watchers", watchers); err != nil {
return nil, err
}
ctx.Set("watchers", watchers)
return &helloworld.AddWatcherResponse{}, nil
}

Expand Down
30 changes: 8 additions & 22 deletions examples/ticketreservation/ticket_service.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package main

import (
"errors"

restate "github.com/restatedev/sdk-go"
)

Expand All @@ -21,13 +19,11 @@ type ticketService struct{}
func (t *ticketService) ServiceName() string { return TicketServiceName }

func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return false, err
}
status, _ := restate.GetAs[TicketStatus](ctx, "status")

if status == TicketAvailable {
return true, ctx.Set("status", TicketReserved)
ctx.Set("status", TicketReserved)
return true, nil
}

return false, nil
Expand All @@ -36,11 +32,8 @@ func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool
func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
ctx.Log().Info("un-reserving ticket", "ticket", ticketId)
status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return void, err
}

status, _ := restate.GetAs[TicketStatus](ctx, "status")
if status != TicketSold {
ctx.Clear("status")
return void, nil
Expand All @@ -53,13 +46,10 @@ func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (v
ticketId := ctx.Key()
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return void, err
}

status, _ := restate.GetAs[TicketStatus](ctx, "status")
if status == TicketReserved {
return void, ctx.Set("status", TicketSold)
ctx.Set("status", TicketSold)
return void, nil
}

return void, nil
Expand All @@ -69,10 +59,6 @@ func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void)
ticketId := ctx.Key()
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return status, err
}

status, _ := restate.GetAs[TicketStatus](ctx, "status")
return status, nil
}
43 changes: 10 additions & 33 deletions examples/ticketreservation/user_session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"errors"
"slices"
"time"

Expand Down Expand Up @@ -29,30 +28,17 @@ func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (boo
}

// add ticket to list of tickets
tickets, err := restate.GetAs[[]string](ctx, "tickets")

if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return false, err
}

tickets, _ := restate.GetAs[[]string](ctx, "tickets")
tickets = append(tickets, ticketId)

if err := ctx.Set("tickets", tickets); err != nil {
return false, err
}

if err := ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute); err != nil {
return false, err
}
ctx.Set("tickets", tickets)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute)

return true, nil
}

func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void, err error) {
tickets, err := restate.GetAs[[]string](ctx, "tickets")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return void, err
}
tickets, _ := restate.GetAs[[]string](ctx, "tickets")

deleted := false
tickets = slices.DeleteFunc(tickets, func(ticket string) bool {
Expand All @@ -66,19 +52,15 @@ func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (
return void, nil
}

if err := ctx.Set("tickets", tickets); err != nil {
return void, err
}
ctx.Set("tickets", tickets)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)

return void, ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)
return void, nil
}

func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
userId := ctx.Key()
tickets, err := restate.GetAs[[]string](ctx, "tickets")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return false, err
}
tickets, _ := restate.GetAs[[]string](ctx, "tickets")

ctx.Log().Info("tickets in basket", "tickets", tickets)

Expand All @@ -88,11 +70,8 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,

timeout := ctx.After(time.Minute)

request, err := restate.CallAs[PaymentResponse](ctx.Object(CheckoutServiceName, "", "Payment")).
request := restate.CallAs[PaymentResponse](ctx.Object(CheckoutServiceName, "", "Payment")).
RequestFuture(PaymentRequest{UserID: userId, Tickets: tickets})
if err != nil {
return false, err
}

// race between the request and the timeout
switch ctx.Select(timeout, request).Select() {
Expand All @@ -113,9 +92,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,

for _, ticket := range tickets {
call := ctx.Object(TicketServiceName, ticket, "MarkAsSold")
if err := call.Send(nil, 0); err != nil {
return false, err
}
call.Send(nil, 0)
}

ctx.Clear("tickets")
Expand Down
28 changes: 10 additions & 18 deletions facilitators.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

// GetAs get the value for a key, returning a typed response instead of accepting a pointer.
// If there is no associated value with key, [ErrKeyNotFound] is returned
func GetAs[T any](ctx ObjectSharedContext, key string, options ...options.GetOption) (output T, err error) {
err = ctx.Get(key, &output, options...)
// If there is no associated value with key, the zero value of T and false is returned
func GetAs[T any](ctx ObjectSharedContext, key string, options ...options.GetOption) (output T, ok bool) {
ok = ctx.Get(key, &output, options...)
return
}

Expand Down Expand Up @@ -51,11 +51,11 @@ func AwakeableAs[T any](ctx Context, options ...options.AwakeableOption) TypedAw
// TypedCallClient is an extension of [CallClient] which deals in typed values
type TypedCallClient[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I) (TypedResponseFuture[O], error)
RequestFuture(input I) TypedResponseFuture[O]
// Request makes a call and blocks on getting the response
Request(input I) (O, error)
// Send makes a one-way call which is executed in the background
Send(input I, delay time.Duration) error
Send(input I, delay time.Duration)
}

type typedCallClient[I any, O any] struct {
Expand All @@ -70,24 +70,16 @@ func NewTypedCallClient[I any, O any](client CallClient) TypedCallClient[I, O] {
}

func (t typedCallClient[I, O]) Request(input I) (output O, err error) {
fut, err := t.inner.RequestFuture(input)
if err != nil {
return output, err
}
err = fut.Response(&output)
err = t.inner.RequestFuture(input).Response(&output)
return
}

func (t typedCallClient[I, O]) RequestFuture(input I) (TypedResponseFuture[O], error) {
fut, err := t.inner.RequestFuture(input)
if err != nil {
return nil, err
}
return typedResponseFuture[O]{fut}, nil
func (t typedCallClient[I, O]) RequestFuture(input I) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input)}
}

func (t typedCallClient[I, O]) Send(input I, delay time.Duration) error {
return t.inner.Send(input, delay)
func (t typedCallClient[I, O]) Send(input I, delay time.Duration) {
t.inner.Send(input, delay)
}

// TypedResponseFuture is an extension of [ResponseFuture] which returns typed responses instead of accepting a pointer
Expand Down
6 changes: 4 additions & 2 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (h *serviceHandler[I, O]) Call(ctx Context, bytes []byte) ([]byte, error) {

bytes, err = encoding.Marshal(h.options.Codec, output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
// we don't use a terminal error here as this is hot-fixable by changing the return type
return nil, fmt.Errorf("failed to serialize output: %w", err)
}

return bytes, nil
Expand Down Expand Up @@ -170,7 +171,8 @@ func (h *objectHandler[I, O]) Call(ctx ObjectContext, bytes []byte) ([]byte, err

bytes, err = encoding.Marshal(h.options.Codec, output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
// we don't use a terminal error here as this is hot-fixable by changing the return type
return nil, fmt.Errorf("failed to serialize output: %w", err)
}

return bytes, nil
Expand Down
Loading

0 comments on commit 5d32719

Please sign in to comment.