Skip to content

Commit

Permalink
Added the ability to fetch cell-by-cell
Browse files Browse the repository at this point in the history
**SUPER HACKY**
* Allows for fetching cell-by-cell, with a different type for each
* This is a bit of a hack, but can be a solution for ClickHouse#10
  • Loading branch information
wspeirs committed Oct 18, 2023
1 parent 851b06a commit 3e72638
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,43 @@ impl<T> RowBinaryCursor<T> {
}
}

pub(crate) struct RowBinaryCursor2 {
raw: RawCursor,
buffer: Vec<u8>
}

impl RowBinaryCursor2 {
pub(crate) fn new(response: Response) -> Self {
Self {
raw: RawCursor::new(response),
buffer: vec![0; INITIAL_BUFFER_SIZE],
}
}

pub(crate) async fn next<'a, 'b: 'a, T>(&'a mut self) -> Result<Option<T>>
where
T: Deserialize<'b>,
{
let buffer = &mut self.buffer;

self.raw
.next(|pending| {
match rowbinary::deserialize_from(pending, &mut workaround_51132(buffer)[..]) {
Ok(value) => ControlFlow::Yield(value),
Err(Error::TooSmallBuffer(need)) => {
let new_len = (buffer.len() + need)
.checked_next_power_of_two()
.expect("oom");
buffer.resize(new_len, 0);
ControlFlow::Retry
}
Err(err) => ControlFlow::Err(err),
}
})
.await
}
}

// === JsonCursor ===

#[cfg(feature = "watch")]
Expand Down
33 changes: 33 additions & 0 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::{
sql::{Bind, SqlBuilder},
Client,
};
use crate::cursor::RowBinaryCursor2;
use crate::sql::Part;

const MAX_QUERY_LEN_TO_USE_GET: usize = 8192;

Expand Down Expand Up @@ -74,6 +76,26 @@ impl Query {
Ok(RowCursor(RowBinaryCursor::new(response)))
}

pub fn fetch_cells(mut self) -> Result<CellCursor> {
// ensure we don't need to bind any fields
match &self.sql {
SqlBuilder::InProgress { parts, size } => {
if parts.iter().find(|p| matches!(p, Part::Arg)).is_some() {
return Err(Error::Custom("Cannot specify ?fields when calling fetch_rows".to_string()));
}
}
SqlBuilder::Failed(err) => {
return Err(Error::Custom(err.clone()))
}
}

self.sql.append(" FORMAT RowBinary");

let response = self.do_execute(true)?;

Ok(CellCursor(RowBinaryCursor2::new(response)))
}

/// Executes the query and returns just a single row.
///
/// Note that `T` must be owned.
Expand Down Expand Up @@ -187,3 +209,14 @@ impl<T> RowCursor<T> {
self.0.next().await
}
}

pub struct CellCursor(RowBinaryCursor2);

impl CellCursor {
pub async fn next<'a, 'b: 'a, T>(&'a mut self) -> Result<Option<T>>
where
T: Deserialize<'b>,
{
self.0.next().await
}
}
30 changes: 30 additions & 0 deletions tests/it/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,36 @@ async fn fetch_one_and_optional() {
assert_eq!(got_string, "bar");
}

#[crate::named]
#[tokio::test]
async fn fetch_cells() {
let client = prepare_database!();

client
.query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n")
.execute()
.await
.unwrap();

let q = "SELECT n FROM test ORDER BY n";

#[derive(Serialize, Row)]
struct Row {
n: String,
}

let mut insert = client.insert("test").unwrap();
insert.write(&Row { n: "foo".into() }).await.unwrap();
insert.write(&Row { n: "bar".into() }).await.unwrap();
insert.end().await.unwrap();

let mut cell_cursor = client.query(q).fetch_cells().unwrap();

assert_eq!(cell_cursor.next::<String>().await.expect("Error getting cell"), Some("bar".into()));
assert_eq!(cell_cursor.next::<String>().await.expect("Error getting cell"), Some("foo".into()));
assert_eq!(cell_cursor.next::<String>().await.expect("Error getting cell"), None);
}

// See #19.
#[crate::named]
#[tokio::test]
Expand Down

0 comments on commit 3e72638

Please sign in to comment.