
Async MQTT client blocks on publish if the client is disconnected and waiting between reconnect attempts #554

Open
richturner opened this issue Dec 15, 2022 · 10 comments

@richturner

Expected behavior

The async client (in my case Mqtt3AsyncClient) should not block when calling send, no matter what the client state is; instead, I would expect to receive the CompletableFuture<Mqtt3Publish> immediately.

Actual behavior

send blocks the calling thread, presumably waiting for the client to reach a connected state.

To Reproduce

  1. Create an Mqtt3AsyncClient with reconnect logic and an invalid host value, so the client can never actually connect (a minimal creation sketch follows this list)
  2. Publish an MQTT message, e.g.:
client.publishWith()
        .topic(message.topic)
        .payload(messageToBytes(message.payload))
        .send()
        .whenComplete((publish, throwable) -> {
            if (throwable != null) {
                // Failure
                LOG.log(Level.INFO, "Failed to publish to MQTT broker '" + getClientUri() + "'", throwable);
            } else {
                // Success
                LOG.finer("Published message to MQTT broker '" + getClientUri() + "'");
            }
        });
  3. Notice that the call blocks on send and no Future is ever returned
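
For reference, a minimal sketch of step 1, assuming the standard builder API; the identifier and host are placeholders and the host is deliberately unresolvable:

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import java.util.concurrent.TimeUnit;

// Repro sketch: an async client with automatic reconnect pointed at a host
// that can never be resolved, so the client stays in its reconnect loop.
Mqtt3AsyncClient client = MqttClient.builder()
        .useMqttVersion3()
        .identifier("repro-client")          // placeholder identifier
        .serverHost("invalid.host.example")  // deliberately unreachable
        .serverPort(1883)
        .automaticReconnect()
        .initialDelay(1, TimeUnit.SECONDS)
        .maxDelay(60, TimeUnit.SECONDS)
        .applyAutomaticReconnect()
        .buildAsync();

client.connect(); // never succeeds; the client keeps retrying in the background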

Details

  • Affected HiveMQ MQTT Client version(s): 1.2.2
  • Used JVM version: Oracle JDK 17.0.1
  • Used OS (name and version): Windows 10 64-bit; the same behaviour also occurs on Debian Linux AArch64
  • Used MQTT version: N/A
  • Used MQTT broker (name and version): N/A
@pglombardo
Contributor

Hi @richturner - thanks for pointing this issue out. I'll see if I can reproduce this here locally and report back soon.

@pglombardo
Contributor

Hi @richturner - I haven't been able to reproduce this. Could you post your client creation code (with reconnect)?

@pglombardo
Contributor

Since this issue has gone stale, I'll close it out but if you want to pick up on this again, let us know. We'd be happy to help out!

@Mystery406

Same problem here; have you found any solution to this?

@Blafasel3

Blafasel3 commented Aug 29, 2024

It seems we are experiencing the same issue with an RxClient:

The connect runs into a timeout and the client does not recover from it. This essentially leads to an OOM (with an unbounded thread count) or a totally blocked app (with a bounded thread count). I could verify this via Android heap dumps: we use an unbounded RxJava io scheduler, which is generally not an issue. But since the MqttClient completely blocks the publish and never releases the message, the whole thread effectively dies and is never GC'ed. In the long run, this leads to an OOM on our side.

This is the creation logic we use:
The variable connectionDetails is just a data class wrapping some config. The initial reconnect delay is 1 second, the max is 1 minute.

MqttClient.builder()
    .useMqttVersion3()
    .identifier(connectionDetails.clientProperties.identifier())
    // Server settings
    .serverHost(connectionDetails.serverProperties.serverHost)
    .serverPort(connectionDetails.serverProperties.serverPort)
    .also {
        val authDetails = connectionDetails.authorization
        if (authDetails != null) {
            it.simpleAuth().username(authDetails.username).password(authDetails.password.toByteArray()).applySimpleAuth()
        }
    }
    // Reconnect settings
    .automaticReconnect()
    .initialDelay(connectionDelayInMillis, TimeUnit.MILLISECONDS)
    .maxDelay(reconnectMaxDelayInMillis, TimeUnit.MILLISECONDS)
    .applyAutomaticReconnect()
    // Listener configuration
    .addConnectedListener(connectedListener)
    .addDisconnectedListener(disconnectedListener)
    .buildRx()

EDIT: This does not seem to depend on the QoS of the publish. Even attaching a timeout to the completable publish does not prevent this, presumably because the call blocks before the reactive chain ever hands control back to the caller.
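
To illustrate the timeout point (shown with the async flavour for brevity, as a sketch of the idea rather than the exact Rx code used above; topic and payload are placeholders): any timeout is attached to the returned future, but send() blocks before a future is ever handed back.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;

// Sketch only: the timeout below can never arm, because the calling thread is
// already stuck inside send() while the client waits between reconnect attempts.
CompletableFuture<Mqtt3Publish> future = client.publishWith()
        .topic("some/topic")            // placeholder topic
        .payload(payload)               // placeholder payload
        .send();                        // <-- blocks here; never returns
future.orTimeout(10, TimeUnit.SECONDS)  // unreachable while send() blocks
        .whenComplete((publish, throwable) -> { /* handle result or timeout */ });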

@thxyoulistenme

thxyoulistenme commented Oct 11, 2024

@richturner @Mystery406 @Blafasel3 Hi, we also encountered the same problem; do you have a solution? Thank you so much!

@Mystery406

> @richturner @Mystery406 @Blafasel3 Hi, we also encountered the same problem; do you have a solution? Thank you so much!

My workaround for now is to check whether the client is connected to the server before every publish.
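
For illustration, a minimal sketch of that workaround, assuming an Mqtt3AsyncClient named client; note the check is inherently racy, since the connection can drop between the check and the send:

// Workaround sketch: only call send() while the client reports a connected state.
// Caveat: the state can change between the check and the send, so this narrows
// the window but does not fully rule out the blocking case.
if (client.getState().isConnected()) {
    client.publishWith()
            .topic("some/topic")  // placeholder topic
            .payload(payload)     // placeholder payload
            .send()
            .whenComplete((publish, throwable) -> { /* handle result */ });
} else {
    // drop, buffer, or log the message instead of publishing while disconnected
}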

@code2life-crypto

code2life-crypto commented Oct 12, 2024

@richturner @Mystery406 @Blafasel3 @pglombardo Hi, we are facing the same issue, and we check the client state before each send. Here is the code for publishing a message:
[screenshot of the publish code]

Under most circumstances it runs fine in our prod env. This is an intermittent issue that has occurred twice in the past six months: the thread gets locked and cannot be awakened.

Here is a sample stack trace:

"**.client.pool-2-2" #157 [111] prio=5 os_prio=0 cpu=3198.07ms elapsed=100922.67s tid=0x00007fd7fbfac520 nid=111 in Object.wait() [0x00007fd77e3f1000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait0(java.base@…/Native Method)
- waiting on <no object reference available>
at java.lang.Object.wait(java.base@…/Object.java:366)
at java.lang.Object.wait(java.base@…/Object.java:339)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables.add(MqttPublishFlowables.java:53)
- locked <0x00000007220c0a30> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle.subscribeActual(MqttAckSingle.java:56)
at io.reactivex.Single.subscribe(Single.java:3666)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:3666)
at com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture.<init>(RxFutureConverter.java:113)
at com.hivemq.client.internal.rx.RxFutureConverter.toFuture(RxFutureConverter.java:43)
at com.hivemq.client.internal.mqtt.MqttAsyncClient.publish(MqttAsyncClient.java:243)

here is "com.hivemq.client.mqtt" stack trace:
`"com.hivemq.client.mqtt-7-1" #176 [121] prio=10 os_prio=0 cpu=998.73ms elapsed=100918.76s tid=0x00007fd65c2a5220 nid=121 runnable [0x00007fd77dceb000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:193)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:304)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:368)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.runWith([email protected]/Thread.java:1596)
at java.lang.Thread.run([email protected]/Thread.java:1583)

"com.hivemq.client.mqtt-7-2" #175 [122] prio=10 os_prio=0 cpu=1038.93ms elapsed=100918.76s tid=0x00007fd674412360 nid=122 runnable [0x00007fd77dbea000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:193)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:304)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:368)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.runWith([email protected]/Thread.java:1596)
at java.lang.Thread.run([email protected]/Thread.java:1583)`

Details

  • Affected HiveMQ MQTT Client version(s): 1.3.0
  • Used JVM version: Oracle JDK 21+
  • Used OS (name and version): deployed on AWS cloud, Debian Linux AArch64
  • Used MQTT version: N/A
  • Used MQTT broker (name and version): N/A
  • MQTT Publish Message QoS level: 0

We can't reproduce the issue in our local env, even after trying with breakpoints. We suspect that HiveMQ's thread pool doesn't always call the notifyAll method in certain cases, but we're unsure of the cause.
[screenshot]

Could you please help investigate this issue?

@Blafasel3

> (quoting @code2life-crypto's comment above in full)

@Mystery406 @thxyoulistenme
This is basically what we are doing now. You could consider implementing a queue like this, depending on QoS (a rough sketch follows below).
But nevertheless, this should be fixed inside the client, imo.
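
A rough sketch of such a queue, assuming an Mqtt3AsyncClient named client; the flush-on-reconnect listener would be registered via addConnectedListener(...) at build time, and all names here are illustrative:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;

// Buffer publishes while disconnected and flush them once the client reconnects.
private final Queue<Mqtt3Publish> pending = new ConcurrentLinkedQueue<>();

void publishOrQueue(Mqtt3Publish message) {
    if (client.getState().isConnected()) {
        client.publish(message)
                .whenComplete((ack, throwable) -> { /* handle result */ });
    } else {
        pending.add(message); // for QoS 0 you might prefer to drop instead
    }
}

// Registered via .addConnectedListener(...) when building the client.
MqttClientConnectedListener flushOnConnect = context -> {
    Mqtt3Publish queued;
    while ((queued = pending.poll()) != null) {
        client.publish(queued);
    }
};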

@pglombardo pglombardo reopened this Oct 12, 2024
@pglombardo
Contributor

pglombardo commented Oct 12, 2024

Hi all - I'll have the team take a closer look at this this week.
