Skip to content

Commit

Permalink
Updated to v0.8.2
Browse files Browse the repository at this point in the history
  • Loading branch information
bobozaur committed Sep 4, 2024
1 parent 8665cb0 commit 626c39c
Show file tree
Hide file tree
Showing 24 changed files with 225 additions and 213 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.8.2] - 2024-09-04

## [0.7.4] - 2024-03-15

### Added
Expand Down
13 changes: 6 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlx-exasol"
version = "0.7.4"
version = "0.8.2"
edition = "2021"
authors = ["bobozaur"]
description = "Exasol driver for the SQLx framework."
Expand Down Expand Up @@ -47,16 +47,15 @@ async-tungstenite = "0.25.0"
futures-io = "0.3.30"
futures-util = "0.3.30"
futures-core = "0.3.30"
serde_json = { version = "1.0.114", features = ["raw_value"] }
serde_json = { version = "1.0.114" }
serde = { version = "1.0.197", features = ["derive"] }
pin-project = "1.1.5"
lru = "0.12.3"
sqlx-core = "0.7.4"
# sqlx-macros-core = "0.7.4"
sqlx-core = "0.8.2"
# sqlx-macros-core = "0.8.2"
tracing = { version = "0.1.40", features = ["log"] }
arrayvec = "0.7.4"
arrayvec = "0.7.6"
rcgen = { version = "0.12.1", optional = true }
# tungstenite = "0.21.0"

# Feature flagged optional dependencies
uuid = { version = "1.7.0", features = ["serde"], optional = true }
Expand All @@ -81,7 +80,7 @@ bytes = { version = "1.5", optional = true }
futures-channel = { version = "0.3.30", optional = true }

[dev-dependencies]
sqlx = { version = "0.7.4", features = [
sqlx = { version = "0.8.2", features = [
"runtime-tokio",
"tls-native-tls",
"migrate",
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Inspired by [Py-Exasol](https://github.com/exasol/pyexasol) and based on the (no
>
> With that in mind, please favor using a fixed version of `sqlx` and `sqlx-exasol` in `Cargo.toml` to avoid issues, such as:
> ```toml
> sqlx = "=0.7.4"
> sqlx-exasol = "=0.7.4"
> sqlx = "=0.8.2"
> sqlx-exasol = "=0.8.2"
> ```
Expand Down
86 changes: 35 additions & 51 deletions src/arguments.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use serde::Serialize;
use sqlx_core::{arguments::Arguments, encode::Encode, types::Type, Error as SqlxError};
use serde_json::Error as SerdeError;
use sqlx_core::error::BoxDynError;
use sqlx_core::{arguments::Arguments, encode::Encode, types::Type};

use crate::{database::Exasol, error::ExaProtocolError, type_info::ExaTypeInfo};

Expand All @@ -17,75 +19,64 @@ impl<'q> Arguments<'q> for ExaArguments {
self.buf.inner.reserve(size);
}

fn add<T>(&mut self, value: T)
fn add<T>(&mut self, value: T) -> Result<(), BoxDynError>
where
T: 'q + Send + Encode<'q, Self::Database> + Type<Self::Database>,
T: 'q + Encode<'q, Self::Database> + Type<Self::Database>,
{
let ty = value.produces().unwrap_or_else(T::type_info);
self.types.push(ty);

self.buf.start_seq();
let _ = value.encode(&mut self.buf);
let _ = value.encode(&mut self.buf)?;
self.buf.end_seq();
self.buf.add_separator();

self.buf.register_param_count();
self.buf.check_param_count()?;

self.types.push(ty);

Ok(())
}

fn len(&self) -> usize {
self.types.len()
}
}

#[derive(Debug)]
pub struct ExaBuffer {
pub(crate) inner: Vec<u8>,
pub(crate) num_param_sets: NumParamSets,
pub(crate) binding_err: Option<SqlxError>,
params_count: usize,
pub(crate) params_count: usize,
pub(crate) num_param_sets: Option<usize>,
}

impl ExaBuffer {
/// Serializes and appends a value to this buffer.
pub fn append<T>(&mut self, value: T)
pub fn append<T>(&mut self, value: T) -> Result<(), SerdeError>
where
T: Serialize,
{
self.params_count += 1;

// We can't error out here, so store the first error encountered for later
if let Err(e) = serde_json::to_writer(&mut self.inner, &value) {
if self.binding_err.is_none() {
self.binding_err = Some(SqlxError::Protocol(e.to_string()));
}
}
serde_json::to_writer(&mut self.inner, &value)
}

/// Serializes and appends an iterator of values to this buffer.
pub fn append_iter<'q, I, T>(&mut self, iter: I)
pub fn append_iter<'q, I, T>(&mut self, iter: I) -> Result<(), BoxDynError>
where
I: IntoIterator<Item = T>,
T: 'q + Encode<'q, Exasol> + Type<Exasol>,
{
let mut iter = iter.into_iter();

if let Some(value) = iter.next() {
let _ = value.encode(self);
let _ = value.encode(self)?;
}

for value in iter {
self.add_separator();
let _ = value.encode(self);
let _ = value.encode(self)?;
}
}

/// Outputs the numbers of parameter sets in the buffer.
///
/// # Errors
///
/// Will throw an error if a mismatch was recorded.
pub(crate) fn num_param_sets(&self) -> Result<usize, ExaProtocolError> {
match self.num_param_sets {
NumParamSets::NotSet => Ok(0),
NumParamSets::Set(n) => Ok(n),
NumParamSets::Mismatch(n, m) => Err(ExaProtocolError::ParameterLengthMismatch(n, m)),
}
Ok(())
}

/// Ends the main sequence serialization in the buffer.
Expand Down Expand Up @@ -120,24 +111,27 @@ impl ExaBuffer {
/// Registers the number of rows we bound parameters for.
///
/// The first time we add an argument, we store the number of rows
/// we pass parameters for.
/// we pass parameters for. This is useful for when arrays of
/// parameters get passed for each column.
///
/// All subsequent calls will check that the number of rows is the same.
/// If it is not, the first mismatch is recorded so we can throw
/// an error later (before sending data to the database).
///
/// This is also due to `Encode` not throwing errors.
fn register_param_count(&mut self) {
fn check_param_count(&mut self) -> Result<(), ExaProtocolError> {
let count = self.params_count;

self.num_param_sets = match self.num_param_sets {
NumParamSets::NotSet => NumParamSets::Set(count),
NumParamSets::Set(n) if n != count => NumParamSets::Mismatch(n, count),
num_rows => num_rows,
};

// We must reset the count in preparation for the next parameter.
self.params_count = 0;

match self.num_param_sets {
Some(n) if n == self.params_count => (),
Some(n) => Err(ExaProtocolError::ParameterLengthMismatch(count, n))?,
None => self.num_param_sets = Some(count),
};

Ok(())
}
}

Expand All @@ -146,21 +140,11 @@ impl Default for ExaBuffer {
let inner = Vec::with_capacity(1);
let mut buffer = Self {
inner,
num_param_sets: NumParamSets::NotSet,
params_count: 0,
binding_err: None,
num_param_sets: None,
};

buffer.start_seq();
buffer
}
}

/// Enum illustrating the state of the parameter sets number
/// provided as bind arguments to a query.
#[derive(Clone, Copy, Debug)]
pub(crate) enum NumParamSets {
NotSet,
Set(usize),
Mismatch(usize, usize),
}
31 changes: 12 additions & 19 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#[cfg(feature = "etl")]
use std::net::IpAddr;

use serde::{
ser::{Error, SerializeSeq},
Serialize, Serializer,
};
use serde_json::value::RawValue;
use serde::{ser::SerializeSeq, Serialize, Serializer};
use sqlx_core::Error as SqlxError;

use crate::{
arguments::{ExaBuffer, NumParamSets},
arguments::ExaBuffer,
options::{ExaConnectOptionsRef, ProtocolVersion},
responses::ExaAttributes,
ExaTypeInfo,
Expand Down Expand Up @@ -88,9 +84,9 @@ impl<'a> ExaCommand<'a> {
columns: &'a [ExaTypeInfo],
buf: ExaBuffer,
attributes: &'a ExaAttributes,
) -> Result<Self, SqlxError> {
let prepared = ExecutePreparedStmt::new(handle, columns, buf, attributes)?;
Ok(Self::ExecutePreparedStatement(prepared))
) -> Self {
let prepared = ExecutePreparedStmt::new(handle, columns, buf, attributes);
Self::ExecutePreparedStatement(prepared)
}

pub fn new_close_prepared(handle: u16) -> Self {
Expand Down Expand Up @@ -212,17 +208,15 @@ impl<'a> ExecutePreparedStmt<'a> {
columns: &'a [ExaTypeInfo],
data: ExaBuffer,
attributes: &'a ExaAttributes,
) -> Result<Self, SqlxError> {
let prepared = Self {
) -> Self {
Self {
attributes,
statement_handle: handle,
num_columns: columns.len(),
num_rows: data.num_param_sets()?,
num_rows: data.params_count,
columns,
data: data.into(),
};

Ok(prepared)
}
}

fn serialize_parameters<S>(parameters: &[ExaTypeInfo], serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -267,12 +261,12 @@ pub(crate) struct ClosePreparedStmt {
#[derive(Debug, Clone)]
struct PreparedStmtData {
inner: Vec<u8>,
num_rows: NumParamSets,
num_rows: Option<usize>,
}

impl PreparedStmtData {
fn is_empty(&self) -> bool {
matches!(self.num_rows, NumParamSets::NotSet | NumParamSets::Set(0))
matches!(self.num_rows, None | Some(0))
}
}

Expand All @@ -281,8 +275,7 @@ impl Serialize for PreparedStmtData {
where
S: Serializer,
{
let raw_value: &RawValue = serde_json::from_slice(&self.inner).map_err(Error::custom)?;
raw_value.serialize(serializer)
self.inner.serialize(serializer)
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/connection/executor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::borrow::Cow;
use std::{borrow::Cow, future::ready};

use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::TryStreamExt;
use futures_util::{FutureExt, TryStreamExt};
use sqlx_core::{
database::{Database, HasStatement},
database::Database,
describe::Describe,
executor::{Execute, Executor},
logger::QueryLogger,
Expand Down Expand Up @@ -38,8 +38,11 @@ impl<'c> Executor<'c> for &'c mut ExaConnection {
E: Execute<'q, Self::Database>,
{
let sql = query.sql();
let arguments = query.take_arguments();
let persistent = query.persistent();
let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
Ok(a) => a,
Err(e) => return Box::pin(ready(Err(e)).into_stream()),
};

let logger = QueryLogger::new(sql, self.log_settings.clone());

Expand Down Expand Up @@ -72,7 +75,7 @@ impl<'c> Executor<'c> for &'c mut ExaConnection {
self,
sql: &'q str,
_parameters: &'e [<Self::Database as Database>::TypeInfo],
) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, SqlxError>>
) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement<'q>, SqlxError>>
where
'c: 'e,
{
Expand Down
10 changes: 2 additions & 8 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,14 @@ impl ExaConnection {
async fn execute_prepared<'a, C, F>(
&'a mut self,
sql: &str,
mut args: ExaArguments,
args: ExaArguments,
persist: bool,
future_maker: C,
) -> Result<QueryResultStream<'a, C, F>, SqlxError>
where
C: Fn(&'a mut ExaWebSocket, u16, usize) -> Result<F, SqlxError>,
F: Future<Output = Result<(DataChunk, &'a mut ExaWebSocket), SqlxError>> + 'a,
{
// If some error occurred while binding parameters,
// we couldn't return it then, so we stored it and return it now.
if let Some(err) = args.buf.binding_err.take() {
Err(err)?;
}

let prepared = self
.ws
.get_or_prepare(&mut self.statement_cache, sql, persist)
Expand All @@ -193,7 +187,7 @@ impl ExaConnection {
&prepared.parameters,
args.buf,
&self.ws.attributes,
)?
)
.try_into()?;

self.ws
Expand Down
Loading

0 comments on commit 626c39c

Please sign in to comment.