[HUDI-7098] Adding max bytes per partition with cloud stores source in DS #10100

Merged · 2 commits · Nov 19, 2023
@@ -121,4 +121,20 @@ public class CloudSourceConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("A comma delimited list of path-based partition fields in the source file structure.");

public static final ConfigProperty<Boolean> SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.data.reader.comma.separated.path.format")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.14.1")
.withDocumentation("Boolean value for specifying path format in load args of spark.read.format(\"..\").load(\"a.xml,b.xml,c.xml\"),\n"
+ " * set true if path format needs to be comma separated string value, if false it's passed as array of strings like\n"
+ " * spark.read.format(\"..\").load(new String[]{a.xml,b.xml,c.xml})");

public static final ConfigProperty<String> SOURCE_MAX_BYTES_PER_PARTITION = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.data.partition.max.size")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.14.1")
.withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit");

}
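For context (not part of the diff): a minimal sketch of how a pipeline could supply the two properties introduced above through TypedProperties. The 512 MB cap is an illustrative value, not a default from this PR, and the class name is hypothetical.

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;

public class CloudSourceConfigExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Cap each partition of the source dataset at roughly 512 MB (illustrative value).
    props.setProperty(CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION.key(),
        String.valueOf(512L * 1024 * 1024));
    // Pass the cloud file paths to spark.read().load(...) as one comma-separated string
    // instead of an array of path strings.
    props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), "true");
    // props is then handed to the cloud-store incremental source as usual.
  }
}
```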
@@ -57,7 +57,8 @@
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
import static org.apache.spark.sql.functions.input_file_name;
import static org.apache.spark.sql.functions.split;

@@ -191,9 +192,11 @@ public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudO
}
// inflate 10% for potential hoodie meta fields
totalSize *= 1.1;
long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false);
// If max bytes per source partition is configured, give it preference over the parquet max file size.
long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
int numPartitions = (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1);
boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);

Dataset<Row> dataset;
if (isCommaSeparatedPathFormat) {
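A standalone sketch of the sizing logic in the hunk above (helper and class names are hypothetical): the raw source size is inflated by roughly 10% for Hudi meta fields, then divided by the configured max bytes per partition, using floating-point division so the ceiling takes effect.

```java
public final class PartitionSizingSketch {

  // Mirrors the computation in loadAsDataset: inflate for meta fields, then derive the partition count.
  static int numPartitions(long rawSourceBytes, long bytesPerPartition) {
    long totalSize = (long) (rawSourceBytes * 1.1); // ~10% inflation for potential hoodie meta fields
    // Cast to double so the division does not truncate before Math.ceil is applied.
    return (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1);
  }

  public static void main(String[] args) {
    // 10 GB of source data with a 512 MB per-partition cap -> 22 partitions after inflation.
    System.out.println(numPartitions(10L * 1024 * 1024 * 1024, 512L * 1024 * 1024));
  }
}
```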
@@ -107,11 +107,4 @@ public class CloudStoreIngestionConfig {
* A comma delimited list of path-based partition fields in the source file structure
*/
public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";

/**
* boolean value for specifying path format in load args of spark.read.format("..").load("a.xml,b.xml,c.xml"),
* set true if path format needs to be comma separated string value, if false it's passed as array of strings like
* spark.read.format("..").load(new String[]{a.xml,b.xml,c.xml})
*/
public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
}
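Since the comma-separated-path flag now lives in CloudSourceConfig as a typed ConfigProperty<Boolean> rather than a bare String key here, call sites would resolve it roughly as sketched below (the wrapping class and variable names are illustrative):

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;

public class CommaSeparatedPathFlagMigration {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Old: props.getBoolean(CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false)
    // New: resolve the key from the typed ConfigProperty in CloudSourceConfig.
    boolean commaSeparated = props.getBoolean(
        CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
    System.out.println("comma separated path format: " + commaSeparated);
  }
}
```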
@@ -44,12 +44,14 @@
*/
public class QueryRunner {
private final SparkSession sparkSession;
private final TypedProperties props;
private final String sourcePath;

private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);

public QueryRunner(SparkSession sparkSession, TypedProperties props) {
this.sparkSession = sparkSession;
this.props = props;
checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
}
@@ -85,7 +87,11 @@ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
return sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
.option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
.load(sourcePath);
}

public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
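Outside the diff, a hedged sketch of how a user would enable the fallback that QueryRunner now forwards to the incremental read. The option constant is the same one referenced in the hunk above; setting it to "true" is an illustrative choice, and the option's own default applies when the property is absent.

```java
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;

public class FallbackScanConfigExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Fall back to a full table scan when files referenced by the incremental query no longer exist
    // (e.g. they were cleaned up); QueryRunner now passes this property through to the read options.
    props.setProperty(
        DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
        "true");
  }
}
```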