diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5a32349c65e9..621cb9b64d27 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -473,6 +473,10 @@ config_namespace! { /// When set to true, the explain statement will only print physical plans pub physical_plan_only: bool, default = false + + /// When set to true, the explain statement will print operator statistics + /// for physical plans + pub show_statistics: bool, default = false } } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d0f150a3166e..db788efef7cd 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -17,6 +17,8 @@ //! This module provides data structures to represent statistics +use std::fmt::Display; + use crate::ScalarValue; /// Statistics for a relation @@ -37,6 +39,25 @@ pub struct Statistics { pub is_exact: bool, } +impl Display for Statistics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.num_rows.is_none() && self.total_byte_size.is_none() && !self.is_exact { + return Ok(()); + } + + let rows = self + .num_rows + .map_or_else(|| "None".to_string(), |v| v.to_string()); + let bytes = self + .total_byte_size + .map_or_else(|| "None".to_string(), |v| v.to_string()); + + write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?; + + Ok(()) + } +} + /// Statistics for a column within a relation #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct ColumnStatistics { diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 0e6edc618242..98fce19a1dd7 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -39,6 +39,8 @@ use datafusion_execution::TaskContext; pub struct AnalyzeExec { /// control how much extra to print verbose: bool, + /// if statistics should be displayed + show_statistics: bool, /// The input plan (the plan being analyzed) pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node @@ -47,9 +49,15 @@ pub struct AnalyzeExec { impl AnalyzeExec { /// Create a new AnalyzeExec - pub fn new(verbose: bool, input: Arc, schema: SchemaRef) -> Self { + pub fn new( + verbose: bool, + show_statistics: bool, + input: Arc, + schema: SchemaRef, + ) -> Self { AnalyzeExec { verbose, + show_statistics, input, schema, } @@ -111,6 +119,7 @@ impl ExecutionPlan for AnalyzeExec { ) -> Result> { Ok(Arc::new(Self::new( self.verbose, + self.show_statistics, children.pop().unwrap(), self.schema.clone(), ))) @@ -143,6 +152,7 @@ impl ExecutionPlan for AnalyzeExec { let captured_input = self.input.clone(); let captured_schema = self.schema.clone(); let verbose = self.verbose; + let show_statistics = self.show_statistics; // future that gathers the results from all the tasks in the // JoinSet that computes the overall row count and final @@ -157,6 +167,7 @@ impl ExecutionPlan for AnalyzeExec { let duration = Instant::now() - start; create_output_batch( verbose, + show_statistics, total_rows, duration, captured_input, @@ -179,6 +190,7 @@ impl ExecutionPlan for AnalyzeExec { /// Creates the ouput of AnalyzeExec as a RecordBatch fn create_output_batch( verbose: bool, + show_statistics: bool, total_rows: usize, duration: std::time::Duration, input: Arc, @@ -191,6 +203,7 @@ fn create_output_batch( type_builder.append_value("Plan with Metrics"); let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref()) + .set_show_statistics(show_statistics) .indent(verbose) .to_string(); plan_builder.append_value(annotated_plan); @@ -201,6 +214,7 @@ fn create_output_batch( type_builder.append_value("Plan with Full Metrics"); let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref()) + .set_show_statistics(show_statistics) .indent(verbose) .to_string(); plan_builder.append_value(annotated_plan); @@ -245,7 +259,7 @@ mod tests { let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); - let analyze_exec = Arc::new(AnalyzeExec::new(true, blocking_exec, schema)); + let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema)); let fut = collect(analyze_exec, task_ctx); let mut fut = fut.boxed(); diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs index 15109cba95df..3b345bdf9e3a 100644 --- a/datafusion/core/src/physical_plan/display.rs +++ b/datafusion/core/src/physical_plan/display.rs @@ -42,38 +42,49 @@ pub struct DisplayableExecutionPlan<'a> { inner: &'a dyn ExecutionPlan, /// How to show metrics show_metrics: ShowMetrics, + /// If statistics should be displayed + show_statistics: bool, } impl<'a> DisplayableExecutionPlan<'a> { - /// Create a wrapper around an [`'ExecutionPlan'] which can be + /// Create a wrapper around an [`ExecutionPlan`] which can be /// pretty printed in a variety of ways pub fn new(inner: &'a dyn ExecutionPlan) -> Self { Self { inner, show_metrics: ShowMetrics::None, + show_statistics: false, } } - /// Create a wrapper around an [`'ExecutionPlan'] which can be + /// Create a wrapper around an [`ExecutionPlan`] which can be /// pretty printed in a variety of ways that also shows aggregated /// metrics pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self { Self { inner, show_metrics: ShowMetrics::Aggregated, + show_statistics: false, } } - /// Create a wrapper around an [`'ExecutionPlan'] which can be + /// Create a wrapper around an [`ExecutionPlan`] which can be /// pretty printed in a variety of ways that also shows all low /// level metrics pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self { Self { inner, show_metrics: ShowMetrics::Full, + show_statistics: false, } } + /// Enable display of statistics + pub fn set_show_statistics(mut self, show_statistics: bool) -> Self { + self.show_statistics = show_statistics; + self + } + /// Return a `format`able structure that produces a single line /// per node. /// @@ -94,6 +105,7 @@ impl<'a> DisplayableExecutionPlan<'a> { format_type: DisplayFormatType, plan: &'a dyn ExecutionPlan, show_metrics: ShowMetrics, + show_statistics: bool, } impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -102,6 +114,7 @@ impl<'a> DisplayableExecutionPlan<'a> { f, indent: 0, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, }; accept(self.plan, &mut visitor) } @@ -110,6 +123,7 @@ impl<'a> DisplayableExecutionPlan<'a> { format_type, plan: self.inner, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, } } @@ -128,6 +142,7 @@ impl<'a> DisplayableExecutionPlan<'a> { struct Wrapper<'a> { plan: &'a dyn ExecutionPlan, show_metrics: ShowMetrics, + show_statistics: bool, } impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -137,6 +152,7 @@ impl<'a> DisplayableExecutionPlan<'a> { f, t, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, graphviz_builder: GraphvizBuilder::default(), parents: Vec::new(), }; @@ -153,6 +169,7 @@ impl<'a> DisplayableExecutionPlan<'a> { Wrapper { plan: self.inner, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, } } @@ -162,6 +179,7 @@ impl<'a> DisplayableExecutionPlan<'a> { struct Wrapper<'a> { plan: &'a dyn ExecutionPlan, show_metrics: ShowMetrics, + show_statistics: bool, } impl<'a> fmt::Display for Wrapper<'a> { @@ -171,6 +189,7 @@ impl<'a> DisplayableExecutionPlan<'a> { t: DisplayFormatType::Default, indent: 0, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, }; visitor.pre_visit(self.plan)?; Ok(()) @@ -180,6 +199,7 @@ impl<'a> DisplayableExecutionPlan<'a> { Wrapper { plan: self.inner, show_metrics: self.show_metrics, + show_statistics: self.show_statistics, } } @@ -215,6 +235,8 @@ struct IndentVisitor<'a, 'b> { indent: usize, /// How to show metrics show_metrics: ShowMetrics, + /// If statistics should be displayed + show_statistics: bool, } impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { @@ -244,6 +266,9 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { } } } + if self.show_statistics { + write!(self.f, ", statistics=[{}]", plan.statistics())?; + } writeln!(self.f)?; self.indent += 1; Ok(true) @@ -261,6 +286,9 @@ struct GraphvizVisitor<'a, 'b> { t: DisplayFormatType, /// How to show metrics show_metrics: ShowMetrics, + /// If statistics should be displayed + show_statistics: bool, + graphviz_builder: GraphvizBuilder, /// Used to record parent node ids when visiting a plan. parents: Vec, @@ -318,8 +346,24 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { } }; - self.graphviz_builder - .add_node(self.f, id, &label, Some(&metrics))?; + let statistics = if self.show_statistics { + format!("statistics=[{}]", plan.statistics()) + } else { + "".to_string() + }; + + let delimiter = if !metrics.is_empty() && !statistics.is_empty() { + ", " + } else { + "" + }; + + self.graphviz_builder.add_node( + self.f, + id, + &label, + Some(&format!("{}{}{}", metrics, delimiter, statistics)), + )?; if let Some(parent_node_id) = self.parents.last() { self.graphviz_builder diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3a59f40eded8..35f5f964813e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1892,6 +1892,7 @@ impl DefaultPhysicalPlanner { Ok(input) => { stringified_plans.push( displayable(input.as_ref()) + .set_show_statistics(config.show_statistics) .to_stringified(e.verbose, InitialPhysicalPlan), ); @@ -1903,12 +1904,14 @@ impl DefaultPhysicalPlanner { let plan_type = OptimizedPhysicalPlan { optimizer_name }; stringified_plans.push( displayable(plan) + .set_show_statistics(config.show_statistics) .to_stringified(e.verbose, plan_type), ); }, ) { Ok(input) => stringified_plans.push( displayable(input.as_ref()) + .set_show_statistics(config.show_statistics) .to_stringified(e.verbose, FinalPhysicalPlan), ), Err(DataFusionError::Context(optimizer_name, e)) => { @@ -1932,7 +1935,13 @@ impl DefaultPhysicalPlanner { } else if let LogicalPlan::Analyze(a) = logical_plan { let input = self.create_physical_plan(&a.input, session_state).await?; let schema = SchemaRef::new((*a.schema).clone().into()); - Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema)))) + let show_statistics = session_state.config_options().explain.show_statistics; + Ok(Some(Arc::new(AnalyzeExec::new( + a.verbose, + show_statistics, + input, + schema, + )))) } else { Ok(None) } @@ -2716,4 +2725,27 @@ digraph { assert_eq!(expected_graph, generated_graph); } + + #[tokio::test] + async fn test_display_graphviz_with_statistics() { + let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + + let logical_plan = scan_empty(Some("employee"), &schema, None) + .unwrap() + .project(vec![col("id") + lit(2)]) + .unwrap() + .build() + .unwrap(); + + let plan = plan(&logical_plan).await.unwrap(); + + let expected_tooltip = ", tooltip=\"statistics=["; + + let generated_graph = format!( + "{}", + displayable(&*plan).set_show_statistics(true).graphviz() + ); + + assert_contains!(generated_graph, expected_tooltip); + } } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 12dff15dc0a8..f32ffc1642cd 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -807,3 +807,21 @@ async fn explain_physical_plan_only() { ]]; assert_eq!(expected, actual); } + +#[tokio::test] +async fn csv_explain_analyze_with_statistics() { + let mut config = ConfigOptions::new(); + config.explain.physical_plan_only = true; + config.explain.show_statistics = true; + let ctx = SessionContext::with_config(config.into()); + register_aggregate_csv_by_sql(&ctx).await; + + let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100"; + let actual = execute_to_batches(&ctx, sql).await; + let formatted = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); + + // should contain scan statistics + assert_contains!(&formatted, ", statistics=[]"); +} diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index ad9b2be40e9e..d44d19737afd 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -251,3 +251,39 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true + + +### tests for EXPLAIN with display statistics enabled +statement ok +set datafusion.explain.show_statistics = true; + +statement ok +set datafusion.explain.physical_plan_only = true; + +# CSV scan with empty statistics +query TT +EXPLAIN SELECT a, b, c FROM simple_explain_test limit 10; +---- +physical_plan +GlobalLimitExec: skip=0, fetch=10, statistics=[rows=10, bytes=None, exact=false] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], limit=10, has_header=true, statistics=[] + +# Parquet scan with statistics collected +statement ok +set datafusion.execution.collect_statistics = true; + +statement ok +CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; + +query TT +EXPLAIN SELECT * FROM alltypes_plain limit 10; +---- +physical_plan +GlobalLimitExec: skip=0, fetch=10, statistics=[rows=8, bytes=None, exact=true] +--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[rows=8, bytes=None, exact=true] + +statement ok +set datafusion.execution.collect_statistics = false; + +statement ok +set datafusion.explain.show_statistics = false; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5db305105f53..362f162b0592 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -179,6 +179,7 @@ datafusion.execution.target_partitions 7 datafusion.execution.time_zone +00:00 datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false +datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.bounded_order_preserving_variants false datafusion.optimizer.enable_round_robin_repartition true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 0d3abeac9fbf..21b0d8bbfd52 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -92,6 +92,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |