Skip to content

Commit

Permalink
SchemalessPartitionSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Sep 23, 2024
1 parent 12e12e2 commit c0e6d2b
Show file tree
Hide file tree
Showing 10 changed files with 1,083 additions and 608 deletions.
4 changes: 2 additions & 2 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ mod tests {

assert_eq!(metadata.current_schema().as_ref(), expected_schema);

let expected_partition_spec = PartitionSpec::builder(expected_schema)
let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
.with_spec_id(0)
.build()
.unwrap();
Expand All @@ -365,7 +365,7 @@ mod tests {
.partition_specs_iter()
.map(|p| p.as_ref())
.collect_vec(),
vec![&expected_partition_spec]
vec![&expected_partition_spec.into_schemaless()]
);

let expected_sorted_order = SortOrder::builder()
Expand Down
131 changes: 64 additions & 67 deletions crates/iceberg/src/expr/visitors/expression_evaluator.rs

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ mod test {

#[test]
fn test_data_file_no_partitions() {
let (table_schema_ref, _partition_spec_ref) = create_test_schema_and_partition_spec();
let partition_spec_ref = create_test_partition_spec();

let partition_filter = Predicate::AlwaysTrue
.bind(table_schema_ref.clone(), false)
.bind(partition_spec_ref.schema_ref().clone(), false)
.unwrap();

let case_sensitive = false;
Expand Down Expand Up @@ -1645,7 +1645,7 @@ mod test {
assert!(result, "Should read: NotIn on no nulls column");
}

fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
fn create_test_partition_spec() -> Arc<PartitionSpec> {
let table_schema = Schema::builder()
.with_fields(vec![Arc::new(NestedField::optional(
1,
Expand All @@ -1656,7 +1656,7 @@ mod test {
.unwrap();
let table_schema_ref = Arc::new(table_schema);

let partition_spec = PartitionSpec::builder(&table_schema_ref)
let partition_spec = PartitionSpec::builder(table_schema_ref.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
Expand All @@ -1667,8 +1667,7 @@ mod test {
.unwrap()
.build()
.unwrap();
let partition_spec_ref = Arc::new(partition_spec);
(table_schema_ref, partition_spec_ref)
Arc::new(partition_spec)
}

fn not_null(reference: &str) -> BoundPredicate {
Expand Down
155 changes: 110 additions & 45 deletions crates/iceberg/src/expr/visitors/inclusive_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ use fnv::FnvHashSet;

use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference, Predicate};
use crate::spec::{Datum, PartitionField, PartitionSpecRef};
use crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef};
use crate::Error;

pub(crate) struct InclusiveProjection {
partition_spec: PartitionSpecRef,
partition_spec: SchemalessPartitionSpecRef,
cached_parts: HashMap<i32, Vec<PartitionField>>,
}

impl InclusiveProjection {
pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self {
pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self {
Self {
partition_spec,
cached_parts: HashMap::new(),
Expand Down Expand Up @@ -235,7 +235,7 @@ mod tests {
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::{Bind, Predicate, Reference};
use crate::spec::{
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
Datum, NestedField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
UnboundPartitionField,
};

Expand Down Expand Up @@ -265,13 +265,14 @@ mod tests {
#[test]
fn test_inclusive_projection_logic_ops() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(&schema)
let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.build()
.unwrap();
.unwrap()
.into_schemaless();

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);

// this predicate contains only logic operators,
Expand All @@ -295,8 +296,9 @@ mod tests {
#[test]
fn test_inclusive_projection_identity_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(&schema)
let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
Expand All @@ -308,9 +310,9 @@ mod tests {
)
.unwrap()
.build()
.unwrap();
.unwrap()
.into_schemaless();

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate = Reference::new("a").less_than(Datum::int(10));
Expand All @@ -321,7 +323,7 @@ mod tests {
// should result in the same Predicate as the original
// `unbound_predicate`, since we have just a single partition field,
// and it has an Identity transform
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "a < 10".to_string();
Expand All @@ -330,34 +332,95 @@ mod tests {
}

#[test]
fn test_inclusive_projection_date_transforms() {
fn test_inclusive_projection_date_year_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField {
source_id: 2,
name: "year".to_string(),
field_id: Some(1000),
transform: Transform::Year,
}])
.unwrap()
.build()
.unwrap()
.into_schemaless();

let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate =
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());

let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();

// applying InclusiveProjection to bound_predicate
// should result in a predicate that correctly handles
// year, month and date
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "year <= 53".to_string();

assert_eq!(result.to_string(), expected);
}

#[test]
fn test_inclusive_projection_date_month_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField {
source_id: 2,
name: "month".to_string(),
field_id: Some(1000),
transform: Transform::Month,
}])
.unwrap()
.build()
.unwrap()
.into_schemaless();

let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate =
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());

let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();

let partition_spec = PartitionSpec {
spec_id: 1,
fields: vec![
PartitionField {
source_id: 2,
name: "year".to_string(),
field_id: 1000,
transform: Transform::Year,
},
PartitionField {
source_id: 2,
name: "month".to_string(),
field_id: 1001,
transform: Transform::Month,
},
PartitionField {
source_id: 2,
name: "day".to_string(),
field_id: 1002,
transform: Transform::Day,
},
],
};
// applying InclusiveProjection to bound_predicate
// should result in a predicate that correctly handles
// year, month and date
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "month <= 647".to_string();

assert_eq!(result.to_string(), expected);
}

#[test]
fn test_inclusive_projection_date_day_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField {
source_id: 2,
name: "day".to_string(),
field_id: Some(1000),
transform: Transform::Day,
}])
.unwrap()
.build()
.unwrap()
.into_schemaless();

let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate =
Expand All @@ -368,19 +431,20 @@ mod tests {
// applying InclusiveProjection to bound_predicate
// should result in a predicate that correctly handles
// year, month and date
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string();
let expected = "day <= 19722".to_string();

assert_eq!(result.to_string(), expected);
}

#[test]
fn test_inclusive_projection_truncate_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(&schema)
let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
Expand All @@ -392,9 +456,9 @@ mod tests {
)
.unwrap()
.build()
.unwrap();
.unwrap()
.into_schemaless();

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate = Reference::new("name").starts_with(Datum::string("Testy McTest"));
Expand All @@ -408,7 +472,7 @@ mod tests {
// name that start with "Testy McTest" into a partition
// for values of name that start with the first four letters
// of that, ie "Test".
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "name_truncate STARTS WITH \"Test\"".to_string();
Expand All @@ -419,8 +483,9 @@ mod tests {
#[test]
fn test_inclusive_projection_bucket_transform() {
let schema = build_test_schema();
let arc_schema = Arc::new(schema);

let partition_spec = PartitionSpec::builder(&schema)
let partition_spec = PartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
Expand All @@ -432,9 +497,9 @@ mod tests {
)
.unwrap()
.build()
.unwrap();
.unwrap()
.into_schemaless();

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);

let unbound_predicate = Reference::new("a").equal_to(Datum::int(10));
Expand All @@ -445,7 +510,7 @@ mod tests {
// should result in the "a = 10" predicate being
// transformed into "a = 2", since 10 gets bucketed
// to 2 with a Bucket(7) partition
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "a_bucket[7] = 2".to_string();
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod tests {
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ impl PartitionFilterCache {
&self,
spec_id: i32,
table_metadata: &TableMetadataRef,
schema: &SchemaRef,
schema: &Schema,
case_sensitive: bool,
filter: BoundPredicate,
) -> Result<Arc<BoundPredicate>> {
Expand All @@ -693,11 +693,11 @@ impl PartitionFilterCache {
format!("Could not find partition spec for id {}", spec_id),
))?;

let partition_type = partition_spec.partition_type(schema.as_ref())?;
let partition_type = partition_spec.partition_type(schema)?;
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Arc::new(
Schema::builder()
.with_schema_id(partition_spec.spec_id)
.with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?,
);
Expand Down Expand Up @@ -989,7 +989,7 @@ mod tests {
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
Expand Down
Loading

0 comments on commit c0e6d2b

Please sign in to comment.