API: Drop column of deleted partitioned field to Unbound partitionSpec #4602

Closed
Changes from 6 commits
12 changes: 8 additions & 4 deletions api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
@@ -22,6 +22,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;

 public class UnboundPartitionSpec {

@@ -53,10 +54,13 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) {
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);

     for (UnboundPartitionField field : fields) {
-      if (field.partitionId != null) {
-        builder.add(field.sourceId, field.partitionId, field.name, field.transform);
-      } else {
-        builder.add(field.sourceId, field.name, field.transform);
+      Types.NestedField column = schema.findField(field.sourceId);
+      if (column != null) {
+        if (field.partitionId != null) {
+          builder.add(field.sourceId, field.partitionId, field.name, field.transform);
+        } else {
+          builder.add(field.sourceId, field.name, field.transform);
+        }
       }
     }
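
In effect, copyToBuilder now skips any partition field whose source column no longer exists in the schema, instead of handing it to the builder, which cannot bind it. A minimal sketch of the scenario the null check guards against (hypothetical field IDs, not taken from the PR):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Schema after the "data" column (field ID 2) has been dropped:
Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

// An unbound spec may still carry a field with sourceId = 2. The lookup
// below returns null for the dropped column, so copyToBuilder now skips
// the field rather than asking PartitionSpec.Builder to bind it.
Types.NestedField column = schema.findField(2);  // null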

@@ -421,16 +421,97 @@ public void testSparkTableAddDropPartitions() throws Exception {
"spark table partition should be empty", 0, sparkTable().partitioning().length);
}

@Test
public void testUnboundPartitionSpecFormatVersion1() throws Exception {
sql(
Collaborator commented:
Just an idea: since both tests are identical except for the format-version property in the create statement, maybe we could combine them and supply the format-version as a parameter in a loop?

IntStream.rangeClosed(1, 2).forEach(version -> {
   ... // sql statements
});

This way it could be easily extended for future versions as well. WDYT?
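
A fuller sketch of that consolidation (hypothetical test name; assumes this class's existing sql/scalarSql helpers plus java.util.stream.IntStream):

@Test
public void testUnboundPartitionSpecAcrossFormatVersions() {
  // Run the same scenario once per format version; extending to a new
  // version only requires widening the range.
  IntStream.rangeClosed(1, 2).forEach(version -> {
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql(
        "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg "
            + "TBLPROPERTIES ('format-version' = %d, 'write.delete.mode' = 'merge-on-read')",
        tableName, version);
    sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
    sql("INSERT INTO %s VALUES (1, current_timestamp(), 'some-data')", tableName);
    sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
    sql("ALTER TABLE %s DROP COLUMN data", tableName);
    Assert.assertEquals(
        "Should keep rows after dropping the old partition source column",
        1L, scalarSql("SELECT count(*) FROM %s", tableName));
  });
}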

"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg "
+ "TBLPROPERTIES ('format-version' = 1, 'write.delete.mode' = 'merge-on-read')",
tableName);
Assert.assertEquals(
"spark table partition should be empty", 0, sparkTable().partitioning().length);

sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-1-first-data')", tableName);
Assert.assertEquals(
"Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)");

sql(
"INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-1-second-data')",
tableName);
Assert.assertEquals(
"Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
Assert.assertEquals(
"spark table partition should be empty", 0, sparkTable().partitioning().length);

sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-1-third-data')", tableName);
Assert.assertEquals(
"Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s DROP COLUMN data", tableName);

Assert.assertEquals(
Collaborator commented: In addition to the data read test, shall we add a test to read back the partitions metatable contents as well?

"Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName));
}
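
A sketch of the suggested metatable read-back (the .partitions metadata table is standard Iceberg; the assertion here is deliberately loose):

// After the partition field and its source column are dropped, reading the
// partitions metadata table should still succeed rather than throw.
List<Object[]> partitionRows = sql("SELECT * FROM %s.partitions", tableName);
Assert.assertNotNull("partitions metatable should be readable", partitionRows);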

@Test
public void testUnboundPartitionSpecFormatVersion2() throws Exception {
sql(
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg "
+ "TBLPROPERTIES ('format-version' = 2, 'write.delete.mode' = 'merge-on-read')",
tableName);
Assert.assertEquals(
"spark table partition should be empty", 0, sparkTable().partitioning().length);

sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-2-first-data')", tableName);
Assert.assertEquals(
"Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)");

sql(
"INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-2-second-data')",
tableName);
Assert.assertEquals(
"Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
Assert.assertEquals(
"spark table partition should be empty", 0, sparkTable().partitioning().length);

sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-2-third-data')", tableName);
Assert.assertEquals(
"Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName));

sql("ALTER TABLE %s DROP COLUMN data", tableName);

Assert.assertEquals(
"Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName));
}

@Test
public void testDropColumnOfOldPartitionFieldV1() {
// default table created in v1 format
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')",
tableName);

sql(
"INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);

assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, Timestamp.valueOf("2022-01-01 10:00:00"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
Contributor commented:

Reading from the table is actually breaking on current master:

Caused by: java.lang.NullPointerException: Type cannot be null
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
	at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:446)
	at org.apache.iceberg.types.Types$NestedField.optional(Types.java:415)
	at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:135)
	at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:233)
	at org.apache.iceberg.spark.source.SparkTable.metadataColumns(SparkTable.java:215)
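
The trace shows why the schema lookup in copyToBuilder matters: PartitionSpec.partitionType() resolves each partition field's result type from the current schema, and once the source column is dropped that lookup yields null, which Types.NestedField rejects. The failing path is approximately the following (paraphrased from the stack trace, not the exact source):

// Inside PartitionSpec.partitionType(), roughly:
Type sourceType = schema.findType(field.sourceId());            // null for a dropped column
Type resultType = field.transform().getResultType(sourceType);  // propagates null
// Types.NestedField.optional(...) then hits Preconditions.checkNotNull(type, "Type cannot be null")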

}

@Test
@@ -439,9 +520,18 @@ public void testDropColumnOfOldPartitionFieldV2() {
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')",
tableName);

sql(
"INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);

assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, Timestamp.valueOf("2022-01-01 10:00:00"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {