diff --git a/src/actor/actor.rs b/src/actor/actor.rs index eec0e580..382914e3 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -5,9 +5,6 @@ pub mod actor_inner_error; pub mod actor_process; pub mod actor_producer; pub mod actor_receiver; -pub mod behavior; -mod behavior_logic_test; -mod behavior_test; pub mod context_decorator; pub mod context_decorator_chain; pub mod context_handler; diff --git a/src/actor/actor/behavior.rs b/src/actor/actor/behavior.rs deleted file mode 100644 index 78497a98..00000000 --- a/src/actor/actor/behavior.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::actor::actor::actor_error::ActorError; -use crate::actor::actor::actor_receiver::ActorReceiver; -use crate::actor::context::context_handle::ContextHandle; -use crate::actor::context::InfoPart; -use std::fmt::Debug; -use std::future::Future; - -#[derive(Debug, Clone)] -pub struct Behavior { - stack: Vec, -} - -impl Behavior { - pub fn new() -> Self { - Behavior { stack: vec![] } - } - - pub async fn transition(&mut self, receive: F) - where - F: Fn(ContextHandle) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, { - self.clear().await; - self.push(receive).await; - } - - pub async fn transition_stacked(&mut self, receive: F) - where - F: Fn(ContextHandle) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, { - self.push(receive).await; - } - - pub async fn revert_transition(&mut self) { - self.pop().await; - } - - pub async fn receive(&self, context: ContextHandle) -> Result<(), ActorError> { - if let Some(behavior) = self.peek().await { - behavior.run(context).await - } else { - tracing::error!("empty behavior called: pid = {}", context.get_self().await); - Err(ActorError::ReceiveError("empty behavior called".into())) - } - } - - pub(crate) async fn clear(&mut self) { - for i in 0..self.stack.len() { - self.stack[i] = ActorReceiver::new(|_| async { Ok(()) }); - } - self.stack.clear(); - } - - pub(crate) async fn peek(&self) -> Option { - if let Some(last) = self.stack.last() { - Some(last.clone()) - } else { - None - } - } - - pub(crate) async fn push(&mut self, actor_receiver: F) - where - F: Fn(ContextHandle) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, { - self.push_actor_receiver(ActorReceiver::new(actor_receiver)).await; - } - - pub(crate) async fn push_actor_receiver(&mut self, actor_receiver: ActorReceiver) { - self.stack.push(actor_receiver); - } - - pub(crate) async fn pop(&mut self) -> Option { - self.stack.pop() - } - - pub(crate) fn len(&self) -> usize { - self.stack.len() - } -} diff --git a/src/actor/actor/behavior_logic_test.rs b/src/actor/actor/behavior_logic_test.rs deleted file mode 100644 index d8a97f93..00000000 --- a/src/actor/actor/behavior_logic_test.rs +++ /dev/null @@ -1,133 +0,0 @@ -#[cfg(test)] -mod tests { - use crate::actor::actor::actor_receiver::ActorReceiver; - use crate::actor::actor::behavior::Behavior; - use crate::actor::actor_system::ActorSystem; - use crate::actor::context::context_handle::ContextHandle; - use crate::actor::context::mock_context::MockContext; - use std::sync::Arc; - use tokio::sync::Mutex; - - #[tokio::test] - async fn len() { - let mut bs = Behavior::new(); - assert_eq!(bs.len(), 0); - bs.push(|_| async { Ok(()) }).await; - bs.push(|_| async { Ok(()) }).await; - assert_eq!(bs.len(), 2); - } - - #[tokio::test] - async fn push() { - let mut bs = Behavior::new(); - assert_eq!(bs.len(), 0); - bs.push(|_| async { Ok(()) }).await; - assert_eq!(bs.len(), 1); - bs.push(|_| async { Ok(()) }).await; - assert_eq!(bs.len(), 2); - } - - #[tokio::test] - async fn clear() { - let mut bs = Behavior::new(); - bs.push(|_| async { Ok(()) }).await; - bs.push(|_| async { Ok(()) }).await; - assert_eq!(bs.len(), 2); - bs.clear().await; - assert_eq!(bs.len(), 0); - } - - #[tokio::test] - async fn peek() { - let system = ActorSystem::new().await; - let called = Arc::new(Mutex::new(0)); - let cloned_called1 = called.clone(); - let cloned_called2 = called.clone(); - - let fn1 = ActorReceiver::new(move |_| { - let cloned_called = cloned_called1.clone(); - async move { - let mut mg = cloned_called.lock().await; - *mg = 1; - Ok(()) - } - }); - - let fn2 = ActorReceiver::new(move |_| { - let cloned_called = cloned_called2.clone(); - async move { - let mut mg = cloned_called.lock().await; - *mg = 2; - Ok(()) - } - }); - - let cases = vec![(vec![fn1.clone(), fn2.clone()], 2), (vec![fn2, fn1], 1)]; - - for (items, expected) in cases { - let mut bs = Behavior::new(); - for f in items { - bs.push_actor_receiver(f).await; - } - if let Some(a) = bs.peek().await { - let ctx = ContextHandle::new(MockContext::new(system.clone())); - a.run(ctx).await.unwrap(); - assert_eq!(expected, *called.lock().await); - } else { - panic!("peek() returned None"); - } - } - } - - #[tokio::test] - async fn pop() { - let system = ActorSystem::new().await; - let called = Arc::new(Mutex::new(0)); - let cloned_called1 = called.clone(); - let cloned_called2 = called.clone(); - - let fn1 = ActorReceiver::new(move |_| { - let cloned_called = cloned_called1.clone(); - async move { - let mut mg = cloned_called.lock().await; - *mg = 1; - Ok(()) - } - }); - - let fn2 = ActorReceiver::new(move |_| { - let cloned_called = cloned_called2.clone(); - async move { - let mut mg = cloned_called.lock().await; - *mg = 2; - Ok(()) - } - }); - - let cases = vec![ - (vec![fn1.clone(), fn2.clone()], vec![2, 1]), - (vec![fn2, fn1], vec![1, 2]), - ]; - - for (i, (items, expected)) in cases.into_iter().enumerate() { - let test_name = format!("order {}", i); - println!("Running test: {}", test_name); - - let mut bs = Behavior::new(); - for f in items { - bs.push_actor_receiver(f).await; - } - - for e in expected { - if let Some(a) = bs.pop().await { - let ctx = ContextHandle::new(MockContext::new(system.clone())); - a.run(ctx).await.unwrap(); - assert_eq!(e, *called.lock().await, "Failed in test: {}", test_name); - *called.lock().await = 0; - } else { - panic!("pop() returned None in test: {}", test_name); - } - } - } - } -} diff --git a/src/actor/actor/behavior_test.rs b/src/actor/actor/behavior_test.rs deleted file mode 100644 index dd7f7816..00000000 --- a/src/actor/actor/behavior_test.rs +++ /dev/null @@ -1,143 +0,0 @@ -#[cfg(test)] -mod tests { - use crate::actor::actor::actor::Actor; - use crate::actor::actor::actor_error::ActorError; - use crate::actor::actor::behavior::Behavior; - - use crate::actor::context::context_handle::ContextHandle; - use crate::actor::context::{BasePart, MessagePart}; - use crate::actor::message::message::Message; - use crate::actor::message::response::ResponseHandle; - use async_trait::async_trait; - use std::any::Any; - use std::sync::Arc; - use tokio::sync::Mutex; - - #[derive(Debug, Clone, PartialEq, Eq)] - struct BehaviorMessage; - - impl Message for BehaviorMessage { - fn eq_message(&self, other: &dyn Message) -> bool { - let other_msg = other.as_any().downcast_ref::().unwrap(); - self == other_msg - } - - fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) { - self - } - } - - #[derive(Debug, Clone, PartialEq, Eq)] - struct EchoRequest; - impl Message for EchoRequest { - fn eq_message(&self, other: &dyn Message) -> bool { - let other_msg = other.as_any().downcast_ref::().unwrap(); - self == other_msg - } - - fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) { - self - } - } - - #[derive(Debug, PartialEq, Eq)] - struct EchoResponse; - - impl Message for EchoResponse { - fn eq_message(&self, other: &dyn Message) -> bool { - let other_msg = other.as_any().downcast_ref::().unwrap(); - self == other_msg - } - - fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) { - self - } - } - - #[derive(Debug, Clone)] - struct EchoSetBehaviorActor { - behavior: Arc>, - } - - impl EchoSetBehaviorActor { - async fn new() -> Self { - let actor = Self { - behavior: Arc::new(Mutex::new(Behavior::new())), - }; - let cloned_self = actor.clone(); - - { - tracing::debug!("EchoSetBehaviorActor::new:0"); - let mut behavior = actor.behavior.lock().await; - tracing::debug!("EchoSetBehaviorActor::new:1"); - behavior - .transition(move |ctx| { - let cloned_self = cloned_self.clone(); - async move { - tracing::debug!("EchoSetBehaviorActor::new:2"); - cloned_self.one(ctx).await - } - }) - .await; - tracing::debug!("EchoSetBehaviorActor::new:3"); - } - - actor - } - - async fn one(&self, ctx: ContextHandle) -> Result<(), ActorError> { - tracing::debug!("one:0: {:?}", ctx.get_message_handle().await); - if let Some(BehaviorMessage) = ctx.get_message_handle().await.to_typed::() { - tracing::debug!("one:1: {:?}", ctx.get_message_handle().await); - let cloned_self = self.clone(); - let mut behavior = self.behavior.lock().await; - tracing::debug!("one:2: {:?}", ctx.get_message_handle().await); - behavior - .transition(move |ctx| { - let cloned_self = cloned_self.clone(); - async move { cloned_self.other(ctx).await } - }) - .await; - tracing::debug!("one:3: {:?}", ctx.get_message_handle().await); - } - Ok(()) - } - - async fn other(&self, ctx: ContextHandle) -> Result<(), ActorError> { - tracing::debug!("other: {:?}", ctx.get_message_handle().await); - if let Some(EchoRequest) = ctx.get_message_handle().await.to_typed::() { - ctx.respond(ResponseHandle::new(EchoResponse)).await; - } - Ok(()) - } - } - - #[async_trait] - impl Actor for EchoSetBehaviorActor { - async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> { - let mg = self.behavior.lock().await; - mg.receive(context_handle).await - } - } - // #[tokio::test] - // async fn test_actor_can_set_behavior() { - // let _ = env::set_var("RUST_LOG", "debug"); - // let _ = tracing_subscriber::fmt() - // .with_env_filter(EnvFilter::from_default_env()) - // .try_init(); - // - // let system = ActorSystem::new().await; - // let mut root_context = system.get_root_context().await; - // let props = Props::from_actor_producer(|_| async { EchoSetBehaviorActor::new().await }).await; - // let pid = root_context.spawn(props).await; - // - // root_context.send(pid.clone(), MessageHandle::new(BehaviorMessage)).await; - // - // let future= root_context.request_future(pid, MessageHandle::new(EchoRequest), Duration::from_secs(3)).await; - // - // let result = future.result().await; - // - // println!("result: {:?}", result); - // assert!(result.is_ok()); - // } -} diff --git a/src/actor/actor/typed_actor.rs b/src/actor/actor/typed_actor.rs index cfd70048..95001460 100644 --- a/src/actor/actor/typed_actor.rs +++ b/src/actor/actor/typed_actor.rs @@ -11,7 +11,7 @@ use std::future::Future; use std::sync::Arc; use tokio::sync::Mutex; -type BehaviorFn = Arc) -> BoxFuture<'static, Behavior> + Send + Sync>; +type BehaviorFn = Arc) -> BoxFuture<'static, Behavior> + Send + Sync>; #[derive(Clone)] pub struct Behavior { @@ -27,24 +27,24 @@ impl Debug for Behavior { impl Behavior { pub fn new(f: F) -> Self where - F: Fn(M, &mut ActorContext) -> Fut + Send + Sync + 'static, + F: Fn(M, &mut TypedActorContext) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { Behavior { f: Arc::new(move |msg, ctx| Box::pin(f(msg, ctx))), } } - pub async fn receive(&self, msg: M, ctx: &mut ActorContext) -> Behavior { + pub async fn receive(&self, msg: M, ctx: &mut TypedActorContext) -> Behavior { (self.f)(msg, ctx).await } } -pub struct ActorContext { +pub struct TypedActorContext { context_handle: ContextHandle, _phantom: std::marker::PhantomData, } -impl ActorContext { +impl TypedActorContext { pub fn new(context_handle: ContextHandle) -> Self { Self { context_handle, @@ -65,11 +65,11 @@ pub trait BehaviorActor: Debug { } #[derive(Debug)] -struct ActorWrapper { +struct TypedWrapper { behavior: Arc>>>, } -impl ActorWrapper { +impl TypedWrapper { fn new() -> Self { Self { behavior: Arc::new(Mutex::new(Some(A::create_initial_behavior()))), @@ -78,14 +78,14 @@ impl ActorWrapper { } #[async_trait] -impl Actor for ActorWrapper { +impl Actor for TypedWrapper { async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> { let message_handle = context_handle.get_message_handle().await; let msg = message_handle.to_typed::().unwrap(); let mut behavior_guard = self.behavior.lock().await; if let Some(current_behavior) = behavior_guard.take() { - let mut actor_context = ActorContext::new(context_handle); + let mut actor_context = TypedActorContext::new(context_handle); let new_behavior = current_behavior.receive(msg, &mut actor_context).await; *behavior_guard = Some(new_behavior); } else { @@ -96,3 +96,120 @@ impl Actor for ActorWrapper { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::actor::actor::props::Props; + use crate::actor::actor::typed_actor::{Behavior, BehaviorActor, TypedWrapper}; + use crate::actor::actor_system::ActorSystem; + use crate::actor::context::{SenderPart, SpawnerPart}; + use crate::actor::message::message::Message; + use crate::actor::message::message_handle::MessageHandle; + use std::any::Any; + use std::time::Duration; + use tokio::time::sleep; + + #[derive(Debug, Clone, PartialEq, Eq)] + enum AppMessage { + Greet(String), + SwitchToFormal, + SwitchToInformal, + } + + impl Message for AppMessage { + fn eq_message(&self, other: &dyn Message) -> bool { + match other.as_any().downcast_ref::() { + Some(other) => self == other, + None => false, + } + } + + fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) { + self + } + } + + #[derive(Debug)] + struct StateSwitchingActor; + + impl StateSwitchingActor { + fn informal_behavior(greeting_count: usize) -> Behavior { + Behavior::new(move |msg, _ctx| async move { + match msg { + AppMessage::Greet(name) => { + let new_count = greeting_count + 1; + println!("Hey, {}! What's up? (Greetings: {})", name, new_count); + Self::informal_behavior(new_count) + } + AppMessage::SwitchToFormal => { + println!("Switching to formal behavior."); + Self::formal_behavior(greeting_count) + } + _ => { + println!("Informal: I don't understand that message."); + Self::informal_behavior(greeting_count) + } + } + }) + } + + fn formal_behavior(greeting_count: usize) -> Behavior { + Behavior::new(move |msg, _ctx| async move { + match msg { + AppMessage::Greet(name) => { + let new_count = greeting_count + 1; + println!("Good day, {}. How may I assist you? (Greetings: {})", name, new_count); + Self::formal_behavior(new_count) + } + AppMessage::SwitchToInformal => { + println!("Switching to informal behavior."); + Self::informal_behavior(greeting_count) + } + _ => { + println!("Formal: I do not understand that message."); + Self::formal_behavior(greeting_count) + } + } + }) + } + } + + impl BehaviorActor for StateSwitchingActor { + type Message = AppMessage; + + fn create_initial_behavior() -> Behavior { + Self::informal_behavior(0) + } + } + + #[tokio::test] + async fn test() { + let system = ActorSystem::new().await; + let mut root_context = system.get_root_context().await; + + let pid = root_context + .spawn(Props::from_actor_producer(|_| async { TypedWrapper::::new() }).await) + .await; + + root_context + .send(pid.clone(), MessageHandle::new(AppMessage::Greet("Alice".to_string()))) + .await; + root_context + .send(pid.clone(), MessageHandle::new(AppMessage::SwitchToFormal)) + .await; + root_context + .send(pid.clone(), MessageHandle::new(AppMessage::Greet("Bob".to_string()))) + .await; + root_context + .send(pid.clone(), MessageHandle::new(AppMessage::SwitchToInformal)) + .await; + root_context + .send( + pid.clone(), + MessageHandle::new(AppMessage::Greet("Charlie".to_string())), + ) + .await; + + sleep(Duration::from_secs(1)).await; + } +}