Skip to content

Commit

Permalink
refactor: Remove unused Terminated struct
Browse files Browse the repository at this point in the history
  • Loading branch information
j5ik2o committed Jul 12, 2024
1 parent 5e1ffbd commit 97703af
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 30 deletions.
16 changes: 8 additions & 8 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ pub struct DeadLetterResponse {
}
/// system messages

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Terminated {
#[prost(message, optional, tag = "1")]
pub who: ::core::option::Option<Pid>,
#[prost(enumeration = "TerminatedReason", tag = "2")]
pub why: i32,
}
// #[allow(clippy::derive_partial_eq_without_eq)]
// #[derive(Clone, PartialEq, ::prost::Message)]
// pub struct Terminated {
// #[prost(message, optional, tag = "1")]
// pub who: ::core::option::Option<Pid>,
// #[prost(enumeration = "TerminatedReason", tag = "2")]
// pub why: i32,
// }

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
Expand Down
14 changes: 6 additions & 8 deletions src/actor/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use async_trait::async_trait;

use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::actor::Terminated;
use crate::actor::context::context_handle::ContextHandle;
use crate::actor::context::MessagePart;
use crate::actor::message::auto_receive_message::AutoReceiveMessage;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::message_or_envelope::{unwrap_envelope_message, MessageEnvelope};
use crate::actor::message::terminate_info::TerminateInfo;
use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;

#[async_trait]
Expand All @@ -19,24 +19,22 @@ pub trait Actor: Debug + Send + Sync + 'static {
tracing::debug!("Actor::handle: message_handle = {:?}", message_handle);
let me = message_handle.to_typed::<MessageEnvelope>();
let arm = message_handle.to_typed::<AutoReceiveMessage>();
let t = message_handle.to_typed::<Terminated>();
println!("me = {:?}, arm = {:?}, t = {:?}", me, arm, t);
match (me, arm, t) {
(Some(_), None, None) => {
match (me, arm) {
(Some(_), None) => {
let message = unwrap_envelope_message(message_handle.clone());
tracing::debug!("Actor::handle: MessageEnvelope = {:?}", message);
self.receive(context_handle.clone(), message).await
}
(None, Some(arm), None) => match arm {
(None, Some(arm)) => match arm {
AutoReceiveMessage::PreStart => self.pre_start(context_handle).await,
AutoReceiveMessage::PostStart => self.post_start(context_handle).await,
AutoReceiveMessage::PreRestart => self.pre_restart(context_handle).await,
AutoReceiveMessage::PostRestart => self.post_restart(context_handle).await,
AutoReceiveMessage::PreStop => self.pre_stop(context_handle).await,
AutoReceiveMessage::PostStop => self.post_stop(context_handle).await,
AutoReceiveMessage::PoisonPill => Ok(()),
AutoReceiveMessage::Terminated(t) => self.post_child_terminate(context_handle, &t).await,
},
(None, None, Some(t)) => self.post_child_terminate(context_handle, &t).await,
_ => self.receive(context_handle.clone(), message_handle).await,
}
} else {
Expand Down Expand Up @@ -79,7 +77,7 @@ pub trait Actor: Debug + Send + Sync + 'static {
Ok(())
}

async fn post_child_terminate(&self, _: ContextHandle, _: &Terminated) -> Result<(), ActorError> {
async fn post_child_terminate(&self, _: ContextHandle, _: &TerminateInfo) -> Result<(), ActorError> {
tracing::debug!("Actor::post_child_terminate");
Ok(())
}
Expand Down
22 changes: 11 additions & 11 deletions src/actor/context/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::actor::actor::props::Props;
use crate::actor::actor::receiver_middleware_chain::ReceiverMiddlewareChain;
use crate::actor::actor::sender_middleware_chain::SenderMiddlewareChain;
use crate::actor::actor::spawner::SpawnError;
use crate::actor::actor::Terminated;
use crate::actor::actor_system::ActorSystem;
use crate::actor::auto_respond::{AutoRespond, AutoResponsive};
use crate::actor::context::actor_context_extras::ActorContextExtras;
Expand All @@ -43,6 +42,7 @@ use crate::actor::message::readonly_message_headers::ReadonlyMessageHeadersHandl
use crate::actor::message::receive_timeout::ReceiveTimeout;
use crate::actor::message::response::ResponseHandle;
use crate::actor::message::system_message::SystemMessage;
use crate::actor::message::terminate_info::TerminateInfo;
use crate::actor::message::watch::{Unwatch, Watch};
use crate::actor::process::Process;
use crate::actor::supervisor::supervisor_strategy::{
Expand Down Expand Up @@ -342,10 +342,10 @@ impl ActorContext {
P_LOG.error("Failed to handle Stopped message", vec![]).await;
return result;
}
let other_stopped = MessageHandle::new(Terminated {
let other_stopped = MessageHandle::new(SystemMessage::Terminate(TerminateInfo {
who: self.get_self_opt().await.map(|x| x.inner_pid),
why: 0,
});
}));
if let Some(extras) = self.get_extras().await {
let watchers = extras.get_watchers().await;
for watcher in watchers.to_vec().await {
Expand Down Expand Up @@ -522,14 +522,14 @@ impl ActorContext {
tracing::debug!("ActorContext::handle_child_failure: finished: self = {}", self_pid,);
}

async fn handle_terminated(&mut self, terminated: &Terminated) -> Result<(), ActorError> {
async fn handle_terminated(&mut self, terminated: &TerminateInfo) -> Result<(), ActorError> {
// tracing::debug!("ActorContext::handle_terminated: {:?}", terminated);
if let Some(mut extras) = self.get_extras().await {
let pid = ExtendedPid::new(terminated.clone().who.unwrap(), self.get_actor_system().await);
extras.remove_child(&pid).await;
}

let msg = MessageHandle::new(terminated.clone());
let msg = MessageHandle::new(AutoReceiveMessage::Terminated(terminated.clone()));
let result = self.invoke_user_message(msg.clone()).await;
if result.is_err() {
P_LOG.error("Failed to handle Terminated message", vec![]).await;
Expand Down Expand Up @@ -984,6 +984,12 @@ impl MessageInvoker for ActorContext {
SystemMessage::Unwatch(unwatch) => {
self.handle_unwatch(&unwatch).await;
}
SystemMessage::Terminate(t) => {
let result = self.handle_terminated(&t).await;
if result.is_err() {
return result;
}
}
}
}
if let Some(c) = message_handle.to_typed::<Continuation>() {
Expand All @@ -994,12 +1000,6 @@ impl MessageInvoker for ActorContext {
if let Some(f) = message_handle.to_typed::<Failure>() {
self.handle_child_failure(&f).await;
}
if let Some(t) = message_handle.to_typed::<Terminated>() {
let result = self.handle_terminated(&t).await;
if result.is_err() {
return result;
}
}
Ok(())
}

Expand Down
7 changes: 4 additions & 3 deletions src/actor/dispatch/dead_letter_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::any::Any;
use async_trait::async_trait;

use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor::{DeadLetterResponse, Terminated, TerminatedReason};
use crate::actor::actor::{DeadLetterResponse, TerminatedReason};
use crate::actor::actor_system::ActorSystem;
use crate::actor::context::SenderPart;
use crate::actor::log::P_LOG;
Expand All @@ -12,6 +12,7 @@ use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::message_or_envelope::unwrap_envelope;
use crate::actor::message::system_message::SystemMessage;
use crate::actor::message::terminate_info::TerminateInfo;
use crate::actor::process::{Process, ProcessHandle};
use crate::actor::util::throttler::{Throttle, ThrottleCallback, Valve};
use crate::event_stream::Handler;
Expand Down Expand Up @@ -122,10 +123,10 @@ impl DeadLetterProcess {
e_pid
.send_system_message(
actor_system,
MessageHandle::new(Terminated {
MessageHandle::new(SystemMessage::Terminate(TerminateInfo {
who: Some(pid),
why: TerminatedReason::NotFound as i32,
}),
})),
)
.await;
}
Expand Down
1 change: 1 addition & 0 deletions src/actor/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub mod receive_timeout;
pub mod response;
pub mod system_message;
pub mod watch;
pub mod terminate_info;
4 changes: 4 additions & 0 deletions src/actor/message/auto_receive_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt::Display;
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::terminate_info::TerminateInfo;

#[derive(Debug, Clone)]
pub enum AutoReceiveMessage {
Expand All @@ -14,6 +15,7 @@ pub enum AutoReceiveMessage {
PreStop,
PostStop,
PoisonPill,
Terminated(TerminateInfo)
}

impl AutoReceiveMessage {
Expand All @@ -40,6 +42,7 @@ impl Display for AutoReceiveMessage {
AutoReceiveMessage::PreStop => write!(f, "PreStop"),
AutoReceiveMessage::PostStop => write!(f, "PostStop"),
AutoReceiveMessage::PoisonPill => write!(f, "PoisonPill"),
AutoReceiveMessage::Terminated(_) => write!(f, "Terminated"),
}
}
}
Expand All @@ -55,6 +58,7 @@ impl Message for AutoReceiveMessage {
(AutoReceiveMessage::PreStop, Some(&AutoReceiveMessage::PreStop)) => true,
(AutoReceiveMessage::PostStop, Some(&AutoReceiveMessage::PostStop)) => true,
(AutoReceiveMessage::PoisonPill, Some(&AutoReceiveMessage::PoisonPill)) => true,
(AutoReceiveMessage::Terminated(me), Some(&AutoReceiveMessage::Terminated(ref you))) => *me == *you,
_ => false,
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/actor/message/system_message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::any::Any;

use crate::actor::message::message::Message;
use crate::actor::message::terminate_info::TerminateInfo;
use crate::actor::message::watch::{Unwatch, Watch};

#[derive(Debug, Clone)]
Expand All @@ -10,6 +11,7 @@ pub enum SystemMessage {
Stop,
Watch(Watch),
Unwatch(Unwatch),
Terminate(TerminateInfo)
}

impl Message for SystemMessage {
Expand All @@ -19,6 +21,9 @@ impl Message for SystemMessage {
(SystemMessage::Restart, Some(&SystemMessage::Restart)) => true,
(SystemMessage::Start, Some(&SystemMessage::Start)) => true,
(SystemMessage::Stop, Some(&SystemMessage::Stop)) => true,
(SystemMessage::Watch(_), Some(&SystemMessage::Watch(_))) => true,
(SystemMessage::Unwatch(_), Some(&SystemMessage::Unwatch(_))) => true,
(SystemMessage::Terminate(me), Some(&SystemMessage::Terminate(ref you))) => *me == *you,
_ => false,
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/actor/message/terminate_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::actor::actor::Pid;

#[derive(Debug, Clone, PartialEq)]
pub struct TerminateInfo {
pub who: Option<Pid>,
pub why: i32,
}

0 comments on commit 97703af

Please sign in to comment.