Skip to content

Commit

Permalink
feat: Add clap dependency and behavior_test
Browse files Browse the repository at this point in the history
  • Loading branch information
j5ik2o committed Jul 14, 2024
1 parent d45e5b2 commit 2d146f3
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ uuid = { version = "1.9.0", features = ["v4"] }
static_assertions = "1.1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.5.9", features = ["derive"] }

[dev-dependencies]
rstest = "0.21.0"
Expand Down
1 change: 1 addition & 0 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod actor_producer;
pub mod actor_receiver;
pub mod behavior;
mod behavior_logic_test;
mod behavior_test;
pub mod context_decorator;
pub mod context_decorator_chain;
pub mod context_handler;
Expand Down
41 changes: 14 additions & 27 deletions src/actor/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,23 @@ use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHand
#[async_trait]
pub trait Actor: Debug + Send + Sync + 'static {
async fn handle(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> {
if let Some(message_handle) = context_handle.get_message_handle_opt().await {
tracing::debug!("Actor::handle: message_handle = {:?}", message_handle);
let me = message_handle.to_typed::<MessageEnvelope>();
let arm = message_handle.to_typed::<AutoReceiveMessage>();
match (me, arm) {
(Some(_), None) => {
let message = unwrap_envelope_message(message_handle.clone());
tracing::debug!("Actor::handle: MessageEnvelope = {:?}", message);
self.receive(context_handle.clone(), message).await
}
(None, Some(arm)) => match arm {
AutoReceiveMessage::PreStart => self.pre_start(context_handle).await,
AutoReceiveMessage::PostStart => self.post_start(context_handle).await,
AutoReceiveMessage::PreRestart => self.pre_restart(context_handle).await,
AutoReceiveMessage::PostRestart => self.post_restart(context_handle).await,
AutoReceiveMessage::PreStop => self.pre_stop(context_handle).await,
AutoReceiveMessage::PostStop => self.post_stop(context_handle).await,
AutoReceiveMessage::Terminated(t) => self.post_child_terminate(context_handle, &t).await,
},
_ => self.receive(context_handle.clone(), message_handle).await,
}
} else {
tracing::error!("No message found");
Err(ActorError::ReceiveError(ActorInnerError::new(
"No message found".to_string(),
)))
let message_handle = context_handle.get_message_handle().await;
let arm = message_handle.to_typed::<AutoReceiveMessage>();
match arm {
Some(arm) => match arm {
AutoReceiveMessage::PreStart => self.pre_start(context_handle).await,
AutoReceiveMessage::PostStart => self.post_start(context_handle).await,
AutoReceiveMessage::PreRestart => self.pre_restart(context_handle).await,
AutoReceiveMessage::PostRestart => self.post_restart(context_handle).await,
AutoReceiveMessage::PreStop => self.pre_stop(context_handle).await,
AutoReceiveMessage::PostStop => self.post_stop(context_handle).await,
AutoReceiveMessage::Terminated(t) => self.post_child_terminate(context_handle, &t).await,
},
_ => self.receive(context_handle).await,
}
}

async fn receive(&mut self, context_handle: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError>;
async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError>;

async fn pre_start(&self, _: ContextHandle) -> Result<(), ActorError> {
tracing::debug!("Actor::pre_start");
Expand Down
4 changes: 2 additions & 2 deletions src/actor/actor/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ impl Actor for ActorHandle {
mg.handle(c).await
}

async fn receive(&mut self, context_handle: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, context_handle: ContextHandle) -> Result<(), ActorError> {
let mut mg = self.0.lock().await;
mg.receive(context_handle, message_handle).await
mg.receive(context_handle).await
}

async fn get_supervisor_strategy(&self) -> Option<SupervisorStrategyHandle> {
Expand Down
6 changes: 3 additions & 3 deletions src/actor/actor/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ impl Behavior {
Behavior { stack: vec![] }
}

pub async fn context_become<F, Fut>(&mut self, receive: F)
pub async fn transition<F, Fut>(&mut self, receive: F)
where
F: Fn(ContextHandle) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ActorError>> + Send + 'static, {
self.clear().await;
self.push(receive).await;
}

pub async fn context_become_stacked<F, Fut>(&mut self, receive: F)
pub async fn transition_stacked<F, Fut>(&mut self, receive: F)
where
F: Fn(ContextHandle) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ActorError>> + Send + 'static, {
self.push(receive).await;
}

pub async fn context_un_become_stacked(&mut self) {
pub async fn revert_transition(&mut self) {
self.pop().await;
}

Expand Down
151 changes: 151 additions & 0 deletions src/actor/actor/behavior_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#[cfg(test)]
mod tests {
use crate::actor::actor::actor::Actor;
use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::behavior::Behavior;
use crate::actor::actor::props::Props;
use crate::actor::actor_system::ActorSystem;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::context::{BasePart, MessagePart, SenderPart, SpawnerPart};
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::response::ResponseHandle;
use async_trait::async_trait;
use std::any::Any;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;

#[derive(Debug, Clone, PartialEq, Eq)]
struct BehaviorMessage;

impl Message for BehaviorMessage {
fn eq_message(&self, other: &dyn Message) -> bool {
let other_msg = other.as_any().downcast_ref::<BehaviorMessage>().unwrap();
self == other_msg
}

fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct EchoRequest;
impl Message for EchoRequest {
fn eq_message(&self, other: &dyn Message) -> bool {
let other_msg = other.as_any().downcast_ref::<EchoRequest>().unwrap();
self == other_msg
}

fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}

#[derive(Debug, PartialEq, Eq)]
struct EchoResponse;

impl Message for EchoResponse {
fn eq_message(&self, other: &dyn Message) -> bool {
let other_msg = other.as_any().downcast_ref::<EchoResponse>().unwrap();
self == other_msg
}

fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}

#[derive(Debug, Clone)]
struct EchoSetBehaviorActor {
behavior: Arc<Mutex<Behavior>>,
}

impl EchoSetBehaviorActor {
async fn new() -> Self {
let actor = Self {
behavior: Arc::new(Mutex::new(Behavior::new())),
};
let cloned_self = actor.clone();

{
tracing::debug!("EchoSetBehaviorActor::new:0");
let mut behavior = actor.behavior.lock().await;
tracing::debug!("EchoSetBehaviorActor::new:1");
behavior
.transition(move |ctx| {
let cloned_self = cloned_self.clone();
async move {
tracing::debug!("EchoSetBehaviorActor::new:2");
cloned_self.one(ctx).await
}
})
.await;
tracing::debug!("EchoSetBehaviorActor::new:3");
}

actor
}

async fn one(&self, ctx: ContextHandle) -> Result<(), ActorError> {
tracing::debug!("one:0: {:?}", ctx.get_message_handle().await);
if let Some(BehaviorMessage) = ctx.get_message_handle().await.to_typed::<BehaviorMessage>() {
tracing::debug!("one:1: {:?}", ctx.get_message_handle().await);
let cloned_self = self.clone();
let mut behavior = self.behavior.lock().await;
tracing::debug!("one:2: {:?}", ctx.get_message_handle().await);
behavior
.transition(move |ctx| {
let cloned_self = cloned_self.clone();
async move { cloned_self.other(ctx).await }
})
.await;
tracing::debug!("one:3: {:?}", ctx.get_message_handle().await);
}
Ok(())
}

async fn other(&self, ctx: ContextHandle) -> Result<(), ActorError> {
tracing::debug!("other: {:?}", ctx.get_message_handle().await);
if let Some(EchoRequest) = ctx.get_message_handle().await.to_typed::<EchoRequest>() {
ctx.respond(ResponseHandle::new(EchoResponse)).await;
}
Ok(())
}
}

#[async_trait]
impl Actor for EchoSetBehaviorActor {
async fn receive(
&mut self,
context_handle: ContextHandle,
) -> Result<(), ActorError> {
let mg = self.behavior.lock().await;
mg.receive(context_handle).await
}
}
// #[tokio::test]
// async fn test_actor_can_set_behavior() {
// let _ = env::set_var("RUST_LOG", "debug");
// let _ = tracing_subscriber::fmt()
// .with_env_filter(EnvFilter::from_default_env())
// .try_init();
//
// let system = ActorSystem::new().await;
// let mut root_context = system.get_root_context().await;
// let props = Props::from_actor_producer(|_| async { EchoSetBehaviorActor::new().await }).await;
// let pid = root_context.spawn(props).await;
//
// root_context.send(pid.clone(), MessageHandle::new(BehaviorMessage)).await;
//
// let future= root_context.request_future(pid, MessageHandle::new(EchoRequest), Duration::from_secs(3)).await;
//
// let result = future.result().await;
//
// println!("result: {:?}", result);
// assert!(result.is_ok());
// }
}
2 changes: 1 addition & 1 deletion src/actor/actor/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Actor for ActorReceiverActor {
self.0.run(ctx).await
}

async fn receive(&mut self, _: ContextHandle, _: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, _: ContextHandle) -> Result<(), ActorError> {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/actor/actor_system_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {

#[async_trait]
impl Actor for MyActor {
async fn receive(&mut self, _: ContextHandle, _: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, _: ContextHandle) -> Result<(), ActorError> {
self.b.wait().await;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/actor/spawn_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod tests {
Ok(())
}

async fn receive(&mut self, _: ContextHandle, _: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, _: ContextHandle) -> Result<(), ActorError> {
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions src/actor/supervisor/supervision_event_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod test {
use crate::actor::actor::props::Props;
use crate::actor::actor_system::ActorSystem;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::context::{SenderPart, SpawnerPart};
use crate::actor::context::{MessagePart, SenderPart, SpawnerPart};
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::supervisor::exponential_backoff_strategy::ExponentialBackoffStrategy;
Expand All @@ -31,8 +31,8 @@ mod test {

#[async_trait]
impl Actor for PanicActor {
async fn receive(&mut self, _: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
if message_handle.to_typed::<String>().is_some() {
async fn receive(&mut self, ctx: ContextHandle) -> Result<(), ActorError> {
if ctx.get_message_handle().await.to_typed::<String>().is_some() {
Err(ActorError::ReceiveError(ActorInnerError::new("Boom!".to_string())))
} else {
Ok(())
Expand Down
7 changes: 4 additions & 3 deletions src/actor/supervisor/supervision_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod test {
use crate::actor::actor_system::ActorSystem;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::context::receiver_context_handle::ReceiverContextHandle;
use crate::actor::context::{SenderPart, SpawnerPart};
use crate::actor::context::{MessagePart, SenderPart, SpawnerPart};
use crate::actor::message::auto_receive_message::AutoReceiveMessage;
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
Expand Down Expand Up @@ -170,7 +170,7 @@ mod test {
Ok(())
}

async fn receive(&mut self, _: ContextHandle, _: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, _: ContextHandle) -> Result<(), ActorError> {
tracing::debug!("ActorWithSupervisor::receive");
Ok(())
}
Expand Down Expand Up @@ -212,7 +212,8 @@ mod test {
Ok(())
}

async fn receive(&mut self, _: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, ctx: ContextHandle) -> Result<(), ActorError> {
let message_handle = ctx.get_message_handle().await;
tracing::debug!("FailingChildActor::receive: msg = {:?}", message_handle);
if let Some(StringMessage(msg)) = message_handle.to_typed::<StringMessage>() {
tracing::debug!("FailingChildActor::receive: msg = {:?}", msg);
Expand Down

0 comments on commit 2d146f3

Please sign in to comment.