From 4ac85397d7718f5ec6ab614312cb5f2370a1d398 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Wed, 21 Aug 2024 13:07:07 +0100 Subject: [PATCH] Always interact with ctx through facilitator functions --- context.go | 87 ++------ examples/codegen/main.go | 24 +-- .../codegen/proto/helloworld_restate.pb.go | 30 +-- examples/ticketreservation/checkout.go | 4 +- examples/ticketreservation/ticket_service.go | 20 +- examples/ticketreservation/user_session.go | 30 +-- facilitators.go | 197 +++++++++++++----- interfaces/interfaces.go | 67 ++++++ internal/state/call.go | 3 +- internal/state/select.go | 5 +- internal/state/state.go | 11 +- internal/state/state_test.go | 37 ++-- test-services/awakeableholder.go | 8 +- test-services/canceltest.go | 18 +- test-services/counter.go | 14 +- test-services/failing.go | 6 +- test-services/kill.go | 10 +- test-services/listobject.go | 10 +- test-services/mapobject.go | 10 +- test-services/nondeterministic.go | 28 +-- test-services/proxy.go | 11 +- test-services/testutils.go | 17 +- test-services/upgradetest.go | 6 +- 23 files changed, 377 insertions(+), 276 deletions(-) create mode 100644 interfaces/interfaces.go diff --git a/context.go b/context.go index 1dae761..fea02ef 100644 --- a/context.go +++ b/context.go @@ -5,6 +5,7 @@ import ( "log/slog" "time" + "github.com/restatedev/sdk-go/interfaces" "github.com/restatedev/sdk-go/internal/futures" "github.com/restatedev/sdk-go/internal/options" "github.com/restatedev/sdk-go/internal/rand" @@ -24,28 +25,28 @@ type Context interface { // After is an alternative to Context.Sleep which allows you to complete other tasks concurrently // with the sleep. This is particularly useful when combined with Context.Select to race between // the sleep and other Selectable operations. - After(d time.Duration) After + After(d time.Duration) interfaces.After - // Service gets a Service accessor by service and method name - // Note: use the CallAs helper function to deserialise return values - Service(service, method string, opts ...options.ClientOption) CallClient + // Service gets a Service request client by service and method name + // Note: use module-level [Service] to deserialise return values + Service(service, method string, opts ...options.ClientOption) interfaces.Client - // Object gets a Object accessor by name, key and method name - // Note: use the CallAs helper function to receive serialised values - Object(object, key, method string, opts ...options.ClientOption) CallClient + // Object gets an Object request client by service name, key and method name + // Note: use module-level [Object] to receive serialised values + Object(object, key, method string, opts ...options.ClientOption) interfaces.Client // Run runs the function (fn), storing final results (including terminal errors) // durably in the journal, or otherwise for transient errors stopping execution // so Restate can retry the invocation. Replays will produce the same value, so // all non-deterministic operations (eg, generating a unique ID) *must* happen // inside Run blocks. - // Note: use the RunAs helper function to get typed output values instead of providing an output pointer + // Note: use module-level [Run] to get typed output values instead of providing an output pointer Run(fn func(ctx RunContext) (any, error), output any, opts ...options.RunOption) error // Awakeable returns a Restate awakeable; a 'promise' to a future // value or error, that can be resolved or rejected by other services. - // Note: use the AwakeableAs helper function to avoid having to pass a output pointer to Awakeable.Result() - Awakeable(options ...options.AwakeableOption) Awakeable + // Note: use module-level [Awakeable] to avoid having to pass a output pointer to Awakeable.Result() + Awakeable(options ...options.AwakeableOption) interfaces.Awakeable // ResolveAwakeable allows an awakeable (not necessarily from this service) to be // resolved with a particular value. ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption) @@ -57,55 +58,8 @@ type Context interface { // which allows you to safely run them in parallel. The Selector will store the order // that things complete in durably inside Restate, so that on replay the same order // can be used. This avoids non-determinism. It is *not* safe to use goroutines or channels - // outside of Context.Run functions, as they do not behave deterministically. - Select(futs ...Selectable) Selector -} - -// Selectable is implemented by types that may be passed to Context.Select -type Selectable = futures.Selectable - -// Awakeable is the Go representation of a Restate awakeable; a 'promise' to a future -// value or error, that can be resolved or rejected by other services. -type Awakeable interface { - // Id returns the awakeable ID, which can be stored or sent to a another service - Id() string - // Result blocks on receiving the result of the awakeable, storing the value it was - // resolved with in output or otherwise returning the error it was rejected with. - // It is *not* safe to call this in a goroutine - use Context.Select if you - // want to wait on multiple results at once. - // Note: use the AwakeableAs helper function to avoid having to pass a output pointer - Result(output any) error - Selectable -} - -// CallClient represents all the different ways you can invoke a particular service/key/method tuple. -type CallClient interface { - // RequestFuture makes a call and returns a handle on a future response - RequestFuture(input any, opts ...options.RequestOption) ResponseFuture - // Request makes a call and blocks on getting the response which is stored in output - Request(input any, output any, opts ...options.RequestOption) error - // Send makes a one-way call which is executed in the background - Send(input any, opts ...options.SendOption) -} - -// ResponseFuture is a handle on a potentially not-yet completed outbound call. -type ResponseFuture interface { - // Response blocks on the response to the call and stores it in output, or returns the associated error - // It is *not* safe to call this in a goroutine - use Context.Select if you - // want to wait on multiple results at once. - Response(output any) error - Selectable -} - -// Selector is an iterator over a list of blocking Restate operations that are running -// in the background. -type Selector interface { - // Remaining returns whether there are still operations that haven't been returned by Select(). - // There will always be exactly the same number of results as there were operations - // given to Context.Select - Remaining() bool - // Select blocks on the next completed operation or returns nil if there are none left - Select() Selectable + // outside of [Run] functions, as they do not behave deterministically. + Select(futs ...futures.Selectable) interfaces.Selector } // RunContext methods are the only methods of [Context] that are safe to call from inside a .Run() @@ -137,17 +91,6 @@ type Request struct { Body []byte } -// After is a handle on a Sleep operation which allows you to do other work concurrently -// with the sleep. -type After interface { - // Done blocks waiting on the remaining duration of the sleep. - // It is *not* safe to call this in a goroutine - use Context.Select if you want to wait on multiple - // results at once. Can return a terminal error in the case where the invocation was cancelled mid-sleep, - // hence Done() should always be called, even after using Context.Select. - Done() error - Selectable -} - // ObjectContext is an extension of [Context] which can be used in exclusive-mode Virtual Object handlers, // giving mutable access to state. type ObjectContext interface { @@ -173,7 +116,7 @@ type KeyValueReader interface { Get(key string, value any, options ...options.GetOption) error // Keys returns a list of all associated key // If the invocation was cancelled while obtaining the state (only possible if eager state is disabled), - // a cancellation error is returned. If eager state is enabled (the default), err will always be nil. + // a cancellation error is returned. Keys() ([]string, error) // Key retrieves the key for this virtual object invocation. This is a no-op and is // always safe to call. @@ -186,6 +129,6 @@ type KeyValueWriter interface { Set(key string, value any, options ...options.SetOption) // Clear deletes a key Clear(key string) - // ClearAll drops all stored state associated with key + // ClearAll drops all stored state associated with this Object key ClearAll() } diff --git a/examples/codegen/main.go b/examples/codegen/main.go index bc22728..757a4eb 100644 --- a/examples/codegen/main.go +++ b/examples/codegen/main.go @@ -33,29 +33,29 @@ type counter struct { } func (c counter) Add(ctx restate.ObjectContext, req *helloworld.AddRequest) (*helloworld.GetResponse, error) { - count, err := restate.GetAs[int64](ctx, "counter") + count, err := restate.Get[int64](ctx, "counter") if err != nil { return nil, err } - watchers, err := restate.GetAs[[]string](ctx, "watchers") + watchers, err := restate.Get[[]string](ctx, "watchers") if err != nil { return nil, err } count += req.Delta - ctx.Set("counter", count) + restate.Set(ctx, "counter", count) for _, awakeableID := range watchers { - ctx.ResolveAwakeable(awakeableID, count) + restate.ResolveAwakeable(ctx, awakeableID, count) } - ctx.Clear("watchers") + restate.Clear(ctx, "watchers") return &helloworld.GetResponse{Value: count}, nil } func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest) (*helloworld.GetResponse, error) { - count, err := restate.GetAs[int64](ctx, "counter") + count, err := restate.Get[int64](ctx, "counter") if err != nil { return nil, err } @@ -64,22 +64,22 @@ func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest) } func (c counter) AddWatcher(ctx restate.ObjectContext, req *helloworld.AddWatcherRequest) (*helloworld.AddWatcherResponse, error) { - watchers, err := restate.GetAs[[]string](ctx, "watchers") + watchers, err := restate.Get[[]string](ctx, "watchers") if err != nil { return nil, err } watchers = append(watchers, req.AwakeableId) - ctx.Set("watchers", watchers) + restate.Set(ctx, "watchers", watchers) return &helloworld.AddWatcherResponse{}, nil } func (c counter) Watch(ctx restate.ObjectSharedContext, req *helloworld.WatchRequest) (*helloworld.GetResponse, error) { - awakeable := restate.AwakeableAs[int64](ctx) + awakeable := restate.Awakeable[int64](ctx) // since this is a shared handler, we need to use a separate exclusive handler to store the awakeable ID // if there is an in-flight Add call, this will take effect after it completes // we could add a version counter check here to detect changes that happen mid-request and return immediately - if _, err := helloworld.NewCounterClient(ctx, ctx.Key()). + if _, err := helloworld.NewCounterClient(ctx, restate.Key(ctx)). AddWatcher(). Request(&helloworld.AddWatcherRequest{AwakeableId: awakeable.Id()}); err != nil { return nil, err @@ -96,10 +96,10 @@ func (c counter) Watch(ctx restate.ObjectSharedContext, req *helloworld.WatchReq return &helloworld.GetResponse{Value: next}, nil } - after := ctx.After(timeout) + after := restate.After(ctx, timeout) // this is the safe way to race two results - selector := ctx.Select(after, awakeable) + selector := restate.Select(ctx, after, awakeable) if selector.Select() == after { // the timeout won diff --git a/examples/codegen/proto/helloworld_restate.pb.go b/examples/codegen/proto/helloworld_restate.pb.go index 6851c3a..0e33b4a 100644 --- a/examples/codegen/proto/helloworld_restate.pb.go +++ b/examples/codegen/proto/helloworld_restate.pb.go @@ -13,7 +13,7 @@ import ( // GreeterClient is the client API for Greeter service. type GreeterClient interface { - SayHello(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*HelloRequest, *HelloResponse] + SayHello(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*HelloRequest, *HelloResponse] } type greeterClient struct { @@ -28,12 +28,12 @@ func NewGreeterClient(ctx sdk_go.Context, opts ...sdk_go.ClientOption) GreeterCl cOpts, } } -func (c *greeterClient) SayHello(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*HelloRequest, *HelloResponse] { +func (c *greeterClient) SayHello(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*HelloRequest, *HelloResponse] { cOpts := c.options if len(opts) > 0 { cOpts = append(append([]sdk_go.ClientOption{}, cOpts...), opts...) } - return sdk_go.NewTypedCallClient[*HelloRequest, *HelloResponse](c.ctx.Service("Greeter", "SayHello", cOpts...)) + return sdk_go.NewTypedClient[*HelloRequest, *HelloResponse](c.ctx.Service("Greeter", "SayHello", cOpts...)) } // GreeterServer is the server API for Greeter service. @@ -79,13 +79,13 @@ func NewGreeterServer(srv GreeterServer, opts ...sdk_go.ServiceDefinitionOption) // CounterClient is the client API for Counter service. type CounterClient interface { // Mutate the value - Add(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*AddRequest, *GetResponse] + Add(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*AddRequest, *GetResponse] // Get the current value - Get(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*GetRequest, *GetResponse] + Get(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*GetRequest, *GetResponse] // Internal method to store an awakeable ID for the Watch method - AddWatcher(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*AddWatcherRequest, *AddWatcherResponse] + AddWatcher(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*AddWatcherRequest, *AddWatcherResponse] // Wait for the counter to change and then return the new value - Watch(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*WatchRequest, *GetResponse] + Watch(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*WatchRequest, *GetResponse] } type counterClient struct { @@ -102,36 +102,36 @@ func NewCounterClient(ctx sdk_go.Context, key string, opts ...sdk_go.ClientOptio cOpts, } } -func (c *counterClient) Add(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*AddRequest, *GetResponse] { +func (c *counterClient) Add(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*AddRequest, *GetResponse] { cOpts := c.options if len(opts) > 0 { cOpts = append(append([]sdk_go.ClientOption{}, cOpts...), opts...) } - return sdk_go.NewTypedCallClient[*AddRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Add", cOpts...)) + return sdk_go.NewTypedClient[*AddRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Add", cOpts...)) } -func (c *counterClient) Get(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*GetRequest, *GetResponse] { +func (c *counterClient) Get(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*GetRequest, *GetResponse] { cOpts := c.options if len(opts) > 0 { cOpts = append(append([]sdk_go.ClientOption{}, cOpts...), opts...) } - return sdk_go.NewTypedCallClient[*GetRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Get", cOpts...)) + return sdk_go.NewTypedClient[*GetRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Get", cOpts...)) } -func (c *counterClient) AddWatcher(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*AddWatcherRequest, *AddWatcherResponse] { +func (c *counterClient) AddWatcher(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*AddWatcherRequest, *AddWatcherResponse] { cOpts := c.options if len(opts) > 0 { cOpts = append(append([]sdk_go.ClientOption{}, cOpts...), opts...) } - return sdk_go.NewTypedCallClient[*AddWatcherRequest, *AddWatcherResponse](c.ctx.Object("Counter", c.key, "AddWatcher", cOpts...)) + return sdk_go.NewTypedClient[*AddWatcherRequest, *AddWatcherResponse](c.ctx.Object("Counter", c.key, "AddWatcher", cOpts...)) } -func (c *counterClient) Watch(opts ...sdk_go.ClientOption) sdk_go.TypedCallClient[*WatchRequest, *GetResponse] { +func (c *counterClient) Watch(opts ...sdk_go.ClientOption) sdk_go.TypedClient[*WatchRequest, *GetResponse] { cOpts := c.options if len(opts) > 0 { cOpts = append(append([]sdk_go.ClientOption{}, cOpts...), opts...) } - return sdk_go.NewTypedCallClient[*WatchRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Watch", cOpts...)) + return sdk_go.NewTypedClient[*WatchRequest, *GetResponse](c.ctx.Object("Counter", c.key, "Watch", cOpts...)) } // CounterServer is the server API for Counter service. diff --git a/examples/ticketreservation/checkout.go b/examples/ticketreservation/checkout.go index f222540..a16ea40 100644 --- a/examples/ticketreservation/checkout.go +++ b/examples/ticketreservation/checkout.go @@ -26,7 +26,7 @@ func (c *checkout) ServiceName() string { const CheckoutServiceName = "Checkout" func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) { - uuid := ctx.Rand().UUID().String() + uuid := restate.Rand(ctx).UUID().String() response.ID = uuid @@ -35,7 +35,7 @@ func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (respons price := len(request.Tickets) * 30 response.Price = price - _, err = restate.RunAs(ctx, func(ctx restate.RunContext) (bool, error) { + _, err = restate.Run(ctx, func(ctx restate.RunContext) (bool, error) { log := ctx.Log().With("uuid", uuid, "price", price) if rand.Float64() < 0.5 { log.Info("payment succeeded") diff --git a/examples/ticketreservation/ticket_service.go b/examples/ticketreservation/ticket_service.go index 28ad90f..cbbe15c 100644 --- a/examples/ticketreservation/ticket_service.go +++ b/examples/ticketreservation/ticket_service.go @@ -19,13 +19,13 @@ type ticketService struct{} func (t *ticketService) ServiceName() string { return TicketServiceName } func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) { - status, err := restate.GetAs[TicketStatus](ctx, "status") + status, err := restate.Get[TicketStatus](ctx, "status") if err != nil { return false, err } if status == TicketAvailable { - ctx.Set("status", TicketReserved) + restate.Set(ctx, "status", TicketReserved) return true, nil } @@ -33,15 +33,15 @@ func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool } func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) { - ticketId := ctx.Key() + ticketId := restate.Key(ctx) ctx.Log().Info("un-reserving ticket", "ticket", ticketId) - status, err := restate.GetAs[TicketStatus](ctx, "status") + status, err := restate.Get[TicketStatus](ctx, "status") if err != nil { return void, err } if status != TicketSold { - ctx.Clear("status") + restate.Clear(ctx, "status") return void, nil } @@ -49,16 +49,16 @@ func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (vo } func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) { - ticketId := ctx.Key() + ticketId := restate.Key(ctx) ctx.Log().Info("mark ticket as sold", "ticket", ticketId) - status, err := restate.GetAs[TicketStatus](ctx, "status") + status, err := restate.Get[TicketStatus](ctx, "status") if err != nil { return void, err } if status == TicketReserved { - ctx.Set("status", TicketSold) + restate.Set(ctx, "status", TicketSold) return void, nil } @@ -66,8 +66,8 @@ func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (v } func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void) (TicketStatus, error) { - ticketId := ctx.Key() + ticketId := restate.Key(ctx) ctx.Log().Info("mark ticket as sold", "ticket", ticketId) - return restate.GetAs[TicketStatus](ctx, "status") + return restate.Get[TicketStatus](ctx, "status") } diff --git a/examples/ticketreservation/user_session.go b/examples/ticketreservation/user_session.go index b710684..64c03f2 100644 --- a/examples/ticketreservation/user_session.go +++ b/examples/ticketreservation/user_session.go @@ -16,9 +16,9 @@ func (u *userSession) ServiceName() string { } func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (bool, error) { - userId := ctx.Key() + userId := restate.Key(ctx) - success, err := restate.CallAs[bool](ctx.Object(TicketServiceName, ticketId, "Reserve")).Request(userId) + success, err := restate.Object[bool](ctx, TicketServiceName, ticketId, "Reserve").Request(userId) if err != nil { return false, err } @@ -28,21 +28,21 @@ func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (boo } // add ticket to list of tickets - tickets, err := restate.GetAs[[]string](ctx, "tickets") + tickets, err := restate.Get[[]string](ctx, "tickets") if err != nil { return false, err } tickets = append(tickets, ticketId) - ctx.Set("tickets", tickets) - ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, restate.WithDelay(15*time.Minute)) + restate.Set(ctx, "tickets", tickets) + restate.ObjectSend(ctx, UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, restate.WithDelay(15*time.Minute)) return true, nil } func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void, err error) { - tickets, err := restate.GetAs[[]string](ctx, "tickets") + tickets, err := restate.Get[[]string](ctx, "tickets") if err != nil { return void, err } @@ -59,15 +59,15 @@ func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) ( return void, nil } - ctx.Set("tickets", tickets) - ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(restate.Void{}) + restate.Set(ctx, "tickets", tickets) + restate.ObjectSend(ctx, TicketServiceName, ticketId, "Unreserve").Send(restate.Void{}) return void, nil } func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) { - userId := ctx.Key() - tickets, err := restate.GetAs[[]string](ctx, "tickets") + userId := restate.Key(ctx) + tickets, err := restate.Get[[]string](ctx, "tickets") if err != nil { return false, err } @@ -78,13 +78,13 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, return false, nil } - timeout := ctx.After(time.Minute) + timeout := restate.After(ctx, time.Minute) - request := restate.CallAs[PaymentResponse](ctx.Object(CheckoutServiceName, "", "Payment")). + request := restate.Object[PaymentResponse](ctx, CheckoutServiceName, "", "Payment"). RequestFuture(PaymentRequest{UserID: userId, Tickets: tickets}) // race between the request and the timeout - switch ctx.Select(timeout, request).Select() { + switch restate.Select(ctx, timeout, request).Select() { case request: // happy path case timeout: @@ -101,9 +101,9 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, ctx.Log().Info("payment details", "id", response.ID, "price", response.Price) for _, ticket := range tickets { - ctx.Object(TicketServiceName, ticket, "MarkAsSold").Send(restate.Void{}) + restate.ObjectSend(ctx, TicketServiceName, ticket, "MarkAsSold").Send(restate.Void{}) } - ctx.Clear("tickets") + restate.Clear(ctx, "tickets") return true, nil } diff --git a/facilitators.go b/facilitators.go index 4bb3815..9e9099f 100644 --- a/facilitators.go +++ b/facilitators.go @@ -2,90 +2,85 @@ package restate import ( "errors" + "time" + "github.com/restatedev/sdk-go/interfaces" + "github.com/restatedev/sdk-go/internal/futures" "github.com/restatedev/sdk-go/internal/options" + "github.com/restatedev/sdk-go/internal/rand" ) -// GetAs gets the value for a key, returning a typed response instead of accepting a pointer. -// If there is no associated value with key, the zero value is returned - to check explicitly for this case use ctx.Get directly -// or pass a pointer eg *string as T. -// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled), -// a cancellation error is returned. -func GetAs[T any](ctx ObjectSharedContext, key string, options ...options.GetOption) (output T, err error) { - if err := ctx.Get(key, &output, options...); !errors.Is(err, ErrKeyNotFound) { - return output, err - } else { - return output, nil - } +// Rand returns a random source which will give deterministic results for a given invocation +// The source wraps the stdlib rand.Rand but with some extra helper methods +// This source is not safe for use inside .Run() +func Rand(ctx Context) *rand.Rand { + return ctx.Rand() } -// RunAs executes a Run function on a [Context], returning a typed response instead of accepting a pointer -func RunAs[T any](ctx Context, fn func(ctx RunContext) (T, error), options ...options.RunOption) (output T, err error) { - err = ctx.Run(func(ctx RunContext) (any, error) { - return fn(ctx) - }, &output, options...) +// Sleep for the duration d. Can return a terminal error in the case where the invocation was cancelled mid-sleep. +func Sleep(ctx Context, d time.Duration) error { + return ctx.Sleep(d) +} - return +// After is an alternative to [Sleep] which allows you to complete other tasks concurrently +// with the sleep. This is particularly useful when combined with [Select] to race between +// the sleep and other Selectable operations. +func After(ctx Context, d time.Duration) interfaces.After { + return ctx.After(d) } -// TypedAwakeable is an extension of [Awakeable] which returns typed responses instead of accepting a pointer -type TypedAwakeable[T any] interface { - // Id returns the awakeable ID, which can be stored or sent to a another service - Id() string - // Result blocks on receiving the result of the awakeable, storing the value it was - // resolved with in output or otherwise returning the error it was rejected with. - // It is *not* safe to call this in a goroutine - use Context.Select if you - // want to wait on multiple results at once. - Result() (T, error) - Selectable +// Service gets a Service request client by service and method name +func Service[O any](ctx Context, service string, method string, options ...options.ClientOption) TypedClient[any, O] { + return typedClient[any, O]{ctx.Service(service, method, options...)} } -type typedAwakeable[T any] struct { - Awakeable +// Service gets a Service send client by service and method name +func ServiceSend(ctx Context, service string, method string, options ...options.ClientOption) interfaces.SendClient { + return ctx.Service(service, method, options...) } -func (t typedAwakeable[T]) Result() (output T, err error) { - err = t.Awakeable.Result(&output) - return +// Object gets an Object request client by service name, key and method name +func Object[O any](ctx Context, service string, key string, method string, options ...options.ClientOption) TypedClient[any, O] { + return typedClient[any, O]{ctx.Object(service, key, method, options...)} } -// AwakeableAs helper function to treat [Awakeable] results as a particular type. -func AwakeableAs[T any](ctx Context, options ...options.AwakeableOption) TypedAwakeable[T] { - return typedAwakeable[T]{ctx.Awakeable(options...)} +// ObjectSend gets an Object send client by service name, key and method name +func ObjectSend(ctx Context, service string, key string, method string, options ...options.ClientOption) interfaces.SendClient { + return ctx.Object(service, key, method, options...) } -// TypedCallClient is an extension of [CallClient] which deals in typed values -type TypedCallClient[I any, O any] interface { +// TypedClient is an extension of [interfaces.Client] and [interfaces.SendClient] which deals in typed values +type TypedClient[I any, O any] interface { // RequestFuture makes a call and returns a handle on a future response - RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O] + RequestFuture(input I, options ...options.RequestOption) TypedResponseFuture[O] // Request makes a call and blocks on getting the response - Request(input I, opts ...options.RequestOption) (O, error) + Request(input I, options ...options.RequestOption) (O, error) // Send makes a one-way call which is executed in the background - Send(input I, opts ...options.SendOption) + Send(input I, options ...options.SendOption) } -type typedCallClient[I any, O any] struct { - inner CallClient +type typedClient[I any, O any] struct { + inner interfaces.Client } -// NewTypedCallClient is primarily intended to be called from generated code, to provide +// NewTypedClient is primarily intended to be called from generated code, to provide // type safety of input types. In other contexts it's generally less cumbersome to use [CallAs], // as the output type can be inferred. -func NewTypedCallClient[I any, O any](client CallClient) TypedCallClient[I, O] { - return typedCallClient[I, O]{client} +func NewTypedClient[I any, O any](client interfaces.Client) TypedClient[I, O] { + return typedClient[I, O]{client} } -func (t typedCallClient[I, O]) Request(input I, opts ...options.RequestOption) (output O, err error) { - err = t.inner.RequestFuture(input, opts...).Response(&output) +func (t typedClient[I, O]) Request(input I, options ...options.RequestOption) (output O, err error) { + err = t.inner.RequestFuture(input, options...).Response(&output) return } -func (t typedCallClient[I, O]) RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O] { - return typedResponseFuture[O]{t.inner.RequestFuture(input, opts...)} +func (t typedClient[I, O]) RequestFuture(input I, options ...options.RequestOption) TypedResponseFuture[O] { + return typedResponseFuture[O]{t.inner.RequestFuture(input, options...)} } -func (t typedCallClient[I, O]) Send(input I, opts ...options.SendOption) { - t.inner.Send(input, opts...) +func (t typedClient[I, O]) Send(input I, options ...options.SendOption) { + t.inner.Send(input, options...) } // TypedResponseFuture is an extension of [ResponseFuture] which returns typed responses instead of accepting a pointer @@ -94,11 +89,11 @@ type TypedResponseFuture[O any] interface { // It is *not* safe to call this in a goroutine - use Context.Select if you // want to wait on multiple results at once. Response() (O, error) - Selectable + futures.Selectable } type typedResponseFuture[O any] struct { - ResponseFuture + interfaces.ResponseFuture } func (t typedResponseFuture[O]) Response() (output O, err error) { @@ -106,7 +101,97 @@ func (t typedResponseFuture[O]) Response() (output O, err error) { return } -// CallAs helper function to get typed responses from a [CallClient] instead of passing in a pointer -func CallAs[O any](client CallClient) TypedCallClient[any, O] { - return typedCallClient[any, O]{client} +// Awakeable returns a Restate awakeable; a 'promise' to a future +// value or error, that can be resolved or rejected by other services. +func Awakeable[T any](ctx Context, options ...options.AwakeableOption) TypedAwakeable[T] { + return typedAwakeable[T]{ctx.Awakeable(options...)} +} + +// TypedAwakeable is an extension of [Awakeable] which returns typed responses instead of accepting a pointer +type TypedAwakeable[T any] interface { + // Id returns the awakeable ID, which can be stored or sent to a another service + Id() string + // Result blocks on receiving the result of the awakeable, storing the value it was + // resolved with in output or otherwise returning the error it was rejected with. + // It is *not* safe to call this in a goroutine - use Context.Select if you + // want to wait on multiple results at once. + Result() (T, error) + futures.Selectable +} + +type typedAwakeable[T any] struct { + interfaces.Awakeable +} + +func (t typedAwakeable[T]) Result() (output T, err error) { + err = t.Awakeable.Result(&output) + return +} + +// ResolveAwakeable allows an awakeable (not necessarily from this service) to be +// resolved with a particular value. +func ResolveAwakeable[T any](ctx Context, id string, value T, options ...options.ResolveAwakeableOption) { + ctx.ResolveAwakeable(id, value, options...) +} + +// ResolveAwakeable allows an awakeable (not necessarily from this service) to be +// rejected with a particular error. +func RejectAwakeable[T any](ctx Context, id string, reason error) { + ctx.RejectAwakeable(id, reason) +} + +func Select(ctx Context, futs ...interfaces.Selectable) interfaces.Selector { + return ctx.Select(futs...) +} + +// Run runs the function (fn), storing final results (including terminal errors) +// durably in the journal, or otherwise for transient errors stopping execution +// so Restate can retry the invocation. Replays will produce the same value, so +// all non-deterministic operations (eg, generating a unique ID) *must* happen +// inside Run blocks. +func Run[T any](ctx Context, fn func(ctx RunContext) (T, error), options ...options.RunOption) (output T, err error) { + err = ctx.Run(func(ctx RunContext) (any, error) { + return fn(ctx) + }, &output, options...) + + return +} + +// Get gets the value for a key. If there is no associated value with key, the zero value is returned. +// To check explicitly for this case use ctx.Get directly or pass a pointer eg *string as T. +// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled), +// a cancellation error is returned. +func Get[T any](ctx KeyValueReader, key string, options ...options.GetOption) (output T, err error) { + if err := ctx.Get(key, &output, options...); !errors.Is(err, ErrKeyNotFound) { + return output, err + } else { + return output, nil + } +} + +// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled), +// a cancellation error is returned. +func Keys(ctx KeyValueReader) ([]string, error) { + return ctx.Keys() +} + +// Key retrieves the key for this virtual object invocation. This is a no-op and is +// always safe to call. +func Key(ctx KeyValueReader) string { + return ctx.Key() +} + +// Set sets a value against a key, using the provided codec (defaults to JSON) +func Set[T any](ctx KeyValueWriter, key string, value T, options ...options.SetOption) { + ctx.Set(key, value, options...) +} + +// Clear deletes a key +func Clear(ctx KeyValueWriter, key string) { + ctx.Clear(key) +} + +// ClearAll drops all stored state associated with this Object key +func ClearAll(ctx KeyValueWriter) { + ctx.ClearAll() } diff --git a/interfaces/interfaces.go b/interfaces/interfaces.go new file mode 100644 index 0000000..3046298 --- /dev/null +++ b/interfaces/interfaces.go @@ -0,0 +1,67 @@ +package interfaces + +import ( + "github.com/restatedev/sdk-go/internal/futures" + "github.com/restatedev/sdk-go/internal/options" +) + +type Selectable = futures.Selectable + +// After is a handle on a Sleep operation which allows you to do other work concurrently +// with the sleep. +type After interface { + // Done blocks waiting on the remaining duration of the sleep. + // It is *not* safe to call this in a goroutine - use Context.Select if you want to wait on multiple + // results at once. Can return a terminal error in the case where the invocation was cancelled mid-sleep, + // hence Done() should always be called, even after using Context.Select. + Done() error + Selectable +} + +// Awakeable is the Go representation of a Restate awakeable; a 'promise' to a future +// value or error, that can be resolved or rejected by other services. +type Awakeable interface { + // Id returns the awakeable ID, which can be stored or sent to a another service + Id() string + // Result blocks on receiving the result of the awakeable, storing the value it was + // resolved with in output or otherwise returning the error it was rejected with. + // It is *not* safe to call this in a goroutine - use Context.Select if you + // want to wait on multiple results at once. + // Note: use the AwakeableAs helper function to avoid having to pass a output pointer + Result(output any) error + Selectable +} + +// Client represents all the different ways you can invoke a particular service/key/method tuple. +type Client interface { + // RequestFuture makes a call and returns a handle on a future response + RequestFuture(input any, options ...options.RequestOption) ResponseFuture + // Request makes a call and blocks on getting the response which is stored in output + Request(input any, output any, options ...options.RequestOption) error + SendClient +} + +// ResponseFuture is a handle on a potentially not-yet completed outbound call. +type ResponseFuture interface { + // Response blocks on the response to the call and stores it in output, or returns the associated error + // It is *not* safe to call this in a goroutine - use Context.Select if you + // want to wait on multiple results at once. + Response(output any) error + Selectable +} + +type SendClient interface { + // Send makes a one-way call which is executed in the background + Send(input any, options ...options.SendOption) +} + +// Selector is an iterator over a list of blocking Restate operations that are running +// in the background. +type Selector interface { + // Remaining returns whether there are still operations that haven't been returned by Select(). + // There will always be exactly the same number of results as there were operations + // given to Context.Select + Remaining() bool + // Select blocks on the next completed operation or returns nil if there are none left + Select() Selectable +} diff --git a/internal/state/call.go b/internal/state/call.go index 7ed424c..c7e8bbb 100644 --- a/internal/state/call.go +++ b/internal/state/call.go @@ -10,6 +10,7 @@ import ( restate "github.com/restatedev/sdk-go" "github.com/restatedev/sdk-go/encoding" protocol "github.com/restatedev/sdk-go/generated/dev/restate/service" + "github.com/restatedev/sdk-go/interfaces" "github.com/restatedev/sdk-go/internal/futures" "github.com/restatedev/sdk-go/internal/options" "github.com/restatedev/sdk-go/internal/wire" @@ -24,7 +25,7 @@ type serviceCall struct { } // RequestFuture makes a call and returns a handle on the response -func (c *serviceCall) RequestFuture(input any, opts ...options.RequestOption) restate.ResponseFuture { +func (c *serviceCall) RequestFuture(input any, opts ...options.RequestOption) interfaces.ResponseFuture { o := options.RequestOptions{} for _, opt := range opts { opt.BeforeRequest(&o) diff --git a/internal/state/select.go b/internal/state/select.go index 58b038b..0cb60ff 100644 --- a/internal/state/select.go +++ b/internal/state/select.go @@ -3,7 +3,6 @@ package state import ( "slices" - restate "github.com/restatedev/sdk-go" _go "github.com/restatedev/sdk-go/generated/dev/restate/sdk/go" "github.com/restatedev/sdk-go/internal/futures" "github.com/restatedev/sdk-go/internal/wire" @@ -14,12 +13,12 @@ type selector struct { inner *futures.Selector } -func (m *Machine) selector(futs ...restate.Selectable) *selector { +func (m *Machine) selector(futs ...futures.Selectable) *selector { inner := futures.Select(m.suspensionCtx, futs...) return &selector{m, inner} } -func (s *selector) Select() restate.Selectable { +func (s *selector) Select() futures.Selectable { entry, entryIndex := replayOrNew( s.machine, func(entry *wire.SelectorEntryMessage) *wire.SelectorEntryMessage { diff --git a/internal/state/state.go b/internal/state/state.go index 59dbf1d..003d5c3 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -15,6 +15,7 @@ import ( restate "github.com/restatedev/sdk-go" "github.com/restatedev/sdk-go/encoding" protocol "github.com/restatedev/sdk-go/generated/dev/restate/service" + "github.com/restatedev/sdk-go/interfaces" "github.com/restatedev/sdk-go/internal/errors" "github.com/restatedev/sdk-go/internal/futures" "github.com/restatedev/sdk-go/internal/log" @@ -113,11 +114,11 @@ func (c *Context) Sleep(d time.Duration) error { return c.machine.sleep(d) } -func (c *Context) After(d time.Duration) restate.After { +func (c *Context) After(d time.Duration) interfaces.After { return c.machine.after(d) } -func (c *Context) Service(service, method string, opts ...options.ClientOption) restate.CallClient { +func (c *Context) Service(service, method string, opts ...options.ClientOption) interfaces.Client { o := options.ClientOptions{} for _, opt := range opts { opt.BeforeClient(&o) @@ -134,7 +135,7 @@ func (c *Context) Service(service, method string, opts ...options.ClientOption) } } -func (c *Context) Object(service, key, method string, opts ...options.ClientOption) restate.CallClient { +func (c *Context) Object(service, key, method string, opts ...options.ClientOption) interfaces.Client { o := options.ClientOptions{} for _, opt := range opts { opt.BeforeClient(&o) @@ -193,7 +194,7 @@ type AwakeableOption interface { beforeAwakeable(*awakeableOptions) } -func (c *Context) Awakeable(opts ...options.AwakeableOption) restate.Awakeable { +func (c *Context) Awakeable(opts ...options.AwakeableOption) interfaces.Awakeable { o := options.AwakeableOptions{} for _, opt := range opts { opt.BeforeAwakeable(&o) @@ -241,7 +242,7 @@ func (c *Context) RejectAwakeable(id string, reason error) { c.machine.rejectAwakeable(id, reason) } -func (c *Context) Select(futs ...restate.Selectable) restate.Selector { +func (c *Context) Select(futs ...futures.Selectable) interfaces.Selector { return c.machine.selector(futs...) } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 57456ef..1a0d4d3 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -10,6 +10,7 @@ import ( restate "github.com/restatedev/sdk-go" protocol "github.com/restatedev/sdk-go/generated/dev/restate/service" + "github.com/restatedev/sdk-go/interfaces" "github.com/restatedev/sdk-go/internal/errors" "github.com/restatedev/sdk-go/internal/wire" "github.com/stretchr/testify/require" @@ -49,7 +50,7 @@ func TestRequestClosed(t *testing.T) { close(tp.input) // writing out journal entries still works - this shouldnt panic - after := ctx.After(time.Minute) + after := restate.After(ctx, time.Minute) ctxErr = ctx.Err() @@ -100,26 +101,26 @@ func TestResponseClosed(t *testing.T) { { name: "awakeable should lead to client gone away panic", afterCancel: func(ctx restate.Context, _ any) { - ctx.Awakeable() + restate.Awakeable[restate.Void](ctx) }, expectedPanic: &clientGoneAway{}, }, { name: "starting run should lead to client gone away panic", afterCancel: func(ctx restate.Context, _ any) { - ctx.Run(func(ctx restate.RunContext) (any, error) { + restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) { panic("run should not be executed") - }, restate.Void{}) + }) }, expectedPanic: &clientGoneAway{}, }, { name: "awaiting sleep should lead to suspension panic", beforeCancel: func(ctx restate.Context) any { - return ctx.After(time.Minute) + return restate.After(ctx, time.Minute) }, afterCancel: func(ctx restate.Context, setupState any) { - setupState.(restate.After).Done() + setupState.(interfaces.After).Done() }, producedEntries: 1, expectedPanic: &wire.SuspensionPanic{}, @@ -187,13 +188,13 @@ func TestInFlightRunDisconnect(t *testing.T) { } }() - _ = ctx.Run(func(ctx restate.RunContext) (any, error) { + _, _ = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) { beforeCancelErr = ctx.Err() tp.cancel() afterCancelErr = ctx.Err() - return nil, nil - }, restate.Void{}) + return restate.Void{}, nil + }) }() return restate.Void{}, nil @@ -229,13 +230,13 @@ func TestInFlightRunSuspension(t *testing.T) { } }() - _ = ctx.Run(func(ctx restate.RunContext) (any, error) { + _, _ = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) { beforeCancelErr = ctx.Err() close(tp.input) afterCancelErr = ctx.Err() - return nil, nil - }, restate.Void{}) + return restate.Void{}, nil + }) }() return restate.Void{}, nil @@ -269,22 +270,24 @@ func TestInvocationCanceled(t *testing.T) { { name: "awakeable should return canceled error", fn: func(ctx restate.Context) error { - awakeable := ctx.Awakeable() - return awakeable.Result(restate.Void{}) + awakeable := restate.Awakeable[restate.Void](ctx) + _, err := awakeable.Result() + return err }, }, { name: "sleep should return canceled error", fn: func(ctx restate.Context) error { - after := ctx.After(time.Minute) + after := restate.After(ctx, time.Minute) return after.Done() }, }, { name: "call should return cancelled error", fn: func(ctx restate.Context) error { - fut := ctx.Service("foo", "bar").RequestFuture(restate.Void{}) - return fut.Response(restate.Void{}) + fut := restate.Service[restate.Void](ctx, "foo", "bar").RequestFuture(restate.Void{}) + _, err := fut.Response() + return err }, }, } diff --git a/test-services/awakeableholder.go b/test-services/awakeableholder.go index 838a99b..c677d31 100644 --- a/test-services/awakeableholder.go +++ b/test-services/awakeableholder.go @@ -13,12 +13,12 @@ func init() { restate.NewObject("AwakeableHolder"). Handler("hold", restate.NewObjectHandler( func(ctx restate.ObjectContext, id string) (restate.Void, error) { - ctx.Set(ID_KEY, id) + restate.Set(ctx, ID_KEY, id) return restate.Void{}, nil })). Handler("hasAwakeable", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (bool, error) { - _, err := restate.GetAs[string](ctx, ID_KEY) + _, err := restate.Get[string](ctx, ID_KEY) if err != nil { return false, err } @@ -26,14 +26,14 @@ func init() { })). Handler("unlock", restate.NewObjectHandler( func(ctx restate.ObjectContext, payload string) (restate.Void, error) { - id, err := restate.GetAs[string](ctx, ID_KEY) + id, err := restate.Get[string](ctx, ID_KEY) if err != nil { return restate.Void{}, err } if id == "" { return restate.Void{}, restate.TerminalError(fmt.Errorf("No awakeable registered"), 404) } - ctx.ResolveAwakeable(id, payload) + restate.ResolveAwakeable(ctx, id, payload) return restate.Void{}, nil }))) } diff --git a/test-services/canceltest.go b/test-services/canceltest.go index 9da4b6d..b9d9f3c 100644 --- a/test-services/canceltest.go +++ b/test-services/canceltest.go @@ -22,9 +22,9 @@ func init() { restate.NewObject("CancelTestRunner"). Handler("startTest", restate.NewObjectHandler( func(ctx restate.ObjectContext, operation BlockingOperation) (restate.Void, error) { - if err := ctx.Object("CancelTestBlockingService", "", "block").Request(operation, restate.Void{}); err != nil { + if _, err := restate.Object[restate.Void](ctx, "CancelTestBlockingService", "", "block").Request(operation); err != nil { if restate.ErrorCode(err) == 409 { - ctx.Set(CANCELED_STATE, true) + restate.Set(ctx, CANCELED_STATE, true) return restate.Void{}, nil } return restate.Void{}, err @@ -33,26 +33,26 @@ func init() { })). Handler("verifyTest", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (bool, error) { - return restate.GetAs[bool](ctx, CANCELED_STATE) + return restate.Get[bool](ctx, CANCELED_STATE) }))) REGISTRY.AddDefinition( restate.NewObject("CancelTestBlockingService"). Handler("block", restate.NewObjectHandler( func(ctx restate.ObjectContext, operation BlockingOperation) (restate.Void, error) { - awakeable := ctx.Awakeable() - if err := ctx.Object("AwakeableHolder", "cancel", "hold").Request(awakeable.Id(), restate.Void{}); err != nil { + awakeable := restate.Awakeable[restate.Void](ctx) + if _, err := restate.Object[restate.Void](ctx, "AwakeableHolder", "cancel", "hold").Request(awakeable.Id()); err != nil { return restate.Void{}, err } - if err := awakeable.Result(restate.Void{}); err != nil { + if _, err := awakeable.Result(); err != nil { return restate.Void{}, err } switch operation { case CALL: - return restate.Void{}, ctx.Object("CancelTestBlockingService", "", "block").Request(operation, restate.Void{}) + return restate.Object[restate.Void](ctx, "CancelTestBlockingService", "", "block").Request(operation) case SLEEP: - return restate.Void{}, ctx.Sleep(1024 * time.Hour * 24) + return restate.Void{}, restate.Sleep(ctx, 1024*time.Hour*24) case AWAKEABLE: - return restate.Void{}, ctx.Awakeable().Result(restate.Void{}) + return restate.Awakeable[restate.Void](ctx).Result() default: return restate.Void{}, restate.TerminalError(fmt.Errorf("unexpected operation %s", operation), 400) } diff --git a/test-services/counter.go b/test-services/counter.go index f064e2b..f429de3 100644 --- a/test-services/counter.go +++ b/test-services/counter.go @@ -16,22 +16,22 @@ func init() { restate.NewObject("Counter"). Handler("reset", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { - ctx.Clear(COUNTER_KEY) + restate.Clear(ctx, COUNTER_KEY) return restate.Void{}, nil })). Handler("get", restate.NewObjectSharedHandler( func(ctx restate.ObjectSharedContext, _ restate.Void) (int64, error) { - return restate.GetAs[int64](ctx, COUNTER_KEY) + return restate.Get[int64](ctx, COUNTER_KEY) })). Handler("add", restate.NewObjectHandler( func(ctx restate.ObjectContext, addend int64) (CounterUpdateResponse, error) { - oldValue, err := restate.GetAs[int64](ctx, COUNTER_KEY) + oldValue, err := restate.Get[int64](ctx, COUNTER_KEY) if err != nil { return CounterUpdateResponse{}, err } newValue := oldValue + addend - ctx.Set(COUNTER_KEY, newValue) + restate.Set(ctx, COUNTER_KEY, newValue) return CounterUpdateResponse{ OldValue: oldValue, @@ -40,14 +40,14 @@ func init() { })). Handler("addThenFail", restate.NewObjectHandler( func(ctx restate.ObjectContext, addend int64) (restate.Void, error) { - oldValue, err := restate.GetAs[int64](ctx, COUNTER_KEY) + oldValue, err := restate.Get[int64](ctx, COUNTER_KEY) if err != nil { return restate.Void{}, err } newValue := oldValue + addend - ctx.Set(COUNTER_KEY, newValue) + restate.Set(ctx, COUNTER_KEY, newValue) - return restate.Void{}, restate.TerminalErrorf("%s", ctx.Key()) + return restate.Void{}, restate.TerminalErrorf("%s", restate.Key(ctx)) }))) } diff --git a/test-services/failing.go b/test-services/failing.go index 43d829d..aa39580 100644 --- a/test-services/failing.go +++ b/test-services/failing.go @@ -19,7 +19,7 @@ func init() { })). Handler("callTerminallyFailingCall", restate.NewObjectHandler( func(ctx restate.ObjectContext, errorMessage string) (string, error) { - if err := ctx.Object("Failing", ctx.Rand().UUID().String(), "terminallyFailingCall").Request(errorMessage, restate.Void{}); err != nil { + if _, err := restate.Object[restate.Void](ctx, "Failing", restate.Rand(ctx).UUID().String(), "terminallyFailingCall").Request(errorMessage); err != nil { return "", err } @@ -37,7 +37,7 @@ func init() { })). Handler("failingSideEffectWithEventualSuccess", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (int32, error) { - return restate.RunAs(ctx, func(ctx restate.RunContext) (int32, error) { + return restate.Run(ctx, func(ctx restate.RunContext) (int32, error) { currentAttempt := eventualSuccessCalls.Add(1) if currentAttempt >= 4 { eventualSuccessSideEffectCalls.Store(0) @@ -49,7 +49,7 @@ func init() { })). Handler("terminallyFailingSideEffect", restate.NewObjectHandler( func(ctx restate.ObjectContext, errorMessage string) (restate.Void, error) { - return restate.RunAs(ctx, func(ctx restate.RunContext) (restate.Void, error) { + return restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) { return restate.Void{}, restate.TerminalErrorf(errorMessage) }) }))) diff --git a/test-services/kill.go b/test-services/kill.go index 18255f9..da8dd43 100644 --- a/test-services/kill.go +++ b/test-services/kill.go @@ -6,20 +6,20 @@ import ( func init() { REGISTRY.AddDefinition(restate.NewService("KillTestRunner").Handler("startCallTree", restate.NewServiceHandler(func(ctx restate.Context, _ restate.Void) (restate.Void, error) { - return restate.Void{}, ctx.Object("KillTestSingleton", "", "recursiveCall").Request(restate.Void{}, restate.Void{}) + return restate.Object[restate.Void](ctx, "KillTestSingleton", "", "recursiveCall").Request(restate.Void{}) }))) REGISTRY.AddDefinition( restate.NewObject("KillTestSingleton"). Handler("recursiveCall", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { - awakeable := ctx.Awakeable() - ctx.Object("AwakeableHolder", "kill", "hold").Send(awakeable.Id()) - if err := awakeable.Result(restate.Void{}); err != nil { + awakeable := restate.Awakeable[restate.Void](ctx) + restate.ObjectSend(ctx, "AwakeableHolder", "kill", "hold").Send(awakeable.Id()) + if _, err := awakeable.Result(); err != nil { return restate.Void{}, err } - return restate.CallAs[restate.Void](ctx.Object("KillTestSingleton", "", "recursiveCall")).Request(restate.Void{}) + return restate.Object[restate.Void](ctx, "KillTestSingleton", "", "recursiveCall").Request(restate.Void{}) })). Handler("isUnlocked", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { diff --git a/test-services/listobject.go b/test-services/listobject.go index 7403f12..288e2d2 100644 --- a/test-services/listobject.go +++ b/test-services/listobject.go @@ -11,17 +11,17 @@ func init() { restate.NewObject("ListObject"). Handler("append", restate.NewObjectHandler( func(ctx restate.ObjectContext, value string) (restate.Void, error) { - list, err := restate.GetAs[[]string](ctx, LIST_KEY) + list, err := restate.Get[[]string](ctx, LIST_KEY) if err != nil { return restate.Void{}, err } list = append(list, value) - ctx.Set(LIST_KEY, list) + restate.Set(ctx, LIST_KEY, list) return restate.Void{}, nil })). Handler("get", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) ([]string, error) { - list, err := restate.GetAs[[]string](ctx, LIST_KEY) + list, err := restate.Get[[]string](ctx, LIST_KEY) if err != nil { return nil, err } @@ -34,7 +34,7 @@ func init() { })). Handler("clear", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) ([]string, error) { - list, err := restate.GetAs[[]string](ctx, LIST_KEY) + list, err := restate.Get[[]string](ctx, LIST_KEY) if err != nil { return nil, err } @@ -42,7 +42,7 @@ func init() { // or go would encode this as JSON null list = []string{} } - ctx.Clear(LIST_KEY) + restate.Clear(ctx, LIST_KEY) return list, nil }))) } diff --git a/test-services/mapobject.go b/test-services/mapobject.go index 9d46afe..d0d6859 100644 --- a/test-services/mapobject.go +++ b/test-services/mapobject.go @@ -14,28 +14,28 @@ func init() { restate.NewObject("MapObject"). Handler("set", restate.NewObjectHandler( func(ctx restate.ObjectContext, value Entry) (restate.Void, error) { - ctx.Set(value.Key, value.Value) + restate.Set(ctx, value.Key, value.Value) return restate.Void{}, nil })). Handler("get", restate.NewObjectHandler( func(ctx restate.ObjectContext, key string) (string, error) { - return restate.GetAs[string](ctx, key) + return restate.Get[string](ctx, key) })). Handler("clearAll", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) ([]Entry, error) { - keys, err := ctx.Keys() + keys, err := restate.Keys(ctx) if err != nil { return nil, err } out := make([]Entry, 0, len(keys)) for _, k := range keys { - value, err := restate.GetAs[string](ctx, k) + value, err := restate.Get[string](ctx, k) if err != nil { return nil, err } out = append(out, Entry{Key: k, Value: value}) } - ctx.ClearAll() + restate.ClearAll(ctx) return out, nil }))) } diff --git a/test-services/nondeterministic.go b/test-services/nondeterministic.go index b3a88a5..2f23192 100644 --- a/test-services/nondeterministic.go +++ b/test-services/nondeterministic.go @@ -15,7 +15,7 @@ func init() { invocationCountsMtx := sync.RWMutex{} doLeftAction := func(ctx restate.ObjectContext) bool { - countKey := ctx.Key() + countKey := restate.Key(ctx) invocationCountsMtx.Lock() defer invocationCountsMtx.Unlock() @@ -23,7 +23,7 @@ func init() { return invocationCounts[countKey]%2 == 1 } incrementCounter := func(ctx restate.ObjectContext) { - ctx.Object("Counter", ctx.Key(), "add").Send(int64(1)) + restate.ObjectSend(ctx, "Counter", restate.Key(ctx), "add").Send(int64(1)) } REGISTRY.AddDefinition( @@ -31,58 +31,58 @@ func init() { Handler("eitherSleepOrCall", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { if doLeftAction(ctx) { - ctx.Sleep(100 * time.Millisecond) + restate.Sleep(ctx, 100*time.Millisecond) } else { - if err := ctx.Object("Counter", "abc", "get").Request(restate.Void{}, restate.Void{}); err != nil { + if _, err := restate.Object[restate.Void](ctx, "Counter", "abc", "get").Request(restate.Void{}); err != nil { return restate.Void{}, err } } // This is required to cause a suspension after the non-deterministic operation - ctx.Sleep(100 * time.Millisecond) + restate.Sleep(ctx, 100*time.Millisecond) incrementCounter(ctx) return restate.Void{}, nil })). Handler("callDifferentMethod", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { if doLeftAction(ctx) { - if err := ctx.Object("Counter", "abc", "get").Request(restate.Void{}, restate.Void{}); err != nil { + if _, err := restate.Object[restate.Void](ctx, "Counter", "abc", "get").Request(restate.Void{}); err != nil { return restate.Void{}, err } } else { - if err := ctx.Object("Counter", "abc", "reset").Request(restate.Void{}, restate.Void{}); err != nil { + if _, err := restate.Object[restate.Void](ctx, "Counter", "abc", "reset").Request(restate.Void{}); err != nil { return restate.Void{}, err } } // This is required to cause a suspension after the non-deterministic operation - ctx.Sleep(100 * time.Millisecond) + restate.Sleep(ctx, 100*time.Millisecond) incrementCounter(ctx) return restate.Void{}, nil })). Handler("backgroundInvokeWithDifferentTargets", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { if doLeftAction(ctx) { - ctx.Object("Counter", "abc", "get").Send(restate.Void{}) + restate.ObjectSend(ctx, "Counter", "abc", "get").Send(restate.Void{}) } else { - ctx.Object("Counter", "abc", "reset").Send(restate.Void{}) + restate.ObjectSend(ctx, "Counter", "abc", "reset").Send(restate.Void{}) } // This is required to cause a suspension after the non-deterministic operation - ctx.Sleep(100 * time.Millisecond) + restate.Sleep(ctx, 100*time.Millisecond) incrementCounter(ctx) return restate.Void{}, nil })). Handler("setDifferentKey", restate.NewObjectHandler( func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) { if doLeftAction(ctx) { - ctx.Set(STATE_A, "my-state") + restate.Set(ctx, STATE_A, "my-state") } else { - ctx.Set(STATE_B, "my-state") + restate.Set(ctx, STATE_B, "my-state") } // This is required to cause a suspension after the non-deterministic operation - ctx.Sleep(100 * time.Millisecond) + restate.Sleep(ctx, 100*time.Millisecond) incrementCounter(ctx) return restate.Void{}, nil }))) diff --git a/test-services/proxy.go b/test-services/proxy.go index 77c90d9..c38497a 100644 --- a/test-services/proxy.go +++ b/test-services/proxy.go @@ -2,6 +2,7 @@ package main import ( restate "github.com/restatedev/sdk-go" + "github.com/restatedev/sdk-go/interfaces" ) type ProxyRequest struct { @@ -12,15 +13,15 @@ type ProxyRequest struct { Message []int `json:"message"` } -func (req *ProxyRequest) ToTarget(ctx restate.Context) restate.TypedCallClient[[]byte, []byte] { +func (req *ProxyRequest) ToTarget(ctx restate.Context) restate.TypedClient[[]byte, []byte] { if req.VirtualObjectKey != nil { - return restate.NewTypedCallClient[[]byte, []byte](ctx.Object( + return restate.NewTypedClient[[]byte, []byte](ctx.Object( req.ServiceName, *req.VirtualObjectKey, req.HandlerName, restate.WithBinary)) } else { - return restate.NewTypedCallClient[[]byte, []byte](ctx.Service( + return restate.NewTypedClient[[]byte, []byte](ctx.Service( req.ServiceName, req.HandlerName, restate.WithBinary)) @@ -53,7 +54,7 @@ func init() { Handler("manyCalls", restate.NewServiceHandler( // We need to use []int because Golang takes the opinionated choice of treating []byte as Base64 func(ctx restate.Context, requests []ManyCallRequest) (restate.Void, error) { - var toAwait []restate.Selectable + var toAwait []interfaces.Selectable for _, req := range requests { input := intArrayToByteArray(req.ProxyRequest.Message) @@ -67,7 +68,7 @@ func init() { } } - selector := ctx.Select(toAwait...) + selector := restate.Select(ctx, toAwait...) for selector.Remaining() { result := selector.Select() if _, err := result.(restate.TypedResponseFuture[[]byte]).Response(); err != nil { diff --git a/test-services/testutils.go b/test-services/testutils.go index 7156706..1fe1481 100644 --- a/test-services/testutils.go +++ b/test-services/testutils.go @@ -6,6 +6,7 @@ import ( "time" restate "github.com/restatedev/sdk-go" + "github.com/restatedev/sdk-go/interfaces" ) type CreateAwakeableAndAwaitItRequest struct { @@ -38,8 +39,8 @@ func init() { })). Handler("createAwakeableAndAwaitIt", restate.NewServiceHandler( func(ctx restate.Context, req CreateAwakeableAndAwaitItRequest) (CreateAwakeableAndAwaitItResponse, error) { - awakeable := restate.AwakeableAs[string](ctx) - if err := ctx.Object("AwakeableHolder", req.AwakeableKey, "hold").Request(awakeable.Id(), restate.Void{}); err != nil { + awakeable := restate.Awakeable[string](ctx) + if _, err := restate.Object[restate.Void](ctx, "AwakeableHolder", req.AwakeableKey, "hold").Request(awakeable.Id()); err != nil { return CreateAwakeableAndAwaitItResponse{}, err } @@ -54,8 +55,8 @@ func init() { }, nil } - timeout := ctx.After(time.Duration(*req.AwaitTimeout) * time.Millisecond) - selector := ctx.Select(timeout, awakeable) + timeout := restate.After(ctx, time.Duration(*req.AwaitTimeout)*time.Millisecond) + selector := restate.Select(ctx, timeout, awakeable) switch selector.Select() { case timeout: return CreateAwakeableAndAwaitItResponse{Type: "timeout"}, nil @@ -71,11 +72,11 @@ func init() { })). Handler("sleepConcurrently", restate.NewServiceHandler( func(ctx restate.Context, millisDuration []int64) (restate.Void, error) { - timers := make([]restate.Selectable, 0, len(millisDuration)) + timers := make([]interfaces.Selectable, 0, len(millisDuration)) for _, d := range millisDuration { - timers = append(timers, ctx.After(time.Duration(d)*time.Millisecond)) + timers = append(timers, restate.After(ctx, time.Duration(d)*time.Millisecond)) } - selector := ctx.Select(timers...) + selector := restate.Select(ctx, timers...) i := 0 for selector.Remaining() { _ = selector.Select() @@ -90,7 +91,7 @@ func init() { func(ctx restate.Context, increments int32) (int32, error) { invokedSideEffects := atomic.Int32{} for i := int32(0); i < increments; i++ { - restate.RunAs(ctx, func(ctx restate.RunContext) (int32, error) { + restate.Run(ctx, func(ctx restate.RunContext) (int32, error) { return invokedSideEffects.Add(1), nil }) } diff --git a/test-services/upgradetest.go b/test-services/upgradetest.go index 358a7ea..d7308b0 100644 --- a/test-services/upgradetest.go +++ b/test-services/upgradetest.go @@ -23,12 +23,12 @@ func init() { if version() != "v1" { return "", fmt.Errorf("executeComplex should not be invoked with version different from 1!") } - awakeable := restate.AwakeableAs[string](ctx) - ctx.Object("AwakeableHolder", "upgrade", "hold").Send(awakeable.Id()) + awakeable := restate.Awakeable[string](ctx) + restate.ObjectSend(ctx, "AwakeableHolder", "upgrade", "hold").Send(awakeable.Id()) if _, err := awakeable.Result(); err != nil { return "", err } - ctx.Object("ListObject", "upgrade-test", "append").Send(version()) + restate.ObjectSend(ctx, "ListObject", "upgrade-test", "append").Send(version()) return version(), nil }))) }