Skip to content

Commit

Permalink
Merge pull request #93 from openbase/92-memory-leak-in-sharedmqttclient
Browse files Browse the repository at this point in the history
implement removal if unused topics and formatting
  • Loading branch information
DivineThreepwood authored Jan 21, 2023
2 parents 2b82c71 + 86ff906 commit 2c4f92a
Showing 1 changed file with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5Disconnect
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5UnsubscribeBuilder
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAckReasonCode
import org.openbase.jul.communication.config.CommunicatorConfig
import org.openbase.jul.iface.Shutdownable
import java.util.*
Expand All @@ -38,16 +36,16 @@ object SharedMqttClient : Shutdownable {
fun get(
communicatorConfig: CommunicatorConfig
) = sharedClients.getOrPut(communicatorConfig) {
MqttClient.builder()
.identifier(UUID.randomUUID().toString())
.serverHost(communicatorConfig.hostname)
.serverPort(communicatorConfig.port)
.useMqttVersion5()
.automaticReconnectWithDefaultConfig()
.buildAsync()
.let { Mqtt5ClientWrapper(it) }
.also { it.connect() }
}
MqttClient.builder()
.identifier(UUID.randomUUID().toString())
.serverHost(communicatorConfig.hostname)
.serverPort(communicatorConfig.port)
.useMqttVersion5()
.automaticReconnectWithDefaultConfig()
.buildAsync()
.let { Mqtt5ClientWrapper(it) }
.also { it.connect() }
}

@Synchronized
fun waitForShutdown() =
Expand Down Expand Up @@ -80,12 +78,24 @@ object SharedMqttClient : Shutdownable {

fun isConnected() = internalClient.config.state.isConnected

/**
* Increase the counter for the number of subscriptions on a topic.
*
* @param topic the topic
* @return if this is the first subscription to this topic
*/
@Synchronized
private fun increaseTopicCounter(topic: MqttTopicFilter): Boolean = topic
.toString()
.also { subscriptionsCounterMap[it] = subscriptionsCounterMap.getOrPut(it) { 0 } + 1 }
.let { subscriptionsCounterMap[it] == 1 }

/**
* Decrease the counter for the number of subscriptions on a topic.
*
* @param topicFilter the topic
* @return true if there are still subscriptions on this topic after decreasing the counter
*/
@Synchronized
private fun decreaseTopicCounter(topicFilter: MqttTopicFilter): Boolean = topicFilter
.toString()
Expand All @@ -94,8 +104,9 @@ object SharedMqttClient : Shutdownable {
.let { it == null || it == 0 }
.also { if (it) return true }

subscriptionsCounterMap[topic] = subscriptionsCounterMap[topic]!! - 1
subscriptionsCounterMap[topic] == 0
subscriptionsCounterMap[topic] = subscriptionsCounterMap[topic]!!.dec()
(subscriptionsCounterMap[topic] == 0)
.also { if (it) subscriptionsCounterMap.remove(topic) }
}

override fun getConfig() = internalClient.config
Expand Down Expand Up @@ -167,7 +178,7 @@ object SharedMqttClient : Shutdownable {
): CompletableFuture<Mqtt5UnsubAck> = p0.topicFilters
.filter { decreaseTopicCounter(it) }
.takeIf { it.isNotEmpty() }
?.let { Mqtt5Unsubscribe.builder().addTopicFilters(it).build() }
?.let { Mqtt5Unsubscribe.builder().addTopicFilters(it).build() }
?.let { internalClient.unsubscribe(it) }
?: CompletableFuture.completedFuture(null)

Expand Down

0 comments on commit 2c4f92a

Please sign in to comment.