Skip to content

Commit

Permalink
[Amqp-core, EH]: Prepending namespace|entitypath consistenty in log, …
Browse files Browse the repository at this point in the history
…first untrack processor subscriber then notify and adding retry to EventHubConsumer[Receiver]Client (#24417)
  • Loading branch information
anuchandy authored Sep 28, 2021
1 parent ffd48d4 commit 5ceab7e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public void onError(Throwable throwable) {
Objects.requireNonNull(throwable, "'throwable' is required.");

if (isRetryPending.get() && retryPolicy.calculateRetryDelay(throwable, retryAttempts.get()) != null) {
logger.warning("Retry is already pending. Ignoring transient error.", throwable);
logger.warning("namespace[{}] entityPath[{}]: Retry is already pending. Ignoring transient error.",
fullyQualifiedNamespace, entityPath, throwable);
return;
}

Expand Down Expand Up @@ -182,22 +183,24 @@ public void onError(Throwable throwable) {
return;
}

logger.info("Retry #{}. Transient error occurred. Retrying after {} ms.", attempts,
retryInterval.toMillis(), throwable);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Transient error occurred. Retrying after {} ms.",
fullyQualifiedNamespace, entityPath, attempts, retryInterval.toMillis(), throwable);

retrySubscription = Mono.delay(retryInterval).subscribe(i -> {
if (isDisposed()) {
logger.info("Retry #{}. Not requesting from upstream. Processor is disposed.", attempts);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Not requesting from upstream. Processor is disposed.",
fullyQualifiedNamespace, entityPath, attempts);
} else {
logger.info("Retry #{}. Requesting from upstream.", attempts);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Requesting from upstream.",
fullyQualifiedNamespace, entityPath, attempts);

requestUpstream();
isRetryPending.set(false);
}
});
} else {
logger.warning("entityPath[{}] Retry #{}. Retry attempts exhausted or exception was not retriable.",
entityPath, attempts, throwable);
logger.warning("namespace[{}] entityPath[{}]: Retry #{}. Retry attempts exhausted or exception was not retriable.",
fullyQualifiedNamespace, entityPath, attempts, throwable);

lastError = throwable;
isDisposed.set(true);
Expand All @@ -206,8 +209,8 @@ public void onError(Throwable throwable) {
synchronized (lock) {
final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
subscribers = new ConcurrentLinkedDeque<>();
logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} "
+ "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());
logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} subscribers.",
fullyQualifiedNamespace, entityPath, currentSubscribers.size());

currentSubscribers.forEach(subscriber -> subscriber.onError(throwable));
}
Expand Down Expand Up @@ -254,8 +257,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

subscribers.add(subscriber);
logger.verbose("Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", subscriber, subscribers.size());
logger.verbose("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", fullyQualifiedNamespace, entityPath, subscriber, subscribers.size());

if (!isRetryPending.get()) {
requestUpstream();
Expand Down Expand Up @@ -350,7 +353,11 @@ private void close(T channel) {
}

/**
* Represents a subscriber, waiting for an AMQP connection.
* Represents the decorator-subscriber wrapping a downstream subscriber to AmqpChannelProcessor.
* These are the subscribers waiting to receive a channel that is yet to be available in the AmqpChannelProcessor.
* The AmqpChannelProcessor tracks a list of such waiting subscribers; once the processor receives
* a result (channel, error or disposal) from it's upstream, each decorated-subscriber will be notified,
* which removes itself from the tracking list, then propagates the notification to the wrapped subscriber.
*/
private static final class ChannelSubscriber<T> extends Operators.MonoSubscriber<T, T> {
private final AmqpChannelProcessor<T> processor;
Expand All @@ -362,15 +369,16 @@ private ChannelSubscriber(CoreSubscriber<? super T> actual, AmqpChannelProcessor

@Override
public void cancel() {
super.cancel();
processor.subscribers.remove(this);
super.cancel();
}

@Override
public void onComplete() {
if (!isCancelled()) {
actual.onComplete();
// first untrack before calling into external code.
processor.subscribers.remove(this);
actual.onComplete();
}
}

Expand All @@ -384,8 +392,8 @@ public void onNext(T channel) {
@Override
public void onError(Throwable throwable) {
if (!isCancelled()) {
actual.onError(throwable);
processor.subscribers.remove(this);
actual.onError(throwable);
} else {
Operators.onErrorDropped(throwable, currentContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
Expand Down Expand Up @@ -352,15 +353,37 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
getEventHubName(), consumerGroup, partitionId);

final AtomicReference<Supplier<EventPosition>> initialPosition = new AtomicReference<>(() -> startingPosition);
final Flux<AmqpReceiveLink> receiveLinkMono = connectionProcessor

// The Mono, when subscribed, creates a AmqpReceiveLink in the AmqpConnection emitted by the connectionProcessor
//
final Mono<AmqpReceiveLink> receiveLinkMono = connectionProcessor
.flatMap(connection -> {
logger.info("connectionId[{}] linkName[{}] Creating receive consumer for partition '{}'",
connection.getId(), linkName, partitionId);
return connection.createReceiveLink(linkName, entityPath, initialPosition.get().get(), receiveOptions);
})
.repeat();

final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkMono.subscribeWith(
});

// A Mono that resubscribes to 'receiveLinkMono' to retry the creation of AmqpReceiveLink.
//
// The scenarios where this retry helps are -
// [1]. When we try to create a link on a session being disposed but connection is healthy, the retry can
// eventually create a new session then the link.
// [2]. When we try to create a new session (to host the new link) but on a connection being disposed,
// the retry can eventually receives a new connection and then proceed with creating session and link.
//
final Mono<AmqpReceiveLink> retriableReceiveLinkMono = RetryUtil.withRetry(receiveLinkMono,
connectionProcessor.getRetryOptions(),
"Failed to create receive link " + linkName,
true);

// A Flux that produces a new AmqpReceiveLink each time it receives a request from the below
// 'AmqpReceiveLinkProcessor'. Obviously, the processor requests a link when there is a downstream subscriber.
// It also requests a new link (i.e. retry) when the current link it holds gets terminated
// (e.g., when the service decides to close that link).
//
final Flux<AmqpReceiveLink> receiveLinkFlux = retriableReceiveLinkMono.repeat();

final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkFlux.subscribeWith(
new AmqpReceiveLinkProcessor(entityPath, prefetchCount, connectionProcessor));

return new EventHubPartitionAsyncConsumer(linkMessageProcessor, messageSerializer, getFullyQualifiedNamespace(),
Expand Down

0 comments on commit 5ceab7e

Please sign in to comment.