Skip to content

Commit

Permalink
feat: Add typed actor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
j5ik2o committed Jul 14, 2024
1 parent 580583f commit ab6b914
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod sender_middleware_chain;
pub mod spawn_middleware;
pub mod spawner;
pub mod taks;
mod typed_actor;
// include!(concat!(env!("OUT_DIR"), "/actor.rs"));

#[derive(Debug, Clone, PartialEq)]
Expand Down
8 changes: 6 additions & 2 deletions src/actor/actor/actor_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum ActorError {
StopError(ActorInnerError),
InitializationError(ActorInnerError),
CommunicationError(ActorInnerError),
BehaviorNotInitialized(ActorInnerError),
}

impl ActorError {
Expand All @@ -19,7 +20,8 @@ impl ActorError {
| ActorError::RestartError(e)
| ActorError::StopError(e)
| ActorError::InitializationError(e)
| ActorError::CommunicationError(e) => Some(e),
| ActorError::CommunicationError(e)
| ActorError::BehaviorNotInitialized(e) => Some(e),
}
}
}
Expand All @@ -32,6 +34,7 @@ impl Display for ActorError {
ActorError::StopError(e) => write!(f, "Stop error: {}", e),
ActorError::InitializationError(e) => write!(f, "Initialization error: {}", e),
ActorError::CommunicationError(e) => write!(f, "Communication error: {}", e),
ActorError::BehaviorNotInitialized(e) => write!(f, "Behavior not initialized: {}", e),
}
}
}
Expand All @@ -43,7 +46,8 @@ impl Error for ActorError {
| ActorError::RestartError(e)
| ActorError::StopError(e)
| ActorError::InitializationError(e)
| ActorError::CommunicationError(e) => Some(e),
| ActorError::CommunicationError(e)
| ActorError::BehaviorNotInitialized(e) => Some(e),
}
}
}
98 changes: 98 additions & 0 deletions src/actor/actor/typed_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::actor::actor::actor::Actor;
use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::context::MessagePart;
use crate::actor::message::message::Message;
use async_trait::async_trait;
use futures::future::BoxFuture;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Mutex;

type BehaviorFn<M> = Arc<dyn Fn(M, &mut ActorContext<M>) -> BoxFuture<'static, Behavior<M>> + Send + Sync>;

#[derive(Clone)]
pub struct Behavior<M: Message> {
f: BehaviorFn<M>,
}

impl<M: Message> Debug for Behavior<M> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Behavior")
}
}

impl<M: Message> Behavior<M> {
pub fn new<F, Fut>(f: F) -> Self
where
F: Fn(M, &mut ActorContext<M>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Behavior<M>> + Send + 'static, {
Behavior {
f: Arc::new(move |msg, ctx| Box::pin(f(msg, ctx))),
}
}

pub async fn receive(&self, msg: M, ctx: &mut ActorContext<M>) -> Behavior<M> {
(self.f)(msg, ctx).await
}
}

pub struct ActorContext<M: Message> {
context_handle: ContextHandle,
_phantom: std::marker::PhantomData<M>,
}

impl<M: Message> ActorContext<M> {
pub fn new(context_handle: ContextHandle) -> Self {
Self {
context_handle,
_phantom: std::marker::PhantomData,
}
}

pub fn context_handle(&self) -> &ContextHandle {
&self.context_handle
}
}

#[async_trait]
pub trait BehaviorActor: Debug {
type Message: Message + Clone;

fn create_initial_behavior() -> Behavior<Self::Message>;
}

#[derive(Debug)]
struct ActorWrapper<A: BehaviorActor> {
behavior: Arc<Mutex<Option<Behavior<A::Message>>>>,
}

impl<A: BehaviorActor> ActorWrapper<A> {
fn new() -> Self {
Self {
behavior: Arc::new(Mutex::new(Some(A::create_initial_behavior()))),
}
}
}

#[async_trait]
impl<A: BehaviorActor + 'static> Actor for ActorWrapper<A> {
async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> {
let message_handle = context_handle.get_message_handle().await;
let msg = message_handle.to_typed::<A::Message>().unwrap();

let mut behavior_guard = self.behavior.lock().await;
if let Some(current_behavior) = behavior_guard.take() {
let mut actor_context = ActorContext::new(context_handle);
let new_behavior = current_behavior.receive(msg, &mut actor_context).await;
*behavior_guard = Some(new_behavior);
} else {
return Err(ActorError::BehaviorNotInitialized(ActorInnerError::new(
"Behavior not initialized".to_string(),
)));
}
Ok(())
}
}

0 comments on commit ab6b914

Please sign in to comment.