From 5ceab7ec30bbb95d39b678f7f159129bdd1068a6 Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Tue, 28 Sep 2021 12:34:57 -0700 Subject: [PATCH] [Amqp-core, EH]: Prepending namespace|entitypath consistenty in log, first untrack processor subscriber then notify and adding retry to EventHubConsumer[Receiver]Client (#24417) --- .../implementation/AmqpChannelProcessor.java | 38 +++++++++++-------- .../EventHubConsumerAsyncClient.java | 33 +++++++++++++--- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 24d013d8ff35e..0ef34bf9672ab 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -140,7 +140,8 @@ public void onError(Throwable throwable) { Objects.requireNonNull(throwable, "'throwable' is required."); if (isRetryPending.get() && retryPolicy.calculateRetryDelay(throwable, retryAttempts.get()) != null) { - logger.warning("Retry is already pending. Ignoring transient error.", throwable); + logger.warning("namespace[{}] entityPath[{}]: Retry is already pending. Ignoring transient error.", + fullyQualifiedNamespace, entityPath, throwable); return; } @@ -182,22 +183,24 @@ public void onError(Throwable throwable) { return; } - logger.info("Retry #{}. Transient error occurred. Retrying after {} ms.", attempts, - retryInterval.toMillis(), throwable); + logger.info("namespace[{}] entityPath[{}]: Retry #{}. Transient error occurred. Retrying after {} ms.", + fullyQualifiedNamespace, entityPath, attempts, retryInterval.toMillis(), throwable); retrySubscription = Mono.delay(retryInterval).subscribe(i -> { if (isDisposed()) { - logger.info("Retry #{}. Not requesting from upstream. Processor is disposed.", attempts); + logger.info("namespace[{}] entityPath[{}]: Retry #{}. Not requesting from upstream. Processor is disposed.", + fullyQualifiedNamespace, entityPath, attempts); } else { - logger.info("Retry #{}. Requesting from upstream.", attempts); + logger.info("namespace[{}] entityPath[{}]: Retry #{}. Requesting from upstream.", + fullyQualifiedNamespace, entityPath, attempts); requestUpstream(); isRetryPending.set(false); } }); } else { - logger.warning("entityPath[{}] Retry #{}. Retry attempts exhausted or exception was not retriable.", - entityPath, attempts, throwable); + logger.warning("namespace[{}] entityPath[{}]: Retry #{}. Retry attempts exhausted or exception was not retriable.", + fullyQualifiedNamespace, entityPath, attempts, throwable); lastError = throwable; isDisposed.set(true); @@ -206,8 +209,8 @@ public void onError(Throwable throwable) { synchronized (lock) { final ConcurrentLinkedDeque> currentSubscribers = subscribers; subscribers = new ConcurrentLinkedDeque<>(); - logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} " - + "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size()); + logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} subscribers.", + fullyQualifiedNamespace, entityPath, currentSubscribers.size()); currentSubscribers.forEach(subscriber -> subscriber.onError(throwable)); } @@ -254,8 +257,8 @@ public void subscribe(CoreSubscriber actual) { } subscribers.add(subscriber); - logger.verbose("Added a subscriber {} to AMQP channel processor. Total " - + "subscribers = {}", subscriber, subscribers.size()); + logger.verbose("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total " + + "subscribers = {}", fullyQualifiedNamespace, entityPath, subscriber, subscribers.size()); if (!isRetryPending.get()) { requestUpstream(); @@ -350,7 +353,11 @@ private void close(T channel) { } /** - * Represents a subscriber, waiting for an AMQP connection. + * Represents the decorator-subscriber wrapping a downstream subscriber to AmqpChannelProcessor. + * These are the subscribers waiting to receive a channel that is yet to be available in the AmqpChannelProcessor. + * The AmqpChannelProcessor tracks a list of such waiting subscribers; once the processor receives + * a result (channel, error or disposal) from it's upstream, each decorated-subscriber will be notified, + * which removes itself from the tracking list, then propagates the notification to the wrapped subscriber. */ private static final class ChannelSubscriber extends Operators.MonoSubscriber { private final AmqpChannelProcessor processor; @@ -362,15 +369,16 @@ private ChannelSubscriber(CoreSubscriber actual, AmqpChannelProcessor @Override public void cancel() { - super.cancel(); processor.subscribers.remove(this); + super.cancel(); } @Override public void onComplete() { if (!isCancelled()) { - actual.onComplete(); + // first untrack before calling into external code. processor.subscribers.remove(this); + actual.onComplete(); } } @@ -384,8 +392,8 @@ public void onNext(T channel) { @Override public void onError(Throwable throwable) { if (!isCancelled()) { - actual.onError(throwable); processor.subscribers.remove(this); + actual.onError(throwable); } else { Operators.onErrorDropped(throwable, currentContext()); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java index 70d60783126e6..a48d6518acba1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java @@ -5,6 +5,7 @@ import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; @@ -352,15 +353,37 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName, getEventHubName(), consumerGroup, partitionId); final AtomicReference> initialPosition = new AtomicReference<>(() -> startingPosition); - final Flux receiveLinkMono = connectionProcessor + + // The Mono, when subscribed, creates a AmqpReceiveLink in the AmqpConnection emitted by the connectionProcessor + // + final Mono receiveLinkMono = connectionProcessor .flatMap(connection -> { logger.info("connectionId[{}] linkName[{}] Creating receive consumer for partition '{}'", connection.getId(), linkName, partitionId); return connection.createReceiveLink(linkName, entityPath, initialPosition.get().get(), receiveOptions); - }) - .repeat(); - - final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkMono.subscribeWith( + }); + + // A Mono that resubscribes to 'receiveLinkMono' to retry the creation of AmqpReceiveLink. + // + // The scenarios where this retry helps are - + // [1]. When we try to create a link on a session being disposed but connection is healthy, the retry can + // eventually create a new session then the link. + // [2]. When we try to create a new session (to host the new link) but on a connection being disposed, + // the retry can eventually receives a new connection and then proceed with creating session and link. + // + final Mono retriableReceiveLinkMono = RetryUtil.withRetry(receiveLinkMono, + connectionProcessor.getRetryOptions(), + "Failed to create receive link " + linkName, + true); + + // A Flux that produces a new AmqpReceiveLink each time it receives a request from the below + // 'AmqpReceiveLinkProcessor'. Obviously, the processor requests a link when there is a downstream subscriber. + // It also requests a new link (i.e. retry) when the current link it holds gets terminated + // (e.g., when the service decides to close that link). + // + final Flux receiveLinkFlux = retriableReceiveLinkMono.repeat(); + + final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkFlux.subscribeWith( new AmqpReceiveLinkProcessor(entityPath, prefetchCount, connectionProcessor)); return new EventHubPartitionAsyncConsumer(linkMessageProcessor, messageSerializer, getFullyQualifiedNamespace(),