Skip to content

Commit

Permalink
[fix][client] fix Reader.hasMessageAvailable might return true after …
Browse files Browse the repository at this point in the history
…seeking to latest

### Motivation

Java client has the same issue with
apache/pulsar-client-python#199

After a seek operation is done, the `startMessageId` will be updated
until the reconnection due to the seek is done in `connectionOpened`. So
before it's updated, `hasMessageAvailable` could compare with an
outdated `startMessageId` and return a wrong value.

### Modifications

Replace `duringSeek` with a `SeekStatus` field:
- `NOT_STARTED`: initial, or a seek operation is done. `seek` could only
  succeed in this status.
- `IN_PROGRESS`: A seek operation has started but the client does not
  receive the response from broker.
- `COMPLETED`: The client has received the seek response but the seek
  future is not done.

After the status becomes `COMPLETED`, next time the connection is
established, the status will change from `COMPLETED` to `NOT_STARTED`
and then seek future will be completed in the internal executor.

Add `testHasMessageAvailableAfterSeek` to cover this change.
  • Loading branch information
BewareMyPower committed Mar 5, 2024
1 parent 207335a commit 4edc576
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -813,4 +814,27 @@ public void testReaderReconnectedFromNextEntry() throws Exception {
producer.close();
admin.topics().delete(topic, false);
}

@DataProvider
public static Object[][] initializeLastMessageIdInBroker() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "initializeLastMessageIdInBroker")
public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception {
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek";
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(MessageId.earliest).create();

@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producer.send("msg");

if (initializeLastMessageIdInBroker) {
assertTrue(reader.hasMessageAvailable());
} // else: lastMessageIdInBroker is earliest

reader.seek(MessageId.latest);
// lastMessageIdInBroker is the last message ID, while startMessageId is still earliest
assertFalse(reader.hasMessageAvailable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile MessageIdAdv startMessageId;

private volatile MessageIdAdv seekMessageId;
private final AtomicBoolean duringSeek;
private final AtomicReference<SeekStatus> seekStatus;
private volatile CompletableFuture<Void> seekFuture;

private final MessageIdAdv initialStartMessageId;

Expand Down Expand Up @@ -304,7 +305,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
stats = ConsumerStatsDisabled.INSTANCE;
}

duringSeek = new AtomicBoolean(false);
seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);

// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
Expand Down Expand Up @@ -781,15 +782,15 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
clearReceiverQueue(false);
return CompletableFuture.completedFuture(null);
}

log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
topic, subscription, cnx.ctx().channel(), consumerId);

long requestId = client.newRequestId();
if (duringSeek.get()) {
if (!SeekStatus.NOT_STARTED.equals(seekStatus.get())) {
acknowledgmentsGroupingTracker.flushAndClean();
}

Expand All @@ -800,7 +801,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
int currentSize;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
clearReceiverQueue(true);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
Expand Down Expand Up @@ -943,15 +944,21 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application.
*/
private MessageIdAdv clearReceiverQueue() {
private void clearReceiverQueue(boolean updateStartMessageId) {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
resetIncomingMessageSize();

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
CompletableFuture<Void> seekFuture = this.seekFuture;
MessageIdAdv seekMessageId = this.seekMessageId;
if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) {
if (updateStartMessageId) {
startMessageId = seekMessageId;
}
internalPinnedExecutor.execute(() -> seekFuture.complete(null));
return;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return startMessageId;
return;
}

if (!currentMessageQueue.isEmpty()) {
Expand All @@ -968,15 +975,14 @@ private MessageIdAdv clearReceiverQueue() {
}
// release messages if they are pooled messages
currentMessageQueue.forEach(Message::release);
return previousMessage;
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
if (updateStartMessageId) {
startMessageId = previousMessage;
}
} else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return startMessageId;
}
startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId
}

/**
Expand Down Expand Up @@ -2249,25 +2255,23 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();

CompletableFuture<Void> seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
return seekFuture;
}

private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
final Backoff backoff, final AtomicLong remainingTime,
CompletableFuture<Void> seekFuture) {
final Backoff backoff, final AtomicLong remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return;
}
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
Expand All @@ -2279,14 +2283,12 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
lastDequeuedMessageId = MessageId.earliest;

clearIncomingMessages();
seekFuture.complete(null);
seekStatus.set(SeekStatus.COMPLETED);
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
failSeek(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
Expand All @@ -2295,7 +2297,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
seekFuture.completeExceptionally(
failSeek(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not seek "
+ "withing configured timeout", subscription, topicName.toString())));
Expand All @@ -2306,11 +2308,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}

private void failSeek(Throwable throwable) {
CompletableFuture<Void> seekFuture = this.seekFuture;
if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) {
seekFuture.completeExceptionally(throwable);
}
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
Expand Down Expand Up @@ -2968,4 +2977,9 @@ boolean isAckReceiptEnabled() {

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

private enum SeekStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED
}
}

0 comments on commit 4edc576

Please sign in to comment.