diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index dff21c6773..ff8cd2e1a9 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -43,13 +43,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -106,7 +107,7 @@ public MySqlSnapshotSplitAssigner(
                 currentParallelism,
                 new ArrayList<>(),
                 new ArrayList<>(),
-                new HashMap<>(),
+                new LinkedHashMap<>(),
                 new HashMap<>(),
                 new HashMap<>(),
                 AssignerStatus.INITIAL_ASSIGNING,
@@ -152,7 +153,17 @@ private MySqlSnapshotSplitAssigner(
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
-        this.assignedSplits = assignedSplits;
+        // When the job restores from a savepoint, sort the existing tables and newly added tables
+        // so that the enumerator only sends the newly added tables' BinlogSplitMetaEvent
+        this.assignedSplits =
+                assignedSplits.entrySet().stream()
+                        .sorted(Entry.comparingByKey())
+                        .collect(
+                                Collectors.toMap(
+                                        Entry::getKey,
+                                        Entry::getValue,
+                                        (o, o2) -> o,
+                                        LinkedHashMap::new));
         this.tableSchemas = tableSchemas;
         this.splitFinishedOffsets = splitFinishedOffsets;
         this.assignerStatus = assignerStatus;
@@ -232,7 +243,7 @@ private void captureNewlyAddedTables() {
             // remove unassigned tables/splits if it does not satisfy new table filter
             List<String> splitsToRemove = new LinkedList<>();
-            for (Map.Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
+            for (Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
                     assignedSplits.entrySet()) {
                 if (tablesToRemove.contains(splitEntry.getValue().getTableId())) {
                     splitsToRemove.add(splitEntry.getKey());
                 }
             }
@@ -367,9 +378,7 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
                     "The assigner is not ready to offer finished split information, this should not be called");
         }
         final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
-                assignedSplits.values().stream()
-                        .sorted(Comparator.comparing(MySqlSplit::splitId))
-                        .collect(Collectors.toList());
+                new ArrayList<>(assignedSplits.values());
         List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
         for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
             BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
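Note on the assigner change: the assignedSplits restored from state are re-collected into a LinkedHashMap sorted by split id, so the map's iteration order is deterministic instead of HashMap's arbitrary order, and getFinishedSplitInfos() no longer needs to re-sort on every call. A minimal, self-contained sketch of that collect-into-LinkedHashMap technique follows; the class name and the String-valued map are illustrative, not the connector's types.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;

    public class SortedAssignedSplitsSketch {
        public static void main(String[] args) {
            // Simulate splits restored from state in arbitrary (HashMap) order.
            Map<String, String> restored = new HashMap<>();
            restored.put("newly_added_table:2", "split-c");
            restored.put("existing_table:0", "split-a");
            restored.put("existing_table:1", "split-b");

            // Sort by split id and keep that order by collecting into a LinkedHashMap;
            // the merge function (o, o2) -> o only satisfies toMap's signature, keys are unique.
            Map<String, String> ordered =
                    restored.entrySet().stream()
                            .sorted(Entry.comparingByKey())
                            .collect(
                                    Collectors.toMap(
                                            Entry::getKey,
                                            Entry::getValue,
                                            (o, o2) -> o,
                                            LinkedHashMap::new));

            // Iteration order is now the sorted split-id order, regardless of insertion order.
            ordered.keySet().forEach(System.out::println);
        }
    }

The four-argument Collectors.toMap overload matters here: without the LinkedHashMap::new supplier the collector returns a plain HashMap and the sort order would be lost again.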
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index 8a36d8a595..9055848b49 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -471,20 +471,24 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
 
     private Set<String> getExistedSplitsOfLastGroup(
             List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
-        Set<String> existedSplitsOfLastGroup = new HashSet<>();
         int splitsNumOfLastGroup =
                 finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
         if (splitsNumOfLastGroup != 0) {
             int lastGroupStart =
                     ((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
                             * metaGroupSize;
-            existedSplitsOfLastGroup =
-                    finishedSnapshotSplits
-                            .subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup).stream()
+            // Keep the same order as MySqlHybridSplitAssigner.createBinlogSplit() to avoid the
+            // 'invalid request meta group id' error
+            List<String> sortedFinishedSnapshotSplits =
+                    finishedSnapshotSplits.stream()
                             .map(FinishedSnapshotSplitInfo::getSplitId)
-                            .collect(Collectors.toSet());
+                            .sorted()
+                            .collect(Collectors.toList());
+            return new HashSet<>(
+                    sortedFinishedSnapshotSplits.subList(
+                            lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
         }
-        return existedSplitsOfLastGroup;
+        return new HashSet<>();
     }
 
     private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
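Note on the reader change: getExistedSplitsOfLastGroup() now sorts the finished split ids before slicing out the last meta group, so the group boundaries match the order the assigner used when building the binlog split's meta groups in MySqlHybridSplitAssigner.createBinlogSplit(). The slicing arithmetic itself: with N finished splits and a meta group size G, the last, possibly partial group holds N % G splits and starts at index (N / G) * G of the sorted list. Below is a standalone sketch under those assumptions; the class, method, and variable names are illustrative, and it collapses the two group-size values the real method reads into a single parameter.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class LastMetaGroupSketch {
        /** Returns the split ids that fall into the last, partially filled meta group. */
        static Set<String> existedSplitsOfLastGroup(List<String> finishedSplitIds, int metaGroupSize) {
            int splitsNumOfLastGroup = finishedSplitIds.size() % metaGroupSize;
            if (splitsNumOfLastGroup == 0) {
                return new HashSet<>();
            }
            // Sort first so the group boundaries match the order used when the meta groups were built.
            List<String> sorted = finishedSplitIds.stream().sorted().collect(Collectors.toList());
            int lastGroupStart = (finishedSplitIds.size() / metaGroupSize) * metaGroupSize;
            return new HashSet<>(sorted.subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
        }

        public static void main(String[] args) {
            // 5 finished splits, groups of 2 -> the last group holds 5 % 2 = 1 split, starting at index 4.
            List<String> ids = Arrays.asList("t1:0", "t1:1", "t2:0", "t2:1", "t3:0");
            System.out.println(existedSplitsOfLastGroup(ids, 2)); // [t3:0]
        }
    }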