From 525d4f69cf26a6317fcf38b3049d59e670a8661f Mon Sep 17 00:00:00 2001
From: Leonard Xu
Date: Tue, 23 Aug 2022 00:00:49 +0800
Subject: [PATCH 1/3] [mysql] Make the records of a snapshot split not cross
 checkpoints

---
 .../debezium/reader/BinlogSplitReader.java    |   9 +-
 .../debezium/reader/SnapshotSplitReader.java  |  12 +-
 .../connectors/mysql/source/MySqlSource.java  |   4 +-
 .../source/reader/MySqlRecordEmitter.java     |  18 +-
 .../source/reader/MySqlSourceReader.java      |   8 +-
 .../mysql/source/reader/MySqlSplitReader.java |  10 +-
 .../mysql/source/split/MySqlRecords.java      |  14 +-
 .../mysql/source/split/SourceRecords.java     |  49 ++++
 .../reader/BinlogSplitReaderTest.java         |  21 +-
 .../reader/SnapshotSplitReaderTest.java       |   7 +-
 .../mysql/source/MySqlSourceITCase.java       | 216 +++++++++++++++
 .../source/reader/MySqlRecordEmitterTest.java |   6 +-
 .../source/reader/MySqlSourceReaderTest.java  | 251 ++++++++++++++++--
 13 files changed, 566 insertions(+), 59 deletions(-)
 create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index dc95119ce0..270caa5572 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -28,6 +28,7 @@
 import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
 import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils;
 import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
 import io.debezium.connector.base.ChangeEventQueue;
@@ -63,7 +64,7 @@
  * A Debezium binlog reader implementation that also support reads binlog and filter overlapping
  * snapshot data that {@link SnapshotSplitReader} read.
  */
-public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSplit> {
+public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {
 
     private static final Logger LOG = LoggerFactory.getLogger(BinlogSplitReader.class);
     private final StatefulTaskContext statefulTaskContext;
@@ -145,7 +146,7 @@ public boolean isFinished() {
 
     @Nullable
     @Override
-    public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
         if (currentTaskRunning) {
@@ -155,7 +156,9 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
                     sourceRecords.add(event.getRecord());
                 }
             }
-            return sourceRecords.iterator();
+            Set<SourceRecords> sourceRecordsSet = new HashSet<>();
+            sourceRecordsSet.add(new SourceRecords(sourceRecords));
+            return sourceRecordsSet.iterator();
         } else {
             return null;
         }
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index 2a8db8de49..14772beb80 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -29,6 +29,7 @@
 import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
 import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
 import io.debezium.config.Configuration;
 import io.debezium.connector.base.ChangeEventQueue;
@@ -49,9 +50,11 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -71,7 +74,7 @@
  * A snapshot reader that reads data from Table in split level, the split is assigned by primary key
  * range.
  */
-public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSplit> {
+public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
     private final StatefulTaskContext statefulTaskContext;
@@ -235,7 +238,7 @@ public boolean isFinished() {
 
     @Nullable
     @Override
-    public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
@@ -288,7 +291,10 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
             normalizedRecords.add(lowWatermark);
             normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
             normalizedRecords.add(highWatermark);
-            return normalizedRecords.iterator();
+
+            final Set<SourceRecords> sourceRecordsSet = new HashSet<>();
+            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+            return sourceRecordsSet.iterator();
         }
         // the data has been polled, no more data
         reachEnd.compareAndSet(false, true);
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java
index 35f9ab24f1..a94f722447 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java
@@ -51,11 +51,11 @@
 import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
 import com.ververica.cdc.connectors.mysql.table.StartupMode;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
-import org.apache.kafka.connect.source.SourceRecord;
 
 import java.lang.reflect.Method;
 import java.util.List;
@@ -133,7 +133,7 @@ public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContex
         // create source config for the given subtask (e.g. unique server id)
         MySqlSourceConfig sourceConfig =
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
 
         final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index b93a4d5608..6fa2017521 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -23,6 +23,7 @@
 import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 import io.debezium.document.Array;
@@ -32,6 +33,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+
 import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getFetchTimestamp;
 import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -50,7 +53,7 @@
  * emit records rather than emit the records directly.
*/ public final class MySqlRecordEmitter - implements RecordEmitter { + implements RecordEmitter { private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class); private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = @@ -72,7 +75,18 @@ public MySqlRecordEmitter( } @Override - public void emitRecord(SourceRecord element, SourceOutput output, MySqlSplitState splitState) + public void emitRecord( + SourceRecords sourceRecords, SourceOutput output, MySqlSplitState splitState) + throws Exception { + final Iterator elementIterator = + sourceRecords.getSourceRecordList().iterator(); + while (elementIterator.hasNext()) { + processElement(elementIterator.next(), output, splitState); + } + } + + private void processElement( + SourceRecord element, SourceOutput output, MySqlSplitState splitState) throws Exception { if (isWatermarkEvent(element)) { BinlogOffset watermark = getWatermark(element); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 92499a71d0..43c34f0aa6 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -45,11 +45,11 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; -import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +71,7 @@ /** The source reader for MySQL source splits. 
*/ public class MySqlSourceReader extends SingleThreadMultiplexSourceReaderBase< - SourceRecord, T, MySqlSplit, MySqlSplitState> { + SourceRecords, T, MySqlSplit, MySqlSplitState> { private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class); @@ -83,9 +83,9 @@ public class MySqlSourceReader private MySqlBinlogSplit suspendedBinlogSplit; public MySqlSourceReader( - FutureCompletingBlockingQueue> elementQueue, + FutureCompletingBlockingQueue> elementQueue, Supplier splitReaderSupplier, - RecordEmitter recordEmitter, + RecordEmitter recordEmitter, Configuration config, MySqlSourceReaderContext context, MySqlSourceConfig sourceConfig) { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java index c6afae43ef..ca08a4cfee 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java @@ -30,8 +30,8 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import io.debezium.connector.mysql.MySqlConnection; -import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; /** The {@link SplitReader} implementation for the {@link MySqlSource}. */ -public class MySqlSplitReader implements SplitReader { +public class MySqlSplitReader implements SplitReader { private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class); private final Queue splits; @@ -54,7 +54,7 @@ public class MySqlSplitReader implements SplitReader { private final int subtaskId; private final MySqlSourceReaderContext context; - @Nullable private DebeziumReader currentReader; + @Nullable private DebeziumReader currentReader; @Nullable private String currentSplitId; public MySqlSplitReader( @@ -66,12 +66,12 @@ public MySqlSplitReader( } @Override - public RecordsWithSplitIds fetch() throws IOException { + public RecordsWithSplitIds fetch() throws IOException { checkSplitOrStartNext(); checkNeedStopBinlogReader(); - Iterator dataIt; + Iterator dataIt; try { dataIt = currentReader.pollSplitRecords(); } catch (InterruptedException e) { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlRecords.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlRecords.java index c551edb7ef..34ba270bca 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlRecords.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlRecords.java @@ -18,8 +18,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; -import org.apache.kafka.connect.source.SourceRecord; - import javax.annotation.Nullable; import java.util.Collections; @@ -29,11 +27,11 @@ /** * An implementation of {@link RecordsWithSplitIds} which contains the records of one table split. 
*/ -public final class MySqlRecords implements RecordsWithSplitIds { +public final class MySqlRecords implements RecordsWithSplitIds { @Nullable private String splitId; - @Nullable private Iterator recordsForCurrentSplit; - @Nullable private final Iterator recordsForSplit; + @Nullable private Iterator recordsForCurrentSplit; + @Nullable private final Iterator recordsForSplit; private final Set finishedSnapshotSplits; public MySqlRecords( @@ -59,8 +57,8 @@ public String nextSplit() { @Nullable @Override - public SourceRecord nextRecordFromSplit() { - final Iterator recordsForSplit = this.recordsForCurrentSplit; + public SourceRecords nextRecordFromSplit() { + final Iterator recordsForSplit = this.recordsForCurrentSplit; if (recordsForSplit != null) { if (recordsForSplit.hasNext()) { return recordsForSplit.next(); @@ -78,7 +76,7 @@ public Set finishedSplits() { } public static MySqlRecords forRecords( - final String splitId, final Iterator recordsForSplit) { + final String splitId, final Iterator recordsForSplit) { return new MySqlRecords(splitId, recordsForSplit, Collections.emptySet()); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java new file mode 100644 index 0000000000..ea6e65377c --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.split; + +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Data structure to describe a set of {@link SourceRecord}. 
*/ +public final class SourceRecords { + + private final List sourceRecords; + + public SourceRecords(List sourceRecords) { + this.sourceRecords = sourceRecords; + } + + public List getSourceRecordList() { + return sourceRecords; + } + + public Iterator iterator() { + return sourceRecords.iterator(); + } + + public static SourceRecords fromSingleRecord(SourceRecord record) { + final List records = new ArrayList<>(); + records.add(record); + return new SourceRecords(records); + } +} diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 2d5c1ed193..e9123fbe41 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -34,6 +34,7 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils; import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import com.ververica.cdc.connectors.mysql.table.StartupOptions; @@ -362,7 +363,7 @@ private MySqlBinlogSplit createBinlogSplitFromLatestOffset(MySqlSourceConfig sou private List pollRecordsFromReader( BinlogSplitReader reader, Predicate filter) { List records = new ArrayList<>(); - Iterator recordIterator; + Iterator recordIterator; try { recordIterator = reader.pollSplitRecords(); } catch (InterruptedException e) { @@ -372,9 +373,12 @@ private List pollRecordsFromReader( return records; } while (recordIterator.hasNext()) { - SourceRecord record = recordIterator.next(); - if (filter.test(record)) { - records.add(record); + Iterator iterator = recordIterator.next().iterator(); + while (iterator.hasNext()) { + SourceRecord record = iterator.next(); + if (filter.test(record)) { + records.add(record); + } } } LOG.debug("Records polled: {}", records); @@ -434,11 +438,14 @@ private List readBinlogSplits( if (snapshotSplitReader.isFinished()) { snapshotSplitReader.submitSplit(sqlSplit); } - Iterator res; + Iterator res; while ((res = snapshotSplitReader.pollSplitRecords()) != null) { while (res.hasNext()) { - SourceRecord sourceRecord = res.next(); - snapshotRecords.add(sourceRecord); + Iterator iterator = res.next().iterator(); + while (iterator.hasNext()) { + SourceRecord sourceRecord = iterator.next(); + snapshotRecords.add(sourceRecord); + } } } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index f5552d8945..d7a3c26304 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -29,6 +29,7 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; 
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.connector.mysql.MySqlConnection; @@ -397,11 +398,11 @@ private List readTableSnapshotSplits( if (snapshotSplitReader.isFinished()) { snapshotSplitReader.submitSplit(sqlSplit); } - Iterator res; + Iterator res; while ((res = snapshotSplitReader.pollSplitRecords()) != null) { while (res.hasNext()) { - SourceRecord sourceRecord = res.next(); - result.add(sourceRecord); + SourceRecords sourceRecords = res.next(); + result.addAll(sourceRecords.getSourceRecordList()); } } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java index 64dc9b3030..fda3262717 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -17,26 +17,49 @@ package com.ververica.cdc.connectors.mysql.source; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; +import org.apache.flink.types.RowUtils; import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; +import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import 
com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.MetadataConverter; +import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.jdbc.JdbcConnection; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -44,17 +67,22 @@ import java.lang.reflect.Field; import java.sql.SQLException; +import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkState; @@ -264,6 +292,132 @@ public void testConsumingTableWithoutPrimaryKey() { } } + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = buildSleepingSource(); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] snapshotForSingleTable = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig()); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = 
env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable)); + + // Check all snapshot records are sent with exactly-once semantics + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRowData(iterator, expectedSnapshotData.size())); + jobClient.cancel().get(); + } + + private MySqlSource buildSleepingSource() { + ResolvedSchema physicalSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.BIGINT().notNull()), + Column.physical("name", DataTypes.STRING()), + Column.physical("address", DataTypes.STRING()), + Column.physical("phone_number", DataTypes.STRING())), + new ArrayList<>(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = new MetadataConverter[0]; + final TypeInformation typeInfo = InternalTypeInfo.of(physicalDataType); + + SleepingRowDataDebeziumDeserializeSchema deserializer = + new SleepingRowDataDebeziumDeserializeSchema( + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setMetadataConverters(metadataConverters) + .setResultTypeInfo(typeInfo) + .setServerTimeZone(ZoneId.of("UTC")) + .setUserDefinedConverterFactory( + MySqlDeserializationConverterFactory.instance()) + .build(), + 1000L); + return MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(customDatabase.getDatabaseName()) + .tableList(customDatabase.getDatabaseName() + ".customers") + .username(customDatabase.getUsername()) + .password(customDatabase.getPassword()) + .serverTimeZone("UTC") + .serverId(getServerId()) + .splitSize(8096) + .splitMetaGroupSize(1000) + .distributionFactorUpper(1000.0d) + .distributionFactorLower(0.05d) + .fetchSize(1024) + .connectTimeout(Duration.ofSeconds(30)) + .connectMaxRetries(3) + .connectionPoolSize(20) + .debeziumProperties(new Properties()) + .startupOptions(StartupOptions.initial()) + .deserializer(deserializer) + .scanNewlyAddedTableEnabled(false) + .jdbcProperties(new Properties()) + .heartbeatInterval(Duration.ofSeconds(30)) + .build(); + } + private void testMySqlParallelSource( FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception { @@ -391,6 +545,28 @@ private void testMySqlParallelSource( tableResult.getJobClient().get().cancel().get(); } + private static List convertRowDataToRowString(List rows) { + LinkedHashMap map = new LinkedHashMap<>(); + map.put("id", 0); + map.put("name", 1); + map.put("address", 2); + map.put("phone_number", 3); + return rows.stream() + .map( + row -> + RowUtils.createRowWithNamedPositions( + row.getRowKind(), + new Object[] { + row.getLong(0), + row.getString(1), + row.getString(2), + row.getString(3) + }, + map) + .toString()) + .collect(Collectors.toList()); + } + private void testNewlyAddedTableOneByOne( int parallelism, FailoverType failoverType, @@ -602,6 +778,16 @@ private static List fetchRows(Iterator iter, int size) { return rows; } + private static List fetchRowData(Iterator iter, int size) { + List rows = new ArrayList<>(size); + 
while (size > 0 && iter.hasNext()) { + RowData row = iter.next(); + rows.add(row); + size--; + } + return convertRowDataToRowString(rows); + } + private String getTableNameRegex(String[] captureCustomerTables) { checkState(captureCustomerTables.length > 0); if (captureCustomerTables.length == 1) { @@ -826,4 +1012,34 @@ private static int sinkSize(String sinkName) { } } } + + /** + * A {@link DebeziumDeserializationSchema} implementation which sleep given milliseconds after + * deserialize per record, this class is designed for test. + */ + static class SleepingRowDataDebeziumDeserializeSchema + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 1L; + + private final RowDataDebeziumDeserializeSchema deserializeSchema; + private final long sleepMs; + + public SleepingRowDataDebeziumDeserializeSchema( + RowDataDebeziumDeserializeSchema deserializeSchema, long sleepMs) { + this.deserializeSchema = deserializeSchema; + this.sleepMs = sleepMs; + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + deserializeSchema.deserialize(record, out); + Thread.sleep(sleepMs); + } + + @Override + public TypeInformation getProducedType() { + return deserializeSchema.getProducedType(); + } + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 3ea6a83358..614ff259fc 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -25,6 +25,7 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.heartbeat.Heartbeat; import org.apache.kafka.connect.source.SourceRecord; @@ -50,7 +51,10 @@ public void testHeartbeatEventHandling() throws Exception { fakeOffset.getOffset(), record -> { try { - recordEmitter.emitRecord(record, new TestingReaderOutput<>(), splitState); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(record), + new TestingReaderOutput<>(), + splitState); } catch (Exception e) { throw new RuntimeException("Failed to emit heartbeat record", e); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 832b29b427..f15668b5ed 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordEmitter; import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; @@ -30,6 +31,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; @@ -38,20 +40,28 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.document.Array; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; +import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.TableChanges; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Method; import java.sql.Connection; @@ -62,12 +72,23 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getFetchTimestamp; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getMessageTimestamp; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHeartbeatEvent; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -100,10 +121,6 @@ public void 
testBinlogReadFailoverCrossTransaction() throws Exception { createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas); } - MySqlSourceReader reader = createReader(sourceConfig); - reader.start(); - reader.addSplits(Arrays.asList(binlogSplit)); - // step-1: make 6 change events in one MySQL transaction TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next(); makeBinlogEventsInOneTransaction(sourceConfig, tableId.toString()); @@ -115,7 +132,10 @@ public void testBinlogReadFailoverCrossTransaction() throws Exception { "+U[103, user_3, Hangzhou, 123567891234]" }; // the 2 records are produced by 1 operations - List actualRecords = consumeRecords(reader, dataType, 1); + MySqlSourceReader reader = createReader(sourceConfig, 1); + reader.start(); + reader.addSplits(Arrays.asList(binlogSplit)); + List actualRecords = consumeRecords(reader, dataType); assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords); List splitsState = reader.snapshotState(1L); // check the binlog split state @@ -123,7 +143,7 @@ public void testBinlogReadFailoverCrossTransaction() throws Exception { reader.close(); // step-3: mock failover from a restored state - MySqlSourceReader restartReader = createReader(sourceConfig); + MySqlSourceReader restartReader = createReader(sourceConfig, 3); restartReader.start(); restartReader.addSplits(splitsState); @@ -136,7 +156,67 @@ public void testBinlogReadFailoverCrossTransaction() throws Exception { "+U[103, user_3, Shanghai, 123567891234]" }; // the 4 records are produced by 3 operations - List restRecords = consumeRecords(restartReader, dataType, 3); + List restRecords = consumeRecords(restartReader, dataType); + assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords); + restartReader.close(); + } + + @Test + public void testBinlogReadFailoverCrossCheckpoint() throws Exception { + customerDatabase.createAndInitialize(); + final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}); + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + MySqlSplit binlogSplit; + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { + Map tableSchemas = + TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); + binlogSplit = + MySqlBinlogSplit.fillTableSchemas( + createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas); + } + + // step-1: make 6 change events in one MySQL transaction + TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next(); + makeBinlogEventsInOneTransaction(sourceConfig, tableId.toString()); + + // step-2: fetch the first 2 records belong to the snapshot split + String[] expectedRecords = + new String[] { + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]" + }; + // the 2 records are produced by 1 operations + MySqlSourceReader reader = createReader(sourceConfig, 1); + reader.start(); + reader.addSplits(Arrays.asList(binlogSplit)); + List actualRecords = consumeRecords(reader, dataType); + assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords); + List splitsState = reader.snapshotState(1L); + + // check the snapshot split state + assertEquals(1, splitsState.size()); + reader.close(); + + // step-3: mock failover from a restored state + MySqlSourceReader restartReader = createReader(sourceConfig, 3); + 
restartReader.start(); + restartReader.addSplits(splitsState); + + // step-4: fetch all records belong to the snapshot split + String[] expectedRestRecords = + new String[] { + "-D[102, user_2, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]" + }; + // the 4 records are produced by 3 operations + List restRecords = consumeRecords(restartReader, dataType); assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords); restartReader.close(); } @@ -175,7 +255,8 @@ public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception { final AtomicBoolean finishReading = new AtomicBoolean(false); final CountDownLatch updatingExecuted = new CountDownLatch(1); TestingReaderContext testingReaderContext = new TestingReaderContext(); - MySqlSourceReader reader = createReader(sourceConfig, testingReaderContext); + MySqlSourceReader reader = + createReader(sourceConfig, testingReaderContext, 0); reader.start(); Thread updateWorker = @@ -237,25 +318,31 @@ public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception { } } - private MySqlSourceReader createReader(MySqlSourceConfig configuration) + private MySqlSourceReader createReader(MySqlSourceConfig configuration, int limit) throws Exception { - return createReader(configuration, new TestingReaderContext()); + return createReader(configuration, new TestingReaderContext(), limit); } private MySqlSourceReader createReader( - MySqlSourceConfig configuration, SourceReaderContext readerContext) throws Exception { - final FutureCompletingBlockingQueue> elementsQueue = + MySqlSourceConfig configuration, SourceReaderContext readerContext, int limit) + throws Exception { + final FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); // make SourceReaderContext#metricGroup compatible between Flink 1.13 and Flink 1.14 final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); metricGroupMethod.setAccessible(true); final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); - - final MySqlRecordEmitter recordEmitter = - new MySqlRecordEmitter<>( - new ForwardDeserializeSchema(), - new MySqlSourceReaderMetrics(metricGroup), - configuration.isIncludeSchemaChanges()); + final RecordEmitter recordEmitter = + limit > 0 + ? new MysqlLimitedRecordEmitter( + new ForwardDeserializeSchema(), + new MySqlSourceReaderMetrics(metricGroup), + configuration.isIncludeSchemaChanges(), + limit) + : new MySqlRecordEmitter<>( + new ForwardDeserializeSchema(), + new MySqlSourceReaderMetrics(metricGroup), + configuration.isIncludeSchemaChanges()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); return new MySqlSourceReader<>( @@ -314,11 +401,10 @@ private MySqlSourceConfig getConfig(String[] captureTables) { } private List consumeRecords( - MySqlSourceReader sourceReader, DataType recordType, int changeEventNum) - throws Exception { + MySqlSourceReader sourceReader, DataType recordType) throws Exception { // Poll all the n records of the single split. 
final SimpleReaderOutput output = new SimpleReaderOutput(); - while (output.getResults().size() < changeEventNum) { + while (output.getResults().size() == 0) { sourceReader.pollNext(output); } final RecordsFormatter formatter = new RecordsFormatter(recordType); @@ -381,4 +467,127 @@ public TypeInformation getProducedType() { return TypeInformation.of(SourceRecord.class); } } + + private static class MysqlLimitedRecordEmitter + implements RecordEmitter { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class); + private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = + new FlinkJsonTableChangeSerializer(); + + private final DebeziumDeserializationSchema debeziumDeserializationSchema; + private final MySqlSourceReaderMetrics sourceReaderMetrics; + private final boolean includeSchemaChanges; + private final OutputCollector outputCollector; + private final int limit; + + public MysqlLimitedRecordEmitter( + DebeziumDeserializationSchema debeziumDeserializationSchema, + MySqlSourceReaderMetrics sourceReaderMetrics, + boolean includeSchemaChanges, + int limit) { + this.debeziumDeserializationSchema = debeziumDeserializationSchema; + this.sourceReaderMetrics = sourceReaderMetrics; + this.includeSchemaChanges = includeSchemaChanges; + this.outputCollector = new OutputCollector<>(); + Preconditions.checkState(limit > 0); + this.limit = limit; + } + + @Override + public void emitRecord( + SourceRecords sourceRecords, + SourceOutput output, + MySqlSplitState splitState) + throws Exception { + final Iterator elementIterator = + sourceRecords.getSourceRecordList().iterator(); + int sendCnt = 0; + while (elementIterator.hasNext()) { + if (sendCnt >= limit) { + return; + } + processElement(elementIterator.next(), output, splitState); + sendCnt++; + } + } + + private void processElement( + SourceRecord element, SourceOutput output, MySqlSplitState splitState) + throws Exception { + if (isWatermarkEvent(element)) { + BinlogOffset watermark = getWatermark(element); + if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + splitState.asSnapshotSplitState().setHighWatermark(watermark); + } + } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) { + HistoryRecord historyRecord = getHistoryRecord(element); + Array tableChanges = + historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES); + TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true); + for (TableChanges.TableChange tableChange : changes) { + splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange); + } + if (includeSchemaChanges) { + BinlogOffset position = getBinlogPosition(element); + splitState.asBinlogSplitState().setStartingOffset(position); + emitElement(element, output); + } + } else if (isDataChangeRecord(element)) { + updateStartingOffsetForSplit(splitState, element); + reportMetrics(element); + emitElement(element, output); + } else if (isHeartbeatEvent(element)) { + updateStartingOffsetForSplit(splitState, element); + } else { + // unknown element + LOG.info("Meet unknown element {}, just skip.", element); + } + } + + private void updateStartingOffsetForSplit( + MySqlSplitState splitState, SourceRecord element) { + if (splitState.isBinlogSplitState()) { + BinlogOffset position = getBinlogPosition(element); + splitState.asBinlogSplitState().setStartingOffset(position); + } + } + + private void emitElement(SourceRecord element, SourceOutput output) + throws Exception { + 
outputCollector.output = output; + debeziumDeserializationSchema.deserialize(element, outputCollector); + } + + private void reportMetrics(SourceRecord element) { + long now = System.currentTimeMillis(); + // record the latest process time + sourceReaderMetrics.recordProcessTime(now); + Long messageTimestamp = getMessageTimestamp(element); + + if (messageTimestamp != null && messageTimestamp > 0L) { + // report fetch delay + Long fetchTimestamp = getFetchTimestamp(element); + if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { + sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); + } + // report emit delay + sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); + } + } + + private static class OutputCollector implements Collector { + private SourceOutput output; + + @Override + public void collect(T record) { + output.collect(record); + } + + @Override + public void close() { + // do nothing + } + } + } } From 9fcad5b6a7febdac61945d973a7cd8e148563ec8 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 24 Aug 2022 20:34:19 +0800 Subject: [PATCH 2/3] address comments address comments --- .../debezium/reader/BinlogSplitReader.java | 2 +- .../source/reader/MySqlRecordEmitter.java | 3 +- .../mysql/source/split/SourceRecords.java | 12 ++-- .../mysql/source/MySqlSourceITCase.java | 7 +- .../source/reader/MySqlSourceReaderTest.java | 67 ++----------------- 5 files changed, 15 insertions(+), 76 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 270caa5572..4e2be2e87f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -156,7 +156,7 @@ public Iterator pollSplitRecords() throws InterruptedException { sourceRecords.add(event.getRecord()); } } - Set sourceRecordsSet = new HashSet<>(); + List sourceRecordsSet = new ArrayList<>(); sourceRecordsSet.add(new SourceRecords(sourceRecords)); return sourceRecordsSet.iterator(); } else { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 6fa2017521..52c9f771cd 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -78,8 +78,7 @@ public MySqlRecordEmitter( public void emitRecord( SourceRecords sourceRecords, SourceOutput output, MySqlSplitState splitState) throws Exception { - final Iterator elementIterator = - sourceRecords.getSourceRecordList().iterator(); + final Iterator elementIterator = sourceRecords.iterator(); while (elementIterator.hasNext()) { processElement(elementIterator.next(), output, splitState); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java index ea6e65377c..d3317991ab 100644 --- 
a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/SourceRecords.java @@ -1,11 +1,9 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java index fda3262717..0af6d291a9 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -306,7 +306,7 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { DataStreamSource source = env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); - String[] snapshotForSingleTable = + String[] expectedSnapshotData = new String[] { "+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", @@ -358,11 +358,10 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { () -> sleepMs(100)); } - List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable)); - // Check all snapshot records are sent with exactly-once semantics assertEqualsInAnyOrder( - expectedSnapshotData, fetchRowData(iterator, expectedSnapshotData.size())); + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); jobClient.cancel().get(); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index f15668b5ed..b0d54b19f3 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -161,66 +161,6 @@ public void testBinlogReadFailoverCrossTransaction() throws Exception { restartReader.close(); } - @Test - public void testBinlogReadFailoverCrossCheckpoint() throws Exception { - customerDatabase.createAndInitialize(); - final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}); - final DataType dataType = - DataTypes.ROW( - DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("name", DataTypes.STRING()), - DataTypes.FIELD("address", DataTypes.STRING()), - DataTypes.FIELD("phone_number", DataTypes.STRING())); - MySqlSplit binlogSplit; - try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { - Map tableSchemas = - 
                            TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
-            binlogSplit =
-                    MySqlBinlogSplit.fillTableSchemas(
-                            createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas);
-        }
-
-        // step-1: make 6 change events in one MySQL transaction
-        TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next();
-        makeBinlogEventsInOneTransaction(sourceConfig, tableId.toString());
-
-        // step-2: fetch the first 2 records belong to the snapshot split
-        String[] expectedRecords =
-                new String[] {
-                    "-U[103, user_3, Shanghai, 123567891234]",
-                    "+U[103, user_3, Hangzhou, 123567891234]"
-                };
-        // the 2 records are produced by 1 operations
-        MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, 1);
-        reader.start();
-        reader.addSplits(Arrays.asList(binlogSplit));
-        List<String> actualRecords = consumeRecords(reader, dataType);
-        assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords);
-        List<MySqlSplit> splitsState = reader.snapshotState(1L);
-
-        // check the snapshot split state
-        assertEquals(1, splitsState.size());
-        reader.close();
-
-        // step-3: mock failover from a restored state
-        MySqlSourceReader<SourceRecord> restartReader = createReader(sourceConfig, 3);
-        restartReader.start();
-        restartReader.addSplits(splitsState);
-
-        // step-4: fetch all records belong to the snapshot split
-        String[] expectedRestRecords =
-                new String[] {
-                    "-D[102, user_2, Shanghai, 123567891234]",
-                    "+I[102, user_2, Shanghai, 123567891234]",
-                    "-U[103, user_3, Hangzhou, 123567891234]",
-                    "+U[103, user_3, Shanghai, 123567891234]"
-                };
-        // the 4 records are produced by 3 operations
-        List<String> restRecords = consumeRecords(restartReader, dataType);
-        assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords);
-        restartReader.close();
-    }
-
     @Test
     public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception {
         inventoryDatabase.createAndInitialize();
@@ -468,6 +408,10 @@ public TypeInformation<SourceRecord> getProducedType() {
         }
     }
 
+    /**
+     * An implementation of {@link RecordEmitter} which only emits records up to a given limit;
+     * this class is used for test purposes.
+ */ private static class MysqlLimitedRecordEmitter implements RecordEmitter { @@ -500,8 +444,7 @@ public void emitRecord( SourceOutput output, MySqlSplitState splitState) throws Exception { - final Iterator elementIterator = - sourceRecords.getSourceRecordList().iterator(); + final Iterator elementIterator = sourceRecords.iterator(); int sendCnt = 0; while (elementIterator.hasNext()) { if (sendCnt >= limit) { From a8b30772684d6988f77cef2b72a2a148abce83dd Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Thu, 25 Aug 2022 10:28:03 +0800 Subject: [PATCH 3/3] address comments --- .../connectors/mysql/debezium/reader/SnapshotSplitReader.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 14772beb80..a1ea735d69 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -50,11 +50,9 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -292,7 +290,7 @@ public Iterator pollSplitRecords() throws InterruptedException { normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values())); normalizedRecords.add(highWatermark); - final Set sourceRecordsSet = new HashSet<>(); + final List sourceRecordsSet = new ArrayList<>(); sourceRecordsSet.add(new SourceRecords(normalizedRecords)); return sourceRecordsSet.iterator(); }
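
Taken together, the three patches change the granularity of the hand-off between the split readers and the record emitter: pollSplitRecords() no longer yields individual SourceRecord elements but one SourceRecords batch per poll, and MySqlRecordEmitter#emitRecord drains the whole batch before returning. Because the source framework takes checkpoints between emitRecord() calls rather than inside one, all records of a snapshot split (low watermark, snapshot rows, high watermark) now land on the same side of any checkpoint barrier; the sleeping deserializer in MySqlSourceITCase deliberately slows emission and triggers a failover mid-split to verify exactly that. The following minimal, self-contained sketch illustrates this batching contract in plain Java; RecordBatch and emitRecord() are illustrative stand-ins for SourceRecords and MySqlRecordEmitter#emitRecord, not the connector's actual API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Sketch of the batching contract introduced by SourceRecords (assumed names, for illustration only). */
public class BatchEmitSketch {

    /** Stand-in for SourceRecords: all records of one poll, handed over as a single element. */
    static final class RecordBatch {
        private final List<String> records;

        RecordBatch(List<String> records) {
            this.records = records;
        }

        Iterator<String> iterator() {
            return records.iterator();
        }
    }

    /**
     * Stand-in for MySqlRecordEmitter#emitRecord: drains the whole batch in one call, so a
     * checkpoint (which can only happen between two calls of this method) never splits it.
     */
    static void emitRecord(RecordBatch batch, List<String> output) {
        Iterator<String> it = batch.iterator();
        while (it.hasNext()) {
            output.add(it.next());
        }
    }

    public static void main(String[] args) {
        RecordBatch batch =
                new RecordBatch(
                        Arrays.asList(
                                "low-watermark",
                                "+I[101, user_1, ...]",
                                "+I[102, user_2, ...]",
                                "high-watermark"));
        List<String> output = new ArrayList<>();
        emitRecord(batch, output);
        System.out.println(output); // all four elements emitted atomically, in order
    }
}

The follow-up in patch 2 that swaps the readers' HashSet<SourceRecords> for an ArrayList<SourceRecords> keeps this single-batch hand-off unchanged; for a one-element collection a list presumably just avoids needless hashing while preserving insertion order.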