diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 02400669c49d9..e8dac4df57b42 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.reactivestreams.Processor; import org.reactivestreams.Subscription; @@ -298,8 +299,8 @@ private void setAndClearChannel() { } private void close(T channel) { - if (channel instanceof AsyncAutoCloseable) { - ((AsyncAutoCloseable) channel).closeAsync().subscribe(); + if (channel instanceof AsyncCloseable) { + ((AsyncCloseable) channel).closeAsync().subscribe(); } else if (channel instanceof AutoCloseable) { try { ((AutoCloseable) channel).close(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AsyncAutoCloseable.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AsyncAutoCloseable.java deleted file mode 100644 index 391ed693174c4..0000000000000 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AsyncAutoCloseable.java +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp.implementation; - -import reactor.core.publisher.Mono; - -/** - * Asynchronous class to close resources. - */ -public interface AsyncAutoCloseable { - - /** - * Begins the close operation. If one is in progress, will return that existing close operation. - * - * @return A mono representing the close operation. - */ - Mono closeAsync(); -} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 17f76c3fafe6a..b363954e08776 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; @@ -37,7 +38,7 @@ /** * Handles receiving events from Event Hubs service and translating them to proton-j messages. */ -public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable { +public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable { private final String entityPath; private final Receiver receiver; private final ReceiveLinkHandler handler; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 2ee4b2eb2dfc9..25e0988831e01 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -12,6 +12,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.OperationCancelledException; import com.azure.core.amqp.implementation.handler.SendLinkHandler; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; @@ -62,7 +63,7 @@ /** * Handles scheduling and transmitting events through proton-j to Event Hubs service. */ -class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable { +class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable { private final String entityPath; private final Sender sender; private final SendLinkHandler handler; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java index 54704eb86d588..f7bed8c8242d8 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.amqp.implementation.handler.SendLinkHandler; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.UnsignedLong; @@ -50,7 +51,7 @@ * Represents a bidirectional link between the message broker and the client. Allows client to send a request to the * broker and receive the associated response. */ -public class RequestResponseChannel implements AsyncAutoCloseable { +public class RequestResponseChannel implements AsyncCloseable { private final ConcurrentSkipListMap> unconfirmedSends = new ConcurrentSkipListMap<>(); private final AtomicBoolean hasError = new AtomicBoolean(); diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 7f5fa70d1549a..0e9031ee19ea6 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -2,6 +2,9 @@ ## 1.17.0-beta.1 (Unreleased) +### Features Added + +- Added `AsyncCloseable` ## 1.16.0 (2021-05-07) diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/AsyncCloseable.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/AsyncCloseable.java new file mode 100644 index 0000000000000..2eede858adb31 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/AsyncCloseable.java @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import reactor.core.publisher.Mono; + +/** + * Interface for close operations that are asynchronous. + * + *

Asynchronously closing a class

+ *

In the snippet below, we have a long-lived {@code NetworkResource} class. There are some operations such + * as closing {@literal I/O}. Instead of returning a sync {@code close()}, we use {@code closeAsync()} so users' + * programs don't block waiting for this operation to complete.

+ * + * {@codesnippet com.azure.core.util.AsyncCloseable.closeAsync} + */ +public interface AsyncCloseable { + /** + * Begins the close operation. If one is in progress, will return that existing close operation. If the close + * operation is unsuccessful, the Mono completes with an error. + * + * @return A Mono representing the close operation. If the close operation is unsuccessful, the Mono completes with + * an error. + */ + Mono closeAsync(); +} diff --git a/sdk/core/azure-core/src/samples/java/com/azure/core/util/AsyncCloseableJavaDocCodeSnippet.java b/sdk/core/azure-core/src/samples/java/com/azure/core/util/AsyncCloseableJavaDocCodeSnippet.java new file mode 100644 index 0000000000000..b5f5a21aed64d --- /dev/null +++ b/sdk/core/azure-core/src/samples/java/com/azure/core/util/AsyncCloseableJavaDocCodeSnippet.java @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + +/** + * Code snippets for {@link AsyncCloseable}. + */ +public class AsyncCloseableJavaDocCodeSnippet { + public void asyncResource() throws IOException { + // BEGIN: com.azure.core.util.AsyncCloseable.closeAsync + NetworkResource resource = new NetworkResource(); + resource.longRunningDownload("https://longdownload.com") + .subscribe( + byteBuffer -> System.out.println("Buffer received: " + byteBuffer), + error -> System.err.printf("Error occurred while downloading: %s%n", error), + () -> System.out.println("Completed download operation.")); + + System.out.println("Press enter to stop downloading."); + System.in.read(); + + // We block here because it is the end of the main Program function. A real-life program may chain this + // with some other close operations like save download/program state, etc. + resource.closeAsync().block(); + // END: com.azure.core.util.AsyncCloseable.closeAsync + } + + /** + * A long lived network resource. + */ + static class NetworkResource implements AsyncCloseable { + private final AtomicBoolean isClosed = new AtomicBoolean(); + private final Sinks.Empty closeMono = Sinks.empty(); + + /** + * Downloads a resource. + * + * @param url URL for the download. + * + * @return A stream of bytes. + */ + Flux longRunningDownload(String url) { + final byte[] bytes = url.getBytes(StandardCharsets.UTF_8); + + // Does nothing real but it represents taking from this possibly infinite Flux until + // the closeMono emits a signal. + return Flux.fromStream(IntStream.range(0, bytes.length) + .mapToObj(index -> ByteBuffer.wrap(bytes))) + .takeUntilOther(closeMono.asMono()); + } + + @Override + public Mono closeAsync() { + // If the close operation has started, then + if (isClosed.getAndSet(true)) { + return closeMono.asMono(); + } + + return startAsyncClose().then(closeMono.asMono()); + } + + private Mono startAsyncClose() { + return Mono.delay(Duration.ofSeconds(10)).then() + .doOnError(error -> closeMono.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST)) + .doOnSuccess(unused -> closeMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)); + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index eeaf6c5b01ae6..54f6df929347c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -37,7 +37,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java index 6699224b71d42..abceceef10d3b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java @@ -9,7 +9,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.LinkErrorContext; import com.azure.core.amqp.implementation.AmqpReceiveLink; -import com.azure.core.amqp.implementation.AsyncAutoCloseable; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.message.Message; import org.reactivestreams.Subscription; @@ -599,8 +599,8 @@ private void disposeReceiver(AmqpReceiveLink link) { } try { - if (link instanceof AsyncAutoCloseable) { - ((AsyncAutoCloseable) link).closeAsync().subscribe(); + if (link instanceof AsyncCloseable) { + ((AsyncCloseable) link).closeAsync().subscribe(); } else { link.dispose(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 13ffa90838862..23004b35e5d2e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -42,7 +42,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index f865191c55092..03995a2cb0a35 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -3,8 +3,8 @@ package com.azure.messaging.servicebus; import com.azure.core.amqp.AmqpRetryOptions; -import com.azure.core.amqp.implementation.AsyncAutoCloseable; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.LockContainer; @@ -29,7 +29,7 @@ /** * Represents an session that is received when "any" session is accepted from the service. */ -class ServiceBusSessionReceiver implements AsyncAutoCloseable, AutoCloseable { +class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final LockContainer lockContainer; private final AtomicReference sessionLockedUntil = new AtomicReference<>(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLink.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLink.java index 403b66f32477d..8e41bde7bd9ef 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLink.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLink.java @@ -4,7 +4,7 @@ package com.azure.messaging.servicebus.implementation; import com.azure.core.amqp.implementation.AmqpReceiveLink; -import com.azure.core.amqp.implementation.AsyncAutoCloseable; +import com.azure.core.util.AsyncCloseable; import org.apache.qpid.proton.amqp.transport.DeliveryState; import reactor.core.publisher.Mono; @@ -13,7 +13,7 @@ /** * Represents an AMQP receive link. */ -public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncAutoCloseable { +public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncCloseable { /** * Gets the session id associated with the link. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index d242b3a4727b0..47aa31ebcd4ba 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -6,7 +6,7 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.implementation.AmqpReceiveLink; -import com.azure.core.amqp.implementation.AsyncAutoCloseable; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.message.Message; @@ -585,8 +585,8 @@ private void disposeReceiver(AmqpReceiveLink link) { } try { - if (link instanceof AsyncAutoCloseable) { - ((AsyncAutoCloseable) link).closeAsync().subscribe(); + if (link instanceof AsyncCloseable) { + ((AsyncCloseable) link).closeAsync().subscribe(); } else { link.dispose(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java index a063615b0228f..4d65ca84d8379 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java @@ -6,9 +6,9 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; -import com.azure.core.amqp.implementation.AsyncAutoCloseable; import com.azure.core.test.TestBase; import com.azure.core.test.TestMode; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.Configuration; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -335,8 +335,8 @@ protected void dispose(AutoCloseable... closeables) { continue; } - if (closeable instanceof AsyncAutoCloseable) { - final Mono voidMono = ((AsyncAutoCloseable) closeable).closeAsync(); + if (closeable instanceof AsyncCloseable) { + final Mono voidMono = ((AsyncCloseable) closeable).closeAsync(); closeableMonos.add(voidMono); voidMono.subscribe();