Skip to content

Commit

Permalink
Add awakeable example (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman authored Aug 12, 2024
1 parent 8eb988a commit 8521ac0
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 37 deletions.
72 changes: 71 additions & 1 deletion examples/codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"os"
"time"

restate "github.com/restatedev/sdk-go"
helloworld "github.com/restatedev/sdk-go/examples/codegen/proto"
Expand Down Expand Up @@ -38,11 +39,23 @@ func (c counter) Add(ctx restate.ObjectContext, req *helloworld.AddRequest) (*he
return nil, err
}

count += 1
watchers, err := restate.GetAs[[]string](ctx, "watchers")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}

count += req.Delta
if err := ctx.Set("counter", count); err != nil {
return nil, err
}

for _, awakeableID := range watchers {
if err := ctx.ResolveAwakeable(awakeableID, count); err != nil {
return nil, err
}
}
ctx.Clear("watchers")

return &helloworld.GetResponse{Value: count}, nil
}

Expand All @@ -55,6 +68,63 @@ func (c counter) Get(ctx restate.ObjectSharedContext, _ *helloworld.GetRequest)
return &helloworld.GetResponse{Value: count}, nil
}

func (c counter) AddWatcher(ctx restate.ObjectContext, req *helloworld.AddWatcherRequest) (*helloworld.AddWatcherResponse, error) {
watchers, err := restate.GetAs[[]string](ctx, "watchers")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return nil, err
}
watchers = append(watchers, req.AwakeableId)
if err := ctx.Set("watchers", watchers); err != nil {
return nil, err
}
return &helloworld.AddWatcherResponse{}, nil
}

func (c counter) Watch(ctx restate.ObjectSharedContext, req *helloworld.WatchRequest) (*helloworld.GetResponse, error) {
awakeable := restate.AwakeableAs[int64](ctx)

// since this is a shared handler, we need to use a separate exclusive handler to store the awakeable ID
// if there is an in-flight Add call, this will take effect after it completes
// we could add a version counter check here to detect changes that happen mid-request and return immediately
if _, err := helloworld.NewCounterClient(ctx, ctx.Key()).
AddWatcher().
Request(&helloworld.AddWatcherRequest{AwakeableId: awakeable.Id()}); err != nil {
return nil, err
}

timeout := time.Duration(req.TimeoutMillis) * time.Millisecond
if timeout == 0 {
// infinite timeout case; just await the next value
next, err := awakeable.Result()
if err != nil {
return nil, err
}

return &helloworld.GetResponse{Value: next}, nil
}

after := ctx.After(timeout)

// this is the safe way to race two results
selector := ctx.Select(after, awakeable)

if selector.Select() == after {
// the timeout won
if err := after.Done(); err != nil {
// an error here implies this invocation was cancelled
return nil, err
}
return nil, restate.TerminalError(context.DeadlineExceeded, 408)
}

// otherwise, the awakeable won
next, err := awakeable.Result()
if err != nil {
return nil, err
}
return &helloworld.GetResponse{Value: next}, nil
}

func main() {
server := server.NewRestate().
Bind(helloworld.NewGreeterServer(greeter{})).
Expand Down
Loading

0 comments on commit 8521ac0

Please sign in to comment.