Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-30552][Connector/Pulsar] drop batch message size assertion, better set the cursor position. #11

Merged
merged 1 commit into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -218,37 +217,11 @@ private void createSubscription(List<TopicPartition> newPartitions) {
for (TopicPartition partition : newPartitions) {
String topicName = partition.getFullTopicName();
String subscriptionName = sourceConfiguration.getSubscriptionName();
CursorPosition position =
startCursor.position(partition.getTopic(), partition.getPartitionId());

List<String> subscriptions =
sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
if (!subscriptions.contains(subscriptionName)) {
CursorPosition position =
startCursor.position(partition.getTopic(), partition.getPartitionId());
MessageId initialPosition = queryInitialPosition(topicName, position);

sneakyAdmin(
() ->
pulsarAdmin
.topics()
.createSubscription(
topicName, subscriptionName, initialPosition));
}
}
}

/** Query the available message id from Pulsar. */
private MessageId queryInitialPosition(String topicName, CursorPosition position) {
CursorPosition.Type type = position.getType();
if (type == CursorPosition.Type.TIMESTAMP) {
return sneakyAdmin(
() ->
pulsarAdmin
.topics()
.getMessageIdByTimestamp(topicName, position.getTimestamp()));
} else if (type == CursorPosition.Type.MESSAGE_ID) {
return position.getMessageId();
} else {
throw new UnsupportedOperationException("We don't support this seek type " + type);
sneakyAdmin(
() -> position.createInitialPosition(pulsarAdmin, topicName, subscriptionName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;

import java.io.Serializable;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -37,46 +43,95 @@ public final class CursorPosition implements Serializable {
private final Type type;

private final MessageId messageId;
private final boolean include;

private final Long timestamp;

public CursorPosition(MessageId messageId) {
/**
* Start consuming from the given message id. The message id couldn't be the {@code
* MultipleMessageIdImpl}.
*
* @param include Whether the cosponsored position will be (in/ex)cluded in the consuming
* result.
*/
public CursorPosition(MessageId messageId, boolean include) {
checkNotNull(messageId, "Message id couldn't be null.");

this.type = Type.MESSAGE_ID;
this.messageId = messageId;
this.include = include;
this.timestamp = null;
}

/**
* Start consuming from the given timestamp position. The cosponsored position will be included
* in the consuming result.
*/
public CursorPosition(Long timestamp) {
checkNotNull(timestamp, "Timestamp couldn't be null.");

this.type = Type.TIMESTAMP;
this.messageId = null;
this.include = true;
this.timestamp = timestamp;
}

/** This method is used to create the initial position in {@link PulsarSourceEnumerator}. */
@Internal
public Type getType() {
return type;
public boolean createInitialPosition(
PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
throws PulsarAdminException {
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);

if (!subscriptions.contains(subscriptionName)) {
pulsarAdmin
.topics()
.createSubscription(topicName, subscriptionName, MessageId.earliest);

// Reset cursor to desired position.
MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
pulsarAdmin
.topics()
.resetCursor(topicName, subscriptionName, initialPosition, !include);

return true;
}

return false;
}

/**
* This method is used to reset the consuming position in {@link PulsarPartitionSplitReader}.
*/
@Internal
public MessageId getMessageId() {
return messageId;
public void seekPosition(PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
throws PulsarAdminException {
if (!createInitialPosition(pulsarAdmin, topicName, subscriptionName)) {
// Reset cursor to desired position.
MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
pulsarAdmin
.topics()
.resetCursor(topicName, subscriptionName, initialPosition, !include);
}
}

@Internal
public Long getTimestamp() {
return timestamp;
private MessageId getMessageId(PulsarAdmin pulsarAdmin, String topicName)
throws PulsarAdminException {
if (type == Type.TIMESTAMP) {
return pulsarAdmin.topics().getMessageIdByTimestamp(topicName, timestamp);
} else if (messageId instanceof ChunkMessageIdImpl) {
return ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId();
} else {
return messageId;
}
}

@Override
public String toString() {
if (type == Type.TIMESTAMP) {
return "timestamp: " + timestamp;
} else {
return "message id: " + messageId;
return "message id: " + messageId + " include: " + include;
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,30 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;

import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;

/** This cursor would leave pulsar start consuming from a specific message id. */
public class MessageIdStartCursor implements StartCursor {
private static final long serialVersionUID = -8057345435887170111L;

private final MessageId messageId;
private final boolean inclusive;

/**
* The default {@code inclusive} behavior should be controlled in {@link
* ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and don't support this
* currently. We have to use {@code entry + 1} policy for consuming the next available message.
* If the message id entry is not valid. Pulsar would automatically find next valid message id.
* Please referer <a
* href="https://github.com/apache/pulsar/blob/36d5738412bb1ed9018178007bf63d9202b675db/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1151">this
* code</a> for understanding pulsar internal logic.
* ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and doesn't support this
* feature currently. We have to use admin API to reset the cursor instead.
*
* @param messageId The message id for start position.
* @param inclusive Whether we include the start message id in consuming result. This works only
* if we provide a specified message id instead of {@link MessageId#earliest} or {@link
* @param inclusive Whether we include the start message id in the consuming result. This works
* only if we provide a specified message id instead of {@link MessageId#earliest} or {@link
* MessageId#latest}.
*/
public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
if (MessageId.earliest.equals(messageId)
|| MessageId.latest.equals(messageId)
|| inclusive) {
this.messageId = unwrapMessageId(messageId);
} else {
this.messageId = nextMessageId(messageId);
}
this.messageId = messageId;
this.inclusive = inclusive;
}

@Override
public CursorPosition position(String topic, int partitionId) {
return new CursorPosition(messageId);
return new CursorPosition(messageId, inclusive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.MessageId.earliest;
import static org.apache.pulsar.client.api.MessageId.latest;
import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl;

/**
* Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
Expand All @@ -43,7 +43,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported.");
checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead.");

this.messageId = unwrapMessageId(messageId);
this.messageId = convertToMessageIdImpl(messageId);
this.inclusive = inclusive;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
Expand Down Expand Up @@ -78,7 +79,6 @@
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;

Expand Down Expand Up @@ -186,31 +186,22 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
try {
MessageId initialPosition;
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
|| latestConsumedId == MessageId.earliest) {
// for compatibility
initialPosition = latestConsumedId;
cursorPosition = new CursorPosition(latestConsumedId, true);
} else {
initialPosition = nextMessageId(latestConsumedId);
cursorPosition = new CursorPosition(latestConsumedId, false);
}

// Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
// See https://github.com/apache/pulsar/issues/16757 for more details.

String topicName = registeredSplit.getPartition().getFullTopicName();
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
String subscriptionName = sourceConfiguration.getSubscriptionName();

if (!subscriptions.contains(subscriptionName)) {
// If this subscription is not available. Just create it.
pulsarAdmin
.topics()
.createSubscription(topicName, subscriptionName, initialPosition);
} else {
// Reset the subscription if this is existed.
pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
}
// Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
// See https://github.com/apache/pulsar/issues/16757 for more details.

cursorPosition.seekPosition(pulsarAdmin, topicName, subscriptionName);
} catch (PulsarAdminException e) {
if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
throw new IllegalArgumentException(e);
Expand Down