Spark 3.2: Use ProcedureInput in AncestorsOfProcedure and AddFilesProcedure (#7260)

aokolnychyi authored Apr 4, 2023
1 parent 49e9308 commit d485cc8
Showing 5 changed files with 69 additions and 57 deletions.
@@ -156,7 +156,7 @@ public void testInvalidAncestorOfCases() {
AssertHelpers.assertThrows(
"Should reject calls with empty table identifier",
IllegalArgumentException.class,
"Cannot handle an empty identifier for argument table",
"Cannot handle an empty identifier for parameter 'table'",
() -> sql("CALL %s.system.ancestors_of('')", catalogName));

AssertHelpers.assertThrows(
@@ -33,10 +33,9 @@
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
@@ -50,18 +49,21 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.runtime.BoxedUnit;

class AddFilesProcedure extends BaseProcedure {

private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
private static final ProcedureParameter TABLE_PARAM =
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter SOURCE_TABLE_PARAM =
ProcedureParameter.required("source_table", DataTypes.StringType);
private static final ProcedureParameter PARTITION_FILTER_PARAM =
ProcedureParameter.optional("partition_filter", STRING_MAP);
private static final ProcedureParameter CHECK_DUPLICATE_FILES_PARAM =
ProcedureParameter.optional("check_duplicate_files", DataTypes.BooleanType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.required("source_table", DataTypes.StringType),
ProcedureParameter.optional("partition_filter", STRING_MAP),
ProcedureParameter.optional("check_duplicate_files", DataTypes.BooleanType)
TABLE_PARAM, SOURCE_TABLE_PARAM, PARTITION_FILTER_PARAM, CHECK_DUPLICATE_FILES_PARAM
};

private static final StructType OUTPUT_TYPE =
@@ -95,30 +97,17 @@ public StructType outputType() {

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);

CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog();
Identifier sourceIdent =
toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier();

Map<String, String> partitionFilter = Maps.newHashMap();
if (!args.isNullAt(2)) {
args.getMap(2)
.foreach(
DataTypes.StringType,
DataTypes.StringType,
(k, v) -> {
partitionFilter.put(k.toString(), v.toString());
return BoxedUnit.UNIT;
});
}
Identifier sourceIdent = input.ident(SOURCE_TABLE_PARAM, sessionCat);

boolean checkDuplicateFiles;
if (args.isNullAt(3)) {
checkDuplicateFiles = true;
} else {
checkDuplicateFiles = args.getBoolean(3);
}
Map<String, String> partitionFilter =
input.asStringMap(PARTITION_FILTER_PARAM, ImmutableMap.of());

boolean checkDuplicateFiles = input.asBoolean(CHECK_DUPLICATE_FILES_PARAM, true);

long addedFilesCount =
importToIceberg(tableIdent, sourceIdent, partitionFilter, checkDuplicateFiles);
@@ -34,11 +34,13 @@
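
For reference, a minimal invocation that exercises the four parameters declared above for AddFilesProcedure might look like the sketch below; the catalog, namespace, table, and Parquet path are hypothetical, and the call is issued through Spark SQL the same way the Iceberg tests do.

    // Hypothetical call to the add_files procedure; identifiers and the path are made up.
    spark.sql(
        "CALL spark_catalog.system.add_files("
            + "  table => 'db.target_tbl',"
            + "  source_table => '`parquet`.`/data/staging/files`',"
            + "  partition_filter => map('dt', '2023-04-04'),"
            + "  check_duplicate_files => true)");
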

public class AncestorsOfProcedure extends BaseProcedure {

private static final ProcedureParameter TABLE_PARAM =
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter SNAPSHOT_ID_PARAM =
ProcedureParameter.optional("snapshot_id", DataTypes.LongType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
};
new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
@@ -72,8 +74,10 @@ public StructType outputType() {

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long toSnapshotId = args.isNullAt(1) ? null : args.getLong(1);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
Long toSnapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);

SparkTable sparkTable = loadSparkTable(tableIdent);
Table icebergTable = sparkTable.table();
@@ -172,11 +172,11 @@ private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row
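
Likewise, the two parameters of AncestorsOfProcedure map onto calls such as the following sketch; the table name and snapshot id are hypothetical.

    // Hypothetical calls to ancestors_of; the table name and snapshot id are made up.
    spark.sql("CALL spark_catalog.system.ancestors_of('db.tbl')");
    spark.sql(
        "CALL spark_catalog.system.ancestors_of("
            + "  table => 'db.tbl', snapshot_id => 1234567890123)");
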
private boolean shouldComputeUpdateImages(ProcedureInput input) {
// If the identifier columns are set, we compute pre/post update images by default.
boolean defaultValue = input.isProvided(IDENTIFIER_COLUMNS_PARAM);
return input.bool(COMPUTE_UPDATES_PARAM, defaultValue);
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
return input.bool(REMOVE_CARRYOVERS_PARAM, true);
return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
@@ -190,7 +190,7 @@ private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {

private String[] identifierColumns(ProcedureInput input, Identifier tableIdent) {
if (input.isProvided(IDENTIFIER_COLUMNS_PARAM)) {
return input.stringArray(IDENTIFIER_COLUMNS_PARAM);
return input.asStringArray(IDENTIFIER_COLUMNS_PARAM);
} else {
Table table = loadSparkTable(tableIdent).table();
return table.schema().identifierFieldNames().toArray(new String[0]);
@@ -205,12 +205,12 @@ private Identifier changelogTableIdent(Identifier tableIdent) {
}

private Map<String, String> options(ProcedureInput input) {
return input.stringMap(OPTIONS_PARAM, ImmutableMap.of());
return input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
}

private String viewName(ProcedureInput input, String tableName) {
String defaultValue = String.format("`%s_changes`", tableName);
return input.string(CHANGELOG_VIEW_PARAM, defaultValue);
return input.asString(CHANGELOG_VIEW_PARAM, defaultValue);
}

private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionSpec) {
@@ -62,31 +62,43 @@ public boolean isProvided(ProcedureParameter param) {
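
The hunks above appear to come from the changelog-view procedure, which already used ProcedureInput and only picks up the renamed accessors (bool → asBoolean, string → asString, and so on). Assuming its SQL-facing parameter names mirror the *_PARAM constants shown, a call might look like this sketch; the table name, snapshot ids, and view name are made up.

    // Hypothetical call to create_changelog_view; all identifiers are invented for illustration.
    spark.sql(
        "CALL spark_catalog.system.create_changelog_view("
            + "  table => 'db.tbl',"
            + "  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),"
            + "  changelog_view => 'tbl_changes')");
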
return !args.isNullAt(ordinal);
}

public boolean bool(ProcedureParameter param, boolean defaultValue) {
public Boolean asBoolean(ProcedureParameter param, Boolean defaultValue) {
validateParamType(param, DataTypes.BooleanType);
int ordinal = ordinal(param);
return args.isNullAt(ordinal) ? defaultValue : args.getBoolean(ordinal);
return args.isNullAt(ordinal) ? defaultValue : (Boolean) args.getBoolean(ordinal);
}

public String string(ProcedureParameter param) {
String value = string(param, null);
public long asLong(ProcedureParameter param) {
Long value = asLong(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
return value;
}

public String string(ProcedureParameter param, String defaultValue) {
public Long asLong(ProcedureParameter param, Long defaultValue) {
validateParamType(param, DataTypes.LongType);
int ordinal = ordinal(param);
return args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
}

public String asString(ProcedureParameter param) {
String value = asString(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
return value;
}

public String asString(ProcedureParameter param, String defaultValue) {
validateParamType(param, DataTypes.StringType);
int ordinal = ordinal(param);
return args.isNullAt(ordinal) ? defaultValue : args.getString(ordinal);
}

public String[] stringArray(ProcedureParameter param) {
String[] value = stringArray(param, null);
public String[] asStringArray(ProcedureParameter param) {
String[] value = asStringArray(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
return value;
}

public String[] stringArray(ProcedureParameter param, String[] defaultValue) {
public String[] asStringArray(ProcedureParameter param, String[] defaultValue) {
validateParamType(param, STRING_ARRAY);
return array(
param,
@@ -119,7 +131,8 @@ private <T> T[] array(
return convertedArray;
}

public Map<String, String> stringMap(ProcedureParameter param, Map<String, String> defaultValue) {
public Map<String, String> asStringMap(
ProcedureParameter param, Map<String, String> defaultValue) {
validateParamType(param, STRING_MAP);
return map(
param,
@@ -154,28 +167,34 @@ private <K, V> Map<K, V> map(
}

public Identifier ident(ProcedureParameter param) {
String identAsString = string(param);
CatalogAndIdentifier catalogAndIdent = toCatalogAndIdent(identAsString, param.name(), catalog);
CatalogAndIdentifier catalogAndIdent = catalogAndIdent(param, catalog);

Preconditions.checkArgument(
catalogAndIdent.catalog().equals(catalog),
"Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
catalog.name(),
identAsString,
catalogAndIdent.identifier(),
catalogAndIdent.catalog().name());

return catalogAndIdent.identifier();
}

private CatalogAndIdentifier toCatalogAndIdent(
String identAsString, String paramName, CatalogPlugin defaultCatalog) {
public Identifier ident(ProcedureParameter param, CatalogPlugin defaultCatalog) {
CatalogAndIdentifier catalogAndIdent = catalogAndIdent(param, defaultCatalog);
return catalogAndIdent.identifier();
}

private CatalogAndIdentifier catalogAndIdent(
ProcedureParameter param, CatalogPlugin defaultCatalog) {

String identAsString = asString(param);

Preconditions.checkArgument(
StringUtils.isNotBlank(identAsString),
"Cannot handle an empty identifier for parameter '%s'",
paramName);
param.name());

String desc = String.format("identifier for parameter '%s'", paramName);
String desc = String.format("identifier for parameter '%s'", param.name());
return Spark3Util.catalogAndIdentifier(desc, spark, identAsString, defaultCatalog);
}

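
Taken together, the pattern this commit converges on can be summarized with a short, hypothetical call() fragment. It follows the structure of AncestorsOfProcedure above and assumes it sits inside a BaseProcedure subclass (so spark() and tableCatalog() are available); the dry_run parameter is invented for illustration and does not correspond to any procedure in this commit.

    // Hypothetical fragment showing the ProcedureInput pattern; parameter names are invented.
    private static final ProcedureParameter TABLE_PARAM =
        ProcedureParameter.required("table", DataTypes.StringType);
    private static final ProcedureParameter DRY_RUN_PARAM =
        ProcedureParameter.optional("dry_run", DataTypes.BooleanType);

    private static final ProcedureParameter[] PARAMETERS =
        new ProcedureParameter[] {TABLE_PARAM, DRY_RUN_PARAM};

    @Override
    public InternalRow[] call(InternalRow args) {
      // Wrap the positional args with the declared parameters so values are read by name,
      // type-checked against the declaration, and defaulted in one place.
      ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

      Identifier tableIdent = input.ident(TABLE_PARAM);        // resolved against the procedure's catalog
      boolean dryRun = input.asBoolean(DRY_RUN_PARAM, false);  // optional, with an explicit default

      // ... procedure-specific work would use tableIdent and dryRun here ...
      return new InternalRow[0];
    }
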
