From 1044b8b32246ac942604aa937681d2393726c179 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Tue, 3 Jan 2023 22:17:43 +0800 Subject: [PATCH] [FLINK-30552][Connector/Pulsar] Drop next message id calculation, use resetCursor api to exclude the given message. --- .../enumerator/PulsarSourceEnumerator.java | 35 +-------- .../enumerator/cursor/CursorPosition.java | 73 ++++++++++++++++--- .../enumerator/cursor/MessageIdUtils.java | 73 ------------------- .../cursor/start/MessageIdStartCursor.java | 27 ++----- .../cursor/stop/MessageIdStopCursor.java | 4 +- .../reader/PulsarPartitionSplitReader.java | 25 ++----- 6 files changed, 86 insertions(+), 151 deletions(-) delete mode 100644 flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index ff1f073c..8fd39751 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -34,7 +34,6 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,37 +217,11 @@ private void createSubscription(List newPartitions) { for (TopicPartition partition : newPartitions) { String topicName = partition.getFullTopicName(); String subscriptionName = sourceConfiguration.getSubscriptionName(); + CursorPosition position = + startCursor.position(partition.getTopic(), partition.getPartitionId()); - List subscriptions = - sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName)); - if (!subscriptions.contains(subscriptionName)) { - CursorPosition position = - startCursor.position(partition.getTopic(), partition.getPartitionId()); - MessageId initialPosition = queryInitialPosition(topicName, position); - - sneakyAdmin( - () -> - pulsarAdmin - .topics() - .createSubscription( - topicName, subscriptionName, initialPosition)); - } - } - } - - /** Query the available message id from Pulsar. */ - private MessageId queryInitialPosition(String topicName, CursorPosition position) { - CursorPosition.Type type = position.getType(); - if (type == CursorPosition.Type.TIMESTAMP) { - return sneakyAdmin( - () -> - pulsarAdmin - .topics() - .getMessageIdByTimestamp(topicName, position.getTimestamp())); - } else if (type == CursorPosition.Type.MESSAGE_ID) { - return position.getMessageId(); - } else { - throw new UnsupportedOperationException("We don't support this seek type " + type); + sneakyAdmin( + () -> position.createInitialPosition(pulsarAdmin, topicName, subscriptionName)); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index 55b55132..8d8ee2cd 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -20,10 +20,16 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; +import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.ChunkMessageIdImpl; import java.io.Serializable; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -37,38 +43,87 @@ public final class CursorPosition implements Serializable { private final Type type; private final MessageId messageId; + private final boolean include; private final Long timestamp; - public CursorPosition(MessageId messageId) { + /** + * Start consuming from the given message id. The message id couldn't be the {@code + * MultipleMessageIdImpl}. + * + * @param include Whether the cosponsored position will be (in/ex)cluded in the consuming + * result. + */ + public CursorPosition(MessageId messageId, boolean include) { checkNotNull(messageId, "Message id couldn't be null."); this.type = Type.MESSAGE_ID; this.messageId = messageId; + this.include = include; this.timestamp = null; } + /** + * Start consuming from the given timestamp position. The cosponsored position will be included + * in the consuming result. + */ public CursorPosition(Long timestamp) { checkNotNull(timestamp, "Timestamp couldn't be null."); this.type = Type.TIMESTAMP; this.messageId = null; + this.include = true; this.timestamp = timestamp; } + /** This method is used to create the initial position in {@link PulsarSourceEnumerator}. */ @Internal - public Type getType() { - return type; + public boolean createInitialPosition( + PulsarAdmin pulsarAdmin, String topicName, String subscriptionName) + throws PulsarAdminException { + List subscriptions = pulsarAdmin.topics().getSubscriptions(topicName); + + if (!subscriptions.contains(subscriptionName)) { + pulsarAdmin + .topics() + .createSubscription(topicName, subscriptionName, MessageId.earliest); + + // Reset cursor to desired position. + MessageId initialPosition = getMessageId(pulsarAdmin, topicName); + pulsarAdmin + .topics() + .resetCursor(topicName, subscriptionName, initialPosition, !include); + + return true; + } + + return false; } + /** + * This method is used to reset the consuming position in {@link PulsarPartitionSplitReader}. + */ @Internal - public MessageId getMessageId() { - return messageId; + public void seekPosition(PulsarAdmin pulsarAdmin, String topicName, String subscriptionName) + throws PulsarAdminException { + if (!createInitialPosition(pulsarAdmin, topicName, subscriptionName)) { + // Reset cursor to desired position. + MessageId initialPosition = getMessageId(pulsarAdmin, topicName); + pulsarAdmin + .topics() + .resetCursor(topicName, subscriptionName, initialPosition, !include); + } } - @Internal - public Long getTimestamp() { - return timestamp; + private MessageId getMessageId(PulsarAdmin pulsarAdmin, String topicName) + throws PulsarAdminException { + if (type == Type.TIMESTAMP) { + return pulsarAdmin.topics().getMessageIdByTimestamp(topicName, timestamp); + } else if (messageId instanceof ChunkMessageIdImpl) { + return ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId(); + } else { + return messageId; + } } @Override @@ -76,7 +131,7 @@ public String toString() { if (type == Type.TIMESTAMP) { return "timestamp: " + timestamp; } else { - return "message id: " + messageId; + return "message id: " + messageId + " include: " + include; } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java deleted file mode 100644 index 0ae1a1fb..00000000 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.pulsar.source.enumerator.cursor; - -import org.apache.flink.annotation.Internal; - -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl; - -/** The helper class for Pulsar's message id. */ -@Internal -public final class MessageIdUtils { - - private MessageIdUtils() { - // No public constructor. - } - - /** - * The implementation from this - * code snippet to get next message id. - */ - public static MessageId nextMessageId(MessageId messageId) { - MessageIdImpl idImpl = unwrapMessageId(messageId); - - long ledgerId = idImpl.getLedgerId(); - long entryId = idImpl.getEntryId(); - int partitionIndex = idImpl.getPartitionIndex(); - - if (entryId < 0) { - return new MessageIdImpl(ledgerId, 0, partitionIndex); - } else { - return new MessageIdImpl(ledgerId, entryId + 1, partitionIndex); - } - } - - /** - * Convert the message id interface to its backend implementation. And check if it's a batch - * message id. We don't support the batch message for its low performance now. - */ - public static MessageIdImpl unwrapMessageId(MessageId messageId) { - MessageIdImpl idImpl = convertToMessageIdImpl(messageId); - if (idImpl instanceof BatchMessageIdImpl) { - int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize(); - checkArgument( - batchSize <= 1, - "We only support normal message id currently. This batch size is %d", - batchSize); - } - - return idImpl; - } -} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java index bf63675f..23dd98eb 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java @@ -24,41 +24,30 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId; - /** This cursor would leave pulsar start consuming from a specific message id. */ public class MessageIdStartCursor implements StartCursor { private static final long serialVersionUID = -8057345435887170111L; private final MessageId messageId; + private final boolean inclusive; /** * The default {@code inclusive} behavior should be controlled in {@link - * ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and don't support this - * currently. We have to use {@code entry + 1} policy for consuming the next available message. - * If the message id entry is not valid. Pulsar would automatically find next valid message id. - * Please referer this - * code for understanding pulsar internal logic. + * ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and doesn't support this + * feature currently. We have to use admin API to reset the cursor instead. * * @param messageId The message id for start position. - * @param inclusive Whether we include the start message id in consuming result. This works only - * if we provide a specified message id instead of {@link MessageId#earliest} or {@link + * @param inclusive Whether we include the start message id in the consuming result. This works + * only if we provide a specified message id instead of {@link MessageId#earliest} or {@link * MessageId#latest}. */ public MessageIdStartCursor(MessageId messageId, boolean inclusive) { - if (MessageId.earliest.equals(messageId) - || MessageId.latest.equals(messageId) - || inclusive) { - this.messageId = unwrapMessageId(messageId); - } else { - this.messageId = nextMessageId(messageId); - } + this.messageId = messageId; + this.inclusive = inclusive; } @Override public CursorPosition position(String topic, int partitionId) { - return new CursorPosition(messageId); + return new CursorPosition(messageId, inclusive); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index 3e0a86ed..ada2fd32 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -23,10 +23,10 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.pulsar.client.api.MessageId.earliest; import static org.apache.pulsar.client.api.MessageId.latest; +import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl; /** * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for @@ -43,7 +43,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) { checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported."); checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); - this.messageId = unwrapMessageId(messageId); + this.messageId = convertToMessageIdImpl(messageId); this.inclusive = inclusive; } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 76888dfd..9a6873c5 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -27,6 +27,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; @@ -78,7 +79,6 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH; import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId; import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges; import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange; @@ -186,31 +186,22 @@ public void handleSplitsChanges(SplitsChange splitsChanges if (latestConsumedId != null) { LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); try { - MessageId initialPosition; + CursorPosition cursorPosition; if (latestConsumedId == MessageId.latest || latestConsumedId == MessageId.earliest) { // for compatibility - initialPosition = latestConsumedId; + cursorPosition = new CursorPosition(latestConsumedId, true); } else { - initialPosition = nextMessageId(latestConsumedId); + cursorPosition = new CursorPosition(latestConsumedId, false); } - // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0 - // See https://github.com/apache/pulsar/issues/16757 for more details. - String topicName = registeredSplit.getPartition().getFullTopicName(); - List subscriptions = pulsarAdmin.topics().getSubscriptions(topicName); String subscriptionName = sourceConfiguration.getSubscriptionName(); - if (!subscriptions.contains(subscriptionName)) { - // If this subscription is not available. Just create it. - pulsarAdmin - .topics() - .createSubscription(topicName, subscriptionName, initialPosition); - } else { - // Reset the subscription if this is existed. - pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition); - } + // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0 + // See https://github.com/apache/pulsar/issues/16757 for more details. + + cursorPosition.seekPosition(pulsarAdmin, topicName, subscriptionName); } catch (PulsarAdminException e) { if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) { throw new IllegalArgumentException(e);