[HUDI-7098] Add max bytes per partition with cloud stores source in DS

nsivabalan authored Nov 19, 2023
1 parent 4c295b2 commit 3913dca
Showing 4 changed files with 30 additions and 12 deletions.
@@ -121,4 +121,20 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("A comma delimited list of path-based partition fields in the source file structure.");
 
+  public static final ConfigProperty<Boolean> SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.reader.comma.separated.path.format")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("Boolean value for specifying the path format in the load args of spark.read.format(\"..\").load(...). "
+          + "Set to true to pass the paths as a single comma-separated string, e.g. spark.read.format(\"..\").load(\"a.xml,b.xml,c.xml\"); "
+          + "set to false to pass them as an array of strings, e.g. spark.read.format(\"..\").load(new String[]{\"a.xml\", \"b.xml\", \"c.xml\"}).");
+
+  public static final ConfigProperty<String> SOURCE_MAX_BYTES_PER_PARTITION = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.partition.max.size")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("Maximum size, in bytes, of a partition of the source dataset; partitions are coalesced so that no partition exceeds this limit.");
+
 }
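For illustration, a minimal sketch of the two load styles the new flag selects between; the SparkSession, the "xml" format, and the paths below are hypothetical, and DataFrameReader.load accepts both a single path string and an array of paths:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().appName("path-format-sketch").getOrCreate();
    String[] paths = {"s3://bucket/a.xml", "s3://bucket/b.xml", "s3://bucket/c.xml"};

    // flag = true: paths joined into one comma-separated string,
    // for readers that expect to split such a string themselves
    Dataset<Row> asString = spark.read().format("xml").load(String.join(",", paths));

    // flag = false (default): paths passed as separate array entries
    Dataset<Row> asArray = spark.read().format("xml").load(paths);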
@@ -57,7 +57,8 @@
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
-import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
 
@@ -191,9 +192,11 @@ public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudO
     }
     // inflate 10% for potential hoodie meta fields
     totalSize *= 1.1;
-    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
-    int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
-    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false);
+    // if max bytes per partition is configured, give it preference
+    long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key())
+        : props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numPartitions = (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1); // cast so ceil sees the true quotient
+    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
 
     Dataset<Row> dataset;
     if (isCommaSeparatedPathFormat) {
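A worked example of the new sizing rule, with hypothetical numbers; the computation mirrors the lines added above:

    // 2 GB of source files, inflated 10% for potential hoodie meta fields
    long totalSize = (long) (2_000_000_000L * 1.1);   // ~2.2 GB
    // source.cloud.data.partition.max.size set to 268435456 (256 MB)
    long bytesPerPartition = 256L * 1024 * 1024;
    int numPartitions = (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1);
    // numPartitions == 9, i.e. ~244 MB per coalesced partition; if the config
    // were unset, PARQUET_MAX_FILE_SIZE would drive the partition count instead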
@@ -107,11 +107,4 @@ public class CloudStoreIngestionConfig {
    * A comma delimited list of path-based partition fields in the source file structure
    */
   public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
-
-  /**
-   * boolean value for specifying path format in load args of spark.read.format("..").load("a.xml,b.xml,c.xml"),
-   * set true if path format needs to be comma separated string value, if false it's passed as array of strings like
-   * spark.read.format("..").load(new String[]{a.xml,b.xml,c.xml})
-   */
-  public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
 }
@@ -44,12 +44,14 @@
  */
 public class QueryRunner {
   private final SparkSession sparkSession;
+  private final TypedProperties props;
   private final String sourcePath;
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
 
   public QueryRunner(SparkSession sparkSession, TypedProperties props) {
     this.sparkSession = sparkSession;
+    this.props = props;
     checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
     this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
   }
@@ -85,7 +87,11 @@ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
     return sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
+        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+        .load(sourcePath);
   }
 
   public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
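For context, a hedged usage sketch of the props now retained by QueryRunner: a caller can opt into the full-table-scan fallback through the standard read option, referenced below via its DataSourceReadOptions constant. The base path, SparkSession, and QueryInfo are hypothetical.

    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key(), "s3://bucket/hudi_table");
    // fall back to a full table scan when incremental files no longer exist
    props.setProperty(
        DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(), "true");
    QueryRunner runner = new QueryRunner(spark, props);          // "spark" is an existing SparkSession
    Dataset<Row> rows = runner.runIncrementalQuery(queryInfo);   // "queryInfo" built by the caller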
