diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 16d9b73c70e2..e7b44cf91214 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -121,4 +121,20 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("A comma delimited list of path-based partition fields in the source file structure.");
 
+  public static final ConfigProperty<Boolean> SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.reader.comma.separated.path.format")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("Boolean value for specifying the path format in the load args of spark.read.format(\"..\").load(\"a.xml,b.xml,c.xml\"). "
+          + "Set true if the paths should be passed as a single comma-separated string; if false, they are passed as an array of strings, "
+          + "i.e. spark.read.format(\"..\").load(new String[]{a.xml,b.xml,c.xml})");
+
+  public static final ConfigProperty<Long> SOURCE_MAX_BYTES_PER_PARTITION = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.partition.max.size")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("Specify this value in bytes; partitions of the source dataset are coalesced so that no partition is greater than the specified limit.");
+
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 19da6aada9bd..4098448b7936 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -57,7 +57,8 @@
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
-import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
@@ -191,9 +192,11 @@ public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
     Dataset<Row> dataset;
     if (isCommaSeparatedPathFormat) {
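To make the review concrete, here is a minimal, self-contained sketch (not part of the patch) of the two behaviors these configs drive in `loadAsDataset`: `source.cloud.data.reader.comma.separated.path.format` toggles between the single comma-joined string and the array form of `DataFrameReader.load`, and `source.cloud.data.partition.max.size` caps partition sizes by coalescing. The class, method, and variable names below are illustrative assumptions, not Hudi code.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative stand-in for the loadAsDataset logic touched above; not Hudi code.
public class CloudLoadSketch {

  static Dataset<Row> load(SparkSession spark, String format, List<String> paths,
      boolean commaSeparatedPaths, long maxBytesPerPartition, long totalSourceBytes) {
    DataFrameReader reader = spark.read().format(format);
    Dataset<Row> dataset;
    if (commaSeparatedPaths) {
      // comma.separated.path.format = true: all paths go in one comma-joined string.
      dataset = reader.load(String.join(",", paths));
    } else {
      // Default: paths are passed as an array of strings.
      dataset = reader.load(paths.toArray(new String[0]));
    }
    // partition.max.size: pick enough partitions that each holds at most
    // maxBytesPerPartition bytes, then coalesce down to that count.
    int numPartitions = (int) Math.max(1L,
        (totalSourceBytes + maxBytesPerPartition - 1) / maxBytesPerPartition);
    return dataset.coalesce(numPartitions);
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("cloud-load-sketch").getOrCreate();
    List<String> paths = Arrays.asList("s3a://bucket/a.json", "s3a://bucket/b.json");
    Dataset<Row> ds = load(spark, "json", paths, false, 128L << 20, 512L << 20);
    System.out.println("partitions: " + ds.rdd().getNumPartitions());
    spark.stop();
  }
}
```

Note that `coalesce(n)` only ever reduces the partition count, so when the computed target exceeds the current count the dataset is left as-is, matching the "not greater than specified limit" wording of the config.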
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
index 66b94177b7b0..8a1c15c88869 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
@@ -107,11 +107,4 @@ public class CloudStoreIngestionConfig {
    * A comma delimited list of path-based partition fields in the source file structure
    */
   public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
-
-  /**
-   * boolean value for specifying path format in load args of spark.read.format("..").load("a.xml,b.xml,c.xml"),
-   * set true if path format needs to be comma separated string value, if false it's passed as array of strings like
-   * spark.read.format("..").load(new String[]{a.xml,b.xml,c.xml})
-   */
-  public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index 761e942549c1..597c0195f5e8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -44,12 +44,14 @@
  */
 public class QueryRunner {
   private final SparkSession sparkSession;
+  private final TypedProperties props;
   private final String sourcePath;
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
 
   public QueryRunner(SparkSession sparkSession, TypedProperties props) {
     this.sparkSession = sparkSession;
+    this.props = props;
     checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
     this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
   }
@@ -85,7 +87,11 @@ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
     return sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
+        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+        .load(sourcePath);
   }
 
   public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
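And a hedged sketch of how the new pass-through in `runIncrementalQuery` would be exercised from a streamer job: the fallback flag set on the job's `TypedProperties` now reaches the incremental read instead of being dropped. The literal key strings and values below are assumptions written out for illustration; in the patch they are resolved via `DataSourceReadOptions` and `HoodieIncrSourceConfig`.

```java
import org.apache.hudi.common.config.TypedProperties;

// Minimal sketch of streamer properties feeding QueryRunner after this change.
public class QueryRunnerPropsSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Source table path required by QueryRunner's constructor
    // (HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; key string is an assumption).
    props.setProperty("hoodie.streamer.source.hoodieincr.path", "s3a://bucket/source_table");
    // Flag forwarded by the new code in runIncrementalQuery(): when files behind
    // the last checkpoint have been cleaned or archived, the incremental read
    // falls back to a full-table scan instead of failing (key string is an assumption).
    props.setProperty("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "true");
    // A QueryRunner built as `new QueryRunner(sparkSession, props)` would now
    // pass this flag to spark.read().format("org.apache.hudi") automatically.
  }
}
```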