Skip to content

Commit

Permalink
Add tags and createdOn/completedOn timestamps to promises
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr committed Aug 31, 2023
1 parent 4f1f006 commit d08b071
Show file tree
Hide file tree
Showing 29 changed files with 438 additions and 219 deletions.
6 changes: 3 additions & 3 deletions internal/aio/aio.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) {
select {
case subsystem.sq <- sqe:
slog.Debug("aio:enqueue", "sqe", sqe.Submission)
a.metrics.AioInFlight.WithLabelValues(sqe.Submission.Kind.String()).Inc()
a.metrics.AioInFlight.WithLabelValues(sqe.Kind).Inc()

Check warning on line 112 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L112

Added line #L112 was not covered by tests
default:
sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem))
}
Expand Down Expand Up @@ -138,8 +138,8 @@ func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] {
}

slog.Debug("aio:dequeue", "cqe", cqe.Completion)
a.metrics.AioTotal.WithLabelValues(cqe.Completion.Kind.String(), status).Inc()
a.metrics.AioInFlight.WithLabelValues(cqe.Completion.Kind.String()).Dec()
a.metrics.AioTotal.WithLabelValues(cqe.Kind, status).Inc()
a.metrics.AioInFlight.WithLabelValues(cqe.Kind).Dec()

Check warning on line 142 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L141-L142

Added lines #L141 - L142 were not covered by tests

cqes = append(cqes, cqe)
default:
Expand Down
68 changes: 35 additions & 33 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type API interface {
Enqueue(string, *bus.SQE[types.Request, types.Response])
Enqueue(*bus.SQE[types.Request, types.Response])
Dequeue(int, <-chan time.Time) []*bus.SQE[types.Request, types.Response]
Done() bool
}
Expand Down Expand Up @@ -66,40 +66,42 @@ func (a *api) Errors() <-chan error {
return a.errors
}

func (a *api) Enqueue(kind string, sqe *bus.SQE[types.Request, types.Response]) {
select {
case a.sq <- sqe:
a.metrics.ApiInFlight.WithLabelValues(kind).Inc()

callback := sqe.Callback
sqe.Callback = func(res *types.Response, err error) {
var status types.ResponseStatus
switch res.Kind {
case types.ReadPromise:
status = res.ReadPromise.Status
case types.SearchPromises:
status = res.SearchPromises.Status
case types.CreatePromise:
status = res.CreatePromise.Status
case types.CancelPromise:
status = res.CancelPromise.Status
case types.ResolvePromise:
status = res.ResolvePromise.Status
case types.RejectPromise:
status = res.RejectPromise.Status
case types.ReadSubscriptions:
status = res.ReadSubscriptions.Status
case types.CreateSubscription:
status = res.CreateSubscription.Status
case types.DeleteSubscription:
status = res.DeleteSubscription.Status
}
func (a *api) Enqueue(sqe *bus.SQE[types.Request, types.Response]) {
// replace sqe.Callback with a callback that wraps the original
// function and emits metrics
callback := sqe.Callback
sqe.Callback = func(res *types.Response, err error) {
var status types.ResponseStatus
switch res.Kind {
case types.ReadPromise:
status = res.ReadPromise.Status
case types.SearchPromises:
status = res.SearchPromises.Status
case types.CreatePromise:
status = res.CreatePromise.Status
case types.CancelPromise:
status = res.CancelPromise.Status
case types.ResolvePromise:
status = res.ResolvePromise.Status
case types.RejectPromise:
status = res.RejectPromise.Status
case types.ReadSubscriptions:
status = res.ReadSubscriptions.Status
case types.CreateSubscription:
status = res.CreateSubscription.Status
case types.DeleteSubscription:
status = res.DeleteSubscription.Status
}

a.metrics.ApiTotal.WithLabelValues(kind, strconv.Itoa(int(status))).Inc()
a.metrics.ApiInFlight.WithLabelValues(kind).Dec()
a.metrics.ApiTotal.WithLabelValues(sqe.Kind, strconv.Itoa(int(status))).Inc()
a.metrics.ApiInFlight.WithLabelValues(sqe.Kind).Dec()

callback(res, err)
}
callback(res, err)
}

select {
case a.sq <- sqe:
a.metrics.ApiInFlight.WithLabelValues(sqe.Kind).Inc()
default:
sqe.Callback(nil, fmt.Errorf("api submission queue full"))
}
Expand Down
50 changes: 24 additions & 26 deletions internal/app/coroutines/cancelPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,16 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)
},
}, nil)
} else {
record := result.Records[0]

param, err := record.Param()
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record param", "record", record, "err", err)
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)

Check warning on line 57 in internal/app/coroutines/cancelPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/cancelPromise.go#L57

Added line #L57 was not covered by tests
res(nil, err)
return
}

if record.State == promise.Pending {
if t >= record.Timeout {
s.Add(TimeoutPromise(t, req.CancelPromise.Id, CancelPromise(t, req, res), func(err error) {
if p.State == promise.Pending {
if t >= p.Timeout {
s.Add(TimeoutPromise(t, p, CancelPromise(t, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
Expand All @@ -75,15 +73,18 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)
CancelPromise: &types.CancelPromiseResponse{
Status: types.ResponseForbidden,
Promise: &promise.Promise{
Id: record.Id,
Id: p.Id,
State: promise.Timedout,
Param: param,
Param: p.Param,
Value: promise.Value{
Headers: map[string]string{},
Ikey: nil,
Data: nil,
},
Timeout: record.Timeout,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &p.Timeout,
},
},
}, nil)
Expand Down Expand Up @@ -119,9 +120,10 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)
{
Kind: types.StoreUpdatePromise,
UpdatePromise: &types.UpdatePromiseCommand{
Id: req.CancelPromise.Id,
State: promise.Canceled,
Value: req.CancelPromise.Value,
Id: req.CancelPromise.Id,
State: promise.Canceled,
Value: req.CancelPromise.Value,
CompletedOn: t,
},
},
{
Expand Down Expand Up @@ -171,11 +173,14 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)
CancelPromise: &types.CancelPromiseResponse{
Status: types.ResponseCreated,
Promise: &promise.Promise{
Id: record.Id,
State: promise.Canceled,
Timeout: record.Timeout,
Param: param,
Value: req.CancelPromise.Value,
Id: p.Id,
State: promise.Canceled,
Param: p.Param,
Value: req.CancelPromise.Value,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &t,
},
},
}, nil)
Expand All @@ -186,15 +191,8 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)
})
}
} else {
p, err := record.Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", record, "err", err)
res(nil, err)
return
}

var status types.ResponseStatus
if record.State == promise.Canceled && record.ValueIkey.Match(req.CancelPromise.Value.Ikey) {
if p.State == promise.Canceled && p.Value.Ikey.Match(req.CancelPromise.Value.Ikey) {
status = types.ResponseOK
} else {
status = types.ResponseForbidden
Expand Down
57 changes: 30 additions & 27 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)
if req.CreatePromise.Param.Headers == nil {
req.CreatePromise.Param.Headers = map[string]string{}
}
if req.CreatePromise.Tags == nil {
req.CreatePromise.Tags = map[string]string{}
}

submission := &types.Submission{
Kind: types.Store,
Expand Down Expand Up @@ -50,9 +53,11 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)
{
Kind: types.StoreCreatePromise,
CreatePromise: &types.CreatePromiseCommand{
Id: req.CreatePromise.Id,
Timeout: req.CreatePromise.Timeout,
Param: req.CreatePromise.Param,
Id: req.CreatePromise.Id,
Param: req.CreatePromise.Param,
Timeout: req.CreatePromise.Timeout,
Tags: req.CreatePromise.Tags,
CreatedOn: t,
},
},
{
Expand All @@ -79,6 +84,7 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)
PromiseId: req.CreatePromise.Id,
Url: s.Url,
RetryPolicy: s.RetryPolicy,
CreatedOn: t,
},
})
}
Expand Down Expand Up @@ -110,10 +116,12 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)
CreatePromise: &types.CreatePromiseResponse{
Status: types.ResponseCreated,
Promise: &promise.Promise{
Id: req.CreatePromise.Id,
State: promise.Pending,
Timeout: req.CreatePromise.Timeout,
Param: req.CreatePromise.Param,
Id: req.CreatePromise.Id,
State: promise.Pending,
Param: req.CreatePromise.Param,
Timeout: req.CreatePromise.Timeout,
Tags: req.CreatePromise.Tags,
CreatedOn: &t,
},
},
}, nil)
Expand All @@ -122,55 +130,50 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)
}
})
} else {
record := result.Records[0]
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)
res(nil, err)
return
}

Check warning on line 138 in internal/app/coroutines/createPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createPromise.go#L135-L138

Added lines #L135 - L138 were not covered by tests

var status types.ResponseStatus
if record.ParamIkey.Match(req.CreatePromise.Param.Ikey) {
if p.Param.Ikey.Match(req.CreatePromise.Param.Ikey) {
status = types.ResponseOK
} else {
status = types.ResponseForbidden
}

if record.State == promise.Pending && t >= record.Timeout {
s.Add(TimeoutPromise(t, req.CreatePromise.Id, CreatePromise(t, req, res), func(err error) {
if p.State == promise.Pending && t >= p.Timeout {
s.Add(TimeoutPromise(t, p, CreatePromise(t, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
return
}

param, err := record.Param()
if err != nil {
res(nil, err)
return
}

res(&types.Response{
Kind: types.CreatePromise,
CreatePromise: &types.CreatePromiseResponse{
Status: status,
Promise: &promise.Promise{
Id: record.Id,
Id: p.Id,
State: promise.Timedout,
Param: param,
Param: p.Param,
Value: promise.Value{
Headers: map[string]string{},
Ikey: nil,
Data: nil,
},
Timeout: record.Timeout,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &p.Timeout,
},
},
}, nil)
}))
} else {
p, err := record.Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", record, "err", err)
res(nil, err)
return
}

res(&types.Response{
Kind: types.CreatePromise,
CreatePromise: &types.CreatePromiseResponse{
Expand Down
1 change: 1 addition & 0 deletions internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func CreateSubscription(t int64, req *types.Request, res func(*types.Response, e
PromiseId: req.CreateSubscription.PromiseId,
Url: req.CreateSubscription.Url,
RetryPolicy: req.CreateSubscription.RetryPolicy,
CreatedOn: t,
},
},
},
Expand Down
34 changes: 14 additions & 20 deletions internal/app/coroutines/readPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,49 +48,43 @@ func ReadPromise(t int64, req *types.Request, res func(*types.Response, error))
},
}, nil)
} else {
record := result.Records[0]
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)
res(nil, err)
return
}

Check warning on line 57 in internal/app/coroutines/readPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/readPromise.go#L53-L57

Added lines #L53 - L57 were not covered by tests
if record.State == promise.Pending && t >= record.Timeout {
s.Add(TimeoutPromise(t, req.ReadPromise.Id, ReadPromise(t, req, res), func(err error) {
if p.State == promise.Pending && t >= p.Timeout {
s.Add(TimeoutPromise(t, p, ReadPromise(t, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
return
}

param, err := record.Param()
if err != nil {
slog.Error("failed to parse promise record param", "record", record, "err", err)
res(nil, err)
return
}

res(&types.Response{
Kind: types.ReadPromise,
ReadPromise: &types.ReadPromiseResponse{
Status: types.ResponseOK,
Promise: &promise.Promise{
Id: record.Id,
Id: p.Id,
State: promise.Timedout,
Param: param,
Param: p.Param,
Value: promise.Value{
Headers: map[string]string{},
Ikey: nil,
Data: nil,
},
Timeout: record.Timeout,
Timeout: p.Timeout,
Tags: p.Tags,
CreatedOn: p.CreatedOn,
CompletedOn: &p.Timeout,
},
},
}, nil)
}))
} else {
p, err := record.Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", record, "err", err)
res(nil, err)
return
}

res(&types.Response{
Kind: types.ReadPromise,
ReadPromise: &types.ReadPromiseResponse{
Expand Down
Loading

0 comments on commit d08b071

Please sign in to comment.