From ab6b914fbcd84ba62f6232313949d05a400fee31 Mon Sep 17 00:00:00 2001 From: Junichi Kato Date: Sun, 14 Jul 2024 18:49:56 +0900 Subject: [PATCH] feat: Add typed actor implementation --- src/actor/actor.rs | 1 + src/actor/actor/actor_error.rs | 8 ++- src/actor/actor/typed_actor.rs | 98 ++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 src/actor/actor/typed_actor.rs diff --git a/src/actor/actor.rs b/src/actor/actor.rs index db39d610..eec0e580 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -25,6 +25,7 @@ pub mod sender_middleware_chain; pub mod spawn_middleware; pub mod spawner; pub mod taks; +mod typed_actor; // include!(concat!(env!("OUT_DIR"), "/actor.rs")); #[derive(Debug, Clone, PartialEq)] diff --git a/src/actor/actor/actor_error.rs b/src/actor/actor/actor_error.rs index d49b7089..53cdcc61 100644 --- a/src/actor/actor/actor_error.rs +++ b/src/actor/actor/actor_error.rs @@ -10,6 +10,7 @@ pub enum ActorError { StopError(ActorInnerError), InitializationError(ActorInnerError), CommunicationError(ActorInnerError), + BehaviorNotInitialized(ActorInnerError), } impl ActorError { @@ -19,7 +20,8 @@ impl ActorError { | ActorError::RestartError(e) | ActorError::StopError(e) | ActorError::InitializationError(e) - | ActorError::CommunicationError(e) => Some(e), + | ActorError::CommunicationError(e) + | ActorError::BehaviorNotInitialized(e) => Some(e), } } } @@ -32,6 +34,7 @@ impl Display for ActorError { ActorError::StopError(e) => write!(f, "Stop error: {}", e), ActorError::InitializationError(e) => write!(f, "Initialization error: {}", e), ActorError::CommunicationError(e) => write!(f, "Communication error: {}", e), + ActorError::BehaviorNotInitialized(e) => write!(f, "Behavior not initialized: {}", e), } } } @@ -43,7 +46,8 @@ impl Error for ActorError { | ActorError::RestartError(e) | ActorError::StopError(e) | ActorError::InitializationError(e) - | ActorError::CommunicationError(e) => Some(e), + | ActorError::CommunicationError(e) + | ActorError::BehaviorNotInitialized(e) => Some(e), } } } diff --git a/src/actor/actor/typed_actor.rs b/src/actor/actor/typed_actor.rs new file mode 100644 index 00000000..cfd70048 --- /dev/null +++ b/src/actor/actor/typed_actor.rs @@ -0,0 +1,98 @@ +use crate::actor::actor::actor::Actor; +use crate::actor::actor::actor_error::ActorError; +use crate::actor::actor::actor_inner_error::ActorInnerError; +use crate::actor::context::context_handle::ContextHandle; +use crate::actor::context::MessagePart; +use crate::actor::message::message::Message; +use async_trait::async_trait; +use futures::future::BoxFuture; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::sync::Arc; +use tokio::sync::Mutex; + +type BehaviorFn = Arc) -> BoxFuture<'static, Behavior> + Send + Sync>; + +#[derive(Clone)] +pub struct Behavior { + f: BehaviorFn, +} + +impl Debug for Behavior { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Behavior") + } +} + +impl Behavior { + pub fn new(f: F) -> Self + where + F: Fn(M, &mut ActorContext) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, { + Behavior { + f: Arc::new(move |msg, ctx| Box::pin(f(msg, ctx))), + } + } + + pub async fn receive(&self, msg: M, ctx: &mut ActorContext) -> Behavior { + (self.f)(msg, ctx).await + } +} + +pub struct ActorContext { + context_handle: ContextHandle, + _phantom: std::marker::PhantomData, +} + +impl ActorContext { + pub fn new(context_handle: ContextHandle) -> Self { + Self { + context_handle, + _phantom: std::marker::PhantomData, + } + } + + pub fn context_handle(&self) -> &ContextHandle { + &self.context_handle + } +} + +#[async_trait] +pub trait BehaviorActor: Debug { + type Message: Message + Clone; + + fn create_initial_behavior() -> Behavior; +} + +#[derive(Debug)] +struct ActorWrapper { + behavior: Arc>>>, +} + +impl ActorWrapper { + fn new() -> Self { + Self { + behavior: Arc::new(Mutex::new(Some(A::create_initial_behavior()))), + } + } +} + +#[async_trait] +impl Actor for ActorWrapper { + async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> { + let message_handle = context_handle.get_message_handle().await; + let msg = message_handle.to_typed::().unwrap(); + + let mut behavior_guard = self.behavior.lock().await; + if let Some(current_behavior) = behavior_guard.take() { + let mut actor_context = ActorContext::new(context_handle); + let new_behavior = current_behavior.receive(msg, &mut actor_context).await; + *behavior_guard = Some(new_behavior); + } else { + return Err(ActorError::BehaviorNotInitialized(ActorInnerError::new( + "Behavior not initialized".to_string(), + ))); + } + Ok(()) + } +}