Skip to content

Commit

Permalink
Adds AsyncCloseable (#21991)
Browse files Browse the repository at this point in the history
* Adding AsyncCloseable with codesnippet.

* Implementing AsyncCloseable and deleting AsyncAutoCloseable.

* Add CHANGELOG entry.

* Removing azure-core as an explicit dependency.

* Fix use in AmqpReceiveLinkProcessor.
  • Loading branch information
conniey authored Jun 1, 2021
1 parent b26abbb commit 851796e
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -298,8 +299,8 @@ private void setAndClearChannel() {
}

private void close(T channel) {
if (channel instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) channel).closeAsync().subscribe();
if (channel instanceof AsyncCloseable) {
((AsyncCloseable) channel).closeAsync().subscribe();
} else if (channel instanceof AutoCloseable) {
try {
((AutoCloseable) channel).close();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
Expand Down Expand Up @@ -37,7 +38,7 @@
/**
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
*/
public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable {
public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable {
private final String entityPath;
private final Receiver receiver;
private final ReceiveLinkHandler handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.OperationCancelledException;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
Expand Down Expand Up @@ -62,7 +63,7 @@
/**
* Handles scheduling and transmitting events through proton-j to Event Hubs service.
*/
class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable {
class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
private final String entityPath;
private final Sender sender;
private final SendLinkHandler handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
Expand Down Expand Up @@ -50,7 +51,7 @@
* Represents a bidirectional link between the message broker and the client. Allows client to send a request to the
* broker and receive the associated response.
*/
public class RequestResponseChannel implements AsyncAutoCloseable {
public class RequestResponseChannel implements AsyncCloseable {
private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
new ConcurrentSkipListMap<>();
private final AtomicBoolean hasError = new AtomicBoolean();
Expand Down
3 changes: 3 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 1.17.0-beta.1 (Unreleased)

### Features Added

- Added `AsyncCloseable`

## 1.16.0 (2021-05-07)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util;

import reactor.core.publisher.Mono;

/**
* Interface for close operations that are asynchronous.
*
* <p><strong>Asynchronously closing a class</strong></p>
* <p>In the snippet below, we have a long-lived {@code NetworkResource} class. There are some operations such
* as closing {@literal I/O}. Instead of returning a sync {@code close()}, we use {@code closeAsync()} so users'
* programs don't block waiting for this operation to complete.</p>
*
* {@codesnippet com.azure.core.util.AsyncCloseable.closeAsync}
*/
public interface AsyncCloseable {
/**
* Begins the close operation. If one is in progress, will return that existing close operation. If the close
* operation is unsuccessful, the Mono completes with an error.
*
* @return A Mono representing the close operation. If the close operation is unsuccessful, the Mono completes with
* an error.
*/
Mono<Void> closeAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

/**
* Code snippets for {@link AsyncCloseable}.
*/
public class AsyncCloseableJavaDocCodeSnippet {
public void asyncResource() throws IOException {
// BEGIN: com.azure.core.util.AsyncCloseable.closeAsync
NetworkResource resource = new NetworkResource();
resource.longRunningDownload("https://longdownload.com")
.subscribe(
byteBuffer -> System.out.println("Buffer received: " + byteBuffer),
error -> System.err.printf("Error occurred while downloading: %s%n", error),
() -> System.out.println("Completed download operation."));

System.out.println("Press enter to stop downloading.");
System.in.read();

// We block here because it is the end of the main Program function. A real-life program may chain this
// with some other close operations like save download/program state, etc.
resource.closeAsync().block();
// END: com.azure.core.util.AsyncCloseable.closeAsync
}

/**
* A long lived network resource.
*/
static class NetworkResource implements AsyncCloseable {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final Sinks.Empty<Void> closeMono = Sinks.empty();

/**
* Downloads a resource.
*
* @param url URL for the download.
*
* @return A stream of bytes.
*/
Flux<ByteBuffer> longRunningDownload(String url) {
final byte[] bytes = url.getBytes(StandardCharsets.UTF_8);

// Does nothing real but it represents taking from this possibly infinite Flux until
// the closeMono emits a signal.
return Flux.fromStream(IntStream.range(0, bytes.length)
.mapToObj(index -> ByteBuffer.wrap(bytes)))
.takeUntilOther(closeMono.asMono());
}

@Override
public Mono<Void> closeAsync() {
// If the close operation has started, then
if (isClosed.getAndSet(true)) {
return closeMono.asMono();
}

return startAsyncClose().then(closeMono.asMono());
}

private Mono<Void> startAsyncClose() {
return Mono.delay(Duration.ofSeconds(10)).then()
.doOnError(error -> closeMono.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(unused -> closeMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST));
}
}
}
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -599,8 +599,8 @@ private void disposeReceiver(AmqpReceiveLink link) {
}

try {
if (link instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) link).closeAsync().subscribe();
if (link instanceof AsyncCloseable) {
((AsyncCloseable) link).closeAsync().subscribe();
} else {
link.dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.LockContainer;
Expand All @@ -29,7 +29,7 @@
/**
* Represents an session that is received when "any" session is accepted from the service.
*/
class ServiceBusSessionReceiver implements AsyncAutoCloseable, AutoCloseable {
class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final LockContainer<OffsetDateTime> lockContainer;
private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.publisher.Mono;

Expand All @@ -13,7 +13,7 @@
/**
* Represents an AMQP receive link.
*/
public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncAutoCloseable {
public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncCloseable {
/**
* Gets the session id associated with the link.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -585,8 +585,8 @@ private void disposeReceiver(AmqpReceiveLink link) {
}

try {
if (link instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) link).closeAsync().subscribe();
if (link instanceof AsyncCloseable) {
((AsyncCloseable) link).closeAsync().subscribe();
} else {
link.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.test.TestBase;
import com.azure.core.test.TestMode;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -335,8 +335,8 @@ protected void dispose(AutoCloseable... closeables) {
continue;
}

if (closeable instanceof AsyncAutoCloseable) {
final Mono<Void> voidMono = ((AsyncAutoCloseable) closeable).closeAsync();
if (closeable instanceof AsyncCloseable) {
final Mono<Void> voidMono = ((AsyncCloseable) closeable).closeAsync();
closeableMonos.add(voidMono);

voidMono.subscribe();
Expand Down

0 comments on commit 851796e

Please sign in to comment.