From 579518c24b9eb0d0a1ebd611705609a18bac0869 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 29 Dec 2023 17:48:07 +0100 Subject: [PATCH] Make RpcMessage::handle async Allows the function to return a future, which itself is awaited for the response. --- examples/2_rpc.rs | 12 +++++++----- rt/examples/3_rpc.rs | 12 +++++++----- rt/tests/functional/actor_ref.rs | 5 +++-- src/actor_ref/rpc.rs | 13 +++++++------ 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/examples/2_rpc.rs b/examples/2_rpc.rs index 3ea2be5f4..745844168 100644 --- a/examples/2_rpc.rs +++ b/examples/2_rpc.rs @@ -42,11 +42,13 @@ async fn pong_actor(mut ctx: actor::Context) { // Await a message, same as all other messages. while let Ok(msg) = ctx.receive_next().await { // Next we respond to the request. - let res = msg.handle(|request| { - println!("Got a RPC request: {request}"); - // Return a response. - Pong - }); + let res = msg + .handle(|request| async move { + println!("Got a RPC request: {request}"); + // Return a response. + Pong + }) + .await; if let Err(err) = res { eprintln!("failed to respond to RPC: {err}"); diff --git a/rt/examples/3_rpc.rs b/rt/examples/3_rpc.rs index 3ea2be5f4..745844168 100644 --- a/rt/examples/3_rpc.rs +++ b/rt/examples/3_rpc.rs @@ -42,11 +42,13 @@ async fn pong_actor(mut ctx: actor::Context) { // Await a message, same as all other messages. while let Ok(msg) = ctx.receive_next().await { // Next we respond to the request. - let res = msg.handle(|request| { - println!("Got a RPC request: {request}"); - // Return a response. - Pong - }); + let res = msg + .handle(|request| async move { + println!("Got a RPC request: {request}"); + // Return a response. + Pong + }) + .await; if let Err(err) = res { eprintln!("failed to respond to RPC: {err}"); diff --git a/rt/tests/functional/actor_ref.rs b/rt/tests/functional/actor_ref.rs index eee1d2ee2..5977a69c9 100644 --- a/rt/tests/functional/actor_ref.rs +++ b/rt/tests/functional/actor_ref.rs @@ -2,6 +2,7 @@ use std::convert::Infallible; use std::fmt; +use std::future::ready; use std::num::NonZeroUsize; use std::pin::Pin; use std::task::Poll; @@ -650,7 +651,7 @@ impl From> for RpcTestMessage { async fn pong(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { - RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(), + RpcTestMessage::Ping(msg) => msg.handle(|_| ready(Pong)).await.unwrap(), RpcTestMessage::Check => {} } } @@ -857,7 +858,7 @@ async fn wake_on_response(_: actor::Context, relay_ref: ActorRef async fn wake_on_rpc_receive(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { - RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(), + RpcTestMessage::Ping(msg) => msg.handle(|_| ready(Pong)).await.unwrap(), RpcTestMessage::Check => {} } } diff --git a/src/actor_ref/rpc.rs b/src/actor_ref/rpc.rs index 7d0f86aa2..d4702ba7c 100644 --- a/src/actor_ref/rpc.rs +++ b/src/actor_ref/rpc.rs @@ -263,9 +263,9 @@ pub struct RpcMessage { impl RpcMessage { /// Convenience method to handle a `Req`uest and return a `Res`ponse. /// - /// The function `f` is called with [`self.request`], the response returned by - /// the function `f` is than returned to the request maker via - /// [`self.response.respond`]. + /// The function `f` is called with [`self.request`] and the returned future + /// is awaited, the response returned by the future is than returned to the + /// requester via [`self.response.respond`]. /// /// [`self.request`]: RpcMessage::request /// [`self.response.respond`]: RpcResponse::respond @@ -276,12 +276,13 @@ impl RpcMessage { /// called and `Ok(())` is returned instead. /// /// [no longer connected]: RpcResponse::is_connected - pub fn handle(self, f: F) -> Result<(), SendError> + pub async fn handle(self, f: F) -> Result<(), SendError> where - F: FnOnce(Req) -> Res, + F: FnOnce(Req) -> Fut, + Fut: Future, { if self.response.is_connected() { - let response = f(self.request); + let response = f(self.request).await; self.response.respond(response) } else { // If the receiving actor is no longer waiting we can skip the