diff --git a/Makefile.toml b/Makefile.toml index 8063e0d..8833db0 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -7,31 +7,31 @@ default_to_workspace = false [tasks.setup] script = ''' - echo # installing git hooks + echo installing git hooks pre-commit --version || pip install pre-commit pre-commit install || echo "failed to install git hooks!" 1>&2 - echo # install cargo-binstall - cargo install -q cargo-binstall + echo install cargo-binstall + cargo install cargo-binstall - echo # things required for `cargo make test` - cargo binstall -q -y cargo-nextest + echo things required for `cargo make test` + cargo binstall -y cargo-nextest - echo # things required by `cargo make coverage` + echo things required by `cargo make coverage` rustup component add llvm-tools-preview - cargo install -q cargo-llvm-cov + cargo install cargo-llvm-cov - echo # things required by `cargo make audit` - cargo binstall -q -y cargo-audit + echo things required by `cargo make audit` + cargo binstall -y cargo-audit - echo # things required by `cargo release` + echo things required by `cargo release` cargo binstall -q -y cargo-release - echo # things required by `cargo make sort-deps` - cargo binstall -q -y cargo-sort + echo things required by `cargo make sort-deps` + cargo binstall -y cargo-sort - echo # things required by mdbook - cargo binstall -q -y mdbook mdbook-admonish + echo things required by mdbook + cargo binstall -y mdbook mdbook-admonish ''' [tasks.example] diff --git a/scyllax/src/collection.rs b/scyllax/src/collection.rs index 47eb3f9..f5544e5 100644 --- a/scyllax/src/collection.rs +++ b/scyllax/src/collection.rs @@ -48,5 +48,15 @@ pub async fn prepare_query( query_type: &str, ) -> Result { tracing::info!("preparing query"); - Ok(session.prepare(query).await?) + + let res = session.prepare(query).await; + + match res { + Ok(prepared) => Ok(prepared), + Err(err) => { + tracing::error!("failed to prepare query: {:#?}", err); + + return Err(err.into()); + } + } } diff --git a/scyllax/src/executor.rs b/scyllax/src/executor.rs index 00826c3..6221676 100644 --- a/scyllax/src/executor.rs +++ b/scyllax/src/executor.rs @@ -60,7 +60,7 @@ pub type ShardMessage = (Q, oneshot::Sender>); type TaskRequestMap = HashMap>>>; /// The result of a read query. -type ReadQueryResult = Result<::Output, ScyllaxError>; +type ReadQueryResult = Arc::Output, ScyllaxError>>; /// A message sent to the [`Executor::read_query_runner`] task. pub struct QueryRunnerMessage { @@ -71,6 +71,8 @@ pub struct QueryRunnerMessage { impl Executor { /// Creates a new [`Executor`] from a [`Session`] and a [`QueryCollection`]. + // all this is super ugly and inefficient, but its okay because + // it only happens once per executor pub async fn new(session: Arc) -> Result { let queries = T::new(&session).await?; let executor = Arc::new(Self { @@ -79,7 +81,6 @@ impl Executor { }); let queries = executor.queries.clone().register_tasks(executor); - // let queries = Arc::new(queries); let executor = Self { session, queries }; Ok(executor) @@ -96,10 +97,10 @@ impl Executor { return self.perform_read_query(query).await; } - let (response_tx, response_rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); let task = self.queries.get_task::(); - match task.send((query, response_tx)).await { + match task.send((query, tx)).await { Ok(_) => (), Err(e) => { tracing::error!("error sending query to task: {:#?}", e); @@ -107,9 +108,14 @@ impl Executor { } } - match response_rx.await { + let result = match rx.await { Ok(result) => result, - Err(e) => Err(ScyllaxError::ReceiverError(e)), + Err(e) => return Err(ScyllaxError::ReceiverError(e)), + }; + + match Arc::try_unwrap(result) { + Ok(data) => data, + Err(arc) => (*arc).clone(), } } @@ -171,22 +177,22 @@ impl Executor { }); join_set.spawn(async move { - let res = match response_receiver.await { - Ok(result) => result, - Err(e) => Err(ScyllaxError::ReceiverError(e)), - }; + let res = response_receiver.await; tracing::debug!(hash = hash, "joinset handle returned: {:#?}", res); (hash, res) }); } }, + // this runs when the query is completed and needs be to dispatched to the requestors Some(join_handle) = join_set.join_next() => { tracing::debug!("join set recieved a result!"); if let Ok((hash, result)) = join_handle { if let Some(senders) = requests.remove(&hash) { + let res = result.unwrap(); + for sender in senders { - let _ = sender.send(result.clone()); + let _ = sender.send(res.clone()); } } } @@ -214,9 +220,7 @@ impl Executor { { tracing::debug!("running query for hash: {hash}"); let result = self.perform_read_query(query).await; - - // todo(fyko): arc this - let _ = response_transmitter.send(result.clone()); + let _ = response_transmitter.send(Arc::new(result)); } }