Skip to content

Commit

Permalink
fix race in SubscriberImplTest::receiveMessage (#1586)
Browse files Browse the repository at this point in the history
This bug manifests as testBundleAcks deadlocking.

The test
1. publishes a series of messages to a mock server,
2. waits to receive them back,
3. and acknowledges them.

For performance reasons, the client does not immediately
send the acknowledgement request to the server.
Instead, it sends acks in batch every 100ms.
Using a fake clock, the test advances the time by 100ms,
sending the ack-batch, then verify that the mock server
receives the acks.

The bug is in step 2. The test thread waits by waiting on a
CountDownLatch, which is counted down by GRPC thread calling
receiveMessage().
However, the method decrements the latch before acking the message.

On a bad day, the test thread can wake up, advance the clock,
and send the ack-batch before the GRPC thread could add the
message to the batch.
The test then waits for the server to receive an ack it never sent,
deadlocking the test.

The fix is for receiveMessage() to ack the message before
decrementing the counter.
  • Loading branch information
pongad authored Feb 3, 2017
1 parent d325afb commit 36e609e
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ void waitForExpectedMessages() throws InterruptedException {

@Override
public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response) {
if (messageCountLatch.isPresent()) {
messageCountLatch.get().countDown();
}

if (explicitAckReplies) {
try {
outstandingMessageReplies.put(response);
Expand All @@ -128,6 +124,10 @@ public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> respo
} else {
replyTo(response);
}

if (messageCountLatch.isPresent()) {
messageCountLatch.get().countDown();
}
}

public void replyNextOutstandingMessage() {
Expand Down

0 comments on commit 36e609e

Please sign in to comment.