Skip to content

Commit

Permalink
Remove unnecessary Sync bounds in actor framework (#3601)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 4, 2023
1 parent f02200d commit 25c30d8
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 42 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ sqlx = { version = "0.6", features = [
"time",
] }
syn = "2.0.11"
sync_wrapper = "0.1.2"
tabled = { version = "0.8", features = ["color"] }
tempfile = "3"
termcolor = "1"
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sync_wrapper = { workspace = true }

quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-actors/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ impl From<SendError> for ActorExitStatus {
/// - update its state;
/// - emits one or more messages to other actors.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
pub trait Actor: Send + Sized + 'static {
/// Piece of state that can be copied for assert in unit test, admin, etc.
type ObservableState: Send + Sync + Clone + serde::Serialize + fmt::Debug;
type ObservableState: fmt::Debug + serde::Serialize + Send + Sync + Clone;
/// A name identifying the type of actor.
///
/// Ideally respect the `CamelCase` convention.
Expand Down Expand Up @@ -205,7 +205,7 @@ pub trait DeferableReplyHandler<M>: Actor {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus>
where
M: Send + Sync + 'static;
M: Send + 'static;
}

/// Message handler that requires actor to provide immediate response
Expand Down Expand Up @@ -238,7 +238,7 @@ where H: Handler<M>
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus>
where
M: Send + 'static + Send + Sync,
M: Send + 'static,
{
self.handle(message, ctx).await.map(reply)
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl<A: Actor> ActorContext<A> {
) -> Result<oneshot::Receiver<DestActor::Reply>, SendError>
where
DestActor: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
Expand All @@ -262,7 +262,7 @@ impl<A: Actor> ActorContext<A> {
) -> Result<T, AskError<Infallible>>
where
DestActor: DeferableReplyHandler<M, Reply = T>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-actors/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<A: Actor> fmt::Debug for Envelope<A> {
}

#[async_trait]
trait EnvelopeT<A: Actor>: Send + Sync {
trait EnvelopeT<A: Actor>: Send {
fn debug_msg(&self) -> String;

/// Returns the message as a boxed any.
Expand All @@ -95,7 +95,7 @@ trait EnvelopeT<A: Actor>: Send + Sync {
impl<A, M> EnvelopeT<A> for Option<(oneshot::Sender<A::Reply>, M)>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
fn debug_msg(&self) -> String {
#[allow(clippy::needless_option_take)]
Expand Down Expand Up @@ -143,7 +143,7 @@ pub(crate) fn wrap_in_envelope<A, M>(
) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let (response_tx, response_rx) = oneshot::channel();
let handler_envelope = Some((response_tx, msg));
Expand Down
18 changes: 9 additions & 9 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<oneshot::Receiver<A::Reply>, SendError>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
self.send_message_with_backpressure_counter(message, None)
.await
Expand All @@ -189,7 +189,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let (envelope, response_rx) = self.wrap_in_envelope(message);
self.inner
Expand All @@ -211,7 +211,7 @@ impl<A: Actor> Mailbox<A> {
fn wrap_in_envelope<M>(&self, message: M) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let guard = self
.inner
Expand All @@ -233,7 +233,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<oneshot::Receiver<A::Reply>, SendError>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let (envelope, response_rx) = self.wrap_in_envelope(message);
match self.inner.tx.try_send_low_priority(envelope) {
Expand All @@ -259,7 +259,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<oneshot::Receiver<A::Reply>, SendError>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let (envelope, response_rx) = self.wrap_in_envelope(message);
self.inner.tx.send_high_priority(envelope)?;
Expand All @@ -273,7 +273,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<oneshot::Receiver<A::Reply>, SendError>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let (envelope, response_rx) = self.wrap_in_envelope(message);
match priority {
Expand All @@ -292,7 +292,7 @@ impl<A: Actor> Mailbox<A> {
pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
where
A: DeferableReplyHandler<M, Reply = T>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
self.ask_with_backpressure_counter(message, None).await
}
Expand All @@ -315,7 +315,7 @@ impl<A: Actor> Mailbox<A> {
) -> Result<T, AskError<Infallible>>
where
A: DeferableReplyHandler<M, Reply = T>,
M: 'static + Send + Sync + fmt::Debug,
M: fmt::Debug + Send + 'static,
{
let resp = self
.send_message_with_backpressure_counter(message, backpressure_micros_counter_opt)
Expand All @@ -332,7 +332,7 @@ impl<A: Actor> Mailbox<A> {
pub async fn ask_for_res<M, T, E>(&self, message: M) -> Result<T, AskError<E>>
where
A: DeferableReplyHandler<M, Reply = Result<T, E>>,
M: fmt::Debug + Send + Sync + 'static,
M: fmt::Debug + Send + 'static,
E: fmt::Debug,
{
self.send_message(message)
Expand Down
28 changes: 18 additions & 10 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use anyhow::Context;
use quickwit_common::metrics::IntCounter;
use sync_wrapper::SyncWrapper;
use tokio::sync::watch;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -170,7 +171,7 @@ impl<A: Actor> SpawnBuilder<A> {
(mailbox, actor_handle)
}

pub fn supervise_fn<F: Fn() -> A + Send + Sync + 'static>(
pub fn supervise_fn<F: Fn() -> A + Send + 'static>(
mut self,
actor_factory: F,
) -> (Mailbox<A>, ActorHandle<Supervisor<A>>) {
Expand Down Expand Up @@ -224,14 +225,14 @@ fn try_recv_envelope<A: Actor>(inbox: &mut Inbox<A>, ctx: &ActorContext<A>) -> O
}

struct ActorExecutionEnv<A: Actor> {
actor: A,
actor: SyncWrapper<A>,
inbox: Inbox<A>,
ctx: ActorContext<A>,
}

impl<A: Actor> ActorExecutionEnv<A> {
async fn initialize(&mut self) -> Result<(), ActorExitStatus> {
self.actor.initialize(&self.ctx).await
self.actor.get_mut().initialize(&self.ctx).await
}

async fn process_messages(&mut self) -> ActorExitStatus {
Expand All @@ -247,15 +248,17 @@ impl<A: Actor> ActorExecutionEnv<A> {
mut envelope: Envelope<A>,
) -> Result<(), ActorExitStatus> {
self.yield_and_check_if_killed().await?;
envelope.handle_message(&mut self.actor, &self.ctx).await?;
envelope
.handle_message(self.actor.get_mut(), &self.ctx)
.await?;
Ok(())
}

async fn yield_and_check_if_killed(&self) -> Result<(), ActorExitStatus> {
async fn yield_and_check_if_killed(&mut self) -> Result<(), ActorExitStatus> {
if self.ctx.kill_switch().is_dead() {
return Err(ActorExitStatus::Killed);
}
if self.actor.yield_after_each_message() {
if self.actor.get_mut().yield_after_each_message() {
self.ctx.yield_now().await;
if self.ctx.kill_switch().is_dead() {
return Err(ActorExitStatus::Killed);
Expand All @@ -281,7 +284,7 @@ impl<A: Actor> ActorExecutionEnv<A> {
break;
}
}
self.actor.on_drained_messages(&self.ctx).await?;
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.ctx.idle();
if self.ctx.mailbox().is_last_mailbox() {
// No one will be able to send us more messages.
Expand All @@ -300,9 +303,10 @@ impl<A: Actor> ActorExecutionEnv<A> {
.map(|scheduler_client| scheduler_client.no_advance_time_guard());
if let Err(finalize_error) = self
.actor
.get_mut()
.finalize(&exit_status, &self.ctx)
.await
.with_context(|| format!("Finalization of actor {}", self.actor.name()))
.with_context(|| format!("Finalization of actor {}", self.actor.get_mut().name()))
{
error!(error=?finalize_error, "Finalizing failed, set exit status to panicked.");
return ActorExitStatus::Panicked;
Expand Down Expand Up @@ -332,7 +336,7 @@ impl<A: Actor> Drop for ActorExecutionEnv<A> {
// We rely on this object internally to fetch a post-mortem state,
// even in case of a panic.
fn drop(&mut self) {
self.ctx.observe(&mut self.actor);
self.ctx.observe(self.actor.get_mut());
}
}

Expand All @@ -342,7 +346,11 @@ async fn actor_loop<A: Actor>(
no_advance_time_guard: NoAdvanceTimeGuard,
ctx: ActorContext<A>,
) -> ActorExitStatus {
let mut actor_env = ActorExecutionEnv { actor, inbox, ctx };
let mut actor_env = ActorExecutionEnv {
actor: SyncWrapper::new(actor),
inbox,
ctx,
};

let initialize_exit_status_res: Result<(), ActorExitStatus> = actor_env.initialize().await;
drop(no_advance_time_guard);
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-actors/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct SupervisorState {

pub struct Supervisor<A: Actor> {
actor_name: String,
actor_factory: Box<dyn Fn() -> A + Sync + Send>,
actor_factory: Box<dyn Fn() -> A + Send>,
inbox: Inbox<A>,
handle_opt: Option<ActorHandle<A>>,
state: SupervisorState,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<A: Actor> Actor for Supervisor<A> {
impl<A: Actor> Supervisor<A> {
pub(crate) fn new(
actor_name: String,
actor_factory: Box<dyn Fn() -> A + Sync + Send>,
actor_factory: Box<dyn Fn() -> A + Send>,
inbox: Inbox<A>,
handle: ActorHandle<A>,
) -> Self {
Expand Down
87 changes: 87 additions & 0 deletions quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::cell::Cell;
use std::collections::HashMap;
use std::ops::Mul;
use std::time::Duration;
Expand Down Expand Up @@ -638,3 +639,89 @@ async fn test_drain_is_called() {
);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_unsync_actor() {
#[derive(Default)]
struct UnsyncActor(Cell<u64>);

impl Actor for UnsyncActor {
type ObservableState = u64;

fn observable_state(&self) -> Self::ObservableState {
self.0.get()
}
}

#[async_trait]
impl Handler<u64> for UnsyncActor {
type Reply = u64;

async fn handle(
&mut self,
number: u64,
_ctx: &ActorContext<Self>,
) -> Result<u64, ActorExitStatus> {
*self.0.get_mut() += number;
Ok(self.0.get())
}
}
let universe = Universe::with_accelerated_time();
let unsync_message_actor = UnsyncActor::default();
let (mailbox, _handle) = universe.spawn_builder().spawn(unsync_message_actor);

let response = mailbox.ask(1).await.unwrap();
assert_eq!(response, 1);

universe.assert_quit().await;
}

#[tokio::test]
async fn test_unsync_actor_message() {
#[derive(Default)]
struct UnsyncMessageActor(u64);

impl Actor for UnsyncMessageActor {
type ObservableState = u64;

fn observable_state(&self) -> Self::ObservableState {
self.0
}
}

#[async_trait]
impl Handler<Cell<u64>> for UnsyncMessageActor {
type Reply = anyhow::Result<u64>;

async fn handle(
&mut self,
number: Cell<u64>,
_ctx: &ActorContext<Self>,
) -> Result<anyhow::Result<u64>, ActorExitStatus> {
self.0 += number.get();
Ok(Ok(self.0))
}
}
let universe = Universe::with_accelerated_time();
let unsync_message_actor = UnsyncMessageActor::default();
let (mailbox, _handle) = universe.spawn_builder().spawn(unsync_message_actor);

let response_rx = mailbox.send_message(Cell::new(1)).await.unwrap();
assert_eq!(response_rx.await.unwrap().unwrap(), 1);

let response = mailbox.ask(Cell::new(1)).await.unwrap().unwrap();
assert_eq!(response, 2);

let response = mailbox.ask_for_res(Cell::new(1)).await.unwrap();
assert_eq!(response, 3);

let response_rx = mailbox
.send_message_with_high_priority(Cell::new(1))
.unwrap();
assert_eq!(response_rx.await.unwrap().unwrap(), 4);

let response_rx = mailbox.try_send_message(Cell::new(1)).unwrap();
assert_eq!(response_rx.await.unwrap().unwrap(), 5);

universe.assert_quit().await;
}
Loading

0 comments on commit 25c30d8

Please sign in to comment.