Skip to content

Commit

Permalink
[FLINK-30552][Connector/Pulsar] Drop next message id calculation, use…
Browse files Browse the repository at this point in the history
… resetCursor api to exclude the given message.
  • Loading branch information
syhily committed Jan 3, 2023
1 parent d8f6056 commit 1044b8b
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -218,37 +217,11 @@ private void createSubscription(List<TopicPartition> newPartitions) {
for (TopicPartition partition : newPartitions) {
String topicName = partition.getFullTopicName();
String subscriptionName = sourceConfiguration.getSubscriptionName();
CursorPosition position =
startCursor.position(partition.getTopic(), partition.getPartitionId());

List<String> subscriptions =
sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
if (!subscriptions.contains(subscriptionName)) {
CursorPosition position =
startCursor.position(partition.getTopic(), partition.getPartitionId());
MessageId initialPosition = queryInitialPosition(topicName, position);

sneakyAdmin(
() ->
pulsarAdmin
.topics()
.createSubscription(
topicName, subscriptionName, initialPosition));
}
}
}

/** Query the available message id from Pulsar. */
private MessageId queryInitialPosition(String topicName, CursorPosition position) {
CursorPosition.Type type = position.getType();
if (type == CursorPosition.Type.TIMESTAMP) {
return sneakyAdmin(
() ->
pulsarAdmin
.topics()
.getMessageIdByTimestamp(topicName, position.getTimestamp()));
} else if (type == CursorPosition.Type.MESSAGE_ID) {
return position.getMessageId();
} else {
throw new UnsupportedOperationException("We don't support this seek type " + type);
sneakyAdmin(
() -> position.createInitialPosition(pulsarAdmin, topicName, subscriptionName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;

import java.io.Serializable;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -37,46 +43,95 @@ public final class CursorPosition implements Serializable {
private final Type type;

private final MessageId messageId;
private final boolean include;

private final Long timestamp;

public CursorPosition(MessageId messageId) {
/**
* Start consuming from the given message id. The message id couldn't be the {@code
* MultipleMessageIdImpl}.
*
* @param include Whether the cosponsored position will be (in/ex)cluded in the consuming
* result.
*/
public CursorPosition(MessageId messageId, boolean include) {
checkNotNull(messageId, "Message id couldn't be null.");

this.type = Type.MESSAGE_ID;
this.messageId = messageId;
this.include = include;
this.timestamp = null;
}

/**
* Start consuming from the given timestamp position. The cosponsored position will be included
* in the consuming result.
*/
public CursorPosition(Long timestamp) {
checkNotNull(timestamp, "Timestamp couldn't be null.");

this.type = Type.TIMESTAMP;
this.messageId = null;
this.include = true;
this.timestamp = timestamp;
}

/** This method is used to create the initial position in {@link PulsarSourceEnumerator}. */
@Internal
public Type getType() {
return type;
public boolean createInitialPosition(
PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
throws PulsarAdminException {
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);

if (!subscriptions.contains(subscriptionName)) {
pulsarAdmin
.topics()
.createSubscription(topicName, subscriptionName, MessageId.earliest);

// Reset cursor to desired position.
MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
pulsarAdmin
.topics()
.resetCursor(topicName, subscriptionName, initialPosition, !include);

return true;
}

return false;
}

/**
* This method is used to reset the consuming position in {@link PulsarPartitionSplitReader}.
*/
@Internal
public MessageId getMessageId() {
return messageId;
public void seekPosition(PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
throws PulsarAdminException {
if (!createInitialPosition(pulsarAdmin, topicName, subscriptionName)) {
// Reset cursor to desired position.
MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
pulsarAdmin
.topics()
.resetCursor(topicName, subscriptionName, initialPosition, !include);
}
}

@Internal
public Long getTimestamp() {
return timestamp;
private MessageId getMessageId(PulsarAdmin pulsarAdmin, String topicName)
throws PulsarAdminException {
if (type == Type.TIMESTAMP) {
return pulsarAdmin.topics().getMessageIdByTimestamp(topicName, timestamp);
} else if (messageId instanceof ChunkMessageIdImpl) {
return ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId();
} else {
return messageId;
}
}

@Override
public String toString() {
if (type == Type.TIMESTAMP) {
return "timestamp: " + timestamp;
} else {
return "message id: " + messageId;
return "message id: " + messageId + " include: " + include;
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,30 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;

import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;

/** This cursor would leave pulsar start consuming from a specific message id. */
public class MessageIdStartCursor implements StartCursor {
private static final long serialVersionUID = -8057345435887170111L;

private final MessageId messageId;
private final boolean inclusive;

/**
* The default {@code inclusive} behavior should be controlled in {@link
* ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and don't support this
* currently. We have to use {@code entry + 1} policy for consuming the next available message.
* If the message id entry is not valid. Pulsar would automatically find next valid message id.
* Please referer <a
* href="https://github.com/apache/pulsar/blob/36d5738412bb1ed9018178007bf63d9202b675db/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1151">this
* code</a> for understanding pulsar internal logic.
* ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and doesn't support this
* feature currently. We have to use admin API to reset the cursor instead.
*
* @param messageId The message id for start position.
* @param inclusive Whether we include the start message id in consuming result. This works only
* if we provide a specified message id instead of {@link MessageId#earliest} or {@link
* @param inclusive Whether we include the start message id in the consuming result. This works
* only if we provide a specified message id instead of {@link MessageId#earliest} or {@link
* MessageId#latest}.
*/
public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
if (MessageId.earliest.equals(messageId)
|| MessageId.latest.equals(messageId)
|| inclusive) {
this.messageId = unwrapMessageId(messageId);
} else {
this.messageId = nextMessageId(messageId);
}
this.messageId = messageId;
this.inclusive = inclusive;
}

@Override
public CursorPosition position(String topic, int partitionId) {
return new CursorPosition(messageId);
return new CursorPosition(messageId, inclusive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.MessageId.earliest;
import static org.apache.pulsar.client.api.MessageId.latest;
import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl;

/**
* Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
Expand All @@ -43,7 +43,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported.");
checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead.");

this.messageId = unwrapMessageId(messageId);
this.messageId = convertToMessageIdImpl(messageId);
this.inclusive = inclusive;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
Expand Down Expand Up @@ -78,7 +79,6 @@
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;

Expand Down Expand Up @@ -186,31 +186,22 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
try {
MessageId initialPosition;
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
|| latestConsumedId == MessageId.earliest) {
// for compatibility
initialPosition = latestConsumedId;
cursorPosition = new CursorPosition(latestConsumedId, true);
} else {
initialPosition = nextMessageId(latestConsumedId);
cursorPosition = new CursorPosition(latestConsumedId, false);
}

// Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
// See https://github.com/apache/pulsar/issues/16757 for more details.

String topicName = registeredSplit.getPartition().getFullTopicName();
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
String subscriptionName = sourceConfiguration.getSubscriptionName();

if (!subscriptions.contains(subscriptionName)) {
// If this subscription is not available. Just create it.
pulsarAdmin
.topics()
.createSubscription(topicName, subscriptionName, initialPosition);
} else {
// Reset the subscription if this is existed.
pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
}
// Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
// See https://github.com/apache/pulsar/issues/16757 for more details.

cursorPosition.seekPosition(pulsarAdmin, topicName, subscriptionName);
} catch (PulsarAdminException e) {
if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
throw new IllegalArgumentException(e);
Expand Down

0 comments on commit 1044b8b

Please sign in to comment.