Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Amqp-core, EH]: Consistent processor log lines, ensure processor untrack subscriber then signal, Retry for EH ConsumerClient #24417

Merged
merged 1 commit into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public void onError(Throwable throwable) {
Objects.requireNonNull(throwable, "'throwable' is required.");

if (isRetryPending.get() && retryPolicy.calculateRetryDelay(throwable, retryAttempts.get()) != null) {
logger.warning("Retry is already pending. Ignoring transient error.", throwable);
logger.warning("namespace[{}] entityPath[{}]: Retry is already pending. Ignoring transient error.",
fullyQualifiedNamespace, entityPath, throwable);
return;
}

Expand Down Expand Up @@ -182,22 +183,24 @@ public void onError(Throwable throwable) {
return;
}

logger.info("Retry #{}. Transient error occurred. Retrying after {} ms.", attempts,
retryInterval.toMillis(), throwable);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Transient error occurred. Retrying after {} ms.",
fullyQualifiedNamespace, entityPath, attempts, retryInterval.toMillis(), throwable);

retrySubscription = Mono.delay(retryInterval).subscribe(i -> {
if (isDisposed()) {
logger.info("Retry #{}. Not requesting from upstream. Processor is disposed.", attempts);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Not requesting from upstream. Processor is disposed.",
fullyQualifiedNamespace, entityPath, attempts);
} else {
logger.info("Retry #{}. Requesting from upstream.", attempts);
logger.info("namespace[{}] entityPath[{}]: Retry #{}. Requesting from upstream.",
fullyQualifiedNamespace, entityPath, attempts);

requestUpstream();
isRetryPending.set(false);
}
});
} else {
logger.warning("entityPath[{}] Retry #{}. Retry attempts exhausted or exception was not retriable.",
entityPath, attempts, throwable);
logger.warning("namespace[{}] entityPath[{}]: Retry #{}. Retry attempts exhausted or exception was not retriable.",
fullyQualifiedNamespace, entityPath, attempts, throwable);

lastError = throwable;
isDisposed.set(true);
Expand All @@ -206,8 +209,8 @@ public void onError(Throwable throwable) {
synchronized (lock) {
final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
subscribers = new ConcurrentLinkedDeque<>();
logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} "
+ "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());
logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} subscribers.",
fullyQualifiedNamespace, entityPath, currentSubscribers.size());

currentSubscribers.forEach(subscriber -> subscriber.onError(throwable));
}
Expand Down Expand Up @@ -254,8 +257,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

subscribers.add(subscriber);
logger.verbose("Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", subscriber, subscribers.size());
logger.verbose("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", fullyQualifiedNamespace, entityPath, subscriber, subscribers.size());

if (!isRetryPending.get()) {
requestUpstream();
Expand Down Expand Up @@ -350,7 +353,11 @@ private void close(T channel) {
}

/**
* Represents a subscriber, waiting for an AMQP connection.
* Represents the decorator-subscriber wrapping a downstream subscriber to AmqpChannelProcessor.
* These are the subscribers waiting to receive a channel that is yet to be available in the AmqpChannelProcessor.
* The AmqpChannelProcessor tracks a list of such waiting subscribers; once the processor receives
* a result (channel, error or disposal) from it's upstream, each decorated-subscriber will be notified,
* which removes itself from the tracking list, then propagates the notification to the wrapped subscriber.
*/
private static final class ChannelSubscriber<T> extends Operators.MonoSubscriber<T, T> {
private final AmqpChannelProcessor<T> processor;
Expand All @@ -362,15 +369,16 @@ private ChannelSubscriber(CoreSubscriber<? super T> actual, AmqpChannelProcessor

@Override
public void cancel() {
super.cancel();
processor.subscribers.remove(this);
super.cancel();
}

@Override
public void onComplete() {
if (!isCancelled()) {
actual.onComplete();
// first untrack before calling into external code.
processor.subscribers.remove(this);
actual.onComplete();
}
}

Expand All @@ -384,8 +392,8 @@ public void onNext(T channel) {
@Override
public void onError(Throwable throwable) {
if (!isCancelled()) {
actual.onError(throwable);
processor.subscribers.remove(this);
actual.onError(throwable);
} else {
Operators.onErrorDropped(throwable, currentContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
Expand Down Expand Up @@ -352,15 +353,37 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
getEventHubName(), consumerGroup, partitionId);

final AtomicReference<Supplier<EventPosition>> initialPosition = new AtomicReference<>(() -> startingPosition);
final Flux<AmqpReceiveLink> receiveLinkMono = connectionProcessor

// The Mono, when subscribed, creates a AmqpReceiveLink in the AmqpConnection emitted by the connectionProcessor
//
final Mono<AmqpReceiveLink> receiveLinkMono = connectionProcessor
.flatMap(connection -> {
logger.info("connectionId[{}] linkName[{}] Creating receive consumer for partition '{}'",
connection.getId(), linkName, partitionId);
return connection.createReceiveLink(linkName, entityPath, initialPosition.get().get(), receiveOptions);
})
.repeat();

final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkMono.subscribeWith(
});

// A Mono that resubscribes to 'receiveLinkMono' to retry the creation of AmqpReceiveLink.
//
// The scenarios where this retry helps are -
// [1]. When we try to create a link on a session being disposed but connection is healthy, the retry can
// eventually create a new session then the link.
// [2]. When we try to create a new session (to host the new link) but on a connection being disposed,
// the retry can eventually receives a new connection and then proceed with creating session and link.
//
final Mono<AmqpReceiveLink> retriableReceiveLinkMono = RetryUtil.withRetry(receiveLinkMono,
connectionProcessor.getRetryOptions(),
"Failed to create receive link " + linkName,
true);

// A Flux that produces a new AmqpReceiveLink each time it receives a request from the below
// 'AmqpReceiveLinkProcessor'. Obviously, the processor requests a link when there is a downstream subscriber.
// It also requests a new link (i.e. retry) when the current link it holds gets terminated
// (e.g., when the service decides to close that link).
//
final Flux<AmqpReceiveLink> receiveLinkFlux = retriableReceiveLinkMono.repeat();

final AmqpReceiveLinkProcessor linkMessageProcessor = receiveLinkFlux.subscribeWith(
new AmqpReceiveLinkProcessor(entityPath, prefetchCount, connectionProcessor));

return new EventHubPartitionAsyncConsumer(linkMessageProcessor, messageSerializer, getFullyQualifiedNamespace(),
Expand Down