Skip to content

Commit

Permalink
RFC: replace DatafusionError::Plam with concise macro
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Jul 27, 2023
1 parent 4d6ff35 commit d3e394a
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 180 deletions.
10 changes: 3 additions & 7 deletions benchmarks/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
use arrow::datatypes::SchemaBuilder;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
common::plan_err,
error::{DataFusionError, Result},
};
use std::fs;

mod run;
pub use run::RunOpt;

Expand Down Expand Up @@ -158,13 +158,9 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
Err(e) => errors.push(format!("{filename}: {e}")),
};
}
Err(DataFusionError::Plan(format!(
"invalid query. Could not find query: {errors:?}"
)))
plan_err!("invalid query. Could not find query: {:?}", errors)
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),
))
plan_err!("invalid query. Expected value between 1 and 22")
}
}

Expand Down
37 changes: 25 additions & 12 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ macro_rules! context {
};
}

#[macro_export]
macro_rules! plan_err {
($desc:expr) => {
Err(datafusion_common::DataFusionError::Plan(format!(
"{} at {}:{}",
$desc,
file!(),
line!()
)))
};
}

/// Schema-related errors
#[derive(Debug)]
pub enum SchemaError {
Expand Down Expand Up @@ -533,3 +521,28 @@ macro_rules! unwrap_or_internal_err {
})?
};
}

#[macro_export]
macro_rules! with_dollar_sign {
($($body:tt)*) => {
macro_rules! __with_dollar_sign { $($body)* }
__with_dollar_sign!($);
}
}

macro_rules! make_error {
($NAME:ident, $ERR:ident) => {
with_dollar_sign! {
($d:tt) => {
#[macro_export]
macro_rules! $NAME {
($d($d args:expr),*) => {
Err(DataFusionError::$ERR(format!($d($d args),*).into()))
}
}
}
}
};
}

make_error!(plan_err, Plan);
9 changes: 4 additions & 5 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::{plan_err, DataFusionError};

use crate::datasource::file_format::arrow::{ArrowFormat, DEFAULT_ARROW_EXTENSION};
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
Expand Down Expand Up @@ -439,10 +439,9 @@ pub trait ReadOptions<'a> {
.to_listing_options(config)
.infer_schema(&state, &table_path)
.await?),
(None, true) => Err(DataFusionError::Plan(
"Schema inference for infinite data sources is not supported."
.to_string(),
)),
(None, true) => {
plan_err!("Schema inference for infinite data sources is not supported.")
}
}
}
}
Expand Down
26 changes: 10 additions & 16 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{SchemaExt, ToDFSchema};
use datafusion_common::{plan_err, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
Expand Down Expand Up @@ -640,14 +640,10 @@ impl ListingTable {
})
}
else {
Err(DataFusionError::Plan(
format!("Expected single column references in output_ordering, got {expr}")
))
plan_err!("Expected single column references in output_ordering, got {expr}")
}
} else {
Err(DataFusionError::Plan(
format!("Expected Expr::Sort in output_ordering, but got {expr}")
))
plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
}
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -777,17 +773,16 @@ impl TableProvider for ListingTable {
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
if !self.schema().equivalent_names_and_types(&input.schema()) {
return Err(DataFusionError::Plan(
return plan_err!(
// Return an error if schema of the input query does not match with the table schema.
"Inserting query must have the same schema with the table.".to_string(),
));
"Inserting query must have the same schema with the table."
);
}

if self.table_paths().len() > 1 {
return Err(DataFusionError::Plan(
return plan_err!(
"Writing to a table backed by multiple files is not supported yet"
.to_owned(),
));
);
}

let table_path = &self.table_paths()[0];
Expand All @@ -806,10 +801,9 @@ impl TableProvider for ListingTable {
let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;

if file_groups.len() > 1 {
return Err(DataFusionError::Plan(
return plan_err!(
"Datafusion currently supports tables from single partition and/or file."
.to_owned(),
));
);
}

// Sink related option, apart from format
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::SchemaExt;
use datafusion_common::{plan_err, SchemaExt};
use datafusion_execution::TaskContext;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -64,9 +64,7 @@ impl MemTable {
"mem table schema does not contain batches schema. \
Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
);
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
return plan_err!("Mismatch between schema and batches");
}
}

Expand Down Expand Up @@ -196,9 +194,9 @@ impl TableProvider for MemTable {
// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
if !self.schema().equivalent_names_and_types(&input.schema()) {
return Err(DataFusionError::Plan(
"Inserting query must have the same schema with the table.".to_string(),
));
return plan_err!(
"Inserting query must have the same schema with the table."
);
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone())))
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ use crate::{
scalar::ScalarValue,
};

use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{
plan_err,
tree_node::{TreeNode, VisitRecursion},
};
use datafusion_physical_expr::expressions::Column;

use arrow::compute::cast;
Expand Down Expand Up @@ -557,12 +560,12 @@ impl SchemaAdapter {
projection.push(file_idx);
}
false => {
return Err(DataFusionError::Plan(format!(
return plan_err!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
)))
)
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;

use datafusion_common::{DataFusionError, Result};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::{Expr, TableType};
use log::debug;

Expand Down Expand Up @@ -52,9 +52,7 @@ impl StreamingTable {
"target schema does not contain partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
return plan_err!("Mismatch between schema and batches");
}
}

Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
optimizer::optimizer::Optimizer,
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::{alias::AliasGenerator, plan_err};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
Expand Down Expand Up @@ -958,10 +958,9 @@ impl SessionContext {
(Some(s), _) => s,
(None, false) => options.infer_schema(&self.state(), &table_path).await?,
(None, true) => {
return Err(DataFusionError::Plan(
return plan_err!(
"Schema inference for infinite data sources is not supported."
.to_string(),
))
)
}
};
let config = ListingTableConfig::new(table_path)
Expand Down Expand Up @@ -1182,7 +1181,7 @@ impl SessionContext {
let schema = self.state.read().schema_for_ref(table_ref)?;
match schema.table(&table).await {
Some(ref provider) => Ok(Arc::clone(provider)),
_ => Err(DataFusionError::Plan(format!("No table named '{table}'"))),
_ => plan_err!("No table named '{table}'"),
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::DataFusionError;
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
use std::sync::Arc;

Expand Down Expand Up @@ -142,7 +142,7 @@ pub fn check_finiteness_requirements(
{
const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
the 'allow_symmetric_joins_without_pruning' configuration flag";
return Err(DataFusionError::Plan(MSG.to_owned()));
return plan_err!("{}", MSG);
}
}
input
Expand Down
Loading

0 comments on commit d3e394a

Please sign in to comment.