Skip to content

Commit

Permalink
Changes for Complete coroutine. (#218)
Browse files Browse the repository at this point in the history
* Changes for Complete coroutine.

* Reverse Enum ordering, remove Invalid Enum and fix test errors.

* Consolidated t_api Kind.

* Add return statement.
  • Loading branch information
favalos committed Jan 30, 2024
1 parent 6cb07b4 commit 268c588
Show file tree
Hide file tree
Showing 19 changed files with 183 additions and 755 deletions.
8 changes: 2 additions & 6 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule)
Expand All @@ -133,9 +131,7 @@ func RunDSTCmd() *cobra.Command {
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CancelPromise,
t_api.ResolvePromise,
t_api.RejectPromise,
t_api.CompletePromise,

// SCHEDULE
t_api.ReadSchedule,
Expand Down
4 changes: 1 addition & 3 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule)
Expand Down
8 changes: 2 additions & 6 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,8 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) {
status = int(res.SearchPromises.Status)
case t_api.CreatePromise:
status = int(res.CreatePromise.Status)
case t_api.CancelPromise:
status = int(res.CancelPromise.Status)
case t_api.ResolvePromise:
status = int(res.ResolvePromise.Status)
case t_api.RejectPromise:
status = int(res.RejectPromise.Status)
case t_api.CompletePromise:
status = int(res.CompletePromise.Status)
case t_api.ReadSchedule:
status = int(res.ReadSchedule.Status)
case t_api.SearchSchedules:
Expand Down
185 changes: 0 additions & 185 deletions internal/app/coroutines/cancelPromise.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/resonatehq/resonate/pkg/promise"
)

func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
func CompletePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
if req.RejectPromise.Value.Headers == nil {
req.RejectPromise.Value.Headers = map[string]string{}
if req.CompletePromise.Value.Headers == nil {
req.CompletePromise.Value.Headers = map[string]string{}
}
if req.RejectPromise.Value.Data == nil {
req.RejectPromise.Value.Data = []byte{}
if req.CompletePromise.Value.Data == nil {
req.CompletePromise.Value.Data = []byte{}
}

completion, err := c.Yield(&t_aio.Submission{
Expand All @@ -28,7 +28,7 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
{
Kind: t_aio.ReadPromise,
ReadPromise: &t_aio.ReadPromiseCommand{
Id: req.RejectPromise.Id,
Id: req.CompletePromise.Id,
},
},
},
Expand All @@ -49,8 +49,8 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_

if result.RowsReturned == 0 {
res(&t_api.Response{
Kind: t_api.RejectPromise,
RejectPromise: &t_api.CompletePromiseResponse{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: t_api.StatusPromiseNotFound,
},
}, nil)
Expand All @@ -64,16 +64,16 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_

if p.State == promise.Pending {
if c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(metadata, p, RejectPromise(metadata, req, res), func(err error) {
c.Scheduler.Add(TimeoutPromise(metadata, p, CompletePromise(metadata, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to timeout promise", err))
return
}

res(&t_api.Response{
Kind: t_api.RejectPromise,
RejectPromise: &t_api.CompletePromiseResponse{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: t_api.StatusPromiseAlreadyTimedOut,
Promise: &promise.Promise{
Id: p.Id,
Expand Down Expand Up @@ -103,24 +103,24 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
{
Kind: t_aio.UpdatePromise,
UpdatePromise: &t_aio.UpdatePromiseCommand{
Id: req.RejectPromise.Id,
State: promise.Rejected,
Value: req.RejectPromise.Value,
IdempotencyKey: req.RejectPromise.IdempotencyKey,
Id: req.CompletePromise.Id,
State: req.CompletePromise.State,
Value: req.CompletePromise.Value,
IdempotencyKey: req.CompletePromise.IdempotencyKey,
CompletedOn: completedOn,
},
},
{
Kind: t_aio.CreateNotifications,
CreateNotifications: &t_aio.CreateNotificationsCommand{
PromiseId: req.RejectPromise.Id,
PromiseId: req.CompletePromise.Id,
Time: completedOn,
},
},
{
Kind: t_aio.DeleteSubscriptions,
DeleteSubscriptions: &t_aio.DeleteSubscriptionsCommand{
PromiseId: req.RejectPromise.Id,
PromiseId: req.CompletePromise.Id,
},
},
},
Expand All @@ -141,38 +141,38 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_

if result.RowsAffected == 1 {
res(&t_api.Response{
Kind: t_api.RejectPromise,
RejectPromise: &t_api.CompletePromiseResponse{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: t_api.StatusCreated,
Promise: &promise.Promise{
Id: p.Id,
State: promise.Rejected,
State: req.CompletePromise.State,
Param: p.Param,
Value: req.RejectPromise.Value,
Value: req.CompletePromise.Value,
Timeout: p.Timeout,
IdempotencyKeyForCreate: p.IdempotencyKeyForCreate,
IdempotencyKeyForComplete: req.RejectPromise.IdempotencyKey,
IdempotencyKeyForComplete: req.CompletePromise.IdempotencyKey,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &completedOn,
},
},
}, nil)
} else {
c.Scheduler.Add(RejectPromise(metadata, req, res))
c.Scheduler.Add(CompletePromise(metadata, req, res))
}
}
} else {
status := t_api.ForbiddenStatus(p.State)
strict := req.RejectPromise.Strict && p.State != promise.Rejected
strict := req.CompletePromise.Strict && p.State != req.CompletePromise.State

if !strict && p.IdempotencyKeyForComplete.Match(req.RejectPromise.IdempotencyKey) {
if !strict && p.IdempotencyKeyForComplete.Match(req.CompletePromise.IdempotencyKey) {
status = t_api.StatusOK
}

res(&t_api.Response{
Kind: t_api.RejectPromise,
RejectPromise: &t_api.CompletePromiseResponse{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: status,
Promise: p,
},
Expand Down
Loading

0 comments on commit 268c588

Please sign in to comment.