Async callbacks for processing GetColumnProfile RPC requests #473
Conversation
Nice first stab!

I think we could make it simpler and safer by using the new idle task mechanism `r_task::spawn_idle()`:

- These tasks run on the R thread when R is idle instead of at interrupt time, which is the direction we'd like to take. It's also consistent with the data explorer being an idle-only feature (you can't use it while the kernel is busy with a Shiny app or with computing an expensive model).
- These tasks take Rust async functions, which makes it easy to yield for a tick, fail on a timeout, etc.
- They could own the `callback_id` and send the result back to the data explorer thread in the form of a response event (see the sketch after this list). This way we won't need to spawn any threads or create any temporary channels; we'd just need one new permanent channel to transmit events to the data explorer thread, the receiving side of which would be handled by the data explorer event loop.
- The tasks may get delayed by an incoming execute request. This means there's also a possibility of sending outdated summaries if the data was updated in the meantime, but I guess the frontend should handle that case. There should be no possibility of idle tasks piling up while R is busy, since the shell socket is busy as well and can't take any new data explorer requests.
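For illustration, a rough sketch of that shape; `DataExplorerEvent`, `ColumnProfileResult`, and the exact `spawn_idle` signature are assumptions here, not ark's actual definitions:

```rust
use std::sync::mpsc::Sender;

struct ColumnProfileResult; // stand-in for the real profile type

enum DataExplorerEvent {
    // Response event handled by the data explorer event loop.
    ReturnColumnProfiles {
        callback_id: String,
        profiles: Vec<ColumnProfileResult>,
    },
}

fn handle_get_column_profiles(callback_id: String, events_tx: Sender<DataExplorerEvent>) {
    // Assumes `spawn_idle` takes a closure returning a `Future<Output = ()>`.
    r_task::spawn_idle(move || async move {
        // Runs on the R thread when R is idle; the async body is free to
        // yield for a tick or fail on a timeout.
        let profiles = vec![ColumnProfileResult];
        let _ = events_tx.send(DataExplorerEvent::ReturnColumnProfiles {
            callback_id,
            profiles,
        });
    });
}
```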
Updated to use `r_task::spawn_idle()`. The most challenging thing was that the task passed to `spawn_idle()` must be `'static`.

Moved all profile handling functions to a separate file, as they can no longer depend on the data explorer instance.

I also had to change the tests: since there's no spawned thread in the tests, the tasks now run synchronously and thus the messages arrive in a different order.
Good progress! Some more changes needed as this is tricky.
```diff
@@ -253,7 +253,7 @@ where
     Fut: Future<Output = ()> + 'static,
 {
     // Idle tasks are always run from the read-console loop
-    if !only_idle && unsafe { R_TASK_BYPASS } {
+    if unsafe { R_TASK_BYPASS } {
```
hmmmmm
:P
I forgot I had to change that to make tests pass. In theory this only affects tests, because of `R_TASK_BYPASS`, and all tests seem to pass normally.

If I don't add this, the tests deadlock, as there's no R main thread to actually watch the task queue and execute the tasks.
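For context, a rough sketch of what that bypass path might look like; the function shape, the `cfg!(test)` check, and the `futures::executor::block_on` call are assumptions, not ark's actual implementation:

```rust
use std::future::Future;

fn spawn_idle_sketch<F, Fut>(fun: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()> + 'static,
{
    // Stand-in for the real `unsafe { R_TASK_BYPASS }` check. In tests there
    // is no R main thread draining the task queue, so queuing the task would
    // deadlock; instead, run the future to completion inline.
    if cfg!(test) {
        futures::executor::block_on(fun());
        return;
    }
    // ... otherwise, queue the task for the read-console loop ...
}
```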
```rust
let json_event = serde_json::to_value(event)?;
comm.outgoing_tx.send(CommMsg::Data(json_event))?;
```
I'm still confused that we don't reply in case of errors; since there is a callback ID, I would expect the frontend to keep track of it. But I guess this is cleaned up when the frontend no longer needs the answer because the data has changed? (I haven't looked into the frontend code yet.)
Yeah, currently the RPC contract doesn't implement any error handling, so if it fails and we never respond, all the front-end will see is a timeout.

I added some handling here: basically, when it fails we return an empty result set:
ark/crates/ark/src/data_explorer/column_profile.rs, lines 57 to 66 in 56726b7:

```rust
.await
.unwrap_or_else(|e| {
    // In case something goes wrong while computing the profiles, we send
    // an empty response. Ideally we would have a way to communicate that
    // an error happened, but it's not implemented yet.
    log::error!("Error while producing profiles: {e}");
    std::iter::repeat(empty_column_profile_result())
        .take(n_profiles)
        .collect()
});
```
We already return empty profiles if we fail to compute a profile, so it seems reasonable to also do that if everything fails.

We can add error handling too; we'd need to make `ReturnColumnsProfile` an enum so we can return either the actual profiles or an error message. But I'd rather add this in another PR, because it will also require changes in the Python backend.
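For concreteness, a hypothetical sketch of that enum (deferred to a follow-up PR); `ColumnProfileResult` stands in for the real profile type:

```rust
struct ColumnProfileResult; // stand-in for the real profile type

// The reply would carry either the computed profiles or an error message.
enum ReturnColumnsProfile {
    Profiles(Vec<ColumnProfileResult>),
    Error(String),
}
```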
I created posit-dev/positron#4559 to track this issue.
```rust
// This allows background threads to easily access the
// instance-related data without having to rely on the lifetime
// of the data explorer execution thread.
// Since this is a DashMap, it's safe to access its underlying data from
// multiple threads.
```
Instead of a dashmap stored in a global, could we use an `Arc<Mutex<HashMap>>`? That should solve the lifetime concerns?
Or passing around `Arc<Mutex<Option<RThreadSafeObject<RObject>>>>`, as we discussed on Slack, so that an instance can set it to `None` when it's dropped, and getting a `None` would be a cancellation point for the task.
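A minimal sketch of that pattern, with a plain `String` standing in for ark's `RThreadSafeObject<RObject>`:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for Arc<Mutex<Option<RThreadSafeObject<RObject>>>>.
type SharedTable = Arc<Mutex<Option<String>>>;

// The idle task bails out when the slot has been cleared.
fn profile_task(table: SharedTable) -> Result<(), &'static str> {
    let guard = table.lock().unwrap();
    let data = guard
        .as_ref()
        .ok_or("data explorer instance dropped; cancelling task")?;
    // ... compute profiles from `data` ...
    let _ = data;
    Ok(())
}

// The owning instance clears the slot when it shuts down, which cancels
// any pending tasks at their next access.
fn on_instance_drop(table: &SharedTable) {
    *table.lock().unwrap() = None;
}
```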
This worked nicely! I got rid of the global dashmap, and the lifetime is now managed with `Arc`. Failing to `get()` from the table means the table has been deleted, so this can be used to cancel tasks when the data explorer is closed, although we don't really use it for now.

We would probably also want to implement a cancellation mechanism based on the `callback_id`, so the front-end can cancel tasks it no longer needs. But we can leave this for another PR.
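A hypothetical sketch of what `callback_id`-based cancellation could look like (not part of this PR): a shared set of cancelled ids that the idle task checks before sending its reply.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Shared between the request handler (which cancels) and idle tasks
// (which check before replying).
#[derive(Clone, Default)]
struct CancelledCallbacks(Arc<Mutex<HashSet<String>>>);

impl CancelledCallbacks {
    fn cancel(&self, callback_id: &str) {
        self.0.lock().unwrap().insert(callback_id.to_string());
    }

    fn is_cancelled(&self, callback_id: &str) -> bool {
        self.0.lock().unwrap().contains(callback_id)
    }
}
```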
Oh, I meant cancelling the idle task, which does already happen via the propagated `get()` error, e.g. your comment:

```rust
// In case something goes wrong while computing the profiles, we send
// an empty response. Ideally we would have a way to communicate that
// an error happened, but it's not implemented yet.
```
Looks good!
Co-authored-by: Lionel Henry <[email protected]>
This PR pairs with posit-dev/positron#4326 and should be merged after #458.

This makes `GetColumnProfile` requests asynchronous, giving the data explorer execution thread opportunities to compute other RPC methods, such as `GetDataValues`, which should make the data explorer UX a little smoother.