This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

[BAHIR-183] [WIP] HDFS based MQTT client persistence #84

Open · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -61,7 +61,7 @@ Furthermore, to generate scaladocs for each module:

`$ mvn package`

Scaladocs is generated in, `MODULE_NAME/target/site/scaladocs/index.html`. __ Where `MODULE_NAME` is one of, `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq`, `streaming-twitter`. __
Scaladocs is generated in `${MODULE_NAME}/target/site/scaladocs/index.html`, where `MODULE_NAME` is one of `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq` or `streaming-twitter`.

## A note about Apache Spark integration

50 changes: 25 additions & 25 deletions sql-streaming-mqtt/README.md

Large diffs are not rendered by default.

@@ -16,5 +16,4 @@
#

org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider
org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
org.apache.spark.sql.mqtt.HDFSMQTTSourceProvider
org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
@@ -66,7 +66,7 @@ private[mqtt] object CachedMQTTClient extends Logging {

private def createMqttClient(config: Map[String, String]):
(MqttClient, MqttClientPersistence) = {
val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _, _, _, _) =
val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _) =
MQTTUtils.parseConfigParams(config)
val client = new MqttClient(brokerUrl, clientId, persistence)
val callback = new MqttCallbackExtended() {
@@ -68,7 +68,7 @@ class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[Int
private lazy val publishBackoff: Long =
SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")

private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap)
private lazy val (_, _, topic, _, _, qos) = MQTTUtils.parseConfigParams(config.toMap)

override def write(record: InternalRow): Unit = {
val client = CachedMQTTClient.getOrCreate(config.toMap)
@@ -100,9 +100,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
private var startOffset: OffsetV2 = _
private var endOffset: OffsetV2 = _

/* Older than last N messages, will not be checked for redelivery. */
val backLog = options.getInt("autopruning.backlog", 500)

private[mqtt] val store = new LocalMessageStore(persistence)

private[mqtt] val messages = new TrieMap[Long, MQTTMessage]
@@ -231,7 +228,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
/** Stop this source. */
override def stop(): Unit = synchronized {
client.disconnect()
persistence.close()
client.close()
}

@@ -250,7 +246,7 @@ class MQTTStreamSourceProvider extends DataSourceV2
}

import scala.collection.JavaConverters._
val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos, _, _, _) =
val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos) =
MQTTUtils.parseConfigParams(collection.immutable.HashMap() ++ parameters.asMap().asScala)

new MQTTStreamSource(parameters, brokerUrl, persistence, topic, clientId,
@@ -19,6 +19,7 @@ package org.apache.bahir.sql.streaming.mqtt

import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttConnectOptions}
import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}

@@ -45,7 +46,7 @@ object MQTTUtils extends Logging {
)

def parseConfigParams(config: Map[String, String]):
(String, String, String, MqttClientPersistence, MqttConnectOptions, Int, Long, Long, Int) = {
(String, String, String, MqttClientPersistence, MqttConnectOptions, Int) = {
def e(s: String) = new IllegalArgumentException(s)
val parameters = CaseInsensitiveMap(config)

@@ -54,6 +55,14 @@

val persistence: MqttClientPersistence = parameters.get("persistence") match {
case Some("memory") => new MemoryPersistence()
case Some("hdfs") =>
val hadoopConfig = new Configuration
for (parameter <- parameters) {
if (parameter._1.startsWith("hdfs.")) {
hadoopConfig.set(parameter._1.replaceFirst("hdfs.", ""), parameter._2)
}
}
new HdfsMqttClientPersistence(hadoopConfig)
case _ => val localStorage: Option[String] = parameters.get("localStorage")
localStorage match {
case Some(x) => new MqttDefaultFilePersistence(x)
@@ -83,11 +92,6 @@
val autoReconnect: Boolean = parameters.getOrElse("autoReconnect", "false").toBoolean
val maxInflight: Int = parameters.getOrElse("maxInflight", "60").toInt

val maxBatchMessageNum = parameters.getOrElse("maxBatchMessageNum", s"${Long.MaxValue}").toLong
val maxBatchMessageSize = parameters.getOrElse("maxBatchMessageSize",
s"${Long.MaxValue}").toLong
val maxRetryNumber = parameters.getOrElse("maxRetryNum", "3").toInt

val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
mqttConnectOptions.setAutomaticReconnect(autoReconnect)
mqttConnectOptions.setCleanSession(cleanSession)
@@ -109,7 +113,6 @@
})
mqttConnectOptions.setSSLProperties(sslProperties)

(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos,
maxBatchMessageNum, maxBatchMessageSize, maxRetryNumber)
(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
}
}
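For orientation, here is a rough sketch (not part of this change set) of how a streaming query could opt into the new handler. The `persistence` option value `hdfs` and the `hdfs.`-prefixed keys correspond to the `parseConfigParams` branch above, which strips the `hdfs.` prefix and copies the remaining key into the Hadoop `Configuration`; the topic, broker URL and `fs.defaultFS` values below are placeholders.

```scala
// Sketch only: a Structured Streaming query using HDFS-backed MQTT persistence.
// Assumes an active SparkSession in `spark`; all endpoint values are placeholders.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "sensors/temperature")
  .option("persistence", "hdfs")                       // selects HdfsMqttClientPersistence
  .option("hdfs.fs.defaultFS", "hdfs://namenode:8020") // becomes fs.defaultFS in the Hadoop config
  .load("tcp://localhost:1883")                        // MQTT broker URL
```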
@@ -21,7 +21,11 @@ package org.apache.bahir.sql.streaming.mqtt
import java.io._
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.eclipse.paho.client.mqttv3.{MqttClientPersistence, MqttPersistable, MqttPersistenceException}
import org.eclipse.paho.client.mqttv3.internal.MqttPersistentData
import scala.util.Try

import org.apache.bahir.utils.Logging
@@ -45,24 +49,16 @@ trait MessageStore {
}

private[mqtt] class MqttPersistableData(bytes: Array[Byte]) extends MqttPersistable {

override def getHeaderLength: Int = bytes.length

override def getHeaderOffset: Int = 0

override def getPayloadOffset: Int = 0

override def getPayloadBytes: Array[Byte] = null

override def getHeaderBytes: Array[Byte] = bytes

override def getPayloadLength: Int = 0
}

trait Serializer {

def deserialize[T](x: Array[Byte]): T

def serialize[T](x: T): Array[Byte]
}

@@ -94,17 +90,14 @@ class JavaSerializer extends Serializer with Logging {
null
}
}

}

object JavaSerializer {

private lazy val instance = new JavaSerializer()

def getInstance(): JavaSerializer = instance

}


/**
* A message store to persist messages received. This is not intended to be thread safe.
* It uses `MqttDefaultFilePersistence` for storing messages on disk locally on the client.
@@ -148,3 +141,113 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
}

}

private[mqtt] class HdfsMqttClientPersistence(config: Configuration)
extends MqttClientPersistence {
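// Layout: one file per in-flight message under mqtt/<clientId>/<sanitized serverURI>,
// each containing the header and payload as length-prefixed byte arrays (see put/get below).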

var rootPath: Path = _
var fileSystem: FileSystem = _

override def open(clientId: String, serverURI: String): Unit = {
try {
rootPath = new Path("mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_"))
fileSystem = FileSystem.get(config)
if (!fileSystem.exists(rootPath)) {
fileSystem.mkdirs(rootPath)
}
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def close(): Unit = {
try {
fileSystem.close()
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def put(key: String, persistable: MqttPersistable): Unit = {
try {
val path = getPath(key)
> Inline review comment (Contributor): I just worry about the performance of creating a file for each coming message.
val output = fileSystem.create(path)
output.writeInt(persistable.getHeaderLength)
if (persistable.getHeaderLength > 0) {
output.write(persistable.getHeaderBytes)
}
output.writeInt(persistable.getPayloadLength)
if (persistable.getPayloadLength > 0) {
output.write(persistable.getPayloadBytes)
}
output.close()
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def get(key: String): MqttPersistable = {
try {
val input = fileSystem.open(getPath(key))
val headerLength = input.readInt()
val headerBytes: Array[Byte] = new Array[Byte](headerLength)
// readFully avoids the short reads a plain read() may return on HDFS streams.
input.readFully(headerBytes)
val payloadLength = input.readInt()
val payloadBytes: Array[Byte] = new Array[Byte](payloadLength)
input.readFully(payloadBytes)
input.close()
new MqttPersistentData(
key, headerBytes, 0, headerBytes.length, payloadBytes, 0, payloadBytes.length
)
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def remove(key: String): Unit = {
try {
fileSystem.delete(getPath(key), false)
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def keys(): util.Enumeration[String] = {
try {
val iterator = fileSystem.listFiles(rootPath, false)
new util.Enumeration[String]() {
override def hasMoreElements: Boolean = iterator.hasNext
override def nextElement(): String = iterator.next().getPath.getName
}
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def clear(): Unit = {
try {
fileSystem.delete(rootPath, true)
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

override def containsKey(key: String): Boolean = {
try {
fileSystem.isFile(getPath(key))
}
catch {
case e: Exception => throw new MqttPersistenceException(e)
}
}

private def getPath(key: String): Path = new Path(rootPath + "/" + key)

}
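For reference, a minimal sketch of how this persistence plugs into an Eclipse Paho client outside of Spark. Since the class is `private[mqtt]`, this only compiles from within the `org.apache.bahir.sql.streaming.mqtt` package, and the broker URL is a placeholder.

```scala
// Sketch only: wiring HdfsMqttClientPersistence into a plain Paho MqttClient.
import org.apache.hadoop.conf.Configuration
import org.eclipse.paho.client.mqttv3.MqttClient

val hadoopConf = new Configuration()           // picks up core-site.xml/hdfs-site.xml from the classpath
val persistence = new HdfsMqttClientPersistence(hadoopConf)
val client = new MqttClient(
  "tcp://localhost:1883",                      // placeholder broker URL
  MqttClient.generateClientId(),
  persistence)                                 // in-flight messages are now persisted to HDFS
client.connect()
```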

This file was deleted.
