Skip to content

Commit

Permalink
refactor: less cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
Fyko committed Nov 22, 2023
1 parent 0a28d14 commit 1c88f59
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 29 deletions.
28 changes: 14 additions & 14 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ default_to_workspace = false

[tasks.setup]
script = '''
echo # installing git hooks
echo installing git hooks
pre-commit --version || pip install pre-commit
pre-commit install || echo "failed to install git hooks!" 1>&2
echo # install cargo-binstall
cargo install -q cargo-binstall
echo install cargo-binstall
cargo install cargo-binstall
echo # things required for `cargo make test`
cargo binstall -q -y cargo-nextest
echo things required for `cargo make test`
cargo binstall -y cargo-nextest
echo # things required by `cargo make coverage`
echo things required by `cargo make coverage`
rustup component add llvm-tools-preview
cargo install -q cargo-llvm-cov
cargo install cargo-llvm-cov
echo # things required by `cargo make audit`
cargo binstall -q -y cargo-audit
echo things required by `cargo make audit`
cargo binstall -y cargo-audit
echo # things required by `cargo release`
echo things required by `cargo release`
cargo binstall -q -y cargo-release
echo # things required by `cargo make sort-deps`
cargo binstall -q -y cargo-sort
echo things required by `cargo make sort-deps`
cargo binstall -y cargo-sort
echo # things required by mdbook
cargo binstall -q -y mdbook mdbook-admonish
echo things required by mdbook
cargo binstall -y mdbook mdbook-admonish
'''

[tasks.example]
Expand Down
12 changes: 11 additions & 1 deletion scyllax/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,15 @@ pub async fn prepare_query(
query_type: &str,
) -> Result<PreparedStatement, ScyllaxError> {
tracing::info!("preparing query");
Ok(session.prepare(query).await?)

let res = session.prepare(query).await;

match res {
Ok(prepared) => Ok(prepared),
Err(err) => {
tracing::error!("failed to prepare query: {:#?}", err);

return Err(err.into());
}
}
}
32 changes: 18 additions & 14 deletions scyllax/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub type ShardMessage<Q> = (Q, oneshot::Sender<ReadQueryResult<Q>>);
type TaskRequestMap<Q> = HashMap<u64, Vec<oneshot::Sender<ReadQueryResult<Q>>>>;

/// The result of a read query.
type ReadQueryResult<Q> = Result<<Q as ReadQuery>::Output, ScyllaxError>;
type ReadQueryResult<Q> = Arc<Result<<Q as ReadQuery>::Output, ScyllaxError>>;

/// A message sent to the [`Executor::read_query_runner`] task.
pub struct QueryRunnerMessage<Q: ReadQuery> {
Expand All @@ -71,6 +71,8 @@ pub struct QueryRunnerMessage<Q: ReadQuery> {

impl<T: QueryCollection + Clone> Executor<T> {
/// Creates a new [`Executor`] from a [`Session`] and a [`QueryCollection`].
// all this is super ugly and inefficient, but its okay because
// it only happens once per executor
pub async fn new(session: Arc<Session>) -> Result<Self, ScyllaxError> {
let queries = T::new(&session).await?;
let executor = Arc::new(Self {
Expand All @@ -79,7 +81,6 @@ impl<T: QueryCollection + Clone> Executor<T> {
});

let queries = executor.queries.clone().register_tasks(executor);
// let queries = Arc::new(queries);
let executor = Self { session, queries };

Ok(executor)
Expand All @@ -96,20 +97,25 @@ impl<T: QueryCollection + Clone> Executor<T> {
return self.perform_read_query(query).await;
}

let (response_tx, response_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let task = self.queries.get_task::<Q>();

match task.send((query, response_tx)).await {
match task.send((query, tx)).await {
Ok(_) => (),
Err(e) => {
tracing::error!("error sending query to task: {:#?}", e);
return Err(ScyllaxError::NoRowsFound);
}
}

match response_rx.await {
let result = match rx.await {
Ok(result) => result,
Err(e) => Err(ScyllaxError::ReceiverError(e)),
Err(e) => return Err(ScyllaxError::ReceiverError(e)),
};

match Arc::try_unwrap(result) {
Ok(data) => data,
Err(arc) => (*arc).clone(),
}
}

Expand Down Expand Up @@ -171,22 +177,22 @@ impl<T: QueryCollection + Clone> Executor<T> {
});

join_set.spawn(async move {
let res = match response_receiver.await {
Ok(result) => result,
Err(e) => Err(ScyllaxError::ReceiverError(e)),
};
let res = response_receiver.await;
tracing::debug!(hash = hash, "joinset handle returned: {:#?}", res);

(hash, res)
});
}
},
// this runs when the query is completed and needs be to dispatched to the requestors
Some(join_handle) = join_set.join_next() => {
tracing::debug!("join set recieved a result!");
if let Ok((hash, result)) = join_handle {
if let Some(senders) = requests.remove(&hash) {
let res = result.unwrap();

for sender in senders {
let _ = sender.send(result.clone());
let _ = sender.send(res.clone());
}
}
}
Expand Down Expand Up @@ -214,9 +220,7 @@ impl<T: QueryCollection + Clone> Executor<T> {
{
tracing::debug!("running query for hash: {hash}");
let result = self.perform_read_query(query).await;

// todo(fyko): arc this
let _ = response_transmitter.send(result.clone());
let _ = response_transmitter.send(Arc::new(result));
}
}

Expand Down

0 comments on commit 1c88f59

Please sign in to comment.