Add internal_err error macros. Part 2
comphead committed Aug 17, 2023
1 parent d2f9025 commit cd5cc20
Showing 55 changed files with 482 additions and 468 deletions.
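For context, the internal_err! macro being rolled out here wraps a formatted message in Err(DataFusionError::Internal(..)), replacing the verbose Err(DataFusionError::Internal(format!(..))) pattern visible on the removed lines below; the underscore variant _internal_err! used in the datafusion/common files appears to be the same macro under the name used inside the datafusion_common crate itself. A minimal, self-contained sketch of the idea (the real macro lives in datafusion_common::error; the simplified error enum and the lookup function here are illustrative assumptions, not DataFusion code):

use std::fmt;

// Simplified stand-in for datafusion_common::DataFusionError.
#[derive(Debug)]
enum DataFusionError {
    Internal(String),
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionError::Internal(msg) => write!(f, "Internal error: {msg}"),
        }
    }
}

type Result<T> = std::result::Result<T, DataFusionError>;

// Sketch of the macro: format the arguments and wrap the message in
// Err(DataFusionError::Internal(..)) in one step.
macro_rules! internal_err {
    ($($arg:tt)*) => {
        Err(DataFusionError::Internal(format!($($arg)*)))
    };
}

// Hypothetical lookup function showing the before/after usage.
fn lookup(key: &str) -> Result<i32> {
    match key {
        "answer" => Ok(42),
        // Before: Err(DataFusionError::Internal(
        //     format!("Config value \"{}\" not found", key)))
        _ => internal_err!("Config value \"{}\" not found", key),
    }
}

fn main() {
    assert_eq!(lookup("answer").unwrap(), 42);
    println!("{}", lookup("missing").unwrap_err());
}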
6 changes: 3 additions & 3 deletions datafusion/common/src/config.rs
@@ -126,9 +126,9 @@ macro_rules! config_namespace {
             $(
                stringify!($field_name) => self.$field_name.set(rem, value),
             )*
-            _ => Err(DataFusionError::Internal(
-                format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
-            ))
+            _ => _internal_err!(
+                "Config value \"{}\" not found on {}", key, stringify!($struct_name)
+            )
         }
     }

21 changes: 8 additions & 13 deletions datafusion/common/src/scalar.rs
@@ -700,7 +700,7 @@ macro_rules! primitive_right {
     };
     ($TERM:expr, /, $SCALAR:ident) => {
         internal_err!(
-            "Can not divide an uninitialized value to a non-floating point value",
+            "Can not divide an uninitialized value to a non-floating point value"
         )
     };
     ($TERM:expr, &, $SCALAR:ident) => {
@@ -722,11 +722,10 @@ macro_rules! primitive_right {

 macro_rules! unsigned_subtraction_error {
     ($SCALAR:expr) => {{
-        let msg = format!(
+        _internal_err!(
             "Can not subtract a {} value from an uninitialized value",
             $SCALAR
-        );
-        Err(DataFusionError::Internal(msg))
+        )
     }};
 }

@@ -1404,9 +1403,7 @@
         DT_MODE => add_day_time(prior, interval as i64, sign),
         MDN_MODE => add_m_d_nano(prior, interval, sign),
         _ => {
-            return Err(DataFusionError::Internal(
-                "Undefined interval mode for interval calculations".to_string(),
-            ));
+            return _internal_err!("Undefined interval mode for interval calculations");
         }
     })
 }
@@ -2241,9 +2238,9 @@ impl ScalarValue {
         // figure out the type based on the first element
         let data_type = match scalars.peek() {
             None => {
-                return Err(DataFusionError::Internal(
-                    "Empty iterator passed to ScalarValue::iter_to_array".to_string(),
-                ));
+                return _internal_err!(
+                    "Empty iterator passed to ScalarValue::iter_to_array"
+                );
             }
             Some(sv) => sv.get_datatype(),
         };
@@ -3062,9 +3059,7 @@ impl ScalarValue {
                 Ok(ScalarValue::Decimal256(Some(value), precision, scale))
             }
         }
-        _ => Err(DataFusionError::Internal(
-            "Unsupported decimal type".to_string(),
-        )),
+        _ => _internal_err!("Unsupported decimal type"),
     }
 }

6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/default_table_source.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use crate::datasource::TableProvider;

 use arrow::datatypes::SchemaRef;
-use datafusion_common::{Constraints, DataFusionError};
+use datafusion_common::{internal_err, Constraints, DataFusionError};
 use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};

 /// DataFusion default table source, wrapping TableProvider
@@ -91,8 +91,6 @@ pub fn source_as_provider(
         .downcast_ref::<DefaultTableSource>()
     {
         Some(source) => Ok(source.table_provider.clone()),
-        _ => Err(DataFusionError::Internal(
-            "TableSource was not DefaultTableSource".to_string(),
-        )),
+        _ => internal_err!("TableSource was not DefaultTableSource"),
     }
 }
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -632,6 +632,7 @@ mod tests {
     use bytes::Bytes;
     use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::internal_err;
     use datafusion_expr::{col, lit};
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
@@ -972,9 +973,7 @@
             }
         }

-        Err(DataFusionError::Internal(
-            "query contains no CsvExec".to_string(),
-        ))
+        internal_err!("query contains no CsvExec")
     }

     #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/file_type.rs
@@ -17,13 +17,13 @@

 //! File type abstraction

-use crate::error::{DataFusionError, Result};
-
+use crate::common::internal_err;
 use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use crate::error::{DataFusionError, Result};
 #[cfg(feature = "compression")]
 use async_compression::tokio::bufread::{
     BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
@@ -291,9 +291,9 @@ impl FileType {
             FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
             FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
                 UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
-                )),
+                _ => internal_err!(
+                    "FileCompressionType can be specified for CSV/JSON FileType."
+                ),
             },
         }
     }
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/write.rs
@@ -29,6 +29,7 @@ use crate::error::Result;
 use crate::physical_plan::SendableRecordBatchStream;

 use arrow_array::RecordBatch;
+use datafusion_common::internal_err;
 use datafusion_common::DataFusionError;

 use async_trait::async_trait;
@@ -331,13 +332,11 @@
     per_thread_output: bool,
 ) -> Result<u64> {
     if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
-        return Err(DataFusionError::Internal(
-            "per_thread_output is false, but got more than 1 writer!".into(),
-        ));
+        return internal_err!("per_thread_output is false, but got more than 1 writer!");
     }
     let num_partitions = data.len();
     if per_thread_output && (num_partitions != writers.len()) {
-        return Err(DataFusionError::Internal("per_thread_output is true, but did not get 1 writer for each output partition!".into()));
+        return internal_err!("per_thread_output is true, but did not get 1 writer for each output partition!");
     }
     let mut row_count = 0;
     // Map errors to DatafusionError.
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
@@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use dashmap::DashMap;
-use datafusion_common::{plan_err, project_schema, SchemaExt, ToDFSchema};
+use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
 use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
 use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
@@ -195,9 +195,7 @@ impl ListingTableConfig {
                 options: Some(options),
             })
         }
-        None => Err(DataFusionError::Internal(
-            "No `ListingOptions` set for inferring schema".into(),
-        )),
+        None => internal_err!("No `ListingOptions` set for inferring schema"),
     }
 }

6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -519,6 +519,7 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
 #[cfg(test)]
 mod tests {
     use arrow_schema::Schema;
+    use datafusion_common::internal_err;
     use datafusion_common::DataFusionError;

     use super::*;
@@ -557,10 +558,7 @@
         let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

         if self.error_opening_idx.contains(&idx) {
-            Ok(futures::future::ready(Err(DataFusionError::Internal(
-                "error opening".to_owned(),
-            )))
-            .boxed())
+            Ok(futures::future::ready(internal_err!("error opening")).boxed())
         } else if self.error_scanning_idx.contains(&idx) {
             let error = futures::future::ready(Err(ArrowError::IoError(
                 "error scanning".to_owned(),
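A side note on the file_stream.rs change above: because internal_err! expands to an Err(..) value rather than performing an early return, it works both after a return keyword (as in write.rs) and in expression position, passed straight into futures::future::ready(..). A small sketch of both positions, with String standing in for DataFusionError and require_single_writer as a made-up example function:

type Result<T> = std::result::Result<T, String>;

// Same idea as internal_err!, with String as a stand-in error type.
macro_rules! internal_err {
    ($($arg:tt)*) => {
        Err(format!($($arg)*))
    };
}

// Statement position: early-return the Err value.
fn require_single_writer(writers: usize) -> Result<()> {
    if writers != 1 {
        return internal_err!("expected 1 writer, got {}", writers);
    }
    Ok(())
}

fn main() {
    // Expression position: the Err value is passed directly where a
    // Result is expected, as in the FileStream test opener above.
    let _ready: std::future::Ready<Result<()>> =
        std::future::ready(internal_err!("error opening"));
    assert!(require_single_writer(2).is_err());
}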
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -545,9 +545,9 @@ fn swap_join_according_to_unboundedness(
         (PartitionMode::CollectLeft, _) => {
             swap_hash_join(hash_join, PartitionMode::CollectLeft)
         }
-        (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
-            "Auto is not acceptable for unbounded input here.".to_string(),
-        )),
+        (PartitionMode::Auto, _) => {
+            internal_err!("Auto is not acceptable for unbounded input here.")
+        }
     }
 }

4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/analyze.rs
@@ -93,9 +93,7 @@ impl ExecutionPlan for AnalyzeExec {
     /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
-        Err(DataFusionError::Internal(
-            "Optimization not supported for ANALYZE".to_string(),
-        ))
+        internal_err!("Optimization not supported for ANALYZE")
     }

     /// Get the output partitioning of this plan
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -125,9 +125,9 @@ impl ExecutionPlan for CoalescePartitionsExec {

     let input_partitions = self.input.output_partitioning().partition_count();
     match input_partitions {
-        0 => Err(DataFusionError::Internal(
-            "CoalescePartitionsExec requires at least one input partition".to_owned(),
-        )),
+        0 => internal_err!(
+            "CoalescePartitionsExec requires at least one input partition"
+        ),
         1 => {
             // bypass any threading / metrics if there is a single partition
             self.input.execute(0, context)
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/insert.rs
@@ -36,7 +36,7 @@ use std::fmt::Debug;
 use std::sync::Arc;

 use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion_common::DataFusionError;
+use datafusion_common::{internal_err, DataFusionError};
 use datafusion_execution::TaskContext;

 /// `DataSink` implements writing streams of [`RecordBatch`]es to
@@ -232,9 +232,7 @@ impl ExecutionPlan for FileSinkExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         if partition != 0 {
-            return Err(DataFusionError::Internal(
-                "FileSinkExec can only be called on partition 0!".into(),
-            ));
+            return internal_err!("FileSinkExec can only be called on partition 0!");
         }
         let data = self.execute_all_input_streams(context.clone())?;

30 changes: 15 additions & 15 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -966,9 +966,9 @@ pub fn equal_rows(
             equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null)
         }
         _ => {
-            err = Some(Err(DataFusionError::Internal(
-                "Unsupported data type in hasher".to_string(),
-            )));
+            err = Some(internal_err!(
+                "Unsupported data type in hasher"
+            ));
             false
         }
     }
@@ -980,9 +980,9 @@ pub fn equal_rows(
             equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null)
         }
         _ => {
-            err = Some(Err(DataFusionError::Internal(
-                "Unsupported data type in hasher".to_string(),
-            )));
+            err = Some(internal_err!(
+                "Unsupported data type in hasher"
+            ));
             false
         }
     }
@@ -1049,16 +1049,16 @@ pub fn equal_rows(
                     null_equals_null
                 )
             } else {
-                err = Some(Err(DataFusionError::Internal(
-                    "Inconsistent Decimal data type in hasher, the scale should be same".to_string(),
-                )));
+                err = Some(internal_err!(
+                    "Inconsistent Decimal data type in hasher, the scale should be same"
+                ));
                 false
             }
         }
         _ => {
-            err = Some(Err(DataFusionError::Internal(
-                "Unsupported data type in hasher".to_string(),
-            )));
+            err = Some(internal_err!(
+                "Unsupported data type in hasher"
+            ));
             false
         }
     },
@@ -1148,9 +1148,9 @@
         }
         _ => {
             // should not happen
-            err = Some(Err(DataFusionError::Internal(
-                "Unsupported data type in hasher".to_string(),
-            )));
+            err = Some(internal_err!(
+                "Unsupported data type in hasher"
+            ));
             false
         }
     }
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -311,9 +311,7 @@ impl ExecutionPlan for SortMergeJoinExec {
             self.sort_options.clone(),
             self.null_equals_null,
         )?)),
-        _ => Err(DataFusionError::Internal(
-            "SortMergeJoin wrong number of children".to_string(),
-        )),
+        _ => internal_err!("SortMergeJoin wrong number of children"),
     }
 }

8 changes: 2 additions & 6 deletions datafusion/core/src/physical_plan/limit.rs
@@ -170,9 +170,7 @@ impl ExecutionPlan for GlobalLimitExec {

     // GlobalLimitExec requires a single input partition
     if 1 != self.input.output_partitioning().partition_count() {
-        return Err(DataFusionError::Internal(
-            "GlobalLimitExec requires a single input partition".to_owned(),
-        ));
+        return internal_err!("GlobalLimitExec requires a single input partition");
     }

     let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -331,9 +329,7 @@ impl ExecutionPlan for LocalLimitExec {
             children[0].clone(),
             self.fetch,
         ))),
-        _ => Err(DataFusionError::Internal(
-            "LocalLimitExec wrong number of children".to_string(),
-        )),
+        _ => internal_err!("LocalLimitExec wrong number of children"),
     }
 }

6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/mod.rs
@@ -26,7 +26,7 @@ use self::{
 use crate::datasource::physical_plan::FileScanConfig;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use datafusion_common::Result;
-pub use datafusion_common::{ColumnStatistics, Statistics};
+pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
 pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

 use arrow::datatypes::SchemaRef;
@@ -232,9 +232,7 @@
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
     let old_children = plan.children();
     if children.len() != old_children.len() {
-        Err(DataFusionError::Internal(
-            "Wrong number of children".to_string(),
-        ))
+        internal_err!("Wrong number of children")
     } else if children.is_empty()
         || children
             .iter()
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs

@@ -219,10 +219,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
         .register(&context.runtime_env().memory_pool);

     match input_partitions {
-        0 => Err(DataFusionError::Internal(
+        0 => internal_err!(
             "SortPreservingMergeExec requires at least one input partition"
-                .to_owned(),
-        )),
+        ),
         1 => {
             // bypass if there is only one partition to merge (no metrics in this case either)
             let result = self.input.execute(0, context);
