wip (#11)
yjshen authored Dec 21, 2021
1 parent 648f600 commit 626591f
Showing 21 changed files with 181 additions and 279 deletions.
20 changes: 1 addition & 19 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -492,27 +492,9 @@ mod tests {
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::memory::MemoryExec;
use std::borrow::Borrow;
use datafusion::field_util::StructArrayExt;
use tempfile::TempDir;

pub trait StructArrayExt {
fn column_names(&self) -> Vec<&str>;
fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef>;
}

impl StructArrayExt for StructArray {
fn column_names(&self) -> Vec<&str> {
self.fields().iter().map(|f| f.name.as_str()).collect()
}

fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
self.fields()
.iter()
.position(|c| c.name() == column_name)
.map(|pos| self.values()[pos].borrow())
}
}

#[tokio::test]
async fn test() -> Result<()> {
let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
7 changes: 3 additions & 4 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -18,7 +18,6 @@
//! Parquet format abstractions

use std::any::{type_name, Any};
use std::io::Read;
use std::sync::Arc;

use arrow::datatypes::Schema;
@@ -257,15 +256,15 @@ fn summarize_min_max(

/// Read and parse the schema of the Parquet file at location `path`
fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
let reader = std::io::BufReader::new(object_reader.sync_reader())?;
let meta_data = read_metadata(&mut std::io::BufReader::new(reader))?;
let mut reader = object_reader.sync_reader()?;
let meta_data = read_metadata(&mut reader)?;
let schema = get_schema(&meta_data)?;
Ok(schema)
}

/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
let reader = std::io::BufReader::new(object_reader.sync_reader())?;
let mut reader = object_reader.sync_reader()?;
let meta_data = read_metadata(&mut reader)?;
let schema = get_schema(&meta_data)?;

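The BufReader wrapper is dropped because arrow2's read_metadata has to seek to the Parquet footer, and sync_reader now hands back a seekable reader directly. A minimal sketch of the same logic against a local file, assuming the read_metadata/get_schema helpers imported by this module and the error conversions fetch_schema already relies on (the import path below is an assumption):

use std::fs::File;

use arrow::datatypes::Schema;
use arrow::io::parquet::read::{get_schema, read_metadata}; // assumed path for the helpers used above
use datafusion::error::Result;

// Same shape as fetch_schema, but on a local path: File implements Read + Seek,
// which is all read_metadata needs to locate and parse the footer.
fn fetch_schema_from_path(path: &str) -> Result<Schema> {
    let mut reader = File::open(path)?;
    let meta_data = read_metadata(&mut reader)?;
    let schema = get_schema(&meta_data)?;
    Ok(schema)
}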
14 changes: 7 additions & 7 deletions datafusion/src/datasource/listing/helpers.rs
@@ -233,7 +233,7 @@ pub async fn pruned_partition_list(
.try_collect()
.await?;

let mem_table = MemTable::try_new(batches[0].schema(), vec![batches])?;
let mem_table = MemTable::try_new(batches[0].schema().clone(), vec![batches])?;

// Filter the partitions using a local datafusion context
// TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -263,23 +263,23 @@ fn paths_to_batch(
table_path: &str,
metas: &[FileMeta],
) -> Result<RecordBatch> {
let mut key_builder = MutableUtf8Array::with_capacity(metas.len());
let mut key_builder = MutableUtf8Array::<i32>::with_capacity(metas.len());
let mut length_builder = MutablePrimitiveArray::<u64>::with_capacity(metas.len());
let mut modified_builder = MutablePrimitiveArray::<i64>::with_capacity(metas.len());
let mut partition_builders = table_partition_cols
.iter()
.map(|_| MutableUtf8Array::with_capacity(metas.len()))
.map(|_| MutableUtf8Array::<i32>::with_capacity(metas.len()))
.collect::<Vec<_>>();
for file_meta in metas {
if let Some(partition_values) =
parse_partitions_for_path(table_path, file_meta.path(), table_partition_cols)
{
key_builder.push(Some(file_meta.path()))?;
length_builder.push(Some(file_meta.size()))?;
key_builder.push(Some(file_meta.path()));
length_builder.push(Some(file_meta.size()));
modified_builder
.push(file_meta.last_modified.map(|lm| lm.timestamp_millis()));
for (i, part_val) in partition_values.iter().enumerate() {
partition_builders[i].push(Some(part_val))?;
partition_builders[i].push(Some(part_val));
}
} else {
debug!("No partitioning for path {}", file_meta.path());
@@ -290,7 +290,7 @@
let mut col_arrays: Vec<Arc<dyn Array>> = vec![
key_builder.into_arc(),
length_builder.into_arc(),
modified_builder.into_arc().to(DataType::Date64),
modified_builder.to(DataType::Date64).into_arc(),
];
for mut partition_builder in partition_builders {
col_arrays.push(partition_builder.into_arc());
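The builder pushes lose their `?` because arrow2's mutable arrays push infallibly, and the Date64 logical type is now applied while the column is still mutable, before it is frozen into an Arc. A small sketch of that builder pattern (same arrow re-export as above; the file names are made up for illustration):

use std::sync::Arc;

use arrow::array::{Array, MutablePrimitiveArray, MutableUtf8Array};
use arrow::datatypes::DataType;

// Builds "key" and "modified" columns the way paths_to_batch now does.
fn build_columns() -> Vec<Arc<dyn Array>> {
    let mut keys = MutableUtf8Array::<i32>::with_capacity(2);
    keys.push(Some("year=2021/part-0.parquet")); // infallible in arrow2, hence no `?`
    keys.push(Some("year=2021/part-1.parquet"));

    let mut modified = MutablePrimitiveArray::<i64>::with_capacity(2);
    modified.push(Some(1_640_995_200_000_i64));
    modified.push(None);

    vec![
        keys.into_arc(),
        modified.to(DataType::Date64).into_arc(), // re-tag as Date64 before freezing
    ]
}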
6 changes: 2 additions & 4 deletions datafusion/src/datasource/object_store/local.rs
@@ -24,9 +24,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
};
use crate::datasource::object_store::{FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, ReadSeek};
use crate::datasource::PartitionedFile;
use crate::error::DataFusionError;
use crate::error::Result;
@@ -82,7 +80,7 @@ impl ObjectReader for LocalFileReader {
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
) -> Result<Box<dyn ReadSeek + Send + Sync>> {
// A new file descriptor is opened for each chunk reader.
// This is okay because chunks are usually fairly large.
let mut file = File::open(&self.file.path)?;
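With the return type widened from Read to ReadSeek, the chunk reader has to return something seekable, and a plain File already qualifies. A rough sketch of that shape; the ReadSeek stand-in and its blanket impl are assumptions (the blanket impl is not visible in this commit's hunks), and the `length` cap from the original implementation is elided:

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

// Stand-in for the trait declared in object_store/mod.rs below.
trait ReadSeek: Read + Seek {}
// Assumed blanket impl so a File can be boxed as `dyn ReadSeek`.
impl<T: Read + Seek> ReadSeek for T {}

fn chunk_reader_sketch(
    path: &str,
    start: u64,
) -> std::io::Result<Box<dyn ReadSeek + Send + Sync>> {
    let mut file = File::open(path)?;
    file.seek(SeekFrom::Start(start))?; // position the reader at the chunk start
    Ok(Box::new(file))
}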
8 changes: 5 additions & 3 deletions datafusion/src/datasource/object_store/mod.rs
@@ -21,7 +21,7 @@ pub mod local;

use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::Read;
use std::io::{Read, Seek};
use std::pin::Pin;
use std::sync::{Arc, RwLock};

@@ -33,6 +33,8 @@ use local::LocalFileSystem;

use crate::error::{DataFusionError, Result};

trait ReadSeek: Read + Seek {}

/// Object Reader for one file in an object store.
///
/// Note that the dynamic dispatch on the reader might
@@ -48,10 +50,10 @@ pub trait ObjectReader: Send + Sync {
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;
) -> Result<Box<dyn ReadSeek + Send + Sync>>;

/// Get reader for the entire file
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
fn sync_reader(&self) -> Result<Box<dyn ReadSeek + Send + Sync>> {
self.sync_chunk_reader(0, self.length() as usize)
}

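ReadSeek itself is just an empty marker over Read + Seek; the requirement comes from the Parquet path above, where read_metadata must seek to the file footer. Producing a Box<dyn ReadSeek> from concrete readers normally needs a blanket impl, which this hunk does not show, so the sketch below assumes one; it also shows that an in-memory buffer satisfies the bound, which matters for stores that download the requested byte range first:

use std::io::{Cursor, Read, Seek};

trait ReadSeek: Read + Seek {}
// Assumed blanket impl: any Read + Seek reader (File, Cursor, ...) is a ReadSeek.
impl<T: Read + Seek> ReadSeek for T {}

fn in_memory_reader(bytes: Vec<u8>) -> Box<dyn ReadSeek + Send + Sync> {
    // Cursor<Vec<u8>> is Read + Seek + Send + Sync, so a remote store can fetch the
    // range into memory and still satisfy the new sync_chunk_reader signature.
    Box::new(Cursor::new(bytes))
}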
20 changes: 20 additions & 0 deletions datafusion/src/field_util.rs
@@ -17,6 +17,8 @@

//! Utility functions for complex field access

use std::borrow::Borrow;
use arrow::array::{ArrayRef, StructArray};
use arrow::datatypes::{DataType, Field};

use crate::error::{DataFusionError, Result};
@@ -67,3 +69,21 @@ pub fn get_indexed_field(data_type: &DataType, key: &ScalarValue) -> Result<Fiel
)),
}
}

pub trait StructArrayExt {
fn column_names(&self) -> Vec<&str>;
fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef>;
}

impl StructArrayExt for StructArray {
fn column_names(&self) -> Vec<&str> {
self.fields().iter().map(|f| f.name.as_str()).collect()
}

fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
self.fields()
.iter()
.position(|c| c.name() == column_name)
.map(|pos| self.values()[pos].borrow())
}
}
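Hoisting the extension trait here (and making the module public in lib.rs below) lets both DataFusion and Ballista resolve struct children by name instead of by position. A small usage sketch, using only the trait methods defined above (crate paths as used inside this tree):

use arrow::array::{ArrayRef, StructArray};
use datafusion::field_util::StructArrayExt;

// Returns the child column named "id", if the struct has one.
fn id_column(array: &StructArray) -> Option<&ArrayRef> {
    println!("struct columns: {:?}", array.column_names());
    array.column_by_name("id")
}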
2 changes: 1 addition & 1 deletion datafusion/src/lib.rs
@@ -232,7 +232,7 @@ pub use arrow;

mod arrow_temporal_util;

pub(crate) mod field_util;
pub mod field_util;

#[cfg(feature = "pyarrow")]
mod pyarrow;
2 changes: 1 addition & 1 deletion datafusion/src/optimizer/simplify_expressions.rs
@@ -437,7 +437,7 @@ impl ConstEvaluator {
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);

// Need a single "input" row to produce a single output row
let col = new_null_array(&DataType::Null, 1);
let col = new_null_array(DataType::Null, 1).into();
let input_batch =
RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap();

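In arrow2, new_null_array takes the DataType by value and returns a Box<dyn Array>, so the added .into() is the standard Box-to-Arc conversion that yields an ArrayRef. A tiny sketch of that conversion (crate paths as re-exported in this tree):

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::DataType;

fn null_column() -> ArrayRef {
    // Box<dyn Array> -> Arc<dyn Array> via the std `From<Box<T>> for Arc<T>` impl.
    new_null_array(DataType::Null, 1).into()
}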
6 changes: 5 additions & 1 deletion datafusion/src/physical_plan/datetime_expressions.rs
@@ -81,7 +81,11 @@
// given a function that maps a `&str` to an arrow native type,
// returns a `ColumnarValue` where the function is applied to either an `ArrayRef` or a `ScalarValue`
// depending on the `args`'s variant.
fn handle<'a, O, F>(args: &'a [ColumnarValue], op: F, name: &str) -> Result<ColumnarValue>
fn handle<'a, O, F>(
args: &'a [ColumnarValue],
op: F,
name: &str,
data_type: DataType) -> Result<ColumnarValue>
where
O: NativeType,
ScalarValue: From<Option<O>>,
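The extra data_type parameter is presumably needed because one arrow2 native type can back several logical types (i64 backs Int64, Date64, and every Timestamp unit), so the output DataType can no longer be inferred from O alone. A sketch of that distinction with assumed arrow2-style calls:

use arrow::array::PrimitiveArray;
use arrow::datatypes::{DataType, TimeUnit};

// Same i64 storage, different logical type: `to` re-tags the array's DataType.
fn tag_as_timestamp(values: Vec<i64>) -> PrimitiveArray<i64> {
    PrimitiveArray::<i64>::from_vec(values).to(DataType::Timestamp(TimeUnit::Nanosecond, None))
}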
58 changes: 25 additions & 33 deletions datafusion/src/physical_plan/expressions/approx_distinct.rs
@@ -23,21 +23,16 @@ use crate::physical_plan::{
hyperloglog::HyperLogLog, Accumulator, AggregateExpr, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::array::{
ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray,
PrimitiveArray, StringOffsetSizeTrait,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::array::{ArrayRef, BinaryArray, Offset, PrimitiveArray, Utf8Array};
use arrow::datatypes::{DataType, Field};
use std::any::type_name;
use std::any::Any;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow::types::NativeType;

/// APPROX_DISTINCT aggregate expression
#[derive(Debug)]
@@ -89,14 +84,14 @@ impl AggregateExpr for ApproxDistinct {
// TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL
// TODO support for boolean (trivial case)
// https://github.com/apache/arrow-datafusion/issues/1109
DataType::UInt8 => Box::new(NumericHLLAccumulator::<UInt8Type>::new()),
DataType::UInt16 => Box::new(NumericHLLAccumulator::<UInt16Type>::new()),
DataType::UInt32 => Box::new(NumericHLLAccumulator::<UInt32Type>::new()),
DataType::UInt64 => Box::new(NumericHLLAccumulator::<UInt64Type>::new()),
DataType::Int8 => Box::new(NumericHLLAccumulator::<Int8Type>::new()),
DataType::Int16 => Box::new(NumericHLLAccumulator::<Int16Type>::new()),
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),
DataType::UInt8 => Box::new(NumericHLLAccumulator::<u8>::new()),
DataType::UInt16 => Box::new(NumericHLLAccumulator::<u16>::new()),
DataType::UInt32 => Box::new(NumericHLLAccumulator::<u32>::new()),
DataType::UInt64 => Box::new(NumericHLLAccumulator::<u64>::new()),
DataType::Int8 => Box::new(NumericHLLAccumulator::<i8>::new()),
DataType::Int16 => Box::new(NumericHLLAccumulator::<i16>::new()),
DataType::Int32 => Box::new(NumericHLLAccumulator::<i32>::new()),
DataType::Int64 => Box::new(NumericHLLAccumulator::<i64>::new()),
DataType::Utf8 => Box::new(StringHLLAccumulator::<i32>::new()),
DataType::LargeUtf8 => Box::new(StringHLLAccumulator::<i64>::new()),
DataType::Binary => Box::new(BinaryHLLAccumulator::<i32>::new()),
@@ -119,15 +114,15 @@
#[derive(Debug)]
struct BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: Offset,
{
hll: HyperLogLog<Vec<u8>>,
phantom_data: PhantomData<T>,
}

impl<T> BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: Offset,
{
/// new approx_distinct accumulator
pub fn new() -> Self {
@@ -141,15 +136,15 @@
#[derive(Debug)]
struct StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: Offset,
{
hll: HyperLogLog<String>,
phantom_data: PhantomData<T>,
}

impl<T> StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: Offset,
{
/// new approx_distinct accumulator
pub fn new() -> Self {
@@ -163,16 +158,14 @@
#[derive(Debug)]
struct NumericHLLAccumulator<T>
where
T: ArrowPrimitiveType,
T::Native: Hash,
T: NativeType + Hash,
{
hll: HyperLogLog<T::Native>,
hll: HyperLogLog<T>,
}

impl<T> NumericHLLAccumulator<T>
where
T: ArrowPrimitiveType,
T::Native: Hash,
T: NativeType + Hash,
{
/// new approx_distinct accumulator
pub fn new() -> Self {
@@ -276,11 +269,11 @@ macro_rules! downcast_value {

impl<T> Accumulator for BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: Offset,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &GenericBinaryArray<T> =
downcast_value!(values, GenericBinaryArray, T);
let array: &BinaryArray<T> =
downcast_value!(values, BinaryArray, T);
// flatten because we would skip nulls
self.hll
.extend(array.into_iter().flatten().map(|v| v.to_vec()));
@@ -292,11 +285,11 @@

impl<T> Accumulator for StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: Offset,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &GenericStringArray<T> =
downcast_value!(values, GenericStringArray, T);
let array: &Utf8Array<T> =
downcast_value!(values, Utf8Array, T);
// flatten because we would skip nulls
self.hll
.extend(array.into_iter().flatten().map(|i| i.to_string()));
@@ -308,8 +301,7 @@

impl<T> Accumulator for NumericHLLAccumulator<T>
where
T: ArrowPrimitiveType + std::fmt::Debug,
T::Native: Hash,
T: NativeType + Hash,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &PrimitiveArray<T> = downcast_value!(values, PrimitiveArray, T);
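The accumulators are now generic over arrow2's NativeType (the Rust value type itself, u8 through i64 here) and Offset (i32/i64 for Utf8/LargeUtf8 and Binary/LargeBinary) instead of arrow-rs type markers with an associated Native type. A reduced sketch of the numeric case, using a HashSet as a stand-in for HyperLogLog and the same arrow re-export:

use std::collections::HashSet;
use std::hash::Hash;

use arrow::array::PrimitiveArray;
use arrow::types::NativeType;

struct DistinctCounter<T: NativeType + Hash + Eq> {
    seen: HashSet<T>, // stand-in for HyperLogLog<T>
}

impl<T: NativeType + Hash + Eq> DistinctCounter<T> {
    fn update_batch(&mut self, array: &PrimitiveArray<T>) {
        // flatten() skips nulls, mirroring update_batch above
        self.seen.extend(array.iter().flatten().copied());
    }
}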
(Diffs for the remaining 11 changed files are not shown.)
