Skip to content

Commit

Permalink
Always interact with ctx through facilitator functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Aug 21, 2024
1 parent eb778c7 commit 4ac8539
Show file tree
Hide file tree
Showing 23 changed files with 377 additions and 276 deletions.
87 changes: 15 additions & 72 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"time"

"github.com/restatedev/sdk-go/interfaces"
"github.com/restatedev/sdk-go/internal/futures"
"github.com/restatedev/sdk-go/internal/options"
"github.com/restatedev/sdk-go/internal/rand"
Expand All @@ -24,28 +25,28 @@ type Context interface {
// After is an alternative to Context.Sleep which allows you to complete other tasks concurrently
// with the sleep. This is particularly useful when combined with Context.Select to race between
// the sleep and other Selectable operations.
After(d time.Duration) After
After(d time.Duration) interfaces.After

// Service gets a Service accessor by service and method name
// Note: use the CallAs helper function to deserialise return values
Service(service, method string, opts ...options.ClientOption) CallClient
// Service gets a Service request client by service and method name
// Note: use module-level [Service] to deserialise return values
Service(service, method string, opts ...options.ClientOption) interfaces.Client

// Object gets a Object accessor by name, key and method name
// Note: use the CallAs helper function to receive serialised values
Object(object, key, method string, opts ...options.ClientOption) CallClient
// Object gets an Object request client by service name, key and method name
// Note: use module-level [Object] to receive serialised values
Object(object, key, method string, opts ...options.ClientOption) interfaces.Client

// Run runs the function (fn), storing final results (including terminal errors)
// durably in the journal, or otherwise for transient errors stopping execution
// so Restate can retry the invocation. Replays will produce the same value, so
// all non-deterministic operations (eg, generating a unique ID) *must* happen
// inside Run blocks.
// Note: use the RunAs helper function to get typed output values instead of providing an output pointer
// Note: use module-level [Run] to get typed output values instead of providing an output pointer
Run(fn func(ctx RunContext) (any, error), output any, opts ...options.RunOption) error

// Awakeable returns a Restate awakeable; a 'promise' to a future
// value or error, that can be resolved or rejected by other services.
// Note: use the AwakeableAs helper function to avoid having to pass a output pointer to Awakeable.Result()
Awakeable(options ...options.AwakeableOption) Awakeable
// Note: use module-level [Awakeable] to avoid having to pass a output pointer to Awakeable.Result()
Awakeable(options ...options.AwakeableOption) interfaces.Awakeable
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// resolved with a particular value.
ResolveAwakeable(id string, value any, options ...options.ResolveAwakeableOption)
Expand All @@ -57,55 +58,8 @@ type Context interface {
// which allows you to safely run them in parallel. The Selector will store the order
// that things complete in durably inside Restate, so that on replay the same order
// can be used. This avoids non-determinism. It is *not* safe to use goroutines or channels
// outside of Context.Run functions, as they do not behave deterministically.
Select(futs ...Selectable) Selector
}

// Selectable is implemented by types that may be passed to Context.Select
type Selectable = futures.Selectable

// Awakeable is the Go representation of a Restate awakeable; a 'promise' to a future
// value or error, that can be resolved or rejected by other services.
type Awakeable interface {
// Id returns the awakeable ID, which can be stored or sent to a another service
Id() string
// Result blocks on receiving the result of the awakeable, storing the value it was
// resolved with in output or otherwise returning the error it was rejected with.
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
// Note: use the AwakeableAs helper function to avoid having to pass a output pointer
Result(output any) error
Selectable
}

// CallClient represents all the different ways you can invoke a particular service/key/method tuple.
type CallClient interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input any, opts ...options.RequestOption) ResponseFuture
// Request makes a call and blocks on getting the response which is stored in output
Request(input any, output any, opts ...options.RequestOption) error
// Send makes a one-way call which is executed in the background
Send(input any, opts ...options.SendOption)
}

// ResponseFuture is a handle on a potentially not-yet completed outbound call.
type ResponseFuture interface {
// Response blocks on the response to the call and stores it in output, or returns the associated error
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Response(output any) error
Selectable
}

// Selector is an iterator over a list of blocking Restate operations that are running
// in the background.
type Selector interface {
// Remaining returns whether there are still operations that haven't been returned by Select().
// There will always be exactly the same number of results as there were operations
// given to Context.Select
Remaining() bool
// Select blocks on the next completed operation or returns nil if there are none left
Select() Selectable
// outside of [Run] functions, as they do not behave deterministically.
Select(futs ...futures.Selectable) interfaces.Selector
}

// RunContext methods are the only methods of [Context] that are safe to call from inside a .Run()
Expand Down Expand Up @@ -137,17 +91,6 @@ type Request struct {
Body []byte
}

// After is a handle on a Sleep operation which allows you to do other work concurrently
// with the sleep.
type After interface {
// Done blocks waiting on the remaining duration of the sleep.
// It is *not* safe to call this in a goroutine - use Context.Select if you want to wait on multiple
// results at once. Can return a terminal error in the case where the invocation was cancelled mid-sleep,
// hence Done() should always be called, even after using Context.Select.
Done() error
Selectable
}

// ObjectContext is an extension of [Context] which can be used in exclusive-mode Virtual Object handlers,
// giving mutable access to state.
type ObjectContext interface {
Expand All @@ -173,7 +116,7 @@ type KeyValueReader interface {
Get(key string, value any, options ...options.GetOption) error
// Keys returns a list of all associated key
// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled),
// a cancellation error is returned. If eager state is enabled (the default), err will always be nil.
// a cancellation error is returned.
Keys() ([]string, error)
// Key retrieves the key for this virtual object invocation. This is a no-op and is
// always safe to call.
Expand All @@ -186,6 +129,6 @@ type KeyValueWriter interface {
Set(key string, value any, options ...options.SetOption)
// Clear deletes a key
Clear(key string)
// ClearAll drops all stored state associated with key
// ClearAll drops all stored state associated with this Object key
ClearAll()
}
24 changes: 12 additions & 12 deletions examples/codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,29 @@ type counter struct {
}

func (c counter) Add(ctx restate.ObjectContext, req *helloworld.AddRequest) (*helloworld.GetResponse, error) {
count, err := restate.GetAs[int64](ctx, "counter")
count, err := restate.Get[int64](ctx, "counter")
if err != nil {
return nil, err
}

watchers, err := restate.GetAs[[]string](ctx, "watchers")
watchers, err := restate.Get[[]string](ctx, "watchers")
if err != nil {
return nil, err
}

count += req.Delta
ctx.Set("counter", count)
restate.Set(ctx, "counter", count)

for _, awakeableID := range watchers {
ctx.ResolveAwakeable(awakeableID, count)
restate.ResolveAwakeable(ctx, awakeableID, count)
}
ctx.Clear("watchers")
restate.Clear(ctx, "watchers")

return &helloworld.GetResponse{Value: count}, nil
}

func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest) (*helloworld.GetResponse, error) {
count, err := restate.GetAs[int64](ctx, "counter")
count, err := restate.Get[int64](ctx, "counter")
if err != nil {
return nil, err
}
Expand All @@ -64,22 +64,22 @@ func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest)
}

func (c counter) AddWatcher(ctx restate.ObjectContext, req *helloworld.AddWatcherRequest) (*helloworld.AddWatcherResponse, error) {
watchers, err := restate.GetAs[[]string](ctx, "watchers")
watchers, err := restate.Get[[]string](ctx, "watchers")
if err != nil {
return nil, err
}
watchers = append(watchers, req.AwakeableId)
ctx.Set("watchers", watchers)
restate.Set(ctx, "watchers", watchers)
return &helloworld.AddWatcherResponse{}, nil
}

func (c counter) Watch(ctx restate.ObjectSharedContext, req *helloworld.WatchRequest) (*helloworld.GetResponse, error) {
awakeable := restate.AwakeableAs[int64](ctx)
awakeable := restate.Awakeable[int64](ctx)

// since this is a shared handler, we need to use a separate exclusive handler to store the awakeable ID
// if there is an in-flight Add call, this will take effect after it completes
// we could add a version counter check here to detect changes that happen mid-request and return immediately
if _, err := helloworld.NewCounterClient(ctx, ctx.Key()).
if _, err := helloworld.NewCounterClient(ctx, restate.Key(ctx)).
AddWatcher().
Request(&helloworld.AddWatcherRequest{AwakeableId: awakeable.Id()}); err != nil {
return nil, err
Expand All @@ -96,10 +96,10 @@ func (c counter) Watch(ctx restate.ObjectSharedContext, req *helloworld.WatchReq
return &helloworld.GetResponse{Value: next}, nil
}

after := ctx.After(timeout)
after := restate.After(ctx, timeout)

// this is the safe way to race two results
selector := ctx.Select(after, awakeable)
selector := restate.Select(ctx, after, awakeable)

if selector.Select() == after {
// the timeout won
Expand Down
30 changes: 15 additions & 15 deletions examples/codegen/proto/helloworld_restate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions examples/ticketreservation/checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (c *checkout) ServiceName() string {
const CheckoutServiceName = "Checkout"

func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) {
uuid := ctx.Rand().UUID().String()
uuid := restate.Rand(ctx).UUID().String()

response.ID = uuid

Expand All @@ -35,7 +35,7 @@ func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (respons
price := len(request.Tickets) * 30

response.Price = price
_, err = restate.RunAs(ctx, func(ctx restate.RunContext) (bool, error) {
_, err = restate.Run(ctx, func(ctx restate.RunContext) (bool, error) {
log := ctx.Log().With("uuid", uuid, "price", price)
if rand.Float64() < 0.5 {
log.Info("payment succeeded")
Expand Down
20 changes: 10 additions & 10 deletions examples/ticketreservation/ticket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,55 @@ type ticketService struct{}
func (t *ticketService) ServiceName() string { return TicketServiceName }

func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
status, err := restate.GetAs[TicketStatus](ctx, "status")
status, err := restate.Get[TicketStatus](ctx, "status")
if err != nil {
return false, err
}

if status == TicketAvailable {
ctx.Set("status", TicketReserved)
restate.Set(ctx, "status", TicketReserved)
return true, nil
}

return false, nil
}

func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
ticketId := restate.Key(ctx)
ctx.Log().Info("un-reserving ticket", "ticket", ticketId)
status, err := restate.GetAs[TicketStatus](ctx, "status")
status, err := restate.Get[TicketStatus](ctx, "status")
if err != nil {
return void, err
}

if status != TicketSold {
ctx.Clear("status")
restate.Clear(ctx, "status")
return void, nil
}

return void, nil
}

func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
ticketId := restate.Key(ctx)
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
status, err := restate.Get[TicketStatus](ctx, "status")
if err != nil {
return void, err
}

if status == TicketReserved {
ctx.Set("status", TicketSold)
restate.Set(ctx, "status", TicketSold)
return void, nil
}

return void, nil
}

func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void) (TicketStatus, error) {
ticketId := ctx.Key()
ticketId := restate.Key(ctx)
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

return restate.GetAs[TicketStatus](ctx, "status")
return restate.Get[TicketStatus](ctx, "status")
}
Loading

0 comments on commit 4ac8539

Please sign in to comment.