Skip to content

Commit

Permalink
[fix][client] Fix ReaderBuilder doest not give illegalArgument on con…
Browse files Browse the repository at this point in the history
…nection failure retry (apache#22639)
  • Loading branch information
rdhabalia authored May 10, 2024
1 parent b7ec89a commit b56f238
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
Expand Down Expand Up @@ -902,4 +905,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
assertTrue(reader.hasMessageAvailable());
}
}

@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testRetryReader";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);
String badUrl = "pulsar://bad-host:8080";

PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build();

ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100,
TimeUnit.SECONDS);

for (int i = 0; i < 3; i++) {
try {
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.info("It should time out due to invalid url");
} catch (IllegalArgumentException e) {
fail("It should not fail with corrupt reader state");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public CompletableFuture<Reader<T>> createAsync() {
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}

if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0
|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest;
if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0)
|| (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) {
return FutureUtil
.failedFuture(new IllegalArgumentException(
"Start message id or start message from roll back must be specified but they cannot be"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void readerBuilderLoadConfTest() throws Exception {
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest)
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest)
.startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
// no-op
} finally {
Expand Down

0 comments on commit b56f238

Please sign in to comment.