diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 933c112312c1a..d7b74c03ae1f6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -153,8 +153,8 @@ private FlinkOptions() {
   public static final ConfigOption READ_TASKS = ConfigOptions
       .key("read.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual read, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");

   public static final ConfigOption SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
       .key("source.avro-schema.path")
@@ -406,8 +406,8 @@ private FlinkOptions() {
   public static final ConfigOption WRITE_TASKS = ConfigOptions
       .key("write.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual write, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");

   public static final ConfigOption WRITE_TASK_MAX_SIZE = ConfigOptions
       .key("write.task.max.size")
@@ -512,8 +512,8 @@ private FlinkOptions() {
   public static final ConfigOption COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
-      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
+      .noDefaultValue() // default WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5 (assumes 5 commits generate one bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5");

   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -622,8 +622,8 @@ private FlinkOptions() {
   public static final ConfigOption CLUSTERING_TASKS = ConfigOptions
       .key("clustering.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual clustering, default is the parallelism of the execution environment");

   public static final ConfigOption CLUSTERING_TARGET_PARTITIONS = ConfigOptions
       .key("clustering.plan.strategy.daybased.lookback.partitions")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index e2d2972a0de43..8f8dea6c37854 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -275,9 +275,10 @@ private void compact() throws Exception {
       }

       // get compactionParallelism.
-      int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
+      int compactionTasks = conf.getOptional(FlinkOptions.COMPACTION_TASKS).orElse(env.getParallelism());
+      int compactionParallelism = compactionTasks == -1
           ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
-          : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+          : Math.max(compactionTasks, 1);

       LOG.info("Start to compaction for instant " + compactionInstantTimes);

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index f89bdb2606b01..6deef280a73ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -92,12 +92,13 @@ public class Pipelines {
    *
    * The bulk insert should be run in batch execution mode.
    *
-   * @param conf       The configuration
-   * @param rowType    The input row type
-   * @param dataStream The input data stream
+   * @param conf               The configuration
+   * @param defaultParallelism The default parallelism
+   * @param rowType            The input row type
+   * @param dataStream         The input data stream
    * @return the bulk insert data stream sink
    */
-  public static DataStreamSink bulkInsert(Configuration conf, RowType rowType, DataStream dataStream) {
+  public static DataStreamSink bulkInsert(Configuration conf, int defaultParallelism, RowType rowType, DataStream dataStream) {
     WriteOperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
     if (OptionsResolver.isBucketIndexType(conf)) {
       String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
@@ -111,18 +112,18 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
       Map bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
           .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+          .setParallelism(getWriteTasks(conf, defaultParallelism)); // same parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
         dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
-            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+            .setParallelism(getWriteTasks(conf, defaultParallelism)); // same parallelism as write task to avoid shuffle
         ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
           .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+          .setParallelism(getWriteTasks(conf, defaultParallelism))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
     }
@@ -136,7 +137,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
       // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
       // see BatchExecutionUtils#applyBatchExecutionSettings for details.
       Partitioner partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
-          KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), channels);
+          KeyGroupRangeAssignment.computeDefaultMaxParallelism(getWriteTasks(conf, defaultParallelism)), channels);
       dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
     }
     if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
@@ -146,7 +147,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
           .transform("partition_key_sorter",
               TypeInformation.of(RowData.class),
               sortOperatorGen.createSortOperator())
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+          .setParallelism(getWriteTasks(conf, defaultParallelism));
       ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
@@ -156,7 +157,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .setParallelism(getWriteTasks(conf, defaultParallelism))
         .addSink(DummySink.INSTANCE)
         .name("dummy");
   }
@@ -177,14 +178,16 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
    * The write task switches to new file handle each time it receives a record
    * from the different partition path, so there may be many small files.
    *
-   * @param conf       The configuration
-   * @param rowType    The input row type
-   * @param dataStream The input data stream
-   * @param bounded    Whether the input stream is bounded
+   * @param conf               The configuration
+   * @param defaultParallelism The default parallelism
+   * @param rowType            The input row type
+   * @param dataStream         The input data stream
+   * @param bounded            Whether the input stream is bounded
    * @return the appending data stream sink
    */
   public static DataStream append(
       Configuration conf,
+      int defaultParallelism,
       RowType rowType,
       DataStream dataStream,
       boolean bounded) {
@@ -198,7 +201,7 @@ public static DataStream append(
     return dataStream
         .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+        .setParallelism(getWriteTasks(conf, defaultParallelism));
   }

   /**
@@ -319,7 +322,7 @@ public static DataStream rowDataToHoodieRecord(Configuration conf,
    * @param dataStream The input data stream
    * @return the stream write data stream pipeline
    */
-  public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) {
+  public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) {
     if (OptionsResolver.isBucketIndexType(conf)) {
       WriteOperatorFactory operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -328,7 +331,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
           .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+          .setParallelism(getWriteTasks(conf, defaultParallelism));
     } else {
       WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf);
       return dataStream
@@ -344,7 +347,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+          .setParallelism(getWriteTasks(conf, defaultParallelism));
     }
   }

@@ -364,11 +367,13 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
    * Note: both the compaction plan generation task and commission task are singleton.
    *
    *
-   * @param conf       The configuration
-   * @param dataStream The input data stream
+   * @param conf               The configuration
+   * @param defaultParallelism The default parallelism
+   * @param dataStream         The input data stream
    * @return the compaction pipeline
    */
-  public static DataStreamSink compact(Configuration conf, DataStream dataStream) {
+  public static DataStreamSink compact(Configuration conf, int defaultParallelism, DataStream dataStream) {
+    int compactionTasks = getWriteTasks(conf, defaultParallelism) * conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS) / 5;
     return dataStream.transform("compact_plan_generate",
         TypeInformation.of(CompactionPlanEvent.class),
         new CompactionPlanOperator(conf))
@@ -379,7 +384,8 @@ public static DataStreamSink compact(Configuration conf,
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
-        .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+        .setParallelism(conf.getOptional(FlinkOptions.COMPACTION_TASKS)
+            .orElse(Math.max(compactionTasks, 1)))
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1); // compaction commit should be singleton
@@ -401,12 +407,13 @@ public static DataStreamSink compact(Configuration conf,
    * Note: both the clustering plan generation task and commission task are singleton.
    *
    *
-   * @param conf       The configuration
-   * @param rowType    The input row type
-   * @param dataStream The input data stream
+   * @param conf               The configuration
+   * @param defaultParallelism The default parallelism
+   * @param rowType            The input row type
+   * @param dataStream         The input data stream
    * @return the clustering pipeline
    */
-  public static DataStreamSink cluster(Configuration conf, RowType rowType, DataStream dataStream) {
+  public static DataStreamSink cluster(Configuration conf, int defaultParallelism, RowType rowType, DataStream dataStream) {
     return dataStream.transform("cluster_plan_generate",
         TypeInformation.of(ClusteringPlanEvent.class),
         new ClusteringPlanOperator(conf))
@@ -419,7 +426,7 @@ public static DataStreamSink cluster(Configuration conf,
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
-        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
+        .setParallelism(conf.getOptional(FlinkOptions.CLUSTERING_TASKS).orElse(defaultParallelism))
         .addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // clustering commit should be singleton
@@ -439,6 +446,16 @@ public static String opIdentifier(String operatorN, Configuration conf) {
     return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
   }

+  /**
+   * Gets parallelism of tasks that do actual write.
+   * @param conf The configuration
+   * @param defaultParallelism The default parallelism
+   * @return the parallelism of tasks that do actual write
+   */
+  private static int getWriteTasks(Configuration conf, int defaultParallelism) {
+    return conf.getOptional(FlinkOptions.WRITE_TASKS).orElse(defaultParallelism);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 013753b6d9276..adde4783873ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -101,7 +101,7 @@ public static void main(String[] args) throws Exception {
     DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
     DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     if (OptionsResolver.needsAsyncCompaction(conf)) {
-      Pipelines.compact(conf, pipeline);
+      Pipelines.compact(conf, parallelism, pipeline);
     } else {
       Pipelines.clean(conf, pipeline);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 5af86867d86d6..d384f9da15e65 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,24 +69,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

+        // default parallelism
+        int parallelism = dataStream.getExecutionConfig().getParallelism();
+
         // bulk_insert mode
         final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
         if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-          return Pipelines.bulkInsert(conf, rowType, dataStream);
+          return Pipelines.bulkInsert(conf, parallelism, rowType, dataStream);
         }

         // Append mode
         if (OptionsResolver.isAppendMode(conf)) {
-          DataStream pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
+          DataStream pipeline = Pipelines.append(conf, parallelism, rowType, dataStream, context.isBounded());
           if (OptionsResolver.needsAsyncClustering(conf)) {
-            return Pipelines.cluster(conf, rowType, pipeline);
+            return Pipelines.cluster(conf, parallelism, rowType, pipeline);
           } else {
             return Pipelines.dummySink(pipeline);
           }
         }

-        // default parallelism
-        int parallelism = dataStream.getExecutionConfig().getParallelism();

         DataStream pipeline;
         // bootstrap
         final DataStream hoodieRecordDataStream =
@@ -99,7 +100,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
           if (context.isBounded()) {
             conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
           }
-          return Pipelines.compact(conf, pipeline);
+          return Pipelines.compact(conf, parallelism, pipeline);
         } else {
           return Pipelines.clean(conf, pipeline);
         }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 2034cb322eb8e..1e2c28646baa4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -179,6 +179,8 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv)
         @SuppressWarnings("unchecked")
         TypeInformation typeInfo =
             (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+        // default parallelism
+        int parallelism = execEnv.getParallelism();
         if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
           StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
               conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
@@ -188,12 +190,12 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv)
               .setParallelism(1)
               .keyBy(MergeOnReadInputSplit::getFileId)
               .transform("split_reader", typeInfo, factory)
-              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
+              .setParallelism(conf.getOptional(FlinkOptions.READ_TASKS).orElse(parallelism));
           return new DataStreamSource<>(source);
         } else {
           InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
           DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo);
-          return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
+          return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getOptional(FlinkOptions.READ_TASKS).orElse(parallelism));
         }
       }
     };
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 680c4d02e238b..9c4ab8d78ea51 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -191,7 +191,7 @@ private void testWriteToHoodie(

     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(8);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -232,7 +232,7 @@ private void testWriteToHoodie(
           .name("continuous_file_source")
           .setParallelism(1)
           .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(4);
+          .setParallelism(8);
     }

     if (transformer.isPresent()) {
@@ -246,7 +246,7 @@ private void testWriteToHoodie(

     if (isMor) {
       Pipelines.clean(conf, pipeline);
-      Pipelines.compact(conf, pipeline);
+      Pipelines.compact(conf, parallelism, pipeline);
     }

     execute(execEnv, isMor, jobName);
@@ -261,7 +261,7 @@ private void testWriteToHoodieWithCluster(

     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(8);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -302,13 +302,15 @@ private void testWriteToHoodieWithCluster(
           .name("continuous_file_source")
           .setParallelism(1)
           .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(4);
+          .setParallelism(8);
     }

-    DataStream pipeline = Pipelines.append(conf, rowType, dataStream, true);
+    int parallelism = execEnv.getParallelism();
+
+    DataStream pipeline = Pipelines.append(conf, parallelism, rowType, dataStream, true);
     execEnv.addOperator(pipeline.getTransformation());

-    Pipelines.cluster(conf, rowType, pipeline);
+    Pipelines.cluster(conf, parallelism, rowType, pipeline);
     execEnv.execute(jobName);

     TestData.checkWrittenDataCOW(tempFile, expected);
@@ -336,7 +338,7 @@ public void testHoodiePipelineBuilderSource() throws Exception {
     //create a StreamExecutionEnvironment instance.
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(1);
+    execEnv.setParallelism(8);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -377,7 +379,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     Map options = new HashMap<>();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(8);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
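
For reference, a minimal, hypothetical sketch (not part of the patch) of how the fallback logic above resolves parallelism once the options are declared with noDefaultValue(): write.tasks, read.tasks and clustering.tasks fall back to the parallelism of the execution environment, while compaction.tasks falls back to writeTasks * COMPACTION_DELTA_COMMITS / 5, floored at 1. The class name and the sample environment parallelism of 8 are illustrative only, and the example assumes compaction.delta_commits keeps its default of 5.

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;

// Illustrative only; mirrors the fallback logic added in Pipelines#getWriteTasks
// and Pipelines#compact in the patch above.
public class ParallelismDefaultsSketch {

  // write.tasks: falls back to the parallelism of the execution environment.
  static int writeTasks(Configuration conf, int envParallelism) {
    return conf.getOptional(FlinkOptions.WRITE_TASKS).orElse(envParallelism);
  }

  // compaction.tasks: falls back to WRITE_TASKS * COMPACTION_DELTA_COMMITS / 5, floored at 1.
  static int compactionTasks(Configuration conf, int envParallelism) {
    int derived = writeTasks(conf, envParallelism)
        * conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS) / 5;
    return conf.getOptional(FlinkOptions.COMPACTION_TASKS).orElse(Math.max(derived, 1));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int envParallelism = 8; // e.g. StreamExecutionEnvironment#getParallelism()

    // Nothing set: both fall back to the environment parallelism.
    System.out.println(writeTasks(conf, envParallelism));      // 8
    System.out.println(compactionTasks(conf, envParallelism)); // 8 * 5 / 5 = 8

    // An explicit write.tasks drives the derived compaction parallelism.
    conf.set(FlinkOptions.WRITE_TASKS, 4);
    System.out.println(compactionTasks(conf, envParallelism)); // 4 * 5 / 5 = 4
  }
}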