Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mysql] Let the records of snapshot split don't cross checkpoints #1505

Merged
merged 3 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.connector.base.ChangeEventQueue;
Expand Down Expand Up @@ -63,7 +64,7 @@
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
* snapshot data that {@link SnapshotSplitReader} read.
*/
public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSplit> {
public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {

private static final Logger LOG = LoggerFactory.getLogger(BinlogSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
Expand Down Expand Up @@ -145,7 +146,7 @@ public boolean isFinished() {

@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
if (currentTaskRunning) {
Expand All @@ -155,7 +156,9 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
sourceRecords.add(event.getRecord());
}
}
return sourceRecords.iterator();
List<SourceRecords> sourceRecordsSet = new ArrayList<>();
sourceRecordsSet.add(new SourceRecords(sourceRecords));
return sourceRecordsSet.iterator();
leonardBang marked this conversation as resolved.
Show resolved Hide resolved
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
Expand Down Expand Up @@ -71,7 +72,7 @@
* A snapshot reader that reads data from Table in split level, the split is assigned by primary key
* range.
*/
public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSplit> {
public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {

private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
Expand Down Expand Up @@ -235,7 +236,7 @@ public boolean isFinished() {

@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();

if (hasNextElement.get()) {
Expand Down Expand Up @@ -288,7 +289,10 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
normalizedRecords.add(lowWatermark);
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
normalizedRecords.add(highWatermark);
return normalizedRecords.iterator();

final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
sourceRecordsSet.add(new SourceRecords(normalizedRecords));
return sourceRecordsSet.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;

import java.lang.reflect.Method;
import java.util.List;
Expand Down Expand Up @@ -133,7 +133,7 @@ public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContex
// create source config for the given subtask (e.g. unique server id)
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();

final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Array;
Expand All @@ -32,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;
Expand All @@ -50,7 +53,7 @@
* emit records rather than emit the records directly.
*/
public final class MySqlRecordEmitter<T>
implements RecordEmitter<SourceRecord, T, MySqlSplitState> {
implements RecordEmitter<SourceRecords, T, MySqlSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class);
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
Expand All @@ -72,7 +75,17 @@ public MySqlRecordEmitter(
}

@Override
public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
public void emitRecord(
SourceRecords sourceRecords, SourceOutput<T> output, MySqlSplitState splitState)
throws Exception {
final Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
while (elementIterator.hasNext()) {
processElement(elementIterator.next(), output, splitState);
}
}

private void processElement(
SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
throws Exception {
if (isWatermarkEvent(element)) {
BinlogOffset watermark = getWatermark(element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,7 +71,7 @@
/** The source reader for MySQL source splits. */
public class MySqlSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
SourceRecord, T, MySqlSplit, MySqlSplitState> {
SourceRecords, T, MySqlSplit, MySqlSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

Expand All @@ -83,9 +83,9 @@ public class MySqlSourceReader<T>
private MySqlBinlogSplit suspendedBinlogSplit;

public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
Supplier<MySqlSplitReader> splitReaderSupplier,
RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
Configuration config,
MySqlSourceReaderContext context,
MySqlSourceConfig sourceConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import io.debezium.connector.mysql.MySqlConnection;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,15 +46,15 @@
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;

/** The {@link SplitReader} implementation for the {@link MySqlSource}. */
public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit> {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class);
private final Queue<MySqlSplit> splits;
private final MySqlSourceConfig sourceConfig;
private final int subtaskId;
private final MySqlSourceReaderContext context;

@Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
@Nullable private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
@Nullable private String currentSplitId;

public MySqlSplitReader(
Expand All @@ -66,12 +66,12 @@ public MySqlSplitReader(
}

@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {

checkSplitOrStartNext();
checkNeedStopBinlogReader();

Iterator<SourceRecord> dataIt;
Iterator<SourceRecords> dataIt;
try {
dataIt = currentReader.pollSplitRecords();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

import org.apache.kafka.connect.source.SourceRecord;

import javax.annotation.Nullable;

import java.util.Collections;
Expand All @@ -29,11 +27,11 @@
/**
* An implementation of {@link RecordsWithSplitIds} which contains the records of one table split.
*/
public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord> {
public final class MySqlRecords implements RecordsWithSplitIds<SourceRecords> {

@Nullable private String splitId;
@Nullable private Iterator<SourceRecord> recordsForCurrentSplit;
@Nullable private final Iterator<SourceRecord> recordsForSplit;
@Nullable private Iterator<SourceRecords> recordsForCurrentSplit;
@Nullable private final Iterator<SourceRecords> recordsForSplit;
private final Set<String> finishedSnapshotSplits;

public MySqlRecords(
Expand All @@ -59,8 +57,8 @@ public String nextSplit() {

@Nullable
@Override
public SourceRecord nextRecordFromSplit() {
final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
public SourceRecords nextRecordFromSplit() {
final Iterator<SourceRecords> recordsForSplit = this.recordsForCurrentSplit;
if (recordsForSplit != null) {
if (recordsForSplit.hasNext()) {
return recordsForSplit.next();
Expand All @@ -78,7 +76,7 @@ public Set<String> finishedSplits() {
}

public static MySqlRecords forRecords(
final String splitId, final Iterator<SourceRecord> recordsForSplit) {
final String splitId, final Iterator<SourceRecords> recordsForSplit) {
return new MySqlRecords(splitId, recordsForSplit, Collections.emptySet());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

leonardBang marked this conversation as resolved.
Show resolved Hide resolved
package com.ververica.cdc.connectors.mysql.source.split;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Data structure to describe a set of {@link SourceRecord}. */
public final class SourceRecords {

private final List<SourceRecord> sourceRecords;

public SourceRecords(List<SourceRecord> sourceRecords) {
this.sourceRecords = sourceRecords;
}

public List<SourceRecord> getSourceRecordList() {
return sourceRecords;
}

public Iterator<SourceRecord> iterator() {
return sourceRecords.iterator();
}

public static SourceRecords fromSingleRecord(SourceRecord record) {
final List<SourceRecord> records = new ArrayList<>();
records.add(record);
return new SourceRecords(records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
Expand Down Expand Up @@ -362,7 +363,7 @@ private MySqlBinlogSplit createBinlogSplitFromLatestOffset(MySqlSourceConfig sou
private List<SourceRecord> pollRecordsFromReader(
BinlogSplitReader reader, Predicate<SourceRecord> filter) {
List<SourceRecord> records = new ArrayList<>();
Iterator<SourceRecord> recordIterator;
Iterator<SourceRecords> recordIterator;
try {
recordIterator = reader.pollSplitRecords();
} catch (InterruptedException e) {
Expand All @@ -372,9 +373,12 @@ private List<SourceRecord> pollRecordsFromReader(
return records;
}
while (recordIterator.hasNext()) {
SourceRecord record = recordIterator.next();
if (filter.test(record)) {
records.add(record);
Iterator<SourceRecord> iterator = recordIterator.next().iterator();
while (iterator.hasNext()) {
SourceRecord record = iterator.next();
if (filter.test(record)) {
records.add(record);
}
}
}
LOG.debug("Records polled: {}", records);
Expand Down Expand Up @@ -434,11 +438,14 @@ private List<String> readBinlogSplits(
if (snapshotSplitReader.isFinished()) {
snapshotSplitReader.submitSplit(sqlSplit);
}
Iterator<SourceRecord> res;
Iterator<SourceRecords> res;
while ((res = snapshotSplitReader.pollSplitRecords()) != null) {
while (res.hasNext()) {
SourceRecord sourceRecord = res.next();
snapshotRecords.add(sourceRecord);
Iterator<SourceRecord> iterator = res.next().iterator();
while (iterator.hasNext()) {
SourceRecord sourceRecord = iterator.next();
snapshotRecords.add(sourceRecord);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
Expand Down Expand Up @@ -397,11 +398,11 @@ private List<String> readTableSnapshotSplits(
if (snapshotSplitReader.isFinished()) {
snapshotSplitReader.submitSplit(sqlSplit);
}
Iterator<SourceRecord> res;
Iterator<SourceRecords> res;
while ((res = snapshotSplitReader.pollSplitRecords()) != null) {
while (res.hasNext()) {
SourceRecord sourceRecord = res.next();
result.add(sourceRecord);
SourceRecords sourceRecords = res.next();
result.addAll(sourceRecords.getSourceRecordList());
}
}
}
Expand Down
Loading