Skip to content

Commit

Permalink
Spark 3.4: Add fast_forward procedure (#8081)
Browse files Browse the repository at this point in the history
  • Loading branch information
rakesh-das08 authored Aug 6, 2023
1 parent 680241b commit b64446c
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Test;

public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
public TestFastForwardBranchProcedure(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testFastForwardBranchUsingPositionalArgs() {
sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

Table table = validationCatalog.loadTable(tableIdent);
table.refresh();

Snapshot currSnapshot = table.currentSnapshot();
long sourceRef = currSnapshot.snapshotId();

String newBranch = "testBranch";
String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);

sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);

table.refresh();
long updatedRef = table.snapshot(newBranch).snapshotId();

assertEquals(
"Main branch should not have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b")),
sql("SELECT * FROM %s order by id", tableName));

assertEquals(
"Test branch should have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
sql("SELECT * FROM %s order by id", tableNameWithBranch));

List<Object[]> output =
sql(
"CALL %s.system.fast_forward('%s', '%s', '%s')",
catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);

assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(0))
.isEqualTo(SnapshotRef.MAIN_BRANCH);

assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(1))
.isEqualTo(sourceRef);

assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(2))
.isEqualTo(updatedRef);

assertEquals(
"Main branch should have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
sql("SELECT * FROM %s order by id", tableName));
}

@Test
public void testFastForwardBranchUsingNamedArgs() {
sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

String newBranch = "testBranch";
String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);

sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);

assertEquals(
"Main branch should not have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b")),
sql("SELECT * FROM %s order by id", tableName));

assertEquals(
"Test branch should have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
sql("SELECT * FROM %s order by id", tableNameWithBranch));

List<Object[]> output =
sql(
"CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);

assertEquals(
"Main branch should now have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
sql("SELECT * FROM %s order by id", tableName));
}

@Test
public void testFastForwardWhenTargetIsNotAncestorFails() {
sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

String newBranch = "testBranch";
String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);

sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);

assertEquals(
"Main branch should not have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b")),
sql("SELECT * FROM %s order by id", tableName));

assertEquals(
"Test branch should have the newly inserted record.",
ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
sql("SELECT * FROM %s order by id", tableNameWithBranch));

// Commit a snapshot on main to deviate the branches
sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName);

assertThatThrownBy(
() ->
sql(
"CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot fast-forward: main is not an ancestor of testBranch");
}

@Test
public void testInvalidFastForwardBranchCases() {
assertThatThrownBy(
() ->
sql(
"CALL %s.system.fast_forward('test_table', branch => 'main', to => 'newBranch')",
catalogName))
.isInstanceOf(AnalysisException.class)
.hasMessage("Named and positional arguments cannot be mixed");

assertThatThrownBy(
() ->
sql("CALL %s.custom.fast_forward('test_table', 'main', 'newBranch')", catalogName))
.isInstanceOf(NoSuchProcedureException.class)
.hasMessage("Procedure custom.fast_forward not found");

assertThatThrownBy(() -> sql("CALL %s.system.fast_forward('test_table', 'main')", catalogName))
.isInstanceOf(AnalysisException.class)
.hasMessage("Missing required parameters: [to]");

assertThatThrownBy(
() -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.procedures;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class FastForwardBranchProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.required("branch", DataTypes.StringType),
ProcedureParameter.required("to", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("branch_updated", DataTypes.StringType, false, Metadata.empty()),
new StructField("previous_ref", DataTypes.LongType, true, Metadata.empty()),
new StructField("updated_ref", DataTypes.LongType, false, Metadata.empty())
});

public static SparkProcedures.ProcedureBuilder builder() {
return new Builder<FastForwardBranchProcedure>() {
@Override
protected FastForwardBranchProcedure doBuild() {
return new FastForwardBranchProcedure(tableCatalog());
}
};
}

private FastForwardBranchProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String source = args.getString(1);
String target = args.getString(2);

return modifyIcebergTable(
tableIdent,
table -> {
long currentRef = table.currentSnapshot().snapshotId();
table.manageSnapshots().fastForwardBranch(source, target).commit();
long updatedRef = table.currentSnapshot().snapshotId();

InternalRow outputRow =
newInternalRow(UTF8String.fromString(source), currentRef, updatedRef);
return new InternalRow[] {outputRow};
});
}

@Override
public String description() {
return "FastForwardBranchProcedure";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
return mapBuilder.build();
}

Expand Down

0 comments on commit b64446c

Please sign in to comment.