Hibernate Reactive does not combine with blocking calls in Quarkus 3 #32665

Closed
wjglerum opened this issue Apr 15, 2023 · 54 comments
Labels
area/hibernate-reactive (Hibernate Reactive) · area/persistence (OBSOLETE, DO NOT USE) · kind/bug (Something isn't working)

Comments

@wjglerum
Contributor

wjglerum commented Apr 15, 2023

Describe the bug

In Quarkus 2 it's possible to combine blocking calls when writing to the database with Hibernate Reactive with Panache. It looks like this is not possible anymore in Quarkus 3.

Take the following use case:

  • We have a scheduled method that fetches new fruits using a service and stores them using a repository provided by Panache
  • The service simulates a blocking call by sleeping for several seconds before returning the results. To avoid blocking the event loop threads, we run this work on the worker pool with .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); see https://smallrye.io/smallrye-mutiny/2.1.0/guides/imperative-to-reactive/#running-blocking-code-on-subscription
  • Next we store the retrieved fruit using the repository.
@Entity
public class Fruit extends PanacheEntity {

    @Column
    public String name;

    public static Fruit of(String name) {
        var fruit = new Fruit();
        fruit.name = name;
        return fruit;
    }
}

@ApplicationScoped
public class FruitRepository implements PanacheRepository<Fruit> {
}

@ApplicationScoped
public class FruitService {

    public Uni<Fruit> random() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            Log.info("Fetching fruits!");
            Thread.sleep(3000);
            // Not so random ;)
            return Fruit.of("Banana");
        })).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
    }
}

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public Uni<Void> store() {
        return fruitService.random().flatMap(fruitRepository::persist).replaceWithVoid();
    }
}

This all worked fine on Quarkus 2 (the latest I tried was 2.16.6.Final), but doesn't work on Quarkus 3 (3.0.0.CR2)

Expected behavior

I would expect that we can do some blocking work during a transaction when using Hibernate Reactive, especially when we delegate that blocking work to a worker thread and switch back to an event loop thread when we persist the entity with Hibernate Reactive.

Actual behavior

The scheduled method fails with the following exception:

2023-04-15 13:21:04,408 ERROR [io.qua.sch.com.run.StatusEmitterInvoker] (executor-thread-1) Error occurred while executing task for trigger IntervalTrigger [id=1_nl.wjglerum.FruitResource#store, interval=60000]: java.util.concurrent.CompletionException: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
	[Exception 0] io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
	[Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	[Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
	[Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:874)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
	at io.smallrye.mutiny.helpers.UniCallbackSubscriber.onFailure(UniCallbackSubscriber.java:62)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:67)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:67)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure$KnownFailureSubscription.forward(UniCreateFromKnownFailure.java:38)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure.subscribe(UniCreateFromKnownFailure.java:23)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
	at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.helpers.EmptyUniSubscription.propagateFailureEvent(EmptyUniSubscription.java:40)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:26)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
	at org.hibernate.reactive.context.impl.VertxContext.execute(VertxContext.java:90)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnTermination.subscribe(UniOnTermination.java:21)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOnCancellationCall$UniOnCancellationCallProcessor.onFailure(UniOnCancellationCall.java:59)
	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:94)
	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.dispatch(UniOnFailureFlatMap.java:83)
	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.onFailure(UniOnFailureFlatMap.java:60)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:73)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:47)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
	[Exception 0] io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
	[Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	[Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
	[Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	at io.smallrye.mutiny.groups.UniOnItemOrFailure.lambda$call$1(UniOnItemOrFailure.java:75)
	at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
	at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
	... 52 more
	Suppressed: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
		at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
		at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
		at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
		at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:447)
		at org.hibernate.internal.SessionImpl.checkOpenOrWaitingForAutoClose(SessionImpl.java:616)
		at org.hibernate.internal.SessionImpl.closeWithoutOpenChecks(SessionImpl.java:410)
		at org.hibernate.internal.SessionImpl.close(SessionImpl.java:397)
		at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveClose(ReactiveSessionImpl.java:1738)
		at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
		at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
		... 47 more
Caused by: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
	[Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	[Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
	... 30 more
	Suppressed: java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
		at org.hibernate.reactive.common.InternalStateAssertions.assertUseOnEventLoop(InternalStateAssertions.java:40)
		at org.hibernate.reactive.session.impl.ReactiveSessionImpl.getReactiveConnection(ReactiveSessionImpl.java:1727)
		at org.hibernate.reactive.mutiny.impl.MutinySessionImpl$Transaction.rollback(MutinySessionImpl.java:477)
		at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
		at io.smallrye.mutiny.groups.UniOnFailure.lambda$call$5(UniOnFailure.java:133)
		at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
		at io.smallrye.mutiny.groups.UniOnFailure.lambda$call$4(UniOnFailure.java:102)
		at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
		at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:92)
		... 29 more
Caused by: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
	at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
	at org.hibernate.internal.SessionImpl.contains(SessionImpl.java:1544)
	at org.hibernate.reactive.mutiny.impl.MutinySessionImpl.contains(MutinySessionImpl.java:109)
	at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.lambda$persist$0(AbstractJpaOperations.java:41)
	at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:68)
	... 21 more

How to Reproduce?

Attached reactive.zip is a simple project that reproduces the error.

  1. ./mvnw quarkus:dev
  2. And see the exceptions
  3. Turn off the Hibernate checks
  4. ./mvnw quarkus:dev -Dorg.hibernate.reactive.common.InternalStateAssertions.ENFORCE=false
  5. No more errors occur

Output of uname -a or ver

Darwin Willem's-Tiny-MacBook-Pro 22.4.0 Darwin Kernel Version 22.4.0: Mon Mar  6 21:00:41 PST 2023; root:xnu-8796.101.5~3/RELEASE_ARM64_T8103 arm64

Output of java -version

openjdk 17.0.6 2023-01-17 OpenJDK Runtime Environment Temurin-17.0.6+10 (build 17.0.6+10) OpenJDK 64-Bit Server VM Temurin-17.0.6+10 (build 17.0.6+10, mixed mode)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.0.0.CR2

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39) Maven home: /Users/wjglerum/.m2/wrapper/dists/apache-maven-3.8.8-bin/67c30f74/apache-maven-3.8.8 Java version: 17.0.6, vendor: Eclipse Adoptium, runtime: /Users/wjglerum/.sdkman/candidates/java/17.0.6-tem Default locale: en_NL, platform encoding: UTF-8 OS name: "mac os x", version: "13.3", arch: "aarch64", family: "mac"

Additional information

Looks related to #32533

@wjglerum wjglerum added the kind/bug Something isn't working label Apr 15, 2023
@quarkus-bot quarkus-bot bot added area/hibernate-reactive Hibernate Reactive area/persistence OBSOLETE, DO NOT USE labels Apr 15, 2023
@quarkus-bot

quarkus-bot bot commented Apr 15, 2023

/cc @DavideD (hibernate-reactive), @Sanne (hibernate-reactive), @gavinking (hibernate-reactive)

@wjglerum
Contributor Author

wjglerum commented Apr 15, 2023

Hmm, I added some more logs to see what's happening. It looks like the persisting is indeed done on a worker thread (on both Quarkus 2 and 3):

2023-04-15 14:01:52,243 INFO  [nl.wjg.FruitScheduler] (vert.x-eventloop-thread-1) Scheduling fetching of fruit
2023-04-15 14:01:52,245 INFO  [nl.wjg.FruitService] (executor-thread-0) Fetching fruit!
2023-04-15 14:01:55,251 INFO  [nl.wjg.FruitRepository] (executor-thread-0) Saving fruit Banana
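
A JDK-only analogy may help explain the log above. This is a sketch using CompletableFuture rather than Mutiny, and ContinuationThreadSketch, persistThread and the thread name are made up for illustration. When a follow-up step is attached without naming an executor, it runs on whichever thread completes the previous step, which here is the worker:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: plain JDK futures standing in for a reactive pipeline.
final class ContinuationThreadSketch {

    /** Returns the name of the thread that ran the follow-up ("persist") step. */
    static String persistThread() {
        ExecutorService workers =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "executor-thread-0"));
        CountDownLatch chainBuilt = new CountDownLatch(1);
        try {
            CompletableFuture<String> fetched = CompletableFuture.supplyAsync(() -> {
                // Stand-in for the blocking fetch: wait until the whole chain is attached.
                awaitQuietly(chainBuilt);
                return "Banana";
            }, workers);
            // No executor is named here, so this step runs on the thread that completes `fetched`.
            CompletableFuture<String> persistedOn =
                    fetched.thenApply(fruit -> Thread.currentThread().getName());
            chainBuilt.countDown();
            return persistedOn.join();
        } finally {
            workers.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(persistThread()); // prints "executor-thread-0"
    }
}
```

The same reasoning would explain why a step chained after a worker-pool subscription keeps running on the worker unless something explicitly moves it elsewhere.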

@wjglerum
Contributor Author

Could we (force) switch back to the event loop thread?

@mkouba
Contributor

mkouba commented Apr 17, 2023

Have you tried something like fruitService.random().chain(fruitRepository::persist).runSubscriptionOn(Infrastructure.getDefaultExecutor()).replaceWithVoid()?

CC @jponge

@jponge
Member

jponge commented Apr 17, 2023

I don't think this is a Mutiny issue per se.

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).
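
A fire-and-forget call such as runOnContext returns nothing, so its result has to be bridged into something composable. The sketch below shows that bridge with JDK types only; it does not use the Vert.x or Mutiny APIs, and RunOnContextBridgeSketch, callOn and demo are hypothetical names. A single-thread executor plays the role of the captured context:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: turns "run this somewhere, return void" into a future.
final class RunOnContextBridgeSketch {

    /** Runs `work` through a void-returning executor and exposes the outcome as a future. */
    static <T> CompletableFuture<T> callOn(Executor context, Supplier<T> work) {
        CompletableFuture<T> result = new CompletableFuture<>();
        context.execute(() -> {
            try {
                result.complete(work.get());
            } catch (Throwable t) {
                // Propagate failures to whoever composes on the future.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /** Returns the name of the thread the work actually ran on. */
    static String demo() {
        // Stand-in for an event loop: one named thread.
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-0"));
        try {
            return callOn(eventLoop, () -> Thread.currentThread().getName()).join();
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "event-loop-0"
    }
}
```

With a captured Vert.x context, the executor argument could plausibly be written as command -> context.runOnContext(v -> command.run()), and since Mutiny's emitOn accepts an Executor, the same adapter may work there too. That combination is an untested assumption here, not something confirmed in this thread.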

@mkouba
Contributor

mkouba commented Apr 17, 2023

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

Hm, what's the difference between Infrastructure.getDefaultWorkerPool() and Infrastructure.getDefaultExecutor()? It might make sense to add some javadoc to the io.smallrye.mutiny.infrastructure.Infrastructure.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).

That's not very convenient. Maybe Uni#runSubscriptionOn() is not the best approach for similar use cases?

@jponge
Member

jponge commented Apr 17, 2023

Hm, what's the difference between Infrastructure.getDefaultWorkerPool() and Infrastructure.getDefaultExecutor()? It might make sense to add some javadoc to the io.smallrye.mutiny.infrastructure.Infrastructure.

They're the same under the hood. For a long time the Quarkus executor was not a scheduled thread pool so we had to provide a wrapper on top of a non-scheduled executor, but that's not the case anymore.

That's not very convenient. Maybe Uni#runSubscriptionOn() is not the best approach for similar use cases?

Well when you subscribe, it happens from the caller thread. If it's a Vert.x event loop then the subscription starts in such a context. runSubscriptionOn is used to offset the subscription from the caller thread to another thread (see https://smallrye.io/smallrye-mutiny/2.1.0/guides/emit-on-vs-run-subscription-on/).

@mkouba
Contributor

mkouba commented Apr 17, 2023

@wjglerum Since the FruitService#random() is a blocking operation you can also try to leverage the VertxContextSupport.subscribeAndAwait() util method instead, i.e. change the code like:

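// scheduled method is executed on a worker thread
@Scheduled(every = "1m")
void store() throws Throwable { // subscribeAndAwait() declares Throwable
    // assumes FruitService#random() now blocks and returns the Fruit directly
    Fruit fruit = fruitService.random();
    // the supplier must return the Uni so that it is subscribed on a Vert.x context
    VertxContextSupport.subscribeAndAwait(() ->
            Panache.withTransaction(() -> fruitRepository.persist(fruit)));
}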

@jponge
Member

jponge commented Apr 17, 2023

@wjglerum Since the FruitService#random() is a blocking operation you can also try to leverage the VertxContextSupport.subscribeAndAwait() util method instead, i.e. change the code like:

👍

@wjglerum
Contributor Author

@wjglerum Since the FruitService#random() is a blocking operation you can also try to leverage the VertxContextSupport.subscribeAndAwait() util method instead, i.e. change the code like:

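// scheduled method is executed on a worker thread
@Scheduled(every = "1m")
void store() throws Throwable { // subscribeAndAwait() declares Throwable
    // assumes FruitService#random() now blocks and returns the Fruit directly
    Fruit fruit = fruitService.random();
    // the supplier must return the Uni so that it is subscribed on a Vert.x context
    VertxContextSupport.subscribeAndAwait(() ->
            Panache.withTransaction(() -> fruitRepository.persist(fruit)));
}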

That indeed works, thanks!

This wouldn't really work for more complex examples where we have more calls with Unis and Multis in the reactive world.

Next to .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) I think it would be useful to have something similar to switch to an event loop.

I don't think this is a Mutiny issue per-se.

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).

I tried doing something like that, but I can't really figure out what the code should look like, as runOnContext() returns void and not Uni<Void>.

I came up with this:

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public void store() {
        Log.info("Scheduling fruit!");
        Context context = Vertx.currentContext();
        Fruit fruit = fruitService.random();
        context.runOnContext(v -> fruitRepository.save(fruit));
    }
}

However, that doesn't really work, as we now don't have a session on the context:

2023-04-19 10:21:24,855 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 1.175s 
2023-04-19 10:21:25,004 INFO  [nl.wjg.FruitScheduler] (vert.x-worker-thread-1) Scheduling fruit!
2023-04-19 10:21:25,005 INFO  [nl.wjg.FruitService] (vert.x-worker-thread-1) Fetching fruits!
2023-04-19 10:21:28,008 INFO  [nl.wjg.FruitRepository] (vert.x-eventloop-thread-13) Persisting fruit!
2023-04-19 10:21:28,009 ERROR [io.qua.ver.cor.run.VertxCoreRecorder] (vert.x-eventloop-thread-13) Uncaught exception received by Vert.x: java.lang.IllegalStateException: No current Mutiny.Session found
	- no reactive session was found in the context and the context was not marked to open a new session lazily
	- you might need to annotate the business method with @WithSession
	at io.quarkus.hibernate.reactive.panache.common.runtime.SessionOperations.getSession(SessionOperations.java:155)
	at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.getSession(AbstractJpaOperations.java:351)
	at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.persist(AbstractJpaOperations.java:36)
	at io.quarkus.hibernate.reactive.panache.PanacheRepositoryBase.persist(PanacheRepositoryBase.java:54)
	at nl.wjglerum.FruitRepository.save(FruitRepository.java:13)
	at nl.wjglerum.FruitRepository_ClientProxy.save(Unknown Source)
	at nl.wjglerum.FruitScheduler.lambda$store$0(FruitScheduler.java:27)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
	at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

We can fix that by wrapping this in Panache.withTransaction(), but that is also not great when we combine multiple calls.

@wjglerum
Contributor Author

wjglerum commented Apr 19, 2023

For example with something like this:

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public Uni<Void> store() {
        Log.info("Scheduling fruit!");
        return fruitRepository.listAll()
                .chain(fruits -> fruitService.random().chain(fruit -> fruitRepository.save(fruit)))
                .replaceWithVoid();
    }
}

@jponge
Member

jponge commented Apr 19, 2023

Next to .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) I think it would be useful to have something similar to switch to an event loop.

Noted 👍

@geoand
Contributor

geoand commented Jun 27, 2023

What's the status of this? I was not able to get a clear picture from the existing comments.

@astappiev

astappiev commented Aug 25, 2023

Hi, I would like to ask about a similar matter.

Let's imagine we have a Hibernate request, a blocking action, and then another Hibernate request.
What is the proper way to implement that? (I know that getting rid of the blocking call would be best, however.)

I ended up with the following; how optimal is it?

    @POST
    @Authenticated
    @Path("/completions")
    public CompletionResults completions(@Valid CompletionQuery query) throws Throwable {
        Chat chat = VertxContextSupport.subscribeAndAwait(() -> getOrCreateChat(query));
        CompletionResults results = chatService.completions(query, securityIdentity.getPrincipal());
        results.setChatId(chat.id);
        VertxContextSupport.subscribeAndAwait(() -> persistMessages(chat, query.getMessages(), results.getChoices()));
        return results;
    }

Ideally, I want to run a Hibernate request on the event loop, spawn a new thread for the blocking method, and then return to the event loop. My first attempt was .chain(call).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); however, I haven't figured out how to return to the event loop after the worker pool.
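
The "event loop, then worker, then event loop again" shape described above can be sketched with JDK futures, where each step names the executor it should run on. This is only an analogy under stated assumptions: it uses neither Hibernate Reactive nor Mutiny, the two single-thread executors stand in for an event loop and a worker pool, and HopBackSketch, run and mark are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: three steps, each pinned to an explicit executor.
final class HopBackSketch {

    /** Runs query -> blocking call -> persist and records which thread ran each step. */
    static List<String> run() {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-0"));
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-0"));
        try {
            return CompletableFuture
                    // First database step on the event-loop stand-in.
                    .supplyAsync(() -> mark(List.of(), "query"), eventLoop)
                    // Blocking step explicitly moved to the worker.
                    .thenApplyAsync(trace -> mark(trace, "blocking"), worker)
                    // Second database step explicitly moved back.
                    .thenApplyAsync(trace -> mark(trace, "persist"), eventLoop)
                    .join();
        } finally {
            eventLoop.shutdown();
            worker.shutdown();
        }
    }

    // Appends "step@thread-name" to a copy of the trace.
    private static List<String> mark(List<String> trace, String step) {
        List<String> next = new ArrayList<>(trace);
        next.add(step + "@" + Thread.currentThread().getName());
        return next;
    }

    public static void main(String[] args) {
        // prints [query@event-loop-0, blocking@worker-0, persist@event-loop-0]
        System.out.println(run());
    }
}
```

The point of the sketch is that the hop back has to be requested explicitly; nothing returns a pipeline to its original thread on its own.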

@gsmet
Member

gsmet commented Aug 25, 2023

Frankly, if you're in a blocking world, just use Hibernate ORM: it will be easier for you.

@geoand
Contributor

geoand commented Aug 25, 2023

If one is going to use Hibernate ORM as opposed to Hibernate Reactive, there are no efficiency gains to be had by offloading to another thread - if anything it would be worse.
Moreover, you might run into subtle issues with request scope propagation (which in turn affects things like tracing).

So the gist is, if you absolutely want to gain maximum efficiency, use Hibernate Reactive, otherwise stick with blocking operations using Hibernate ORM.

@Syl2010

Syl2010 commented Aug 25, 2023

The problem is that if your operation takes too long on an IO thread, Quarkus will stop the request and ask you to use @Blocking (a worker thread). But if you do that, you can't use Hibernate Reactive anymore because it wants an IO (non-blocking) thread. So, short of having a way to offload Hibernate operations to the IO thread from a blocking request, you're a bit stuck.

@geoand
Contributor

geoand commented Aug 25, 2023

I am not sure how that relates as you are never supposed to block any event loop, no matter what operations you perform.

@Syl2010

Syl2010 commented Aug 25, 2023

True, but Quarkus still cancels the request if the operation takes too long, even if it's actually not blocking.

@geoand
Contributor

geoand commented Aug 25, 2023

That is configurable

@Froidoh

Froidoh commented Aug 30, 2023

Frankly, if you're in a blocking world, just use Hibernate ORM: it will be easier for you.

If we were in a world where we could combine hibernate orm and hibernate-reactive in one and the same project and decide per API which one we use (or have long running background cron-jobs use hibernate orm and the API for querying some status or such in hibernate-reactive) this would be acceptable.

As we can not freely combine the two, this is, frankly, very disappointing.

I'm used to working in many languages (Scala, Kotlin, Rust, and Node.js (bah), to name a few modern ones I use(d) professionally), and with Quarkus I quite often need to re-learn concepts and find workarounds for tasks that are easy with other solutions.

This is not meant as a diss, I am super grateful you guys are doing an amazing work to bring Java forward, but at the same time I must say: The tradeoffs hurt.

We are currently evaluating migrating off of Quarkus at a client of mine because of stuff like this. This is not a threat or anything, and you don't have to care; it's not about emotions or hurting the project, it's a matter of fact I wanted to share with you.

We had a service running for many months now. Since yesterday the load increased and we suddenly get (without a new deployment!)

[Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'

This is for a non-blocking call.

@geoand
Contributor

geoand commented Aug 30, 2023

Negative feedback is often more valuable than positive feedback, so thanks for sharing!

The mixing of Hibernate ORM and Hibernate Reactive is something that is on the radar, but hasn't been done yet because other persistence related things get higher priority.

As for the exception you are seeing, that is definitely a bug and we absolutely need to fix it. Can you open a new issue and attach a sample application that exhibits this problematic behavior?

@Froidoh

Froidoh commented Aug 30, 2023

@geoand First off: Thanks!

Currently I simply don't have the time to create a minimal reproducible example as this only happened since yesterday under more load and I cannot reproduce it locally so far.
Of course I cannot add the whole application for various reasons (Setting it up would also add quite some burden to you).

I am an open source maintainer myself (under another nick ;)) and would like to give back a little, but right now: I can't.

@geoand
Contributor

geoand commented Aug 30, 2023

Of course I cannot add the whole application for various reasons

Yeah, that's completely understood.

I am an open source maintainer myself (under another nick ;)) and would like to give back a little, but right now: I can't.

No problem. If and when you can create a sample we can use to debug the problem, please let us know - cc @DavideD

@maxandersen
Member

maxandersen commented Aug 30, 2023

@Froidoh thanks for your candid comments.

The limitation of not being able to mix classic Hibernate ORM and Hibernate Reactive is one of the main reasons why Hibernate Reactive continues to be marked with "preview" status. We know it is annoying, but we also didn't want to hold back those going fully reactive from using it. The next update of the docs will document the limitations more explicitly.

That does not solve your problem, but I'm mentioning it here for others to be aware.

On your specific issue I'm curious to know a few things to clarify what kind of bug we are dealing with here.

  1. Which version of Quarkus are you using?
  2. Does any of your code manually try to run things concurrently while sharing Hibernate-managed entities, e.g. by spawning its own threads, using a managed executor, or doing Mutiny operations that can result in concurrent execution?

If you have cases of #2, then we should investigate that, because that kind of behavior is inherently unsafe. We try to detect such actions to prevent data loss or invalid edits, and that detection could show up like this. This is not an "ORM not mixable with reactive" problem but something that can happen in any stack if one is not careful; with reactive, though, many seem to manage it more easily without noticing.

If you do NOT have #2, then I'm more concerned and it would be great to investigate more. The version of Quarkus might give us a hint.
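To make the kind of detection described above concrete, here is a minimal sketch of a thread-affinity guard using only the JDK. It is not Hibernate Reactive's actual implementation; `ThreadBoundSession`, `checkThread`, `usableFrom` and `demo` are hypothetical names chosen for illustration, and two single-threaded executors stand in for an event loop and a worker pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a session bound to the thread that opened it. */
final class ThreadBoundSession {
    // Remember the thread that created ("opened") this object.
    private final Thread owner = Thread.currentThread();

    /** Throws if called from any thread other than the one that created this object. */
    void checkThread() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            throw new IllegalStateException("Session opened on '" + owner.getName()
                    + "' but used from '" + current.getName() + "'");
        }
    }

    /** Opens a session on one executor and uses it on another; returns whether the use was accepted. */
    static boolean usableFrom(ExecutorService opener, ExecutorService user) {
        Callable<ThreadBoundSession> open = ThreadBoundSession::new;
        try {
            ThreadBoundSession session = opener.submit(open).get();
            Runnable use = session::checkThread;
            user.submit(use).get();
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                return false; // the guard rejected the cross-thread use
            }
            throw new IllegalStateException("Unexpected failure", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted", e);
        }
    }

    /** Returns {same-thread use accepted, cross-thread use accepted}. */
    static boolean[] demo() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return new boolean[] {
                usableFrom(eventLoop, eventLoop),
                usableFrom(eventLoop, worker)
            };
        } finally {
            eventLoop.shutdown();
            worker.shutdown();
        }
    }
}
```

In this sketch the guard only fires when a continuation actually lands on another thread, which is one way such a check could stay silent for months and then start failing once the timing changes.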

Thanks again.

@Froidoh

Froidoh commented Aug 30, 2023

@maxandersen

We use <quarkus.platform.version>3.1.0.Final</quarkus.platform.version> - the API in question is defined as

	 @POST
	 @Path("{bucket}")
	 @Produces(MediaType.APPLICATION_JSON)
	 public Uni<FileRow> uploadFile(
	            @HeaderParam("X-File-Metadata") String metaData,
	            @HeaderParam("Content-Type") String contentType,
	            @PathParam("bucket") String bucketName,
	            InputStream file) {

		}

It should do a streaming file upload. At the time this API was created there was no way to do streaming multipart file upload, in case you're wondering.

We then proceed to fetch a bucket config from the database and, if it exists, we try to persist the file in a non-blocking fashion via fs operations.

In there we use an AsyncInputStream and let vertx handle a lot of stuff (biting the buffer so we don't choke on huge files and do not need to allocate too much memory at once)
Also a vertx.getDelegate().executeBlocking(p -> {}) for some strictly blocking Files.setAttribute calls

Once the file is uploaded we insert some data to a few db tables and return a result.

Maybe there is a footgun in there indeed.

@geoand
Contributor

geoand commented Aug 30, 2023

<quarkus.platform.version>3.1.0.Final</quarkus.platform.version>

The first thing to do would be to try 3.2.5.Final and/or 3.3.1.

the API in question is defined as

Can you please fill in some pseudo-code showing what the impl does (and most importantly where blocking and non-blocking calls are used)?

@Froidoh

Froidoh commented Aug 31, 2023

@geoand trying with 3.3.1 we get:

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Oh how I missed the runtime errors of Java when working in Rust g

We do not use BigDecimal in any of our code btw. There is one occurrence of BigInteger though

@geoand
Contributor

geoand commented Aug 31, 2023

@DavideD ^

@Froidoh

Froidoh commented Aug 31, 2023

Same with 3.2.5.Final

Not a problem with 3.1.0.Final

Didn't do a git bisect but it starts with 3.2.0.Final (didn't try any release candidates)

@DavideD
Contributor

DavideD commented Aug 31, 2023

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Can we have the stack trace?
What operation is causing this error?
The only time I think I've seen a similar error was with id generation or aggregate operations.
Is this error thrown after running a query?

@Froidoh

Froidoh commented Aug 31, 2023

the API in question is defined as

Can you please fill in some pseudo-code showing what the impl does (and most importantly where blocking and non-blocking calls are used)?

@geoand

I am sorry, I missed this... you know what, it won't help but here it is:

@NonBlocking
public Uni<FileInformation> upload(String contentType, InputStream file, String originalFileName, String bucketName, MetaDataDto metadata) {
    String normalizedFilename = originalFileName;
    try {
        byte[] decode = base64decoder.decode(originalFileName);
        normalizedFilename = new String(decode, StandardCharsets.UTF_8);
    } catch (Exception e) {
        // Not Base64-encoded: fall back to the original file name as is
    }
    String filename = normalizedFilename;
    return
            bucketsService.getConfigIfAllowsNewFiles(bucketName)
                    .onItem()
                    .ifNull()
                    .failWith(BucketNotFoundException.noConfig(bucketName))
                    .flatMap(bucketConfig -> {
                                return persistFile(file, contentType, bucketConfig)
                                        .flatMap(writtenFile -> repo.persistAndFlush(new FileInformation(
                                                bucketConfig,
                                                writtenFile,
                                                contentType,
                                                filename
                                        )));
                            }
                    )
                    .flatMap(f -> f.updateMetaDataAndPersistChanges(metadata))
            ;
}

  private Uni<AsyncFileWriteDto> persistFile(InputStream in, String contentType, BucketConfig bucketConfig) {
    String fileName = UUID.randomUUID() + MimeTypes.getExtensionForMimeType(contentType);
    String pathToDir = bucketConfig.pathToDir();
    String filePath = pathToDir.concat("/").concat(fileName);

    FileSystem nfs = vertx.fileSystem();
    AsyncInputStream ais = new AsyncInputStream(vertx.getDelegate(), vertx.getDelegate().getOrCreateContext(), in, d -> updateHash(sha256Digest, d.getBytes()));

    try {
        return nfs
                .mkdirs(pathToDir)
                .flatMap(v -> nfs.createFile(filePath))
                .flatMap(v -> nfs.open(filePath, new OpenOptions().setWrite(true)))
                .flatMap(asyncFile ->
                        UniHelper.toUni(ais
                                        .handler(data -> updateHash(sha256Digest, data.getBytes()))
                                        .pipeTo(asyncFile.getDelegate()))
                                .flatMap(v -> {
                                    BigInteger fileSize = BigInteger.valueOf(ais.getFileSize());
                                    if (BigInteger.ZERO.equals(fileSize)) {
                                        vertx.getDelegate().executeBlocking(p -> {
                                            log.info(String.format("Deleting empty file at %s!", filePath));
                                            nfs.deleteBlocking(filePath);
                                        });
                                        return Uni.createFrom().failure(new EmptyFileUploadException());
                                    }
                                    byte[] digest = sha256Digest.digest();
                                    BigInteger bigInteger = new BigInteger(1, digest);
                                    String fileHash = bigInteger.toString(16);
                                    // Not all file systems support creation time, so don't even bother, let's define our own, it will be good enough
                                    LocalDateTime creationTime = LocalDateTime.now();
                                    LocalDateTime retentionTime = bucketConfig.calculateRetentionTimeFrom(creationTime);
                                    // If everything was okay up until this point, we can make the file "unwritable"
                                    ZonedDateTime zdt = ZonedDateTime.of(retentionTime, ZoneId.systemDefault());
                                    vertx.getDelegate().executeBlocking(p -> {
                                        // Yes, this is blocking as I found no way to do this asynchronously in Java
                                        File f = new File(filePath);
                                        try {
                                            Files.setAttribute(Paths.get(filePath), "lastAccessTime", FileTime.from(zdt.toInstant()));
                                            if (f.setLastModified(zdt.toInstant().toEpochMilli())) {
                                                log.debug(String.format("set last modified of %s to %s", filePath, retentionTime));
                                            } else {
                                                log.error(String.format("Failed to set last modified of %s to %s", filePath, retentionTime));
                                            }
                                            if (!f.setReadOnly()) {
                                                log.error(String.format("Failed to set %s to READ_ONLY", filePath));
                                            }
                                            p.complete();
                                        } catch (IOException e) {
                                            p.fail(e);
                                        }
                                    }, res -> {
                                        if (res.succeeded()) {
                                            log.debug(String.format("Successfully set lastAccessTime of %s to %s", filePath, retentionTime));
                                        } else {
                                            log.error(String.format("Failed to set lastAccessTime of %s to %s", filePath, retentionTime));
                                        }
                                    });
                                    return Uni.createFrom().item(new AsyncFileWriteDto(filePath, creationTime, fileSize, fileHash, retentionTime));
                                }));

    } catch (Exception e) {
        log.error(String.format("Failed to persist file: %s", e));
        return Uni.createFrom().failure(new FileUploadFailedException());
    }
}

/**
 * @author stw, antimist
 * Taken from github
 */
public class AsyncInputStream implements ReadStream<Buffer> {

    public static final int DEFAULT_READ_BUFFER_SIZE = 8192;
    private static final Logger log = Logger.getLogger(AsyncInputStream.class);

    // Based on the inputStream with the real data
    private final ReadableByteChannel ch;
    private final Vertx vertx;
    private final Context context;

    private boolean closed;
    private boolean readInProgress;

    private Handler<Buffer> dataHandler;
    private final Handler<Buffer> hashCalculator;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private final InboundBuffer<Buffer> queue;

    private final int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
    private long readPos;
    private long fileSize;

    /**
     * Create a new Async InputStream that can we used with a Pump
     *
     * @param in
     *        The input stream you want to write somewhere
     */
    public AsyncInputStream(Vertx vertx, Context context, InputStream in, Handler<Buffer> hashCalculator) {
        this.vertx = vertx;
        this.context = context;
        this.ch = Channels.newChannel(in);
        this.queue = new InboundBuffer<>(context, 0);
        this.hashCalculator = hashCalculator;
        queue.handler(buff -> {
            if (buff.length() > 0) {
                handleData(buff);
            } else {
                handleEnd();
            }
        });
        queue.drainHandler(v -> doRead());
    }

    public void close() {
        closeInternal(null);
    }

    public void close(Handler<AsyncResult<Void>> handler) {
        closeInternal(handler);
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#endHandler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
        check();
        this.endHandler = endHandler;
        return this;
    }

    /*
     * (non-Javadoc)
     * @see
     * io.vertx.core.streams.ReadStream#exceptionHandler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
        check();
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#handler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
        check();
        this.dataHandler = handler;
        if (this.dataHandler != null && !this.closed) {
            this.doRead();
        } else {
            queue.clear();
        }
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#pause()
     */
    @Override
    public synchronized AsyncInputStream pause() {
        check();
        queue.pause();
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#resume()
     */
    @Override
    public synchronized AsyncInputStream resume() {
        check();
        if (!closed) {
            queue.resume();
        }
        return this;
    }

    @Override
    public ReadStream<Buffer> fetch(long amount) {
        queue.fetch(amount);
        return this;
    }

    private void check() {
        if (this.closed) {
            throw new IllegalStateException("Inputstream is closed");
        }
    }

    private void checkContext() {
        if (!vertx.getOrCreateContext().equals(context)) {
            throw new IllegalStateException("AsyncInputStream must only be used in the context that created it, expected: " + this.context
                    + " actual " + vertx.getOrCreateContext());
        }
    }

    private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
        check();
        closed = true;
        doClose(handler);
    }

    private void doClose(Handler<AsyncResult<Void>> handler) {

        try {
            ch.close();
            if (handler != null) {
                this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
            }
        } catch (IOException e) {
            if (handler != null) {
                this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e)));
            }
        }
    }

    public synchronized AsyncInputStream read(Buffer buffer, int offset, long position, int length,
                                              Handler<AsyncResult<Buffer>> handler) {
        Objects.requireNonNull(buffer, "buffer");
        Objects.requireNonNull(handler, "handler");
        Arguments.require(offset >= 0, "offset must be >= 0");
        Arguments.require(position >= 0, "position must be >= 0");
        Arguments.require(length >= 0, "length must be >= 0");
        check();
        ByteBuffer bb = ByteBuffer.allocate(length);
        doRead(buffer, offset, bb, position, handler);
        return this;
    }

    private void doRead() {
        check();
        doRead(ByteBuffer.allocate(readBufferSize));
    }

    private synchronized void doRead(ByteBuffer bb) {
        if (!readInProgress) {
            readInProgress = true;
            Buffer buff = Buffer.buffer(readBufferSize);
            doRead(buff, 0, bb, readPos, ar -> {
                if (ar.succeeded()) {
                    readInProgress = false;
                    Buffer buffer = ar.result();
                    readPos += buffer.length();
                    fileSize = readPos;
                    // Empty buffer represents end of file
                    if (queue.write(buffer) && buffer.length() > 0) {
                        doRead(bb);
                    }
                } else {
                    handleException(ar.cause());
                }
            });
        }
    }

    private void doRead(Buffer writeBuff, int offset, ByteBuffer buff, long position, Handler<AsyncResult<Buffer>> handler) {

        // ReadableByteChannel doesn't have a completion handler, so we wrap it into
        // an executeBlocking and use the future there
        vertx.executeBlocking(future -> {
            try {
                Integer bytesRead = ch.read(buff);
                future.complete(bytesRead);
            } catch (Exception e) {
                log.error(e);
                future.fail(e);
            }
        }, res -> {
            if (res.failed()) {
                context.runOnContext((v) -> handler.handle(Future.failedFuture(res.cause())));
            } else {
                // Do the completed check
                Integer bytesRead = (Integer) res.result();
                if (bytesRead == -1) {
                    //End of file
                    context.runOnContext((v) -> {
                        buff.flip();
                        writeBuff.setBytes(offset, buff);
                        buff.compact();
                        handler.handle(Future.succeededFuture(writeBuff));
                    });
                } else if (buff.hasRemaining()) {
                    long pos = position;
                    pos += bytesRead;
                    // resubmit
                    doRead(writeBuff, offset, buff, pos, handler);
                } else {
                    // It's been fully written

                    context.runOnContext((v) -> {
                        buff.flip();
                        writeBuff.setBytes(offset, buff);
                        buff.compact();
                        handler.handle(Future.succeededFuture(writeBuff));
                    });
                }
            }
        });
    }

    private void handleData(Buffer buff) {
        Handler<Buffer> handler;
        synchronized (this) {
            handler = this.dataHandler;
        }
        if (handler != null) {
            checkContext();
            hashCalculator.handle(buff);
            handler.handle(buff);
        }
    }

    private synchronized void handleEnd() {
        Handler<Void> endHandler;
        synchronized (this) {
            dataHandler = null;
            endHandler = this.endHandler;
        }
        if (endHandler != null) {
            checkContext();
            endHandler.handle(null);
        }
    }

    private void handleException(Throwable t) {
        if (exceptionHandler != null && t instanceof Exception) {
            exceptionHandler.handle(t);
        } else {
            log.error("Unhandled exception", t);
        }
    }

    public long getFileSize() {
        return fileSize;
    }
}

Maybe you see something that is absolutely wrong

@Froidoh

Froidoh commented Aug 31, 2023

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Can we have the stack trace? What operation is causing this error? The only time I think I've seen a similar error was with id generation or aggregate operations. Is this error thrown after running a query?

@DavideD

Yes, the fix is going from:

public class IdGenerator extends MutinyGenerator {
    @Override
    public Uni<Object> generate(Mutiny.Session session, Object owner, Object currentValue, EventType eventType) {
        return session
                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual")
                .getSingleResult()
                .map(x -> BigInteger.valueOf((Integer) x))
                ;
    }
...

to

public class IdGenerator extends MutinyGenerator {
    @Override
    public Uni<Object> generate(Mutiny.Session session, Object owner, Object currentValue, EventType eventType) {
        return session
                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual")
                .getSingleResult()
                .map(x -> ((BigDecimal) x).toBigInteger())
                ;
    }
...

I remember we had to change this a couple of times already in the past year(s) going through the versions.

At some point it was a generic function and the result of the native query was typed, so this error would've been caught at compile time. But at some point this changed.

@DavideD
Contributor

DavideD commented Aug 31, 2023

So, the issue here is that you are running a native query without specifying what value you expect to receive.
Hibernate has no idea what type you expect and it returns the object that the driver is returning.

It's possible that with different versions of the driver, or Hibernate, or database, you get something different in return.
I haven't tested it, but I would expect something like this to work:

                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigInteger.class)
                .getSingleResult()

or, at the very least:

                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigDecimal.class)
                .getSingleResult()
                .map(BigDecimal::toBigInteger)
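Since the runtime type of an untyped native query result can differ between driver and Hibernate versions, another option is to normalize whatever numeric type comes back. This is only a sketch using the JDK, not something proposed in this thread; `SequenceValues.toBigInteger` is a hypothetical helper name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical helper that normalizes a numeric value of unknown runtime type. */
final class SequenceValues {
    private SequenceValues() {
    }

    /** Converts an Integer, Long, BigDecimal or BigInteger into a BigInteger. */
    static BigInteger toBigInteger(Object raw) {
        if (raw instanceof BigInteger) {
            return (BigInteger) raw;
        }
        if (raw instanceof BigDecimal) {
            // Throws ArithmeticException if the value has a fractional part.
            return ((BigDecimal) raw).toBigIntegerExact();
        }
        if (raw instanceof Number) {
            // Covers Integer, Long and Short; floating-point values would be truncated.
            return BigInteger.valueOf(((Number) raw).longValue());
        }
        throw new IllegalArgumentException("Not a numeric sequence value: " + raw);
    }
}
```

A generator could then presumably end its chain with `.map(SequenceValues::toBigInteger)` instead of a hard cast, so a change from `Integer` to `BigDecimal` would no longer surface as a `ClassCastException` at runtime.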

@Froidoh

Froidoh commented Aug 31, 2023

So, the issue here is that you are running a native query without specifying what value you expect to receive. So hibernate just returns the object as is from the driver.

It's possible that with different versions of the driver, or Hibernate, or database, you get something different in return. I haven't tested it, but I would expect something like this to work:

                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigInteger.class)
                .getSingleResult()

or, at the very least:

                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigDecimal.class)
                .getSingleResult()
                .map(BigDecimal::toBigInteger)

That's cool, thanks!

But one would still need to cast this to an Object (or at least add a .map(Function.identity())) because of MutinyGenerator not taking a generic (anymore?)

@DavideD
Contributor

DavideD commented Aug 31, 2023

Yes, I think you are right. I don't know if there is any particular reason for not using the generic any more.

I think this method is inspired by ORM, where generate returns an Object; we did the natural conversion by returning a Uni<Object>.

I will create an issue to make it generic again.

@DavideD
Contributor

DavideD commented Aug 31, 2023

Actually, just returning a Uni<?> would solve the issue, I think.
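The difference between the two signatures comes down to Java generics being invariant. The sketch below uses the JDK's `CompletableFuture` as a stand-in for `Uni`, purely for illustration; `GeneratorSignatures` and its methods are hypothetical names and not part of Hibernate Reactive.

```java
import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of a fixed element type versus a wildcard return type. */
final class GeneratorSignatures {
    private GeneratorSignatures() {
    }

    /** Return type fixed to Object: a typed future needs an explicit widening step. */
    static CompletableFuture<Object> fixedToObject() {
        CompletableFuture<BigInteger> typed = CompletableFuture.completedFuture(BigInteger.ONE);
        // "return typed;" would not compile, because CompletableFuture<BigInteger>
        // is not a subtype of CompletableFuture<Object>.
        return typed.thenApply(value -> value); // comparable to map(Function.identity())
    }

    /** Wildcard return type: any element type is accepted without the extra step. */
    static CompletableFuture<?> wildcard() {
        return CompletableFuture.completedFuture(BigInteger.ONE);
    }
}
```

With the wildcard form the implementer can return the typed value directly, at the cost of the caller only seeing `Object` when reading the result.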

@geoand
Contributor

geoand commented Aug 31, 2023

@Froidoh so if I understand correctly what you are trying do is something like the following:

  • Receive an HTTP input representing a file
  • Call some downstream service to get some metadata
  • Persist a file on the filesystem containing the file and the metadata
  • Write some metadata to a DB
  • Write some more data to a DB?

Is that right?

@Froidoh

Froidoh commented Aug 31, 2023

@geoand

We get the metadata also from the call, I omitted that initially, but it's just a http-header. Sorry, it was an attempt to make the code a bit leaner.

So it is:

  • Receive an HTTP input representing a file + metadata (via http header)
  • Persist the file on the filesystem WITHOUT allocating all the memory (the file could be hundreds of MB in size), and also set some additional flags on the file (filesystem level)
  • When the file was written to the fs: Write meta data to the db AND write an entry about the file itself (so yes, multiple tables affected)
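For comparison, the "write without allocating the whole file, and hash it on the way" step can be sketched with blocking JDK I/O alone. This is not the code used in the service described above; `StreamingUpload`, `copyAndHash` and `demo` are hypothetical names, and in a reactive application such a blocking call would have to run on a worker thread rather than on the event loop.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical blocking sketch: stream to disk in chunks while computing SHA-256. */
final class StreamingUpload {
    private StreamingUpload() {
    }

    /** Copies the stream to target chunk by chunk and returns the SHA-256 as 64 hex digits. */
    static String copyAndHash(InputStream in, Path target) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            // DigestInputStream updates the digest with every chunk that is read.
            try (DigestInputStream hashing = new DigestInputStream(in, sha256)) {
                Files.copy(hashing, target, StandardCopyOption.REPLACE_EXISTING);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : sha256.digest()) {
                hex.append(String.format("%02x", b)); // two digits per byte keeps leading zeros
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    /** Runs copyAndHash on the three bytes "abc" using a temporary file. */
    static String demo() {
        try {
            Path target = Files.createTempFile("upload-", ".txt");
            try {
                byte[] body = "abc".getBytes(StandardCharsets.UTF_8);
                return copyAndHash(new ByteArrayInputStream(body), target);
            } finally {
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Formatting each byte separately always yields 64 hex digits, whereas `new BigInteger(1, digest).toString(16)` drops leading zeros when the digest starts with a zero byte.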

@geoand
Contributor

geoand commented Aug 31, 2023

@Froidoh thanks

Before I move on to suggestions (and @FroMage probably has some as well) I have a question

For the HTTP input, are you using multipart data or just plain bytes in the HTTP body?

@Froidoh

Froidoh commented Aug 31, 2023

@Froidoh thanks

Before I move on to suggestions (and @FroMage probably has some as well) I have a question

For the HTTP input, are you using multipart data or just plain bytes in the HTTP body?

Currently plain bytes, as back when we started this, multipart data could not be streamed at all in Quarkus, so we would've needed to allocate all the files on the server, which was an absolute no-go.

A curl call would look like this:

curl --location 'localhost:8080/v1/buckets/test_bucket/files' \
--header 'X-File-Metadata: { "systemInfo": { "systemName": "Odin" }}' \
--header 'Transfer-Encoding: chunked' \
--header 'Content-Type: text/plain' \
--data '@/home/froidoh/triumph.txt'

@geoand
Contributor

geoand commented Aug 31, 2023

Quarkus should not be buffering the multipart contents and if it is, that's a bug.

Now, even if you use raw HTTP body, you can just use the File or Path types and Quarkus will save the contents to disk from where you can then use them in your Resource method.

Furthermore, you can also use vertx.fileSystem() to perform async operations on the file (like moving it to a location of your choice). That should vastly simplify things and eliminate any well hidden bugs that might be lurking in the code you mentioned above.

@Froidoh

Froidoh commented Aug 31, 2023

@geoand is there any example of how to do this in a non-blocking manner while not allocating any additional memory on the server AND not relying on any temp folder?

Because the server this runs on doesn't have a lot of HDD, the file-system we push to is a special remote file-system mounted on the server.

If a lot of files get uploaded and all of them would need to temporarily reside on the server, we would have a problem. Maybe the "temp folder" is configurable and we could "pipe it through" to the remote file-system...

@geoand
Contributor

geoand commented Aug 31, 2023

I have found what looks like a bug in our File handling which I am looking into. I'll post an update when I have figured it out.

@geoand
Contributor

geoand commented Aug 31, 2023

After #35659 is in, you will be able to use quarkus.http.body.uploads-directory to specify where the temp files are stored.
Furthermore, those files will be removed by default when the request is completed (same is already the case for multipart handling).

@FroMage
Member

FroMage commented Aug 31, 2023

If you get a lot of concurrent requests uploading files, I suspect you will still have more room in your tmp folder on your HDD than if you streamed all those in memory and then later on your larger storage. Now, perhaps you're streaming them directly from the network to your larger storage, in which case, yeah, just let RESTEasy Reactive do it for you like Georgios said, and it will be done prior to invoking your endpoint, and give you a File or Path which is easy to work with.

Note that this should work both for Multipart and a single file.

@Froidoh

Froidoh commented Aug 31, 2023

If you get a lot of concurrent requests uploading files, I suspect you will still have more room in your tmp folder on your HDD than if you streamed all those in memory and then later on your larger storage. Now, perhaps you're streaming them directly from the network to your larger storage, in which case, yeah, just let RESTEasy Reactive do it for you like Georgios said, and it will be done prior to invoking your endpoint, and give you a File or Path which is easy to work with.

Note that this should work both for Multipart and a single file.

Sadly no, as the server we are talking about is a container with like 200mb of hdd space available :/

@FroMage
Member

FroMage commented Sep 11, 2023

So, did you try quarkus.http.body.uploads-directory?

@Froidoh

Froidoh commented Sep 15, 2023

So, did you try quarkus.http.body.uploads-directory?

I did not, I am currently on vacation and I must say that for the time being 2 of 3 of our services will migrate from quarkus to spring.

This will probably require more hardware resources, but as I am the bottleneck of all development and involved in other projects as well, the decision was made and I think it's the right one.

We'll see how everything works out with the advent of Java 21 and "green threads".

At least now I am not the only person capable of writing somewhat acceptable code because quarkus/mutiny really feels like another programming language, if you only know "classic imperative java".

I want to say thanks for all your support and I wish you all the best. We are staying with Quarkus for one service that works reliably and makes users happy, but has no need to combine blocking calls with non-blocking ones :)

@geoand
Contributor

geoand commented Sep 15, 2023

So, did you try quarkus.http.body.uploads-directory?

@FroMage we need my PR for that to work

@geoand
Contributor

geoand commented Sep 15, 2023

At least now I am not the only person capable of writing somewhat acceptable code because quarkus/mutiny really feels like another programming language, if you only know "classic imperative java".

That is all fine and well, but you can use Quarkus in an imperative way, and it's probably the best approach for most use cases.

@Froidoh

Froidoh commented Sep 15, 2023

At least now I am not the only person capable of writing somewhat acceptable code because quarkus/mutiny really feels like another programming language, if you only know "classic imperative java".

That is all fine and well, but you can use Quarkus in an imperative way, and it's probably the best approach for most use cases.

I would argue that this advice should be way more prominent on the website. If you search for quarkus you mostly find the mutiny/vert.x and non-blocking examples that distinguish quarkus from other frameworks in java-land.

Also one additional note: what really had a big impact on the decision was a new requirement to use Redis. A colleague of mine read up on the docs, implemented it, and it doesn't work if more than one host is specified.

Took them a few minutes to get it working in Spring. Granted, they are way more familiar with Spring.

@geoand
Contributor

geoand commented Sep 15, 2023

I would argue that this advice should be way more prominent on the website. If you search for quarkus you mostly find the mutiny/vert.x and non-blocking examples that distinguish quarkus from other frameworks in java-land.

Although we have been saying from day 1 that Quarkus can be used in both imperative and reactive mode (and the styles can even be mixed in the same application), the fact that you have not got that impression means we certainly need to do much better.

Also, one additional note: what really had a big impact on the decision was a new requirement to use Redis. A colleague of mine read up on the docs and implemented it, and it doesn't work if more than one host is specified.

I would need to know more details about this to give a proper answer.

@Sanne
Member

Sanne commented Jan 19, 2024

Hi all,
There's good feedback in the various comments here, some of which probably deserves its own issue and/or further attention. However, the main problem described, "This worked in Quarkus 2 and now no longer works", is not a bug: it was illegal and problematic in Quarkus 2 as well; we simply have better validations nowadays.

So I think we should close this, as it's not particularly actionable. Sorry for all the confusion.

Allow me to give some advice: use Hibernate Reactive exclusively if you have a "pure" reactive application.

Attempting to switch threads back and forth between blocking and reactive will only waste a significant amount of efficiency, making the use of Hibernate Reactive pointless. If you're on a regular executor (not the Vert.x/Netty threads), you're better off using the "regular" Hibernate ORM within a blocking thread. Remember that there's nothing inherently bad about it, as we have optimised the regular ORM a lot as well: if you find inefficiencies in it, let us know!

On the other hand, if your entire flow of operations is running on the Vert.x (Netty) IO threads, then (and only then) you can really benefit from Hibernate Reactive. But remember that the benefit really stems from the fact that you're NOT switching threads and executors. Such switches are the operations to avoid in order to achieve a high-performance, highly efficient system, so attempting to shoehorn Hibernate Reactive operations into a blocking thread just doesn't make much sense.
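
To make the thread switching described above concrete, here is a minimal plain-JDK sketch. It uses no Quarkus, Vert.x, or Hibernate APIs: the "event-loop" and "worker" executors, the class name, and the method names are all hypothetical stand-ins chosen for illustration. It only shows where each step of a pipeline runs, not the cost of the switch or how Hibernate Reactive validates its threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Builds a fixed pool whose threads all carry the given name (hypothetical helper).
    static ExecutorService newNamedPool(String name, int size) {
        return Executors.newFixedThreadPool(size, r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // do not keep the JVM alive for this sketch
            return t;
        });
    }

    // Every step runs on the same executor: no hand-off between pools.
    static String stayOnEventLoop(ExecutorService eventLoop) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), eventLoop)
                .thenApplyAsync(s -> s + " -> " + Thread.currentThread().getName(), eventLoop)
                .thenApplyAsync(s -> s + " -> " + Thread.currentThread().getName(), eventLoop)
                .join();
    }

    // Each step is handed to a different executor: two switches for three steps.
    static String hopBackAndForth(ExecutorService eventLoop, ExecutorService worker) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), worker)
                .thenApplyAsync(s -> s + " -> " + Thread.currentThread().getName(), eventLoop)
                .thenApplyAsync(s -> s + " -> " + Thread.currentThread().getName(), worker)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = newNamedPool("event-loop", 1);
        ExecutorService worker = newNamedPool("worker", 1);

        System.out.println(stayOnEventLoop(eventLoop));         // event-loop -> event-loop -> event-loop
        System.out.println(hopBackAndForth(eventLoop, worker)); // worker -> event-loop -> worker

        eventLoop.shutdown();
        worker.shutdown();
    }
}
```

The second pipeline is the shape to avoid: every arrow between different names is a hand-off between executors, which is the kind of switch the advice above says removes the benefit of a reactive stack.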

HTH

@Sanne Sanne closed this as completed Jan 19, 2024