Skip to content

Commit

Permalink
feat: Update Logger constructor parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
j5ik2o committed Jul 12, 2024
1 parent a5e4d0b commit a6d8def
Show file tree
Hide file tree
Showing 57 changed files with 257 additions and 188 deletions.
2 changes: 1 addition & 1 deletion examples/actor-hello-world/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct HelloActor;

#[async_trait]
impl Actor for HelloActor {
async fn receive(&mut self, context_handle: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
async fn receive(&mut self, _: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
let hello = message_handle.to_typed::<Hello>().unwrap();
println!("Hello, {}!", hello.who);
Ok(())
Expand Down
126 changes: 69 additions & 57 deletions examples/actor-supervision/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::any::Any;
use std::env;
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter;
use nexus_acto_rs::actor::actor::actor::Actor;
use nexus_acto_rs::actor::actor::actor_error::ActorError;
use nexus_acto_rs::actor::actor::actor_handle::ActorHandle;
Expand All @@ -19,90 +14,107 @@ use nexus_acto_rs::actor::supervisor::directive::Directive;
use nexus_acto_rs::actor::supervisor::strategy_one_for_one::OneForOneStrategy;
use nexus_acto_rs::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;
use nexus_acto_rs::actor::util::async_barrier::AsyncBarrier;
use std::any::Any;
use std::env;
use std::time::Duration;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter;

#[derive(Debug)]
struct Parent;

impl Parent {
fn new() -> Self {
Self
}
fn new() -> Self {
Self
}
}

#[async_trait]
impl Actor for Parent {
async fn receive(&mut self, mut context_handle: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
let msg = message_handle.to_typed::<Hello>().unwrap();
let props = Props::from_actor_producer(ActorProducer::new(|_| async { ActorHandle::new(Child::new())})).await;
let child = context_handle.spawn(props).await;
context_handle.send(child, MessageHandle::new(msg)).await;
Ok(())
}
async fn receive(
&mut self,
mut context_handle: ContextHandle,
message_handle: MessageHandle,
) -> Result<(), ActorError> {
let msg = message_handle.to_typed::<Hello>().unwrap();
let props = Props::from_actor_producer(ActorProducer::new(|_| async { ActorHandle::new(Child::new()) })).await;
let child = context_handle.spawn(props).await;
context_handle.send(child, MessageHandle::new(msg)).await;
Ok(())
}
}

#[derive(Debug)]
struct Child;

impl Child {
fn new() -> Self {
Self
}
fn new() -> Self {
Self
}
}

#[async_trait]
impl Actor for Child {
async fn receive(&mut self, _: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
let msg = message_handle.to_typed::<Hello>().unwrap();
println!("Hello, {}", msg.who) ;
msg.async_barrier.wait().await;
Err(ActorError::ReceiveError(ActorInnerError::new("Ouch".to_string())))
}
async fn receive(&mut self, _: ContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
let msg = message_handle.to_typed::<Hello>().unwrap();
println!("Hello, {}", msg.who);
msg.async_barrier.wait().await;
Err(ActorError::ReceiveError(ActorInnerError::new("Ouch".to_string())))
}
}

#[derive(Debug, Clone)]
struct Hello {
who: String,
async_barrier: AsyncBarrier,
who: String,
async_barrier: AsyncBarrier,
}

impl Hello {
fn new(who: String, async_barrier: AsyncBarrier) -> Self {
Self { who, async_barrier }
}
fn new(who: String, async_barrier: AsyncBarrier) -> Self {
Self { who, async_barrier }
}
}

impl Message for Hello {
fn eq_message(&self, other: &dyn Message) -> bool {
other.eq_message(self)
}
fn eq_message(&self, other: &dyn Message) -> bool {
other.eq_message(self)
}

fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}

#[tokio::main]
async fn main() {
let _ = env::set_var("RUST_LOG", "debug");
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let _ = env::set_var("RUST_LOG", "debug");
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let system = ActorSystem::new().await;
let decider = |_| async {
println!("occurred error");
Directive::Stop };
let supervisor = OneForOneStrategy::new(10, Duration::from_millis(1000)).with_decider(decider);
let mut root_context = system.get_root_context().await;
let actor_producer = ActorProducer::new(|_| async { ActorHandle::new(Parent::new()) });
let props = Props::from_actor_producer_with_opts(actor_producer, &[
Props::with_supervisor_strategy(SupervisorStrategyHandle::new(supervisor))
]).await;
let pid = root_context
.spawn(props)
.await;
let async_barrier = AsyncBarrier::new(2);
root_context.send(pid, MessageHandle::new(Hello::new("Roger".to_string(), async_barrier.clone()))).await;
async_barrier.wait().await;
sleep(Duration::from_secs(2)).await;
}
let system = ActorSystem::new().await;
let decider = |_| async {
println!("occurred error");
Directive::Stop
};
let supervisor = OneForOneStrategy::new(10, Duration::from_millis(1000)).with_decider(decider);
let mut root_context = system.get_root_context().await;
let actor_producer = ActorProducer::new(|_| async { ActorHandle::new(Parent::new()) });
let props = Props::from_actor_producer_with_opts(
actor_producer,
&[Props::with_supervisor_strategy(SupervisorStrategyHandle::new(
supervisor,
))],
)
.await;
let pid = root_context.spawn(props).await;
let async_barrier = AsyncBarrier::new(2);
root_context
.send(
pid,
MessageHandle::new(Hello::new("Roger".to_string(), async_barrier.clone())),
)
.await;
async_barrier.wait().await;
sleep(Duration::from_secs(2)).await;
}
6 changes: 4 additions & 2 deletions src/actor/actor/actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::fmt::Debug;

use async_trait::async_trait;

use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::actor::Terminated;
Expand All @@ -8,8 +12,6 @@ use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::message_or_envelope::{unwrap_envelope_message, MessageEnvelope};
use crate::actor::message::system_message::SystemMessage;
use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;
use async_trait::async_trait;
use std::fmt::Debug;

#[async_trait]
pub trait Actor: Debug + Send + Sync + 'static {
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/actor_error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::actor::actor::actor_inner_error::ActorInnerError;
use std::error::Error;
use std::fmt::{Display, Formatter};

use crate::actor::actor::actor_inner_error::ActorInnerError;

#[derive(Debug, Clone)]
pub enum ActorError {
ReceiveError(ActorInnerError),
Expand Down
8 changes: 5 additions & 3 deletions src/actor/actor/actor_handle.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::actor::actor::actor::Actor;
use crate::actor::actor::actor_error::ActorError;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Debug, Clone)]
pub struct ActorHandle(Arc<Mutex<dyn Actor>>);
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/actor_inner_error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use backtrace::Backtrace;
use std::any::Any;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use backtrace::Backtrace;

#[derive(Clone)]
pub struct ActorInnerError {
inner_error: Option<Arc<dyn Any + Send + Sync>>,
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/actor_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

use futures::future::BoxFuture;

use crate::actor::actor::actor_handle::ActorHandle;
use crate::actor::context::context_handle::ContextHandle;
use futures::future::BoxFuture;

#[derive(Clone)]
pub struct ActorProducer(Arc<dyn Fn(ContextHandle) -> BoxFuture<'static, ActorHandle> + Send + Sync>);
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/actor_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

use futures::future::BoxFuture;

use crate::actor::actor::actor_error::ActorError;
use crate::actor::context::context_handle::ContextHandle;
use futures::future::BoxFuture;

#[derive(Clone)]
pub struct ActorReceiver(Arc<dyn Fn(ContextHandle) -> BoxFuture<'static, Result<(), ActorError>> + Send + Sync>);
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/pid_set_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#[cfg(test)]
mod tests {
use super::super::*;
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor::pid_set::PidSet;
use crate::actor::actor_system::ActorSystem;

use super::super::*;

async fn new_pid(system: ActorSystem, address: &str, id: &str, request_id: u32) -> ExtendedPid {
let pid = Pid {
address: address.to_string(),
Expand Down
21 changes: 14 additions & 7 deletions src/actor/actor/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ static DEFAULT_SPAWNER: Lazy<Spawner> = Lazy::new(|| {
.await;
tracing::debug!("mailbox handlers registered: {}", name);

mb.post_system_message(MessageHandle::new(SystemMessage::Started))
.await;
mb.post_system_message(MessageHandle::new(SystemMessage::Started)).await;
tracing::debug!("post_system_message: started: {}", name);
mb.start().await;
tracing::debug!("mailbox started: {}", name);
Expand Down Expand Up @@ -185,9 +184,10 @@ impl Props {
})
}

pub fn with_context_decorators(decorators: Vec<ContextDecorator>) -> PropsOption {
pub fn with_context_decorators(decorators: impl IntoIterator<Item = ContextDecorator> + Send + Sync) -> PropsOption {
let cloned_decorators = decorators.into_iter().collect::<Vec<_>>();
PropsOption::new(move |props: &mut Props| {
let cloned_decorators = decorators.clone();
let cloned_decorators = cloned_decorators.clone();
props.context_decorator.extend(cloned_decorators.clone());
props.context_decorator_chain = make_context_decorator_chain(
&props.context_decorator,
Expand All @@ -211,7 +211,10 @@ impl Props {
})
}

pub fn with_receiver_middlewares(middlewares: Vec<ReceiverMiddleware>) -> PropsOption {
pub fn with_receiver_middlewares(
middlewares: impl IntoIterator<Item = ReceiverMiddleware> + Send + Sync,
) -> PropsOption {
let middlewares = middlewares.into_iter().collect::<Vec<_>>();
PropsOption::new(move |props: &mut Props| {
props.receiver_middleware.extend(middlewares.clone());
props.receiver_middleware_chain = make_receiver_middleware_chain(
Expand All @@ -221,7 +224,8 @@ impl Props {
})
}

pub fn with_sender_middlewares(middlewares: Vec<SenderMiddleware>) -> PropsOption {
pub fn with_sender_middlewares(middlewares: impl IntoIterator<Item = SenderMiddleware> + Send + Sync) -> PropsOption {
let middlewares = middlewares.into_iter().collect::<Vec<_>>();
PropsOption::new(move |props: &mut Props| {
props.sender_middleware.extend(middlewares.clone());
props.sender_middleware_chain = make_sender_middleware_chain(
Expand All @@ -241,7 +245,10 @@ impl Props {
})
}

pub fn with_spawn_middleware(spawn_middlewares: Vec<SpawnMiddleware>) -> PropsOption {
pub fn with_spawn_middleware(
spawn_middlewares: impl IntoIterator<Item = SpawnMiddleware> + Send + Sync,
) -> PropsOption {
let spawn_middlewares = spawn_middlewares.into_iter().collect::<Vec<_>>();
PropsOption::new(move |props: &mut Props| {
props.spawn_middleware.extend(spawn_middlewares.clone());
props.spawn_middleware_chain = make_spawn_middleware_chain(
Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/receiver_middleware_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

use futures::future::BoxFuture;

use crate::actor::actor::actor_error::ActorError;
use crate::actor::context::receiver_context_handle::ReceiverContextHandle;
use crate::actor::message::message_or_envelope::MessageEnvelope;
use futures::future::BoxFuture;

// ReceiverMiddlewareChain
#[derive(Clone)]
Expand Down
4 changes: 2 additions & 2 deletions src/actor/actor/restart_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ impl RestartStatistics {
}
}

pub fn with_values(failure_times: Vec<Instant>) -> Self {
pub fn with_values(failure_times: impl IntoIterator<Item = Instant>) -> Self {
Self {
failure_times: Arc::new(Mutex::new(failure_times)),
failure_times: Arc::new(Mutex::new(failure_times.into_iter().collect())),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/actor/actor/taks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::actor::message::message::Message;
use async_trait::async_trait;

use crate::actor::message::message::Message;

#[async_trait]
pub trait Task: Message {
async fn run(&self);
Expand Down
2 changes: 1 addition & 1 deletion src/actor/actor_example_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ mod tests {
let msg = ctx.get_message_handle().await;
tracing::debug!("caller msg = {:?}", msg);
if let Some(msg) = msg.to_typed::<SystemMessage>() {
if let SystemMessage::Started(_) = msg {
if let SystemMessage::Started = msg {
ctx
.request(cloned_callee_pid, MessageHandle::new(Request("PING".to_string())))
.await;
Expand Down
8 changes: 5 additions & 3 deletions src/actor/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use std::fmt::Debug;
use std::time::Duration;

use async_trait::async_trait;

use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::actor_handle::ActorHandle;
use crate::actor::actor::continuer::Continuer;
Expand All @@ -11,9 +16,6 @@ use crate::actor::message::message_or_envelope::MessageEnvelope;
use crate::actor::message::readonly_message_headers::ReadonlyMessageHeadersHandle;
use crate::actor::message::response::ResponseHandle;
use crate::ctxext::extensions::{ContextExtensionHandle, ContextExtensionId};
use async_trait::async_trait;
use std::fmt::Debug;
use std::time::Duration;

pub mod actor_context;
mod actor_context_extras;
Expand Down
Loading

0 comments on commit a6d8def

Please sign in to comment.