From 54cdf02f985d681b1f5b4105dad47f6730f96a31 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 9 Oct 2024 15:22:06 +0800 Subject: [PATCH 1/2] [FLINK-36322] Remove scala --- pom.xml | 22 ----------- .../benchmark/functions/ScalaADTSource.scala | 38 ------------------- 2 files changed, 60 deletions(-) delete mode 100644 src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala diff --git a/pom.xml b/pom.xml index 214cfc0..351fa3a 100644 --- a/pom.xml +++ b/pom.xml @@ -585,28 +585,6 @@ under the License. - - net.alchim31.maven - scala-maven-plugin - 4.4.0 - - - scala-compile-first - process-resources - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - org.apache.maven.plugins maven-compiler-plugin diff --git a/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala b/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala deleted file mode 100644 index 78622e2..0000000 --- a/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.benchmark.functions - -import org.apache.flink.api.scala._ -import org.apache.flink.benchmark.functions.ScalaADTSource.{ADT, ADT1, ADT2} - -class ScalaADTSource(numEvents: Int) extends BaseSourceWithKeyRange[ADT](numEvents,2) { - override protected def getElement(keyId: Int): ADT = keyId match { - case 0 => ADT1("a") - case 1 => ADT2(1) - } -} - -object ScalaADTSource { - sealed trait ADT - case class ADT1(a: String) extends ADT - case class ADT2(a: Int) extends ADT - - val adtTypeInfo = createTypeInformation[ADT] -} - - From a6cd702c08ad654f51533e3983c0f80f5b7cd598 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 9 Oct 2024 18:30:29 +0800 Subject: [PATCH 2/2] [FLINK-36322] Fix compile based on 2.0-preview --- .../benchmark/AsyncWaitOperatorBenchmark.java | 2 +- .../flink/benchmark/BackpressureUtils.java | 2 +- .../benchmark/BlockingPartitionBenchmark.java | 66 +++---------------- ...ockingPartitionRemoteChannelBenchmark.java | 31 +-------- .../CheckpointEnvironmentContext.java | 14 ++-- .../apache/flink/benchmark/CollectSink.java | 2 +- ...ContinuousFileReaderOperatorBenchmark.java | 6 +- .../benchmark/FlinkEnvironmentContext.java | 20 +++--- .../flink/benchmark/InputBenchmark.java | 2 +- .../flink/benchmark/KeyByBenchmarks.java | 10 +-- .../MemoryStateBackendBenchmark.java | 2 +- .../benchmark/MultipleInputBenchmark.java | 4 +- .../benchmark/ProcessingTimerBenchmark.java | 11 ++-- .../RemoteChannelThroughputBenchmark.java | 4 +- .../SerializationFrameworkMiniBenchmarks.java | 16 ++--- .../SortingBoundedInputBenchmarks.java | 2 +- .../benchmark/StateBackendBenchmarkBase.java | 35 +++++----- .../flink/benchmark/StreamGraphUtils.java | 2 +- .../flink/benchmark/TwoInputBenchmark.java | 2 +- .../flink/benchmark/WindowBenchmarks.java | 4 +- .../full/PojoSerializationBenchmark.java | 4 +- .../SerializationFrameworkAllBenchmarks.java | 33 +++------- .../full/StringSerializationBenchmark.java | 2 +- .../functions/BaseSourceWithKeyRange.java | 3 +- .../functions/IntegerLongSource.java | 2 +- .../flink/benchmark/functions/LongSource.java | 2 +- .../benchmark/RescalingBenchmarkBase.java | 5 +- .../state/benchmark/StateBenchmarkBase.java | 2 +- .../apache/flink/config/ConfigUtilTest.java | 2 +- 29 files changed, 105 insertions(+), 187 deletions(-) diff --git a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java index b61fd0c..20474c7 100644 --- a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; diff --git a/src/main/java/org/apache/flink/benchmark/BackpressureUtils.java b/src/main/java/org/apache/flink/benchmark/BackpressureUtils.java index 7752ac5..fe2c947 100644 --- a/src/main/java/org/apache/flink/benchmark/BackpressureUtils.java +++ b/src/main/java/org/apache/flink/benchmark/BackpressureUtils.java @@ -73,7 +73,7 @@ private static RestClusterClient createClient( int port, Configuration clientConfiguration) throws Exception { final Configuration clientConfig = new Configuration(); clientConfig.addAll(clientConfiguration); - clientConfig.setInteger(RestOptions.PORT, port); + clientConfig.set(RestOptions.PORT, port); return new RestClusterClient<>(clientConfig, StandaloneClusterId.getInstance()); } diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java index c447049..0159de1 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java @@ -60,22 +60,6 @@ public void compressedFilePartition(CompressedFileEnvironmentContext context) th executeBenchmark(context.env); } - @Benchmark - public void uncompressedMmapPartition(UncompressedMmapEnvironmentContext context) - throws Exception { - executeBenchmark(context.env); - } - - @Benchmark - public void compressedSortPartition(CompressedSortEnvironmentContext context) throws Exception { - executeBenchmark(context.env); - } - - @Benchmark - public void uncompressedSortPartition(UncompressedSortEnvironmentContext context) throws Exception { - executeBenchmark(context.env); - } - private void executeBenchmark(StreamExecutionEnvironment env) throws Exception { StreamGraph streamGraph = StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION); @@ -102,23 +86,15 @@ public void setUp() throws Exception { } protected Configuration createConfiguration( - boolean compressionEnabled, String subpartitionType, boolean isSortShuffle) { + boolean compressionEnabled) { Configuration configuration = super.createConfiguration(); - if (isSortShuffle) { - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1); - } else { - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, - Integer.MAX_VALUE); - } - configuration.setBoolean( - NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, - compressionEnabled); - configuration.setString( - NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, subpartitionType); - configuration.setString( + configuration.set( + NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC, + compressionEnabled ? + NettyShuffleEnvironmentOptions.CompressionCodec.LZ4 + : NettyShuffleEnvironmentOptions.CompressionCodec.NONE); + configuration.set( CoreOptions.TMP_DIRS, FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString()); return configuration; @@ -129,7 +105,7 @@ public static class UncompressedFileEnvironmentContext extends BlockingPartitionEnvironmentContext { @Override protected Configuration createConfiguration() { - return createConfiguration(false, "file", false); + return createConfiguration(false); } } @@ -137,31 +113,7 @@ public static class CompressedFileEnvironmentContext extends BlockingPartitionEnvironmentContext { @Override protected Configuration createConfiguration() { - return createConfiguration(true, "file", false); - } - } - - public static class UncompressedMmapEnvironmentContext - extends BlockingPartitionEnvironmentContext { - @Override - protected Configuration createConfiguration() { - return createConfiguration(false, "mmap", false); - } - } - - public static class CompressedSortEnvironmentContext - extends BlockingPartitionEnvironmentContext { - @Override - protected Configuration createConfiguration() { - return createConfiguration(true, "file", true); - } - } - - public static class UncompressedSortEnvironmentContext - extends BlockingPartitionEnvironmentContext { - @Override - protected Configuration createConfiguration() { - return createConfiguration(false, "file", true); + return createConfiguration(true); } } } diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java index ee69eb7..da411bb 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java @@ -48,14 +48,6 @@ public static void main(String[] args) throws RunnerException { new Runner(options).run(); } - @Benchmark - public void remoteFilePartition(RemoteFileEnvironmentContext context) throws Exception { - StreamGraph streamGraph = - StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION); - context.miniCluster.executeJobBlocking( - StreamingJobGraphGenerator.createJobGraph(streamGraph)); - } - @Benchmark public void remoteSortPartition(RemoteSortEnvironmentContext context) throws Exception { StreamGraph streamGraph = @@ -75,20 +67,10 @@ public void setUp() throws Exception { env.setBufferTimeout(-1); } - protected Configuration createConfiguration(boolean isSortShuffle) { + protected Configuration createConfiguration() { Configuration configuration = super.createConfiguration(); - if (isSortShuffle) { - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1); - } else { - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, - Integer.MAX_VALUE); - } - configuration.setString( - NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file"); - configuration.setString( + configuration.set( CoreOptions.TMP_DIRS, FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString()); return configuration; @@ -100,17 +82,10 @@ protected int getNumberOfVertices() { } } - public static class RemoteFileEnvironmentContext extends BlockingPartitionEnvironmentContext { - @Override - protected Configuration createConfiguration() { - return createConfiguration(false); - } - } - public static class RemoteSortEnvironmentContext extends BlockingPartitionEnvironmentContext { @Override protected Configuration createConfiguration() { - return createConfiguration(true); + return super.createConfiguration(); } } } diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java index 374994f..0295cdf 100644 --- a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java +++ b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java @@ -19,14 +19,14 @@ package org.apache.flink.benchmark; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import java.time.Duration; @@ -93,31 +93,31 @@ protected Configuration createConfiguration() { public enum CheckpointMode { UNALIGNED( config -> { - config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true); + config.set(CheckpointingOptions.ENABLE_UNALIGNED, true); config.set( TaskManagerOptions.MEMORY_SEGMENT_SIZE, CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE); config.set( - ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, + CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(0)); config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false); return config; }), UNALIGNED_1( config -> { - config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true); + config.set(CheckpointingOptions.ENABLE_UNALIGNED, true); config.set( TaskManagerOptions.MEMORY_SEGMENT_SIZE, CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE); config.set( - ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, + CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(1)); config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false); return config; }), ALIGNED( config -> { - config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false); + config.set(CheckpointingOptions.ENABLE_UNALIGNED, false); config.set( TaskManagerOptions.MEMORY_SEGMENT_SIZE, CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE); diff --git a/src/main/java/org/apache/flink/benchmark/CollectSink.java b/src/main/java/org/apache/flink/benchmark/CollectSink.java index ba1b82c..1282e2c 100644 --- a/src/main/java/org/apache/flink/benchmark/CollectSink.java +++ b/src/main/java/org/apache/flink/benchmark/CollectSink.java @@ -18,7 +18,7 @@ package org.apache.flink.benchmark; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import java.util.ArrayList; import java.util.List; diff --git a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java index a829823..be13691 100644 --- a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java @@ -18,19 +18,18 @@ package org.apache.flink.benchmark; import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; import joptsimple.internal.Strings; +import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -76,7 +75,6 @@ public static void main(String[] args) throws RunnerException { public void readFileSplit(FlinkEnvironmentContext context) throws Exception { TARGET_COUNT_REACHED_LATCH.reset(); StreamExecutionEnvironment env = context.env; - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); env.enableCheckpointing(100) .setParallelism(1) .addSource(new MockSourceFunction()) diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java index 1b09133..b51de81 100644 --- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java +++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java @@ -18,15 +18,14 @@ package org.apache.flink.benchmark; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader; @@ -54,6 +53,8 @@ public void setUp() throws Exception { throw new RuntimeException("setUp was called multiple times!"); } final Configuration clusterConfig = createConfiguration(); + clusterConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "none"); + clusterConfig.set(StateBackendOptions.STATE_BACKEND, "hashmap"); miniCluster = new MiniCluster( new MiniClusterConfiguration.Builder() @@ -78,8 +79,6 @@ public void setUp() throws Exception { if (objectReuse) { env.getConfig().enableObjectReuse(); } - env.setRestartStrategy(RestartStrategies.noRestart()); - env.setStateBackend(new MemoryStateBackend()); } @TearDown @@ -102,14 +101,15 @@ public void execute() throws Exception { protected Configuration createConfiguration() { final Configuration configuration = new Configuration(); - configuration.setString(RestOptions.BIND_PORT, "0"); - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS); + configuration.set(RestOptions.BIND_PORT, "0"); + // no equivalent config available. + //configuration.setInteger( + // NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS); configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME); configuration.set(DeploymentOptions.ATTACHED, true); // It doesn't make sense to wait for the final checkpoint in benchmarks since it only prolongs // the test but doesn't give any advantages. - configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); + configuration.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); // TODO: remove this line after FLINK-28243 will be done configuration.set(REQUIREMENTS_CHECK_DELAY, Duration.ZERO); return configuration; diff --git a/src/main/java/org/apache/flink/benchmark/InputBenchmark.java b/src/main/java/org/apache/flink/benchmark/InputBenchmark.java index 08b3fcf..62e5d4d 100644 --- a/src/main/java/org/apache/flink/benchmark/InputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/InputBenchmark.java @@ -22,7 +22,7 @@ import org.apache.flink.benchmark.functions.MultiplyByTwo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; diff --git a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java index 3fdc289..c4cb907 100644 --- a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java @@ -20,9 +20,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -54,7 +56,7 @@ public void tupleKeyBy(FlinkEnvironmentContext context) throws Exception { env.setParallelism(4); env.addSource(new IncreasingTupleSource(TUPLE_RECORDS_PER_INVOCATION, 10)) - .keyBy(0) + .keyBy(e -> e.f0) .addSink(new DiscardingSink<>()); env.execute(); @@ -66,8 +68,8 @@ public void arrayKeyBy(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10)) - .keyBy(0) + DataStreamSource source = env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10)); + source.keyBy(KeySelectorUtil.getSelectorForArray(new int[]{0}, source.getType())) .addSink(new DiscardingSink<>()); env.execute(); diff --git a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java index 706aad3..35b01d6 100644 --- a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java @@ -58,7 +58,7 @@ public void stateBackends(MemoryStateBackendContext context) throws Exception { @State(Thread) public static class MemoryStateBackendContext extends StateBackendContext { - @Param({"MEMORY", "FS", "FS_ASYNC"}) + @Param({"MEMORY", "FS"}) public StateBackend stateBackend = StateBackend.MEMORY; @Override diff --git a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java index 1ec8920..72ad6c8 100644 --- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java @@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; diff --git a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java index 721f31f..93a433c 100644 --- a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java @@ -17,14 +17,13 @@ package org.apache.flink.benchmark; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; import org.apache.flink.util.Collector; - import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -75,7 +74,7 @@ private static class SingleRecordSource extends RichParallelSourceFunction pojoSerializer = TypeInformation.of(SerializationFrameworkMiniBenchmarks.MyPojo.class) - .createSerializer(config); + .createSerializer(config.getSerializerConfig()); TypeSerializer kryoSerializer = - new KryoSerializer<>(SerializationFrameworkMiniBenchmarks.MyPojo.class, config); + new KryoSerializer<>(SerializationFrameworkMiniBenchmarks.MyPojo.class, config.getSerializerConfig()); TypeSerializer avroSerializer = new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class); diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java index 00d40c5..7053465 100644 --- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java @@ -24,10 +24,9 @@ import org.apache.flink.benchmark.FlinkEnvironmentContext; import org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks; import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange; -import org.apache.flink.benchmark.functions.ScalaADTSource; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; import com.twitter.chill.protobuf.ProtobufSerializer; import com.twitter.chill.thrift.TBaseSerializer; @@ -84,7 +83,7 @@ public void serializerKryoWithoutRegistration(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - env.getConfig().enableForceKryo(); + env.getConfig().getSerializerConfig().setForceAvro(true); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() @@ -98,7 +97,7 @@ public void serializerKryoWithoutRegistration(FlinkEnvironmentContext context) public void serializerAvroReflect(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - env.getConfig().enableForceAvro(); + env.getConfig().getSerializerConfig().setForceAvro(true); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() @@ -121,29 +120,17 @@ public void serializerAvroGeneric(FlinkEnvironmentContext context) throws Except env.execute(); } - @Benchmark - @OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION) - public void serializerScalaADT(FlinkEnvironmentContext context) throws Exception { - StreamExecutionEnvironment env = context.env; - env.setParallelism(4); - - env.addSource(new ScalaADTSource(RECORDS_PER_INVOCATION), ScalaADTSource.adtTypeInfo()) - .rebalance() - .addSink(new DiscardingSink<>()); - - env.execute(); - } - @Benchmark @OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION) public void serializerKryoThrift(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.enableForceKryo(); - executionConfig.addDefaultKryoSerializer( + + executionConfig.getSerializerConfig().setForceKryo(true); + executionConfig.getSerializerConfig().addDefaultKryoSerializer( org.apache.flink.benchmark.thrift.MyPojo.class, TBaseSerializer.class); - executionConfig.addDefaultKryoSerializer( + executionConfig.getSerializerConfig().addDefaultKryoSerializer( org.apache.flink.benchmark.thrift.MyOperation.class, TBaseSerializer.class); env.addSource(new ThriftPojoSource(RECORDS_PER_INVOCATION, 10)) @@ -159,11 +146,11 @@ public void serializerKryoProtobuf(FlinkEnvironmentContext context) throws Excep StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.enableForceKryo(); - executionConfig.registerTypeWithKryoSerializer( + executionConfig.getSerializerConfig().setForceKryo(true); + executionConfig.getSerializerConfig().registerTypeWithKryoSerializer( org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class, ProtobufSerializer.class); - executionConfig.registerTypeWithKryoSerializer( + executionConfig.getSerializerConfig().registerTypeWithKryoSerializer( org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyOperation.class, ProtobufSerializer.class); diff --git a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java index 57ded08..6cc032e 100644 --- a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java @@ -65,7 +65,7 @@ public class StringSerializationBenchmark extends BenchmarkBase { int length; String input; ExecutionConfig config = new ExecutionConfig(); - TypeSerializer serializer = TypeInformation.of(String.class).createSerializer(config); + TypeSerializer serializer = TypeInformation.of(String.class).createSerializer(config.getSerializerConfig()); ByteArrayInputStream serializedBuffer; DataInputView serializedStream; diff --git a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java index 7629c84..e44b59b 100644 --- a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java +++ b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java @@ -18,8 +18,7 @@ package org.apache.flink.benchmark.functions; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction; /** Abstract base class for sources with a defined number of events and a fixed key range. */ public abstract class BaseSourceWithKeyRange implements ParallelSourceFunction { diff --git a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java index aeddfa3..22ad2f9 100644 --- a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java +++ b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java @@ -18,7 +18,7 @@ package org.apache.flink.benchmark.functions; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; public class IntegerLongSource extends RichParallelSourceFunction { private volatile boolean running = true; diff --git a/src/main/java/org/apache/flink/benchmark/functions/LongSource.java b/src/main/java/org/apache/flink/benchmark/functions/LongSource.java index 36478c3..985dd39 100644 --- a/src/main/java/org/apache/flink/benchmark/functions/LongSource.java +++ b/src/main/java/org/apache/flink/benchmark/functions/LongSource.java @@ -18,7 +18,7 @@ package org.apache.flink.benchmark.functions; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; public class LongSource extends RichParallelSourceFunction { diff --git a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java index be25fd7..f5b749d 100644 --- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java @@ -17,6 +17,7 @@ */ package org.apache.flink.state.benchmark; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -156,8 +157,8 @@ protected static class TestKeyedFunction extends KeyedProcessFunction("RandomState", byte[].class)); diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java index 345063c..8c1f970 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java @@ -72,7 +72,7 @@ protected KeyedStateBackend createKeyedStateBackend(TtlTimeProvider ttlTim public static File createStateDataDir() throws IOException { Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf(); - String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR); + String stateDataDirPath = benchMarkConfig.get(StateBenchmarkOptions.STATE_DATA_DIR); File dataDir = null; if (stateDataDirPath != null) { dataDir = new File(stateDataDirPath); diff --git a/src/test/java/org/apache/flink/config/ConfigUtilTest.java b/src/test/java/org/apache/flink/config/ConfigUtilTest.java index 9f3cbaa..a1b8324 100644 --- a/src/test/java/org/apache/flink/config/ConfigUtilTest.java +++ b/src/test/java/org/apache/flink/config/ConfigUtilTest.java @@ -28,7 +28,7 @@ public class ConfigUtilTest { @Test public void testLoadConf() { Configuration cfg = ConfigUtil.loadBenchMarkConf(); - String dir = cfg.getString(StateBenchmarkOptions.STATE_DATA_DIR); + String dir = cfg.get(StateBenchmarkOptions.STATE_DATA_DIR); Assert.assertEquals("/tmp/data", dir); } }