This project benchmarks Flink data throughput and latency with a Kafka data source.
Paul Zhang
- Flink 1.15.4
- Hadoop Yarn cluster
- Kafka 1.x or above
If you have deployed a different version of Flink or Kafka, update the Flink/Kafka version properties in pom.xml
before compiling.
mvn clean package
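If pom.xml exposes these versions as Maven properties, they can also be overridden on the command line instead of editing the file; the property names below (flink.version, kafka.version) are assumptions, so check pom.xml for the actual names.
# Override the assumed version properties at build time
mvn clean package -Dflink.version=1.15.4 -Dkafka.version=2.8.0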
The binary distribution is located in benchmark-dist/target/dist/flink-benchmark.
Note: The number of partitions for both the input and output Kafka topics should equal the number of Kafka datagen threads and the parallelism of the Flink benchmark job.
Explanations of the parameters are listed under Command options.
If the input/output topics already exist, delete them first.
./kafka-topics.sh --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --delete --topic input
./kafka-topics.sh --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --delete --topic output
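To verify that the topics are gone, list the remaining topics (deletion only takes effect if delete.topic.enable is set on the brokers):
# List remaining topics
./kafka-topics.sh --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --list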
Create the input/output topics with the designated number of partitions.
# Create topic with designated partitions
./kafka-topics.sh --create --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --replication-factor 1 --partitions 4 --topic output
./kafka-topics.sh --create --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --replication-factor 1 --partitions 4 --topic input
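To confirm that the partition count matches the datagen thread count and the Flink job parallelism (see the note above), describe both topics:
# Check partition counts
./kafka-topics.sh --describe --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --topic input
./kafka-topics.sh --describe --zookeeper manager.bigdata:2181,master.bigdata:2181,worker.bigdata:2181 --topic output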
# Latency and Throughput benchmark
./bin/flink run -m yarn-cluster -c com.paultech.Latency /path/to/benchmark/benchmark-1.0.jar --parallelism 4 --output-topic output --input-topic input --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667
java -jar kafka-datasource-1.0.jar -t input -b kafka01:6667,kafka02:6667,kafka03:6667 -i 10 -c 100 -n 4
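As a rough sanity check, assuming -i is the send interval in milliseconds and -c the number of messages sent per interval (the units are not documented here, so treat them as assumptions), the command above targets roughly 4 threads x 100 messages x (1000 / 10) intervals per second, i.e. about 40,000 messages per second.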
You need to run the result analyzer to calculate the latency histogram.
java -jar kafka-result-analyzer-1.0.jar -b kafka01:6667,kafka02:6667,kafka03:6667 -t output
Flink might need some time to consume all pending records, so you may need to run the result analyzer several times.
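A minimal sketch that re-runs the analyzer once a minute until interrupted (same jar and options as above):
# Re-run the result analyzer every 60 seconds
while true; do
  java -jar kafka-result-analyzer-1.0.jar -b kafka01:6667,kafka02:6667,kafka03:6667 -t output
  sleep 60
done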
java -jar kafka-datasource-1.0.jar -t test_topic -b kafka01:6667,kafka02:6667,kafka03:6667 -a 0 -i 10 -n 4 -p uuid
# Window Throughput benchmark
./bin/flink run -m yarn-cluster -c com.paultech.WindowThroughput /path/to/benchmark/benchmark-1.0.jar --parallelism 4 --output-topic output --input-topic input --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667
Data is collected in 1-minute windows.
Use the following command to retrieve the result from output topic:
./kafka-console-consumer.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667 --topic output
The output is the number of records that Flink is able to process in each 1-minute window.
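For example, if a window reports 6,000,000 records, that corresponds to roughly 6,000,000 / 60 = 100,000 records per second of sustained throughput (an illustrative figure, not a measured result). If the console consumer is started after the windows have already been emitted, add --from-beginning to re-read the results:
./kafka-console-consumer.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667 --topic output --from-beginning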
- --parallelism: Parallelism for Flink Stream Execution Environment
- --bufferTimeout: Flink buffer timeout
- --input-topic: Kafka topic where Flink reads data
- --output-topic: Kafka topic where Flink writes data
- --bootstrap-server: Addresses and ports of the Kafka brokers
- --consumer-group: Consumer group. Default is "flink-bench"
- --offset: Consume the Kafka topic from the earliest or latest offset
Examples:
# Run Throughput
./bin/flink run -m 10.180.210.187:8081 -c com.paultech.WindowThroughput /root/zy/benchmark/benchmark-1.0.jar --parallelism 12 --output-topic output --input-topic input --bootstrap-server 10.180.210.187:6667,10.180.210.188:6667,10.180.210.189:6667
# Run Latency
./bin/flink run -m 10.180.210.187:8081 -c com.paultech.Latency /root/zy/benchmark/benchmark-1.0.jar --parallelism 12 --output-topic output --input-topic input --bootstrap-server 10.180.210.187:6667,10.180.210.188:6667,10.180.210.189:6667
- -b: Kafka bootstrap server
- -t: Kafka topic to write to (the benchmark input topic)
- -a: Acks
- -n: Number of threads
- -i: Message send interval
- -c: Messages sent per interval
- -p: Kafka data payload type. Can be uuid or 1kb
- -h: Get help message
Examples:
java -jar kafka-datasource-1.0-SNAPSHOT.jar -b 10.180.210.187:6667,10.180.210.188:6667,10.180.210.189:6667 -t input -a 0 -n 12
- -b: Kafka bootstrap server
- -t: Kafka topic to read from (the benchmark output topic)
- -g: Kafka consumer group
Examples:
java -jar kafka-latency-analyzer-1.0-SNAPSHOT.jar -b 10.180.210.187:6667,10.180.210.188:6667,10.180.210.189:6667 -t output1 -g analyzer
- Enter flink-benchmark/benchmark-dist/target/dist/benchmark-dist/bin.
- Update benchmark-env.sh by setting FLINK_HOME and JAVA_HOME (optional); a sketch follows this list.
- Execute run.sh and then follow the instructions.
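A minimal benchmark-env.sh sketch, assuming it uses plain shell exports; the paths below are placeholders for your own installation:
# benchmark-env.sh -- example settings (paths are assumptions)
export FLINK_HOME=/opt/flink-1.15.4
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk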