Skip to content

Commit

Permalink
[hotfix][mysql] Keep assigned splits in order to fix wrong meta group…
Browse files Browse the repository at this point in the history
… calculation

This closes apache#2421.

Co-authored-by: Leonard Xu <[email protected]>
(cherry picked from commit dddab4d)
  • Loading branch information
qg-lin authored and zhongqishang committed Dec 7, 2023
1 parent 16f4412 commit 6c62e31
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -106,7 +107,7 @@ public MySqlSnapshotSplitAssigner(
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
new HashMap<>(),
new LinkedHashMap<>(),
new HashMap<>(),
new HashMap<>(),
AssignerStatus.INITIAL_ASSIGNING,
Expand Down Expand Up @@ -152,7 +153,17 @@ private MySqlSnapshotSplitAssigner(
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
this.assignedSplits = assignedSplits;
// When the job is restored from a savepoint, sort the existing tables and newly added tables
// so that the enumerator only sends the BinlogSplitMetaEvent of the newly added tables
this.assignedSplits =
assignedSplits.entrySet().stream()
.sorted(Entry.comparingByKey())
.collect(
Collectors.toMap(
Entry::getKey,
Entry::getValue,
(o, o2) -> o,
LinkedHashMap::new));
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
Expand Down Expand Up @@ -232,7 +243,7 @@ private void captureNewlyAddedTables() {

// remove unassigned tables/splits if it does not satisfy new table filter
List<String> splitsToRemove = new LinkedList<>();
for (Map.Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
for (Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
assignedSplits.entrySet()) {
if (tablesToRemove.contains(splitEntry.getValue().getTableId())) {
splitsToRemove.add(splitEntry.getKey());
Expand Down Expand Up @@ -367,9 +378,7 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
"The assigner is not ready to offer finished split information, this should not be called");
}
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
assignedSplits.values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
new ArrayList<>(assignedSplits.values());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,20 +471,24 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(

/**
 * Collects the split ids that belong to the trailing, partially filled meta group of the given
 * finished snapshot splits.
 *
 * <p>The number of splits in the trailing group is the remainder of the list size divided by
 * {@code sourceConfig.getSplitMetaGroupSize()}. When that remainder is zero an empty set is
 * returned.
 *
 * <p>NOTE(review): this listing appears to come from a diff view whose +/- markers were
 * stripped, so statements of both the old and the new version are interleaved (e.g. the two
 * consecutive {@code return} statements at the end). It does not compile as shown.
 *
 * <p>NOTE(review): the remainder and the quotient are computed with {@code
 * sourceConfig.getSplitMetaGroupSize()} while the start index is multiplied by the {@code
 * metaGroupSize} parameter; presumably both carry the same value - confirm against the caller.
 *
 * @param finishedSnapshotSplits the finished snapshot split infos to inspect
 * @param metaGroupSize the group size used to compute the start index of the last group
 * @return the split ids of the last, incomplete meta group; empty if every group is complete
 */
private Set<String> getExistedSplitsOfLastGroup(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
Set<String> existedSplitsOfLastGroup = new HashSet<>();
// Number of splits left over once the list is divided into groups of the configured size.
int splitsNumOfLastGroup =
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
if (splitsNumOfLastGroup != 0) {
// Number of complete groups times the group size, used below as the sub-list start index.
int lastGroupStart =
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
* metaGroupSize;
existedSplitsOfLastGroup =
finishedSnapshotSplits
.subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup).stream()
// Keep the same order as MySqlHybridSplitAssigner.createBinlogSplit() to avoid
// the 'invalid request meta group id' error
List<String> sortedFinishedSnapshotSplits =
finishedSnapshotSplits.stream()
.map(FinishedSnapshotSplitInfo::getSplitId)
.collect(Collectors.toSet());
.sorted()
.collect(Collectors.toList());
return new HashSet<>(
sortedFinishedSnapshotSplits.subList(
lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
}
return existedSplitsOfLastGroup;
return new HashSet<>();
}

private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
Expand Down

0 comments on commit 6c62e31

Please sign in to comment.