[HUDI-7072] Remove support for Flink 1.13 (#10052)
beyond1920 authored Nov 19, 2023
1 parent dfe1674 commit b2f4493
Showing 104 changed files with 82 additions and 6,786 deletions.
11 changes: 5 additions & 6 deletions .github/workflows/bot.yml
@@ -221,7 +221,6 @@ jobs:
     strategy:
       matrix:
         include:
-          - flinkProfile: "flink1.13"
           - flinkProfile: "flink1.14"
           - flinkProfile: "flink1.15"
           - flinkProfile: "flink1.16"
@@ -315,13 +314,13 @@ jobs:
          - flinkProfile: 'flink1.14'
            sparkProfile: 'spark3.2'
            sparkRuntime: 'spark3.2.3'
-         - flinkProfile: 'flink1.13'
+         - flinkProfile: 'flink1.14'
            sparkProfile: 'spark3.1'
            sparkRuntime: 'spark3.1.3'
          - flinkProfile: 'flink1.14'
            sparkProfile: 'spark3.0'
            sparkRuntime: 'spark3.0.2'
-         - flinkProfile: 'flink1.13'
+         - flinkProfile: 'flink1.14'
            sparkProfile: 'spark2.4'
            sparkRuntime: 'spark2.4.8'
     steps:
@@ -390,13 +389,13 @@ jobs:
          - flinkProfile: 'flink1.14'
            sparkProfile: 'spark3.2'
            sparkRuntime: 'spark3.2.3'
-         - flinkProfile: 'flink1.13'
+         - flinkProfile: 'flink1.14'
            sparkProfile: 'spark3.1'
            sparkRuntime: 'spark3.1.3'
-         - flinkProfile: 'flink1.13'
+         - flinkProfile: 'flink1.14'
            sparkProfile: 'spark'
            sparkRuntime: 'spark2.4.8'
-         - flinkProfile: 'flink1.13'
+         - flinkProfile: 'flink1.14'
            sparkProfile: 'spark2.4'
            sparkRuntime: 'spark2.4.8'
     steps:
5 changes: 0 additions & 5 deletions README.md
@@ -132,8 +132,6 @@ Refer to the table below for building with different Flink and Scala versions.
 | `-Dflink1.15`              | hudi-flink1.15-bundle | For Flink 1.15                |
 | `-Dflink1.14`              | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
 | `-Dflink1.14 -Dscala-2.11` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.11 |
-| `-Dflink1.13`              | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.12 |
-| `-Dflink1.13 -Dscala-2.11` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.11 |

 For example,
 ```
@@ -142,9 +140,6 @@
 mvn clean package -DskipTests -Dflink1.15
 # Build against Flink 1.14.x and Scala 2.11
 mvn clean package -DskipTests -Dflink1.14 -Dscala-2.11
-# Build against Flink 1.13.x and Scala 2.12
-mvn clean package -DskipTests -Dflink1.13
 ```

## Running Tests
3 changes: 0 additions & 3 deletions azure-pipelines-20230430.yml
@@ -32,7 +32,6 @@ parameters:
   - 'hudi-common'
   - 'hudi-flink-datasource'
   - 'hudi-flink-datasource/hudi-flink'
-  - 'hudi-flink-datasource/hudi-flink1.13.x'
   - 'hudi-flink-datasource/hudi-flink1.14.x'
   - 'hudi-flink-datasource/hudi-flink1.15.x'
   - 'hudi-flink-datasource/hudi-flink1.16.x'
@@ -65,7 +64,6 @@ parameters:
   - '!hudi-examples/hudi-examples-spark'
   - '!hudi-flink-datasource'
   - '!hudi-flink-datasource/hudi-flink'
-  - '!hudi-flink-datasource/hudi-flink1.13.x'
   - '!hudi-flink-datasource/hudi-flink1.14.x'
   - '!hudi-flink-datasource/hudi-flink1.15.x'
   - '!hudi-flink-datasource/hudi-flink1.16.x'
@@ -89,7 +87,6 @@ parameters:
   - '!hudi-examples/hudi-examples-spark'
   - '!hudi-flink-datasource'
   - '!hudi-flink-datasource/hudi-flink'
-  - '!hudi-flink-datasource/hudi-flink1.13.x'
   - '!hudi-flink-datasource/hudi-flink1.14.x'
   - '!hudi-flink-datasource/hudi-flink1.15.x'
   - '!hudi-flink-datasource/hudi-flink1.16.x'
org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java
@@ -18,14 +18,16 @@

 package org.apache.hudi.sink.transform;

-import org.apache.hudi.adapter.RateLimiterAdapter;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.RateLimiter;
 import org.apache.hudi.configuration.FlinkOptions;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;

+import java.util.concurrent.TimeUnit;
+
 /**
  * Function that transforms RowData to a HoodieRecord with RateLimit.
  */
@@ -39,7 +41,7 @@ public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends H
   /**
    * Rate limit per second for per task.
    */
-  private transient RateLimiterAdapter rateLimiter;
+  private transient RateLimiter rateLimiter;

   public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) {
     super(rowType, config);
@@ -50,12 +52,12 @@ public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration confi
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.rateLimiter =
-        RateLimiterAdapter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
+        RateLimiter.create((int) totalLimit / getRuntimeContext().getNumberOfParallelSubtasks(), TimeUnit.SECONDS);
   }

   @Override
   public O map(I i) throws Exception {
-    rateLimiter.acquire(1);
+    rateLimiter.acquire(1);
     return super.map(i);
   }
 }
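The change above swaps the version-specific RateLimiterAdapter for Hudi's own RateLimiter, which becomes possible once Flink 1.14 is the minimum supported version. A minimal sketch of the resulting pattern, assuming only the create(int, TimeUnit) and acquire(int) signatures visible in the diff; the class and method names here are illustrative, not from the commit:

```java
import org.apache.hudi.common.util.RateLimiter;

import java.util.concurrent.TimeUnit;

// Illustrative sketch: spread a global records-per-second budget across
// parallel subtasks and block before emitting each record.
public class RateLimitedEmitter {
  private final RateLimiter rateLimiter;

  public RateLimitedEmitter(long totalLimitPerSecond, int parallelism) {
    // Each subtask gets an equal share of the global limit, mirroring the
    // totalLimit / getNumberOfParallelSubtasks() split in the diff above.
    this.rateLimiter = RateLimiter.create((int) totalLimitPerSecond / parallelism, TimeUnit.SECONDS);
  }

  public <T> T emit(T record) {
    rateLimiter.acquire(1); // blocks until one permit is available
    return record;
  }
}
```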
org/apache/hudi/source/StreamReadOperator.java
@@ -18,25 +18,28 @@

 package org.apache.hudi.source;

-import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
-import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
-import org.apache.hudi.adapter.MailboxExecutorAdapter;
-import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.metrics.FlinkStreamReadMetrics;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -60,7 +63,7 @@
  * This architecture allows the separation of split reading from processing the checkpoint barriers,
  * thus removing any potential back-pressure.
  */
-public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
+public class StreamReadOperator extends AbstractStreamOperator<RowData>
     implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {

   private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
@@ -70,7 +73,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
   // It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only
   // splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time
   // for exhausting all scheduled split reading tasks.
-  private final MailboxExecutorAdapter executor;
+  private final MailboxExecutor executor;

   private MergeOnReadInputFormat format;
@@ -89,7 +92,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
   private transient FlinkStreamReadMetrics readMetrics;

   private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
-                             MailboxExecutorAdapter mailboxExecutor) {
+                             MailboxExecutor mailboxExecutor) {
     this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
     this.processingTimeService = timeService;
     this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
@@ -119,10 +122,9 @@ public void initializeState(StateInitializationContext context) throws Exception
       }
     }

-    this.sourceContext = Utils.getSourceContext(
+    this.sourceContext = getSourceContext(
         getOperatorConfig().getTimeCharacteristic(),
         getProcessingTimeService(),
-        getContainingTask(),
         output,
         getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

@@ -247,8 +249,8 @@ private enum SplitState {
     IDLE, RUNNING
   }

-  private static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData>
-      implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
+  private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
+      implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>, YieldingOperatorFactory<RowData> {

     private final MergeOnReadInputFormat format;

@@ -259,7 +261,7 @@ private OperatorFactory(MergeOnReadInputFormat format) {
     @SuppressWarnings("unchecked")
     @Override
     public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
-      StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutorAdapter());
+      StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutor());
       operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
       return (O) operator;
     }
@@ -269,4 +271,19 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classL
       return StreamReadOperator.class;
     }
   }
+
+  private static <O> SourceFunction.SourceContext<O> getSourceContext(
+      TimeCharacteristic timeCharacteristic,
+      ProcessingTimeService processingTimeService,
+      Output<StreamRecord<O>> output,
+      long watermarkInterval) {
+    return StreamSourceContexts.getSourceContext(
+        timeCharacteristic,
+        processingTimeService,
+        new Object(), // no actual locking needed
+        output,
+        watermarkInterval,
+        -1,
+        true);
+  }
 }
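StreamReadOperator now uses Flink's MailboxExecutor and StreamSourceContexts directly, both stable since Flink 1.14, instead of the removed adapter classes. A rough sketch of the mailbox pattern the code comment above describes; SplitScheduler and readOneChunk are hypothetical names, not part of this commit:

```java
import org.apache.flink.api.common.operators.MailboxExecutor;

// Illustrative sketch: enqueue split-reading work through the mailbox so the
// operator thread can process checkpoint barriers between scheduled chunks
// instead of blocking until every split is exhausted.
public class SplitScheduler {
  private final MailboxExecutor executor;

  public SplitScheduler(MailboxExecutor executor) {
    this.executor = executor;
  }

  public void scheduleRead(Runnable readOneChunk) {
    // The description string appears in mailbox debugging output.
    executor.execute(readOneChunk::run, "read one chunk of an input split");
  }
}
```

Because each enqueued chunk runs on the same thread as checkpoint actions, no extra synchronization is needed, which is also why the getSourceContext helper above can pass a throwaway new Object() as the lock.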
org/apache/hudi/sink/utils/CollectorOutput.java
@@ -18,12 +18,11 @@

 package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.OutputAdapter;
-
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;

 import java.util.ArrayList;
@@ -32,7 +31,7 @@
 /**
  * Collecting {@link Output} for {@link StreamRecord}.
  */
-public class CollectorOutput<T> implements OutputAdapter<StreamRecord<T>> {
+public class CollectorOutput<T> implements Output<StreamRecord<T>> {

   private final List<T> records;

@@ -68,4 +67,9 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
   public void close() {
     this.records.clear();
   }
+
+  @Override
+  public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
+    // no operation
+  }
 }
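On Flink 1.14+, Output implementations must also handle WatermarkStatus, which is the method the removed OutputAdapter used to default. A self-contained sketch of the smallest conforming implementation; DiscardingTestOutput is a hypothetical name, while CollectorOutput above is the real test helper:

```java
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a minimal Output that records main-output elements and
// ignores everything else, including the 1.14+ emitWatermarkStatus callback.
public class DiscardingTestOutput<T> implements Output<StreamRecord<T>> {

  private final List<T> seen = new ArrayList<>();

  @Override
  public void collect(StreamRecord<T> record) {
    seen.add(record.getValue());
  }

  @Override
  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // ignore side outputs
  }

  @Override
  public void emitWatermark(Watermark mark) {
    // ignore watermarks
  }

  @Override
  public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
    // no operation, mirroring CollectorOutput above
  }

  @Override
  public void emitLatencyMarker(LatencyMarker latencyMarker) {
    // ignore latency markers
  }

  @Override
  public void close() {
    seen.clear();
  }
}
```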
org/apache/hudi/sink/utils/MockStateInitializationContext.java
@@ -17,17 +17,18 @@

 package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.StateInitializationContextAdapter;
-
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;

+import java.util.OptionalLong;
+
 /**
  * A {@link FunctionInitializationContext} for testing purpose.
  */
-public class MockStateInitializationContext implements StateInitializationContextAdapter {
+public class MockStateInitializationContext implements StateInitializationContext {

   private final MockOperatorStateStore operatorStateStore;

@@ -59,4 +60,9 @@ public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
   public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
     return null;
   }
+
+  @Override
+  public OptionalLong getRestoredCheckpointId() {
+    return OptionalLong.empty();
+  }
 }
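getRestoredCheckpointId() is part of the StateInitializationContext interface as of Flink 1.14, so the mock can implement it directly rather than going through an adapter. A hedged sketch of how initialization code typically branches on it; the class name and messages are illustrative, not from this commit:

```java
import java.util.OptionalLong;

import org.apache.flink.runtime.state.StateInitializationContext;

// Illustrative sketch: branch initialization logic on whether the job is
// restoring from a checkpoint or starting fresh. The mock above always
// returns OptionalLong.empty(), i.e. the fresh-start path.
public class RestoreAwareInitializer {

  public void initializeState(StateInitializationContext context) {
    OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
    if (restoredCheckpointId.isPresent()) {
      System.out.println("restoring from checkpoint " + restoredCheckpointId.getAsLong());
    } else {
      System.out.println("fresh start: no checkpoint to restore");
    }
  }
}
```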
org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
@@ -17,10 +17,10 @@

 package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.StreamingRuntimeContextAdapter;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -37,7 +37,7 @@
  *
  * <p>NOTE: Adapted from Apache Flink, the MockStreamOperator is modified to support MapState.
  */
-public class MockStreamingRuntimeContext extends StreamingRuntimeContextAdapter {
+public class MockStreamingRuntimeContext extends StreamingRuntimeContext {

   private final boolean isCheckpointingEnabled;

@@ -128,4 +128,9 @@ public KeyedStateStore getKeyedStateStore() {
       return mockOperatorStateStore;
     }
   }
+
+  @Override
+  public OperatorMetricGroup getMetricGroup() {
+    return UnregisteredMetricsGroup.createOperatorMetricGroup();
+  }
 }
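UnregisteredMetricsGroup.createOperatorMetricGroup() gives tests a no-op OperatorMetricGroup, so code under test can register and update metrics without a running Flink runtime. A small usage sketch; the class and counter names are arbitrary:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

// Illustrative sketch: metrics registered against the unregistered group are
// fully functional locally but reported nowhere, which suits unit tests.
public class MetricsInTests {
  public static void main(String[] args) {
    OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
    Counter recordsIn = metricGroup.counter("recordsIn");
    recordsIn.inc();
    System.out.println("recordsIn = " + recordsIn.getCount());
  }
}
```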
@@ -18,7 +18,6 @@

 package org.apache.hudi.table;

-import org.apache.hudi.adapter.TestTableEnvs;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -32,6 +31,7 @@
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
+import org.apache.hudi.utils.TestTableEnvs;
 import org.apache.hudi.utils.TestUtils;
 import org.apache.hudi.utils.factory.CollectSinkTableFactory;
@@ -18,7 +18,7 @@

 package org.apache.hudi.table.catalog;

-import org.apache.hudi.adapter.TestTableEnvs;
+import org.apache.hudi.utils.TestTableEnvs;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableEnvironment;
TestTableEnvs.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.hudi.adapter;
+package org.apache.hudi.utils;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;