Skip to content

Commit

Permalink
feat: request coalescing
Browse files Browse the repository at this point in the history
  • Loading branch information
Fyko committed Oct 10, 2023
1 parent f99f21a commit 167de7f
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions example/src/entities/person/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ create_query_collection!(
GetPersonById,
GetPeopleByIds,
GetPersonByEmail,
],
[
DeletePersonById,
UpsertPerson,
UpsertPersonWithTTL,
Expand Down
1 change: 1 addition & 0 deletions scyllax-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn run(opt: Opt) -> Result<()> {
source,
description,
} => migrate::add(&source, &description).await?,
MigrateCommand::Init { connect_opts } => migrate::init(connect_opts).await?,
MigrateCommand::Run {
source,
next,
Expand Down
35 changes: 35 additions & 0 deletions scyllax-cli/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,38 @@ pub async fn revert(migration_source: &str, connect_opts: ConnectOpts) -> anyhow

Ok(())
}

/// Creates the `scyllax_migrations` table.
pub async fn init(connect_opts: ConnectOpts) -> anyhow::Result<()> {
let create_keyspace = r#"create keyspace if not exists scyllax_migrations with replication = { 'class': 'LocalStrategy' };"#;
let create_table = r#"
create table if not exists scyllax_migrations.migration (
bucket int,
version bigint,
description text,
installed_on timestamp,
success boolean,
checksum blob,
execution_time bigint,
primary key (bucket, version)
);"#;

let session = create_session(
connect_opts.scylla_nodes.split(','),
Some(connect_opts.keyspace),
)
.await?;

for query in [create_keyspace, create_table] {
let prepared_query = Query::new(query);
session.query(prepared_query, ()).await?;
}

println!(
"{}\n{}",
style("scyllax_migrations keyspace and table created.").green(),
style("It's recommended you manually create these tables in production.")
);

Ok(())
}
6 changes: 6 additions & 0 deletions scyllax-cli/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub enum MigrateCommand {
source: Source,
},

/// Creates the `scyllax_migrations` keyspace.
Init {
#[clap(flatten)]
connect_opts: ConnectOpts,
},

/// Run all pending migrations.
Run {
#[clap(flatten)]
Expand Down
74 changes: 64 additions & 10 deletions scyllax-macros-core/src/prepare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This module contains the `prepare_queries!` macro.

Check failure on line 1 in scyllax-macros-core/src/prepare.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax-macros-core/src/prepare.rs
use proc_macro2::TokenStream;
use quote::quote;
use quote::{quote, ToTokens, format_ident};
use syn::{
parse::{Parse, ParseStream},
ExprArray,
Expand All @@ -12,15 +12,17 @@ pub struct PrepareQueriesInput {
/// The name of the struct to generate.
pub name: syn::Ident,
/// The queries to attach to the struct.
pub queries: Vec<syn::Ident>,
pub read_queries: Vec<syn::Ident>,
/// Write queries to attach to the struct.
pub write_queries: Vec<syn::Ident>,
}

impl Parse for PrepareQueriesInput {
fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
let name = input.parse()?;
input.parse::<syn::Token![,]>()?;

let queries = input
let read_queries = input
.parse::<ExprArray>()?
.elems
.iter()
Expand All @@ -33,7 +35,21 @@ impl Parse for PrepareQueriesInput {
})
.collect::<syn::Result<Vec<_>>>()?;

Ok(Self { name, queries })
input.parse::<syn::Token![,]>()?;
let write_queries = input
.parse::<ExprArray>()?
.elems
.iter()
.map(|expr| {
if let syn::Expr::Path(path) = expr {
Ok(path.path.get_ident().unwrap().clone())
} else {
Err(syn::Error::new_spanned(expr, "expected an identifier"))
}
})

Check failure on line 49 in scyllax-macros-core/src/prepare.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax-macros-core/src/prepare.rs
.collect::<syn::Result<Vec<_>>>()?;

Ok(Self { name, read_queries, write_queries })
}
}

Expand All @@ -51,10 +67,12 @@ pub fn expand(input: TokenStream) -> TokenStream {
Ok(args) => args,
Err(e) => return e.to_compile_error(),
};

Check failure on line 69 in scyllax-macros-core/src/prepare.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax-macros-core/src/prepare.rs
let queries = args.queries;
let read_queries = args.read_queries;
let write_queries = args.write_queries;
let queries: Vec<&proc_macro2::Ident> = read_queries.iter().chain(write_queries.iter()).collect();
let name = args.name;

let stmts = queries.iter().map(|field| {
let prepared_statements = queries.iter().map(|field| {
let doc = format!("The prepared statement for `{}`.", field);
quote! {
#[allow(non_snake_case)]
Expand All @@ -63,7 +81,7 @@ pub fn expand(input: TokenStream) -> TokenStream {
}
});

let gets = queries.iter().map(|field| {
let get_prepared_statements = queries.iter().map(|field| {
quote! {
impl scyllax::prelude::GetPreparedStatement<#field> for #name {
#[doc = "Get a prepared statement."]
Expand All @@ -80,23 +98,59 @@ pub fn expand(input: TokenStream) -> TokenStream {
}
});

let coalescing_senders = read_queries.iter().map(|field| {
let doc = format!("The task for `{}`.", field);
let field = format_ident!("{}_task", field).to_token_stream();
quote! {
#[allow(non_snake_case)]
#[doc = #doc]
pub #field: tokio::sync::mpsc::Sender<scyllax::executor::ShardMessage<'_, #field>>,
}
});

let get_coalescing_senders = read_queries.iter().map(|field| {
quote! {
impl scyllax::prelude::GetCoalescingSender<#field> for #name {
#[doc = "Get a task."]
fn get(&self) -> &tokio::sync::mpsc::Sender<scyllax::executor::ShardMessage<'_, #field>> {
&self.#field
}
}
}
});

let create_senders = read_queries.iter().map(|field| {
let field = format_ident!("{}_task", field);
quote! {
#field: {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let queries = self.clone();
tokio::spawn(self.read_task(rx));
tx
},
}
});

quote! {
#[doc = "A collection of prepared statements."]
#[allow(non_snake_case)]
pub struct #name {
#(#stmts)*
#(#prepared_statements)*
#(#coalescing_senders)*
}

#[scyllax::prelude::async_trait]
#[doc = "A collection of prepared statements."]
impl scyllax::prelude::QueryCollection for #name {
async fn new(session: &scylla::Session) -> Result<Self, scyllax::prelude::ScyllaxError> {
Ok(Self {
#(#prepares)*
#(#prepares)*,
#(#create_senders)*
})
}
}

#(#gets)*
#(#get_prepared_statements)*
#(#get_coalescing_senders)*
}
}
1 change: 1 addition & 0 deletions scyllax/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ scylla.workspace = true
scyllax-macros = { version = "0.1.9-alpha", path = "../scyllax-macros" }
scyllax-macros-core = { version = "0.1.9-alpha", path = "../scyllax-macros-core" }
thiserror = "1"
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
12 changes: 11 additions & 1 deletion scyllax/src/collection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{error::ScyllaxError, executor::GetPreparedStatement, queries::Query};
use std::sync::mpsc;

Check warning on line 1 in scyllax/src/collection.rs

View workflow job for this annotation

GitHub Actions / Run the example

unused import: `std::sync::mpsc`

Check failure on line 1 in scyllax/src/collection.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/collection.rs

use crate::{error::ScyllaxError, executor::{GetPreparedStatement, GetCoalescingSender, ShardMessage}, queries::Query, prelude::ReadQuery};
use async_trait::async_trait;
use scylla::{prepared_statement::PreparedStatement, Session};
use tokio::sync::mpsc::Sender;

/// A collection of prepared statements.
#[async_trait]
Expand All @@ -15,6 +18,13 @@ pub trait QueryCollection {
{
<Self as GetPreparedStatement<T>>::get(self)
}

fn get_task<T: Query + ReadQuery>(&self) -> &Sender<ShardMessage<T>>

Check warning on line 22 in scyllax/src/collection.rs

View workflow job for this annotation

GitHub Actions / Run the example

hidden lifetime parameters in types are deprecated
where
Self: GetCoalescingSender<T>,
{
<Self as GetCoalescingSender<T>>::get(self)
}
}

#[tracing::instrument(skip(session))]
Expand Down
10 changes: 8 additions & 2 deletions scyllax/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! ScyllaX error types

use tokio::sync::oneshot::error::RecvError;

/// An error from ScyllaX
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug)]
pub enum ScyllaxError {
/// A query error from Scylla
#[error("Scylla Query error: {0}")]
Expand All @@ -22,10 +24,14 @@ pub enum ScyllaxError {
/// An error when serializing values
#[error("Failed to serialize values: {0}")]
SerializedValues(#[from] scylla::frame::value::SerializeValuesError),

/// An error when using receivers
#[error("Receiver error: {0}")]
ReceiverError(#[from] RecvError),
}

/// An error when building an upsert query
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug)]
pub enum BuildUpsertQueryError {
/// There were too many values (usually ignored since we don't set a capacity on [`scylla::frame::value::SerializedValues`]])
#[error("Too many values when adding {field}")]
Expand Down
57 changes: 52 additions & 5 deletions scyllax/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! The `scyllax` [`Executor`] processes queries.

Check failure on line 1 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/executor.rs
use std::{collections::HashMap, sync::mpsc::RecvError};

Check warning on line 2 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Run the example

unused import: `sync::mpsc::RecvError`
use crate::{
collection::QueryCollection,
error::ScyllaxError,
prelude::WriteQuery,
queries::{Query, ReadQuery},

Check failure on line 7 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/executor.rs
};
use scylla::{prepared_statement::PreparedStatement, QueryResult, Session, SessionBuilder};
use tokio::sync::mpsc::{Sender, Receiver};
use tokio::sync::oneshot;

/// Creates a new [`CachingSession`] and returns it
pub async fn create_session(
Expand All @@ -28,12 +31,18 @@ pub trait GetPreparedStatement<T: Query> {
fn get(&self) -> &PreparedStatement;
}

pub trait GetCoalescingSender<T: Query + ReadQuery> {
fn get(&self) -> &Sender<ShardMessage<'_, T>>;
}

#[derive(Debug)]
pub struct Executor<T> {
pub session: Session,
queries: T,

Check failure on line 41 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/executor.rs
}

pub type ShardMessage<'a, Q: Query + ReadQuery> = (&'a Q, oneshot::Sender<Result<Q::Output, ScyllaxError>>);

Check warning on line 44 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Run the example

bounds on generic parameters are not enforced in type aliases

impl<T: QueryCollection> Executor<T> {
pub async fn new(session: Session) -> Result<Self, ScyllaxError> {
let queries = T::new(&session).await?;
Expand All @@ -44,14 +53,52 @@ impl<T: QueryCollection> Executor<T> {
pub async fn execute_read<Q>(&self, query: &Q) -> Result<Q::Output, ScyllaxError>
where
Q: Query + ReadQuery,
T: GetPreparedStatement<Q>,
T: GetPreparedStatement<Q> + GetCoalescingSender<Q>,
{
let statement = self.queries.get_prepared::<Q>();
let variables = query.bind()?;
let (response_tx, response_rx) = oneshot::channel();
let task = self.queries.get_task::<Q>();

task.send((query, response_tx)).await.unwrap();

match response_rx.await {
Ok(result) => result,
Err(e) => Err(ScyllaxError::ReceiverError(e)),
}
}

async fn read_task<Q>(&self, mut rx: Receiver<ShardMessage<'_, Q>>)

Check warning on line 69 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Run the example

method `read_task` is never used
where
Q: Query + ReadQuery,

Check failure on line 71 in scyllax/src/executor.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/executor.rs
T: GetPreparedStatement<Q> + GetCoalescingSender<Q>,
{
let mut requests: HashMap<String, Vec<oneshot::Sender<Result<Q::Output, ScyllaxError>>>> = HashMap::new();

while let Some((query, tx)) = rx.recv().await {
let key = query.shard_key();

if let Some(senders) = requests.get_mut(&key) {
senders.push(tx);
} else {
let mut senders = Vec::new();
senders.push(tx);
requests.insert(key.clone(), senders);

let result = self.session.execute(statement, variables).await?;
// Execute the query here and send the result back
// let result = self.execute_read(&query).await;
let statement = self.queries.get_prepared::<Q>();
// FIXME: better error handling
let variables = query.bind().unwrap();
// FIXME: better error handling
let result = self.session.execute(statement, variables).await.unwrap();
let parsed = Q::parse_response(result).await;

Q::parse_response(result).await
if let Some(senders) = requests.remove(&key) {
for tx in senders {
let _ = tx.send(parsed.clone());
}
}
}
}
}

pub async fn execute_write<Q>(&self, query: &Q) -> Result<QueryResult, ScyllaxError>
Expand Down
2 changes: 1 addition & 1 deletion scyllax/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use crate::{
collection::{prepare_query, QueryCollection},

Check failure on line 3 in scyllax/src/prelude.rs

View workflow job for this annotation

GitHub Actions / Check Suite

Diff in /home/runner/work/scyllax/scyllax/scyllax/src/prelude.rs
entity::EntityExt,
error::{BuildUpsertQueryError, ScyllaxError},
executor::{create_session, Executor, GetPreparedStatement},
executor::{create_session, Executor, GetPreparedStatement, GetCoalescingSender},
maybe_unset::MaybeUnset,
queries::{Query, ReadQuery, SerializedValuesResult, WriteQuery},
util::v1_uuid,
Expand Down
6 changes: 6 additions & 0 deletions scyllax/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub trait Query {
pub trait ReadQuery {
type Output: Clone + std::fmt::Debug + Send + Sync;

/// Returns the shard key for the query
fn shard_key(&self) -> String {
// TODO: impl me
String::new()
}

/// Parses the response from the database
async fn parse_response(rows: QueryResult) -> Result<Self::Output, ScyllaxError>;
}
Expand Down

0 comments on commit 167de7f

Please sign in to comment.