From 4b000cdda3ed18c6fb842231169c6713b0e07039 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 4 Jan 2022 12:27:33 +0800 Subject: [PATCH] Merge df-6.0-arrow2-0.8 and make it compile --- datafusion/Cargo.toml | 4 -- datafusion/src/error.rs | 6 +-- datafusion/src/execution/context.rs | 2 - datafusion/src/execution/disk_manager.rs | 10 ++--- .../src/execution/memory_management/mod.rs | 9 ++-- datafusion/src/execution/mod.rs | 2 +- datafusion/src/execution/runtime_env.rs | 3 +- datafusion/src/physical_plan/common.rs | 2 +- datafusion/src/physical_plan/mod.rs | 4 -- .../src/physical_plan/sorts/external_sort.rs | 41 +++++++++++++------ datafusion/src/physical_plan/sorts/mod.rs | 2 +- datafusion/src/physical_plan/sorts/sort.rs | 6 +-- .../sorts/sort_preserving_merge.rs | 27 ++++-------- 13 files changed, 55 insertions(+), 63 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index a6447e632166..1277cc3ed163 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -40,11 +40,7 @@ path = "src/lib.rs" [features] default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] simd = ["arrow/simd"] -<<<<<<< HEAD crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] -======= -crypto_expressions = ["md-5", "sha2"] ->>>>>>> ExternalSortExec v1 regex_expressions = ["regex"] unicode_expressions = ["unicode-segmentation"] # FIXME: add pyarrow support to arrow2 pyarrow = ["pyo3", "arrow/pyarrow"] diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index 89726385329e..d9ac067d6e3f 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -63,7 +63,7 @@ pub enum DataFusionError { Execution(String), /// This error is thrown when a consumer cannot acquire memory from the Memory Manager /// we can just cancel the execution of the partition. 
- OutOfMemory(String), + ResourcesExhausted(String), } impl DataFusionError { @@ -132,8 +132,8 @@ impl Display for DataFusionError { DataFusionError::Execution(ref desc) => { write!(f, "Execution error: {}", desc) } - DataFusionError::OutOfMemory(ref desc) => { - write!(f, "Out of memory error: {}", desc) + DataFusionError::ResourcesExhausted(ref desc) => { + write!(f, "Resources exhausted: {}", desc) } } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 32650ad5a7a6..6f72380b7227 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -31,8 +31,6 @@ use crate::{ }, MemTable, }, - execution::disk_manager::DiskManager, - execution::memory_management::MemoryManager, logical_plan::{PlanType, ToStringifiedPlan}, optimizer::eliminate_limit::EliminateLimit, physical_optimizer::{ diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs index 9632374687fe..80cc1506ae0e 100644 --- a/datafusion/src/execution/disk_manager.rs +++ b/datafusion/src/execution/disk_manager.rs @@ -34,7 +34,7 @@ pub struct DiskManager { impl DiskManager { /// Create local dirs inside user provided dirs through conf - pub fn new(conf_dirs: &Vec) -> Result { + pub fn new(conf_dirs: &[String]) -> Result { Ok(Self { local_dirs: create_local_dirs(conf_dirs)?, }) @@ -55,9 +55,9 @@ impl DiskManager { } /// Setup local dirs by creating one new dir in each of the given dirs -fn create_local_dirs(local_dir: &Vec) -> Result> { +fn create_local_dirs(local_dir: &[String]) -> Result> { local_dir - .into_iter() + .iter() .map(|root| create_directory(root, "datafusion")) .collect() } @@ -82,7 +82,7 @@ fn create_directory(root: &str, prefix: &str) -> Result { ))) } -fn get_file(file_name: &str, local_dirs: &Vec) -> String { +fn get_file(file_name: &str, local_dirs: &[String]) -> String { let mut hasher = DefaultHasher::new(); file_name.hash(&mut hasher); let hash = hasher.finish(); @@ -93,7 +93,7 @@ fn get_file(file_name: &str, local_dirs: &Vec) -> String { path.to_str().unwrap().to_string() } -fn create_tmp_file(local_dirs: &Vec) -> Result { +fn create_tmp_file(local_dirs: &[String]) -> Result { let name = Uuid::new_v4().to_string(); let mut path = get_file(&*name, local_dirs); while Path::new(path.as_str()).exists() { diff --git a/datafusion/src/execution/memory_management/mod.rs b/datafusion/src/execution/memory_management/mod.rs index 651072bc99d4..d5d55440e566 100644 --- a/datafusion/src/execution/memory_management/mod.rs +++ b/datafusion/src/execution/memory_management/mod.rs @@ -19,7 +19,8 @@ pub mod allocation_strategist; -use crate::error::DataFusionError::OutOfMemory; +use std::cmp::Reverse; +use crate::error::DataFusionError::ResourcesExhausted; use crate::error::{DataFusionError, Result}; use crate::execution::memory_management::allocation_strategist::{ DummyAllocationStrategist, FairStrategist, MemoryAllocationStrategist, @@ -169,7 +170,7 @@ impl PartitionMemoryManager { } /// Try to acquire `required` of execution memory for the consumer and return the number of bytes - /// obtained, or return OutOfMemoryError if no enough memory avaiable even after possible spills. + /// obtained, or return ResourcesExhausted if no enough memory available even after possible spills. 
pub async fn acquire_exec_memory( &self, required: usize, @@ -192,7 +193,7 @@ impl PartitionMemoryManager { for c in consumers.iter() { all_consumers.push(c.1.clone()); } - all_consumers.sort_by(|a, b| b.get_used().cmp(&a.get_used())); + all_consumers.sort_by_key(|b| Reverse(b.get_used())); for c in all_consumers.iter_mut() { if c.id() == consumer_id { @@ -235,7 +236,7 @@ impl PartitionMemoryManager { } if got < required { - return Err(OutOfMemory(format!( + return Err(ResourcesExhausted(format!( "Unable to acquire {} bytes of memory, got {}", required, got ))); diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs index 864bb33daab7..c7929c179972 100644 --- a/datafusion/src/execution/mod.rs +++ b/datafusion/src/execution/mod.rs @@ -19,7 +19,7 @@ pub mod context; pub mod dataframe_impl; -pub mod options; pub mod disk_manager; pub mod memory_management; +pub mod options; pub mod runtime_env; diff --git a/datafusion/src/execution/runtime_env.rs b/datafusion/src/execution/runtime_env.rs index d0cd4718ffa9..ae64a3733c11 100644 --- a/datafusion/src/execution/runtime_env.rs +++ b/datafusion/src/execution/runtime_env.rs @@ -21,6 +21,7 @@ use crate::error::Result; use crate::execution::disk_manager::DiskManager; use crate::execution::memory_management::{MemoryConsumer, MemoryManager}; +use lazy_static::lazy_static; use std::sync::Arc; lazy_static! { @@ -100,7 +101,7 @@ impl RuntimeConfig { /// Customize exec size pub fn with_local_dirs(mut self, local_dirs: Vec) -> Self { - assert!(local_dirs.len() > 0); + assert!(!local_dirs.is_empty()); self.local_dirs = local_dirs; self } diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index ce7c446f99df..9099dc50251e 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -294,7 +294,7 @@ pub struct IPCWriterWrapper { impl IPCWriterWrapper { /// Create new writer pub fn new(path: &str, schema: &Schema) -> Result { - let file = File::create(path).map_err(|e| DataFusionError::IoError(e))?; + let file = File::create(path).map_err(DataFusionError::IoError)?; let buffer_writer = std::io::BufWriter::new(file); Ok(Self { num_batches: 0, diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index aa00358f1bbd..277d3f00c6a6 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -624,11 +624,7 @@ pub mod distinct_expressions; pub mod empty; pub mod explain; pub mod expressions; -<<<<<<< HEAD pub mod file_format; -pub mod external_sort; -======= ->>>>>>> move sorts together into submodule pub mod filter; pub mod functions; pub mod hash_aggregate; diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs index d66b6f718338..2dce542f922a 100644 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ b/datafusion/src/physical_plan/sorts/external_sort.rs @@ -307,12 +307,16 @@ async fn read_spill_as_stream( TKReceiver>, ) = tokio::sync::mpsc::channel(2); let path_clone = path.clone(); - task::spawn_blocking(move || { + let join_handle = task::spawn_blocking(move || { if let Err(e) = read_spill(sender, path_clone) { error!("Failure while reading spill file: {}. 
Error: {}", path, e); } }); - Ok(RecordBatchReceiverStream::create(&schema, receiver)) + Ok(RecordBatchReceiverStream::create( + &schema, + receiver, + join_handle, + )) } pub(crate) async fn convert_stream_disk_based( @@ -521,30 +525,41 @@ pub async fn external_sort( #[cfg(test)] mod tests { use super::*; + use crate::datasource::object_store::local::LocalFileSystem; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::sorts::SortOptions; use crate::physical_plan::{ collect, - csv::{CsvExec, CsvReadOptions}, + file_format::{CsvExec, PhysicalPlanConfig}, }; use crate::test; + use crate::test_util; use arrow::array::*; + use arrow::compute::sort::SortOptions; use arrow::datatypes::*; #[tokio::test] async fn test_sort() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; - let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - let csv = CsvExec::try_new( - &path, - CsvReadOptions::new().schema(&schema), - None, - 1024, - None, - )?; + let (_, files) = + test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; + + let csv = CsvExec::new( + PhysicalPlanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_schema: Arc::clone(&schema), + file_groups: files, + statistics: Statistics::default(), + projection: None, + batch_size: 1024, + limit: None, + table_partition_cols: vec![], + }, + true, + b',', + ); let sort_exec = Arc::new(ExternalSortExec::try_new( vec![ diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 691ffb836e68..0a055463c099 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -170,7 +170,7 @@ impl SortKeyCursor { fn init_cmp_if_needed( &self, other: &SortKeyCursor, - zipped: &Vec<((&ArrayRef, &ArrayRef), &SortOptions)>, + zipped: &[((&ArrayRef, &ArrayRef), &SortOptions)], ) -> Result<()> { let hm = self.batch_comparators.read().unwrap(); if !hm.contains_key(&other.batch_idx) { diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 1bbbf258c68a..0a15fb5f0173 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -17,12 +17,9 @@ //! Defines the SORT plan -use super::common::AbortOnDropSingle; -use super::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, -}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, @@ -44,7 +41,6 @@ use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::physical_plan::common::AbortOnDropSingle; /// Sort execution plan #[derive(Debug)] diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index aa3913e9e4c6..37a0d6b83360 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -17,8 +17,6 @@ //! 
Defines the sort preserving merge plan -use super::common::AbortOnDropMany; -use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use std::any::Any; use std::cmp::Ordering; use std::collections::VecDeque; @@ -43,6 +41,7 @@ use crate::execution::memory_management::{ }; use crate::execution::runtime_env::RuntimeEnv; use crate::execution::runtime_env::RUNTIME_ENV; +use crate::physical_plan::common::AbortOnDropMany; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; @@ -55,7 +54,6 @@ use crate::physical_plan::{ Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::physical_plan::common::AbortOnDropMany; use futures::lock::Mutex; use std::fmt::{Debug, Formatter}; @@ -164,7 +162,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self.input.execute(0).await } _ => { - let (receivers, join_handles) = (0..input_partitions) + let (streams, join_handles) = (0..input_partitions) .into_iter() .map(|part_i| { let (sender, receiver) = mpsc::channel(1); @@ -172,7 +170,7 @@ impl ExecutionPlan for SortPreservingMergeExec { spawn_execution(self.input.clone(), sender, part_i); (receiver, join_handle) }) - .collect(); + .unzip(); Ok(Box::pin( SortPreservingMergeStream::new_from_receiver( @@ -220,10 +218,6 @@ struct MergingStreams { pub(crate) streams: Mutex>, /// The schema of the RecordBatches yielded by this stream schema: SchemaRef, - - /// Drop helper for tasks feeding the [`receivers`](Self::receivers) - _drop_helper: AbortOnDropMany<()>, - /// Runtime runtime: Arc, } @@ -260,7 +254,7 @@ impl MergingStreams { let origin_stream = &mut streams[stream_idx]; match origin_stream { StreamWrapper::Receiver(_) => { - return Err(DataFusionError::Execution( + Err(DataFusionError::Execution( "Unexpected spilling a receiver stream in SortPreservingMerge" .to_string(), )) @@ -367,6 +361,7 @@ pub(crate) struct SortPreservingMergeStream { } impl SortPreservingMergeStream { + #[allow(clippy::too_many_arguments)] pub(crate) async fn new_from_receiver( receivers: Vec>>, _drop_helper: AbortOnDropMany<()>, @@ -384,7 +379,7 @@ impl SortPreservingMergeStream { let receivers = receivers .into_iter() - .map(|s| StreamWrapper::Receiver(s)) + .map(StreamWrapper::Receiver) .collect(); let streams = Arc::new(MergingStreams::new( partition, @@ -439,7 +434,7 @@ impl SortPreservingMergeStream { schema, cursors, streams, - _drop_helper, + _drop_helper: AbortOnDropMany(vec![]), column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()), target_batch_size, @@ -732,14 +727,8 @@ mod tests { use crate::{assert_batches_eq, test_util}; use super::*; -<<<<<<< HEAD use arrow::datatypes::{DataType, Field, Schema}; use futures::{FutureExt, SinkExt}; - use crate::physical_plan::sorts::sort::SortExec; -======= - use futures::SinkExt; - use tokio_stream::StreamExt; ->>>>>>> Doc #[tokio::test] async fn test_merge_interleave() { @@ -1309,7 +1298,7 @@ mod tests { let baseline_metrics = BaselineMetrics::new(&metrics, 0); let merge_stream = SortPreservingMergeStream::new_from_receiver( - streams, + receivers, AbortOnDropMany(vec![]), batches.schema(), sort.as_slice(),
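
Illustrative note (not part of the patch): the central behavioral change above is that a consumer which cannot obtain execution memory, even after other consumers on the same partition are asked to spill, now surfaces DataFusionError::ResourcesExhausted instead of OutOfMemory. The standalone Rust sketch below mirrors that acquire -> spill-largest-first -> fail flow under simplified assumptions; every name in it (EngineError, MemoryPool, Spillable, SortBuffer, grant) is hypothetical and is not DataFusion API.

use std::cmp::Reverse;

/// Simplified stand-in for DataFusionError::ResourcesExhausted.
#[derive(Debug)]
enum EngineError {
    ResourcesExhausted(String),
}

/// A consumer that can release memory by spilling buffered data to disk.
trait Spillable {
    fn used(&self) -> usize;
    /// Spill buffered data and return the number of bytes released.
    fn spill(&mut self) -> usize;
}

/// Toy per-partition pool mirroring the acquire -> spill -> fail flow.
struct MemoryPool {
    capacity: usize,
    allocated: usize,
}

impl MemoryPool {
    /// Try to grant `required` bytes. If the pool is exhausted, ask the
    /// largest consumers to spill first; if that still does not free enough
    /// memory, return ResourcesExhausted so the caller can cancel the partition.
    fn grant(
        &mut self,
        required: usize,
        others: &mut [Box<dyn Spillable>],
    ) -> Result<usize, EngineError> {
        // Spill the biggest consumers first, analogous to the patched manager's
        // `all_consumers.sort_by_key(|b| Reverse(b.get_used()))`.
        others.sort_by_key(|c| Reverse(c.used()));
        for consumer in others.iter_mut() {
            if self.capacity - self.allocated >= required {
                break;
            }
            self.allocated = self.allocated.saturating_sub(consumer.spill());
        }

        let available = self.capacity - self.allocated;
        if available < required {
            return Err(EngineError::ResourcesExhausted(format!(
                "Unable to acquire {} bytes of memory, got {}",
                required, available
            )));
        }
        self.allocated += required;
        Ok(required)
    }
}

/// Minimal spillable consumer: an in-memory sort buffer.
struct SortBuffer {
    bytes: usize,
}

impl Spillable for SortBuffer {
    fn used(&self) -> usize {
        self.bytes
    }

    fn spill(&mut self) -> usize {
        let released = self.bytes;
        self.bytes = 0; // pretend the buffer was written to a spill file
        released
    }
}

fn main() {
    let mut pool = MemoryPool { capacity: 1024, allocated: 1024 };
    let mut others: Vec<Box<dyn Spillable>> = vec![
        Box::new(SortBuffer { bytes: 512 }),
        Box::new(SortBuffer { bytes: 256 }),
    ];

    // A 600-byte request succeeds after the other consumers spill ...
    assert!(pool.grant(600, &mut others).is_ok());

    // ... but an oversized request still fails with ResourcesExhausted.
    match pool.grant(4096, &mut others) {
        Err(EngineError::ResourcesExhausted(msg)) => println!("{}", msg),
        other => println!("unexpected: {:?}", other),
    }
}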