Skip to content

Commit

Permalink
[fix][client] Make the whole grabCnx() progress atomic
Browse files Browse the repository at this point in the history
### Motivation

In `ConnectionHandler`, there is a `Connection` interface whose methods
will be called after the connection in `grabCnx` is established, the
implementation of `Connection` might send some requests after the
connection is established. For example, the consumer will send the
`CommandSubscribe` request in `connectionOpened`. However, the whole
process is not atomic, which leads to the message lost reported in
apache#20591.

### Modifications

Modify the `Connection` interface to have a single method:

```java
CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e);
```

The returned future should be completed once the implementation has done
everything, e.g. for the consumer, the future should only be completed
after receiving the response for `CommandSubscribe`.

In `grabCnx`, the `ConnectionHandler` could only connect to the broker
once the whole process is completed.

Add `ConnectionHandlerTest` to verify the behavior.
  • Loading branch information
BewareMyPower committed Jun 16, 2023
1 parent a85e9df commit b9d9637
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class ConnectionHandlerTest extends ProducerConsumerBase {

private static final Backoff BACKOFF = new BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
.setMandatoryStop(1, TimeUnit.SECONDS)
.setMax(3, TimeUnit.SECONDS).create();
private final ExecutorService executor = Executors.newFixedThreadPool(4);

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
executor.shutdown();
}

@Test(timeOut = 30000)
public void testSynchronousGrabCnx() {
for (int i = 0; i < 10; i++) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
final int index = i;
final ConnectionHandler handler = new ConnectionHandler(
new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), BACKOFF,
(cnx, e) -> {
future.complete(index);
return CompletableFuture.completedFuture(null);
});
handler.grabCnx();
Assert.assertEquals(future.join(), i);
}
}

@Test
public void testConcurrentGrabCnx() {
final AtomicInteger cnt = new AtomicInteger(0);
final ConnectionHandler handler = new ConnectionHandler(
new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), BACKOFF,
(cnx, e) -> {
cnt.incrementAndGet();
return CompletableFuture.completedFuture(null);
});
final int numGrab = 10;
for (int i = 0; i < numGrab; i++) {
handler.grabCnx();
}
Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get() > 0);
Assert.assertThrows(ConditionTimeoutException.class,
() -> Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() == numGrab));
Assert.assertEquals(cnt.get(), 1);
}

private static class MockedHandlerState extends HandlerState {

public MockedHandlerState(PulsarClientImpl client, String topic) {
super(client, topic);
}

@Override
String getHandlerName() {
return "mocked";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private volatile CompletableFuture<Void> connectFuture;

interface Connection {
void connectionFailed(PulsarClientException exception);
void connectionOpened(ClientCnx cnx);

CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e);
}

protected Connection connection;
Expand All @@ -63,11 +64,17 @@ protected void grabCnx() {
return;
}

if (!isValidStateForReconnection()) {
// Ignore connection closed when we are shutting down
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
state.topic, state.getHandlerName(), state.getState());
return;
synchronized (this) {
if (connectFuture != null && !connectFuture.isDone()) {
return;
}
if (isValidStateForReconnection()) {
connectFuture = new CompletableFuture<>();
} else {
// Ignore connection closed when we are shutting down
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
state.topic, state.getHandlerName(), state.getState());
}
}

try {
Expand All @@ -81,31 +88,42 @@ protected void grabCnx() {
} else {
cnxFuture = state.client.getConnection(state.topic); //
}
cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
.exceptionally(this::handleConnectionError);
cnxFuture.whenComplete((cnx, e) -> {
try {
final CompletableFuture<Void> future = connection.handleNewConnection(cnx, convertThrowable(e));
State state = this.state.getState();
if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) {
reconnectLater(e);
}
future.whenComplete((__, throwable) -> {
if (throwable != null) {
log.warn("[{}] [{}] Failed to handle the connection: {}",
this.state.topic, this.state.getHandlerName(), throwable.getMessage());
}
connectFuture.complete(null);
});
} catch (Throwable throwable) {
log.error("[{}] [{}] Unexpected exception after the connection",
state.topic, state.getHandlerName(), throwable);
}
});
} catch (Throwable t) {
log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t);
reconnectLater(t);
}
}

private Void handleConnectionError(Throwable exception) {
log.warn("[{}] [{}] Error connecting to broker: {}",
state.topic, state.getHandlerName(), exception.getMessage());
if (exception instanceof PulsarClientException) {
connection.connectionFailed((PulsarClientException) exception);
} else if (exception.getCause() instanceof PulsarClientException) {
connection.connectionFailed((PulsarClientException) exception.getCause());
} else {
connection.connectionFailed(new PulsarClientException(exception));
private static PulsarClientException convertThrowable(Throwable e) {
if (e == null) {
return null;
}

State state = this.state.getState();
if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) {
reconnectLater(exception);
if (e instanceof PulsarClientException) {
return (PulsarClientException) e;
} else if (e.getCause() instanceof PulsarClientException) {
return (PulsarClientException) e.getCause();
} else {
return new PulsarClientException(e);
}

return null;
}

protected void reconnectLater(Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,17 +759,17 @@ public void negativeAcknowledge(Message<?> message) {
unAckedMessageTracker.remove(message.getMessageId());
}

@Override
public void connectionOpened(final ClientCnx cnx) {
private CompletableFuture<Void> handleConnectionOpened(final ClientCnx cnx) {
previousExceptions.clear();

if (getState() == State.Closing || getState() == State.Closed) {
final State state = getState();
if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
return;
return FutureUtil.failedFuture(new IllegalStateException("consumer state is " + state));
}

log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
Expand Down Expand Up @@ -823,6 +823,7 @@ public void connectionOpened(final ClientCnx cnx) {
&& startMessageId.equals(initialStartMessageId)) ? startMessageRollbackDurationInSec : 0;

// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
Expand All @@ -844,6 +845,7 @@ public void connectionOpened(final ClientCnx cnx) {
deregisterFromClientCnx();
client.cleanupConsumer(this);
cnx.channel().close();
future.complete(null);
return;
}
}
Expand All @@ -856,6 +858,7 @@ public void connectionOpened(final ClientCnx cnx) {
if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, getCurrentReceiverQueueSize());
}
future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() == State.Closed) {
Expand Down Expand Up @@ -905,9 +908,11 @@ public void connectionOpened(final ClientCnx cnx) {
// consumer was subscribed and connected but we got some error, keep trying
reconnectLater(e.getCause());
}
future.completeExceptionally(e);
return null;
});
}
return future;
}

protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
Expand Down Expand Up @@ -981,8 +986,7 @@ private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
}
}

@Override
public void connectionFailed(PulsarClientException exception) {
private CompletableFuture<Void> handleConnectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
Expand All @@ -991,7 +995,7 @@ public void connectionFailed(PulsarClientException exception) {
setState(State.Failed);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}",
topic, consumerId, exception);
topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId);
}
Expand All @@ -1002,6 +1006,7 @@ public void connectionFailed(PulsarClientException exception) {
} else {
previousExceptions.add(exception);
}
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down Expand Up @@ -2342,6 +2347,11 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messa
&& ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1;
}

@Override
public CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e) {
return (e == null) ? handleConnectionOpened(cnx) : handleConnectionFailed(e);
}

private static final class GetLastMessageIdResponse {
final MessageId lastMessageId;
final MessageId markDeletePosition;
Expand Down
Loading

0 comments on commit b9d9637

Please sign in to comment.