Merge df-6.0-arrow2-0.8 and make it compile
yjshen committed Jan 4, 2022
1 parent 0843c77 commit 4b000cd
Showing 13 changed files with 55 additions and 63 deletions.
4 changes: 0 additions & 4 deletions datafusion/Cargo.toml
@@ -40,11 +40,7 @@ path = "src/lib.rs"
 [features]
 default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
 simd = ["arrow/simd"]
-<<<<<<< HEAD
 crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
-=======
-crypto_expressions = ["md-5", "sha2"]
->>>>>>> ExternalSortExec v1
 regex_expressions = ["regex"]
 unicode_expressions = ["unicode-segmentation"]
 # FIXME: add pyarrow support to arrow2 pyarrow = ["pyo3", "arrow/pyarrow"]
6 changes: 3 additions & 3 deletions datafusion/src/error.rs
@@ -63,7 +63,7 @@ pub enum DataFusionError {
     Execution(String),
     /// This error is thrown when a consumer cannot acquire memory from the Memory Manager
     /// we can just cancel the execution of the partition.
-    OutOfMemory(String),
+    ResourcesExhausted(String),
 }
 
 impl DataFusionError {
@@ -132,8 +132,8 @@ impl Display for DataFusionError {
             DataFusionError::Execution(ref desc) => {
                 write!(f, "Execution error: {}", desc)
             }
-            DataFusionError::OutOfMemory(ref desc) => {
-                write!(f, "Out of memory error: {}", desc)
+            DataFusionError::ResourcesExhausted(ref desc) => {
+                write!(f, "Resources exhausted: {}", desc)
             }
         }
     }
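
Note for readers: the rename is purely mechanical, changing only the variant name and its Display text. A minimal standalone sketch (a stand-in enum, not the real datafusion crate) of how the new variant renders:

use std::fmt::{self, Display};

// Stand-in for the real DataFusionError, for illustration only.
#[derive(Debug)]
enum DataFusionError {
    Execution(String),
    ResourcesExhausted(String),
}

impl Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionError::Execution(desc) => write!(f, "Execution error: {}", desc),
            DataFusionError::ResourcesExhausted(desc) => {
                write!(f, "Resources exhausted: {}", desc)
            }
        }
    }
}

fn main() {
    // A consumer that fails to reserve memory surfaces the new variant,
    // letting the caller cancel just that partition's execution.
    let err = DataFusionError::ResourcesExhausted("could not reserve 64 MiB".to_string());
    println!("{}", err); // Resources exhausted: could not reserve 64 MiB
}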
2 changes: 0 additions & 2 deletions datafusion/src/execution/context.rs
@@ -31,8 +31,6 @@ use crate::{
         },
         MemTable,
     },
-    execution::disk_manager::DiskManager,
-    execution::memory_management::MemoryManager,
     logical_plan::{PlanType, ToStringifiedPlan},
     optimizer::eliminate_limit::EliminateLimit,
     physical_optimizer::{
10 changes: 5 additions & 5 deletions datafusion/src/execution/disk_manager.rs
@@ -34,7 +34,7 @@ pub struct DiskManager {
 
 impl DiskManager {
     /// Create local dirs inside user provided dirs through conf
-    pub fn new(conf_dirs: &Vec<String>) -> Result<Self> {
+    pub fn new(conf_dirs: &[String]) -> Result<Self> {
         Ok(Self {
             local_dirs: create_local_dirs(conf_dirs)?,
         })
@@ -55,9 +55,9 @@ impl DiskManager {
 }
 
 /// Setup local dirs by creating one new dir in each of the given dirs
-fn create_local_dirs(local_dir: &Vec<String>) -> Result<Vec<String>> {
+fn create_local_dirs(local_dir: &[String]) -> Result<Vec<String>> {
     local_dir
-        .into_iter()
+        .iter()
         .map(|root| create_directory(root, "datafusion"))
         .collect()
 }
@@ -82,7 +82,7 @@ fn create_directory(root: &str, prefix: &str) -> Result<String> {
     )))
 }
 
-fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
+fn get_file(file_name: &str, local_dirs: &[String]) -> String {
     let mut hasher = DefaultHasher::new();
     file_name.hash(&mut hasher);
     let hash = hasher.finish();
@@ -93,7 +93,7 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
     path.to_str().unwrap().to_string()
 }
 
-fn create_tmp_file(local_dirs: &Vec<String>) -> Result<String> {
+fn create_tmp_file(local_dirs: &[String]) -> Result<String> {
     let name = Uuid::new_v4().to_string();
     let mut path = get_file(&*name, local_dirs);
     while Path::new(path.as_str()).exists() {
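
The recurring change in this file is the clippy ptr_arg fix: parameters take &[String] instead of &Vec<String>. A standalone sketch (hypothetical helper, simplified to return plain strings) of why the slice signature is preferable:

// A slice parameter accepts arrays, Vecs, and sub-slices alike, while
// `&Vec<String>` forces the caller to hold exactly a Vec.
fn create_local_dirs(local_dirs: &[String]) -> Vec<String> {
    local_dirs
        .iter() // slices iterate with `.iter()`; `.into_iter()` on `&Vec` was the lint
        .map(|root| format!("{}/datafusion", root))
        .collect()
}

fn main() {
    let dirs = vec!["/tmp/a".to_string(), "/tmp/b".to_string()];
    // `&Vec<String>` coerces to `&[String]`, so existing call sites keep working.
    let created = create_local_dirs(&dirs);
    assert_eq!(created, vec!["/tmp/a/datafusion", "/tmp/b/datafusion"]);
}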
9 changes: 5 additions & 4 deletions datafusion/src/execution/memory_management/mod.rs
@@ -19,7 +19,8 @@
 
 pub mod allocation_strategist;
 
-use crate::error::DataFusionError::OutOfMemory;
+use std::cmp::Reverse;
+use crate::error::DataFusionError::ResourcesExhausted;
 use crate::error::{DataFusionError, Result};
 use crate::execution::memory_management::allocation_strategist::{
     DummyAllocationStrategist, FairStrategist, MemoryAllocationStrategist,
@@ -169,7 +170,7 @@ impl PartitionMemoryManager {
     }
 
     /// Try to acquire `required` of execution memory for the consumer and return the number of bytes
-    /// obtained, or return OutOfMemoryError if no enough memory avaiable even after possible spills.
+    /// obtained, or return ResourcesExhausted if not enough memory is available even after possible spills.
     pub async fn acquire_exec_memory(
         &self,
         required: usize,
@@ -192,7 +193,7 @@ impl PartitionMemoryManager {
        for c in consumers.iter() {
            all_consumers.push(c.1.clone());
        }
-        all_consumers.sort_by(|a, b| b.get_used().cmp(&a.get_used()));
+        all_consumers.sort_by_key(|b| Reverse(b.get_used()));
 
        for c in all_consumers.iter_mut() {
            if c.id() == consumer_id {
@@ -235,7 +236,7 @@ impl PartitionMemoryManager {
        }
 
        if got < required {
-            return Err(OutOfMemory(format!(
+            return Err(ResourcesExhausted(format!(
                "Unable to acquire {} bytes of memory, got {}",
                required, got
            )));
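
The sort_by change is the standard clippy rewrite for a descending sort. A standalone sketch, with plain integers standing in for each consumer's used bytes:

use std::cmp::Reverse;

fn main() {
    let mut used_bytes = vec![10_usize, 300, 42];

    // Equivalent to `sort_by(|a, b| b.cmp(a))`: wrapping the sort key in
    // `Reverse` flips the ordering, yielding a descending sort.
    used_bytes.sort_by_key(|u| Reverse(*u));

    assert_eq!(used_bytes, vec![300, 42, 10]);
}

Sorting consumers by descending usage lets the manager visit the largest consumers first when looking for memory to reclaim.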
2 changes: 1 addition & 1 deletion datafusion/src/execution/mod.rs
@@ -19,7 +19,7 @@
 
 pub mod context;
 pub mod dataframe_impl;
-pub mod options;
 pub mod disk_manager;
 pub mod memory_management;
+pub mod options;
 pub mod runtime_env;
3 changes: 2 additions & 1 deletion datafusion/src/execution/runtime_env.rs
@@ -21,6 +21,7 @@
 use crate::error::Result;
 use crate::execution::disk_manager::DiskManager;
 use crate::execution::memory_management::{MemoryConsumer, MemoryManager};
+use lazy_static::lazy_static;
 use std::sync::Arc;
 
 lazy_static! {
@@ -100,7 +101,7 @@ impl RuntimeConfig {
 
     /// Customize exec size
     pub fn with_local_dirs(mut self, local_dirs: Vec<String>) -> Self {
-        assert!(local_dirs.len() > 0);
+        assert!(!local_dirs.is_empty());
        self.local_dirs = local_dirs;
        self
    }
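
The new lazy_static import supports the RUNTIME_ENV global declared just below it. A sketch of the pattern, assuming a simplified RuntimeEnv stand-in:

use lazy_static::lazy_static;
use std::sync::Arc;

// Simplified stand-in for the real RuntimeEnv.
#[derive(Default)]
struct RuntimeEnv {
    local_dirs: Vec<String>,
}

lazy_static! {
    // A process-wide default environment, initialized on first access.
    static ref RUNTIME_ENV: Arc<RuntimeEnv> = Arc::new(RuntimeEnv::default());
}

fn main() {
    // The `lazy_static!` macro must be in scope where it is invoked,
    // hence the explicit `use lazy_static::lazy_static;` this commit adds.
    assert!(RUNTIME_ENV.local_dirs.is_empty());
}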
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/common.rs
@@ -294,7 +294,7 @@ pub struct IPCWriterWrapper {
 impl IPCWriterWrapper {
     /// Create new writer
     pub fn new(path: &str, schema: &Schema) -> Result<Self> {
-        let file = File::create(path).map_err(|e| DataFusionError::IoError(e))?;
+        let file = File::create(path).map_err(DataFusionError::IoError)?;
         let buffer_writer = std::io::BufWriter::new(file);
         Ok(Self {
             num_batches: 0,
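
This is clippy's redundant_closure fix: a tuple enum variant is itself a function, so it can be passed straight to map_err. A standalone sketch with a stand-in error type:

use std::fs::File;
use std::io;

// Stand-in for DataFusionError, for illustration only.
#[derive(Debug)]
enum WrapperError {
    IoError(io::Error),
}

fn create(path: &str) -> Result<File, WrapperError> {
    // `WrapperError::IoError` already has type `fn(io::Error) -> WrapperError`,
    // so the closure `|e| WrapperError::IoError(e)` added nothing.
    File::create(path).map_err(WrapperError::IoError)
}

fn main() {
    match create("/tmp/ipc-writer-demo.bin") {
        Ok(_) => println!("file created"),
        Err(e) => println!("failed: {:?}", e),
    }
}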
4 changes: 0 additions & 4 deletions datafusion/src/physical_plan/mod.rs
@@ -624,11 +624,7 @@ pub mod distinct_expressions;
 pub mod empty;
 pub mod explain;
 pub mod expressions;
-<<<<<<< HEAD
 pub mod file_format;
-pub mod external_sort;
-=======
->>>>>>> move sorts together into submodule
 pub mod filter;
 pub mod functions;
 pub mod hash_aggregate;
41 changes: 28 additions & 13 deletions datafusion/src/physical_plan/sorts/external_sort.rs
@@ -307,12 +307,16 @@ async fn read_spill_as_stream(
         TKReceiver<ArrowResult<RecordBatch>>,
     ) = tokio::sync::mpsc::channel(2);
     let path_clone = path.clone();
-    task::spawn_blocking(move || {
+    let join_handle = task::spawn_blocking(move || {
         if let Err(e) = read_spill(sender, path_clone) {
             error!("Failure while reading spill file: {}. Error: {}", path, e);
         }
     });
-    Ok(RecordBatchReceiverStream::create(&schema, receiver))
+    Ok(RecordBatchReceiverStream::create(
+        &schema,
+        receiver,
+        join_handle,
+    ))
 }
 
 pub(crate) async fn convert_stream_disk_based(
@@ -521,30 +525,41 @@ pub async fn external_sort(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
-    use crate::physical_plan::sorts::SortOptions;
     use crate::physical_plan::{
         collect,
-        csv::{CsvExec, CsvReadOptions},
+        file_format::{CsvExec, PhysicalPlanConfig},
     };
     use crate::test;
+    use crate::test_util;
     use arrow::array::*;
+    use arrow::compute::sort::SortOptions;
     use arrow::datatypes::*;
 
     #[tokio::test]
     async fn test_sort() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let csv = CsvExec::try_new(
-            &path,
-            CsvReadOptions::new().schema(&schema),
-            None,
-            1024,
-            None,
-        )?;
+        let (_, files) =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+
+        let csv = CsvExec::new(
+            PhysicalPlanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema: Arc::clone(&schema),
+                file_groups: files,
+                statistics: Statistics::default(),
+                projection: None,
+                batch_size: 1024,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        );
 
         let sort_exec = Arc::new(ExternalSortExec::try_new(
             vec![
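
The first hunk stops discarding the JoinHandle from spawn_blocking and threads it into the receiver stream, so the task reading the spill file stays tied to the stream's lifetime. A standalone sketch of that producer/consumer shape, assuming tokio with the rt-multi-thread, macros, and sync features enabled:

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel::<u32>(2);

    // Keep the JoinHandle instead of dropping it, so the consumer side can
    // await (or abort) the blocking producer task.
    let join_handle = task::spawn_blocking(move || {
        for i in 0..3 {
            // `blocking_send` is the synchronous counterpart of `send`,
            // usable inside `spawn_blocking` where there is no async context.
            if sender.blocking_send(i).is_err() {
                break; // receiver hung up
            }
        }
    });

    while let Some(v) = receiver.recv().await {
        println!("spilled batch {}", v);
    }
    join_handle.await.expect("producer task panicked");
}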
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/sorts/mod.rs
@@ -170,7 +170,7 @@ impl SortKeyCursor {
     fn init_cmp_if_needed(
         &self,
         other: &SortKeyCursor,
-        zipped: &Vec<((&ArrayRef, &ArrayRef), &SortOptions)>,
+        zipped: &[((&ArrayRef, &ArrayRef), &SortOptions)],
     ) -> Result<()> {
         let hm = self.batch_comparators.read().unwrap();
         if !hm.contains_key(&other.batch_idx) {
6 changes: 1 addition & 5 deletions datafusion/src/physical_plan/sorts/sort.rs
@@ -17,12 +17,9 @@
 
 //! Defines the SORT plan
 
-use super::common::AbortOnDropSingle;
-use super::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
-};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
@@ -44,7 +41,6 @@ use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use crate::physical_plan::common::AbortOnDropSingle;
 
 /// Sort execution plan
 #[derive(Debug)]
27 changes: 8 additions & 19 deletions datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -17,8 +17,6 @@
 
 //! Defines the sort preserving merge plan
 
-use super::common::AbortOnDropMany;
-use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use std::any::Any;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
@@ -43,6 +41,7 @@ use crate::execution::memory_management::{
 };
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::execution::runtime_env::RUNTIME_ENV;
+use crate::physical_plan::common::AbortOnDropMany;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
@@ -55,7 +54,6 @@ use crate::physical_plan::{
     Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::common::AbortOnDropMany;
 use futures::lock::Mutex;
 use std::fmt::{Debug, Formatter};
@@ -164,15 +162,15 @@ impl ExecutionPlan for SortPreservingMergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let (receivers, join_handles) = (0..input_partitions)
+                let (streams, join_handles) = (0..input_partitions)
                     .into_iter()
                     .map(|part_i| {
                         let (sender, receiver) = mpsc::channel(1);
                         let join_handle =
                             spawn_execution(self.input.clone(), sender, part_i);
                         (receiver, join_handle)
                     })
-                    .collect();
+                    .unzip();
 
                 Ok(Box::pin(
                     SortPreservingMergeStream::new_from_receiver(
@@ -220,10 +218,6 @@ struct MergingStreams {
     pub(crate) streams: Mutex<Vec<StreamWrapper>>,
     /// The schema of the RecordBatches yielded by this stream
     schema: SchemaRef,
-
-    /// Drop helper for tasks feeding the [`receivers`](Self::receivers)
-    _drop_helper: AbortOnDropMany<()>,
-
     /// Runtime
     runtime: Arc<RuntimeEnv>,
 }
@@ -260,7 +254,7 @@ impl MergingStreams {
         let origin_stream = &mut streams[stream_idx];
         match origin_stream {
             StreamWrapper::Receiver(_) => {
-                return Err(DataFusionError::Execution(
+                Err(DataFusionError::Execution(
                     "Unexpected spilling a receiver stream in SortPreservingMerge"
                         .to_string(),
                 ))
@@ -367,6 +361,7 @@ pub(crate) struct SortPreservingMergeStream {
 }
 
 impl SortPreservingMergeStream {
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn new_from_receiver(
         receivers: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
         _drop_helper: AbortOnDropMany<()>,
@@ -384,7 +379,7 @@ impl SortPreservingMergeStream {
 
         let receivers = receivers
             .into_iter()
-            .map(|s| StreamWrapper::Receiver(s))
+            .map(StreamWrapper::Receiver)
             .collect();
         let streams = Arc::new(MergingStreams::new(
             partition,
@@ -439,7 +434,7 @@ impl SortPreservingMergeStream {
             schema,
             cursors,
             streams,
-            _drop_helper,
+            _drop_helper: AbortOnDropMany(vec![]),
             column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
             sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()),
             target_batch_size,
@@ -732,14 +727,8 @@ mod tests {
     use crate::{assert_batches_eq, test_util};
 
     use super::*;
-<<<<<<< HEAD
-    use arrow::datatypes::{DataType, Field, Schema};
-    use futures::{FutureExt, SinkExt};
-    use crate::physical_plan::sorts::sort::SortExec;
-=======
     use futures::SinkExt;
     use tokio_stream::StreamExt;
->>>>>>> Doc
 
     #[tokio::test]
     async fn test_merge_interleave() {
@@ -1309,7 +1298,7 @@ mod tests {
         let baseline_metrics = BaselineMetrics::new(&metrics, 0);
 
         let merge_stream = SortPreservingMergeStream::new_from_receiver(
-            streams,
+            receivers,
             AbortOnDropMany(vec![]),
             batches.schema(),
             sort.as_slice(),
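
The collect-to-unzip change in execute builds the stream and join-handle vectors in a single pass over the partitions. A standalone sketch of the pattern with placeholder element types:

fn main() {
    let input_partitions = 3;

    // Each iteration yields a (stream, join_handle) pair; `unzip` splits an
    // iterator of pairs into two collections, which a single `collect`
    // cannot do directly.
    let (streams, join_handles): (Vec<String>, Vec<usize>) = (0..input_partitions)
        .map(|part_i| (format!("stream-{}", part_i), part_i))
        .unzip();

    assert_eq!(streams.len(), 3);
    assert_eq!(join_handles, vec![0, 1, 2]);
}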
