From e6bcf041b05ef254a737bb0ca9bf5c79a141ff90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 3 Jul 2024 10:45:18 +0800 Subject: [PATCH 1/4] Enable clone_on_ref_ptr clippy lint on physical-plan crate --- .../physical-plan/src/aggregates/mod.rs | 85 +++++----- .../src/aggregates/no_grouping.rs | 9 +- .../src/aggregates/order/partial.rs | 3 +- .../physical-plan/src/aggregates/row_hash.rs | 12 +- .../src/aggregates/topk_stream.rs | 10 +- datafusion/physical-plan/src/analyze.rs | 16 +- .../physical-plan/src/coalesce_batches.rs | 8 +- .../physical-plan/src/coalesce_partitions.rs | 10 +- datafusion/physical-plan/src/common.rs | 7 +- datafusion/physical-plan/src/empty.rs | 15 +- datafusion/physical-plan/src/explain.rs | 6 +- datafusion/physical-plan/src/filter.rs | 14 +- datafusion/physical-plan/src/insert.rs | 14 +- .../physical-plan/src/joins/cross_join.rs | 14 +- .../physical-plan/src/joins/hash_join.rs | 150 +++++++++++------- .../src/joins/nested_loop_join.rs | 25 +-- .../src/joins/sort_merge_join.rs | 69 ++++---- .../src/joins/stream_join_utils.rs | 10 +- .../src/joins/symmetric_hash_join.rs | 20 +-- .../physical-plan/src/joins/test_utils.rs | 51 +++--- datafusion/physical-plan/src/joins/utils.rs | 12 +- datafusion/physical-plan/src/lib.rs | 8 +- datafusion/physical-plan/src/limit.rs | 12 +- datafusion/physical-plan/src/memory.rs | 9 +- .../physical-plan/src/placeholder_row.rs | 13 +- datafusion/physical-plan/src/projection.rs | 20 +-- .../physical-plan/src/recursive_query.rs | 23 ++- .../physical-plan/src/repartition/mod.rs | 56 ++++--- datafusion/physical-plan/src/sorts/builder.rs | 6 +- datafusion/physical-plan/src/sorts/merge.rs | 3 +- .../physical-plan/src/sorts/partial_sort.rs | 53 ++++--- datafusion/physical-plan/src/sorts/sort.rs | 63 +++++--- .../src/sorts/sort_preserving_merge.rs | 34 ++-- datafusion/physical-plan/src/sorts/stream.rs | 2 +- datafusion/physical-plan/src/stream.rs | 28 ++-- 
datafusion/physical-plan/src/streaming.rs | 6 +- datafusion/physical-plan/src/test.rs | 5 +- datafusion/physical-plan/src/test/exec.rs | 14 +- datafusion/physical-plan/src/topk/mod.rs | 4 +- datafusion/physical-plan/src/tree_node.rs | 2 +- datafusion/physical-plan/src/union.rs | 10 +- datafusion/physical-plan/src/unnest.rs | 36 ++--- datafusion/physical-plan/src/values.rs | 10 +- .../src/windows/bounded_window_agg_exec.rs | 56 ++++--- datafusion/physical-plan/src/windows/mod.rs | 33 ++-- .../src/windows/window_agg_exec.rs | 15 +- datafusion/physical-plan/src/work_table.rs | 8 +- 47 files changed, 623 insertions(+), 466 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 533d10357b0e..1214dc272c28 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -188,7 +188,7 @@ impl PhysicalGroupBy { pub fn input_exprs(&self) -> Vec> { self.expr .iter() - .map(|(expr, _alias)| expr.clone()) + .map(|(expr, _alias)| Arc::clone(expr)) .collect() } @@ -283,9 +283,9 @@ impl AggregateExec { group_by: self.group_by.clone(), filter_expr: self.filter_expr.clone(), limit: self.limit, - input: self.input.clone(), - schema: self.schema.clone(), - input_schema: self.input_schema.clone(), + input: Arc::clone(&self.input), + schema: Arc::clone(&self.schema), + input_schema: Arc::clone(&self.input_schema), } } @@ -355,7 +355,7 @@ impl AggregateExec { let mut new_requirement = indices .iter() .map(|&idx| PhysicalSortRequirement { - expr: groupby_exprs[idx].clone(), + expr: Arc::clone(&groupby_exprs[idx]), options: None, }) .collect::>(); @@ -387,7 +387,7 @@ impl AggregateExec { let cache = Self::compute_properties( &input, - schema.clone(), + Arc::clone(&schema), &projection_mapping, &mode, &input_order_mode, @@ -446,7 +446,7 @@ impl AggregateExec { /// Get the input schema before any aggregates are applied pub fn input_schema(&self) -> SchemaRef { - 
self.input_schema.clone() + Arc::clone(&self.input_schema) } /// number of rows soft limit of the AggregateExec @@ -700,9 +700,9 @@ impl ExecutionPlan for AggregateExec { self.group_by.clone(), self.aggr_expr.clone(), self.filter_expr.clone(), - children[0].clone(), - self.input_schema.clone(), - self.schema.clone(), + Arc::clone(&children[0]), + Arc::clone(&self.input_schema), + Arc::clone(&self.schema), )?; me.limit = self.limit; @@ -999,7 +999,7 @@ fn aggregate_expressions( // way order sensitive aggregators can satisfy requirement // themselves. if let Some(ordering_req) = agg.order_bys() { - result.extend(ordering_req.iter().map(|item| item.expr.clone())); + result.extend(ordering_req.iter().map(|item| Arc::clone(&item.expr))); } result }) @@ -1159,9 +1159,9 @@ pub(crate) fn evaluate_group_by( .enumerate() .map(|(idx, is_null)| { if *is_null { - null_exprs[idx].clone() + Arc::clone(&null_exprs[idx]) } else { - exprs[idx].clone() + Arc::clone(&exprs[idx]) } }) .collect() @@ -1227,10 +1227,10 @@ mod tests { // define data. ( - schema.clone(), + Arc::clone(&schema), vec![ RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(UInt32Array::from(vec![2, 3, 4, 4])), Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), @@ -1262,10 +1262,10 @@ mod tests { // the expected result by accident, but merging actually works properly; // i.e. it doesn't depend on the data insertion order. 
( - schema.clone(), + Arc::clone(&schema), vec![ RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(UInt32Array::from(vec![2, 3, 4, 4])), Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), @@ -1273,7 +1273,7 @@ mod tests { ) .unwrap(), RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), Arc::new(Float64Array::from(vec![0.0, 1.0, 2.0, 3.0])), @@ -1281,7 +1281,7 @@ mod tests { ) .unwrap(), RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), Arc::new(Float64Array::from(vec![3.0, 4.0, 5.0, 6.0])), @@ -1361,11 +1361,11 @@ mod tests { aggregates.clone(), vec![None], input, - input_schema.clone(), + Arc::clone(&input_schema), )?); let result = - common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?; + common::collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; let expected = if spill { vec![ @@ -1443,7 +1443,7 @@ mod tests { )?); let result = - common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; + common::collect(merged_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 3); assert_eq!(batch.num_rows(), 12); @@ -1511,11 +1511,11 @@ mod tests { aggregates.clone(), vec![None], input, - input_schema.clone(), + Arc::clone(&input_schema), )?); let result = - common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?; + common::collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; let expected = if spill { vec![ @@ -1565,7 +1565,7 @@ mod tests { // enlarge memory limit to let the final aggregation finish new_spill_ctx(2, 2600) } else { - task_ctx.clone() + Arc::clone(&task_ctx) }; let result = common::collect(merged_aggregate.execute(0, task_ctx)?).await?; let batch = concat_batches(&result[0].schema(), &result)?; @@ -1848,11 +1848,11 @@ mod 
tests { groups, aggregates, vec![None; n_aggr], - input.clone(), - input_schema.clone(), + Arc::clone(&input), + Arc::clone(&input_schema), )?); - let stream = partial_aggregate.execute_typed(0, task_ctx.clone())?; + let stream = partial_aggregate.execute_typed(0, Arc::clone(&task_ctx))?; // ensure that we really got the version we wanted match version { @@ -2104,7 +2104,7 @@ mod tests { vec![partition3], vec![partition4], ], - schema.clone(), + Arc::clone(&schema), None, )?); let aggregate_exec = Arc::new(AggregateExec::try_new( @@ -2113,7 +2113,7 @@ mod tests { aggregates.clone(), vec![None], memory_exec, - schema.clone(), + Arc::clone(&schema), )?); let coalesce = if use_coalesce_batches { let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); @@ -2178,41 +2178,41 @@ mod tests { let order_by_exprs = vec![ None, Some(vec![PhysicalSortExpr { - expr: col_a.clone(), + expr: Arc::clone(col_a), options: options1, }]), Some(vec![ PhysicalSortExpr { - expr: col_a.clone(), + expr: Arc::clone(col_a), options: options1, }, PhysicalSortExpr { - expr: col_b.clone(), + expr: Arc::clone(col_b), options: options1, }, PhysicalSortExpr { - expr: col_c.clone(), + expr: Arc::clone(col_c), options: options1, }, ]), Some(vec![ PhysicalSortExpr { - expr: col_a.clone(), + expr: Arc::clone(col_a), options: options1, }, PhysicalSortExpr { - expr: col_b.clone(), + expr: Arc::clone(col_b), options: options1, }, ]), ]; let common_requirement = vec![ PhysicalSortExpr { - expr: col_a.clone(), + expr: Arc::clone(col_a), options: options1, }, PhysicalSortExpr { - expr: col_c.clone(), + expr: Arc::clone(col_c), options: options1, }, ]; @@ -2220,7 +2220,7 @@ mod tests { .into_iter() .map(|order_by_expr| { Arc::new(OrderSensitiveArrayAgg::new( - col_a.clone(), + Arc::clone(col_a), "array_agg", DataType::Int32, false, @@ -2265,12 +2265,11 @@ mod tests { groups, aggregates.clone(), vec![None, None], - blocking_exec.clone(), + Arc::clone(&blocking_exec) as Arc, schema, )?); - let 
new_agg = aggregate_exec - .clone() - .with_new_children(vec![blocking_exec])?; + let new_agg = + Arc::clone(&aggregate_exec).with_new_children(vec![blocking_exec])?; assert_eq!(new_agg.schema(), aggregate_exec.schema()); Ok(()) } diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 5ec95bd79942..f85164f7f1e2 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -140,8 +140,11 @@ impl AggregateStream { let result = finalize_aggregation(&mut this.accumulators, &this.mode) .and_then(|columns| { - RecordBatch::try_new(this.schema.clone(), columns) - .map_err(Into::into) + RecordBatch::try_new( + Arc::clone(&this.schema), + columns, + ) + .map_err(Into::into) }) .record_output(&this.baseline_metrics); @@ -181,7 +184,7 @@ impl Stream for AggregateStream { impl RecordBatchStream for AggregateStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs index ecd37c913e98..f8fd86ff8b50 100644 --- a/datafusion/physical-plan/src/aggregates/order/partial.rs +++ b/datafusion/physical-plan/src/aggregates/order/partial.rs @@ -22,6 +22,7 @@ use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; use datafusion_physical_expr::PhysicalSortExpr; +use std::sync::Arc; /// Tracks grouping state when the data is ordered by some subset of /// the group keys. @@ -138,7 +139,7 @@ impl GroupOrderingPartial { let sort_values: Vec<_> = self .order_indices .iter() - .map(|&idx| group_values[idx].clone()) + .map(|&idx| Arc::clone(&group_values[idx])) .collect(); Ok(self.row_converter.convert_columns(&sort_values)?) 
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index ad0860b93a3a..f16dc477149b 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -358,7 +358,7 @@ impl GroupedHashAggregateStream { let spill_state = SpillState { spills: vec![], spill_expr, - spill_schema: agg_schema.clone(), + spill_schema: Arc::clone(&agg_schema), is_stream_merging: false, merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), @@ -401,7 +401,7 @@ pub(crate) fn create_group_accumulator( "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}", agg_expr.name() ); - let agg_expr_captured = agg_expr.clone(); + let agg_expr_captured = Arc::clone(agg_expr); let factory = move || agg_expr_captured.create_accumulator(); Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) } @@ -515,7 +515,7 @@ impl Stream for GroupedHashAggregateStream { impl RecordBatchStream for GroupedHashAggregateStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -625,7 +625,7 @@ impl GroupedHashAggregateStream { /// accumulator states/values specified in emit_to fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result { let schema = if spilling { - self.spill_state.spill_schema.clone() + Arc::clone(&self.spill_state.spill_schema) } else { self.schema() }; @@ -746,13 +746,13 @@ impl GroupedHashAggregateStream { let expr = self.spill_state.spill_expr.clone(); let schema = batch.schema(); streams.push(Box::pin(RecordBatchStreamAdapter::new( - schema.clone(), + Arc::clone(&schema), futures::stream::once(futures::future::lazy(move |_| { sort_batch(&batch, &expr, None) })), ))); for spill in self.spill_state.spills.drain(..) 
{ - let stream = read_spill_as_stream(spill, schema.clone())?; + let stream = read_spill_as_stream(spill, Arc::clone(&schema))?; streams.push(stream); } self.spill_state.is_stream_merging = true; diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 9f25473cb9b4..075d8c5f2883 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -84,14 +84,14 @@ impl GroupedTopKAggregateStream { impl RecordBatchStream for GroupedTopKAggregateStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } impl GroupedTopKAggregateStream { fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> { let len = ids.len(); - self.priority_map.set_batch(ids, vals.clone()); + self.priority_map.set_batch(ids, Arc::clone(&vals)); let has_nulls = vals.null_count() > 0; for row_idx in 0..len { @@ -139,14 +139,14 @@ impl Stream for GroupedTopKAggregateStream { 1, "Exactly 1 group value required" ); - let group_by_values = group_by_values[0][0].clone(); + let group_by_values = Arc::clone(&group_by_values[0][0]); let input_values = evaluate_many( &self.aggregate_arguments, batches.first().unwrap(), )?; assert_eq!(input_values.len(), 1, "Exactly 1 input required"); assert_eq!(input_values[0].len(), 1, "Exactly 1 input required"); - let input_values = input_values[0][0].clone(); + let input_values = Arc::clone(&input_values[0][0]); // iterate over each column of group_by values (*self).intern(group_by_values, input_values)?; @@ -158,7 +158,7 @@ impl Stream for GroupedTopKAggregateStream { return Poll::Ready(None); } let cols = self.priority_map.emit()?; - let batch = RecordBatch::try_new(self.schema.clone(), cols)?; + let batch = RecordBatch::try_new(Arc::clone(&self.schema), cols)?; trace!( "partition {} emit batch with {} rows", self.partition, diff --git a/datafusion/physical-plan/src/analyze.rs 
b/datafusion/physical-plan/src/analyze.rs index 5b859804163b..b4c1e25e6191 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -59,7 +59,7 @@ impl AnalyzeExec { input: Arc, schema: SchemaRef, ) -> Self { - let cache = Self::compute_properties(&input, schema.clone()); + let cache = Self::compute_properties(&input, Arc::clone(&schema)); AnalyzeExec { verbose, show_statistics, @@ -141,7 +141,7 @@ impl ExecutionPlan for AnalyzeExec { self.verbose, self.show_statistics, children.pop().unwrap(), - self.schema.clone(), + Arc::clone(&self.schema), ))) } @@ -164,13 +164,17 @@ impl ExecutionPlan for AnalyzeExec { RecordBatchReceiverStream::builder(self.schema(), num_input_partitions); for input_partition in 0..num_input_partitions { - builder.run_input(self.input.clone(), input_partition, context.clone()); + builder.run_input( + Arc::clone(&self.input), + input_partition, + Arc::clone(&context), + ); } // Create future that computes thefinal output let start = Instant::now(); - let captured_input = self.input.clone(); - let captured_schema = self.schema.clone(); + let captured_input = Arc::clone(&self.input); + let captured_schema = Arc::clone(&self.schema); let verbose = self.verbose; let show_statistics = self.show_statistics; @@ -196,7 +200,7 @@ impl ExecutionPlan for AnalyzeExec { }; Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema.clone(), + Arc::clone(&self.schema), futures::stream::once(output), ))) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 804fabff71ac..b9bdfcdee712 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -134,7 +134,7 @@ impl ExecutionPlan for CoalesceBatchesExec { children: Vec>, ) -> Result> { Ok(Arc::new(CoalesceBatchesExec::new( - children[0].clone(), + Arc::clone(&children[0]), self.target_batch_size, ))) } @@ -272,7 +272,7 @@ impl 
CoalesceBatchesStream { impl RecordBatchStream for CoalesceBatchesStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -329,7 +329,7 @@ mod tests { target_batch_size: usize, ) -> Result>> { // create physical plan - let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = MemoryExec::try_new(&input_partitions, Arc::clone(schema), None)?; let exec = RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?; let exec: Arc = @@ -341,7 +341,7 @@ mod tests { for i in 0..output_partition_count { // execute this *output* partition and collect all batches let task_ctx = Arc::new(TaskContext::default()); - let mut stream = exec.execute(i, task_ctx.clone())?; + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; let mut batches = vec![]; while let Some(result) = stream.next().await { batches.push(result?); diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index ce67cba2cd0e..f4ae510b907d 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -114,7 +114,9 @@ impl ExecutionPlan for CoalescePartitionsExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))) + Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone( + &children[0], + )))) } fn execute( @@ -152,7 +154,11 @@ impl ExecutionPlan for CoalescePartitionsExec { // spawn independent tasks whose resulting streams (of batches) // are sent to the channel for consumption. 
for part_i in 0..input_partitions { - builder.run_input(self.input.clone(), part_i, context.clone()); + builder.run_input( + Arc::clone(&self.input), + part_i, + Arc::clone(&context), + ); } let stream = builder.build(); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index c61e9a05bfa6..bf9d14e73dd8 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -618,16 +618,17 @@ mod tests { expr: col("f32", &schema).unwrap(), options: SortOptions::default(), }]; - let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _; + let memory_exec = + Arc::new(MemoryExec::try_new(&[], Arc::clone(&schema), None)?) as _; let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec)) as Arc; let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _; // memory_exec2 doesn't have output ordering - let union_exec = UnionExec::new(vec![sort_exec.clone(), memory_exec2]); + let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec), memory_exec2]); let res = get_meet_of_orderings(union_exec.inputs()); assert!(res.is_none()); - let union_exec = UnionExec::new(vec![sort_exec.clone(), sort_exec]); + let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec), sort_exec]); let res = get_meet_of_orderings(union_exec.inputs()); assert_eq!(res, Some(&sort_expr[..])); Ok(()) diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 11af0624db15..4bacea48c347 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -47,7 +47,7 @@ pub struct EmptyExec { impl EmptyExec { /// Create a new EmptyExec pub fn new(schema: SchemaRef) -> Self { - let cache = Self::compute_properties(schema.clone(), 1); + let cache = Self::compute_properties(Arc::clone(&schema), 1); EmptyExec { schema, partitions: 1, @@ -142,7 +142,7 @@ impl ExecutionPlan for EmptyExec { Ok(Box::pin(MemoryStream::try_new( 
self.data()?, - self.schema.clone(), + Arc::clone(&self.schema), None, )?)) } @@ -170,7 +170,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(schema.clone()); + let empty = EmptyExec::new(Arc::clone(&schema)); assert_eq!(empty.schema(), schema); // we should have no results @@ -184,9 +184,12 @@ mod tests { #[test] fn with_new_children() -> Result<()> { let schema = test::aggr_test_schema(); - let empty = Arc::new(EmptyExec::new(schema.clone())); + let empty = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?; + let empty2 = with_new_children_if_necessary( + Arc::clone(&empty) as Arc, + vec![], + )?; assert_eq!(empty.schema(), empty2.schema()); let too_many_kids = vec![empty2]; @@ -204,7 +207,7 @@ mod tests { let empty = EmptyExec::new(schema); // ask for the wrong partition - assert!(empty.execute(1, task_ctx.clone()).is_err()); + assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err()); assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 4b2edbf2045d..56dc35e8819d 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -53,7 +53,7 @@ impl ExplainExec { stringified_plans: Vec, verbose: bool, ) -> Self { - let cache = Self::compute_properties(schema.clone()); + let cache = Self::compute_properties(Arc::clone(&schema)); ExplainExec { schema, stringified_plans, @@ -160,7 +160,7 @@ impl ExecutionPlan for ExplainExec { } let record_batch = RecordBatch::try_new( - self.schema.clone(), + Arc::clone(&self.schema), vec![ Arc::new(type_builder.finish()), Arc::new(plan_builder.finish()), @@ -171,7 +171,7 @@ impl ExecutionPlan for ExplainExec { "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, 
context.session_id(), context.task_id()); Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema.clone(), + Arc::clone(&self.schema), futures::stream::iter(vec![Ok(record_batch)]), ))) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6153dbacfbff..f57d47a85779 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -78,7 +78,7 @@ impl FilterExec { Self::compute_properties(&input, &predicate, default_selectivity)?; Ok(Self { predicate, - input: input.clone(), + input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache, @@ -171,9 +171,9 @@ impl FilterExec { if let Some(binary) = conjunction.as_any().downcast_ref::() { if binary.op() == &Operator::Eq { if input_eqs.is_expr_constant(binary.left()) { - res_constants.push(binary.right().clone()) + res_constants.push(Arc::clone(binary.right())) } else if input_eqs.is_expr_constant(binary.right()) { - res_constants.push(binary.left().clone()) + res_constants.push(Arc::clone(binary.left())) } } } @@ -255,7 +255,7 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) + FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) .and_then(|e| { let selectivity = e.default_selectivity(); e.with_default_selectivity(selectivity) @@ -272,7 +272,7 @@ impl ExecutionPlan for FilterExec { let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { schema: self.input.schema(), - predicate: self.predicate.clone(), + predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, })) @@ -397,7 +397,7 @@ impl Stream for FilterExecStream { impl RecordBatchStream for FilterExecStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1116,7 +1116,7 @@ mod tests { 
binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?, &schema, )?, - Arc::new(EmptyExec::new(schema.clone())), + Arc::new(EmptyExec::new(Arc::clone(&schema))), )?; exec.statistics().unwrap(); diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 30c3353d4b71..1c21991d93c5 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -153,7 +153,7 @@ impl DataSinkExec { } else { // Check not null constraint on the input stream Ok(Box::pin(RecordBatchStreamAdapter::new( - self.sink_schema.clone(), + Arc::clone(&self.sink_schema), input_stream .map(move |batch| check_not_null_contraits(batch?, &risky_columns)), ))) @@ -252,9 +252,9 @@ impl ExecutionPlan for DataSinkExec { children: Vec>, ) -> Result> { Ok(Arc::new(Self::new( - children[0].clone(), - self.sink.clone(), - self.sink_schema.clone(), + Arc::clone(&children[0]), + Arc::clone(&self.sink), + Arc::clone(&self.sink_schema), self.sort_order.clone(), ))) } @@ -269,10 +269,10 @@ impl ExecutionPlan for DataSinkExec { if partition != 0 { return internal_err!("DataSinkExec can only be called on partition 0!"); } - let data = self.execute_input_stream(0, context.clone())?; + let data = self.execute_input_stream(0, Arc::clone(&context))?; - let count_schema = self.count_schema.clone(); - let sink = self.sink.clone(); + let count_schema = Arc::clone(&self.count_schema); + let sink = Arc::clone(&self.sink); let stream = futures::stream::once(async move { sink.write_all(data, &context).await.map(make_count_batch) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 92443d06856a..33a9c061bf31 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -79,7 +79,7 @@ impl CrossJoinExec { }; let schema = Arc::new(Schema::new(all_columns)); - let cache = Self::compute_properties(&left, &right, 
schema.clone()); + let cache = Self::compute_properties(&left, &right, Arc::clone(&schema)); CrossJoinExec { left, right, @@ -220,8 +220,8 @@ impl ExecutionPlan for CrossJoinExec { children: Vec>, ) -> Result> { Ok(Arc::new(CrossJoinExec::new( - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), ))) } @@ -237,7 +237,7 @@ impl ExecutionPlan for CrossJoinExec { partition: usize, context: Arc, ) -> Result { - let stream = self.right.execute(partition, context.clone())?; + let stream = self.right.execute(partition, Arc::clone(&context))?; let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); @@ -247,7 +247,7 @@ impl ExecutionPlan for CrossJoinExec { let left_fut = self.left_fut.once(|| { load_left_input( - self.left.clone(), + Arc::clone(&self.left), context, join_metrics.clone(), reservation, @@ -255,7 +255,7 @@ impl ExecutionPlan for CrossJoinExec { }); Ok(Box::pin(CrossJoinStream { - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), left_fut, right: stream, left_index: 0, @@ -337,7 +337,7 @@ struct CrossJoinStream { impl RecordBatchStream for CrossJoinStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b2f9ef560745..197516028ac4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -367,7 +367,7 @@ impl HashJoinExec { let cache = Self::compute_properties( &left, &right, - join_schema.clone(), + Arc::clone(&join_schema), *join_type, &on, partition_mode, @@ -461,8 +461,8 @@ impl HashJoinExec { None => None, }; Self::try_new( - self.left.clone(), - self.right.clone(), + Arc::clone(&self.left), + Arc::clone(&self.right), self.on.clone(), self.filter.clone(), &self.join_type, @@ -487,7 +487,7 @@ impl HashJoinExec { left.equivalence_properties().clone(), 
right.equivalence_properties().clone(), &join_type, - schema.clone(), + Arc::clone(&schema), &Self::maintains_input_order(join_type), Some(Self::probe_side()), on, @@ -635,8 +635,11 @@ impl ExecutionPlan for HashJoinExec { Distribution::UnspecifiedDistribution, ], PartitionMode::Partitioned => { - let (left_expr, right_expr) = - self.on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip(); + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); vec![ Distribution::HashPartitioned(left_expr), Distribution::HashPartitioned(right_expr), @@ -678,8 +681,8 @@ impl ExecutionPlan for HashJoinExec { children: Vec>, ) -> Result> { Ok(Arc::new(HashJoinExec::try_new( - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), self.on.clone(), self.filter.clone(), &self.join_type, @@ -694,8 +697,16 @@ impl ExecutionPlan for HashJoinExec { partition: usize, context: Arc, ) -> Result { - let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); - let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); + let on_left = self + .on + .iter() + .map(|on| Arc::clone(&on.0)) + .collect::>(); + let on_right = self + .on + .iter() + .map(|on| Arc::clone(&on.1)) + .collect::>(); let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); @@ -715,9 +726,9 @@ impl ExecutionPlan for HashJoinExec { collect_left_input( None, self.random_state.clone(), - self.left.clone(), + Arc::clone(&self.left), on_left.clone(), - context.clone(), + Arc::clone(&context), join_metrics.clone(), reservation, need_produce_result_in_final(self.join_type), @@ -732,9 +743,9 @@ impl ExecutionPlan for HashJoinExec { OnceFut::new(collect_left_input( Some(partition), self.random_state.clone(), - self.left.clone(), + Arc::clone(&self.left), on_left.clone(), - context.clone(), + Arc::clone(&context), 
join_metrics.clone(), reservation, need_produce_result_in_final(self.join_type), @@ -791,8 +802,8 @@ impl ExecutionPlan for HashJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let mut stats = estimate_join_statistics( - self.left.clone(), - self.right.clone(), + Arc::clone(&self.left), + Arc::clone(&self.right), self.on.clone(), &self.join_type, &self.join_schema, @@ -836,7 +847,7 @@ async fn collect_left_input( }; // Depending on partition argument load single partition or whole left side in memory - let stream = left_input.execute(left_input_partition, context.clone())?; + let stream = left_input.execute(left_input_partition, Arc::clone(&context))?; // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream @@ -1111,7 +1122,7 @@ struct HashJoinStream { impl RecordBatchStream for HashJoinStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1669,8 +1680,10 @@ mod tests { ) -> Result<(Vec, Vec)> { let partition_count = 4; - let (left_expr, right_expr) = - on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip(); + let (left_expr, right_expr) = on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); let left_repartitioned: Arc = match partition_mode { PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)), @@ -1719,7 +1732,7 @@ mod tests { let mut batches = vec![]; for i in 0..partition_count { - let stream = join.execute(i, context.clone())?; + let stream = join.execute(i, Arc::clone(&context))?; let more_batches = common::collect(stream).await?; batches.extend( more_batches @@ -1753,8 +1766,8 @@ mod tests { )]; let (columns, batches) = join_collect( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &JoinType::Inner, false, @@ -1800,8 +1813,8 @@ mod tests { )]; let (columns, batches) = partitioned_join_collect( - 
left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &JoinType::Inner, false, @@ -2104,7 +2117,7 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); // first part - let stream = join.execute(0, task_ctx.clone())?; + let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; // expected joined records = 1 (first right batch) @@ -2127,7 +2140,7 @@ mod tests { assert_batches_eq!(expected, &batches); // second part - let stream = join.execute(1, task_ctx.clone())?; + let stream = join.execute(1, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; // expected joined records = 2 (second right batch) @@ -2342,8 +2355,8 @@ mod tests { )]; let (columns, batches) = join_collect( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &JoinType::Left, false, @@ -2386,8 +2399,8 @@ mod tests { )]; let (columns, batches) = partitioned_join_collect( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &JoinType::Left, false, @@ -2498,8 +2511,8 @@ mod tests { ); let join = join_with_filter( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), filter, &JoinType::LeftSemi, @@ -2509,7 +2522,7 @@ mod tests { let columns_header = columns(&join.schema()); assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]); - let stream = join.execute(0, task_ctx.clone())?; + let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; let expected = [ @@ -2622,8 +2635,8 @@ mod tests { ); let join = join_with_filter( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), filter, &JoinType::RightSemi, @@ -2633,7 +2646,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a2", "b2", "c2"]); - let stream = join.execute(0, task_ctx.clone())?; + let stream = join.execute(0, 
Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; let expected = [ @@ -2744,8 +2757,8 @@ mod tests { ); let join = join_with_filter( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), filter, &JoinType::LeftAnti, @@ -2755,7 +2768,7 @@ mod tests { let columns_header = columns(&join.schema()); assert_eq!(columns_header, vec!["a1", "b1", "c1"]); - let stream = join.execute(0, task_ctx.clone())?; + let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; let expected = [ @@ -2873,8 +2886,8 @@ mod tests { ); let join = join_with_filter( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), filter, &JoinType::RightAnti, @@ -2884,7 +2897,7 @@ mod tests { let columns_header = columns(&join.schema()); assert_eq!(columns_header, vec!["a2", "b2", "c2"]); - let stream = join.execute(0, task_ctx.clone())?; + let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; let expected = [ @@ -3074,8 +3087,11 @@ mod tests { let random_state = RandomState::with_seeds(0, 0, 0, 0); let hashes_buff = &mut vec![0; left.num_rows()]; - let hashes = - create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; + let hashes = create_hashes( + &[Arc::clone(&left.columns()[0])], + &random_state, + hashes_buff, + )?; // Create hash collisions (same hashes) hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h); @@ -3103,7 +3119,7 @@ mod tests { &join_hash_map, &left, &right, - &[key_column.clone()], + &[Arc::clone(&key_column)], &[key_column], false, &hashes_buffer, @@ -3463,13 +3479,13 @@ mod tests { for (join_type, expected) in test_cases { let (_, batches) = join_collect_with_partition_mode( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &join_type, PartitionMode::CollectLeft, false, - task_ctx.clone(), + Arc::clone(&task_ctx), ) .await?; 
assert_batches_sorted_eq!(expected, &batches); @@ -3487,13 +3503,14 @@ mod tests { let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109])); let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); - let batch = RecordBatch::try_new(schema.clone(), vec![dates, n])?; - let left = - Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?; + let left = Arc::new( + MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None).unwrap(), + ); let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109])); let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6])); - let batch = RecordBatch::try_new(schema.clone(), vec![dates, n])?; + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?; let right = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap()); let on = vec![( @@ -3555,8 +3572,8 @@ mod tests { for join_type in join_types { let join = join( - left.clone(), - right_input.clone(), + Arc::clone(&left), + Arc::clone(&right_input) as Arc, on.clone(), &join_type, false, @@ -3671,9 +3688,14 @@ mod tests { for batch_size in (1..21).rev() { let task_ctx = prepare_task_ctx(batch_size); - let join = - join(left.clone(), right.clone(), on.clone(), &join_type, false) - .unwrap(); + let join = join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &join_type, + false, + ) + .unwrap(); let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); @@ -3746,7 +3768,13 @@ mod tests { let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); - let join = join(left.clone(), right.clone(), on.clone(), &join_type, false)?; + let join = join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &join_type, + false, + )?; let stream = join.execute(0, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); @@ -3819,8 
+3847,8 @@ mod tests { let task_ctx = Arc::new(task_ctx); let join = HashJoinExec::try_new( - left.clone(), - right.clone(), + Arc::clone(&left) as Arc, + Arc::clone(&right) as Arc, on.clone(), None, &join_type, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 6be124cce06f..caaa6a05e92b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -173,7 +173,8 @@ impl NestedLoopJoinExec { let (schema, column_indices) = build_join_schema(&left_schema, &right_schema, join_type); let schema = Arc::new(schema); - let cache = Self::compute_properties(&left, &right, schema.clone(), *join_type); + let cache = + Self::compute_properties(&left, &right, Arc::clone(&schema), *join_type); Ok(NestedLoopJoinExec { left, @@ -287,8 +288,8 @@ impl ExecutionPlan for NestedLoopJoinExec { children: Vec>, ) -> Result> { Ok(Arc::new(NestedLoopJoinExec::try_new( - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), self.filter.clone(), &self.join_type, )?)) @@ -308,8 +309,8 @@ impl ExecutionPlan for NestedLoopJoinExec { let inner_table = self.inner_table.once(|| { collect_left_input( - self.left.clone(), - context.clone(), + Arc::clone(&self.left), + Arc::clone(&context), join_metrics.clone(), load_reservation, need_produce_result_in_final(self.join_type), @@ -319,7 +320,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let outer_table = self.right.execute(partition, context)?; Ok(Box::pin(NestedLoopJoinStream { - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), filter: self.filter.clone(), join_type: self.join_type, outer_table, @@ -336,8 +337,8 @@ impl ExecutionPlan for NestedLoopJoinExec { fn statistics(&self) -> Result { estimate_join_statistics( - self.left.clone(), - self.right.clone(), + Arc::clone(&self.left), + Arc::clone(&self.right), vec![], &self.join_type, 
&self.schema, @@ -641,7 +642,7 @@ impl Stream for NestedLoopJoinStream { impl RecordBatchStream for NestedLoopJoinStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -752,7 +753,7 @@ mod tests { let columns = columns(&nested_loop_join.schema()); let mut batches = vec![]; for i in 0..partition_count { - let stream = nested_loop_join.execute(i, context.clone())?; + let stream = nested_loop_join.execute(i, Arc::clone(&context))?; let more_batches = common::collect(stream).await?; batches.extend( more_batches @@ -1037,8 +1038,8 @@ mod tests { let task_ctx = Arc::new(task_ctx); let err = multi_partitioned_join_collect( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), &join_type, Some(filter.clone()), task_ctx, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 91b2151d32e7..e9124a72970a 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -126,11 +126,11 @@ impl SortMergeJoinExec { .zip(sort_options.iter()) .map(|((l, r), sort_op)| { let left = PhysicalSortExpr { - expr: l.clone(), + expr: Arc::clone(l), options: *sort_op, }; let right = PhysicalSortExpr { - expr: r.clone(), + expr: Arc::clone(r), options: *sort_op, }; (left, right) @@ -140,7 +140,7 @@ impl SortMergeJoinExec { let schema = Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); let cache = - Self::compute_properties(&left, &right, schema.clone(), join_type, &on); + Self::compute_properties(&left, &right, Arc::clone(&schema), join_type, &on); Ok(Self { left, right, @@ -271,8 +271,11 @@ impl ExecutionPlan for SortMergeJoinExec { } fn required_input_distribution(&self) -> Vec { - let (left_expr, right_expr) = - self.on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip(); + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| (Arc::clone(l), 
Arc::clone(r))) + .unzip(); vec![ Distribution::HashPartitioned(left_expr), Distribution::HashPartitioned(right_expr), @@ -304,8 +307,8 @@ impl ExecutionPlan for SortMergeJoinExec { ) -> Result> { match &children[..] { [left, right] => Ok(Arc::new(SortMergeJoinExec::try_new( - left.clone(), - right.clone(), + Arc::clone(left), + Arc::clone(right), self.on.clone(), self.filter.clone(), self.join_type, @@ -332,14 +335,24 @@ impl ExecutionPlan for SortMergeJoinExec { let (on_left, on_right) = self.on.iter().cloned().unzip(); let (streamed, buffered, on_streamed, on_buffered) = if SortMergeJoinExec::probe_side(&self.join_type) == JoinSide::Left { - (self.left.clone(), self.right.clone(), on_left, on_right) + ( + Arc::clone(&self.left), + Arc::clone(&self.right), + on_left, + on_right, + ) } else { - (self.right.clone(), self.left.clone(), on_right, on_left) + ( + Arc::clone(&self.right), + Arc::clone(&self.left), + on_right, + on_left, + ) }; // execute children plans - let streamed = streamed.execute(partition, context.clone())?; - let buffered = buffered.execute(partition, context.clone())?; + let streamed = streamed.execute(partition, Arc::clone(&context))?; + let buffered = buffered.execute(partition, Arc::clone(&context))?; // create output buffer let batch_size = context.session_config().batch_size(); @@ -350,7 +363,7 @@ impl ExecutionPlan for SortMergeJoinExec { // create join stream Ok(Box::pin(SMJStream::try_new( - self.schema.clone(), + Arc::clone(&self.schema), self.sort_options.clone(), self.null_equals_null, streamed, @@ -374,8 +387,8 @@ impl ExecutionPlan for SortMergeJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` estimate_join_statistics( - self.left.clone(), - self.right.clone(), + Arc::clone(&self.left), + Arc::clone(&self.right), self.on.clone(), &self.join_type, &self.schema, @@ -657,7 +670,7 @@ struct SMJStream { impl RecordBatchStream for SMJStream { fn 
schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -780,7 +793,7 @@ impl SMJStream { sort_options, null_equals_null, schema, - streamed_schema: streamed_schema.clone(), + streamed_schema: Arc::clone(&streamed_schema), buffered_schema, streamed, buffered, @@ -1233,7 +1246,7 @@ impl SMJStream { }; let output_batch = - RecordBatch::try_new(self.schema.clone(), columns.clone())?; + RecordBatch::try_new(Arc::clone(&self.schema), columns.clone())?; // Apply join filter if any if !filter_columns.is_empty() { @@ -1353,8 +1366,10 @@ impl SMJStream { }; // Push the streamed/buffered batch joined nulls to the output - let null_joined_streamed_batch = - RecordBatch::try_new(self.schema.clone(), columns.clone())?; + let null_joined_streamed_batch = RecordBatch::try_new( + Arc::clone(&self.schema), + columns.clone(), + )?; self.output_record_batches.push(null_joined_streamed_batch); // For full join, we also need to output the null joined rows from the buffered side. 
@@ -1430,14 +1445,14 @@ fn get_filter_column( .column_indices() .iter() .filter(|col_index| col_index.side == JoinSide::Left) - .map(|i| streamed_columns[i.index].clone()) + .map(|i| Arc::clone(&streamed_columns[i.index])) .collect::>(); let right_columns = f .column_indices() .iter() .filter(|col_index| col_index.side == JoinSide::Right) - .map(|i| buffered_columns[i.index].clone()) + .map(|i| Arc::clone(&buffered_columns[i.index])) .collect::>(); filter_columns.extend(left_columns); @@ -1476,7 +1491,7 @@ fn produce_buffered_null_batch( streamed_columns.extend(buffered_columns); Ok(Some(RecordBatch::try_new( - schema.clone(), + Arc::clone(schema), streamed_columns, )?)) } @@ -1927,7 +1942,7 @@ mod tests { Field::new(c.0, DataType::Int32, true), ])); let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(Int32Array::from(a.1.clone())), Arc::new(Int32Array::from(b.1.clone())), @@ -2771,8 +2786,8 @@ mod tests { let task_ctx = Arc::new(task_ctx); let join = join_with_options( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), join_type, sort_options.clone(), @@ -2849,8 +2864,8 @@ mod tests { .with_runtime(runtime); let task_ctx = Arc::new(task_ctx); let join = join_with_options( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), join_type, sort_options.clone(), diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 46d3ac5acf1e..ba9384aef1a6 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -270,7 +270,7 @@ pub fn convert_sort_expr_with_filter_schema( sort_expr: &PhysicalSortExpr, ) -> Result>> { let column_map = map_origin_col_to_filter_col(filter, schema, side)?; - let expr = sort_expr.expr.clone(); + let expr = Arc::clone(&sort_expr.expr); // Get main schema columns: let expr_columns = 
collect_columns(&expr); // Calculation is possible with `column_map` since sort exprs belong to a child. @@ -697,7 +697,7 @@ fn update_sorted_exprs_with_node_indices( // Extract filter expressions from the sorted expressions: let filter_exprs = sorted_exprs .iter() - .map(|expr| expr.filter_expr().clone()) + .map(|expr| Arc::clone(expr.filter_expr())) .collect::>(); // Gather corresponding node indices for the extracted filter expressions from the graph: @@ -756,7 +756,7 @@ pub fn prepare_sorted_exprs( // Build the expression interval graph let mut graph = - ExprIntervalGraph::try_new(filter.expression().clone(), filter.schema())?; + ExprIntervalGraph::try_new(Arc::clone(filter.expression()), filter.schema())?; // Update sorted expressions with node indices update_sorted_exprs_with_node_indices(&mut graph, &mut sorted_exprs); @@ -818,9 +818,9 @@ pub mod tests { &intermediate_schema, )?; let filter_expr = binary( - filter_left.clone(), + Arc::clone(&filter_left), Operator::Gt, - filter_right.clone(), + Arc::clone(&filter_right), &intermediate_schema, )?; let column_indices = vec![ diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 813f670147bc..c23dc2032c4b 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -233,7 +233,7 @@ impl SymmetricHashJoinExec { let random_state = RandomState::with_seeds(0, 0, 0, 0); let schema = Arc::new(schema); let cache = - Self::compute_properties(&left, &right, schema.clone(), *join_type, &on); + Self::compute_properties(&left, &right, Arc::clone(&schema), *join_type, &on); Ok(SymmetricHashJoinExec { left, right, @@ -397,7 +397,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { let (left_expr, right_expr) = self .on .iter() - .map(|(l, r)| (l.clone() as _, r.clone() as _)) + .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _)) .unzip(); vec![ 
Distribution::HashPartitioned(left_expr), @@ -430,8 +430,8 @@ impl ExecutionPlan for SymmetricHashJoinExec { children: Vec>, ) -> Result> { Ok(Arc::new(SymmetricHashJoinExec::try_new( - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), self.on.clone(), self.filter.clone(), &self.join_type, @@ -489,9 +489,9 @@ impl ExecutionPlan for SymmetricHashJoinExec { let right_side_joiner = OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema()); - let left_stream = self.left.execute(partition, context.clone())?; + let left_stream = self.left.execute(partition, Arc::clone(&context))?; - let right_stream = self.right.execute(partition, context.clone())?; + let right_stream = self.right.execute(partition, Arc::clone(&context))?; let reservation = Arc::new(Mutex::new( MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]")) @@ -559,7 +559,7 @@ struct SymmetricHashJoinStream { impl RecordBatchStream for SymmetricHashJoinStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1634,13 +1634,13 @@ mod tests { task_ctx: Arc, ) -> Result<()> { let first_batches = partitioned_sym_join_with_filter( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), filter.clone(), &join_type, false, - task_ctx.clone(), + Arc::clone(&task_ctx), ) .await?; let second_batches = partitioned_hash_join_with_filter( diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index 7e05ded6f69d..264f297ffb4c 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -78,17 +78,23 @@ pub async fn partitioned_sym_join_with_filter( ) -> Result> { let partition_count = 4; - let left_expr = on.iter().map(|(l, _)| l.clone() as _).collect::>(); + let left_expr = on + .iter() + .map(|(l, _)| Arc::clone(l) as _) + .collect::>(); - let right_expr = 
on.iter().map(|(_, r)| r.clone() as _).collect::>(); + let right_expr = on + .iter() + .map(|(_, r)| Arc::clone(r) as _) + .collect::>(); let join = SymmetricHashJoinExec::try_new( Arc::new(RepartitionExec::try_new( - left.clone(), + Arc::clone(&left), Partitioning::Hash(left_expr, partition_count), )?), Arc::new(RepartitionExec::try_new( - right.clone(), + Arc::clone(&right), Partitioning::Hash(right_expr, partition_count), )?), on, @@ -102,7 +108,7 @@ pub async fn partitioned_sym_join_with_filter( let mut batches = vec![]; for i in 0..partition_count { - let stream = join.execute(i, context.clone())?; + let stream = join.execute(i, Arc::clone(&context))?; let more_batches = common::collect(stream).await?; batches.extend( more_batches @@ -127,7 +133,7 @@ pub async fn partitioned_hash_join_with_filter( let partition_count = 4; let (left_expr, right_expr) = on .iter() - .map(|(l, r)| (l.clone() as _, r.clone() as _)) + .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _)) .unzip(); let join = Arc::new(HashJoinExec::try_new( @@ -149,7 +155,7 @@ pub async fn partitioned_hash_join_with_filter( let mut batches = vec![]; for i in 0..partition_count { - let stream = join.execute(i, context.clone())?; + let stream = join.execute(i, Arc::clone(&context))?; let more_batches = common::collect(stream).await?; batches.extend( more_batches @@ -475,20 +481,29 @@ pub fn build_sides_record_batches( )); let left = RecordBatch::try_from_iter(vec![ - ("la1", ordered.clone()), - ("lb1", cardinality.clone()), + ("la1", Arc::clone(&ordered)), + ("lb1", Arc::clone(&cardinality) as ArrayRef), ("lc1", cardinality_key_left), - ("lt1", time.clone()), - ("la2", ordered.clone()), - ("la1_des", ordered_des.clone()), - ("l_asc_null_first", ordered_asc_null_first.clone()), - ("l_asc_null_last", ordered_asc_null_last.clone()), - ("l_desc_null_first", ordered_desc_null_first.clone()), - ("li1", interval_time.clone()), - ("l_float", float_asc.clone()), + ("lt1", Arc::clone(&time) as ArrayRef), + 
("la2", Arc::clone(&ordered)), + ("la1_des", Arc::clone(&ordered_des) as ArrayRef), + ( + "l_asc_null_first", + Arc::clone(&ordered_asc_null_first) as ArrayRef, + ), + ( + "l_asc_null_last", + Arc::clone(&ordered_asc_null_last) as ArrayRef, + ), + ( + "l_desc_null_first", + Arc::clone(&ordered_desc_null_first) as ArrayRef, + ), + ("li1", Arc::clone(&interval_time)), + ("l_float", Arc::clone(&float_asc) as ArrayRef), ])?; let right = RecordBatch::try_from_iter(vec![ - ("ra1", ordered.clone()), + ("ra1", Arc::clone(&ordered)), ("rb1", cardinality), ("rc1", cardinality_key_right), ("rt1", time), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index dfa1fd4763f4..37949b3027cd 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -439,7 +439,7 @@ pub fn adjust_right_output_partitioning( Partitioning::Hash(exprs, size) => { let new_exprs = exprs .iter() - .map(|expr| add_offset_to_expr(expr.clone(), left_columns_len)) + .map(|expr| add_offset_to_expr(Arc::clone(expr), left_columns_len)) .collect(); Partitioning::Hash(new_exprs, *size) } @@ -455,12 +455,10 @@ fn replace_on_columns_of_right_ordering( ) -> Result<()> { for (left_col, right_col) in on_columns { for item in right_ordering.iter_mut() { - let new_expr = item - .expr - .clone() + let new_expr = Arc::clone(&item.expr) .transform(|e| { if e.eq(right_col) { - Ok(Transformed::yes(left_col.clone())) + Ok(Transformed::yes(Arc::clone(left_col))) } else { Ok(Transformed::no(e)) } @@ -483,7 +481,7 @@ fn offset_ordering( JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => ordering .iter() .map(|sort_expr| PhysicalSortExpr { - expr: add_offset_to_expr(sort_expr.expr.clone(), offset), + expr: add_offset_to_expr(Arc::clone(&sort_expr.expr), offset), options: sort_expr.options, }) .collect(), @@ -1121,7 +1119,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) 
- .map_err(|e| DataFusionError::External(Box::new(e.clone()))), + .map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))), ), } } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index c648547c98b1..e3387cfbaed3 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -14,6 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143 +#![deny(clippy::clone_on_ref_ptr)] //! Traits for physical query plan, supporting parallel execution for partitioned relations. @@ -144,7 +146,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { - self.properties().schema().clone() + Arc::clone(self.properties().schema()) } /// Return properties of the output of the `ExecutionPlan`, such as output @@ -725,7 +727,7 @@ pub fn execute_stream( 1 => plan.execute(0, context), _ => { // merge into a single partition - let plan = CoalescePartitionsExec::new(plan.clone()); + let plan = CoalescePartitionsExec::new(Arc::clone(&plan)); // CoalescePartitionsExec must produce a single partition assert_eq!(1, plan.properties().output_partitioning().partition_count()); plan.execute(0, context) @@ -787,7 +789,7 @@ pub fn execute_stream_partitioned( let num_partitions = plan.output_partitioning().partition_count(); let mut streams = Vec::with_capacity(num_partitions); for i in 0..num_partitions { - streams.push(plan.execute(i, context.clone())?); + streams.push(plan.execute(i, Arc::clone(&context))?); } Ok(streams) } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 4c6d1b3674d5..9c77a3d05cc2 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -145,7 +145,7 @@ impl ExecutionPlan 
for GlobalLimitExec { children: Vec>, ) -> Result> { Ok(Arc::new(GlobalLimitExec::new( - children[0].clone(), + Arc::clone(&children[0]), self.skip, self.fetch, ))) @@ -352,7 +352,7 @@ impl ExecutionPlan for LocalLimitExec { ) -> Result> { match children.len() { 1 => Ok(Arc::new(LocalLimitExec::new( - children[0].clone(), + Arc::clone(&children[0]), self.fetch, ))), _ => internal_err!("LocalLimitExec wrong number of children"), @@ -551,7 +551,7 @@ impl Stream for LimitStream { impl RecordBatchStream for LimitStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -864,11 +864,11 @@ mod tests { // Adding a "GROUP BY i" changes the input stats from Exact to Inexact. let agg = AggregateExec::try_new( AggregateMode::Final, - build_group_by(&csv.schema().clone(), vec!["i".to_string()]), + build_group_by(&csv.schema(), vec!["i".to_string()]), vec![], vec![], - csv.clone(), - csv.schema().clone(), + Arc::clone(&csv), + Arc::clone(&csv.schema()), )?; let agg_exec: Arc = Arc::new(agg); diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 39ae8d551f4b..6b2c78902eae 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -140,7 +140,7 @@ impl ExecutionPlan for MemoryExec { ) -> Result { Ok(Box::pin(MemoryStream::try_new( self.partitions[partition].clone(), - self.projected_schema.clone(), + Arc::clone(&self.projected_schema), self.projection.clone(), )?)) } @@ -164,7 +164,8 @@ impl MemoryExec { projection: Option>, ) -> Result { let projected_schema = project_schema(&schema, projection.as_ref())?; - let cache = Self::compute_properties(projected_schema.clone(), &[], partitions); + let cache = + Self::compute_properties(Arc::clone(&projected_schema), &[], partitions); Ok(Self { partitions: partitions.to_vec(), schema, @@ -219,7 +220,7 @@ impl MemoryExec { } pub fn original_schema(&self) -> SchemaRef { - self.schema.clone() + 
Arc::clone(&self.schema) } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -305,7 +306,7 @@ impl Stream for MemoryStream { impl RecordBatchStream for MemoryStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 3b10cc0ac435..272211d5056e 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -50,7 +50,7 @@ impl PlaceholderRowExec { /// Create a new PlaceholderRowExec pub fn new(schema: SchemaRef) -> Self { let partitions = 1; - let cache = Self::compute_properties(schema.clone(), partitions); + let cache = Self::compute_properties(Arc::clone(&schema), partitions); PlaceholderRowExec { schema, partitions, @@ -160,7 +160,7 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(Box::pin(MemoryStream::try_new( self.data()?, - self.schema.clone(), + Arc::clone(&self.schema), None, )?)) } @@ -188,7 +188,10 @@ mod tests { let placeholder = Arc::new(PlaceholderRowExec::new(schema)); - let placeholder_2 = with_new_children_if_necessary(placeholder.clone(), vec![])?; + let placeholder_2 = with_new_children_if_necessary( + Arc::clone(&placeholder) as Arc, + vec![], + )?; assert_eq!(placeholder.schema(), placeholder_2.schema()); let too_many_kids = vec![placeholder_2]; @@ -206,7 +209,7 @@ mod tests { let placeholder = PlaceholderRowExec::new(schema); // ask for the wrong partition - assert!(placeholder.execute(1, task_ctx.clone()).is_err()); + assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err()); assert!(placeholder.execute(20, task_ctx).is_err()); Ok(()) } @@ -234,7 +237,7 @@ mod tests { let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions); for n in 0..partitions { - let iter = placeholder.execute(n, 
task_ctx.clone())?; + let iter = placeholder.execute(n, Arc::clone(&task_ctx))?; let batches = common::collect(iter).await?; // should have one item diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8341549340dd..9efa0422ec75 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -94,7 +94,7 @@ impl ProjectionExec { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?; let cache = - Self::compute_properties(&input, &projection_mapping, schema.clone())?; + Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?; Ok(Self { expr, schema, @@ -227,8 +227,8 @@ impl ExecutionPlan for ProjectionExec { ) -> Result { trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); Ok(Box::pin(ProjectionStream { - schema: self.schema.clone(), - expr: self.expr.iter().map(|x| x.0.clone()).collect(), + schema: Arc::clone(&self.schema), + expr: self.expr.iter().map(|x| Arc::clone(&x.0)).collect(), input: self.input.execute(partition, context)?, baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) @@ -242,7 +242,7 @@ impl ExecutionPlan for ProjectionExec { Ok(stats_projection( self.input.statistics()?, self.expr.iter().map(|(e, _)| Arc::clone(e)), - self.schema.clone(), + Arc::clone(&self.schema), )) } } @@ -311,10 +311,10 @@ impl ProjectionStream { if arrays.is_empty() { let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - RecordBatch::try_new_with_options(self.schema.clone(), arrays, &options) + RecordBatch::try_new_with_options(Arc::clone(&self.schema), arrays, &options) .map_err(Into::into) } else { - RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into) + 
RecordBatch::try_new(Arc::clone(&self.schema), arrays).map_err(Into::into) } } } @@ -351,7 +351,7 @@ impl Stream for ProjectionStream { impl RecordBatchStream for ProjectionStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -370,10 +370,12 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let exec = test::scan_partitioned(1); - let expected = collect(exec.execute(0, task_ctx.clone())?).await.unwrap(); + let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?) + .await + .unwrap(); let projection = ProjectionExec::try_new(vec![], exec)?; - let stream = projection.execute(0, task_ctx.clone())?; + let stream = projection.execute(0, Arc::clone(&task_ctx))?; let output = collect(stream).await.unwrap(); assert_eq!(output.len(), expected.len()); diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 9a0b66caba31..bd9303f97db0 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -82,7 +82,7 @@ impl RecursiveQueryExec { // Each recursive query needs its own work table let work_table = Arc::new(WorkTable::new()); // Use the same work table for both the WorkTableExec and the recursive term - let recursive_term = assign_work_table(recursive_term, work_table.clone())?; + let recursive_term = assign_work_table(recursive_term, Arc::clone(&work_table))?; let cache = Self::compute_properties(static_term.schema()); Ok(RecursiveQueryExec { name, @@ -147,8 +147,8 @@ impl ExecutionPlan for RecursiveQueryExec { ) -> Result> { RecursiveQueryExec::try_new( self.name.clone(), - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), self.is_distinct, ) .map(|e| Arc::new(e) as _) @@ -167,12 +167,12 @@ impl ExecutionPlan for RecursiveQueryExec { ))); } - let static_stream = self.static_term.execute(partition, context.clone())?; + let 
static_stream = self.static_term.execute(partition, Arc::clone(&context))?; let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(RecursiveQueryStream::new( context, - self.work_table.clone(), - self.recursive_term.clone(), + Arc::clone(&self.work_table), + Arc::clone(&self.recursive_term), static_stream, baseline_metrics, ))) @@ -313,9 +313,9 @@ impl RecursiveQueryStream { // Downstream plans should not expect any partitioning. let partition = 0; - let recursive_plan = reset_plan_states(self.recursive_term.clone())?; + let recursive_plan = reset_plan_states(Arc::clone(&self.recursive_term))?; self.recursive_stream = - Some(recursive_plan.execute(partition, self.task_context.clone())?); + Some(recursive_plan.execute(partition, Arc::clone(&self.task_context))?); self.poll_next(cx) } } @@ -334,7 +334,7 @@ fn assign_work_table( } else { work_table_refs += 1; Ok(Transformed::yes(Arc::new( - exec.with_work_table(work_table.clone()), + exec.with_work_table(Arc::clone(&work_table)), ))) } } else if plan.as_any().is::() { @@ -358,8 +358,7 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = plan - .clone() + let new_plan = Arc::clone(&plan) .with_new_children(plan.children().into_iter().cloned().collect())?; Ok(Transformed::yes(new_plan)) } @@ -407,7 +406,7 @@ impl Stream for RecursiveQueryStream { impl RecordBatchStream for RecursiveQueryStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 65f7d5070a5d..03f1b71ae5e1 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -133,12 +133,12 @@ impl RepartitionExecState { let r_metrics = RepartitionMetrics::new(i, num_output_partitions, &metrics); let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input( 
- input.clone(), + Arc::clone(&input), i, txs.clone(), partitioning.clone(), r_metrics, - context.clone(), + Arc::clone(&context), )); // In a separate task, wait for each input to be done @@ -616,7 +616,7 @@ impl ExecutionPlan for RepartitionExec { schema: Arc::clone(&schema_captured), receiver, drop_helper: Arc::clone(&abort_helper), - reservation: reservation.clone(), + reservation: Arc::clone(&reservation), }) as SendableRecordBatchStream }) .collect::>(); @@ -861,7 +861,7 @@ impl RepartitionExec { for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::External(Box::new(e.clone()))); + let err = Err(DataFusionError::External(Box::new(Arc::clone(&e)))); tx.send(Some(err)).await.ok(); } } @@ -940,7 +940,7 @@ impl Stream for RepartitionStream { impl RecordBatchStream for RepartitionStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -990,7 +990,7 @@ impl Stream for PerPartitionStream { impl RecordBatchStream for PerPartitionStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1112,14 +1112,14 @@ mod tests { ) -> Result>> { let task_ctx = Arc::new(TaskContext::default()); // create physical plan - let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = MemoryExec::try_new(&input_partitions, Arc::clone(schema), None)?; let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?; // execute and collect results let mut output_partitions = vec![]; for i in 0..exec.partitioning.partition_count() { // execute this *output* partition and collect all batches - let mut stream = exec.execute(i, task_ctx.clone())?; + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; let mut batches = vec![]; while let Some(result) = stream.next().await { batches.push(result?); @@ -1296,10 +1296,14 @@ mod tests { let input = Arc::new(make_barrier_exec()); // 
partition into two output streams - let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap(); + let exec = RepartitionExec::try_new( + Arc::clone(&input) as Arc, + partitioning, + ) + .unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); + let output_stream0 = exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); // now, purposely drop output stream 0 // *before* any outputs are produced @@ -1344,8 +1348,12 @@ mod tests { // We first collect the results without droping the output stream. let input = Arc::new(make_barrier_exec()); - let exec = RepartitionExec::try_new(input.clone(), partitioning.clone()).unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); + let exec = RepartitionExec::try_new( + Arc::clone(&input) as Arc, + partitioning.clone(), + ) + .unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); let mut background_task = JoinSet::new(); background_task.spawn(async move { input.wait().await; @@ -1365,9 +1373,13 @@ mod tests { // Now do the same but dropping the stream before waiting for the barrier let input = Arc::new(make_barrier_exec()); - let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); + let exec = RepartitionExec::try_new( + Arc::clone(&input) as Arc, + partitioning, + ) + .unwrap(); + let output_stream0 = exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); // now, purposely drop output stream 0 // *before* any outputs are produced std::mem::drop(output_stream0); @@ -1466,9 +1478,9 @@ mod tests { let schema = batch.schema(); let input = MockExec::new(vec![Ok(batch)], schema); let exec = 
RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); + let output_stream0 = exec.execute(0, Arc::clone(&task_ctx)).unwrap(); let batch0 = crate::common::collect(output_stream0).await.unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); let batch1 = crate::common::collect(output_stream1).await.unwrap(); assert!(batch0.is_empty() || batch1.is_empty()); Ok(()) @@ -1491,12 +1503,12 @@ mod tests { let task_ctx = Arc::new(task_ctx); // create physical plan - let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = MemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?; let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?; // pull partitions for i in 0..exec.partitioning.partition_count() { - let mut stream = exec.execute(i, task_ctx.clone())?; + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; let err = arrow_datafusion_err!(stream.next().await.unwrap().unwrap_err().into()); let err = err.find_root(); @@ -1637,7 +1649,7 @@ mod test { } fn memory_exec(schema: &SchemaRef) -> Arc { - Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap()) + Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(schema), None).unwrap()) } fn sorted_memory_exec( @@ -1645,7 +1657,7 @@ mod test { sort_exprs: Vec, ) -> Arc { Arc::new( - MemoryExec::try_new(&[vec![]], schema.clone(), None) + MemoryExec::try_new(&[vec![]], Arc::clone(schema), None) .unwrap() .with_sort_information(vec![sort_exprs]), ) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 3527d5738223..d32c60697ec8 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -20,6 +20,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use 
datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; +use std::sync::Arc; #[derive(Debug, Copy, Clone, Default)] struct BatchCursor { @@ -145,6 +146,9 @@ impl BatchBuilder { retain }); - Ok(Some(RecordBatch::try_new(self.schema.clone(), columns)?)) + Ok(Some(RecordBatch::try_new( + Arc::clone(&self.schema), + columns, + )?)) } } diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 422ff3aebdb3..85418ff36119 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -29,6 +29,7 @@ use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; +use std::sync::Arc; use std::task::{ready, Context, Poll}; /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`] @@ -324,6 +325,6 @@ impl Stream for SortPreservingMergeStream { impl RecordBatchStream for SortPreservingMergeStream { fn schema(&self) -> SchemaRef { - self.in_progress.schema().clone() + Arc::clone(self.in_progress.schema()) } } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index ad5d485cffc9..fe6b744935fb 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -260,7 +260,7 @@ impl ExecutionPlan for PartialSortExec { ) -> Result> { let new_partial_sort = PartialSortExec::new( self.expr.clone(), - children[0].clone(), + Arc::clone(&children[0]), self.common_prefix_length, ) .with_fetch(self.fetch) @@ -276,7 +276,7 @@ impl ExecutionPlan for PartialSortExec { ) -> Result { trace!("Start PartialSortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - let input = self.input.execute(partition, context.clone())?; + let input = self.input.execute(partition, Arc::clone(&context))?; trace!( 
"End PartialSortExec's input.execute for partition: {}", @@ -485,11 +485,11 @@ mod tests { options: option_asc, }, ], - source.clone(), + Arc::clone(&source), 2, )) as Arc; - let result = collect(partial_sort_exec, task_ctx.clone()).await?; + let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; let expected_after_sort = [ "+---+---+---+", @@ -549,13 +549,13 @@ mod tests { options: option_asc, }, ], - source.clone(), + Arc::clone(&source), common_prefix_length, ) .with_fetch(Some(4)), ) as Arc; - let result = collect(partial_sort_exec, task_ctx.clone()).await?; + let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; let expected_after_sort = [ "+---+---+---+", @@ -621,11 +621,11 @@ mod tests { options: option_asc, }, ], - source.clone(), + Arc::clone(source), common_prefix_length, )); - let result = collect(partial_sort_exec, task_ctx.clone()).await?; + let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; assert_eq!(2, result.len()); assert_eq!( task_ctx.runtime_env().memory_pool.reserved(), @@ -676,7 +676,7 @@ mod tests { Arc::new( MemoryExec::try_new( &[vec![batch1, batch2, batch3, batch4]], - schema.clone(), + Arc::clone(&schema), None, ) .unwrap(), @@ -711,7 +711,7 @@ mod tests { options: option_asc, }, ], - mem_exec.clone(), + Arc::clone(&mem_exec), 1, ); let partial_sort_exec = @@ -720,7 +720,7 @@ mod tests { partial_sort_executor.expr, partial_sort_executor.input, )) as Arc; - let result = collect(partial_sort_exec, task_ctx.clone()).await?; + let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; assert_eq!( result.iter().map(|r| r.num_rows()).collect_vec(), [0, 125, 125, 0, 150] @@ -732,7 +732,7 @@ mod tests { "The sort should have returned all memory used back to the memory manager" ); let partial_sort_result = concat_batches(&schema, &result).unwrap(); - let sort_result = collect(sort_exec, task_ctx.clone()).await?; + let sort_result = collect(sort_exec, Arc::clone(&task_ctx)).await?; 
assert_eq!(sort_result[0], partial_sort_result); Ok(()) @@ -772,7 +772,7 @@ mod tests { options: option_asc, }, ], - mem_exec.clone(), + Arc::clone(&mem_exec), 1, ) .with_fetch(fetch_size); @@ -783,7 +783,7 @@ mod tests { SortExec::new(partial_sort_executor.expr, partial_sort_executor.input) .with_fetch(fetch_size), ) as Arc; - let result = collect(partial_sort_exec, task_ctx.clone()).await?; + let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; assert_eq!( result.iter().map(|r| r.num_rows()).collect_vec(), expected_batch_num_rows @@ -795,7 +795,7 @@ mod tests { "The sort should have returned all memory used back to the memory manager" ); let partial_sort_result = concat_batches(&schema, &result)?; - let sort_result = collect(sort_exec, task_ctx.clone()).await?; + let sort_result = collect(sort_exec, Arc::clone(&task_ctx)).await?; assert_eq!(sort_result[0], partial_sort_result); } @@ -822,8 +822,12 @@ mod tests { let data: ArrayRef = Arc::new(vec![1, 1, 2].into_iter().map(Some).collect::()); - let batch = RecordBatch::try_new(schema.clone(), vec![data])?; - let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?; + let input = Arc::new(MemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?); let partial_sort_exec = Arc::new(PartialSortExec::new( vec![PhysicalSortExpr { @@ -837,13 +841,13 @@ mod tests { let result: Vec = collect(partial_sort_exec, task_ctx).await?; let expected_batch = vec![ RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![Arc::new( vec![1, 1].into_iter().map(Some).collect::(), )], )?, RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![Arc::new( vec![2].into_iter().map(Some).collect::(), )], @@ -879,7 +883,7 @@ mod tests { // define data. 
let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(Float32Array::from(vec![ Some(1.0_f32), @@ -961,8 +965,11 @@ mod tests { *partial_sort_exec.schema().field(2).data_type() ); - let result: Vec = - collect(partial_sort_exec.clone(), task_ctx).await?; + let result: Vec = collect( + Arc::clone(&partial_sort_exec) as Arc, + task_ctx, + ) + .await?; assert_batches_eq!(expected, &result); assert_eq!(result.len(), 2); let metrics = partial_sort_exec.metrics().unwrap(); @@ -997,7 +1004,7 @@ mod tests { 1, )); - let fut = collect(sort_exec, task_ctx.clone()); + let fut = collect(sort_exec, Arc::clone(&task_ctx)); let mut fut = fut.boxed(); assert_is_pending(&mut fut); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 2a4862534590..e8b562c5003e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -345,13 +345,13 @@ impl ExternalSorter { spill.path() ))); } - let stream = read_spill_as_stream(spill, self.schema.clone())?; + let stream = read_spill_as_stream(spill, Arc::clone(&self.schema))?; streams.push(stream); } streaming_merge( streams, - self.schema.clone(), + Arc::clone(&self.schema), &self.expr, self.metrics.baseline.clone(), self.batch_size, @@ -361,7 +361,9 @@ impl ExternalSorter { } else if !self.in_mem_batches.is_empty() { self.in_mem_sort_stream(self.metrics.baseline.clone()) } else { - Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) + Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))) } } @@ -402,7 +404,8 @@ impl ExternalSorter { let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); let spilled_rows = - spill_sorted_batches(batches, spill_file.path(), self.schema.clone()).await?; + spill_sorted_batches(batches, spill_file.path(), Arc::clone(&self.schema)) + .await?; let used = self.reservation.free(); 
self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); @@ -532,7 +535,7 @@ impl ExternalSorter { streaming_merge( streams, - self.schema.clone(), + Arc::clone(&self.schema), &self.expr, metrics, self.batch_size, @@ -555,7 +558,7 @@ impl ExternalSorter { let schema = batch.schema(); let fetch = self.fetch; - let expressions = self.expr.clone(); + let expressions = Arc::clone(&self.expr); let stream = futures::stream::once(futures::future::lazy(move |_| { let sorted = sort_batch(&batch, &expressions, fetch)?; metrics.record_output(sorted.num_rows()); @@ -915,7 +918,7 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let new_sort = SortExec::new(self.expr.clone(), children[0].clone()) + let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) .with_preserve_partitioning(self.preserve_partitioning); @@ -929,7 +932,7 @@ impl ExecutionPlan for SortExec { ) -> Result { trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - let mut input = self.input.execute(partition, context.clone())?; + let mut input = self.input.execute(partition, Arc::clone(&context))?; let execution_options = &context.session_config().options().execution; @@ -1033,7 +1036,7 @@ mod tests { Arc::new(CoalescePartitionsExec::new(csv)), )); - let result = collect(sort_exec, task_ctx.clone()).await?; + let result = collect(sort_exec, Arc::clone(&task_ctx)).await?; assert_eq!(result.len(), 1); assert_eq!(result[0].num_rows(), 400); @@ -1076,7 +1079,11 @@ mod tests { Arc::new(CoalescePartitionsExec::new(input)), )); - let result = collect(sort_exec.clone(), task_ctx.clone()).await?; + let result = collect( + Arc::clone(&sort_exec) as Arc, + Arc::clone(&task_ctx), + ) + .await?; assert_eq!(result.len(), 2); @@ -1152,7 +1159,11 @@ mod tests { .with_fetch(fetch), ); - let result = collect(sort_exec.clone(), task_ctx.clone()).await?; 
+ let result = collect( + Arc::clone(&sort_exec) as Arc, + Arc::clone(&task_ctx), + ) + .await?; assert_eq!(result.len(), 1); let metrics = sort_exec.metrics().unwrap(); @@ -1182,9 +1193,10 @@ mod tests { let data: ArrayRef = Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::()); - let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap(); - let input = - Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap(); + let input = Arc::new( + MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None).unwrap(), + ); let sort_exec = Arc::new(SortExec::new( vec![PhysicalSortExpr { @@ -1199,7 +1211,7 @@ mod tests { let expected_data: ArrayRef = Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::()); let expected_batch = - RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap(); + RecordBatch::try_new(Arc::clone(&schema), vec![expected_data]).unwrap(); // Data is correct assert_eq!(&vec![expected_batch], &result); @@ -1225,7 +1237,7 @@ mod tests { // define data. let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])), Arc::new(ListArray::from_iter_primitive::(vec![ @@ -1254,7 +1266,11 @@ mod tests { }, }, ], - Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?), + Arc::new(MemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?), )); assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type()); @@ -1263,7 +1279,8 @@ mod tests { *sort_exec.schema().field(1).data_type() ); - let result: Vec = collect(sort_exec.clone(), task_ctx).await?; + let result: Vec = + collect(Arc::clone(&sort_exec) as Arc, task_ctx).await?; let metrics = sort_exec.metrics().unwrap(); assert!(metrics.elapsed_compute().unwrap() > 0); assert_eq!(metrics.output_rows().unwrap(), 4); @@ -1297,7 +1314,7 @@ mod tests { // define data. 
let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![ Arc::new(Float32Array::from(vec![ Some(f32::NAN), @@ -1345,7 +1362,8 @@ mod tests { assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); - let result: Vec = collect(sort_exec.clone(), task_ctx).await?; + let result: Vec = + collect(Arc::clone(&sort_exec) as Arc, task_ctx).await?; let metrics = sort_exec.metrics().unwrap(); assert!(metrics.elapsed_compute().unwrap() > 0); assert_eq!(metrics.output_rows().unwrap(), 8); @@ -1408,7 +1426,7 @@ mod tests { blocking_exec, )); - let fut = collect(sort_exec, task_ctx.clone()); + let fut = collect(sort_exec, Arc::clone(&task_ctx)); let mut fut = fut.boxed(); assert_is_pending(&mut fut); @@ -1429,7 +1447,8 @@ mod tests { let schema = Arc::new(Schema::empty()); let options = RecordBatchOptions::new().with_row_count(Some(1)); let batch = - RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap(); + RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options) + .unwrap(); let expressions = vec![PhysicalSortExpr { expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 8a349bd22abf..a023f9c7507c 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -182,7 +182,7 @@ impl ExecutionPlan for SortPreservingMergeExec { children: Vec>, ) -> Result> { Ok(Arc::new( - SortPreservingMergeExec::new(self.expr.clone(), children[0].clone()) + SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch), )) } @@ -226,7 +226,8 @@ impl ExecutionPlan for SortPreservingMergeExec { _ => { let receivers = (0..input_partitions) .map(|partition| { - let stream = 
self.input.execute(partition, context.clone())?; + let stream = + self.input.execute(partition, Arc::clone(&context))?; Ok(spawn_buffered(stream, 1)) }) .collect::>()?; @@ -581,8 +582,9 @@ mod tests { }, }]; - let basic = basic_sort(csv.clone(), sort.clone(), task_ctx.clone()).await; - let partition = partition_sort(csv, sort, task_ctx.clone()).await; + let basic = + basic_sort(Arc::clone(&csv), sort.clone(), Arc::clone(&task_ctx)).await; + let partition = partition_sort(csv, sort, Arc::clone(&task_ctx)).await; let basic = arrow::util::pretty::pretty_format_batches(&[basic]) .unwrap() @@ -648,10 +650,11 @@ mod tests { }]; let input = - sorted_partitioned_input(sort.clone(), &[10, 3, 11], task_ctx.clone()) + sorted_partitioned_input(sort.clone(), &[10, 3, 11], Arc::clone(&task_ctx)) .await?; - let basic = basic_sort(input.clone(), sort.clone(), task_ctx.clone()).await; - let partition = sorted_merge(input, sort, task_ctx.clone()).await; + let basic = + basic_sort(Arc::clone(&input), sort.clone(), Arc::clone(&task_ctx)).await; + let partition = sorted_merge(input, sort, Arc::clone(&task_ctx)).await; assert_eq!(basic.num_rows(), 1200); assert_eq!(partition.num_rows(), 1200); @@ -679,9 +682,9 @@ mod tests { // Test streaming with default batch size let task_ctx = Arc::new(TaskContext::default()); let input = - sorted_partitioned_input(sort.clone(), &[10, 5, 13], task_ctx.clone()) + sorted_partitioned_input(sort.clone(), &[10, 5, 13], Arc::clone(&task_ctx)) .await?; - let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await; + let basic = basic_sort(Arc::clone(&input), sort.clone(), task_ctx).await; // batch size of 23 let task_ctx = TaskContext::default() @@ -799,17 +802,18 @@ mod tests { }]; let batches = - sorted_partitioned_input(sort.clone(), &[5, 7, 3], task_ctx.clone()).await?; + sorted_partitioned_input(sort.clone(), &[5, 7, 3], Arc::clone(&task_ctx)) + .await?; let partition_count = batches.output_partitioning().partition_count(); let mut streams = 
Vec::with_capacity(partition_count); for partition in 0..partition_count { - let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 1); + let mut builder = RecordBatchReceiverStream::builder(Arc::clone(&schema), 1); let sender = builder.tx(); - let mut stream = batches.execute(partition, task_ctx.clone()).unwrap(); + let mut stream = batches.execute(partition, Arc::clone(&task_ctx)).unwrap(); builder.spawn(async move { while let Some(batch) = stream.next().await { sender.send(batch).await.unwrap(); @@ -843,7 +847,7 @@ mod tests { assert_eq!(merged.len(), 1); let merged = merged.remove(0); - let basic = basic_sort(batches, sort.clone(), task_ctx.clone()).await; + let basic = basic_sort(batches, sort.clone(), Arc::clone(&task_ctx)).await; let basic = arrow::util::pretty::pretty_format_batches(&[basic]) .unwrap() @@ -879,7 +883,9 @@ mod tests { let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); - let collected = collect(merge.clone(), task_ctx).await.unwrap(); + let collected = collect(Arc::clone(&merge) as Arc, task_ctx) + .await + .unwrap(); let expected = [ "+----+---+", "| a | b |", diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 135b4fbdece4..c7924edfb1eb 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -109,7 +109,7 @@ impl RowCursorStream { Ok(Self { converter, reservation, - column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), + column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(), streams: FusedStreams(streams), }) } diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 99d9367740be..faeb4799f5af 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -382,7 +382,7 @@ where S: Stream>, { fn 
schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -402,7 +402,7 @@ impl EmptyRecordBatchStream { impl RecordBatchStream for EmptyRecordBatchStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -474,7 +474,7 @@ mod test { let schema = schema(); let num_partitions = 10; - let input = PanicExec::new(schema.clone(), num_partitions); + let input = PanicExec::new(Arc::clone(&schema), num_partitions); consume(input, 10).await } @@ -485,7 +485,7 @@ mod test { // make 2 partitions, second partition panics before the first let num_partitions = 2; - let input = PanicExec::new(schema.clone(), num_partitions) + let input = PanicExec::new(Arc::clone(&schema), num_partitions) .with_partition_panic(0, 10) .with_partition_panic(1, 3); // partition 1 should panic first (after 3 ) @@ -504,12 +504,12 @@ mod test { let schema = schema(); // Make an input that never proceeds - let input = BlockingExec::new(schema.clone(), 1); + let input = BlockingExec::new(Arc::clone(&schema), 1); let refs = input.refs(); // Configure a RecordBatchReceiverStream to consume the input let mut builder = RecordBatchReceiverStream::builder(schema, 2); - builder.run_input(Arc::new(input), 0, task_ctx.clone()); + builder.run_input(Arc::new(input), 0, Arc::clone(&task_ctx)); let stream = builder.build(); // input should still be present @@ -529,12 +529,14 @@ mod test { let schema = schema(); // make an input that will error twice - let error_stream = - MockExec::new(vec![exec_err!("Test1"), exec_err!("Test2")], schema.clone()) - .with_use_task(false); + let error_stream = MockExec::new( + vec![exec_err!("Test1"), exec_err!("Test2")], + Arc::clone(&schema), + ) + .with_use_task(false); let mut builder = RecordBatchReceiverStream::builder(schema, 2); - builder.run_input(Arc::new(error_stream), 0, task_ctx.clone()); + builder.run_input(Arc::new(error_stream), 0, Arc::clone(&task_ctx)); let mut stream = builder.build(); // get the first 
result, which should be an error @@ -560,7 +562,11 @@ mod test { let mut builder = RecordBatchReceiverStream::builder(input.schema(), num_partitions); for partition in 0..num_partitions { - builder.run_input(input.clone(), partition, task_ctx.clone()); + builder.run_input( + Arc::clone(&input) as Arc, + partition, + Arc::clone(&task_ctx), + ); } let mut stream = builder.build(); diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index ff57adde4e2e..5a9035c8dbfc 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -93,7 +93,7 @@ impl StreamingTableExec { let projected_output_ordering = projected_output_ordering.into_iter().collect::>(); let cache = Self::compute_properties( - projected_schema.clone(), + Arc::clone(&projected_schema), &projected_output_ordering, &partitions, infinite, @@ -240,7 +240,7 @@ impl ExecutionPlan for StreamingTableExec { let stream = self.partitions[partition].execute(ctx); let projected_stream = match self.projection.clone() { Some(projection) => Box::pin(RecordBatchStreamAdapter::new( - self.projected_schema.clone(), + Arc::clone(&self.projected_schema), stream.map(move |x| { x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into)) }), @@ -327,7 +327,7 @@ mod test { /// Set the batches for the stream fn with_batches(mut self, batches: Vec) -> Self { let stream = TestPartitionStream::new_with_batches(batches); - self.schema = Some(stream.schema().clone()); + self.schema = Some(Arc::clone(stream.schema())); self.partitions = vec![Arc::new(stream)]; self } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 377b919bb407..f5b4a096018f 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -144,6 +144,9 @@ impl PartitionStream for TestPartitionStream { } fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { let stream = 
futures::stream::iter(self.batches.clone().into_iter().map(Ok)); - Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)) + Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + stream, + )) } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index ad47a484c9f7..ac4eb1ca9e58 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -133,7 +133,7 @@ impl MockExec { /// ensure any poll loops are correct. This behavior can be /// changed with `with_use_task` pub fn new(data: Vec>, schema: SchemaRef) -> Self { - let cache = Self::compute_properties(schema.clone()); + let cache = Self::compute_properties(Arc::clone(&schema)); Self { data, schema, @@ -294,7 +294,7 @@ impl BarrierExec { pub fn new(data: Vec>, schema: SchemaRef) -> Self { // wait for all streams and the input let barrier = Arc::new(Barrier::new(data.len() + 1)); - let cache = Self::compute_properties(schema.clone(), &data); + let cache = Self::compute_properties(Arc::clone(&schema), &data); Self { data, schema, @@ -374,7 +374,7 @@ impl ExecutionPlan for BarrierExec { // task simply sends data in order after barrier is reached let data = self.data[partition].clone(); - let b = self.barrier.clone(); + let b = Arc::clone(&self.barrier); let tx = builder.tx(); builder.spawn(async move { println!("Partition {partition} waiting on barrier"); @@ -421,7 +421,7 @@ impl ErrorExec { DataType::Int64, true, )])); - let cache = Self::compute_properties(schema.clone()); + let cache = Self::compute_properties(schema); Self { cache } } @@ -591,7 +591,7 @@ pub struct BlockingExec { impl BlockingExec { /// Create new [`BlockingExec`] with a give schema and number of partitions. 
pub fn new(schema: SchemaRef, n_partitions: usize) -> Self { - let cache = Self::compute_properties(schema.clone(), n_partitions); + let cache = Self::compute_properties(Arc::clone(&schema), n_partitions); Self { schema, refs: Default::default(), @@ -735,7 +735,7 @@ impl PanicExec { /// partitions, which will each panic immediately. pub fn new(schema: SchemaRef, n_partitions: usize) -> Self { let batches_until_panics = vec![0; n_partitions]; - let cache = Self::compute_properties(schema.clone(), &batches_until_panics); + let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics); Self { schema, batches_until_panics, @@ -845,7 +845,7 @@ impl Stream for PanicStream { if self.ready { self.batches_until_panic -= 1; self.ready = false; - let batch = RecordBatch::new_empty(self.schema.clone()); + let batch = RecordBatch::new_empty(Arc::clone(&self.schema)); return Poll::Ready(Some(Ok(batch))); } else { self.ready = true; diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 6a77bfaf3ccd..5366a5707696 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -131,7 +131,7 @@ impl TopK { ); Ok(Self { - schema: schema.clone(), + schema: Arc::clone(&schema), metrics: TopKMetrics::new(metrics, partition), reservation, batch_size, @@ -355,7 +355,7 @@ impl TopKHeap { /// high, as a single [`RecordBatch`], and a sorted vec of the /// current heap's contents pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec)> { - let schema = self.store.schema().clone(); + let schema = Arc::clone(self.store.schema()); // generate sorted rows let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec(); diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 1570778be69b..96bd0de3d37c 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -62,7 +62,7 @@ impl 
PlanContext { } pub fn update_plan_from_children(mut self) -> Result { - let children_plans = self.children.iter().map(|c| c.plan.clone()).collect(); + let children_plans = self.children.iter().map(|c| Arc::clone(&c.plan)).collect(); self.plan = with_new_children_if_necessary(self.plan, children_plans)?; Ok(self) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index dc7d270bae25..0deddfa33b0f 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -417,7 +417,7 @@ impl ExecutionPlan for InterleaveExec { let mut input_stream_vec = vec![]; for input in self.inputs.iter() { if partition < input.output_partitioning().partition_count() { - input_stream_vec.push(input.execute(partition, context.clone())?); + input_stream_vec.push(input.execute(partition, Arc::clone(&context))?); } else { // Do not find a partition to execute break; @@ -518,7 +518,7 @@ impl CombinedRecordBatchStream { impl RecordBatchStream for CombinedRecordBatchStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -625,7 +625,7 @@ mod tests { in_data .iter() .map(|(expr, options)| PhysicalSortExpr { - expr: (*expr).clone(), + expr: Arc::clone(*expr), options: *options, }) .collect::>() @@ -810,11 +810,11 @@ mod tests { .map(|ordering| convert_to_sort_exprs(ordering)) .collect::>(); let child1 = Arc::new( - MemoryExec::try_new(&[], schema.clone(), None)? + MemoryExec::try_new(&[], Arc::clone(&schema), None)? .with_sort_information(first_orderings), ); let child2 = Arc::new( - MemoryExec::try_new(&[], schema.clone(), None)? + MemoryExec::try_new(&[], Arc::clone(&schema), None)? 
.with_sort_information(second_orderings), ); diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index e072b214fd36..bdd56f4b5aa4 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -83,7 +83,7 @@ impl UnnestExec { schema: SchemaRef, options: UnnestOptions, ) -> Self { - let cache = Self::compute_properties(&input, schema.clone()); + let cache = Self::compute_properties(&input, Arc::clone(&schema)); UnnestExec { input, @@ -147,10 +147,10 @@ impl ExecutionPlan for UnnestExec { children: Vec>, ) -> Result> { Ok(Arc::new(UnnestExec::new( - children[0].clone(), + Arc::clone(&children[0]), self.list_column_indices.clone(), self.struct_column_indices.clone(), - self.schema.clone(), + Arc::clone(&self.schema), self.options.clone(), ))) } @@ -169,7 +169,7 @@ impl ExecutionPlan for UnnestExec { Ok(Box::pin(UnnestStream { input, - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), list_type_columns: self.list_column_indices.clone(), struct_column_indices: self.struct_column_indices.iter().copied().collect(), options: self.options.clone(), @@ -237,7 +237,7 @@ struct UnnestStream { impl RecordBatchStream for UnnestStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -330,13 +330,13 @@ fn flatten_struct_cols( data_type ), }, - None => Ok(vec![column_data.clone()]), + None => Ok(vec![Arc::clone(column_data)]), }) .collect::>>()? .into_iter() .flatten() .collect(); - Ok(RecordBatch::try_new(schema.clone(), columns_expanded)?) + Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?) } /// For each row in a `RecordBatch`, some list/struct columns need to be unnested. 
@@ -357,7 +357,7 @@ fn build_batch( let list_arrays: Vec = list_type_columns .iter() .map(|index| { - ColumnarValue::Array(batch.column(*index).clone()) + ColumnarValue::Array(Arc::clone(batch.column(*index))) .into_array(batch.num_rows()) }) .collect::>()?; @@ -372,7 +372,7 @@ fn build_batch( })? as usize }; if total_length == 0 { - return Ok(RecordBatch::new_empty(schema.clone())); + return Ok(RecordBatch::new_empty(Arc::clone(schema))); } // Unnest all the list arrays @@ -444,7 +444,7 @@ fn find_longest_length( .collect::>()?; let longest_length = list_lengths.iter().skip(1).try_fold( - list_lengths[0].clone(), + Arc::clone(&list_lengths[0]), |longest, current| { let is_lt = lt(&longest, &current)?; zip(&is_lt, &current, &longest) @@ -649,7 +649,7 @@ fn flatten_list_cols_from_indices( .iter() .enumerate() .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) { - Some(unnested_array) => Ok(unnested_array.clone()), + Some(unnested_array) => Ok(Arc::clone(unnested_array)), None => Ok(kernels::take::take(arr, indices, None)?), }) .collect::>>()?; @@ -813,27 +813,27 @@ mod tests { // Test with single ListArray // [A, B, C], [], NULL, [D], NULL, [NULL, F] let list_array = Arc::new(make_generic_array::()) as ArrayRef; - verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 0, 2])?; - verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 2])?; + verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?; + verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?; // Test with single LargeListArray // [A, B, C], [], NULL, [D], NULL, [NULL, F] let list_array = Arc::new(make_generic_array::()) as ArrayRef; - verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 0, 2])?; - verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 2])?; + verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?; + verify_longest_length(&[Arc::clone(&list_array)],
true, vec![3, 0, 1, 1, 1, 2])?; // Test with single FixedSizeListArray // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] let list_array = Arc::new(make_fixed_list()) as ArrayRef; - verify_longest_length(&[list_array.clone()], false, vec![2, 0, 2, 0, 2, 2])?; - verify_longest_length(&[list_array.clone()], true, vec![2, 1, 2, 1, 2, 2])?; + verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?; + verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?; // Test with multiple list arrays // [A, B, C], [], NULL, [D], NULL, [NULL, F] // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] let list1 = Arc::new(make_generic_array::()) as ArrayRef; let list2 = Arc::new(make_fixed_list()) as ArrayRef; - let list_arrays = vec![list1.clone(), list2.clone()]; + let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)]; verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?; verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?; diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 4d385812d4a8..3ea27d62d80b 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -88,7 +88,7 @@ impl ValuesExec { .and_then(ScalarValue::iter_to_array) }) .collect::>>()?; - let batch = RecordBatch::try_new(schema.clone(), arr)?; + let batch = RecordBatch::try_new(Arc::clone(&schema), arr)?; let data: Vec = vec![batch]; Self::try_new_from_batches(schema, data) } @@ -114,7 +114,7 @@ impl ValuesExec { } } - let cache = Self::compute_properties(schema.clone()); + let cache = Self::compute_properties(Arc::clone(&schema)); Ok(ValuesExec { schema, data: batches, @@ -175,7 +175,7 @@ impl ExecutionPlan for ValuesExec { self: Arc, _: Vec>, ) -> Result> { - ValuesExec::try_new_from_batches(self.schema.clone(), self.data.clone()) + ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone()) .map(|e| Arc::new(e) as _) } 
@@ -193,7 +193,7 @@ impl ExecutionPlan for ValuesExec { Ok(Box::pin(MemoryStream::try_new( self.data(), - self.schema.clone(), + Arc::clone(&self.schema), None, )?)) } @@ -260,7 +260,7 @@ mod tests { DataType::UInt32, false, )])); - let _ = ValuesExec::try_new(schema.clone(), vec![vec![lit(1u32)]]).unwrap(); + let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); // Test that a null value is rejected let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) .unwrap_err(); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 9eb29891703e..6311107f7b58 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -289,7 +289,7 @@ impl ExecutionPlan for BoundedWindowAggExec { ) -> Result> { Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), - children[0].clone(), + Arc::clone(&children[0]), self.partition_keys.clone(), self.input_order_mode.clone(), )?)) @@ -303,7 +303,7 @@ impl ExecutionPlan for BoundedWindowAggExec { let input = self.input.execute(partition, context)?; let search_mode = self.get_search_algo()?; let stream = Box::pin(BoundedWindowAggStream::new( - self.schema.clone(), + Arc::clone(&self.schema), self.window_expr.clone(), input, BaselineMetrics::new(&self.metrics, partition), @@ -394,7 +394,9 @@ trait PartitionSearcher: Send { // as it may not have the "correct" schema in terms of output // nullability constraints. 
For details, see the following issue: // https://github.com/apache/datafusion/issues/9320 - .or_insert_with(|| PartitionBatchState::new(self.input_schema().clone())); + .or_insert_with(|| { + PartitionBatchState::new(Arc::clone(self.input_schema())) + }); partition_batch_state.extend(&partition_batch)?; } @@ -513,7 +515,7 @@ impl PartitionSearcher for LinearSearch { let length = indices.len(); for (idx, window_agg_state) in window_agg_states.iter().enumerate() { let partition = &window_agg_state[&row]; - let values = partition.state.out_col.slice(0, length).clone(); + let values = Arc::clone(&partition.state.out_col.slice(0, length)); new_columns[idx].push(values); } let partition_batch_state = &mut partition_buffers[&row]; @@ -935,7 +937,7 @@ impl BoundedWindowAggStream { search_mode: Box, ) -> Result { let state = window_expr.iter().map(|_| IndexMap::new()).collect(); - let empty_batch = RecordBatch::new_empty(schema.clone()); + let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); Ok(Self { schema, input, @@ -957,7 +959,7 @@ impl BoundedWindowAggStream { cur_window_expr.evaluate_stateful(&self.partition_buffers, state)?; } - let schema = self.schema.clone(); + let schema = Arc::clone(&self.schema); let window_expr_out = self.search_mode.calculate_out_columns( &self.input_buffer, &self.window_agg_states, @@ -1114,7 +1116,7 @@ impl BoundedWindowAggStream { impl RecordBatchStream for BoundedWindowAggStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1287,7 +1289,7 @@ mod tests { impl RecordBatchStream for TestStreamPartition { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1467,7 +1469,7 @@ mod tests { } let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(schema), vec![Arc::new(sn1_array.finish()), Arc::new(hash_array.finish())], )?; batches.push(batch); @@ -1500,7 +1502,7 @@ mod tests { // Source has 2 partitions let partitions = vec![ 
Arc::new(TestStreamPartition { - schema: schema.clone(), + schema: Arc::clone(&schema), batches: batches.clone(), idx: 0, state: PolingState::BatchReturn, @@ -1510,7 +1512,7 @@ mod tests { n_partition ]; let source = Arc::new(StreamingTableExec::try_new( - schema.clone(), + Arc::clone(&schema), partitions, None, orderings, @@ -1533,28 +1535,38 @@ mod tests { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); // Create a new batch of data to insert into the table let batch = RecordBatch::try_new( - schema.clone(), + Arc::clone(&schema), vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], )?; let memory_exec = MemoryExec::try_new( &[vec![batch.clone(), batch.clone(), batch.clone()]], - schema.clone(), + Arc::clone(&schema), None, ) .map(|e| Arc::new(e) as Arc)?; let col_a = col("a", &schema)?; - let nth_value_func1 = - NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1, false)? - .reverse_expr() - .unwrap(); - let nth_value_func2 = - NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2, false)? - .reverse_expr() - .unwrap(); + let nth_value_func1 = NthValue::nth( + "nth_value(-1)", + Arc::clone(&col_a), + DataType::Int32, + 1, + false, + )? + .reverse_expr() + .unwrap(); + let nth_value_func2 = NthValue::nth( + "nth_value(-2)", + Arc::clone(&col_a), + DataType::Int32, + 2, + false, + )? 
+ .reverse_expr() + .unwrap(); let last_value_func = Arc::new(NthValue::last( "last", - col_a.clone(), + Arc::clone(&col_a), DataType::Int32, false, )) as _; diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 181c30800434..de1c448df10f 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -63,11 +63,11 @@ pub fn schema_add_window_field( ) -> Result> { let data_types = args .iter() - .map(|e| e.clone().as_ref().data_type(schema)) + .map(|e| Arc::clone(e).as_ref().data_type(schema)) .collect::>>()?; let nullability = args .iter() - .map(|e| e.clone().as_ref().nullable(schema)) + .map(|e| Arc::clone(e).as_ref().nullable(schema)) .collect::>>()?; let window_expr_return_type = window_fn.return_type(&data_types, &nullability)?; let mut window_fields = schema @@ -269,7 +269,7 @@ fn create_built_in_window_expr( } } BuiltInWindowFunction::Lag => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); let shift_offset = get_scalar_value_from_args(args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); @@ -285,7 +285,7 @@ fn create_built_in_window_expr( )) } BuiltInWindowFunction::Lead => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); let shift_offset = get_scalar_value_from_args(args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); @@ -301,7 +301,7 @@ fn create_built_in_window_expr( )) } BuiltInWindowFunction::NthValue => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); let n = args[1].as_any().downcast_ref::().unwrap().value(); let n: i64 = n .clone() @@ -316,7 +316,7 @@ fn create_built_in_window_expr( )?) 
} BuiltInWindowFunction::FirstValue => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); Arc::new(NthValue::first( name, arg, @@ -325,7 +325,7 @@ fn create_built_in_window_expr( )) } BuiltInWindowFunction::LastValue => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); Arc::new(NthValue::last( name, arg, @@ -407,13 +407,16 @@ pub(crate) fn calc_requirements< let mut sort_reqs = partition_by_exprs .into_iter() .map(|partition_by| { - PhysicalSortRequirement::new(partition_by.borrow().clone(), None) + PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None) }) .collect::>(); for element in orderby_sort_exprs.into_iter() { let PhysicalSortExpr { expr, options } = element.borrow(); if !sort_reqs.iter().any(|e| e.expr.eq(expr)) { - sort_reqs.push(PhysicalSortRequirement::new(expr.clone(), Some(*options))); + sort_reqs.push(PhysicalSortRequirement::new( + Arc::clone(expr), + Some(*options), + )); } } // Convert empty result to None. Otherwise wrap result inside Some() @@ -442,7 +445,7 @@ pub(crate) fn get_partition_by_sort_exprs( ) -> Result { let ordered_partition_exprs = ordered_partition_by_indices .iter() - .map(|idx| partition_by_exprs[*idx].clone()) + .map(|idx| Arc::clone(&partition_by_exprs[*idx])) .collect::>(); // Make sure ordered section doesn't move over the partition by expression assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len()); @@ -463,7 +466,7 @@ pub(crate) fn window_equivalence_properties( ) -> EquivalenceProperties { // We need to update the schema, so we can not directly use // `input.equivalence_properties()`. 
- let mut window_eq_properties = EquivalenceProperties::new(schema.clone()) + let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema)) .extend(input.equivalence_properties().clone()); for expr in window_expr { @@ -534,7 +537,7 @@ pub fn get_best_fitting_window( if window_expr.iter().all(|e| e.uses_bounded_memory()) { Ok(Some(Arc::new(BoundedWindowAggExec::try_new( window_expr, - input.clone(), + Arc::clone(input), physical_partition_keys.to_vec(), input_order_mode, )?) as _)) @@ -547,7 +550,7 @@ pub fn get_best_fitting_window( } else { Ok(Some(Arc::new(WindowAggExec::try_new( window_expr, - input.clone(), + Arc::clone(input), physical_partition_keys.to_vec(), )?) as _)) } @@ -572,7 +575,7 @@ pub fn get_window_mode( let mut partition_by_reqs: Vec = vec![]; let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs); partition_by_reqs.extend(indices.iter().map(|&idx| PhysicalSortRequirement { - expr: partitionby_exprs[idx].clone(), + expr: Arc::clone(&partitionby_exprs[idx]), options: None, })); // Treat partition by exprs as constant. During analysis of requirements are satisfied. 
@@ -672,7 +675,7 @@ mod tests { let sort_exprs = sort_exprs.into_iter().collect(); Ok(Arc::new(StreamingTableExec::try_new( - schema.clone(), + Arc::clone(schema), vec![], None, Some(sort_exprs), diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index eb01da2ec094..b6330f65e0b7 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -79,7 +79,7 @@ impl WindowAggExec { let ordered_partition_by_indices = get_ordered_partition_by_indices(window_expr[0].partition_by(), &input); - let cache = Self::compute_properties(schema.clone(), &input, &window_expr); + let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr); Ok(Self { input, window_expr, @@ -220,7 +220,7 @@ impl ExecutionPlan for WindowAggExec { ) -> Result> { Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), - children[0].clone(), + Arc::clone(&children[0]), self.partition_keys.clone(), )?)) } @@ -232,7 +232,7 @@ impl ExecutionPlan for WindowAggExec { ) -> Result { let input = self.input.execute(partition, context)?; let stream = Box::pin(WindowAggStream::new( - self.schema.clone(), + Arc::clone(&self.schema), self.window_expr.clone(), input, BaselineMetrics::new(&self.metrics, partition), @@ -333,7 +333,7 @@ impl WindowAggStream { let _timer = self.baseline_metrics.elapsed_compute().timer(); let batch = concat_batches(&self.input.schema(), &self.batches)?; if batch.num_rows() == 0 { - return Ok(RecordBatch::new_empty(self.schema.clone())); + return Ok(RecordBatch::new_empty(Arc::clone(&self.schema))); } let partition_by_sort_keys = self @@ -366,7 +366,10 @@ impl WindowAggStream { let mut batch_columns = batch.columns().to_vec(); // calculate window cols batch_columns.extend_from_slice(&columns); - Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?) 
+ Ok(RecordBatch::try_new( + Arc::clone(&self.schema), + batch_columns, + )?) } } @@ -412,6 +415,6 @@ impl WindowAggStream { impl RecordBatchStream for WindowAggStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 003957947fec..5f3cf6e2aee8 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -110,7 +110,7 @@ pub struct WorkTableExec { impl WorkTableExec { /// Create a new execution plan for a worktable exec. pub fn new(name: String, schema: SchemaRef) -> Self { - let cache = Self::compute_properties(schema.clone()); + let cache = Self::compute_properties(Arc::clone(&schema)); Self { name, schema, @@ -123,7 +123,7 @@ impl WorkTableExec { pub(super) fn with_work_table(&self, work_table: Arc) -> Self { Self { name: self.name.clone(), - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), metrics: ExecutionPlanMetricsSet::new(), work_table, cache: self.cache.clone(), @@ -185,7 +185,7 @@ impl ExecutionPlan for WorkTableExec { self: Arc, _: Vec>, ) -> Result> { - Ok(self.clone()) + Ok(Arc::clone(&self) as Arc) } /// Stream the batches that were written to the work table. @@ -202,7 +202,7 @@ impl ExecutionPlan for WorkTableExec { } let batch = self.work_table.take()?; Ok(Box::pin( - MemoryStream::try_new(batch.batches, self.schema.clone(), None)? + MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)? 
.with_reservation(batch.reservation), )) } From d9e52866c4da65f3c6227dd05308466b15a777ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 3 Jul 2024 10:53:11 +0800 Subject: [PATCH 2/4] fmt --- datafusion/physical-plan/src/sorts/sort.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 26e9401f76f4..f347a0f5b6d5 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -396,8 +396,11 @@ impl ExternalSorter { let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - let spilled_rows = - spill_record_batches(batches, spill_file.path().into(), Arc::clone(&self.schema))?; + let spilled_rows = spill_record_batches( + batches, + spill_file.path().into(), + Arc::clone(&self.schema), + )?; let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); From c7fed491230d215352ddca3ccca24aef154d3067 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 5 Jul 2024 08:20:32 -0400 Subject: [PATCH 3/4] Update for clippy --- datafusion/physical-plan/src/filter.rs | 4 ++-- datafusion/physical-plan/src/windows/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index fc3e04f01daa..f0aa6f570dec 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -173,12 +173,12 @@ impl FilterExec { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { res_constants.push( - ConstExpr::new(binary.right().clone()) + ConstExpr::new(Arc::clone(binary.right())) .with_across_partitions(true), ) } else if input_eqs.is_expr_constant(binary.right()) { res_constants.push( - 
ConstExpr::new(binary.left().clone()) + ConstExpr::new(Arc::clone(binary.left())) .with_across_partitions(true), ) } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 714d23ef8f45..12c7e2f55e99 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -320,7 +320,7 @@ fn create_built_in_window_expr( )) } BuiltInWindowFunction::NthValue => { - let arg = args[0].clone(); + let arg = Arc::clone(&args[0]); let n = get_signed_integer( args[1] .as_any() @@ -603,7 +603,7 @@ pub fn get_window_mode( // Treat partition by exprs as constant. During analysis of requirements are satisfied. let const_exprs = partitionby_exprs .iter() - .map(|expr| ConstExpr::new(expr.clone())); + .map(|expr| ConstExpr::new(Arc::clone(expr))); let partition_by_eqs = input_eqs.add_constants(const_exprs); let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys); let reverse_order_by_reqs = From 36170547adb2e683fddd29c3d72e673ea4f4b900 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 8 Jul 2024 12:48:28 -0400 Subject: [PATCH 4/4] Update new clippy --- datafusion/physical-plan/src/aggregates/mod.rs | 9 ++++++--- datafusion/physical-plan/src/joins/hash_join.rs | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index df4ba289b10f..8caf10acf09b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2339,12 +2339,15 @@ mod tests { let b = Arc::new(Float32Array::from(vec![0.; 8192])); let c = Arc::new(Int32Array::from(vec![1; 8192])); - RecordBatch::try_new(schema.clone(), vec![a, b, c]).unwrap() + RecordBatch::try_new(Arc::clone(&schema), vec![a, b, c]).unwrap() }) .collect(); - let input = - Arc::new(MemoryExec::try_new(&[input_batches], schema.clone(), None)?); + let input = 
Arc::new(MemoryExec::try_new( + &[input_batches], + Arc::clone(&schema), + None, + )?); let aggregate_exec = Arc::new(AggregateExec::try_new( AggregateMode::Partial, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 09e198199e02..2f4ee00da35f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3955,12 +3955,12 @@ mod tests { )]; let (_, batches_null_eq) = join_collect( - left.clone(), - right.clone(), + Arc::clone(&left), + Arc::clone(&right), on.clone(), &JoinType::Inner, true, - task_ctx.clone(), + Arc::clone(&task_ctx), ) .await?;