diff --git a/src/cursor.rs b/src/cursor.rs index 81dc441..4c23a4a 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -124,6 +124,43 @@ impl RowBinaryCursor { } } +pub(crate) struct RowBinaryCursor2 { + raw: RawCursor, + buffer: Vec +} + +impl RowBinaryCursor2 { + pub(crate) fn new(response: Response) -> Self { + Self { + raw: RawCursor::new(response), + buffer: vec![0; INITIAL_BUFFER_SIZE], + } + } + + pub(crate) async fn next<'a, 'b: 'a, T>(&'a mut self) -> Result> + where + T: Deserialize<'b>, + { + let buffer = &mut self.buffer; + + self.raw + .next(|pending| { + match rowbinary::deserialize_from(pending, &mut workaround_51132(buffer)[..]) { + Ok(value) => ControlFlow::Yield(value), + Err(Error::TooSmallBuffer(need)) => { + let new_len = (buffer.len() + need) + .checked_next_power_of_two() + .expect("oom"); + buffer.resize(new_len, 0); + ControlFlow::Retry + } + Err(err) => ControlFlow::Err(err), + } + }) + .await + } +} + // === JsonCursor === #[cfg(feature = "watch")] diff --git a/src/query.rs b/src/query.rs index 198f015..0ab2c13 100644 --- a/src/query.rs +++ b/src/query.rs @@ -10,6 +10,8 @@ use crate::{ sql::{Bind, SqlBuilder}, Client, }; +use crate::cursor::RowBinaryCursor2; +use crate::sql::Part; const MAX_QUERY_LEN_TO_USE_GET: usize = 8192; @@ -74,6 +76,26 @@ impl Query { Ok(RowCursor(RowBinaryCursor::new(response))) } + pub fn fetch_cells(mut self) -> Result { + // ensure we don't need to bind any fields + match &self.sql { + SqlBuilder::InProgress { parts, size } => { + if parts.iter().find(|p| matches!(p, Part::Arg)).is_some() { + return Err(Error::Custom("Cannot specify ?fields when calling fetch_rows".to_string())); + } + } + SqlBuilder::Failed(err) => { + return Err(Error::Custom(err.clone())) + } + } + + self.sql.append(" FORMAT RowBinary"); + + let response = self.do_execute(true)?; + + Ok(CellCursor(RowBinaryCursor2::new(response))) + } + /// Executes the query and returns just a single row. /// /// Note that `T` must be owned. @@ -187,3 +209,14 @@ impl RowCursor { self.0.next().await } } + +pub struct CellCursor(RowBinaryCursor2); + +impl CellCursor { + pub async fn next<'a, 'b: 'a, T>(&'a mut self) -> Result> + where + T: Deserialize<'b>, + { + self.0.next().await + } +} diff --git a/tests/it/query.rs b/tests/it/query.rs index 4a8e4a9..f304642 100644 --- a/tests/it/query.rs +++ b/tests/it/query.rs @@ -87,6 +87,36 @@ async fn fetch_one_and_optional() { assert_eq!(got_string, "bar"); } +#[crate::named] +#[tokio::test] +async fn fetch_cells() { + let client = prepare_database!(); + + client + .query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n") + .execute() + .await + .unwrap(); + + let q = "SELECT n FROM test ORDER BY n"; + + #[derive(Serialize, Row)] + struct Row { + n: String, + } + + let mut insert = client.insert("test").unwrap(); + insert.write(&Row { n: "foo".into() }).await.unwrap(); + insert.write(&Row { n: "bar".into() }).await.unwrap(); + insert.end().await.unwrap(); + + let mut cell_cursor = client.query(q).fetch_cells().unwrap(); + + assert_eq!(cell_cursor.next::().await.expect("Error getting cell"), Some("bar".into())); + assert_eq!(cell_cursor.next::().await.expect("Error getting cell"), Some("foo".into())); + assert_eq!(cell_cursor.next::().await.expect("Error getting cell"), None); +} + // See #19. #[crate::named] #[tokio::test]