Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): make delta table support partition pruning #16621

Merged
merged 11 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/test_unit/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ runs:
RUST_TEST_THREADS: "8"
RUST_LOG: ERROR
RUST_MIN_STACK: 104857600
# RUST_BACKTRACE: full
# RUST_BACKTRACE: 1

- name: Upload failure
if: failure()
Expand Down
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"

[dev-dependencies]
goldenfile = "1.4"
maplit = "1.0.2"

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/query/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod cluster_info;
pub mod database;
pub mod lock;
pub mod merge_into_join;
pub mod partition_columns;
pub mod plan;
pub mod query_kind;
pub mod runtime_filter_info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ mod pushdown_transform;
mod values_serde;

pub use pushdown_transform::get_pushdown_without_partition_columns;
pub use values_serde::get_partition_values;
pub use values_serde::str_to_scalar;
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use std::collections::BTreeMap;

use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;

use crate::plan::Projection;
use crate::plan::PushDownInfo;

pub fn get_pushdown_without_partition_columns(
mut pushdown: PushDownInfo,
partition_columns: &[FieldIndex],
Expand Down Expand Up @@ -87,10 +88,9 @@ fn shift_projection(prj: Projection, partition_columns: &[FieldIndex]) -> Result

#[cfg(test)]
mod tests {
use databend_common_catalog::plan::Projection;

use super::shift_projection;
use super::shift_projection_index;
use crate::plan::Projection;

#[test]
fn test_shift_projection_index() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::Scalar;
use databend_common_expression::TableField;
use deltalake::kernel::Add;

pub fn str_to_scalar(value: &str, data_type: &DataType) -> Result<Scalar> {
if value.is_empty() {
Expand Down Expand Up @@ -81,20 +79,3 @@ pub fn str_to_scalar(value: &str, data_type: &DataType) -> Result<Scalar> {
))),
}
}

/// Looks up the partition value of every field in `fields` inside
/// `add.partition_values` and returns them as scalars, in field order.
///
/// * An entry holding a string is converted with [`str_to_scalar`], using the
///   field's data type.
/// * An entry that is present but holds `None` becomes `Scalar::Null`.
///
/// # Errors
///
/// Returns `ErrorCode::BadArguments` when `add.partition_values` has no entry
/// for a field's name, and propagates any error from [`str_to_scalar`].
/// Processing stops at the first failing field.
pub fn get_partition_values(add: &Add, fields: &[&TableField]) -> Result<Vec<Scalar>> {
    fields
        .iter()
        .map(|field| match add.partition_values.get(&field.name) {
            Some(Some(text)) => str_to_scalar(text, &field.data_type().into()),
            Some(None) => Ok(Scalar::Null),
            None => Err(ErrorCode::BadArguments(format!(
                "partition value for column {} not found",
                &field.name
            ))),
        })
        // Collecting into `Result<Vec<_>>` short-circuits on the first `Err`.
        .collect()
}
6 changes: 1 addition & 5 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2940,11 +2940,7 @@ pub struct DiskCacheConfig {
#[serde(default, deny_unknown_fields)]
pub struct SpillConfig {
/// Path of spill to local disk. disable if it's empty.
#[clap(
long,
value_name = "VALUE",
default_value = "./.databend/temp/_query_spill"
)]
#[clap(long, value_name = "VALUE", default_value = "")]
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
pub spill_local_disk_path: OsString,

#[clap(long, value_name = "VALUE", default_value = "30")]
Expand Down
2 changes: 1 addition & 1 deletion src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ pub struct SpillConfig {
impl Default for SpillConfig {
fn default() -> Self {
Self {
path: OsString::from("./.databend/temp/_query_spill"),
path: OsString::from(""),
reserved_disk_ratio: OrderedFloat(0.3),
global_bytes_limit: u64::MAX,
}
Expand Down
12 changes: 10 additions & 2 deletions src/query/service/tests/it/parquet_rs/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;

use arrow_array::Array;
Expand All @@ -36,6 +37,7 @@ use chrono::Duration;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;
use tokio::fs::create_dir_all;

// Test cases from apache/arrow-datafusion

Expand Down Expand Up @@ -336,10 +338,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {

/// Create a test parquet file with various data types
pub async fn make_test_file_rg(scenario: Scenario) -> (NamedTempFile, SchemaRef) {
let dir = std::env::temp_dir().join("parquets_rg");
create_dir_all(&dir).await.unwrap();
let mut output_file = tempfile::Builder::new()
.prefix("parquet_pruning")
.suffix(".parquet")
.tempfile()
.permissions(std::fs::Permissions::from_mode(0o666))
.tempfile_in(dir)
.expect("tempfile creation");

let props = WriterProperties::builder()
Expand All @@ -362,10 +367,13 @@ pub async fn make_test_file_rg(scenario: Scenario) -> (NamedTempFile, SchemaRef)
}

pub async fn make_test_file_page(scenario: Scenario) -> (NamedTempFile, SchemaRef) {
let dir = std::env::temp_dir().join("parquets_page");
create_dir_all(&dir).await.unwrap();
let mut output_file = tempfile::Builder::new()
.prefix("parquet_page_pruning")
.suffix(".parquet")
.tempfile()
.permissions(std::fs::Permissions::from_mode(0o666))
.tempfile_in(dir)
.expect("tempfile creation");

// set row count to 5, should get same result as rowGroup
Expand Down
Loading
Loading