add Cancel API to cancel a running job #141
Conversation
client.go
Outdated
// In the event the running job finishes executing _before_ the cancellation
// signal is received, the job will be completed or retried as usual without
// regard to the cancellation. TODO: fix this w/ smarter completion query that
// uses metadata value?
See the question in the PR description.
IMO: if the job was about to be completed, just let that happen without the cancellation superseding it. However, if a job that was cancelled were to be set as retryable with the possibility of running again, that definitely seems wrong and should be fixed.
Yes, I agree. That would require changes only to the queries which would mark the job as errored, because in that case you'd want to realize that the cancellation had been attempted and you'd want that to take precedence.
I started working on this, but quickly realized it's a bit more nuanced than I expected. This query is the one making the update from the completer. I wanted to confirm the desired outcome for each of the potential states it could transition into:
- `completed`: this one's easy, just let it complete / no changes
- `cancelled`: also easy, let it cancel / no changes
- `discarded`: since the job is going to finalize and not retry again, I think this one is also fine to not change?
- `retryable`: we don't want additional retries to proceed, so we should (a) set a `finalized_at` and (b) override the `state` to `cancelled`. But we won't have a `@finalized_at` input here because the caller hadn't intended to finalize it, so I guess I'd just use `now()` in the query?
- `scheduled`: I guess similar to above, you would also want to override this one to set a `finalized_at` and a `state` of `cancelled`.
I implemented the above behavior just now, please take a look. I haven't yet added tests because I want to make sure this all sounds reasonable to you first (the tests may be tricky).
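To make those rules concrete, here's a minimal Go sketch of the decision table above. The real change lives in the completer's SQL update; the function and state names below are purely illustrative and assume the job has already been flagged for cancellation:

```go
package sketch

import "time"

// jobState mirrors River's job states for the purposes of this illustration only.
type jobState string

const (
	jobStateCancelled jobState = "cancelled"
	jobStateCompleted jobState = "completed"
	jobStateDiscarded jobState = "discarded"
	jobStateRetryable jobState = "retryable"
	jobStateScheduled jobState = "scheduled"
)

// resolveStateAfterCancelAttempt applies the rules above when a cancellation
// has been attempted on a job: finalizing transitions (completed, cancelled,
// discarded) go through untouched, while transitions that would let the job
// run again (retryable, scheduled) are overridden to cancelled. Because the
// caller never intended to finalize the job, finalized_at falls back to "now".
func resolveStateAfterCancelAttempt(intended jobState, finalizedAt *time.Time, now time.Time) (jobState, *time.Time) {
	switch intended {
	case jobStateRetryable, jobStateScheduled:
		return jobStateCancelled, &now
	default:
		return intended, finalizedAt
	}
}
```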
client_test.go
Outdated
t.Run("CancelNonExistentJob", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
client, _ := setup(t) | ||
startClient(ctx, t, client) | ||
|
||
// Cancel an unknown job ID: | ||
cancelled, err := client.Cancel(ctx, 0) | ||
// TODO(bgentry): should we try to make this return an error even though the | ||
// query was successfully a no-op? or is it fine that it returns false, nil? |
I initially wrote this test expecting it to error when the row didn't exist, but of course that didn't happen. We could in theory rework the implementation to make it error with an `ErrNoRows` type of thing, but I'm not sure that's desirable or necessary. Thoughts?

I think I'm inclined to just keep it a quiet no-op and not rely on `Cancel` to tell you whether or not a job exists (we should expose a "get" method for that).
Hmm, I definitely would've leaned toward returning an error on a job that doesn't exist at all. The trouble is that it's quite easy in Go to pass the wrong value since all types are only "moderately strong".
e.g. You might be holding an `int64` that you think is a job ID but is actually the primary ID of a different entity in your DB, pass that in expecting a cancellation, and never be the wiser that the wrong thing is happening, because you expect cancellation to be somewhat of an edge case anyway that's often a no-op.
Yeah, I think that's doable, although we don't have any such "not found" error defined yet. I'm not actually sure where else it would make sense; perhaps a soon-to-be-added `JobGet` API, because you similarly wouldn't want to return `nil, nil` there?
Added a top level `ErrNotFound` and updated `Cancel`/`CancelTx` to rescue `riverdriver.ErrNoRows` and instead return the new error.
Awesome. Looking forward to seeing this come in!
A couple high level thoughts:
- I know we discussed this somewhat on a call the other week, but given that cancellation is quite a core feature, I think I would've leaned towards bringing it right into the `river_job` schema rather than deferring it to the `metadata` K/V blob. Hard schema is generally preferable IMO because not only is it safe against potential name conflicts in `metadata`, but it's also just easier to query and update, etc.
- In my experience, in `metadata` or out, it's usually better to prefer timestamps like `cancelled_at` rather than straight-up booleans like `is_cancelled`, because the former provides strictly more information: you can check the cancellation state on either a timestamp or a boolean, but the timestamp adds context by letting you know when it was cancelled, which is helpful in some situations.
internal/maintenance/rescuer.go
Outdated
	NumJobsDiscarded int64
	NumJobsRetried   int64
}

type metadataWithCancelledByQuery struct {
	CancelledByQuery bool `json:"cancelled_by_query"`
What do you mean exactly by "cancelled by query"? Does that just mean that it was cancelled and cancellation happens by a query so that it was cancelled by a query?
It's a little tricky because of the inherent raciness of this operation. The cancel query itself is guaranteed to be completed here, but the job isn't actually cancelled yet at the moment this is set because that's an asynchronous operation which requires the producer to receive the signal, act on it, and the job to then exit. And there are the edge cases where the job completes or gets discarded before the cancel signal is received.
Perhaps `cancel_attempted_at` or `cancel_initiated_at` is a suitable name for this attr and/or a column?
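For illustration, a hedged sketch of what the timestamp variant could look like in place of the boolean field quoted above; the struct and field names are assumptions following that suggestion:

```go
package sketch

import "time"

// A timestamp records when cancellation was attempted rather than just
// whether it was, which is handy when untangling races between the cancel
// signal and normal completion.
type metadataWithCancelAttemptedAt struct {
	CancelAttemptedAt time.Time `json:"cancel_attempted_at"`
}
```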
type jobControlAction string

const (
	jobControlActionCancel jobControlAction = "cancel"
OOC, are there any other actions you were expecting to come in?
I don't have anything else in mind that's specific to a single in-progress job. You could retry a single job immediately (no matter its state), but that doesn't require a notification because it's just a matter of updating the job row.
client.go
Outdated
//
// Returns true if the job was cancelled or cancellation was initiated, and
// false if this was a no-op because the job was already finalized.
func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (bool, error) {
I kinda would've leaned toward having this return a job row representing the updated job for purposes of logging and additional checking. Thoughts?
A side benefit would be that it'd also allow you to disambiguate between the different edge conditions too. i.e. If the job was completed before it was cancelled, you could check its final state to see that.
I started with that initially and probably should have kept on in that direction. The difficulty is it makes the query just a bit trickier because you have to lock the specified row no matter what, and return it whether or not it was cancelled/updated. That's only a minor challenge, though.
It probably does make it more important to offer a first-class way to tell that a cancellation has been attempted (not merely a metadata field per your other comment).
> A side benefit would be that it'd also allow you to disambiguate between the different edge conditions too. i.e. If the job was completed before it was cancelled, you could check its final state to see that.
You wouldn't necessarily get this in all cases, at least not as a result of this API. The notification to cancel the job doesn't get emitted until the transaction here is committed, and there's a lag before the notification is received and processed by the producer running the job. But after-the-fact, you would be able to look at the job's row and see that the cancellation was attempted but that the job was still completed. A timestamp (as you suggested) makes this more useful because you'd see that these events were within milliseconds of each other.
I agree a timestamp would have been better and I should have thought to do that! Even just for debugging some of these race conditions it'd be useful to see that the job was completed/finalized mere ms after the cancellation attempt.
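To make the row-returning shape from this thread concrete, here's a hedged usage sketch. It assumes the method returns the updated job row (and uses the later `JobCancel` name); the `rivertype` state constants follow River's current public API and may differ from what this branch had:

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// cancelAndReport assumes the cancel API returns the updated job row rather
// than a bool, as proposed above, so callers can disambiguate the edge cases.
func cancelAndReport[TTx any](ctx context.Context, client *river.Client[TTx], jobID int64) error {
	job, err := client.JobCancel(ctx, jobID)
	if err != nil {
		return err // e.g. ErrNotFound when no job with that ID exists
	}

	switch job.State {
	case rivertype.JobStateCancelled:
		fmt.Println("job cancelled immediately (it wasn't running)")
	case rivertype.JobStateCompleted:
		fmt.Println("job finished before the cancellation could take effect")
	default:
		// The cancel signal was emitted; a running job is cancelled
		// asynchronously once its producer receives the notification.
		fmt.Println("cancellation initiated for running job")
	}
	return nil
}
```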
client_test.go
Outdated
// TODO(bgentry): do we want to expose a different error type for this
// externally since we don't yet want to expose riverdriver publicly?
I pushed up a slightly updated branch here to return the job row, though I haven't yet finished the rest of the changes. However, this specific question popped up and I wanted to ask you about it. We have a `riverdriver.ErrNoRows` which presumably is meant to abstract away the different `ErrNoRows` types from the underlying drivers, but my assumption is that we may not want to expose this externally.

Which error should we be returning here in the top-level `river` package, and which should we be exposing from `dbadapter`?
Updated to return a new `ErrNotFound` from the `Client` when `Cancel` or `CancelTx` gets back a `riverdriver.ErrNoRows` from the adapter call.
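The translation itself is a small, generic pattern. Here's a self-contained hedged sketch with stand-in types; none of these names are the actual internal ones, they only illustrate rescuing a driver-level "no rows" sentinel and re-raising it as a public error:

```go
package sketch

import (
	"context"
	"errors"
)

// Stand-ins for the real types and sentinels; names are illustrative only.
var (
	errDriverNoRows = errors.New("driver: no rows")  // plays the role of riverdriver.ErrNoRows
	ErrNotFound     = errors.New("river: not found") // the new public sentinel
)

type JobRow struct{ ID int64 }

type adapter interface {
	JobCancel(ctx context.Context, jobID int64) (*JobRow, error)
}

type Client struct{ adapter adapter }

// Cancel rescues the driver-level "no rows" sentinel and re-raises it as the
// public ErrNotFound so callers never depend on the driver package directly.
func (c *Client) Cancel(ctx context.Context, jobID int64) (*JobRow, error) {
	job, err := c.adapter.JobCancel(ctx, jobID)
	if err != nil {
		if errors.Is(err, errDriverNoRows) {
			return nil, ErrNotFound
		}
		return nil, err
	}
	return job, nil
}
```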
@brandur I think I've made all the changes here except for the potential dedicated DB column. Interested in your thoughts on that based on my above points. If we go that route, we'll want to ship a single migration that includes a handful of changes batched together, so we'll need to coordinate on that.
	statusUpdateCh := client.monitor.RegisterUpdates()
	startClient(ctx, t, client)
	waitForClientHealthy(ctx, t, statusUpdateCh)
I got a flaky failure on these tests in this run. There wasn't anything particularly helpful in the failure logs, other than that the job kept running and didn't get cancelled. My guess, however, is that the notifier wasn't up and running before the cancel signal was sent.
I tried reproducing it with thousands of runs locally but couldn't get any failures. Ultimately I added this logic to wait for internal components to come up and be healthy prior to proceeding with the test.
It makes me wonder if there's a more systemic issue here; should we be waiting for some components to come up before returning from `Start()`? Or should our `startClient` test helper always be waiting for the client to become healthy before returning?
I'm re-running CI several times just to get more confidence that this fixed the issue, no more failures yet.
> It makes me wonder if there's a more systemic issue here; should we be waiting for some components to come up before returning from `Start()`? Or should our `startClient` test helper always be waiting for the client to become healthy before returning?
Yeah, good question. I kind of suspect that ... yes, a `Start` that returned only when things were really healthy and ready to go would overall be better. Shorter term, I like the idea of a test helper that waits for everything to be fully healthy, like you've done here, but we may want to make it even more widespread with an easy way to get at it from `riverinternaltest`.
We could easily update `startClient` to wait for the client to become healthy. That should reduce flakiness and shouldn't impact test timing much since everything starts quickly. However, since no client health is exposed externally yet, this is purely an internal solution for testing.

There's the goal of making sure the client is healthy and fully booted before proceeding; this may be desirable sometimes, but other times users may wish to allow the client to finish booting asynchronously while their app does other initialization work. Then there's the separate goal of monitoring the client's health as it operates, since any of these internal components could in theory have issues or connectivity problems during operation.

I'm sure there are other libraries which try to tackle these problems, but one particular instance I have recent experience with is LaunchDarkly's SDK. It's a bit poorly documented and feels overly abstracted / Java-like, but the concepts there are reasonably well-solved, and that's what I was intending to do when I initially built the client monitor stuff internally. I don't claim that the design is perfect or finished, but at least the general problem is somewhat solved by it. And maybe some of it can/should be baked in as part of `Start()` so users don't need to think about it as much.
We should probably take a look at this both for our own tests but also for users' sake in follow up PRs.
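As a hedged sketch of the kind of helper being discussed here, with a stand-in status type since the real monitor's types are internal and may differ:

```go
package sketch

import (
	"context"
	"testing"
	"time"
)

// clientStatus is a stand-in for whatever status type the internal monitor
// emits; only the Healthy field matters for this sketch.
type clientStatus struct{ Healthy bool }

// waitForHealthy blocks until a healthy status arrives on statusCh, failing
// the test on timeout or context cancellation. This is the shape of helper
// that could be folded into startClient (or exposed from riverinternaltest)
// so every test begins with a fully booted client.
func waitForHealthy(ctx context.Context, t *testing.T, statusCh <-chan clientStatus) {
	t.Helper()

	timeout := time.After(5 * time.Second)
	for {
		select {
		case status := <-statusCh:
			if status.Healthy {
				return
			}
		case <-timeout:
			t.Fatal("timed out waiting for client to become healthy")
		case <-ctx.Done():
			t.Fatalf("context done while waiting for healthy client: %v", ctx.Err())
		}
	}
}
```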
> Yeah, I think my hesitation here is I'm a bit reluctant to add columns generally because of the potential impact on table bloat and performance. One additional timestamp is probably fine, although you could argue that it's being used to cover some fairly rare edge cases and that alone may not be worth the potential impact of adding the column. Plus, you'd need a migration to do this, and we have to decide if that's worth doing for this particular purpose.
>
> That said, if we go this route, I have a few other unrelated things in other branches that we'd want to bundle together with it (non-null on `args`, adding a `pending` state).
K. The one counter is that in my experience at least, schema is basically always better because it's more visible, more discoverable, provides tab complete, etc., so if we're not going that direction, we should make sure it's for the right reasons, rather than just because it's easier.
I don't think table bloat/perf is a concern if I understand the Postgres page layout. Because this would be a nullable field, a value wouldn't even be present for most rows, and it'd just require one extra bit in the NULL bitmask. Read this and see if I'm out to lunch on that:
https://www.postgresql.org/docs/current/storage-page-layout.html#STORAGE-TUPLE-LAYOUT
It seems okay to start with less schema and then batch up schema changes together into one migration. We'd have to be careful to avoid any migration queries that are too expensive (i.e. we might not even bother converting older cancelled job rows to the new format if the table is too large), but it should be doable.
This adds a new `Client` method `Cancel` which can cancel a running job. It has these behaviors:

* Jobs which are still scheduled, retryable, or available are immediately cancelled and will not be run again.
* Jobs that are already finalized (completed, cancelled, or discarded) are not touched.
* Jobs that are actively running are marked for cancellation, and cancellation is attempted using a `LISTEN`/`NOTIFY` pubsub message which is picked up by the client and producer running that job.

Because Go offers no way to interrupt a running goroutine, actively running jobs cannot be immediately halted and cancelled, so we can only cancel the job's context. Once the cancellation signal is received by the client running the job, any error returned by that job will result in it being cancelled permanently and not retried. However if the job returns no error, it will be completed as usual.

In the event the running job finishes executing _before_ the cancellation signal is received but _after_ this update was made, the behavior depends on which state the job is being transitioned into:

- If the job completed successfully, was cancelled from within, or was discarded due to exceeding its max attempts, the job will be updated as usual.
- If the job was snoozed to run again later or encountered a retryable error, the job will be marked as cancelled and will not be attempted again.

When no job is found with the specified ID, a new `ErrNotFound` is returned.

Also expose `JobCancelTx` variant.
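To illustrate the cooperative cancellation described above, here's a hedged worker sketch using River's public worker API; the job args and worker names are made up for the example. Because only the job's context is cancelled, the worker has to watch `ctx.Done()` and return the context error for the job to end up cancelled rather than completed:

```go
package sketch

import (
	"context"
	"time"

	"github.com/riverqueue/river"
)

// SleepArgs is a made-up job args type for this example.
type SleepArgs struct {
	Duration time.Duration `json:"duration"`
}

func (SleepArgs) Kind() string { return "sleep" }

// SleepWorker cooperates with remote cancellation by selecting on ctx.Done().
type SleepWorker struct {
	river.WorkerDefaults[SleepArgs]
}

func (w *SleepWorker) Work(ctx context.Context, job *river.Job[SleepArgs]) error {
	select {
	case <-time.After(job.Args.Duration):
		// Finished before any cancel signal arrived: completed as usual.
		return nil
	case <-ctx.Done():
		// Context cancelled: returning an error after the cancel signal has
		// been received marks the job cancelled rather than retried.
		return ctx.Err()
	}
}
```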
@brandur I just added an example here demonstrating remote job cancellation. Notably, it suffers from the exact problem we were discussing around not being able to ensure that the client is ready and needing to sleep to get around that 😕 Feel free to review, we can always improve the example after this PR gets merged (which I'll do as soon as CI goes green).
This was added in #141 / 0aaeee8. Meanwhile another job-related query API was added called `JobList`, but `Cancel` and `CancelTx` were not given the `Job*` prefix. This renames `Cancel` to `JobCancel` and `CancelTx` to `JobCancelTx` for consistency. Other upcoming job query APIs will also use the prefix.
This adds a new `Client` method `Cancel` which can cancel a running job. It has these behaviors:

* Jobs which are still scheduled, retryable, or available are immediately cancelled and will not be run again.
* Jobs that are already finalized (completed, cancelled, or discarded) are not touched.
* Jobs that are actively running are marked for cancellation, and cancellation is attempted using a `LISTEN`/`NOTIFY` pubsub message which is picked up by the client and producer running that job.

Because Go offers no way to interrupt a running goroutine, actively running jobs cannot be immediately halted and cancelled, so we can only cancel the job's context. Once the cancellation signal is received by the client running the job, any error returned by that job will result in it being cancelled permanently and not retried. However if the job returns no error, it will be completed as usual.

In the event the running job finishes executing before the cancellation signal is received, the job will be completed or retried as usual without regard to the cancellation. @brandur Realized as I was writing this that it was solvable, do you think we should try to fix this with a smarter completion query that uses the metadata value we set as part of the `Cancel` query?

Additional documentation to follow once we're satisfied with the shape and behavior of this API.