Skip to content

Commit

Permalink
Merge pull request #668 from vikinghawk/topology_listener
Browse files Browse the repository at this point in the history
Add topology recovery started method to RecoveryListener
  • Loading branch information
michaelklishin authored Feb 15, 2021
2 parents 136c796 + b3b8642 commit 43af65d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/rabbitmq/client/RecoveryListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,12 @@ public interface RecoveryListener {
* @param recoverable a {@link Recoverable} connection.
*/
void handleRecoveryStarted(Recoverable recoverable);

/**
* Invoked before automatic topology recovery starts.
* This means that the connection and channel recovery has completed
* and that exchange/queue/binding/consumer recovery is about to begin.
* @param recoverable a {@link Recoverable} connection.
*/
default void handleTopologyRecoveryStarted(Recoverable recoverable) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,10 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
}

private synchronized void beginAutomaticRecovery() throws InterruptedException {
this.wait(this.params.getRecoveryDelayHandler().getDelay(0));
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
if (delay > 0) {
this.wait(delay);
}

this.notifyRecoveryListenersStarted();

Expand All @@ -576,6 +579,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
// don't assign new delegate connection until channel recovery is complete
this.delegate = newConn;
if (this.params.isTopologyRecoveryEnabled()) {
notifyTopologyRecoveryListenersStarted();
recoverTopology(params.getTopologyRecoveryExecutor());
}
this.notifyRecoveryListenersComplete();
Expand Down Expand Up @@ -650,6 +654,12 @@ private void notifyRecoveryListenersStarted() {
}
}

private void notifyTopologyRecoveryListenersStarted() {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleTopologyRecoveryStarted(this);
}
}

private void recoverTopology(final ExecutorService executor) {
// The recovery sequence is the following:
// 1. Recover exchanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public String getPassword() {
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
@Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
final List<String> events = new CopyOnWriteArrayList<String>();
final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete
final CountDownLatch latch = new CountDownLatch(3); // one when started, another when complete
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
Expand Down Expand Up @@ -202,6 +202,10 @@ public void handleRecovery(Recoverable recoverable) {
public void handleRecoveryStarted(Recoverable recoverable) {
latch.countDown();
}
@Override
public void handleTopologyRecoveryStarted(Recoverable recoverable) {
latch.countDown();
}
});
assertThat(connection.isOpen()).isTrue();
closeAndWaitForRecovery();
Expand Down

0 comments on commit 43af65d

Please sign in to comment.