Skip to content

Commit

Permalink
Use returnImmediately=false and disable timeouts for pullAsync (#1387)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard authored Nov 11, 2016
1 parent 89f83f3 commit 8ec721e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable {
/**
* Pulls messages from the provided subscription. This method possibly returns no messages if no
* message was available at the time the request was processed by the Pub/Sub service (i.e. the
* system is not allowed to wait until at least one message is available). Pulled messages have
* their acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
* system is not allowed to wait until at least one message is available -
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
* option is set to {@code true}). Pulled messages have their acknowledge deadline automatically
* renewed until they are explicitly consumed using {@link Iterator#next()}.
*
* <p>Example of pulling a maximum number of messages from a subscription.
* <pre> {@code
Expand All @@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
* When using this method the system is allowed to wait until at least one message is available
* rather than returning no messages (i.e.
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
* option is set to {@code false}). The client may cancel the request by calling
* {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub
* service might still return no messages if a timeout is reached on the service side.
*
* <p>Example of asynchronously pulling a maximum number of messages from a subscription.
* <pre> {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,18 +512,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
return listSubscriptionsAsync(topic, getOptions(), optionMap(options));
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
return get(pullAsync(subscription, maxMessages));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
private Future<Iterator<ReceivedMessage>> pullAsync(final String subscription,
int maxMessages, boolean returnImmediately) {
PullRequest request = PullRequest.newBuilder()
.setSubscription(
SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription))
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.setReturnImmediately(returnImmediately)
.build();
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
Expand Down Expand Up @@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
});
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
return get(pullAsync(subscription, maxMessages, true));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
return pullAsync(subscription, maxMessages, false);
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
Expand All @@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc {

private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;
private final SubscriberApi noTimeoutSubscriberApi;
private final ScheduledExecutorService executor;
private final ProviderManager providerManager;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
Expand Down Expand Up @@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
.applyToAllApiMethods(callSettingsBuilder);
publisherApi = PublisherApi.create(pubBuilder.build());
subscriberApi = SubscriberApi.create(subBuilder.build());
callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder()
.setTotalTimeout(Duration.millis(Long.MAX_VALUE))
.setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE))
.setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE)));
subBuilder.applyToAllApiMethods(callSettingsBuilder);
noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build());
} catch (Exception ex) {
throw new IOException(ex);
}
Expand Down Expand Up @@ -256,9 +265,14 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
}

private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
}

@Override
public PullFuture pull(PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
return request.getReturnImmediately()
? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request);
}

@Override
Expand Down Expand Up @@ -290,6 +304,7 @@ public void close() throws Exception {
}
closed = true;
subscriberApi.close();
noTimeoutSubscriberApi.close();
publisherApi.close();
providerManager.getChannel().shutdown();
executorFactory.release(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
pubsub().create(TopicInfo.of(topic));
String subscription = formatForTest("test-pull-messages-async-subscription");
pubsub().create(SubscriptionInfo.of(topic, subscription));
Future<Iterator<ReceivedMessage>> future = pubsub().pullAsync(subscription, 2);
Message message1 = Message.of("payload1");
Message message2 = Message.of("payload2");
List<String> messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
assertEquals(2, messageIds.size());
Iterator<ReceivedMessage> iterator = future.get();
assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
assertTrue(pubsub().deleteSubscription(subscription));
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullAsyncNonExistingSubscription()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.setReturnImmediately(false)
.build();
List<ReceivedMessage> messageList = ImmutableList.of(
ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1),
Expand Down Expand Up @@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.setReturnImmediately(false)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
Expand Down

0 comments on commit 8ec721e

Please sign in to comment.