Commit

[HUDI-4551] The default value of READ_TASKS, WRITE_TASKS, CLUSTERING_TASKS is the parallelism of the execution environment

SteNicholas committed Aug 5, 2022
1 parent a75cc02 commit 33048bf
Showing 7 changed files with 78 additions and 55 deletions.
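Each of the four touched options loses its hard-coded default of 4 (defaultValue(4) becomes noDefaultValue()), and callers now resolve the option with getOptional(...).orElse(...) against the parallelism of the execution environment. Below is a minimal, self-contained sketch of that pattern using Flink's ConfigOptions API; the class name, the resolver helper, the sample values, and the main method are illustrative and not part of the commit.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Illustrative sketch of the pattern this commit applies to READ_TASKS,
// WRITE_TASKS, COMPACTION_TASKS and CLUSTERING_TASKS.
public class ParallelismDefaultSketch {

  // After the change the option carries no default; getOptional() is empty unless the user sets it.
  static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
      .key("write.tasks")
      .intType()
      .noDefaultValue()
      .withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");

  // Explicit value if configured, otherwise the parallelism of the execution environment.
  static int resolveWriteTasks(Configuration conf, int envParallelism) {
    return conf.getOptional(WRITE_TASKS).orElse(envParallelism);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(resolveWriteTasks(conf, 8)); // 8: falls back to the environment parallelism
    conf.setInteger(WRITE_TASKS, 2);
    System.out.println(resolveWriteTasks(conf, 8)); // 2: an explicit setting still wins
  }
}
```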
@@ -153,8 +153,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");

public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
@@ -406,8 +406,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");

public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
.key("write.task.max.size")
@@ -512,8 +512,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
.defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
.withDescription("Parallelism of tasks that do actual compaction, default is 4");
.noDefaultValue() // default WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5 (assumes 5 commits generate one bucket)
.withDescription("Parallelism of tasks that do actual compaction, default is WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5");

public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
@@ -622,8 +622,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
.key("clustering.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual clustering, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual clustering, default is the parallelism of the execution environment");

public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
.key("clustering.plan.strategy.daybased.lookback.partitions")
@@ -275,9 +275,10 @@ private void compact() throws Exception {
}

// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
int compactionTasks = conf.getOptional(FlinkOptions.COMPACTION_TASKS).orElse(env.getParallelism());
int compactionParallelism = compactionTasks == -1
? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
: conf.getInteger(FlinkOptions.COMPACTION_TASKS);
: Math.max(compactionTasks, 1);

LOG.info("Start to compaction for instant " + compactionInstantTimes);

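In the standalone compactor, compaction.tasks keeps its special meaning of -1 (one task per scheduled compaction operation); otherwise the configured value, or now the environment parallelism, is used with a floor of 1. The following is a hedged restatement of the resolution logic in the hunk above; the helper name and the plain long standing in for the summed operation count are illustrative.

```java
import java.util.Optional;

// Illustrative restatement of the compaction-parallelism resolution shown above.
final class CompactionParallelismSketch {

  static int resolve(Optional<Integer> configuredCompactionTasks, int envParallelism, long totalOperations) {
    // compaction.tasks falls back to the environment parallelism when unset.
    int compactionTasks = configuredCompactionTasks.orElse(envParallelism);
    // -1 keeps the old behavior: one task per scheduled compaction operation.
    return compactionTasks == -1
        ? Math.toIntExact(totalOperations)
        : Math.max(compactionTasks, 1);
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.empty(), 8, 20)); // 8: environment parallelism
    System.out.println(resolve(Optional.of(-1), 8, 20));  // 20: one task per operation
    System.out.println(resolve(Optional.of(0), 8, 20));   // 1: clamped by Math.max
  }
}
```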
@@ -92,12 +92,13 @@ public class Pipelines {
*
* <p>The bulk insert should be run in batch execution mode.
*
* @param conf The configuration
* @param rowType The input row type
* @param dataStream The input data stream
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param rowType The input row type
* @param dataStream The input data stream
* @return the bulk insert data stream sink
*/
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
public static DataStreamSink<Object> bulkInsert(Configuration conf, int defaultParallelism, RowType rowType, DataStream<RowData> dataStream) {
WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
if (OptionsResolver.isBucketIndexType(conf)) {
String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
@@ -111,18 +112,18 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
Map<String, String> bucketIdToFileId = new HashMap<>();
dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
.map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
.setParallelism(getWriteTasks(conf, defaultParallelism)); // same parallelism as write task to avoid shuffle
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
.setParallelism(getWriteTasks(conf, defaultParallelism)); // same parallelism as write task to avoid shuffle
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return dataStream
.transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.setParallelism(getWriteTasks(conf, defaultParallelism))
.addSink(DummySink.INSTANCE)
.name("dummy");
}
@@ -136,7 +137,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
// use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
// see BatchExecutionUtils#applyBatchExecutionSettings for details.
Partitioner<String> partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), channels);
KeyGroupRangeAssignment.computeDefaultMaxParallelism(getWriteTasks(conf, defaultParallelism)), channels);
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
@@ -146,7 +147,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
.transform("partition_key_sorter",
TypeInformation.of(RowData.class),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
.setParallelism(getWriteTasks(conf, defaultParallelism));
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
@@ -156,7 +157,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.setParallelism(getWriteTasks(conf, defaultParallelism))
.addSink(DummySink.INSTANCE)
.name("dummy");
}
@@ -177,14 +178,16 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
* <p>The write task switches to new file handle each time it receives a record
* from the different partition path, so there may be many small files.
*
* @param conf The configuration
* @param rowType The input row type
* @param dataStream The input data stream
* @param bounded Whether the input stream is bounded
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param rowType The input row type
* @param dataStream The input data stream
* @param bounded Whether the input stream is bounded
* @return the appending data stream sink
*/
public static DataStream<Object> append(
Configuration conf,
int defaultParallelism,
RowType rowType,
DataStream<RowData> dataStream,
boolean bounded) {
@@ -198,7 +201,7 @@ public static DataStream<Object> append(
return dataStream
.transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
.setParallelism(getWriteTasks(conf, defaultParallelism));
}

/**
@@ -319,7 +322,7 @@ public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf,
* @param dataStream The input data stream
* @return the stream write data stream pipeline
*/
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -328,7 +331,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
.setParallelism(getWriteTasks(conf, defaultParallelism));
} else {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
@@ -344,7 +347,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
.setParallelism(getWriteTasks(conf, defaultParallelism));
}
}

@@ -364,11 +367,13 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
* Note: both the compaction plan generation task and commission task are singleton.
* </pre>
*
* @param conf The configuration
* @param dataStream The input data stream
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param dataStream The input data stream
* @return the compaction pipeline
*/
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, int defaultParallelism, DataStream<Object> dataStream) {
int compactionTasks = getWriteTasks(conf, defaultParallelism) * conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS) / 5;
return dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
@@ -379,7 +384,8 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
.setParallelism(conf.getOptional(FlinkOptions.COMPACTION_TASKS)
.orElse(Math.max(compactionTasks, 1)))
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
.setParallelism(1); // compaction commit should be singleton
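When compaction.tasks is not configured, the streaming pipeline now derives it from the resolved write parallelism as WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5, matching the in-code comment that roughly five delta commits generate one bucket to compact, and clamps the result to at least 1. A small worked example follows; the option values are illustrative, and 5 is assumed as the usual compaction.delta_commits setting.

```java
// Illustrative arithmetic for the derived compaction parallelism.
public class CompactionTasksHeuristic {
  public static void main(String[] args) {
    int writeTasks = 4;           // resolved write parallelism (explicit setting or env default)
    int deltaCommits = 5;         // compaction.delta_commits
    int derived = writeTasks * deltaCommits / 5;   // 4
    System.out.println(Math.max(derived, 1));      // 4

    // A very small job still gets at least one compaction task.
    System.out.println(Math.max(1 * 2 / 5, 1));    // 1 * 2 / 5 == 0 -> clamped to 1
  }
}
```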
@@ -401,12 +407,13 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
* Note: both the clustering plan generation task and commission task are singleton.
* </pre>
*
* @param conf The configuration
* @param rowType The input row type
* @param dataStream The input data stream
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param rowType The input row type
* @param dataStream The input data stream
* @return the clustering pipeline
*/
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, int defaultParallelism, RowType rowType, DataStream<Object> dataStream) {
return dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
@@ -419,7 +426,7 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
.setParallelism(conf.getOptional(FlinkOptions.CLUSTERING_TASKS).orElse(defaultParallelism))
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // compaction commit should be singleton
@@ -439,6 +446,16 @@ public static String opIdentifier(String operatorN, Configuration conf) {
return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
}

/**
* Gets parallelism of tasks that do actual write.
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @return the parallelism of tasks that do actual write
*/
private static int getWriteTasks(Configuration conf, int defaultParallelism) {
return conf.getOptional(FlinkOptions.WRITE_TASKS).orElse(defaultParallelism);
}

/**
* Dummy sink that does nothing.
*/
@@ -101,7 +101,7 @@ public static void main(String[] args) throws Exception {
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
if (OptionsResolver.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
Pipelines.compact(conf, parallelism, pipeline);
} else {
Pipelines.clean(conf, pipeline);
}
@@ -69,24 +69,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();

// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
return Pipelines.bulkInsert(conf, rowType, dataStream);
return Pipelines.bulkInsert(conf, parallelism, rowType, dataStream);
}

// Append mode
if (OptionsResolver.isAppendMode(conf)) {
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
DataStream<Object> pipeline = Pipelines.append(conf, parallelism, rowType, dataStream, context.isBounded());
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
return Pipelines.cluster(conf, parallelism, rowType, pipeline);
} else {
return Pipelines.dummySink(pipeline);
}
}

// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
@@ -99,7 +100,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
if (context.isBounded()) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
return Pipelines.compact(conf, pipeline);
return Pipelines.compact(conf, parallelism, pipeline);
} else {
return Pipelines.clean(conf, pipeline);
}
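The parallelism lookup is moved ahead of the bulk-insert and append branches because both of them return before the line's old position, so those paths never reached it; the value itself comes from the input stream's ExecutionConfig. Below is a minimal sketch of that source of the default, building an ExecutionConfig directly since the full table sink needs a Hudi setup; the class name and the sample value are illustrative.

```java
import org.apache.flink.api.common.ExecutionConfig;

// Illustrative: where the sink-side default parallelism comes from.
public class SinkDefaultParallelismSketch {
  public static void main(String[] args) {
    // In HoodieTableSink the value is dataStream.getExecutionConfig().getParallelism();
    // here an ExecutionConfig is built directly to show the same call.
    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setParallelism(8);
    int defaultParallelism = executionConfig.getParallelism(); // 8, passed to Pipelines.bulkInsert/append/compact/cluster
    System.out.println(defaultParallelism);
  }
}
```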
@@ -179,6 +179,8 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
@SuppressWarnings("unchecked")
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
// default parallelism
int parallelism = execEnv.getParallelism();
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
@@ -188,12 +190,12 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
.setParallelism(1)
.keyBy(MergeOnReadInputSplit::getFileId)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
.setParallelism(conf.getOptional(FlinkOptions.READ_TASKS).orElse(parallelism));
return new DataStreamSource<>(source);
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getOptional(FlinkOptions.READ_TASKS).orElse(parallelism));
}
}
};
