Commit 61e0551

Remove useless adapters for flink 1.13

beyond1920 committed Nov 12, 2023
1 parent 2ed69fc
Showing 52 changed files with 63 additions and 1,568 deletions.
.../org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java
@@ -18,14 +18,16 @@

package org.apache.hudi.sink.transform;

-import org.apache.hudi.adapter.RateLimiterAdapter;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.RateLimiter;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

+import java.util.concurrent.TimeUnit;

/**
* Function that transforms RowData to a HoodieRecord, with rate limiting.
*/
@@ -39,7 +41,7 @@ public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends H
/**
* Rate limit per second per task.
*/
-private transient RateLimiterAdapter rateLimiter;
+private transient RateLimiter rateLimiter;

public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) {
super(rowType, config);
@@ -50,12 +52,12 @@ public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration confi
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.rateLimiter =
-RateLimiterAdapter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
+RateLimiter.create((int) totalLimit / getRuntimeContext().getNumberOfParallelSubtasks(), TimeUnit.SECONDS);
}

@Override
public O map(I i) throws Exception {
-rateLimiter.acquire();
+rateLimiter.acquire(1);
return super.map(i);
}
}
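For context, a minimal sketch of how the replacement limiter behaves, using only the signatures visible in the diff above (RateLimiter.create(int, TimeUnit) and acquire(int)); the rate and the processing stub are illustrative and not part of the commit:

import org.apache.hudi.common.util.RateLimiter;

import java.util.concurrent.TimeUnit;

public class RateLimitSketch {
  public static void main(String[] args) {
    // 100 permits per second; the operator above uses totalLimit / parallelism per task
    RateLimiter limiter = RateLimiter.create(100, TimeUnit.SECONDS);
    for (int i = 0; i < 500; i++) {
      limiter.acquire(1); // blocks until a permit is available
      process(i);
    }
  }

  private static void process(int record) {
    // stand-in for the per-record mapping done by super.map(i)
  }
}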
.../org/apache/hudi/source/StreamReadOperator.java
@@ -18,25 +18,28 @@

package org.apache.hudi.source;

-import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
-import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
-import org.apache.hudi.adapter.MailboxExecutorAdapter;
-import org.apache.hudi.adapter.Utils;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -60,7 +63,7 @@
* This architecture allows the separation of split reading from processing the checkpoint barriers,
* thus removing any potential back-pressure.
*/
-public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
+public class StreamReadOperator extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {

private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
@@ -70,7 +73,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
// It's the same thread that runs this operator and the checkpoint actions. Use this executor to schedule
// splits for subsequent reading, so that a new checkpoint can be triggered without blocking for a long time
// while exhausting all scheduled split-reading tasks.
-private final MailboxExecutorAdapter executor;
+private final MailboxExecutor executor;

private MergeOnReadInputFormat format;

@@ -89,7 +92,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
private transient FlinkStreamReadMetrics readMetrics;

private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
-MailboxExecutorAdapter mailboxExecutor) {
+MailboxExecutor mailboxExecutor) {
this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
this.processingTimeService = timeService;
this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
@@ -119,10 +122,9 @@ public void initializeState(StateInitializationContext context) throws Exception
}
}

-this.sourceContext = Utils.getSourceContext(
+this.sourceContext = getSourceContext(
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
-getContainingTask(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

@@ -247,8 +249,8 @@ private enum SplitState {
IDLE, RUNNING
}

-private static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData>
-implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
+private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
+implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>, YieldingOperatorFactory<RowData> {

private final MergeOnReadInputFormat format;

@@ -259,7 +261,7 @@ private OperatorFactory(MergeOnReadInputFormat format) {
@SuppressWarnings("unchecked")
@Override
public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
-StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutorAdapter());
+StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutor());
operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (O) operator;
}
@@ -269,4 +271,19 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classL
return StreamReadOperator.class;
}
}

+private static <O> SourceFunction.SourceContext<O> getSourceContext(
+TimeCharacteristic timeCharacteristic,
+ProcessingTimeService processingTimeService,
+Output<StreamRecord<O>> output,
+long watermarkInterval) {
+return StreamSourceContexts.getSourceContext(
+timeCharacteristic,
+processingTimeService,
+new Object(), // no actual locking needed
+output,
+watermarkInterval,
+-1,
+true);
+}
}
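Two side notes on the new code, neither part of the commit. First, in the inlined getSourceContext call, the trailing -1 and true arguments correspond, per Flink's StreamSourceContexts.getSourceContext signature, to the idle timeout (disabled here) and to emitting progressive watermarks. Second, the mailbox comment near the top of the class describes the scheduling discipline this operator relies on: at most one split-reading task sits in the mailbox at a time, so checkpoint barriers can interleave between splits. A self-contained sketch of that pattern follows; only MailboxExecutor#execute(ThrowingRunnable, String) is the real Flink API, while the class, state machine, and method names are illustrative:

import org.apache.flink.api.common.operators.MailboxExecutor;

import java.util.ArrayDeque;
import java.util.Queue;

class SplitSchedulerSketch<S> {
  private enum State { IDLE, RUNNING }

  private final MailboxExecutor executor;
  private final Queue<S> splits = new ArrayDeque<>();
  private State state = State.IDLE;

  SplitSchedulerSketch(MailboxExecutor executor) {
    this.executor = executor;
  }

  void addSplit(S split) {
    splits.add(split);
    enqueueProcessSplits();
  }

  private void enqueueProcessSplits() {
    // Enqueue at most one reading task; it runs on the operator's own mailbox
    // thread, interleaved with checkpoint actions.
    if (state == State.IDLE && !splits.isEmpty()) {
      state = State.RUNNING;
      executor.execute(this::processOneSplit, "process input split");
    }
  }

  private void processOneSplit() {
    S split = splits.poll();
    // ... read records from the split ...
    state = State.IDLE;
    enqueueProcessSplits(); // the next split becomes a separate mailbox action
  }
}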
.../org/apache/hudi/sink/utils/CollectorOutput.java
@@ -18,12 +18,11 @@

package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.OutputAdapter;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
@@ -32,7 +31,7 @@
/**
* Collecting {@link Output} for {@link StreamRecord}.
*/
-public class CollectorOutput<T> implements OutputAdapter<StreamRecord<T>> {
+public class CollectorOutput<T> implements Output<StreamRecord<T>> {

private final List<T> records;

@@ -68,4 +67,9 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
public void close() {
this.records.clear();
}

+@Override
+public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
+// no operation
+}
}
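A hypothetical test-side use of the reworked CollectorOutput. The list-backed constructor and the collect-into-list behavior are assumptions read off the records field above; neither is shown in the diff:

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class CollectorOutputSketch {
  public static void main(String[] args) {
    List<String> records = new ArrayList<>();
    CollectorOutput<String> output = new CollectorOutput<>(records); // constructor assumed
    output.collect(new StreamRecord<>("a", 1L));
    output.collect(new StreamRecord<>("b", 2L));
    System.out.println(records); // expected: [a, b]
  }
}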
.../org/apache/hudi/sink/utils/MockStateInitializationContext.java
@@ -17,17 +17,18 @@

package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.StateInitializationContextAdapter;

import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;

+import java.util.OptionalLong;

/**
* A {@link FunctionInitializationContext} for testing purposes.
*/
-public class MockStateInitializationContext implements StateInitializationContextAdapter {
+public class MockStateInitializationContext implements StateInitializationContext {

private final MockOperatorStateStore operatorStateStore;

@@ -59,4 +60,9 @@ public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
return null;
}

+@Override
+public OptionalLong getRestoredCheckpointId() {
+return OptionalLong.empty();
+}
}
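getRestoredCheckpointId() is part of Flink's StateInitializationContext; returning OptionalLong.empty() makes the mock always look like a fresh start rather than a restore. A sketch of the kind of branch this enables in operator code (the surrounding class is illustrative, not Hudi code):

import org.apache.flink.runtime.state.StateInitializationContext;

import java.util.OptionalLong;

class RestoreAwareSketch {
  void initializeState(StateInitializationContext context) {
    OptionalLong restored = context.getRestoredCheckpointId();
    if (restored.isPresent()) {
      // restoring from checkpoint restored.getAsLong(): rebuild transient state here
    } else {
      // fresh start; MockStateInitializationContext above always reports this branch
    }
  }
}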
.../org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
@@ -17,10 +17,10 @@

package org.apache.hudi.sink.utils;

-import org.apache.hudi.adapter.StreamingRuntimeContextAdapter;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -37,7 +37,7 @@
*
* <p>NOTE: Adapted from Apache Flink, and modified to support MapState.
*/
-public class MockStreamingRuntimeContext extends StreamingRuntimeContextAdapter {
+public class MockStreamingRuntimeContext extends StreamingRuntimeContext {

private final boolean isCheckpointingEnabled;

@@ -128,4 +128,9 @@ public KeyedStateStore getKeyedStateStore() {
return mockOperatorStateStore;
}
}

+@Override
+public OperatorMetricGroup getMetricGroup() {
+return UnregisteredMetricsGroup.createOperatorMetricGroup();
+}
}
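UnregisteredMetricsGroup.createOperatorMetricGroup() is Flink's stand-in metric group for tests: metrics created on it count locally but are never registered with a reporter, so operator code that touches getMetricGroup() runs unchanged. A small sketch (the counter name is illustrative):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class MetricGroupSketch {
  public static void main(String[] args) {
    OperatorMetricGroup group = UnregisteredMetricsGroup.createOperatorMetricGroup();
    Counter counter = group.counter("recordsProcessed"); // counts locally, reported nowhere
    counter.inc();
    System.out.println(counter.getCount()); // prints 1
  }
}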
(changed test file in package org.apache.hudi.table; path not shown)
@@ -18,7 +18,6 @@

package org.apache.hudi.table;

-import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -32,6 +31,7 @@
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
+import org.apache.hudi.utils.TestTableEnvs;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;

(changed test file in package org.apache.hudi.table.catalog; path not shown)
@@ -18,7 +18,7 @@

package org.apache.hudi.table.catalog;

-import org.apache.hudi.adapter.TestTableEnvs;
+import org.apache.hudi.utils.TestTableEnvs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
.../org/apache/hudi/utils/TestTableEnvs.java (moved from org/apache/hudi/adapter/TestTableEnvs.java)
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.hudi.adapter;
+package org.apache.hudi.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

This file was deleted.

This file was deleted.

This file was deleted.

(remaining deleted adapter files not rendered)