From 4e1b5f8accb6d597d4fe2244ae381a4a56b6f109 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Wed, 22 Sep 2021 11:19:29 +0800
Subject: [PATCH] [MINOR] Cosmetic changes for flink

---
 .../apache/hudi/sink/StreamWriteFunction.java |  2 +-
 .../profile/EmptyWriteProfile.java            |  7 ++--
 .../hudi/streamer/HoodieFlinkStreamer.java    |  5 +--
 .../apache/hudi/table/HoodieTableFactory.java |  2 +-
 .../apache/hudi/table/HoodieTableSource.java  | 11 +++----
 .../format/mor/MergeOnReadInputFormat.java    |  2 +-
 .../org/apache/hudi/util/InputFormats.java    | 33 +++++++++++++++++++
 .../org/apache/hudi/util/StreamerUtil.java    | 16 +++++--
 8 files changed, 59 insertions(+), 19 deletions(-)
 create mode 100644 hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a155fb52d885..c71a91a82d10 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -67,7 +67,7 @@
  * <p><h2>The Semantics</h2>
  *
  * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
- * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
+ * starts a new instant on the timeline when a checkpoint triggers, the coordinator checkpoints always
  * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
  *
  * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until
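For readers of this patch, the exactly-once handshake described in the Javadoc above can be summarized with a small sketch; the class and method names below are hypothetical stand-ins, not Hudi's actual API:

// Hypothetical sketch of the exactly-once handshake described above; the
// names are illustrative and are not Hudi's actual classes or methods.
import java.util.ArrayList;
import java.util.List;

public class BufferingSinkSketch<T> {
  private final List<T> buffer = new ArrayList<>();
  private String currentInstant; // the REQUESTED instant started by the coordinator

  // Between checkpoints, records are only buffered, never written.
  public void processElement(T record) {
    buffer.add(record);
  }

  // The coordinator checkpoints (and starts a new instant) before the task,
  // so a REQUESTED instant must already exist when this is called.
  public void snapshotState() {
    if (currentInstant == null) {
      throw new IllegalStateException("expected a REQUESTED instant before the checkpoint");
    }
    flushBuffer(currentInstant);
    currentInstant = null; // buffering blocks until the coordinator starts the next instant
  }

  public void onNewInstant(String instant) {
    this.currentInstant = instant; // unblocks buffering for the next checkpoint interval
  }

  private void flushBuffer(String instant) {
    // write the buffered batch under the given instant, then clear it (elided)
    buffer.clear();
  }
}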

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
index 3cdd798e2e84..e0a6fc1f4a33 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
@@ -28,11 +28,8 @@
 /**
  * WriteProfile that always return empty small files.
  *
- * <p>This write profile is used for cases:
- * i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * the existing small files are ignored because of the 'OVERWRITE' semantics;
- * ii). INSERT operation when data file merge is disabled.
- *
+ * <p>This write profile is used for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * where the existing small files are ignored because of the 'OVERWRITE' semantics.
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
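To make the 'OVERWRITE' semantics concrete: the profile can be modeled as a small-file lookup that always comes back empty, which forces every incoming record into new file groups. This is only an illustrative sketch with made-up names, not the actual WriteProfile API:

import java.util.Collections;
import java.util.List;

class EmptySmallFilesSketch {
  // Always report "no small files", so the record assigner never appends to
  // existing file groups and the OVERWRITE replaces them wholesale.
  List<String> smallFilesProfile(String partitionPath) {
    return Collections.emptyList();
  }
}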

diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 077633ee90e5..bb545ad896ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -41,8 +41,9 @@
 import java.util.Properties;
 
 /**
- * An Utility which can incrementally consume data from Kafka and apply it to the target table.
- * currently, it only supports COW table and insert, upsert operation.
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * It has similar functionality to the SQL data source, except that the source is bound to Kafka
+ * and the format is bound to JSON.
  */
 public class HoodieFlinkStreamer {
   public static void main(String[] args) throws Exception {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a2d0960770e9..627bc2c29acf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -135,7 +135,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
   }
 
   /**
-   * Setup the config options based on the table definition, for e.g the table name, primary key.
+   * Sets up the config options based on the table definition, e.g. the table name and primary key.
    *
    * @param conf The configuration to setup
    * @param tableName The table name
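A sketch of what the "sets up the config options" step can look like. FlinkOptions.TABLE_NAME is an option this patch already references elsewhere; the method shape is illustrative, not the factory's exact code:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

class TableOptionSetupSketch {
  // Derive options the user did not set explicitly from the table definition.
  static void setupTableOptions(Configuration conf, String tableName) {
    if (!conf.contains(FlinkOptions.TABLE_NAME)) {
      conf.setString(FlinkOptions.TABLE_NAME, tableName);
    }
  }
}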
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0494143a1a01..6ef608bc713b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -40,6 +40,7 @@
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.ChangelogModes;
+import org.apache.hudi.util.InputFormats;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -48,7 +49,6 @@
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -108,9 +108,6 @@ public class HoodieTableSource implements
 
   private static final int NO_LIMIT_CONSTANT = -1;
 
-  private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
-      new CollectionInputFormat<>(Collections.emptyList(), null);
-
   private final transient org.apache.hadoop.conf.Configuration hadoopConf;
   private final transient HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
@@ -340,7 +337,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
       if (inputSplits.size() == 0) {
         // When there is no input splits, just return an empty source.
         LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
-        return EMPTY_INPUT_FORMAT;
+        return InputFormats.EMPTY_INPUT_FORMAT;
       }
       return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
           rowDataType, inputSplits, false);
@@ -360,7 +357,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
       if (result.isEmpty()) {
         // When there is no input splits, just return an empty source.
         LOG.warn("No input splits generate for incremental read, returns empty collection instead");
-        return new CollectionInputFormat<>(Collections.emptyList(), null);
+        return InputFormats.EMPTY_INPUT_FORMAT;
       }
       return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
           rowDataType, result.getInputSplits(), false);
@@ -419,7 +416,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
   private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
     final Path[] paths = getReadPaths();
     if (paths.length == 0) {
-      return EMPTY_INPUT_FORMAT;
+      return InputFormats.EMPTY_INPUT_FORMAT;
     }
     FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
         FilePathUtils.toFlinkPaths(paths),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e3a8eee9292d..4cd45a81d798 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -125,7 +125,7 @@ public class MergeOnReadInputFormat
 
   /**
    * Flag saying whether to emit the deletes. In streaming read mode, downstream
-   * operators need the delete messages to retract the legacy accumulator.
+   * operators need the DELETE messages to retract the legacy accumulator.
    */
   private boolean emitDelete;
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java b/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java
new file mode 100644
index 000000000000..f193357e8880
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+
+/**
+ * Utilities for all kinds of {@link org.apache.flink.api.common.io.InputFormat}s.
+ */
+public class InputFormats {
+  public static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
+      new CollectionInputFormat<>(Collections.emptyList(), null);
+}
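The point of the new utility is that the three call sites above share one constant instead of re-instantiating a CollectionInputFormat. A minimal usage sketch (the wrapper class here is hypothetical):

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.InputFormats;

class EmptySourceSketch {
  // Returning the shared singleton yields a source that emits no rows.
  InputFormat<RowData, ?> emptySource() {
    return InputFormats.EMPTY_INPUT_FORMAT;
  }
}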
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 04eeab8b377a..b717268800ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -250,7 +250,7 @@ public static void initTableIfNotExists(Configuration conf) throws IOException {
           basePath, conf.getString(FlinkOptions.TABLE_NAME));
     }
     // Do not close the filesystem in order to use the CACHE,
-    // some of the filesystems release the handles in #close method.
+    // some filesystems release the handles in #close method.
   }
 
   /**
@@ -359,7 +359,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) {
   }
 
   /**
-   * Return the median instant time between the given two instant time.
+   * Returns the median instant time between the given two instant times.
    */
   public static String medianInstantTime(String highVal, String lowVal) {
     try {
@@ -399,6 +399,10 @@ public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
     }
   }
 
+  /**
+   * Returns whether the given file is in valid hoodie format.
+   * For example, it filters out empty or corrupt files.
+   */
   public static boolean isValidFile(FileStatus fileStatus) {
     final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
     if (PARQUET.getFileExtension().equals(extension)) {
@@ -416,11 +420,19 @@ public static boolean isValidFile(FileStatus fileStatus) {
     return fileStatus.getLen() > 0;
   }
 
+  /**
+   * Returns whether duplicate inserts are allowed with the given configuration {@code conf}.
+   */
   public static boolean allowDuplicateInserts(Configuration conf) {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
     return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
   }
 
+  /**
+   * Returns whether there are successful commits on the timeline.
+   * @param metaClient The meta client
+   * @return true if there is any successful commit
+   */
   public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
     return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
   }
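As a usage note for the deduplication helper documented above, a small driver; the option values are examples, while FlinkOptions, WriteOperationType, and StreamerUtil are the classes this patch touches:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

class DedupCheckExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
    // INSERT with deduplication turned off keeps duplicates: prints true
    System.out.println(StreamerUtil.allowDuplicateInserts(conf));
  }
}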