Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add VertxContextSupport#executeBlocking() #43444

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package io.quarkus.vertx;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -18,6 +22,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
Expand Down Expand Up @@ -46,25 +51,41 @@ public void testRunner() throws InterruptedException {
@Singleton
public static class Alpha {

@Inject
Bravo bravo;

String val;

final List<Integer> vals = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);

void onStart(@Observes StartupEvent event) {
// Request context is active but duplicated context is not used
String bravoId = bravo.getId();
Supplier<Uni<String>> uniSupplier = new Supplier<Uni<String>>() {
@Override
public Uni<String> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Error", "Error");
assertTrue(Arc.container().requestContext().isActive());
return Uni.createFrom().item("foo");
// New duplicated contex -> new request context
String asyncBravoId = bravo.getId();
assertNotEquals(bravoId, asyncBravoId);

return VertxContextSupport.executeBlocking(() -> {
assertTrue(BlockingOperationControl.isBlockingAllowed());
assertTrue(VertxContext.isOnDuplicatedContext());
assertTrue(Arc.container().requestContext().isActive());
// Duplicated context is propagated -> the same request context
assertEquals(asyncBravoId, bravo.getId());
return "foo";
});
}
};
try {
val = VertxContextSupport.subscribeAndAwait(uniSupplier);
} catch (Throwable e) {
fail();
fail(e);
}

Supplier<Multi<Integer>> multiSupplier = new Supplier<Multi<Integer>>() {
Expand All @@ -80,4 +101,20 @@ public Multi<Integer> get() {
}
}

@RequestScoped
public static class Bravo {

private String id;

@PostConstruct
void init() {
this.id = UUID.randomUUID().toString();
}

public String getId() {
return id;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.vertx;

import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -34,7 +35,7 @@ private VertxContextSupport() {
* @throws IllegalStateException If called on an event loop thread.
*/
public static <T> T subscribeAndAwait(Supplier<Uni<T>> uniSupplier) throws Throwable {
Context context = getContext();
Context context = getContext(false);
VertxContextSafetyToggle.setContextSafe(context, true);
return Uni.createFrom().<T> emitter(e -> {
context.runOnContext(new Handler<Void>() {
Expand Down Expand Up @@ -69,7 +70,7 @@ public void handle(Void event) {
* @param subscribeConsumer
*/
public static <T> void subscribe(Supplier<Multi<T>> multiSupplier, Consumer<MultiSubscribe<T>> subscribeConsumer) {
Context context = getContext();
Context context = getContext(false);
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {

Expand All @@ -96,14 +97,44 @@ public void accept(MultiSubscribe<T> ms) {
});
}

private static Context getContext() {
/**
* Executes the supplied blocking {@link Callable} on a Vertx duplicated context; does not block the current thread.
* <p>
* If necessary, the CDI request context is activated during execution of the blocking code.
*
* @param <T>
* @param callable
* @return the produced {@link Uni}
* @see VertxContext#getOrCreateDuplicatedContext(Vertx)
*/
public static <T> Uni<T> executeBlocking(Callable<T> callable) {
Context context = getContext(true);
return Uni.createFrom().completionStage(() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer might be slightly better.

@jponge wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer might be slightly better.

In what exactly? I mean, both suppliers are called at subscription time. And we would need to convert the io.vertx.core.Future to the Uni anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to https://javadoc.io/doc/io.smallrye.reactive/mutiny/latest/io.smallrye.mutiny/io/smallrye/mutiny/groups/UniCreate.html which avoid using completion stages and it's slightly more optimized. But, yes, then you need an emitter, so the cost would be almost equivalent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry, missed the anchor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferred with supplier is better, but a CompletionStage isn't incorrect per-se.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferred with supplier is better, but a CompletionStage isn't incorrect per-se.

Better in terms of performance, or?

Hm, but how do I create a Uni from io.vertx.core.Future without Uni.createFrom().completionStage()? That's what io.smallrye.mutiny.vertx.UniHelper.toUni(Future<T>) does by the way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes yes you are right

return context.executeBlocking(() -> {
ManagedContext requestContext = Arc.container().requestContext();
boolean terminate = requestContext.isActive() ? false : true;
if (terminate) {
requestContext.activate();
}
try {
return callable.call();
} finally {
if (terminate) {
requestContext.terminate();
}
}
}, false).toCompletionStage();
});
}

private static Context getContext(boolean blocking) {
Context context = Vertx.currentContext();
if (context == null) {
Vertx vertx = VertxCoreRecorder.getVertx().get();
context = VertxContext.getOrCreateDuplicatedContext(vertx);
} else {
// Executed on a vertx thread...
if (Context.isOnEventLoopThread()) {
if (!blocking && Context.isOnEventLoopThread()) {
throw new IllegalStateException("VertxContextSupport#subscribeAndAwait() must not be called on an event loop!");
}
context = VertxContext.getOrCreateDuplicatedContext(context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.quarkus.websockets.next.test.upgrade;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.vertx.VertxContextSupport;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;

public class BlockingHttpUpgradeCheckTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(BlockingHttpUpgradeCheck.class, Endpoint.class, WSClient.class));

@TestHTTPResource("/end")
URI endUri;

@Inject
Vertx vertx;

@Test
public void testBlockingCheck() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
client.waitForMessages(1);
assertEquals("ok", client.getMessages().get(0).toString());
assertTrue(BlockingHttpUpgradeCheck.PERFORMED.get());
}
}

@WebSocket(path = "/end")
public static class Endpoint {

@OnOpen
String open() {
return "ok";
}

}

@Singleton
public static class BlockingHttpUpgradeCheck implements HttpUpgradeCheck {

static final AtomicBoolean PERFORMED = new AtomicBoolean();

@Override
public Uni<CheckResult> perform(HttpUpgradeContext context) {
return VertxContextSupport.executeBlocking(new Callable<CheckResult>() {

@Override
public CheckResult call() throws Exception {
assertTrue(BlockingOperationControl.isBlockingAllowed());
assertTrue(VertxContext.isOnDuplicatedContext());
assertTrue(Arc.container().requestContext().isActive());
PERFORMED.set(true);
return CheckResult.permitUpgradeSync();
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.stream.Stream;

import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.VertxContextSupport;
import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpServerRequest;

Expand All @@ -23,6 +24,9 @@ public interface HttpUpgradeCheck {

/**
* This method inspects HTTP Upgrade context and either allows or denies upgrade to a WebSocket connection.
* <p>
* Use {@link VertxContextSupport#executeBlocking(java.util.concurrent.Callable)} in order to execute some blocking code in
* the check.
*
* @param context {@link HttpUpgradeContext}
* @return check result; must never be null
Expand Down
Loading