
Commit

[FLINK-36315][cdc-base] Add enterStreamReading while isNewlyAddedAssigningFinished
molin.lxd committed Oct 8, 2024
1 parent 6c00324 · commit d5c663e
Showing 4 changed files with 11 additions and 11 deletions.
File 1 of 4:

@@ -170,6 +170,7 @@ public Optional<SourceSplitBase> getNext() {
} else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) {
// do not need to create stream split, but send event to wake up the binlog reader
isStreamSplitAssigned = true;
enumeratorMetrics.enterStreamReading();
return Optional.empty();
} else {
// stream split is not ready by now
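
Note on this hunk: when newly added table assignment has finished, no new stream split is created, but the stream reader is woken up, and the enumerator metrics are now switched into stream-reading mode as well. A minimal sketch of the kind of phase gauge that enterStreamReading() is assumed to flip follows; the class, gauge name, and flag values are illustrative, not the actual SourceEnumeratorMetrics code.

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical sketch: the enumerator exposes its reading phase as a gauge and
// enterStreamReading() flips it once snapshot assignment is finished.
public class PhaseMetricsSketch {
    private volatile int isStreamReading = 0; // 0 = snapshot phase, 1 = stream phase

    public PhaseMetricsSketch(MetricGroup metricGroup) {
        metricGroup.gauge("isStreamReading", (Gauge<Integer>) () -> isStreamReading);
    }

    public void enterStreamReading() {
        this.isStreamReading = 1;
    }
}

With a gauge of this kind in place, the added call keeps the metric consistent on the code path where the stream split is skipped but stream reading effectively begins.
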
File 2 of 4:

@@ -34,8 +34,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -366,7 +366,7 @@ private Map<TableId, Set<String>> readFinishedSplits(DataInputDeserializer in)
for (int i = 0; i < size; i++) {
String tableIdStr = in.readUTF();
int splitIdSize = in.readInt();
Set<String> splitIds = Collections.newSetFromMap(new HashMap<>());
Set<String> splitIds = new HashSet<>();
for (int j = 0; j < splitIdSize; j++) {
splitIds.add(in.readUTF());
}
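
Note on this hunk: Collections.newSetFromMap(new HashMap<>()) is replaced with new HashSet<>() when rebuilding the finished-split IDs. Both create an unordered, non-thread-safe set backed by a HashMap, so behavior is unchanged; the direct constructor is simply the idiomatic spelling. A small standalone check, illustrative only and not part of the patch:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

public class SplitIdSetSketch {
    public static void main(String[] args) {
        // Old style: a Set view over a HashMap's key set.
        Set<String> viaMap = Collections.newSetFromMap(new HashMap<>());
        // New style: the same semantics, spelled directly.
        Set<String> direct = new HashSet<>();

        viaMap.add("table-0:split-0");
        direct.add("table-0:split-0");

        // Equal contents and equal contract: no ordering, no thread safety either way.
        System.out.println(viaMap.equals(direct)); // true
    }
}
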
File 3 of 4:

@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.base.source.metrics;

import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics.TableMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -79,17 +80,15 @@ public class SourceReaderMetrics {

/** The total number of record that failed to consume, process or emit. */
private final Counter numRecordsInErrorsCounter;

/** The timestamp of the last record received. */
private volatile long lastReceivedEventTime = UNDEFINED;
private volatile long currentReadTimestampMs = UNDEFINED;

public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.numRecordsInErrorsCounter = metricGroup.getNumRecordsInErrorsCounter();

metricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
metricGroup.gauge(CURRENT_READ_TIMESTAMP_MS, () -> currentReadTimestampMs);
metricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag);

snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS);
@@ -118,7 +117,11 @@ public void updateLastReceivedEventTime(Long eventTimestamp) {
}

public void markRecord() {
metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
try {
metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
} catch (Exception e) {
LOG.warn("Failed to update record counters.", e);
}
}

public void updateRecordCounters(SourceRecord record) {
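
Note on this hunk: markRecord() now wraps the numRecordsIn increment in a try/catch so a metrics failure is logged rather than propagated into record emission. A hypothetical call path follows, assuming each SourceRecord is reported to the reader metrics before being forwarded downstream; everything outside the two SourceReaderMetrics calls is illustrative.

import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical emit path (illustrative only): each record first updates the
// reader metrics, and only then is handed to the output. Because markRecord()
// now swallows metric errors, a broken counter can no longer fail the emit.
public class EmitPathSketch {
    private final SourceReaderMetrics sourceReaderMetrics;

    public EmitPathSketch(SourceReaderMetrics sourceReaderMetrics) {
        this.sourceReaderMetrics = sourceReaderMetrics;
    }

    void report(SourceRecord record) {
        sourceReaderMetrics.markRecord();                 // numRecordsIn (guarded above)
        sourceReaderMetrics.updateRecordCounters(record); // per-record-type counters
    }
}
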
File 4 of 4:

@@ -172,11 +172,7 @@ protected void reportMetrics(SourceRecord element) {
}
}

/**
* Collector for outputting records.
*
* @param <T>
*/
/** An adapter between {@link SourceOutput} and {@link Collector}. */
protected static class OutputCollector<T> implements Collector<T> {
public SourceOutput<T> output;
public Long currentMessageTimestamp;
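
Note on this hunk: only the Javadoc changes, describing OutputCollector as an adapter between SourceOutput and Collector. A sketch of what such an adapter typically looks like follows; the two fields are taken from the diff, while the collect/close bodies are assumed for illustration.

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.util.Collector;

// Hypothetical adapter body: forwards records to the Flink SourceOutput,
// attaching the current message timestamp when one is available.
public class OutputCollectorSketch<T> implements Collector<T> {
    public SourceOutput<T> output;
    public Long currentMessageTimestamp;

    @Override
    public void collect(T record) {
        if (currentMessageTimestamp != null && currentMessageTimestamp > 0) {
            // Emit with event time so downstream lag metrics and watermarks can use it.
            output.collect(record, currentMessageTimestamp);
        } else {
            output.collect(record);
        }
    }

    @Override
    public void close() {
        // Nothing to release; the SourceOutput is owned by the enclosing reader.
    }
}
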
