Skip to content

Commit

Permalink
Read promise and subscriptions in one fell swoop
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr committed Sep 26, 2023
1 parent fa54979 commit f44c1d1
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 457 deletions.
160 changes: 71 additions & 89 deletions internal/app/coroutines/cancelPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,30 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response,
Id: req.CancelPromise.Id,
},
},
{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.ReadSubscriptionsCommand{
PromiseIds: []string{req.CancelPromise.Id},
},
},
},
},
},
}

c.Yield(submission, func(t int64, completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to read promise", "req", req, "err", err)
slog.Error("failed to read promise or read subscriptions", "req", req, "err", err)
res(t, nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 2, "completion must contain two results")

result := completion.Store.Results[0].ReadPromise
records := completion.Store.Results[1].ReadSubscriptions.Records

util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows")

if result.RowsReturned == 0 {
Expand All @@ -64,7 +73,7 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response,

if p.State == promise.Pending {
if t >= p.Timeout {
s.Add(TimeoutPromise(t, p, CancelPromise(t, req, res), func(t int64, err error) {
s.Add(TimeoutPromise(t, p, records, CancelPromise(t, req, res), func(t int64, err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(t, nil, err)
Expand Down Expand Up @@ -93,111 +102,84 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response,
}, nil)
}))
} else {
commands := []*types.Command{
{
Kind: types.StoreUpdatePromise,
UpdatePromise: &types.UpdatePromiseCommand{
Id: req.CancelPromise.Id,
State: promise.Canceled,
Value: req.CancelPromise.Value,
CompletedOn: t,
},
},
{
Kind: types.StoreDeleteTimeout,
DeleteTimeout: &types.DeleteTimeoutCommand{
Id: req.CancelPromise.Id,
},
},
{
Kind: types.StoreDeleteSubscriptions,
DeleteSubscriptions: &types.DeleteSubscriptionsCommand{
PromiseId: req.CancelPromise.Id,
},
},
}

for _, record := range records {
commands = append(commands, &types.Command{
Kind: types.StoreCreateNotification,
CreateNotification: &types.CreateNotificationCommand{
Id: record.Id,
PromiseId: record.PromiseId,
Url: record.Url,
RetryPolicy: record.RetryPolicy,
Time: t,
},
})
}

submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Transaction: &types.Transaction{
Commands: []*types.Command{
{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.ReadSubscriptionsCommand{
PromiseIds: []string{req.CancelPromise.Id},
},
},
},
Commands: commands,
},
},
}

c.Yield(submission, func(t int64, completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to read subscriptions", "req", req, "err", err)
slog.Error("failed to update state", "req", req, "err", err)
res(t, nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")
records := completion.Store.Results[0].ReadSubscriptions.Records

commands := []*types.Command{
{
Kind: types.StoreUpdatePromise,
UpdatePromise: &types.UpdatePromiseCommand{
Id: req.CancelPromise.Id,
State: promise.Canceled,
Value: req.CancelPromise.Value,
CompletedOn: t,
},
},
{
Kind: types.StoreDeleteTimeout,
DeleteTimeout: &types.DeleteTimeoutCommand{
Id: req.CancelPromise.Id,
},
},
{
Kind: types.StoreDeleteSubscriptions,
DeleteSubscriptions: &types.DeleteSubscriptionsCommand{
PromiseId: req.CancelPromise.Id,
},
},
}

for _, record := range records {
commands = append(commands, &types.Command{
Kind: types.StoreCreateNotification,
CreateNotification: &types.CreateNotificationCommand{
Id: record.Id,
PromiseId: record.PromiseId,
Url: record.Url,
RetryPolicy: record.RetryPolicy,
Time: t,
},
})
}

submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Transaction: &types.Transaction{
Commands: commands,
result := completion.Store.Results[0].UpdatePromise
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
res(t, &types.Response{
Kind: types.CancelPromise,
CancelPromise: &types.CancelPromiseResponse{
Status: types.ResponseCreated,
Promise: &promise.Promise{
Id: p.Id,
State: promise.Canceled,
Param: p.Param,
Value: req.CancelPromise.Value,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &t,
},
},
},
}, nil)
} else {
s.Add(CancelPromise(t, req, res))
}

c.Yield(submission, func(t int64, completion *types.Completion, err error) {
if err != nil {
slog.Error("failed to update state", "req", req, "err", err)
res(t, nil, err)
return
}

util.Assert(completion.Store != nil, "completion must not be nil")

result := completion.Store.Results[0].UpdatePromise
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
res(t, &types.Response{
Kind: types.CancelPromise,
CancelPromise: &types.CancelPromiseResponse{
Status: types.ResponseCreated,
Promise: &promise.Promise{
Id: p.Id,
State: promise.Canceled,
Param: p.Param,
Value: req.CancelPromise.Value,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &t,
},
},
}, nil)
} else {
s.Add(CancelPromise(t, req, res))
}
})
})
}
} else {
Expand Down
Loading

0 comments on commit f44c1d1

Please sign in to comment.