diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java index dc22fc4b704c..8e64eaad44e7 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java @@ -331,4 +331,39 @@ public void testReplacePartitionField() { row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))), sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName)); } + + @Test + public void testWriteManifestWithSpecId() { + sql( + "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)", + tableName); + sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName); + + sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName); + sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName); + assertEquals( + "Should have 2 manifests and their partition spec id should be 0", + ImmutableList.of(row(0), row(0)), + sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName)); + + sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName); + sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName); + assertEquals( + "Should have 3 manifests and their partition spec id should be 0 and 1", + ImmutableList.of(row(0), row(0), row(1)), + sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName)); + + List output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent); + assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output); + + output = + sql( + "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)", + catalogName, tableIdent); + assertEquals("There should be 2 manifests rewriten", ImmutableList.of(row(2, 1)), output); + assertEquals( + "Should have 2 manifests and their partition spec id should be 0 and 1", + ImmutableList.of(row(0), row(1)), + sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName)); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java index c8becc7e5a0f..e59077ae3da9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java @@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), - ProcedureParameter.optional("use_caching", DataTypes.BooleanType) + ProcedureParameter.optional("use_caching", DataTypes.BooleanType), + ProcedureParameter.optional("spec_id", DataTypes.IntegerType) }; // counts are not nullable since the action result is never null @@ -85,6 +86,7 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1); + Integer specId = args.isNullAt(2) ? null : args.getInt(2); return modifyIcebergTable( tableIdent, @@ -95,6 +97,10 @@ public InternalRow[] call(InternalRow args) { action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString()); } + if (specId != null) { + action.specId(specId); + } + RewriteManifests.Result result = action.execute(); return toOutputRows(result);