Skip to content

Commit

Permalink
Make RpcMessage::handle async
Browse files Browse the repository at this point in the history
Allows the function to return a future, which itself is awaited for the
response.
  • Loading branch information
Thomasdezeeuw committed Dec 29, 2023
1 parent c9dd146 commit 579518c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 18 deletions.
12 changes: 7 additions & 5 deletions examples/2_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ async fn pong_actor(mut ctx: actor::Context<PongMessage, ThreadLocal>) {
// Await a message, same as all other messages.
while let Ok(msg) = ctx.receive_next().await {
// Next we respond to the request.
let res = msg.handle(|request| {
println!("Got a RPC request: {request}");
// Return a response.
Pong
});
let res = msg
.handle(|request| async move {
println!("Got a RPC request: {request}");
// Return a response.
Pong
})
.await;

if let Err(err) = res {
eprintln!("failed to respond to RPC: {err}");
Expand Down
12 changes: 7 additions & 5 deletions rt/examples/3_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ async fn pong_actor(mut ctx: actor::Context<PongMessage, ThreadLocal>) {
// Await a message, same as all other messages.
while let Ok(msg) = ctx.receive_next().await {
// Next we respond to the request.
let res = msg.handle(|request| {
println!("Got a RPC request: {request}");
// Return a response.
Pong
});
let res = msg
.handle(|request| async move {
println!("Got a RPC request: {request}");
// Return a response.
Pong
})
.await;

if let Err(err) = res {
eprintln!("failed to respond to RPC: {err}");
Expand Down
5 changes: 3 additions & 2 deletions rt/tests/functional/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::convert::Infallible;
use std::fmt;
use std::future::ready;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::task::Poll;
Expand Down Expand Up @@ -650,7 +651,7 @@ impl From<RpcMessage<Ping, Pong>> for RpcTestMessage {
async fn pong(mut ctx: actor::Context<RpcTestMessage, ThreadLocal>) {
while let Ok(msg) = ctx.receive_next().await {
match msg {
RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(),
RpcTestMessage::Ping(msg) => msg.handle(|_| ready(Pong)).await.unwrap(),
RpcTestMessage::Check => {}
}
}
Expand Down Expand Up @@ -857,7 +858,7 @@ async fn wake_on_response(_: actor::Context<!, ThreadLocal>, relay_ref: ActorRef
async fn wake_on_rpc_receive(mut ctx: actor::Context<RpcTestMessage, ThreadLocal>) {
while let Ok(msg) = ctx.receive_next().await {
match msg {
RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(),
RpcTestMessage::Ping(msg) => msg.handle(|_| ready(Pong)).await.unwrap(),
RpcTestMessage::Check => {}
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/actor_ref/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ pub struct RpcMessage<Req, Res> {
impl<Req, Res> RpcMessage<Req, Res> {
/// Convenience method to handle a `Req`uest and return a `Res`ponse.
///
/// The function `f` is called with [`self.request`], the response returned by
/// the function `f` is than returned to the request maker via
/// [`self.response.respond`].
/// The function `f` is called with [`self.request`] and the returned future
/// is awaited, the response returned by the future is than returned to the
/// requester via [`self.response.respond`].
///
/// [`self.request`]: RpcMessage::request
/// [`self.response.respond`]: RpcResponse::respond
Expand All @@ -276,12 +276,13 @@ impl<Req, Res> RpcMessage<Req, Res> {
/// called and `Ok(())` is returned instead.
///
/// [no longer connected]: RpcResponse::is_connected
pub fn handle<F>(self, f: F) -> Result<(), SendError>
pub async fn handle<F, Fut>(self, f: F) -> Result<(), SendError>
where
F: FnOnce(Req) -> Res,
F: FnOnce(Req) -> Fut,
Fut: Future<Output = Res>,
{
if self.response.is_connected() {
let response = f(self.request);
let response = f(self.request).await;
self.response.respond(response)
} else {
// If the receiving actor is no longer waiting we can skip the
Expand Down

0 comments on commit 579518c

Please sign in to comment.