diff --git a/src/arrow2/src/array/dictionary/data.rs b/src/arrow2/src/array/dictionary/data.rs deleted file mode 100644 index 286e02dc91..0000000000 --- a/src/arrow2/src/array/dictionary/data.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::array::{ - from_data, to_data, Arrow2Arrow, DictionaryArray, DictionaryKey, PrimitiveArray, -}; -use crate::datatypes::{DataType, PhysicalType}; -use arrow_data::{ArrayData, ArrayDataBuilder}; - -impl Arrow2Arrow for DictionaryArray { - fn to_data(&self) -> ArrayData { - let keys = self.keys.to_data(); - let builder = keys - .into_builder() - .data_type(self.data_type.clone().into()) - .child_data(vec![to_data(self.values.as_ref())]); - - // Safety: Dictionary is valid - unsafe { builder.build_unchecked() } - } - - fn from_data(data: &ArrayData) -> Self { - let key = match data.data_type() { - arrow_schema::DataType::Dictionary(k, _) => k.as_ref(), - d => panic!("unsupported dictionary type {d}"), - }; - - let data_type = DataType::from(data.data_type().clone()); - assert_eq!( - data_type.to_physical_type(), - PhysicalType::Dictionary(K::KEY_TYPE) - ); - - let key_builder = ArrayDataBuilder::new(key.clone()) - .buffers(vec![data.buffers()[0].clone()]) - .offset(data.offset()) - .len(data.len()) - .nulls(data.nulls().cloned()); - - // Safety: Dictionary is valid - let key_data = unsafe { key_builder.build_unchecked() }; - let keys = PrimitiveArray::from_data(&key_data); - let values = from_data(&data.child_data()[0]); - - Self { - data_type, - keys, - values, - } - } -} diff --git a/src/arrow2/src/compute/comparison/simd/packed.rs b/src/arrow2/src/compute/comparison/simd/packed.rs deleted file mode 100644 index 0f71798a8b..0000000000 --- a/src/arrow2/src/compute/comparison/simd/packed.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::simd::{SimdPartialEq, SimdPartialOrd, ToBitMask}; - -use crate::types::simd::*; -use crate::types::{days_ms, f16, i256, months_days_ns}; - -use super::*; - -macro_rules! simd8 { - ($type:ty, $md:ty) => { - impl Simd8 for $type { - type Simd = $md; - } - - impl Simd8Lanes<$type> for $md { - #[inline] - fn from_chunk(v: &[$type]) -> Self { - <$md>::from_slice(v) - } - - #[inline] - fn from_incomplete_chunk(v: &[$type], remaining: $type) -> Self { - let mut a = [remaining; 8]; - a.iter_mut().zip(v.iter()).for_each(|(a, b)| *a = *b); - Self::from_array(a) - } - } - - impl Simd8PartialEq for $md { - #[inline] - fn eq(self, other: Self) -> u8 { - self.simd_eq(other).to_bitmask() - } - - #[inline] - fn neq(self, other: Self) -> u8 { - self.simd_ne(other).to_bitmask() - } - } - - impl Simd8PartialOrd for $md { - #[inline] - fn lt_eq(self, other: Self) -> u8 { - self.simd_le(other).to_bitmask() - } - - #[inline] - fn lt(self, other: Self) -> u8 { - self.simd_lt(other).to_bitmask() - } - - #[inline] - fn gt_eq(self, other: Self) -> u8 { - self.simd_ge(other).to_bitmask() - } - - #[inline] - fn gt(self, other: Self) -> u8 { - self.simd_gt(other).to_bitmask() - } - } - }; -} - -simd8!(u8, u8x8); -simd8!(u16, u16x8); -simd8!(u32, u32x8); -simd8!(u64, u64x8); -simd8!(i8, i8x8); -simd8!(i16, i16x8); -simd8!(i32, i32x8); -simd8!(i64, i64x8); -simd8_native_all!(i128); -simd8_native_all!(i256); -simd8_native!(f16); -simd8_native_partial_eq!(f16); -simd8!(f32, f32x8); -simd8!(f64, f64x8); -simd8_native!(days_ms); -simd8_native_partial_eq!(days_ms); -simd8_native!(months_days_ns); -simd8_native_partial_eq!(months_days_ns); diff --git a/src/arrow2/src/io/odbc/mod.rs b/src/arrow2/src/io/odbc/mod.rs deleted file mode 100644 index f40bcb3c27..0000000000 --- a/src/arrow2/src/io/odbc/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! API to serialize and deserialize data from and to ODBC -pub use odbc_api as api; - -pub mod read; -pub mod write; - -impl From for crate::error::Error { - fn from(error: api::Error) -> Self { - crate::error::Error::External("".to_string(), Box::new(error)) - } -} diff --git a/src/arrow2/src/io/odbc/read/deserialize.rs b/src/arrow2/src/io/odbc/read/deserialize.rs deleted file mode 100644 index be0a548e1a..0000000000 --- a/src/arrow2/src/io/odbc/read/deserialize.rs +++ /dev/null @@ -1,275 +0,0 @@ -use chrono::{NaiveDate, NaiveDateTime}; -use odbc_api::buffers::{BinColumnView, TextColumnView}; -use odbc_api::Bit; - -use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array}; -use crate::bitmap::{Bitmap, MutableBitmap}; -use crate::buffer::Buffer; -use crate::datatypes::{DataType, TimeUnit}; -use crate::offset::{Offsets, OffsetsBuffer}; -use crate::types::NativeType; - -use super::super::api::buffers::AnyColumnView; - -/// Deserializes a [`AnyColumnView`] into an array of [`DataType`]. -/// This is CPU-bounded -pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box { - match column { - AnyColumnView::Text(view) => Box::new(utf8(data_type, view)) as _, - AnyColumnView::WText(_) => todo!(), - AnyColumnView::Binary(view) => Box::new(binary(data_type, view)) as _, - AnyColumnView::Date(values) => Box::new(date(data_type, values)) as _, - AnyColumnView::Time(values) => Box::new(time(data_type, values)) as _, - AnyColumnView::Timestamp(values) => Box::new(timestamp(data_type, values)) as _, - AnyColumnView::F64(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::F32(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::I8(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::I16(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::I32(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::I64(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::U8(values) => Box::new(primitive(data_type, values)) as _, - AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _, - AnyColumnView::NullableDate(slice) => Box::new(date_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableTime(slice) => Box::new(time_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableTimestamp(slice) => Box::new(timestamp_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableF64(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableF32(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableI8(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableI16(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableI32(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableI64(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableU8(slice) => Box::new(primitive_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - AnyColumnView::NullableBit(slice) => Box::new(bool_optional( - data_type, - slice.raw_values().0, - slice.raw_values().1, - )) as _, - } -} - -fn bitmap(values: &[isize]) -> Option { - MutableBitmap::from_trusted_len_iter(values.iter().map(|x| *x != -1)).into() -} - -fn primitive(data_type: DataType, values: &[T]) -> PrimitiveArray { - PrimitiveArray::new(data_type, values.to_vec().into(), None) -} - -fn primitive_optional( - data_type: DataType, - values: &[T], - indicators: &[isize], -) -> PrimitiveArray { - let validity = bitmap(indicators); - PrimitiveArray::new(data_type, values.to_vec().into(), validity) -} - -fn bool(data_type: DataType, values: &[Bit]) -> BooleanArray { - let values = values.iter().map(|x| x.as_bool()); - let values = Bitmap::from_trusted_len_iter(values); - BooleanArray::new(data_type, values, None) -} - -fn bool_optional(data_type: DataType, values: &[Bit], indicators: &[isize]) -> BooleanArray { - let validity = bitmap(indicators); - let values = values.iter().map(|x| x.as_bool()); - let values = Bitmap::from_trusted_len_iter(values); - BooleanArray::new(data_type, values, validity) -} - -fn binary_generic<'a>( - iter: impl Iterator>, -) -> (OffsetsBuffer, Buffer, Option) { - let length = iter.size_hint().0; - let mut validity = MutableBitmap::with_capacity(length); - let mut values = Vec::::with_capacity(0); - - let mut offsets = Offsets::::with_capacity(length); - for item in iter { - if let Some(item) = item { - values.extend_from_slice(item); - offsets - .try_push_usize(item.len()) - .expect("List to contain less than i32::MAX items."); - validity.push(true); - } else { - offsets.extend_constant(1); - validity.push(false); - } - } - - (offsets.into(), values.into(), validity.into()) -} - -fn binary(data_type: DataType, view: BinColumnView) -> BinaryArray { - let (offsets, values, validity) = binary_generic(view.iter()); - BinaryArray::new(data_type, offsets, values, validity) -} - -fn utf8(data_type: DataType, view: TextColumnView) -> Utf8Array { - let (offsets, values, validity) = binary_generic(view.iter()); - - // this O(N) check is necessary for the utf8 validity - Utf8Array::new(data_type, offsets, values, validity) -} - -fn date(data_type: DataType, values: &[odbc_api::sys::Date]) -> PrimitiveArray { - let values = values.iter().map(days_since_epoch).collect::>(); - PrimitiveArray::new(data_type, values.into(), None) -} - -fn date_optional( - data_type: DataType, - values: &[odbc_api::sys::Date], - indicators: &[isize], -) -> PrimitiveArray { - let values = values.iter().map(days_since_epoch).collect::>(); - let validity = bitmap(indicators); - PrimitiveArray::new(data_type, values.into(), validity) -} - -fn days_since_epoch(date: &odbc_api::sys::Date) -> i32 { - let unix_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("invalid or out-of-range date"); - let date = NaiveDate::from_ymd_opt(date.year as i32, date.month as u32, date.day as u32) - .unwrap_or(unix_epoch); - let duration = date.signed_duration_since(unix_epoch); - duration.num_days().try_into().unwrap_or(i32::MAX) -} - -fn time(data_type: DataType, values: &[odbc_api::sys::Time]) -> PrimitiveArray { - let values = values.iter().map(time_since_midnight).collect::>(); - PrimitiveArray::new(data_type, values.into(), None) -} - -fn time_since_midnight(date: &odbc_api::sys::Time) -> i32 { - (date.hour as i32) * 60 * 60 + (date.minute as i32) * 60 + date.second as i32 -} - -fn time_optional( - data_type: DataType, - values: &[odbc_api::sys::Time], - indicators: &[isize], -) -> PrimitiveArray { - let values = values.iter().map(time_since_midnight).collect::>(); - let validity = bitmap(indicators); - PrimitiveArray::new(data_type, values.into(), validity) -} - -fn timestamp(data_type: DataType, values: &[odbc_api::sys::Timestamp]) -> PrimitiveArray { - let unit = if let DataType::Timestamp(unit, _) = &data_type { - unit - } else { - unreachable!() - }; - let values = match unit { - TimeUnit::Second => values.iter().map(timestamp_s).collect::>(), - TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::>(), - TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::>(), - TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::>(), - }; - PrimitiveArray::new(data_type, values.into(), None) -} - -fn timestamp_optional( - data_type: DataType, - values: &[odbc_api::sys::Timestamp], - indicators: &[isize], -) -> PrimitiveArray { - let unit = if let DataType::Timestamp(unit, _) = &data_type { - unit - } else { - unreachable!() - }; - let values = match unit { - TimeUnit::Second => values.iter().map(timestamp_s).collect::>(), - TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::>(), - TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::>(), - TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::>(), - }; - let validity = bitmap(indicators); - PrimitiveArray::new(data_type, values.into(), validity) -} - -fn timestamp_to_naive(timestamp: &odbc_api::sys::Timestamp) -> Option { - NaiveDate::from_ymd_opt( - timestamp.year as i32, - timestamp.month as u32, - timestamp.day as u32, - ) - .and_then(|x| { - x.and_hms_nano_opt( - timestamp.hour as u32, - timestamp.minute as u32, - timestamp.second as u32, - /* - https://docs.microsoft.com/en-us/sql/odbc/reference/appendixes/c-data-types?view=sql-server-ver15 - [b] The value of the fraction field is [...] for a billionth of a second (one nanosecond) is 1. - */ - timestamp.fraction, - ) - }) -} - -fn timestamp_s(timestamp: &odbc_api::sys::Timestamp) -> i64 { - timestamp_to_naive(timestamp) - .map(|x| x.timestamp()) - .unwrap_or(0) -} - -fn timestamp_ms(timestamp: &odbc_api::sys::Timestamp) -> i64 { - timestamp_to_naive(timestamp) - .map(|x| x.timestamp_millis()) - .unwrap_or(0) -} - -fn timestamp_us(timestamp: &odbc_api::sys::Timestamp) -> i64 { - timestamp_to_naive(timestamp) - .map(|x| x.timestamp_nanos_opt().unwrap() / 1000) - .unwrap_or(0) -} - -fn timestamp_ns(timestamp: &odbc_api::sys::Timestamp) -> i64 { - timestamp_to_naive(timestamp) - .map(|x| x.timestamp_nanos_opt().unwrap()) - .unwrap_or(0) -} diff --git a/src/arrow2/src/io/odbc/read/mod.rs b/src/arrow2/src/io/odbc/read/mod.rs deleted file mode 100644 index e8945759c6..0000000000 --- a/src/arrow2/src/io/odbc/read/mod.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! APIs to read from ODBC -mod deserialize; -mod schema; - -pub use deserialize::deserialize; -pub use schema::infer_schema; - -use super::api; - -/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata. -/// # Errors -/// Iff the driver provides an incorrect [`api::ResultSetMetadata`] -pub fn buffer_from_metadata( - resut_set_metadata: &impl api::ResultSetMetadata, - max_batch_size: usize, -) -> std::result::Result, api::Error> { - let num_cols: u16 = resut_set_metadata.num_result_cols()? as u16; - - let descs = (0..num_cols) - .map(|index| { - let mut column_description = api::ColumnDescription::default(); - - resut_set_metadata.describe_col(index + 1, &mut column_description)?; - - Ok(api::buffers::BufferDescription { - nullable: column_description.could_be_nullable(), - kind: api::buffers::BufferKind::from_data_type(column_description.data_type) - .unwrap(), - }) - }) - .collect::, api::Error>>()?; - - Ok(api::buffers::buffer_from_description( - max_batch_size, - descs.into_iter(), - )) -} diff --git a/src/arrow2/src/io/odbc/read/schema.rs b/src/arrow2/src/io/odbc/read/schema.rs deleted file mode 100644 index c679500b7a..0000000000 --- a/src/arrow2/src/io/odbc/read/schema.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::datatypes::{DataType, Field, TimeUnit}; -use crate::error::Result; - -use super::super::api; -use super::super::api::ResultSetMetadata; - -/// Infers the Arrow [`Field`]s from a [`ResultSetMetadata`] -pub fn infer_schema(resut_set_metadata: &impl ResultSetMetadata) -> Result> { - let num_cols: u16 = resut_set_metadata.num_result_cols().unwrap() as u16; - - let fields = (0..num_cols) - .map(|index| { - let mut column_description = api::ColumnDescription::default(); - resut_set_metadata - .describe_col(index + 1, &mut column_description) - .unwrap(); - - column_to_field(&column_description) - }) - .collect(); - Ok(fields) -} - -fn column_to_field(column_description: &api::ColumnDescription) -> Field { - Field::new( - column_description - .name_to_string() - .expect("Column name must be representable in utf8"), - column_to_data_type(&column_description.data_type), - column_description.could_be_nullable(), - ) -} - -fn column_to_data_type(data_type: &api::DataType) -> DataType { - use api::DataType as OdbcDataType; - match data_type { - OdbcDataType::Numeric { - precision: p @ 0..=38, - scale, - } - | OdbcDataType::Decimal { - precision: p @ 0..=38, - scale, - } => DataType::Decimal(*p, (*scale) as usize), - OdbcDataType::Integer => DataType::Int32, - OdbcDataType::SmallInt => DataType::Int16, - OdbcDataType::Real | OdbcDataType::Float { precision: 0..=24 } => DataType::Float32, - OdbcDataType::Float { precision: _ } | OdbcDataType::Double => DataType::Float64, - OdbcDataType::Date => DataType::Date32, - OdbcDataType::Timestamp { precision: 0 } => DataType::Timestamp(TimeUnit::Second, None), - OdbcDataType::Timestamp { precision: 1..=3 } => { - DataType::Timestamp(TimeUnit::Millisecond, None) - } - OdbcDataType::Timestamp { precision: 4..=6 } => { - DataType::Timestamp(TimeUnit::Microsecond, None) - } - OdbcDataType::Timestamp { precision: _ } => DataType::Timestamp(TimeUnit::Nanosecond, None), - OdbcDataType::BigInt => DataType::Int64, - OdbcDataType::TinyInt => DataType::Int8, - OdbcDataType::Bit => DataType::Boolean, - OdbcDataType::Binary { length } => DataType::FixedSizeBinary(*length), - OdbcDataType::LongVarbinary { length: _ } | OdbcDataType::Varbinary { length: _ } => { - DataType::Binary - } - OdbcDataType::Unknown - | OdbcDataType::Time { precision: _ } - | OdbcDataType::Numeric { .. } - | OdbcDataType::Decimal { .. } - | OdbcDataType::Other { - data_type: _, - column_size: _, - decimal_digits: _, - } - | OdbcDataType::WChar { length: _ } - | OdbcDataType::Char { length: _ } - | OdbcDataType::WVarchar { length: _ } - | OdbcDataType::LongVarchar { length: _ } - | OdbcDataType::Varchar { length: _ } => DataType::Utf8, - } -} diff --git a/src/arrow2/src/io/odbc/write/mod.rs b/src/arrow2/src/io/odbc/write/mod.rs deleted file mode 100644 index 245f2455bb..0000000000 --- a/src/arrow2/src/io/odbc/write/mod.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! APIs to write to ODBC -mod schema; -mod serialize; - -use crate::{array::Array, chunk::Chunk, datatypes::Field, error::Result}; - -use super::api; -pub use schema::infer_descriptions; -pub use serialize::serialize; - -/// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s. -/// -/// This is useful when separating the serialization (CPU-bounded) to writing to the DB (IO-bounded). -pub fn buffer_from_description( - descriptions: Vec, - capacity: usize, -) -> api::buffers::ColumnarBuffer { - let descs = descriptions - .into_iter() - .map(|description| api::buffers::BufferDescription { - nullable: description.could_be_nullable(), - kind: api::buffers::BufferKind::from_data_type(description.data_type).unwrap(), - }); - - api::buffers::buffer_from_description(capacity, descs) -} - -/// A writer of [`Chunk`]s to an ODBC [`api::Prepared`] statement. -/// # Implementation -/// This struct mixes CPU-bounded and IO-bounded tasks and is not ideal -/// for an `async` context. -pub struct Writer<'a> { - fields: Vec, - buffer: api::buffers::ColumnarBuffer, - prepared: api::Prepared<'a>, -} - -impl<'a> Writer<'a> { - /// Creates a new [`Writer`]. - /// # Errors - /// Errors iff any of the types from [`Field`] is not supported. - pub fn try_new(prepared: api::Prepared<'a>, fields: Vec) -> Result { - let buffer = buffer_from_description(infer_descriptions(&fields)?, 0); - Ok(Self { - fields, - buffer, - prepared, - }) - } - - /// Writes a chunk to the writer. - /// # Errors - /// Errors iff the execution of the statement fails. - pub fn write>(&mut self, chunk: &Chunk) -> Result<()> { - if chunk.len() > self.buffer.num_rows() { - // if the chunk is larger, we re-allocate new buffers to hold it - self.buffer = buffer_from_description(infer_descriptions(&self.fields)?, chunk.len()); - } - - self.buffer.set_num_rows(chunk.len()); - - // serialize (CPU-bounded) - for (i, column) in chunk.arrays().iter().enumerate() { - serialize(column.as_ref(), &mut self.buffer.column_mut(i))?; - } - - // write (IO-bounded) - self.prepared.execute(&self.buffer)?; - Ok(()) - } -} diff --git a/src/arrow2/src/io/odbc/write/schema.rs b/src/arrow2/src/io/odbc/write/schema.rs deleted file mode 100644 index 5ac7ebfaf8..0000000000 --- a/src/arrow2/src/io/odbc/write/schema.rs +++ /dev/null @@ -1,38 +0,0 @@ -use super::super::api; - -use crate::datatypes::{DataType, Field}; -use crate::error::{Error, Result}; - -/// Infers the [`api::ColumnDescription`] from the fields -pub fn infer_descriptions(fields: &[Field]) -> Result> { - fields - .iter() - .map(|field| { - let nullability = if field.is_nullable { - api::Nullability::Nullable - } else { - api::Nullability::NoNulls - }; - let data_type = data_type_to(field.data_type())?; - Ok(api::ColumnDescription { - name: api::U16String::from_str(&field.name).into_vec(), - nullability, - data_type, - }) - }) - .collect() -} - -fn data_type_to(data_type: &DataType) -> Result { - Ok(match data_type { - DataType::Boolean => api::DataType::Bit, - DataType::Int16 => api::DataType::SmallInt, - DataType::Int32 => api::DataType::Integer, - DataType::Float32 => api::DataType::Float { precision: 24 }, - DataType::Float64 => api::DataType::Float { precision: 53 }, - DataType::FixedSizeBinary(length) => api::DataType::Binary { length: *length }, - DataType::Binary | DataType::LargeBinary => api::DataType::Varbinary { length: 0 }, - DataType::Utf8 | DataType::LargeUtf8 => api::DataType::Varchar { length: 0 }, - other => return Err(Error::nyi(format!("{other:?} to ODBC"))), - }) -} diff --git a/src/arrow2/src/io/odbc/write/serialize.rs b/src/arrow2/src/io/odbc/write/serialize.rs deleted file mode 100644 index f92326ba89..0000000000 --- a/src/arrow2/src/io/odbc/write/serialize.rs +++ /dev/null @@ -1,182 +0,0 @@ -use api::buffers::{BinColumnWriter, TextColumnWriter}; - -use crate::array::*; -use crate::bitmap::Bitmap; -use crate::datatypes::DataType; -use crate::error::{Error, Result}; -use crate::offset::Offset; -use crate::types::NativeType; - -use super::super::api; -use super::super::api::buffers::NullableSliceMut; - -/// Serializes an [`Array`] to [`api::buffers::AnyColumnViewMut`] -/// This operation is CPU-bounded -pub fn serialize(array: &dyn Array, column: &mut api::buffers::AnyColumnViewMut) -> Result<()> { - match array.data_type() { - DataType::Boolean => { - if let api::buffers::AnyColumnViewMut::Bit(values) = column { - bool(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else if let api::buffers::AnyColumnViewMut::NullableBit(values) = column { - bool_optional(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize bool to non-bool ODBC")) - } - } - DataType::Int16 => { - if let api::buffers::AnyColumnViewMut::I16(values) = column { - primitive(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else if let api::buffers::AnyColumnViewMut::NullableI16(values) = column { - primitive_optional(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize i16 to non-i16 ODBC")) - } - } - DataType::Int32 => { - if let api::buffers::AnyColumnViewMut::I32(values) = column { - primitive(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else if let api::buffers::AnyColumnViewMut::NullableI32(values) = column { - primitive_optional(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize i32 to non-i32 ODBC")) - } - } - DataType::Float32 => { - if let api::buffers::AnyColumnViewMut::F32(values) = column { - primitive(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else if let api::buffers::AnyColumnViewMut::NullableF32(values) = column { - primitive_optional(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize f32 to non-f32 ODBC")) - } - } - DataType::Float64 => { - if let api::buffers::AnyColumnViewMut::F64(values) = column { - primitive(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else if let api::buffers::AnyColumnViewMut::NullableF64(values) = column { - primitive_optional(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize f64 to non-f64 ODBC")) - } - } - DataType::Utf8 => { - if let api::buffers::AnyColumnViewMut::Text(values) = column { - utf8::(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize utf8 to non-text ODBC")) - } - } - DataType::LargeUtf8 => { - if let api::buffers::AnyColumnViewMut::Text(values) = column { - utf8::(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize utf8 to non-text ODBC")) - } - } - DataType::Binary => { - if let api::buffers::AnyColumnViewMut::Binary(values) = column { - binary::(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize utf8 to non-binary ODBC")) - } - } - DataType::LargeBinary => { - if let api::buffers::AnyColumnViewMut::Binary(values) = column { - binary::(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize utf8 to non-text ODBC")) - } - } - DataType::FixedSizeBinary(_) => { - if let api::buffers::AnyColumnViewMut::Binary(values) = column { - fixed_binary(array.as_any().downcast_ref().unwrap(), values); - Ok(()) - } else { - Err(Error::nyi("serialize fixed to non-binary ODBC")) - } - } - other => Err(Error::nyi(format!("{other:?} to ODBC"))), - } -} - -fn bool(array: &BooleanArray, values: &mut [api::Bit]) { - array - .values() - .iter() - .zip(values.iter_mut()) - .for_each(|(from, to)| *to = api::Bit(from as u8)); -} - -fn bool_optional(array: &BooleanArray, values: &mut NullableSliceMut) { - let (values, indicators) = values.raw_values(); - array - .values() - .iter() - .zip(values.iter_mut()) - .for_each(|(from, to)| *to = api::Bit(from as u8)); - write_validity(array.validity(), indicators); -} - -fn primitive(array: &PrimitiveArray, values: &mut [T]) { - values.copy_from_slice(array.values()) -} - -fn write_validity(validity: Option<&Bitmap>, indicators: &mut [isize]) { - if let Some(validity) = validity { - indicators - .iter_mut() - .zip(validity.iter()) - .for_each(|(indicator, is_valid)| *indicator = if is_valid { 0 } else { -1 }) - } else { - indicators.iter_mut().for_each(|x| *x = 0) - } -} - -fn primitive_optional(array: &PrimitiveArray, values: &mut NullableSliceMut) { - let (values, indicators) = values.raw_values(); - values.copy_from_slice(array.values()); - write_validity(array.validity(), indicators); -} - -fn fixed_binary(array: &FixedSizeBinaryArray, writer: &mut BinColumnWriter) { - writer.set_max_len(array.size()); - writer.write(array.iter()) -} - -fn binary(array: &BinaryArray, writer: &mut BinColumnWriter) { - let max_len = array - .offsets() - .buffer() - .windows(2) - .map(|x| (x[1] - x[0]).to_usize()) - .max() - .unwrap_or(0); - writer.set_max_len(max_len); - writer.write(array.iter()) -} - -fn utf8(array: &Utf8Array, writer: &mut TextColumnWriter) { - let max_len = array - .offsets() - .buffer() - .windows(2) - .map(|x| (x[1] - x[0]).to_usize()) - .max() - .unwrap_or(0); - writer.set_max_len(max_len); - writer.write(array.iter().map(|x| x.map(|x| x.as_bytes()))) -} diff --git a/src/daft-csv/src/compression.rs b/src/daft-csv/src/compression.rs deleted file mode 100644 index 268b1566d9..0000000000 --- a/src/daft-csv/src/compression.rs +++ /dev/null @@ -1,66 +0,0 @@ -use async_compression::tokio::bufread::{ - BrotliDecoder, BzDecoder, DeflateDecoder, GzipDecoder, LzmaDecoder, XzDecoder, ZlibDecoder, - ZstdDecoder, -}; -use std::{path::PathBuf, pin::Pin}; -use tokio::io::{AsyncBufRead, AsyncRead}; -use url::Url; - -#[derive(Debug)] -pub enum CompressionCodec { - Brotli, - Bz, - Deflate, - Gzip, - Lzma, - Xz, - Zlib, - Zstd, -} - -impl CompressionCodec { - pub fn from_uri(uri: &str) -> Option { - let url = Url::parse(uri); - let path = match &url { - Ok(url) => url.path(), - _ => uri, - }; - let extension = PathBuf::from(path) - .extension()? - .to_string_lossy() - .to_string(); - Self::from_extension(extension.as_ref()) - } - pub fn from_extension(extension: &str) -> Option { - use CompressionCodec::*; - match extension { - "br" => Some(Brotli), - "bz2" => Some(Bz), - "deflate" => Some(Deflate), - "gz" => Some(Gzip), - "lzma" => Some(Lzma), - "xz" => Some(Xz), - "zl" => Some(Zlib), - "zstd" | "zst" => Some(Zstd), - "snappy" => todo!("Snappy compression support not yet implemented"), - _ => None, - } - } - - pub fn to_decoder( - &self, - reader: T, - ) -> Pin> { - use CompressionCodec::*; - match self { - Brotli => Box::pin(BrotliDecoder::new(reader)), - Bz => Box::pin(BzDecoder::new(reader)), - Deflate => Box::pin(DeflateDecoder::new(reader)), - Gzip => Box::pin(GzipDecoder::new(reader)), - Lzma => Box::pin(LzmaDecoder::new(reader)), - Xz => Box::pin(XzDecoder::new(reader)), - Zlib => Box::pin(ZlibDecoder::new(reader)), - Zstd => Box::pin(ZstdDecoder::new(reader)), - } - } -}