diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
index d5914234d6..24d3217273 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
@@ -170,6 +170,7 @@ public Optional<SourceSplitBase> getNext() {
         } else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) {
             // do not need to create stream split, but send event to wake up the binlog reader
             isStreamSplitAssigned = true;
+            enumeratorMetrics.enterStreamReading();
             return Optional.empty();
         } else {
             // stream split is not ready by now
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
index c8d7a6e488..19612339e3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
@@ -34,8 +34,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -366,7 +366,7 @@ private Map<TableId, Set<String>> readFinishedSplits(DataInputDeserializer in)
         for (int i = 0; i < size; i++) {
             String tableIdStr = in.readUTF();
             int splitIdSize = in.readInt();
-            Set<String> splitIds = Collections.newSetFromMap(new HashMap<>());
+            Set<String> splitIds = new HashSet<>();
             for (int j = 0; j < splitIdSize; j++) {
                 splitIds.add(in.readUTF());
             }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
index 3da6da2508..aaba982251 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.base.source.metrics;
 
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics.TableMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -79,9 +80,8 @@ public class SourceReaderMetrics {
 
     /** The total number of record that failed to consume, process or emit. */
     private final Counter numRecordsInErrorsCounter;
-
+    /** The timestamp of the last record received. */
     private volatile long lastReceivedEventTime = UNDEFINED;
-    private volatile long currentReadTimestampMs = UNDEFINED;
 
     public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -89,7 +89,6 @@ public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) {
         this.metricGroup = metricGroup;
         metricGroup.gauge(
                 MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
-        metricGroup.gauge(CURRENT_READ_TIMESTAMP_MS, () -> currentReadTimestampMs);
         metricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag);
 
         snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS);
@@ -118,7 +117,11 @@ public void updateLastReceivedEventTime(Long eventTimestamp) {
     }
 
     public void markRecord() {
-        metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
+        try {
+            metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
+        } catch (Exception e) {
+            LOG.warn("Failed to update record counters.", e);
+        }
     }
 
     public void updateRecordCounters(SourceRecord record) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index f3a0646250..25ebb8aaa4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -172,11 +172,7 @@ protected void reportMetrics(SourceRecord element) {
         }
     }
 
-    /**
-     * Collector for outputting records.
-     *
-     * @param <T>
-     */
+    /** An adapter between {@link SourceOutput} and {@link Collector}. */
     protected static class OutputCollector<T> implements Collector<T> {
         public SourceOutput<T> output;
         public Long currentMessageTimestamp;