diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml index ec012ce84e0d..dbc497d34c77 100644 --- a/eng/spotbugs-aggregate-report/pom.xml +++ b/eng/spotbugs-aggregate-report/pom.xml @@ -15,9 +15,10 @@ 1.0.0 - 1.2.0 5.0.1 - 2.0.0 + 2.3.0 + 2.5.0 + 1.2.0 10.5.0 @@ -67,7 +68,7 @@ com.microsoft.azure azure-eventhubs-eph - ${azure-eventhubs.version} + ${azure-eventhubs-eph.version} com.microsoft.azure diff --git a/eventhubs/data-plane/ConsumingEvents.md b/eventhubs/data-plane/ConsumingEvents.md index cff94be57c9e..450247efcb71 100644 --- a/eventhubs/data-plane/ConsumingEvents.md +++ b/eventhubs/data-plane/ConsumingEvents.md @@ -23,10 +23,10 @@ This library is available for use in Maven projects from the Maven Central Repos following dependency declaration inside of your Maven project file: ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 + + com.microsoft.azure + azure-eventhubs + 2.3.0 ``` diff --git a/eventhubs/data-plane/PublishingEvents.md b/eventhubs/data-plane/PublishingEvents.md index 5a9eb428e531..c83007c0174b 100644 --- a/eventhubs/data-plane/PublishingEvents.md +++ b/eventhubs/data-plane/PublishingEvents.md @@ -9,11 +9,11 @@ This library is available for use in Maven projects from the Maven Central Repos following dependency declaration inside of your Maven project file: ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 - + + com.microsoft.azure + azure-eventhubs + 2.3.0 + ``` For different types of build environments, the latest released JAR files can also be [explicitly obtained from the diff --git a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml index 33d31c4e0ace..649dc2322e69 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml @@ -4,22 +4,22 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-eph - 2.2.0 + 2.5.0 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java index 91aa4a89486a..e44b6d2ccdf6 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java @@ -34,6 +34,7 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.regex.Matcher; @@ -331,10 +332,17 @@ public CompletableFuture> getAllLeases() { (bp.getLeaseState() == LeaseState.LEASED))); }); future = CompletableFuture.completedFuture(infos); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | NoSuchElementException e) { + Throwable effective = e; + if (e instanceof NoSuchElementException) { + // If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException. + // Strip the misleading NoSuchElementException to provide a meaningful error for the user. + effective = e.getCause(); + } + TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e); - future = new CompletableFuture>(); - future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE)); + future = new CompletableFuture<>(); + future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE)); } return future; diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java index 3f52e162b227..c9eaf61e266d 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java @@ -11,6 +11,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -23,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; /*** - * The main class of event processor host. + * The main class of event processor host. */ public final class EventProcessorHost { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class); @@ -268,15 +269,14 @@ public EventProcessorHost( if (leaseManager == null) { throw new IllegalArgumentException("Must provide an object which implements ILeaseManager"); } - // executorService argument is allowed to be null, that is the indication to use an internal threadpool. + // executorService argument is allowed to be null, that is the indication to use an internal threadpool. // Normally will not be null because we're using the AzureStorage implementation. // If it is null, we're using user-supplied implementation. Establish generic defaults // in case the user doesn't provide an options object. this.partitionManagerOptions = new PartitionManagerOptions(); - if (executorService != null) { // User has supplied an ExecutorService, so use that. this.weOwnExecutor = false; @@ -560,7 +560,7 @@ public Thread newThread(Runnable r) { } private String getNamePrefix() { - return String.format("[%s|%s|%s]-%s-", + return String.format(Locale.US, "[%s|%s|%s]-%s-", this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement()); } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java index 06637660e557..1b0bd2783dc0 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java @@ -11,6 +11,9 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /*** * Options affecting the behavior of the event processor host instance in general. */ @@ -25,6 +28,8 @@ public final class EventProcessorOptions { return EventPosition.fromStartOfStream(); }; + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class); + public EventProcessorOptions() { } @@ -112,7 +117,7 @@ public int getPrefetchCount() { /*** * Sets the prefetch count for the underlying event hub client. * - * The default is 500. This controls how many events are received in advance. + * The default is 300. This controls how many events are received in advance. * * @param prefetchCount The new prefetch count. */ @@ -210,7 +215,11 @@ void notifyOfException(String hostname, Exception exception, String action, Stri // Capture handler so it doesn't get set to null between test and use Consumer handler = this.exceptionNotificationHandler; if (handler != null) { - handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId)); + try { + handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId)); + } catch (Exception e) { + TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e); + } } } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java index a2238c2bca9a..9b95d5457129 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; /*** - * An ILeaseManager implementation based on an in-memory store. + * An ILeaseManager implementation based on an in-memory store. * * THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory * only and not persisted in any way. In addition, it is only visible within the same process: multiple @@ -46,11 +46,11 @@ public InMemoryLeaseManager() { public void initialize(HostContext hostContext) { this.hostContext = hostContext; } - + public void setLatency(long milliseconds) { this.millisecondsLatency = milliseconds; } - + private void latency(String caller) { if (this.millisecondsLatency > 0) { try { @@ -91,7 +91,7 @@ public CompletableFuture deleteLeaseStore() { latency("deleteLeaseStore"); return CompletableFuture.completedFuture(null); } - + @Override public CompletableFuture getLease(String partitionId) { TRACE_LOGGER.debug(this.hostContext.withHost("getLease()")); @@ -110,7 +110,7 @@ public CompletableFuture> getAllLeases() { latency("getAllLeasesStateInfo"); return CompletableFuture.completedFuture(infos); } - + @Override public CompletableFuture createAllLeasesIfNotExists(List partitionIds) { ArrayList> createFutures = new ArrayList>(); diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java index b6ed86fe462d..8f419a35e6e2 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java @@ -283,16 +283,21 @@ private Void scan(boolean isFirst) { TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan")); long start = System.currentTimeMillis(); - (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst) + try { + (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst) .whenCompleteAsync((didSteal, e) -> { TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start))); + if ((e != null) && !(e instanceof ClosingException)) { + TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e); + } + onPartitionCheckCompleteTestHook(); // Schedule the next scan unless we are shutting down. if (!this.getIsClosingOrClosed()) { int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() - : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); + : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); if (isFirst) { seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds(); } @@ -301,9 +306,19 @@ private Void scan(boolean isFirst) { } TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds)); } else { - TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown")); + TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown")); } }, this.hostContext.getExecutor()); + } catch (Exception e) { + TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e); + if (!this.getIsClosingOrClosed()) { + int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); + synchronized (this.scanFutureSynchronizer) { + this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS); + } + TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds)); + } + } return null; } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java index ef774f314eae..d2e16d1e3c09 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java @@ -135,7 +135,7 @@ private CompletableFuture openClientsRetryWrapper() { // trace exceptions from the final attempt, or ReceiverDisconnectedException. return retryResult.handleAsync((r, e) -> { if (e == null) { - // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here, + // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here, // meaning it is safe to set the handler and start calling IEventProcessor.onEvents. this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout()); } else { diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java index 429798d6520e..bafa1af7b381 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java @@ -312,7 +312,7 @@ private CompletableFuture stealLeases(List stealThese) { return allSteals; } - + private static class AcquisitionHolder { private CompleteLease acquiredLease; diff --git a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml index cb42b1e68178..45bbd35f6b58 100644 --- a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml @@ -4,6 +4,13 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-extensions @@ -12,13 +19,6 @@ Extensions built on Microsoft Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs/pom.xml b/eventhubs/data-plane/azure-eventhubs/pom.xml index 93bc7d05635b..c7a1e80f3f5a 100644 --- a/eventhubs/data-plane/azure-eventhubs/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs/pom.xml @@ -4,6 +4,13 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs @@ -12,13 +19,6 @@ Libraries built on Microsoft Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java index c361076eb9e7..e2091f2977e6 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java @@ -147,6 +147,7 @@ static EventData create(final ByteBuffer buffer) { * @see SystemProperties#getEnqueuedTime */ SystemProperties getSystemProperties(); + void setSystemProperties(SystemProperties props); class SystemProperties extends HashMap { private static final long serialVersionUID = -2827050124966993723L; @@ -155,6 +156,13 @@ public SystemProperties(final HashMap map) { super(Collections.unmodifiableMap(map)); } + public SystemProperties(final long sequenceNumber, final Instant enqueuedTimeUtc, final String offset, final String partitionKey) { + this.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, sequenceNumber); + this.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, new Date(enqueuedTimeUtc.toEpochMilli())); + this.put(AmqpConstants.OFFSET_ANNOTATION_NAME, offset); + this.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, partitionKey); + } + public String getOffset() { return this.getSystemProperty(AmqpConstants.OFFSET_ANNOTATION_NAME); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index b48a67f11fc4..4edbcf56f790 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -25,7 +25,7 @@ public interface PartitionReceiver { int MINIMUM_PREFETCH_COUNT = 1; int DEFAULT_PREFETCH_COUNT = 500; - int MAXIMUM_PREFETCH_COUNT = 2000; + int MAXIMUM_PREFETCH_COUNT = 8000; long NULL_EPOCH = 0; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java index 315927bf8e67..5dae17cc69f4 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -79,7 +79,6 @@ public String getIdentifier() { * EventHubs service will throw {@link QuotaExceededException} and will include this identifier. * So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}. *

- *

* * @param value string to identify {@link PartitionReceiver} */ diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java index a81285f71aeb..63a20902717f 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java @@ -40,6 +40,10 @@ final class ActiveClientTokenManager { } public void cancel() { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - canceling ActiveClientLinkManager", + clientEntity.getClientId())); + } synchronized (this.timerLock) { this.timer.cancel(false); @@ -61,9 +65,8 @@ public void run() { } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, - "clientEntity[%s] - closing ActiveClientLinkManager", clientEntity.getClientId())); + TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - closing ActiveClientLinkManager", + clientEntity.getClientId())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java index eac5f39c9caa..cad341a75721 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java @@ -12,6 +12,7 @@ public final class AmqpConstants { public static final String APACHE = "apache.org"; + public static final String PROTON = "proton"; public static final String VENDOR = "com.microsoft"; public static final String AMQP_ANNOTATION_FORMAT = "amqp.annotation.%s >%s '%s'"; public static final String OFFSET_ANNOTATION_NAME = "x-opt-offset"; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java index 991c3b1cf9b2..45fe9885baca 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java @@ -12,12 +12,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; + public class BaseLinkHandler extends BaseHandler { protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseLinkHandler.class); + private final String name; private final AmqpLink underlyingEntity; - public BaseLinkHandler(final AmqpLink amqpLink) { + public BaseLinkHandler(final AmqpLink amqpLink, final String name) { + this.name = name; this.underlyingEntity = amqpLink; } @@ -27,10 +31,8 @@ public void onLinkLocalClose(Event event) { final ErrorCondition condition = link.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } closeSession(link, link.getCondition()); @@ -39,13 +41,11 @@ public void onLinkLocalClose(Event event) { @Override public void onLinkRemoteClose(Event event) { final Link link = event.getLink(); - final ErrorCondition condition = link.getCondition(); + final ErrorCondition condition = link.getRemoteCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } handleRemoteLinkClosed(event); @@ -57,10 +57,8 @@ public void onLinkRemoteDetach(Event event) { final ErrorCondition condition = link.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteDetach clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } handleRemoteLinkClosed(event); @@ -68,16 +66,19 @@ public void onLinkRemoteDetach(Event event) { public void processOnClose(Link link, ErrorCondition condition) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("processOnClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } this.underlyingEntity.onClose(condition); } public void processOnClose(Link link, Exception exception) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], exception[%s]", + this.name, link.getName(), exception != null ? exception.getMessage() : "n/a")); + } + this.underlyingEntity.onError(exception); } @@ -86,10 +87,8 @@ private void closeSession(Link link, ErrorCondition condition) { if (session != null && session.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("closeSession for linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "closeSession for clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } session.setCondition(condition); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java index 27b870ddf454..4dcfcd6c5c38 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java @@ -17,11 +17,12 @@ final class CBSChannel { CBSChannel( final SessionProvider sessionProvider, - final AmqpConnection connection) { + final AmqpConnection connection, + final String clientId) { RequestResponseCloser closer = new RequestResponseCloser(); this.innerChannel = new FaultTolerantObject<>( - new RequestResponseOpener(sessionProvider, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection), + new RequestResponseOpener(sessionProvider, clientId, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection), closer); closer.setInnerChannel(this.innerChannel); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index 9d140815aef1..2fb6bee48ebe 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -19,6 +19,7 @@ public final class ClientConstants { public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io"); public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024; @@ -26,7 +27,7 @@ public final class ClientConstants { public static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1); public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0); public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30); - public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins + public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(5); // renew every 5 minutes, which expires 20 minutes public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20); public static final int DEFAULT_MAX_RETRY_COUNT = 10; public static final boolean DEFAULT_IS_TRANSIENT = true; @@ -36,7 +37,7 @@ public final class ClientConstants { public static final String NO_RETRY = "NoRetry"; public static final String DEFAULT_RETRY = "Default"; public static final String PRODUCT_NAME = "MSJavaClient"; - public static final String CURRENT_JAVACLIENT_VERSION = "2.0.0"; + public static final String CURRENT_JAVACLIENT_VERSION = "2.3.0"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); public static final String CBS_ADDRESS = "$cbs"; @@ -76,6 +77,9 @@ public final class ClientConstants { public static final String HTTPS_URI_FORMAT = "https://%s:%s"; public static final int MAX_RECEIVER_NAME_LENGTH = 64; + public static final String COMMUNICATION_EXCEPTION_GENERIC_MESSAGE = "A communication error has occurred. " + + "This may be due to an incorrect host name in your connection string or a problem with your network connection."; + /** * This is a constant defined to represent the start of a partition stream in EventHub. */ diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index 445f0dc66f5e..1e8bf4c60685 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -22,21 +22,21 @@ import java.util.Locale; import java.util.Map; -// ServiceBus <-> ProtonReactor interaction handles all -// amqp_connection/transport related events from reactor public class ConnectionHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); private final AmqpConnection amqpConnection; + private final String connectionId; - protected ConnectionHandler(final AmqpConnection amqpConnection) { + protected ConnectionHandler(final AmqpConnection amqpConnection, final String connectionId) { add(new Handshaker()); this.amqpConnection = amqpConnection; + this.connectionId = connectionId; } - static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) { + static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection, String connectionId) { switch (transportType) { case AMQP_WEB_SOCKETS: if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) { @@ -46,7 +46,7 @@ static ConnectionHandler create(TransportType transportType, AmqpConnection amqp } case AMQP: default: - return new ConnectionHandler(amqpConnection); + return new ConnectionHandler(amqpConnection, connectionId); } } @@ -67,17 +67,18 @@ protected AmqpConnection getAmqpConnection() { @Override public void onConnectionInit(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s]", this.amqpConnection.getHostName())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s], connectionId[%s]", + this.amqpConnection.getHostName(), this.connectionId)); } final Connection connection = event.getConnection(); final String hostName = new StringBuilder(this.amqpConnection.getHostName()) .append(":") - .append(String.valueOf(this.getProtocolPort())) + .append(this.getProtocolPort()) .toString(); connection.setHostname(hostName); - connection.setContainer(StringUtil.getRandomString()); + connection.setContainer(this.connectionId); final Map connectionProperties = new HashMap<>(); connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME); @@ -141,7 +142,8 @@ protected int getMaxFrameSize() { @Override public void onConnectionBound(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s]", this.amqpConnection.getHostName())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s], connectionId[%s]", + this.amqpConnection.getHostName(), this.connectionId)); } final Transport transport = event.getTransport(); @@ -154,8 +156,8 @@ public void onConnectionUnbound(Event event) { final Connection connection = event.getConnection(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound: hostname[%s], state[%s], remoteState[%s]", - connection.getHostname(), connection.getLocalState(), connection.getRemoteState())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound hostname[%s], connectionId[%s], state[%s], remoteState[%s]", + connection.getHostname(), this.connectionId, connection.getLocalState(), connection.getRemoteState())); } // if failure happened while establishing transport - nothing to free up. @@ -172,9 +174,8 @@ public void onTransportError(Event event) { final ErrorCondition condition = transport.getCondition(); if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError: hostname[%s], error[%s]", - connection != null ? connection.getHostname() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError hostname[%s], connectionId[%s], error[%s]", + connection != null ? connection.getHostname() : "n/a", this.connectionId, condition != null ? condition.getDescription() : "n/a")); } if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { @@ -197,8 +198,8 @@ public void onTransportClosed(Event event) { final ErrorCondition condition = transport.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed: hostname[%s], error[%s]", - connection != null ? connection.getHostname() : "n/a", (condition != null ? condition.getDescription() : "n/a"))); + TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s], connectionId[%s], error[%s]", + connection != null ? connection.getHostname() : "n/a", this.connectionId, (condition != null ? condition.getDescription() : "n/a"))); } if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { @@ -214,10 +215,8 @@ public void onConnectionLocalOpen(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } } @@ -225,8 +224,8 @@ public void onConnectionLocalOpen(Event event) { public void onConnectionRemoteOpen(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen: hostname[%s], remoteContainer[%s]", - event.getConnection().getHostname(), event.getConnection().getRemoteContainer())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen hostname[%s], connectionId[%s], remoteContainer[%s]", + event.getConnection().getHostname(), this.connectionId, event.getConnection().getRemoteContainer())); } this.amqpConnection.onOpenComplete(null); @@ -239,10 +238,8 @@ public void onConnectionLocalClose(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } if (connection.getRemoteState() == EndpointState.CLOSED) { @@ -261,10 +258,8 @@ public void onConnectionRemoteClose(Event event) { final ErrorCondition error = connection.getRemoteCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } this.amqpConnection.onConnectionError(error); @@ -276,10 +271,8 @@ public void onConnectionFinal(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java index 2050bc5873d6..9169eb10b478 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java @@ -16,14 +16,20 @@ public class CustomIOHandler extends IOHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CustomIOHandler.class); + private final String name; + + public CustomIOHandler(final String name) { + this.name = name; + } + @Override public void onTransportClosed(Event event) { final Transport transport = event.getTransport(); final Connection connection = event.getConnection(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s]", - (connection != null ? connection.getHostname() : "n/a"))); + TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed name[%s], hostname[%s]", + this.name, (connection != null ? connection.getHostname() : "n/a"))); } if (transport != null && connection != null && connection.getTransport() != null) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java index 569792a4f5cd..54207e7e18a8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java @@ -11,6 +11,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Locale; final class EventDataBatchImpl implements EventDataBatch { @@ -45,7 +46,7 @@ public boolean tryAdd(final EventData eventData) throws PayloadSizeExceededExcep try { size = getSize(eventDataImpl, events.isEmpty()); } catch (java.nio.BufferOverflowException exception) { - throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024)); + throw new PayloadSizeExceededException(String.format(Locale.US, "Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024)); } if (this.currentSize + size > this.maxMessageSize) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java index 12b4a9848404..b4294435f7e0 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java @@ -165,6 +165,10 @@ public SystemProperties getSystemProperties() { return this.systemProperties; } + public void setSystemProperties(EventData.SystemProperties props) { + this.systemProperties = props; + } + // This is intended to be used while sending EventData - so EventData.SystemProperties will not be copied over to the AmqpMessage Message toAmqpMessage() { final Message amqpMessage = Proton.message(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java index dba9d1e99974..d8e1c3588e54 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java @@ -14,9 +14,6 @@ import java.util.Set; import java.util.function.Consumer; -/* - * Internal utility class for EventData - */ final class EventDataUtil { @SuppressWarnings("serial") diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 5f3d339e35d2..a352172bb1dc 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -50,15 +50,15 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl private CompletableFuture createSender; private EventHubClientImpl(final ConnectionStringBuilder connectionString, final ScheduledExecutorService executor) { - super("EventHubClientImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("EC"), null, executor); this.eventHubName = connectionString.getEventHubName(); this.senderCreateSync = new Object(); } public static CompletableFuture create( - final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) - throws EventHubException, IOException { + final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) + throws IOException { final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr, executor); @@ -232,7 +232,8 @@ private CompletableFuture createInternalSender() { if (!this.isSenderCreateStarted) { synchronized (this.senderCreateSync) { if (!this.isSenderCreateStarted) { - this.createSender = MessageSender.create(this.underlyingFactory, this.getClientId().concat("-InternalSender"), this.eventHubName) + String senderName = StringUtil.getRandomString("EC").concat(StringUtil.SEPARATOR + this.underlyingFactory.getClientId()).concat("-InternalSender"); + this.createSender = MessageSender.create(this.underlyingFactory, senderName, this.eventHubName) .thenAcceptAsync(new Consumer() { public void accept(MessageSender a) { EventHubClientImpl.this.sender = a; @@ -314,7 +315,7 @@ public CompletableFuture apply(Map private CompletableFuture addManagementToken(Map request) { CompletableFuture retval = null; try { - String audience = String.format("amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName); + String audience = String.format(Locale.US, "amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName); String token = this.underlyingFactory.getTokenProvider().getToken(audience, ClientConstants.TOKEN_REFRESH_INTERVAL); request.put(ClientConstants.MANAGEMENT_SECURITY_TOKEN_KEY, token); } catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java index dbbb4ab5fe64..80f2cec4a44c 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.time.Instant; +import java.util.Locale; public final class EventPositionImpl implements EventPosition { @@ -103,7 +104,7 @@ String getExpression() { @Override public String toString() { - return String.format("offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]", + return String.format(Locale.US, "offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]", this.offset, this.sequenceNumber, (this.dateTime != null) ? this.dateTime.toEpochMilli() : "null", this.inclusiveFlag); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java index da5a59b8e4a1..621c582d2af3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java @@ -4,6 +4,7 @@ package com.microsoft.azure.eventhubs.impl; import com.microsoft.azure.eventhubs.AuthorizationFailedException; +import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ErrorContext; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.IllegalEntityException; @@ -62,6 +63,12 @@ static Exception toException(ErrorCondition errorCondition) { return new EventHubException(true, new AmqpException(errorCondition)); } else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) { return new QuotaExceededException(new AmqpException(errorCondition)); + } else if (errorCondition.getCondition() == ClientConstants.PROTON_IO_ERROR) { + String message = ClientConstants.COMMUNICATION_EXCEPTION_GENERIC_MESSAGE; + if (errorCondition.getDescription() != null) { + message = errorCondition.getDescription(); + } + return new CommunicationException(message, null); } return new EventHubException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.getDescription()); @@ -137,7 +144,7 @@ public static String toStackTraceString(final Throwable exception, final String final Throwable innerException = exception.getCause(); if (innerException != null) { - builder.append("Cause: " + innerException.getMessage()); + builder.append("Cause: ").append(innerException.getMessage()); final StackTraceElement[] innerStackTraceElements = innerException.getStackTrace(); for (final StackTraceElement ste : innerStackTraceElements) { builder.append(System.lineSeparator()); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java index 4ae153061003..01e9006950c0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java @@ -47,7 +47,7 @@ public void runOnOpenedObject( public void onEvent() { if (!creatingNewInnerObject && (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED - || innerObject.getState() == IOObject.IOObjectState.CLOSING)) { + || innerObject.getState() == IOObject.IOObjectState.CLOSING)) { creatingNewInnerObject = true; try { @@ -59,6 +59,7 @@ public void onComplete(T result) { for (OperationResult callback : openCallbacks) { callback.onComplete(result); } + openCallbacks.clear(); } @@ -67,6 +68,7 @@ public void onError(Exception error) { for (OperationResult callback : openCallbacks) { callback.onError(error); } + openCallbacks.clear(); } }); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java index 7e74034d319e..b6f577165d58 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java @@ -3,33 +3,34 @@ package com.microsoft.azure.eventhubs.impl; +import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.TimeoutException; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.message.Message; import java.io.IOException; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; -import com.microsoft.azure.eventhubs.OperationCancelledException; -import com.microsoft.azure.eventhubs.TimeoutException; - final class ManagementChannel { final FaultTolerantObject innerChannel; - ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection) { + ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection, final String clientId) { final RequestResponseCloser closer = new RequestResponseCloser(); this.innerChannel = new FaultTolerantObject<>( - new RequestResponseOpener( - sessionProvider, - "mgmt-session", - "mgmt", - ClientConstants.MANAGEMENT_ADDRESS, - connection), - closer); + new RequestResponseOpener( + sessionProvider, + clientId, + "mgmt-session", + "mgmt", + ClientConstants.MANAGEMENT_ADDRESS, + connection), + closer); closer.setInnerChannel(this.innerChannel); } @@ -45,22 +46,22 @@ public CompletableFuture> request( try { // schedule client-timeout on the request dispatcher.invoke((int) timeoutInMillis, - new DispatchHandler() { - @Override - public void onEvent() { - final RequestResponseChannel channel = innerChannel.unsafeGetIfOpened(); - final String errorMessage; - if (channel != null && channel.getState() == IOObject.IOObjectState.OPENED) { - final String remoteContainerId = channel.getSendLink().getSession().getConnection().getRemoteContainer(); - errorMessage = String.format("Management request timed out (%sms), after not receiving response from service. TrackingId: %s", - timeoutInMillis, StringUtil.isNullOrEmpty(remoteContainerId) ? "n/a" : remoteContainerId); - } else { - errorMessage = "Management request timed out on the client - enable info level tracing to diagnose."; + new DispatchHandler() { + @Override + public void onEvent() { + final RequestResponseChannel channel = innerChannel.unsafeGetIfOpened(); + final String errorMessage; + if (channel != null && channel.getState() == IOObject.IOObjectState.OPENED) { + final String remoteContainerId = channel.getSendLink().getSession().getConnection().getRemoteContainer(); + errorMessage = String.format(Locale.US, "Management request timed out (%sms), after not receiving response from service. TrackingId: %s", + timeoutInMillis, StringUtil.isNullOrEmpty(remoteContainerId) ? "n/a" : remoteContainerId); + } else { + errorMessage = "Management request timed out on the client - enable info level tracing to diagnose."; + } + + resultFuture.completeExceptionally(new TimeoutException(errorMessage)); } - - resultFuture.completeExceptionally(new TimeoutException(errorMessage)); - } - }); + }); } catch (final IOException ioException) { resultFuture.completeExceptionally( new OperationCancelledException( diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 3a780267dba9..655709389517 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -77,6 +77,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver, private volatile CompletableFuture closeTimer; private int prefetchCount; private Exception lastKnownLinkError; + private String linkCreationTime; private MessageReceiver(final MessagingFactory factory, final String name, @@ -124,7 +125,7 @@ public void run() { "clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs", getClientId(), receivePath, - receiveLink.getName(), + getReceiveLinkName(), Instant.now(), timeoutTracker.remaining().getSeconds())); } @@ -160,7 +161,7 @@ public void onComplete(Void result) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - token renewed", - getClientId(), receivePath, receiveLink.getName())); + getClientId(), receivePath, getReceiveLinkName())); } } @@ -170,7 +171,7 @@ public void onError(Exception error) { TRACE_LOGGER.info( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], tokenRenewalFailure[%s]", - getClientId(), receivePath, receiveLink.getName(), error.getMessage())); + getClientId(), receivePath, getReceiveLinkName(), error.getMessage())); } } }); @@ -179,7 +180,7 @@ public void onError(Exception error) { TRACE_LOGGER.info( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]", - getClientId(), receivePath, receiveLink.getName(), exception.getMessage())); + getClientId(), receivePath, getReceiveLinkName(), exception.getMessage())); } } } @@ -242,6 +243,10 @@ private List receiveCore(final int messageCount) { return returnMessages; } + private String getReceiveLinkName() { + return this.receiveLink == null ? "null" : this.receiveLink.getName(); + } + public Duration getReceiveTimeout() { return this.receiveTimeout; } @@ -269,7 +274,7 @@ public CompletableFuture> receive(final int maxMessageCount) "clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs", this.getClientId(), this.receivePath, - this.receiveLink.getName(), + this.getReceiveLinkName(), Instant.now(), this.receiveTimeout.getSeconds())); } @@ -313,8 +318,8 @@ public void onOpenComplete(Exception exception) { this.sendFlow(this.prefetchCount - this.prefetchedMessages.size()); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]", - this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount)); + TRACE_LOGGER.info(String.format(Locale.US, "onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]", + this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), this.prefetchCount)); } } else { synchronized (this.errorConditionLock) { @@ -409,11 +414,12 @@ public void onError(final Exception exception) { : exception; if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn("clientId[{}], receiverPath[{}], linkName[{}], onError: {}", + TRACE_LOGGER.warn( + String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], onError: %s", this.getClientId(), this.receivePath, - this.receiveLink != null ? this.receiveLink.getName() : "n/a", - completionException); + this.getReceiveLinkName(), + completionException)); } this.onOpenComplete(completionException); @@ -430,7 +436,7 @@ public void onError(final Exception exception) { @Override public void onEvent() { if (!MessageReceiver.this.getIsClosingOrClosed() - && (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { + && (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { createReceiveLink(); underlyingFactory.getRetryPolicy().incrementRetryCount(getClientId()); } @@ -443,7 +449,7 @@ public void onEvent() { String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s", this.getClientId(), this.receivePath, - this.receiveLink.getName(), ignore.getLocalizedMessage())); + this.getReceiveLinkName(), ignore.getLocalizedMessage())); } } } @@ -478,6 +484,13 @@ private void scheduleOperationTimer(final TimeoutTracker tracker) { private void createReceiveLink() { synchronized (this.errorConditionLock) { if (this.creatingLink) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], operationTimeout[%s], creating a receive link is already in progress", + this.getClientId(), this.receivePath, this.operationTimeout)); + } + return; } @@ -491,6 +504,8 @@ private void createReceiveLink() { this.getClientId(), this.receivePath, this.operationTimeout)); } + this.linkCreationTime = Instant.now().toString(); + this.scheduleLinkOpenTimeout(TimeoutTracker.create(this.operationTimeout)); final Consumer onSessionOpen = new Consumer() { @@ -498,6 +513,12 @@ private void createReceiveLink() { public void accept(Session session) { // if the MessageReceiver is closed - we no-longer need to create the link if (MessageReceiver.this.getIsClosingOrClosed()) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], canceling the job of creating a receive link because the receiver was closed", + getClientId(), receivePath)); + } session.close(); return; @@ -529,7 +550,7 @@ public void accept(Session session) { if (desiredCapabilities != null) { receiver.setDesiredCapabilities(desiredCapabilities); } - final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); + final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this, MessageReceiver.this.getClientId()); BaseHandler.setHandler(receiver, handler); if (MessageReceiver.this.receiveLink != null) { @@ -609,18 +630,23 @@ private Message pollPrefetchQueue() { private void sendFlow(final int credits) { // slow down sending the flow - to make the protocol less-chat'y this.nextCreditToFlow += credits; - if (this.nextCreditToFlow >= this.prefetchCount || this.nextCreditToFlow >= 100) { + if (this.shouldSendFlow()) { final int tempFlow = this.nextCreditToFlow; this.receiveLink.flow(tempFlow); this.nextCreditToFlow = 0; if (TRACE_LOGGER.isDebugEnabled()) { - TRACE_LOGGER.debug(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]", - this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId())); + TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]", + this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId())); } } } + private boolean shouldSendFlow() { + return (this.nextCreditToFlow > 0 && this.nextCreditToFlow >= (this.prefetchCount / 2)) + || (this.nextCreditToFlow >= 100); + } + private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) { // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory this.openTimer = timer.schedule( @@ -815,7 +841,7 @@ public void onEvent() { receiveWork.onEvent(); if (!MessageReceiver.this.getIsClosingOrClosed() - && (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { + && (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { createReceiveLink(); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java index 1481baa7e6f6..248643813aab 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java @@ -131,7 +131,7 @@ public void onComplete(Void result) { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - token renewed", - getClientId(), sendPath, sendLink.getName())); + getClientId(), sendPath, getSendLinkName())); } } @@ -140,7 +140,7 @@ public void onError(Exception error) { if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]", - getClientId(), sendPath, sendLink.getName(), error.getMessage())); + getClientId(), sendPath, getSendLinkName(), error.getMessage())); } } }); @@ -148,7 +148,7 @@ public void onError(Exception error) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]", - getClientId(), sendPath, sendLink.getName(), exception.getMessage())); + getClientId(), sendPath, getSendLinkName(), exception.getMessage())); } } } @@ -266,6 +266,10 @@ private CompletableFuture send( return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, null, null); } + private String getSendLinkName() { + return this.sendLink == null ? "null" : this.sendLink.getName(); + } + public CompletableFuture send(final Iterable messages) { if (messages == null || IteratorUtil.sizeEquals(messages, 0)) { throw new IllegalArgumentException(String.format(Locale.US, @@ -359,8 +363,8 @@ public void onOpenComplete(Exception completionException) { this.cancelOpenTimer(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]", + this.getClientId(), this.sendPath, this.getSendLinkName())); } if (!this.linkFirstOpen.isDone()) { @@ -508,7 +512,7 @@ public void onError(final Exception completionException) { @Override public void onEvent() { if (!MessageSender.this.getIsClosingOrClosed() - && (sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) { + && (sendLink == null || sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) { recreateSendLink(); } } @@ -543,7 +547,7 @@ public void onSendComplete(final Delivery delivery) { String.format( Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } final ReplayableWorkItem pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag); @@ -612,7 +616,7 @@ public void onEvent() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } } } @@ -627,6 +631,13 @@ private void cleanupFailedSend(final ReplayableWorkItem failedSend, final private void createSendLink() { synchronized (this.errorConditionLock) { if (this.creatingLink) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], operationTimeout[%s], creating a send link is already in progress", + this.getClientId(), this.sendPath, this.operationTimeout)); + } + return; } @@ -663,7 +674,7 @@ public void accept(Session session) { sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - final SendLinkHandler handler = new SendLinkHandler(MessageSender.this); + final SendLinkHandler handler = new SendLinkHandler(MessageSender.this, MessageSender.this.getClientId()); BaseHandler.setHandler(sender, handler); if (MessageSender.this.sendLink != null) { @@ -803,7 +814,7 @@ public void onFlow(final int creditIssued) { int numberOfSendsWaitingforCredit = this.pendingSends.size(); TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit)); + this.getClientId(), this.sendPath, this.getSendLinkName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit)); } this.sendWork.onEvent(); @@ -816,10 +827,11 @@ private void recreateSendLink() { // actual send on the SenderLink should happen only in this method & should run on Reactor Thread private void processSendWork() { - if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) { + if (this.sendLink == null || this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) { if (!this.getIsClosingOrClosed()) { this.recreateSendLink(); } + return; } @@ -870,7 +882,7 @@ private void processSendWork() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize())); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize())); } if (delivery != null) { @@ -888,7 +900,7 @@ private void processSendWork() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 8336b3b1afa4..2509288948c0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -10,6 +10,7 @@ import com.microsoft.azure.eventhubs.OperationCancelledException; import com.microsoft.azure.eventhubs.RetryPolicy; import com.microsoft.azure.eventhubs.TimeoutException; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; @@ -72,7 +73,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory) { - super("MessagingFactory".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("MF"), null, executor); this.hostName = builder.getEndpoint().getHost(); this.reactorFactory = reactorFactory; @@ -80,7 +81,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.retryPolicy = retryPolicy; this.registeredLinks = new LinkedList<>(); this.reactorLock = new Object(); - this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this); + this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this, this.getClientId()); this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); this.tokenProvider = builder.getSharedAccessSignature() == null @@ -168,7 +169,7 @@ private void createConnection() throws IOException { } private void startReactor(final ReactorHandler reactorHandler) throws IOException { - final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize()); + final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize(), this.getClientId()); synchronized (this.reactorLock) { this.reactor = newReactor; this.reactorDispatcher = new ReactorDispatcher(newReactor); @@ -183,7 +184,7 @@ private void startReactor(final ReactorHandler reactorHandler) throws IOExceptio public CBSChannel getCBSChannel() { synchronized (this.cbsChannelCreateLock) { if (this.cbsChannel == null) { - this.cbsChannel = new CBSChannel(this, this); + this.cbsChannel = new CBSChannel(this, this, this.getClientId()); } } @@ -193,7 +194,7 @@ public CBSChannel getCBSChannel() { public ManagementChannel getManagementChannel() { synchronized (this.mgmtChannelCreateLock) { if (this.mgmtChannel == null) { - this.mgmtChannel = new ManagementChannel(this, this); + this.mgmtChannel = new ManagementChannel(this, this, this.getClientId()); } } @@ -203,11 +204,16 @@ public ManagementChannel getManagementChannel() { @Override public Session getSession(final String path, final Consumer onRemoteSessionOpen, final BiConsumer onRemoteSessionOpenError) { if (this.getIsClosingOrClosed()) { - onRemoteSessionOpenError.accept(null, new OperationCancelledException("underlying messagingFactory instance is closed")); return null; } + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, "messagingFactory[%s], hostName[%s], getting a session.", + getClientId(), getHostName())); + } + if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { this.connection = this.getReactor().connectionToHost( this.connectionHandler.getRemoteHostName(), @@ -216,7 +222,7 @@ public Session getSession(final String path, final Consumer onRemoteSes } final Session session = this.connection.session(); - BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError, this.operationTimeout)); + BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError, this.operationTimeout, this.getClientId())); session.open(); return session; @@ -251,7 +257,7 @@ public void onOpenComplete(Exception exception) { @Override public void onConnectionError(ErrorCondition error) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], error[%s]", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], error[%s]", this.getClientId(), this.hostName, error != null ? error.getDescription() : "n/a")); @@ -259,7 +265,7 @@ public void onConnectionError(ErrorCondition error) { if (!this.open.isDone()) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], open hasn't complete, stopping the reactor", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], open hasn't complete, stopping the reactor", this.getClientId(), this.hostName)); } @@ -272,9 +278,9 @@ public void onConnectionError(ErrorCondition error) { final List closedLinks = new LinkedList<>(); for (Link link : oldRegisteredLinksCopy) { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { + if (link.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing link [%s]", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], closing link [%s]", this.getClientId(), this.hostName, link.getName())); } @@ -289,7 +295,7 @@ public void onConnectionError(ErrorCondition error) { // in connection recreation we depend on currentConnection state to evaluate need for recreation if (oldConnection.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing current connection", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], closing current connection", this.getClientId(), this.hostName)); } @@ -303,7 +309,7 @@ public void onConnectionError(ErrorCondition error) { for (Link link : closedLinks) { final Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) { + if (handler instanceof BaseLinkHandler) { final BaseLinkHandler linkHandler = (BaseLinkHandler) handler; linkHandler.processOnClose(link, error); } @@ -349,17 +355,26 @@ private void onReactorError(Exception cause) { // below .close() calls (local closes). // But, we still need to change the states of these to Closed - so that subsequent retries - will // treat the links and connection as closed and re-establish them and continue running on new Reactor instance. - if (oldConnection.getLocalState() != EndpointState.CLOSED && oldConnection.getRemoteState() != EndpointState.CLOSED) { + ErrorCondition errorCondition = new ErrorCondition(Symbol.getSymbol("messagingfactory.onreactorerror"), cause.getMessage()); + if (oldConnection.getLocalState() != EndpointState.CLOSED) { + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "onReactorError: messagingFactory[%s], hostname[%s], closing current connection", + this.getClientId(), + this.hostName)); + } + + oldConnection.setCondition(errorCondition); oldConnection.close(); } for (final Link link : oldRegisteredLinksCopy) { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { + if (link.getLocalState() != EndpointState.CLOSED) { + link.setCondition(errorCondition); link.close(); } final Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) { + if (handler instanceof BaseLinkHandler) { final BaseLinkHandler linkHandler = (BaseLinkHandler) handler; linkHandler.processOnClose(link, cause); } @@ -415,8 +430,8 @@ public void scheduleOnReactorThread(final int delay, final DispatchHandler handl public static class ReactorFactory { - public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { - return ProtonUtil.reactor(reactorHandler, maxFrameSize); + public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { + return ProtonUtil.reactor(reactorHandler, maxFrameSize, name); } } @@ -611,6 +626,10 @@ public void run() { } private class ReactorHandlerWithConnection extends ReactorHandler { + ReactorHandlerWithConnection() { + super(getClientId()); + } + @Override public void onReactorInit(Event e) { super.onReactorInit(e); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java index e91230ad378c..92e1eb40e0a5 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Locale; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -55,7 +56,7 @@ private PartitionReceiverImpl(MessagingFactory factory, final boolean isEpochReceiver, final ReceiverOptions receiverOptions, final ScheduledExecutorService executor) { - super("PartitionReceiverImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("PR").concat(StringUtil.SEPARATOR + factory.getClientId()), null, executor); this.underlyingFactory = factory; this.eventHubName = eventHubName; @@ -104,7 +105,7 @@ public PartitionReceiver apply(Void a) { private CompletableFuture createInternalReceiver() { return MessageReceiver.create(this.underlyingFactory, this.getClientId().concat("-InternalReceiver"), - String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId), + String.format(Locale.US, "%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId), this.receiverOptions.getPrefetchCount(), this) .thenAcceptAsync(new Consumer() { public void accept(MessageReceiver r) { @@ -248,7 +249,7 @@ public Map getFilter(final Message lastReceivedMes } else { logReceivePath = "receiverPath[" + this.internalReceiver.getReceivePath() + "]"; } - TRACE_LOGGER.info(String.format("%s, action[createReceiveLink], %s", logReceivePath, this.eventPosition)); + TRACE_LOGGER.info(String.format(Locale.US, "%s, action[createReceiveLink], %s", logReceivePath, this.eventPosition)); } return Collections.singletonMap(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER, expression)); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java index bbde282f1f4e..86bf875d7728 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java @@ -9,6 +9,7 @@ import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.PartitionSender; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; @@ -22,7 +23,7 @@ final class PartitionSenderImpl extends ClientEntity implements PartitionSender private volatile MessageSender internalSender; private PartitionSenderImpl(final MessagingFactory factory, final String eventHubName, final String partitionId, final ScheduledExecutorService executor) { - super("PartitionSenderImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("PS").concat(StringUtil.SEPARATOR + factory.getClientId()), null, executor); this.partitionId = partitionId; this.eventHubName = eventHubName; @@ -44,7 +45,7 @@ public PartitionSender apply(Void a) { private CompletableFuture createInternalSender() throws EventHubException { return MessageSender.create(this.factory, this.getClientId().concat("-InternalSender"), - String.format("%s/Partitions/%s", this.eventHubName, this.partitionId)) + String.format(Locale.US, "%s/Partitions/%s", this.eventHubName, this.partitionId)) .thenAcceptAsync(new Consumer() { public void accept(MessageSender a) { PartitionSenderImpl.this.internalSender = a; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java index caa69d739358..b375f4afe7dc 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java @@ -14,14 +14,14 @@ public final class ProtonUtil { private ProtonUtil() { } - public static Reactor reactor(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { + public static Reactor reactor(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { final ReactorOptions reactorOptions = new ReactorOptions(); reactorOptions.setMaxFrameSize(maxFrameSize); reactorOptions.setEnableSaslByDefault(true); final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler); - reactor.setGlobalHandler(new CustomIOHandler()); + reactor.setGlobalHandler(new CustomIOHandler(name)); return reactor; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java index 6e8a8ce7657b..c8a5670e41f3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java @@ -9,12 +9,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; + public class ReactorHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReactorHandler.class); + private final String name; + private ReactorDispatcher reactorDispatcher; + public ReactorHandler(final String name) { + this.name = name; + } + public ReactorDispatcher getReactorDispatcher() { return this.reactorDispatcher; } @@ -26,8 +34,7 @@ public void unsafeSetReactorDispatcher(final ReactorDispatcher reactorDispatcher @Override public void onReactorInit(Event e) { - - TRACE_LOGGER.info("reactor.onReactorInit"); + TRACE_LOGGER.info(String.format(Locale.US, "name[%s] reactor.onReactorInit", this.name)); final Reactor reactor = e.getReactor(); reactor.setTimeout(ClientConstants.REACTOR_IO_POLL_TIMEOUT); @@ -35,7 +42,6 @@ public void onReactorInit(Event e) { @Override public void onReactorFinal(Event e) { - - TRACE_LOGGER.info("reactor.onReactorFinal"); + TRACE_LOGGER.info(String.format(Locale.US, "name[%s] reactor.onReactorFinal", this.name)); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java index e3d56bdd09e5..bc2ed56d53a0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java @@ -12,18 +12,18 @@ import java.util.Locale; -// ServiceBus <-> ProtonReactor interaction -// handles all recvLink - reactor events public final class ReceiveLinkHandler extends BaseLinkHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReceiveLinkHandler.class); private final AmqpReceiver amqpReceiver; + private final String receiverName; private final Object firstResponse; private boolean isFirstResponse; - public ReceiveLinkHandler(final AmqpReceiver receiver) { - super(receiver); + public ReceiveLinkHandler(final AmqpReceiver receiver, final String receiverName) { + super(receiver, receiverName); this.amqpReceiver = receiver; + this.receiverName = receiverName; this.firstResponse = new Object(); this.isFirstResponse = true; } @@ -32,11 +32,9 @@ public ReceiveLinkHandler(final AmqpReceiver receiver) { public void onLinkLocalOpen(Event evt) { Link link = evt.getLink(); if (link instanceof Receiver) { - Receiver receiver = (Receiver) link; - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format("onLinkLocalOpen linkName[%s], localSource[%s]", receiver.getName(), receiver.getSource())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalOpen receiverName[%s], linkName[%s], localSource[%s]", + this.receiverName, link.getName(), link.getSource())); } } } @@ -45,11 +43,10 @@ public void onLinkLocalOpen(Event evt) { public void onLinkRemoteOpen(Event event) { Link link = event.getLink(); if (link instanceof Receiver) { - Receiver receiver = (Receiver) link; if (link.getRemoteSource() != null) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteSource[%s]", - receiver.getName(), link.getRemoteSource())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen receiverName[%s], linkName[%s], remoteSource[%s]", + this.receiverName, link.getName(), link.getRemoteSource())); } synchronized (this.firstResponse) { @@ -58,9 +55,8 @@ public void onLinkRemoteOpen(Event event) { } } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], " - + "remoteSource[null], action[waitingForError]", receiver.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen receiverName[%s], linkName[%s], action[waitingForError]", + this.receiverName, link.getName())); } } } @@ -90,9 +86,9 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn( receiveLink != null - ? String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + ? String.format(Locale.US, "onDelivery receiverName[%s], linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + "remoteCondition[%s], delivery.isSettled[%s]", - receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled()) + this.receiverName, receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled()) : String.format(Locale.US, "delivery.isSettled[%s]", delivery.isSettled())); } } else { @@ -102,9 +98,9 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isTraceEnabled() && receiveLink != null) { TRACE_LOGGER.trace( - String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " - + "remoteCondition[%s], delivery.isPartial[%s]", - receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial())); + String.format(Locale.US, "onDelivery receiverName[%s], linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + + "remoteCondition[%s], delivery.isPartial[%s]", + this.receiverName, receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java index 4430ce551295..b4c8f5d98088 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -55,8 +56,8 @@ public void run() { } catch (final Exception exception) { if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " - + "encountered unrecoverable error and exited with exception %s.", + String.format(Locale.US, "Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + + "encountered unrecoverable error and exited with exception %s.", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), exception.toString())); } @@ -71,7 +72,7 @@ public void receiveAndProcess() { .handleAsync(this.processAndReschedule, this.executor); } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("Stopping receive pump for eventHub (%s), consumerGroup (%s), partition (%s) as %s", + TRACE_LOGGER.info(String.format(Locale.US, "Stopping receive pump for eventHub (%s), consumerGroup (%s), partition (%s) as %s", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), this.stopPumpRaised.get() ? "per the request." : "pump ran into errors.")); } @@ -111,7 +112,7 @@ private void handleUserCodeExceptions(final Throwable userCodeException) { this.isPumpHealthy = false; if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + String.format(Locale.US, "Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + "exiting after user-code exception %s", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), userCodeException.toString())); } @@ -120,7 +121,7 @@ private void handleUserCodeExceptions(final Throwable userCodeException) { if (userCodeException instanceof InterruptedException) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("Interrupting receive pump for eventHub (%s), consumerGroup (%s), partition (%s)", + TRACE_LOGGER.info(String.format(Locale.US, "Interrupting receive pump for eventHub (%s), consumerGroup (%s), partition (%s)", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId())); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index e19fa8e6f60b..fd3191acae16 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -57,7 +57,7 @@ public RequestResponseChannel( this.sendLink.setTarget(target); sendLink.setSource(new Source()); this.sendLink.setSenderSettleMode(SenderSettleMode.SETTLED); - BaseHandler.setHandler(this.sendLink, new SendLinkHandler(new RequestHandler())); + BaseHandler.setHandler(this.sendLink, new SendLinkHandler(new RequestHandler(), linkName)); this.receiveLink = session.receiver(linkName + ":receiver"); final Source source = new Source(); @@ -68,7 +68,7 @@ public RequestResponseChannel( this.receiveLink.setTarget(receiverTarget); this.receiveLink.setSenderSettleMode(SenderSettleMode.SETTLED); this.receiveLink.setReceiverSettleMode(ReceiverSettleMode.SECOND); - BaseHandler.setHandler(this.receiveLink, new ReceiveLinkHandler(new ResponseHandler())); + BaseHandler.setHandler(this.receiveLink, new ReceiveLinkHandler(new ResponseHandler(), linkName)); } // open should be called only once - we use FaultTolerantObject for that diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java index 99d56495fccf..1403a700cbb2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java @@ -20,7 +20,9 @@ public void run(OperationResult closeOperationCallback) { if (channelToBeClosed == null) { closeOperationCallback.onComplete(null); } else { - channelToBeClosed.close(new OperationResultBase<>(closeOperationCallback::onComplete, closeOperationCallback::onError)); + channelToBeClosed.close(new OperationResultBase<>( + closeOperationCallback::onComplete, + closeOperationCallback::onError)); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index a6799464fb0e..a7e5d22fcd15 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -4,17 +4,27 @@ package com.microsoft.azure.eventhubs.impl; import org.apache.qpid.proton.engine.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; public class RequestResponseOpener implements Operation { + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseOpener.class); + private final SessionProvider sessionProvider; + private final String clientId; private final String sessionName; private final String linkName; private final String endpointAddress; private final AmqpConnection eventDispatcher; - public RequestResponseOpener(final SessionProvider sessionProvider, final String sessionName, final String linkName, + private boolean isOpened; + + public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, final String endpointAddress, final AmqpConnection eventDispatcher) { this.sessionProvider = sessionProvider; + this.clientId = clientId; this.sessionName = sessionName; this.linkName = linkName; this.endpointAddress = endpointAddress; @@ -22,7 +32,10 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String } @Override - public void run(OperationResult operationCallback) { + public synchronized void run(OperationResult operationCallback) { + if (this.isOpened) { + return; + } final Session session = this.sessionProvider.getSession( this.sessionName, @@ -51,11 +64,23 @@ public void onComplete(Void result) { eventDispatcher.registerForConnectionError(requestResponseChannel.getReceiveLink()); operationCallback.onComplete(requestResponseChannel); + + isOpened = true; + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]", + clientId, sessionName, linkName, endpointAddress)); + } } @Override public void onError(Exception error) { operationCallback.onError(error); + + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", + clientId, sessionName, linkName, endpointAddress, error)); + } } }, new OperationResult() { @@ -63,12 +88,24 @@ public void onError(Exception error) { public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); + + isOpened = false; + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", + clientId, sessionName, linkName, endpointAddress)); + } } @Override public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); + + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", + clientId, sessionName, linkName, endpointAddress, error)); + } } }); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java index f2220ef1d7df..55949b61306b 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java @@ -10,20 +10,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; -import static java.nio.charset.StandardCharsets.UTF_8; - public class SendLinkHandler extends BaseLinkHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SendLinkHandler.class); private final AmqpSender msgSender; + private final String senderName; private AtomicBoolean isFirstFlow; - public SendLinkHandler(final AmqpSender sender) { - super(sender); + public SendLinkHandler(final AmqpSender sender, final String senderName) { + super(sender, senderName); this.msgSender = sender; + this.senderName = senderName; this.isFirstFlow = new AtomicBoolean(true); } @@ -31,9 +32,9 @@ public SendLinkHandler(final AmqpSender sender) { public void onLinkLocalOpen(Event event) { Link link = event.getLink(); if (link instanceof Sender) { - Sender sender = (Sender) link; if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkLocalOpen linkName[%s], localTarget[%s]", sender.getName(), sender.getTarget())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalOpen senderName[%s], linkName[%s], localTarget[%s]", + this.senderName, link.getName(), link.getTarget())); } } } @@ -42,20 +43,19 @@ public void onLinkLocalOpen(Event event) { public void onLinkRemoteOpen(Event event) { Link link = event.getLink(); if (link instanceof Sender) { - Sender sender = (Sender) link; if (link.getRemoteTarget() != null) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[%s]", sender.getName(), link.getRemoteTarget())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen senderName[%s], linkName[%s], remoteTarget[%s]", + this.senderName, link.getName(), link.getRemoteTarget())); } if (this.isFirstFlow.compareAndSet(true, false)) { this.msgSender.onOpenComplete(null); } - } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", sender.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen senderName[%s], linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", + this.senderName, link.getName())); } } } @@ -69,10 +69,8 @@ public void onDelivery(Event event) { Sender sender = (Sender) delivery.getLink(); if (TRACE_LOGGER.isTraceEnabled()) { - TRACE_LOGGER.trace( - "onDelivery linkName[" + sender.getName() - + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() - + "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag(), UTF_8) + "]"); + TRACE_LOGGER.trace(String.format(Locale.US, "onDelivery senderName[%s], linkName[%s], unsettled[%s], credit[%s], deliveryState[%s], delivery.isBuffered[%s], delivery.id[%s]", + this.senderName, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(), StandardCharsets.UTF_8))); } msgSender.onSendComplete(delivery); @@ -92,7 +90,8 @@ public void onLinkFlow(Event event) { this.msgSender.onFlow(sender.getRemoteCredit()); if (TRACE_LOGGER.isDebugEnabled()) { - TRACE_LOGGER.debug("onLinkFlow linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit() + "]"); + TRACE_LOGGER.debug(String.format(Locale.US, "onLinkFlow senderName[%s], linkName[%s], unsettled[%s], credit[%s]", + this.senderName, sender.getName(), sender.getUnsettled(), sender.getCredit())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java index 15d29e9a962f..3818fcdacb7a 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java @@ -28,6 +28,7 @@ public class SessionHandler extends BaseHandler { private final Consumer onRemoteSessionOpen; private final BiConsumer onRemoteSessionOpenError; private final Duration openTimeout; + private final String connectionId; private boolean sessionCreated = false; private boolean sessionOpenErrorDispatched = false; @@ -35,18 +36,20 @@ public class SessionHandler extends BaseHandler { public SessionHandler(final String entityName, final Consumer onRemoteSessionOpen, final BiConsumer onRemoteSessionOpenError, - final Duration openTimeout) { + final Duration openTimeout, + final String connectionId) { this.entityName = entityName; this.onRemoteSessionOpenError = onRemoteSessionOpenError; this.onRemoteSessionOpen = onRemoteSessionOpen; this.openTimeout = openTimeout; + this.connectionId = connectionId; } @Override public void onSessionLocalOpen(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalOpen entityName[%s], condition[%s]", this.entityName, - e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], condition[%s]", + this.connectionId, this.entityName, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); } if (this.onRemoteSessionOpenError != null) { @@ -66,8 +69,7 @@ public void onSessionLocalOpen(Event e) { null, new EventHubException( false, - String.format("OnSessionLocalOpen entityName[%s], reactorHandler: NULL POINTER exception.") - ) + String.format("OnSessionLocalOpen entityName[%s], reactorHandler: NULL POINTER exception.", this.entityName)) ); e.getSession().close(); return; @@ -77,11 +79,11 @@ public void onSessionLocalOpen(Event e) { final Session session = e.getSession(); try { - reactorDispatcher.invoke((int) this.openTimeout.toMillis(), new SessionTimeoutHandler(session, entityName)); + reactorDispatcher.invoke((int) this.openTimeout.toMillis(), new SessionTimeoutHandler(entityName, connectionId)); } catch (IOException ioException) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onSessionLocalOpen entityName[%s], reactorDispatcherError[%s]", - this.entityName, ioException.getMessage())); + TRACE_LOGGER.warn(String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], reactorDispatcherError[%s]", + this.connectionId, this.entityName, ioException.getMessage())); } session.close(); @@ -89,8 +91,8 @@ public void onSessionLocalOpen(Event e) { null, new EventHubException( false, - String.format("onSessionLocalOpen entityName[%s], underlying IO of reactorDispatcher faulted with error: %s", - this.entityName, ioException.getMessage()), ioException)); + String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], underlying IO of reactorDispatcher faulted with error: %s", + this.connectionId, this.entityName, ioException.getMessage()), ioException)); } } } @@ -98,8 +100,8 @@ public void onSessionLocalOpen(Event e) { @Override public void onSessionRemoteOpen(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteOpen entityName[%s], sessionIncCapacity[%s], sessionOutgoingWindow[%s]", - this.entityName, e.getSession().getIncomingCapacity(), e.getSession().getOutgoingWindow())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteOpen connectionId[%s], entityName[%s], sessionIncCapacity[%s], sessionOutgoingWindow[%s]", + this.connectionId, this.entityName, e.getSession().getIncomingCapacity(), e.getSession().getOutgoingWindow())); } final Session session = e.getSession(); @@ -116,16 +118,16 @@ public void onSessionRemoteOpen(Event e) { @Override public void onSessionLocalClose(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalClose entityName[%s], condition[%s]", this.entityName, - e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalClose connectionId[%s], entityName[%s], condition[%s]", this.entityName, + this.connectionId, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); } } @Override public void onSessionRemoteClose(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose entityName[%s], condition[%s]", this.entityName, - e.getSession().getRemoteCondition() == null ? "none" : e.getSession().getRemoteCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", this.entityName, + this.connectionId, e.getSession().getRemoteCondition() == null ? "none" : e.getSession().getRemoteCondition().toString())); } final Session session = e.getSession(); @@ -133,10 +135,8 @@ public void onSessionRemoteClose(Event e) { if (session != null && session.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose closing a local session for entityName[%s], condition[%s], description[%s]", - this.entityName, - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose closing a local session for connectionId[%s], entityName[%s], condition[%s], description[%s]", + this.connectionId, this.entityName, condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } session.setCondition(session.getRemoteCondition()); @@ -155,19 +155,19 @@ public void onSessionFinal(Event e) { final Session session = e.getSession(); ErrorCondition condition = session != null ? session.getCondition() : null; - TRACE_LOGGER.info(String.format(Locale.US, "onSessionFinal entityName[%s], condition[%s], description[%s]", - this.entityName, - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionFinal connectionId[%s], entityName[%s], condition[%s], description[%s]", + this.connectionId, this.entityName, condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } } private class SessionTimeoutHandler extends DispatchHandler { private final String entityName; + private final String connectionId; - SessionTimeoutHandler(final Session session, final String entityName) { + SessionTimeoutHandler(final String entityName, final String connectionId) { this.entityName = entityName; + this.connectionId = connectionId; } @Override @@ -181,8 +181,8 @@ public void onEvent() { if (!sessionCreated && !sessionOpenErrorDispatched) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "SessionTimeoutHandler.onEvent - entityName[%s], session open timed out.", - this.entityName)); + TRACE_LOGGER.warn(String.format(Locale.US, "SessionTimeoutHandler.onEvent - connectionId[%s], entityName[%s], session open timed out.", + this.connectionId, this.entityName)); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java index 6660d8d4289f..d55792e667ee 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java @@ -3,10 +3,13 @@ package com.microsoft.azure.eventhubs.impl; +import java.time.Instant; +import java.util.Locale; import java.util.UUID; public final class StringUtil { public static final String EMPTY = ""; + public static final String SEPARATOR = "_"; public static boolean isNullOrEmpty(String string) { return (string == null || string.isEmpty()); @@ -25,7 +28,7 @@ public static boolean isNullOrWhiteSpace(String string) { return true; } - public static String getRandomString() { - return UUID.randomUUID().toString().substring(0, 6); + public static String getRandomString(String prefix) { + return String.format(Locale.US, "%s_%s_%s", prefix, UUID.randomUUID().toString().substring(0, 6), Instant.now().toEpochMilli()); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java index 8a52e6bc2883..eb63d0b4af8d 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java @@ -3,7 +3,6 @@ package com.microsoft.azure.eventhubs.impl; - import java.time.Duration; import java.time.Instant; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java index 74cde83b1ff4..fe40e2eb68a8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java @@ -5,34 +5,18 @@ import org.apache.qpid.proton.engine.Session; -import java.time.Instant; - public final class TrackingUtil { public static final String TRACKING_ID_TOKEN_SEPARATOR = "_"; private TrackingUtil() { } - /** - * parses ServiceBus role identifiers from trackingId - * - * @return null if no roleIdentifier found - */ - static String parseRoleIdentifier(final String trackingId) { - if (StringUtil.isNullOrWhiteSpace(trackingId) || !trackingId.contains(TRACKING_ID_TOKEN_SEPARATOR)) { - return null; - } - - return trackingId.substring(trackingId.indexOf(TRACKING_ID_TOKEN_SEPARATOR)); - } - public static String getLinkName(final Session session) { - // returned linkName lookslike: ea9cac_8b_G27_1479943074829 - final String linkNamePrefix = StringUtil.getRandomString(); - final String linkNameWithServiceRoleTracker = session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) + // LN_1479943074829_ea9cac_8b_G27 + final String linkNamePrefix = StringUtil.getRandomString("LN"); + return session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) ? linkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(session.getConnection().getRemoteContainer() - .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0), session.getConnection().getRemoteContainer().length())) + .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0))) : linkNamePrefix; - return linkNameWithServiceRoleTracker.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(String.valueOf(Instant.now().toEpochMilli())); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 5c97f42b71b2..51ccec9b5cd7 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -13,7 +13,7 @@ public class WebSocketConnectionHandler extends ConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); public WebSocketConnectionHandler(AmqpConnection amqpConnection) { - super(amqpConnection); + super(amqpConnection, StringUtil.getRandomString("WS")); } @Override diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index 7b1a172895ff..e7d512bcd3a3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -6,7 +6,6 @@ import com.microsoft.azure.proton.transport.proxy.ProxyHandler; import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl; import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl; - import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; @@ -17,18 +16,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.Authenticator; import java.net.InetSocketAddress; -import java.net.PasswordAuthentication; import java.net.Proxy; import java.net.ProxySelector; import java.net.URI; -import java.util.Base64; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import static java.nio.charset.StandardCharsets.UTF_8; public class WebSocketProxyConnectionHandler extends WebSocketConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketProxyConnectionHandler.class); @@ -59,8 +51,7 @@ protected void addTransportLayers(final Event event, final TransportInternal tra // after creating the socket to proxy final String hostName = event.getConnection().getHostname(); final ProxyHandler proxyHandler = new ProxyHandlerImpl(); - final Map proxyHeader = getAuthorizationHeader(); - proxy.configure(hostName, proxyHeader, proxyHandler, transport); + proxy.configure(hostName, null, proxyHandler, transport); transport.addTransportLayer(proxy); @@ -120,38 +111,6 @@ public int getRemotePort() { return socketAddress.getPort(); } - private Map getAuthorizationHeader() { - final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication( - getRemoteHostName(), - null, - getRemotePort(), - null, - null, - "http", - null, - Authenticator.RequestorType.PROXY); - if (authentication == null) { - return null; - } - - final String proxyUserName = authentication.getUserName(); - final String proxyPassword = authentication.getPassword() != null - ? new String(authentication.getPassword()) - : null; - if (StringUtil.isNullOrEmpty(proxyUserName) - || StringUtil.isNullOrEmpty(proxyPassword)) { - return null; - } - - final HashMap proxyAuthorizationHeader = new HashMap<>(); - // https://tools.ietf.org/html/rfc7617 - final String usernamePasswordPair = proxyUserName + ":" + proxyPassword; - proxyAuthorizationHeader.put( - "Proxy-Authorization", - "Basic " + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes(UTF_8))); - return proxyAuthorizationHeader; - } - private InetSocketAddress getProxyAddress() { final URI serviceUri = createURIFromHostNamePort( this.getAmqpConnection().getHostName(), diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java index 8d048b08dfdf..ecb3278fafd2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java @@ -3,10 +3,10 @@ package com.microsoft.azure.eventhubs.exceptioncontracts; +import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; @@ -17,9 +17,9 @@ import com.microsoft.azure.eventhubs.lib.TestContext; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -115,7 +115,6 @@ public void onError(Throwable error) { } } - @Ignore("TODO: Investigate testcase failure.") @Test() public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { @@ -133,12 +132,14 @@ public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { openFuture.get(); Assert.fail(); } catch (ExecutionException error) { - Assert.assertEquals(EventHubException.class, error.getCause().getClass()); + Assert.assertEquals(CommunicationException.class, error.getCause().getClass()); } - Thread.sleep(1000); // for reactor to transition from cleanup to complete-stop + // Waiting for reactor to transition from cleanup to complete-stop, this requires at least 60 seconds until + // the items are emptied. + Thread.sleep(Duration.ofSeconds(90).toMillis()); - Assert.assertEquals(((ScheduledThreadPoolExecutor) executor).getQueue().size(), 0); + Assert.assertEquals(0, ((ScheduledThreadPoolExecutor) executor).getQueue().size()); } finally { executor.shutdown(); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java index d8973a721bcf..f3cb525cb257 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java @@ -19,6 +19,7 @@ import org.apache.qpid.proton.reactor.Reactor; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.lang.reflect.Field; @@ -34,6 +35,7 @@ public static void initialize() { connStr = TestContext.getConnectionString(); } + @Ignore("TODO: Investigate testcase. This fails.") @Test() public void verifyReactorRestartsOnProtonBugs() throws Exception { final EventHubClient eventHubClient = EventHubClient.createSync(connStr.toString(), TestContext.EXECUTOR_SERVICE); @@ -58,7 +60,7 @@ public void run() { handler.add(new BaseHandler() { @Override public void handle(org.apache.qpid.proton.engine.Event e) { - throw new NullPointerException(); + throw new NullPointerException("The test exception. We want this to restart."); } }); } catch (Exception e) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java index 1416b65db58c..b070a110e0ad 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.time.Duration; +import java.util.Locale; import java.util.UUID; public class SecurityExceptionsTest extends ApiTestBase { @@ -89,7 +90,7 @@ public void testEventHubClientUnAuthorizedAccessToken() throws Throwable { final String wrongToken = SharedAccessSignatureTokenProvider.generateSharedAccessSignature( "wrongkey", correctConnectionString.getSasKey(), - String.format("amqps://%s/%s", correctConnectionString.getEndpoint().getHost(), correctConnectionString.getEventHubName()), + String.format(Locale.US, "amqps://%s/%s", correctConnectionString.getEndpoint().getHost(), correctConnectionString.getEventHubName()), Duration.ofSeconds(10)); final ConnectionStringBuilder connectionString = new ConnectionStringBuilder() .setEndpoint(correctConnectionString.getEndpoint()) diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java index c7a26fd13b4a..dee6bcbc38ed 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java @@ -19,6 +19,7 @@ import org.junit.Test; import java.time.Instant; +import java.util.Locale; public class SendLargeMessageTest extends ApiTestBase { private static final String PARTITION_ID = "0"; @@ -97,6 +98,6 @@ private void sendLargeMessageTest(int msgSize) throws EventHubException { EventData recdMessage = messages.iterator().next(); - Assert.assertEquals(String.format("sent msg size: %s, recvd msg size: %s", msgSize, recdMessage.getBytes().length), recdMessage.getBytes().length, msgSize); + Assert.assertEquals(String.format(Locale.US, "sent msg size: %s, recvd msg size: %s", msgSize, recdMessage.getBytes().length), recdMessage.getBytes().length, msgSize); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java index 54b8562f6700..29b01efa89b4 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java @@ -24,7 +24,7 @@ public void setFaultType(final FaultType faultType) { } @Override - public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { + public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { final Reactor reactor = Proton.reactor(reactorHandler); switch (this.faultType) { @@ -44,6 +44,10 @@ public enum FaultType { public static final class NetworkOutageSimulator extends CustomIOHandler { + public NetworkOutageSimulator() { + super("NetworkOutageSimulator"); + } + @Override public void onUnhandled(final Event event) { switch (event.getType()) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java index 8e66a8c5a2a5..9ba244fe6747 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java @@ -9,6 +9,7 @@ import org.junit.BeforeClass; import java.time.Duration; +import java.util.Locale; public class SasTokenTestBase extends ApiTestBase { @@ -24,7 +25,7 @@ public static void replaceConnectionString() throws Exception { .setSharedAccessSignature( SharedAccessSignatureTokenProvider.generateSharedAccessSignature(originalConnectionString.getSasKeyName(), originalConnectionString.getSasKey(), - String.format("amqp://%s/%s", originalConnectionString.getEndpoint().getHost(), originalConnectionString.getEventHubName()), + String.format(Locale.US, "amqp://%s/%s", originalConnectionString.getEndpoint().getHost(), originalConnectionString.getEventHubName()), Duration.ofDays(1)) ) .toString(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java index 3366bc614364..cacc7461ba4e 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService; public final class TestContext { - public static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); private static final String EVENT_HUB_CONNECTION_STRING_ENV_NAME = "AZURE_EVENTHUBS_CONNECTION_STRING"; diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java index 73411d1d19c4..eb73673bf219 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java @@ -16,6 +16,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.logging.FileHandler; import java.util.logging.Level; @@ -104,14 +105,14 @@ public void run() { long batchSize = (1 + IteratorUtil.getLast(receivedEvents.iterator()).getSystemProperties().getSequenceNumber()) - (IteratorUtil.getFirst(receivedEvents).getSystemProperties().getSequenceNumber()); totalEvents += batchSize; - System.out.println(String.format("[partitionId: %s] received %s events; total sofar: %s, begin: %s, end: %s", + System.out.println(String.format(Locale.US, "[partitionId: %s] received %s events; total sofar: %s, begin: %s, end: %s", sPartitionId, batchSize, totalEvents, IteratorUtil.getLast(receivedEvents.iterator()).getSystemProperties().getSequenceNumber(), IteratorUtil.getFirst(receivedEvents).getSystemProperties().getSequenceNumber())); } else { - System.out.println(String.format("received null on partition %s", sPartitionId)); + System.out.println(String.format(Locale.US, "received null on partition %s", sPartitionId)); } } catch (Exception exp) { System.out.println(exp.getMessage() + exp.toString()); diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java index 51cfe84113d2..ef25a601ca55 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Iterator; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -69,7 +70,7 @@ public void testReceiverStartOfStreamFilters() throws EventHubException { for (EventData eventDataUsingOffset : startingEventsUsingOffsetReceiver) { EventData eventDataUsingDateTime = dateTimeIterator.next(); Assert.assertTrue( - String.format("START_OF_STREAM offset: %s, EPOCH offset: %s", eventDataUsingOffset.getSystemProperties().getOffset(), eventDataUsingDateTime.getSystemProperties().getOffset()), + String.format(Locale.US, "START_OF_STREAM offset: %s, EPOCH offset: %s", eventDataUsingOffset.getSystemProperties().getOffset(), eventDataUsingDateTime.getSystemProperties().getOffset()), eventDataUsingOffset.getSystemProperties().getOffset().equalsIgnoreCase(eventDataUsingDateTime.getSystemProperties().getOffset())); if (!dateTimeIterator.hasNext()) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java index 6f79da660c55..3006234578c9 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,6 +44,7 @@ public static void initialize() throws Exception { final ConnectionStringBuilder connectionString = TestContext.getConnectionString(); initializeEventHub(connectionString); } + public static void initializeEventHub(final ConnectionStringBuilder connectionString) throws Exception { ehClient = EventHubClient.createSync(connectionString.toString(), TestContext.EXECUTOR_SERVICE); } @@ -175,7 +177,7 @@ public void onReceive(Iterable events) { for (EventData event : events) { if (!partitionKey.equals(event.getSystemProperties().getPartitionKey())) { this.validateSignal.completeExceptionally( - new AssertionFailedError(String.format("received partitionKey: %s, expected partitionKey: %s", event.getSystemProperties().getPartitionKey(), partitionKey))); + new AssertionFailedError(String.format(Locale.US, "received partitionKey: %s, expected partitionKey: %s", event.getSystemProperties().getPartitionKey(), partitionKey))); } this.currentEventCount++; @@ -215,9 +217,8 @@ public void onReceive(Iterable events) { for (EventData event : events) { final int currentEventOrder = (int) event.getProperties().get(ORDER_PROPERTY); if (currentEventOrder != currentCount) { - this.validateSignal.completeExceptionally(new AssertionError(String.format("expected %s, got %s", currentCount, currentEventOrder))); + this.validateSignal.completeExceptionally(new AssertionError(String.format(Locale.US, "expected %s, got %s", currentCount, currentEventOrder))); } - currentCount++; } } diff --git a/eventhubs/data-plane/pom.xml b/eventhubs/data-plane/pom.xml index e82e44df223e..93d1e09126b5 100644 --- a/eventhubs/data-plane/pom.xml +++ b/eventhubs/data-plane/pom.xml @@ -4,23 +4,23 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.azure + azure-client-sdk-parent + 1.0.0 + ../../pom.client.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-clients pom - 2.0.0 + 2.3.0 Microsoft Azure Event Hubs SDK Parent Java libraries for talking to Windows Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.azure - azure-client-sdk-parent - 1.0.0 - ../../pom.client.xml - - azure-java-build-docs @@ -45,6 +45,7 @@ org.slf4j slf4j-api
+ junit junit diff --git a/eventhubs/data-plane/readme.md b/eventhubs/data-plane/readme.md index ea52c6191cd0..38634dbb2979 100644 --- a/eventhubs/data-plane/readme.md +++ b/eventhubs/data-plane/readme.md @@ -41,11 +41,11 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK |azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs) ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 - + + com.microsoft.azure + azure-eventhubs + 2.3.0 + ``` #### Microsoft Azure EventHubs Java Event Processor Host library @@ -58,12 +58,12 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries. |azure-eventhubs-eph|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph) ```XML - - com.microsoft.azure - azure-eventhubs-eph - 2.2.0 - -``` + + com.microsoft.azure + azure-eventhubs-eph + 2.5.0 + +``` ## How to provide feedback diff --git a/pom.client.xml b/pom.client.xml index 460cb031d8cb..225f88e6694d 100644 --- a/pom.client.xml +++ b/pom.client.xml @@ -109,7 +109,7 @@ 1.10 3.1.11 0.31.0 - 1.1.0 + 1.2.0 2.11.1 2.9.3-01 2.4.16-03 @@ -249,13 +249,13 @@ log4j-api ${log4j-api.version} - + com.microsoft.rest.v2 client-runtime ${client-runtime.version.v2} - + org.slf4j slf4j-api @@ -362,7 +362,7 @@ ${cglib-nodep.version} test - + org.slf4j slf4j-simple