-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC server task management #1380
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave this a spin locally and it looks like it works super well! I suppose as a part of this, it would be a good idea to add query cancellation to BigQuery. We can tackle that in a separate issue...
@beckjake how do you feel about this PR? It all looks reasonable to me, but I'm curious if there are any parts that you are particularly pleased with / unhappy with
try: | ||
msgtype, value = self.queue.get(timeout=get_timeout) | ||
except QueueEmpty: | ||
raise dbt.exceptions.RPCTimeoutException(self.timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the right exception to raise here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. if the queue has not received any messages in the calculated timeout value, we have exceeded the set timeout, and should raise a timeout exception. This is actually old code, it just used to live in tasks/rpc_server.py
I'm pretty happy with how it came out. I think the builtin stuff will probably end up changing, and I didn't like all the stuff I had to do on the ResponseManager, but I think it's all fine for now. I would like to refactor the RemoteCompileTask a bit to separate it from the CompileTask, but that's a problem for another day / another PR. |
Oh right - I'm also not super satisfied with the object we return on a |
) | ||
|
||
for _ in range(20): | ||
time.sleep(0.2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a deadlock in tests, maybe sometimes. I think adding this sleep to the test loop fixed it, which does not make me feel good. But I tested it by running the tests over and over in a loop and they don't fail, which does make me feel a little better at least.
9683d47
to
0682cf9
Compare
…e-raise it instead of wrapping it
also fix unit tests
proper cancel support Refactor rpc server logic a bit fix an issue in query cancelation where we would cancel ourselves fix exception handling misbehavior
when adding more threads stops helping, add more sleeps
result = { | ||
'columns': list(TaskRow._fields), | ||
'rows': [list(r) for r in table], | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make this a list of objects instead? e.g.
[{
"task_id": "1f7054d4-8dae-4560-9cc1-662127de4184",
"request_id": "5bf9dec2-431b-44e2-9d76-1239928571ad",
"request_source": "10.1.31.102",
"method": "run",
"state": "running",
"start": 1554303525.1364384,
"elapsed": 0.014189481735229492,
"timeout": 900
}]
In the frontend, I have to iterate over the list of tasks and match the request id I sent to the request id in the table. if the indexing of the table changes, my loop will break.... but this way I can just look the task up by key.
if data
has to be a dict, just put this list under another key, up to you what that is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure it does have to be a dict, so I named the key rows
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this works great. the error is dumb ("RPC process killed by signal 2") but whatever. we can improve that later.
The error is kind of dumb, but it also kind of has to be - the process that gets the SIGINT doesn't really know the cause. Or even that it's a SIGINT, really. We could send a SIGUSR1 and implement a handler for that instead, then we could be much more intelligent on the response. That's a decent amount of work as we'd have to add support for it everywhere that handles |
Fixes #1369
Fixes #1370
Implements special 'ps' and 'kill' commands, created a framework that ties requests into API calls and makes them available. This should make async calls easy enough once we decide on how we want them to behave.
To do this I had to mess around quite a bit with how the RPC server executes things, but I think I preserved the good behavior reasonably well. Some edge case issues in error handling might still exist, it's a bit messy in there.
I added the concept of "builtin" methods that are not tasks. I guess we'll see how sustainable that is in the long run, it should be pretty easy to change how that's implemented if we want to. Right now they are just methods on the task manager since they do task management things.
I fixed some nasty query cancellation bugs along the way (databases would kill the connection that was being used to kill connections). I also moved a bunch of things from
tasks/rpc_server
torpc
. I'll expect I'll split that up into entries in a folder in some future PR. Eventually the RPC compile/run tasks will also probably have to move somewhere.The server is now threaded instead of multiprocessing-based, because threading is a LOT easier. I don't think this will have any end-user impact, but who knows.