Skip to content

Commit

Permalink
Merge branch 'apache:main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead authored Aug 18, 2023
2 parents 71bf5de + f7e784c commit 8fb17e3
Show file tree
Hide file tree
Showing 21 changed files with 779 additions and 144 deletions.
35 changes: 21 additions & 14 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,26 @@ config_namespace! {
/// Sets default parquet compression codec
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive.
pub compression: String, default = "snappy".into()
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub compression: Option<String>, default = None

/// Sets if dictionary encoding is enabled
pub dictionary_enabled: bool, default = true
/// Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
pub dictionary_enabled: Option<bool>, default = None

/// Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024

/// Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive.
pub statistics_enabled: String, default = "page".into()
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub statistics_enabled: Option<String>, default = None

/// Sets max statistics size for any column
pub max_statistics_size: usize, default = 4096
/// Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
pub max_statistics_size: Option<usize>, default = None

/// Sets maximum number of rows in a row group
pub max_row_group_size: usize, default = 1024 * 1024
Expand All @@ -335,17 +339,20 @@ config_namespace! {
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive.
pub encoding: String, default = "plain".into()
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub encoding: Option<String>, default = None

/// Sets if bloom filter is enabled for any column
pub bloom_filter_enabled: bool, default = false

/// Sets bloom filter false positive probability
pub bloom_filter_fpp: f64, default = 0.05
/// Sets bloom filter false positive probability. If NULL, uses
/// default parquet writer setting
pub bloom_filter_fpp: Option<f64>, default = None

/// Sets bloom filter number of distinct values
pub bloom_filter_ndv: u64, default = 1_000_000_u64
/// Sets bloom filter number of distinct values. If NULL, uses
/// default parquet writer setting
pub bloom_filter_ndv: Option<u64>, default = None
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ rand = "0.8"
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { workspace = true }
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-util = { version = "0.7.4", features = ["io"] }
url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
Expand Down
55 changes: 43 additions & 12 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,28 +755,59 @@ impl ParquetSink {
context: &Arc<TaskContext>,
) -> Result<WriterProperties> {
let parquet_context = &context.session_config().options().execution.parquet;
Ok(WriterProperties::builder()
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(parquet_context.data_pagesize_limit)
.set_write_batch_size(parquet_context.write_batch_size)
.set_writer_version(parse_version_string(&parquet_context.writer_version)?)
.set_compression(parse_compression_string(&parquet_context.compression)?)
.set_dictionary_enabled(parquet_context.dictionary_enabled)
.set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit)
.set_statistics_enabled(parse_statistics_string(
&parquet_context.statistics_enabled,
)?)
.set_max_statistics_size(parquet_context.max_statistics_size)
.set_max_row_group_size(parquet_context.max_row_group_size)
.set_created_by(parquet_context.created_by.clone())
.set_column_index_truncate_length(
parquet_context.column_index_truncate_length,
)
.set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
.set_encoding(parse_encoding_string(&parquet_context.encoding)?)
.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled)
.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp)
.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv)
.build())
.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled);

builder = match &parquet_context.encoding {
Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?),
None => builder,
};

builder = match &parquet_context.dictionary_enabled {
Some(enabled) => builder.set_dictionary_enabled(*enabled),
None => builder,
};

builder = match &parquet_context.compression {
Some(compression) => {
builder.set_compression(parse_compression_string(compression)?)
}
None => builder,
};

builder = match &parquet_context.statistics_enabled {
Some(statistics) => {
builder.set_statistics_enabled(parse_statistics_string(statistics)?)
}
None => builder,
};

builder = match &parquet_context.max_statistics_size {
Some(size) => builder.set_max_statistics_size(*size),
None => builder,
};

builder = match &parquet_context.bloom_filter_fpp {
Some(fpp) => builder.set_bloom_filter_fpp(*fpp),
None => builder,
};

builder = match &parquet_context.bloom_filter_ndv {
Some(ndv) => builder.set_bloom_filter_ndv(*ndv),
None => builder,
};

Ok(builder.build())
}

// Create a write for parquet files
Expand Down
28 changes: 28 additions & 0 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::fs;

use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
Expand Down Expand Up @@ -87,6 +89,32 @@ impl ListingTableUrl {
}
}

/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
) -> Result<Self> {
let s = s.as_ref();
let is_valid_url = Url::parse(s).is_ok();

match is_valid_url {
true => ListingTableUrl::parse(s),
false => {
let path = std::path::PathBuf::from(s);
if !path.exists() {
if is_directory {
fs::create_dir_all(path)?;
} else {
fs::File::create(path)?;
}
}
ListingTableUrl::parse(s)
}
}
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand Down
31 changes: 30 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,36 @@ impl TableProviderFactory for ListingTableFactory {
},
}?;

let create_local_path_mode = cmd
.options
.get("create_local_path")
.map(|s| s.as_str())
.unwrap_or("false");
let single_file = cmd
.options
.get("single_file")
.map(|s| s.as_str())
.unwrap_or("false");

let single_file = match single_file {
"true" => Ok(true),
"false" => Ok(false),
_ => Err(DataFusionError::Plan(
"Invalid option single_file, must be 'true' or 'false'".into(),
)),
}?;

let table_path = match create_local_path_mode {
"true" => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
"false" => ListingTableUrl::parse(&cmd.location),
_ => Err(DataFusionError::Plan(
"Invalid option create_local_path, must be 'true' or 'false'".into(),
)),
}?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
Expand All @@ -154,7 +184,6 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode);

let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
Some(s) => s,
Expand Down
Loading

0 comments on commit 8fb17e3

Please sign in to comment.