Skip to content

Commit

Permalink
Add complete promise api (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr authored Sep 19, 2023
1 parent 9e1276a commit 1ad9f62
Show file tree
Hide file tree
Showing 18 changed files with 762 additions and 120 deletions.
12 changes: 0 additions & 12 deletions cmd/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/types"
Expand Down Expand Up @@ -141,17 +140,6 @@ var dstRunCmd = &cobra.Command{

// instantiate system
system := system.New(api, aio, config.System, metrics)
system.AddOnRequest(types.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(types.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(types.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(types.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(types.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(types.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(types.ReadSubscriptions, coroutines.ReadSubscriptions)
system.AddOnRequest(types.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(types.DeleteSubscription, coroutines.DeleteSubscription)
system.AddOnTick(2, coroutines.TimeoutPromises)
system.AddOnTick(10, coroutines.NotifySubscriptions)

dst := dst.New(&dst.Config{
Ticks: ticks,
Expand Down
1 change: 1 addition & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var serveCmd = &cobra.Command{
system.AddOnRequest(types.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(types.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(types.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(types.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(types.ReadSubscriptions, coroutines.ReadSubscriptions)
system.AddOnRequest(types.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(types.DeleteSubscription, coroutines.DeleteSubscription)
Expand Down
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ services:
command:
- dst
- run
- --ticks
- "1"
- --aio-store-postgres-host
- postgres-dst
- --aio-store-postgres-username
Expand Down
224 changes: 224 additions & 0 deletions internal/app/coroutines/completePromise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/promise"
)

func CompletePromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("CompletePromise(id=%s)", req.CompletePromise.Id), "CompletePromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
util.Assert(req.CompletePromise.State.In(promise.Resolved|promise.Rejected|promise.Canceled), "state must be one of resolved, rejected, or canceled")

if req.CompletePromise.Value.Headers == nil {
req.CompletePromise.Value.Headers = map[string]string{}
}
if req.CompletePromise.Value.Data == nil {
req.CompletePromise.Value.Data = []byte{}
}

submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Transaction: &types.Transaction{
Commands: []*types.Command{
{
Kind: types.StoreReadPromise,
ReadPromise: &types.ReadPromiseCommand{
Id: req.CompletePromise.Id,
},
},
},
},
},
}

c.Yield(submission, func(completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to read promise", "req", req, "err", err)
res(nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")

result := completion.Store.Results[0].ReadPromise
util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows")

if result.RowsReturned == 0 {
res(&types.Response{
Kind: types.CompletePromise,
CompletePromise: &types.CompletePromiseResponse{
Status: types.ResponseNotFound,
},
}, nil)
} else {
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)
res(nil, err)
return
}

if p.State == promise.Pending {
if t >= p.Timeout {
s.Add(TimeoutPromise(t, p, CompletePromise(t, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
return
}

res(&types.Response{
Kind: types.CompletePromise,
CompletePromise: &types.CompletePromiseResponse{
Status: types.ResponseForbidden,
Promise: &promise.Promise{
Id: p.Id,
State: promise.Timedout,
Param: p.Param,
Value: promise.Value{
Headers: map[string]string{},
Ikey: nil,
Data: []byte{},
},
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &p.Timeout,
},
},
}, nil)
}))
} else {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Transaction: &types.Transaction{
Commands: []*types.Command{
{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.ReadSubscriptionsCommand{
PromiseIds: []string{req.CompletePromise.Id},
},
},
},
},
},
}

c.Yield(submission, func(completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to read subscriptions", "req", req, "err", err)
res(nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")
records := completion.Store.Results[0].ReadSubscriptions.Records

commands := []*types.Command{
{
Kind: types.StoreUpdatePromise,
UpdatePromise: &types.UpdatePromiseCommand{
Id: req.CompletePromise.Id,
State: req.CompletePromise.State,
Value: req.CompletePromise.Value,
CompletedOn: t,
},
},
{
Kind: types.StoreDeleteTimeout,
DeleteTimeout: &types.DeleteTimeoutCommand{
Id: req.CompletePromise.Id,
},
},
{
Kind: types.StoreDeleteSubscriptions,
DeleteSubscriptions: &types.DeleteSubscriptionsCommand{
PromiseId: req.CompletePromise.Id,
},
},
}

for _, record := range records {
commands = append(commands, &types.Command{
Kind: types.StoreCreateNotification,
CreateNotification: &types.CreateNotificationCommand{
Id: record.Id,
PromiseId: record.PromiseId,
Url: record.Url,
RetryPolicy: record.RetryPolicy,
Time: t,
},
})
}

submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Transaction: &types.Transaction{
Commands: commands,
},
},
}

c.Yield(submission, func(completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to update state", "req", req, "err", err)
res(nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")

result := completion.Store.Results[0].UpdatePromise
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
res(&types.Response{
Kind: types.CompletePromise,
CompletePromise: &types.CompletePromiseResponse{
Status: types.ResponseCreated,
Promise: &promise.Promise{
Id: p.Id,
State: req.CompletePromise.State,
Param: p.Param,
Value: req.CompletePromise.Value,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &t,
},
},
}, nil)
} else {
s.Add(CompletePromise(t, req, res))
}
})
})
}
} else {
var status types.ResponseStatus
if p.Value.Ikey.Match(req.CompletePromise.Value.Ikey) {
status = types.ResponseOK
} else {
status = types.ResponseForbidden
}

res(&types.Response{
Kind: types.CompletePromise,
CompletePromise: &types.CompletePromiseResponse{
Status: status,
Promise: p,
},
}, nil)
}
}
})
})
}
Loading

0 comments on commit 1ad9f62

Please sign in to comment.