Commit

various fix on array construction, array type conversion, etc (#12)
yjshen authored Dec 22, 2021
1 parent 83ed429 commit d5a11da
Showing 17 changed files with 114 additions and 140 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -488,11 +488,11 @@ impl ShuffleWriter {
mod tests {
use super::*;
use datafusion::arrow::array::{StructArray, UInt32Array, UInt64Array, Utf8Array};
use datafusion::field_util::StructArrayExt;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::field_util::StructArrayExt;
use tempfile::TempDir;

#[tokio::test]
5 changes: 0 additions & 5 deletions benchmarks/src/bin/tpch.rs
@@ -25,13 +25,8 @@ use std::{
time::Instant,
};

<<<<<<< HEAD
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::io::print;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::parquet::ParquetTable;
use ballista::context::BallistaContext;
use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};

use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
3 changes: 1 addition & 2 deletions datafusion/src/datasource/file_format/json.rs
@@ -18,7 +18,6 @@
//! Line delimited JSON format abstractions

use std::any::Any;
use std::io::BufReader;
use std::sync::Arc;

use arrow::datatypes::Schema;
@@ -61,7 +60,7 @@ impl FileFormat for JsonFormat {
let mut schemas = Vec::new();
let records_to_read = self.schema_infer_max_rec;
while let Some(obj_reader) = readers.next().await {
let mut reader = BufReader::new(obj_reader?.sync_reader()?);
let mut reader = obj_reader?.sync_reader()?;
// FIXME: return number of records read from infer_json_schema so we can enforce
// records_to_read
let schema = json::infer_json_schema(&mut reader, records_to_read)?;
4 changes: 3 additions & 1 deletion datafusion/src/datasource/object_store/local.rs
@@ -24,7 +24,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};

use crate::datasource::object_store::{FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, ReadSeek};
use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, ReadSeek,
};
use crate::datasource::PartitionedFile;
use crate::error::DataFusionError;
use crate::error::Result;
7 changes: 6 additions & 1 deletion datafusion/src/field_util.rs
@@ -17,9 +17,9 @@

//! Utility functions for complex field access

use std::borrow::Borrow;
use arrow::array::{ArrayRef, StructArray};
use arrow::datatypes::{DataType, Field};
use std::borrow::Borrow;

use crate::error::{DataFusionError, Result};
use crate::scalar::ScalarValue;
@@ -73,6 +73,7 @@ pub fn get_indexed_field(data_type: &DataType, key: &ScalarValue) -> Result<Field> {
pub trait StructArrayExt {
fn column_names(&self) -> Vec<&str>;
fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef>;
fn num_columns(&self) -> usize;
}

impl StructArrayExt for StructArray {
@@ -86,4 +87,8 @@ impl StructArrayExt for StructArray {
.position(|c| c.name() == column_name)
.map(|pos| self.values()[pos].borrow())
}

fn num_columns(&self) -> usize {
self.fields().len()
}
}
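The new `num_columns` method rounds out the `StructArrayExt` helper trait. For reference, a minimal usage sketch, not part of this commit (the `describe_struct` helper is hypothetical; the import path mirrors the one used in the shuffle_writer test above):

```rust
use datafusion::arrow::array::{Array, StructArray};
use datafusion::field_util::StructArrayExt;

/// Hypothetical helper: print one line per child column of a struct array,
/// using the extension methods defined by StructArrayExt.
fn describe_struct(array: &StructArray) {
    // num_columns() is the method added in this commit.
    println!("struct has {} columns", array.num_columns());
    for name in array.column_names() {
        // column_by_name returns Option<&ArrayRef>, None if no such field exists.
        if let Some(column) = array.column_by_name(name) {
            println!("  {}: {} rows", name, column.len());
        }
    }
}
```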
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/datetime_expressions.rs
@@ -85,7 +85,8 @@ fn handle<'a, O, F>(
args: &'a [ColumnarValue],
op: F,
name: &str,
data_type: DataType) -> Result<ColumnarValue>
data_type: DataType,
) -> Result<ColumnarValue>
where
O: NativeType,
ScalarValue: From<Option<O>>,
13 changes: 7 additions & 6 deletions datafusion/src/physical_plan/expressions/approx_distinct.rs
@@ -25,14 +25,14 @@ use crate::physical_plan::{
use crate::scalar::ScalarValue;
use arrow::array::{ArrayRef, BinaryArray, Offset, PrimitiveArray, Utf8Array};
use arrow::datatypes::{DataType, Field};
use arrow::types::NativeType;
use std::any::type_name;
use std::any::Any;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow::types::NativeType;

/// APPROX_DISTINCT aggregate expression
#[derive(Debug)]
@@ -229,7 +229,10 @@ macro_rules! default_accumulator_impl {

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
assert_eq!(1, states.len(), "expect only 1 element in the states");
let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
let binary_array = states[0]
.as_any()
.downcast_ref::<BinaryArray<i32>>()
.unwrap();
for v in binary_array.iter() {
let v = v.ok_or_else(|| {
DataFusionError::Internal(
@@ -272,8 +275,7 @@ where
T: Offset,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &BinaryArray<T> =
downcast_value!(values, BinaryArray, T);
let array: &BinaryArray<T> = downcast_value!(values, BinaryArray, T);
// flatten because we would skip nulls
self.hll
.extend(array.into_iter().flatten().map(|v| v.to_vec()));
@@ -288,8 +290,7 @@
T: Offset,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &Utf8Array<T> =
downcast_value!(values, Utf8Array, T);
let array: &Utf8Array<T> = downcast_value!(values, Utf8Array, T);
// flatten because we would skip nulls
self.hll
.extend(array.into_iter().flatten().map(|i| i.to_string()));
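Because arrow2's `BinaryArray` is generic over its offset width, the downcast in `merge_batch` now has to name the concrete `BinaryArray<i32>`. A sketch of the same pattern for illustration only (the `first_state` helper is hypothetical):

```rust
use datafusion::arrow::array::{Array, ArrayRef, BinaryArray};

/// Hypothetical helper: pull the first serialized state out of a
/// merge_batch-style state column, mirroring the downcast shown above.
fn first_state(states: &[ArrayRef]) -> Option<Vec<u8>> {
    let binary = states
        .first()?
        .as_any()
        .downcast_ref::<BinaryArray<i32>>()?;
    // iter() yields Option<&[u8]>; flatten skips nulls, as the accumulators do.
    binary.iter().flatten().next().map(|v| v.to_vec())
}
```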
24 changes: 12 additions & 12 deletions datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -88,18 +88,18 @@ impl PartitionEvaluator for CumeDistEvaluator {
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
let scaler = (partition.end - partition.start) as f64;
let result = Float64Array::from_iter_values(
ranks_in_partition
.iter()
.scan(0_u64, |acc, range| {
let len = range.end - range.start;
*acc += len as u64;
let value: f64 = (*acc as f64) / scaler;
let result = iter::repeat(value).take(len);
Some(result)
})
.flatten(),
);
let result = ranks_in_partition
.iter()
.scan(0_u64, |acc, range| {
let len = range.end - range.start;
*acc += len as u64;
let value: f64 = (*acc as f64) / scaler;
let result = iter::repeat(value).take(len);
Some(result)
})
.flatten()
.collect::<Vec<_>>();
let result = Float64Array::from_values(result);
Ok(Arc::new(result))
}
}
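arrow2 has no `from_iter_values`; the closest constructor for a non-nullable array is `from_values`, used above after collecting into a `Vec`. A standalone sketch of the same computation (the `cume_dist_values` helper is hypothetical, and it assumes `PrimitiveArray::from_values` accepts any iterator of plain values):

```rust
use datafusion::arrow::array::PrimitiveArray;

// Local alias for self-containment; the evaluator above uses an equivalent alias.
type Float64Array = PrimitiveArray<f64>;

/// Hypothetical helper: cumulative fraction of rows covered after each rank
/// group, repeated once per row in the group, as CUME_DIST requires.
fn cume_dist_values(group_lens: &[usize]) -> Float64Array {
    let total: usize = group_lens.iter().sum();
    let mut seen = 0usize;
    let values: Vec<f64> = group_lens
        .iter()
        .flat_map(|&len| {
            seen += len;
            std::iter::repeat(seen as f64 / total as f64).take(len)
        })
        .collect();
    // from_values builds the array with no validity bitmap (no nulls).
    Float64Array::from_values(values)
}
```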
11 changes: 6 additions & 5 deletions datafusion/src/physical_plan/expressions/min_max.rs
@@ -33,8 +33,6 @@ type StringArray = Utf8Array<i32>;
type LargeStringArray = Utf8Array<i64>;

use super::format_state_name;
use crate::arrow::array::Array;
use arrow::array::Int128Array;

// Min/max aggregation can take Dictionary encode input but always produces unpacked
// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
@@ -167,9 +165,6 @@ macro_rules! typed_min_max_batch_decimal128 {
macro_rules! min_max_batch {
($VALUES:expr, $OP:ident) => {{
match $VALUES.data_type() {
DataType::Decimal(precision, scale) => {
typed_min_max_batch_decimal128!($VALUES, precision, scale, $OP)
}
// all types that have a natural order
DataType::Int64 => {
typed_min_max_batch!($VALUES, Int64Array, Int64, $OP)
@@ -221,6 +216,9 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
DataType::Float32 => {
typed_min_max_batch!(values, Float32Array, Float32, min_primitive)
}
DataType::Decimal(precision, scale) => {
typed_min_max_batch_decimal128!(values, precision, scale, min)
}
_ => min_max_batch!(values, min_primitive),
})
}
@@ -240,6 +238,9 @@ fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
DataType::Float32 => {
typed_min_max_batch!(values, Float32Array, Float32, max_primitive)
}
DataType::Decimal(precision, scale) => {
typed_min_max_batch_decimal128!(values, precision, scale, max)
}
_ => min_max_batch!(values, max_primitive),
})
}
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/file_format/file_stream.rs
@@ -124,7 +124,7 @@ impl<F: FormatReaderOpener> FileStream<F> {
self.object_store
.file_reader(f.file_meta.sized_file)
.and_then(|r| r.sync_reader())
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.map_err(|e| ArrowError::External("".to_owned(), Box::new(e)))
.and_then(|f| {
self.batch_iter = (self.file_reader)(f, &self.remain);
self.next_batch().transpose()
@@ -197,7 +197,7 @@ mod tests {
async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
let records = vec![make_partition(3), make_partition(2)];

let source_schema = records[0].schema();
let source_schema = records[0].schema().clone();

let reader = move |_file, _remain: &Option<usize>| {
// this reader returns the same batch regardless of the file
8 changes: 5 additions & 3 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -40,6 +40,7 @@ use crate::{
scalar::ScalarValue,
};
use arrow::array::UInt8Array;
use arrow::datatypes::IntegerType;
use lazy_static::lazy_static;
use std::{
collections::HashMap,
@@ -52,7 +53,8 @@ use super::{ColumnStatistics, Statistics};

lazy_static! {
/// The datatype used for all partitioning columns for now
pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType =
DataType::Dictionary(IntegerType::UInt8, Box::new(DataType::Utf8));
}

/// The base configurations to provide when creating a physical plan for
@@ -204,7 +206,7 @@ impl PartitionColumnProjector {

Self {
projected_partition_indexes,
key_buffer_cache: None,
key_array_cache: None,
projected_schema,
}
}
@@ -222,7 +224,7 @@
self.projected_schema.fields().len() - self.projected_partition_indexes.len();

if file_batch.columns().len() != expected_cols {
return Err(ArrowError::SchemaError(format!(
return Err(ArrowError::ExternalFormat(format!(
"Unexpected batch schema from file, expected {} cols but got {}",
expected_cols,
file_batch.columns().len()
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/file_format/parquet.rs
@@ -433,8 +433,9 @@ fn read_partition(
);
let object_reader =
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
let reader = object_reader.sync_reader()?;
let mut record_reader = read::RecordReader::try_new(
std::io::BufReader::new(object_reader),
reader,
Some(projection.to_vec()),
limit,
None,
6 changes: 2 additions & 4 deletions datafusion/src/physical_plan/functions.rs
@@ -33,6 +33,7 @@ use super::{
type_coercion::{coerce, data_types},
ColumnarValue, PhysicalExpr,
};
use crate::execution::context::ExecutionContextState;
use crate::physical_plan::array_expressions;
use crate::physical_plan::datetime_expressions;
use crate::physical_plan::expressions::{
Expand All @@ -44,9 +45,6 @@ use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};
use crate::{
execution::context::ExecutionContextState,
};
use arrow::{
array::*,
compute::length::length,
@@ -523,7 +521,7 @@ pub fn return_type(
match fun {
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", input_expr_types[0].clone(), true)),
input_expr_types.len() as i32,
input_expr_types.len(),
)),
BuiltinScalarFunction::Ascii => Ok(DataType::Int32),
BuiltinScalarFunction::BitLength => {
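The `BuiltinScalarFunction::Array` return type now uses arrow2's `FixedSizeList`, whose size parameter is a plain `usize` rather than the `i32` used by arrow-rs. A minimal sketch of that construction in isolation (the `array_return_type` helper is hypothetical):

```rust
use datafusion::arrow::datatypes::{DataType, Field};

/// Hypothetical helper mirroring the BuiltinScalarFunction::Array arm above:
/// a fixed-size list of "item" fields, one slot per input expression.
fn array_return_type(input_expr_types: &[DataType]) -> DataType {
    DataType::FixedSizeList(
        Box::new(Field::new("item", input_expr_types[0].clone(), true)),
        // arrow2 takes the list size as a usize.
        input_expr_types.len(),
    )
}
```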
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_join.rs
@@ -904,7 +904,7 @@ fn produce_from_matched(
}
JoinSide::Right => {
let datatype = schema.field(idx).data_type();
new_null_array(datatype, num_rows).into()
new_null_array(datatype.clone(), num_rows).into()
}
};

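This hunk, like the `values.rs` change below, follows arrow2's `new_null_array`, which takes the `DataType` by value and returns a `Box<dyn Array>`. A short sketch of the pattern (the `null_column` helper is hypothetical):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{new_null_array, ArrayRef};
use datafusion::arrow::datatypes::DataType;

/// Hypothetical helper: build an all-null column with `num_rows` rows.
fn null_column(datatype: &DataType, num_rows: usize) -> ArrayRef {
    // arrow2 consumes the DataType, hence the clone; the returned
    // Box<dyn Array> converts to Arc<dyn Array> (ArrayRef) via Arc::from.
    Arc::from(new_null_array(datatype.clone(), num_rows))
}
```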
5 changes: 1 addition & 4 deletions datafusion/src/physical_plan/projection.rs
@@ -74,10 +74,7 @@ impl ProjectionExec {
})
.collect();

let schema = Arc::new(Schema::new_with_metadata(
fields?,
input_schema.metadata().clone(),
));
let schema = Arc::new(Schema::new_from(fields?, input_schema.metadata().clone()));

Ok(Self {
expr,
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/values.rs
@@ -57,7 +57,7 @@ impl ValuesExec {
schema
.fields()
.iter()
.map(|field| new_null_array(field.data_type(), 1).into())
.map(|field| new_null_array(field.data_type().clone(), 1).into())
.collect::<Vec<_>>(),
)?;
let arr = (0..n_col)
Expand All @@ -81,6 +81,7 @@ impl ValuesExec {
})
.collect::<Result<Vec<_>>>()
.and_then(ScalarValue::iter_to_array)
.and_then(|b| Ok(Arc::from(b)))
})
.collect::<Result<Vec<_>>>()?;
let batch = RecordBatch::try_new(schema.clone(), arr)?;