Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Commit

Permalink
[BAHIR-183] HDFS based MQTT client persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-antoniak committed Mar 28, 2019
1 parent c51853d commit 9709a62
Show file tree
Hide file tree
Showing 15 changed files with 351 additions and 725 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Furthermore, to generate scaladocs for each module:

`$ mvn package`

Scaladocs is generated in, `MODULE_NAME/target/site/scaladocs/index.html`. __ Where `MODULE_NAME` is one of, `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq`, `streaming-twitter`. __
Scaladocs is generated in `${MODULE_NAME}/target/site/scaladocs/index.html`, where `MODULE_NAME` is one of `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq` or `streaming-twitter`.

## A note about Apache Spark integration

Expand Down
50 changes: 25 additions & 25 deletions sql-streaming-mqtt/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,4 @@
#

org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider
org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
org.apache.spark.sql.mqtt.HDFSMQTTSourceProvider
org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private[mqtt] object CachedMQTTClient extends Logging {

private def createMqttClient(config: Map[String, String]):
(MqttClient, MqttClientPersistence) = {
val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _, _, _, _) =
val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _) =
MQTTUtils.parseConfigParams(config)
val client = new MqttClient(brokerUrl, clientId, persistence)
val callback = new MqttCallbackExtended() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[Int
private lazy val publishBackoff: Long =
SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")

private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap)
private lazy val (_, _, topic, _, _, qos) = MQTTUtils.parseConfigParams(config.toMap)

override def write(record: InternalRow): Unit = {
val client = CachedMQTTClient.getOrCreate(config.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
private var startOffset: OffsetV2 = _
private var endOffset: OffsetV2 = _

/* Older than last N messages, will not be checked for redelivery. */
val backLog = options.getInt("autopruning.backlog", 500)

private[mqtt] val store = new LocalMessageStore(persistence)

private[mqtt] val messages = new TrieMap[Long, MQTTMessage]
Expand Down Expand Up @@ -231,7 +228,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
/** Stop this source. */
override def stop(): Unit = synchronized {
client.disconnect()
persistence.close()
client.close()
}

Expand All @@ -250,7 +246,7 @@ class MQTTStreamSourceProvider extends DataSourceV2
}

import scala.collection.JavaConverters._
val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos, _, _, _) =
val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos) =
MQTTUtils.parseConfigParams(collection.immutable.HashMap() ++ parameters.asMap().asScala)

new MQTTStreamSource(parameters, brokerUrl, persistence, topic, clientId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.bahir.sql.streaming.mqtt

import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttConnectOptions}
import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}

Expand All @@ -45,7 +46,7 @@ object MQTTUtils extends Logging {
)

def parseConfigParams(config: Map[String, String]):
(String, String, String, MqttClientPersistence, MqttConnectOptions, Int, Long, Long, Int) = {
(String, String, String, MqttClientPersistence, MqttConnectOptions, Int) = {
def e(s: String) = new IllegalArgumentException(s)
val parameters = CaseInsensitiveMap(config)

Expand All @@ -54,6 +55,14 @@ object MQTTUtils extends Logging {

val persistence: MqttClientPersistence = parameters.get("persistence") match {
case Some("memory") => new MemoryPersistence()
case Some("hdfs") =>
val hadoopConfig = new Configuration
for (parameter <- parameters) {
if (parameter._1.startsWith("hdfs.")) {
hadoopConfig.set(parameter._1.replaceFirst("hdfs.", ""), parameter._2)
}
}
new HdfsMqttClientPersistence( hadoopConfig )
case _ => val localStorage: Option[String] = parameters.get("localStorage")
localStorage match {
case Some(x) => new MqttDefaultFilePersistence(x)
Expand Down Expand Up @@ -83,11 +92,6 @@ object MQTTUtils extends Logging {
val autoReconnect: Boolean = parameters.getOrElse("autoReconnect", "false").toBoolean
val maxInflight: Int = parameters.getOrElse("maxInflight", "60").toInt

val maxBatchMessageNum = parameters.getOrElse("maxBatchMessageNum", s"${Long.MaxValue}").toLong
val maxBatchMessageSize = parameters.getOrElse("maxBatchMessageSize",
s"${Long.MaxValue}").toLong
val maxRetryNumber = parameters.getOrElse("maxRetryNum", "3").toInt

val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
mqttConnectOptions.setAutomaticReconnect(autoReconnect)
mqttConnectOptions.setCleanSession(cleanSession)
Expand All @@ -109,7 +113,6 @@ object MQTTUtils extends Logging {
})
mqttConnectOptions.setSSLProperties(sslProperties)

(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos,
maxBatchMessageNum, maxBatchMessageSize, maxRetryNumber)
(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ package org.apache.bahir.sql.streaming.mqtt
import java.io._
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.eclipse.paho.client.mqttv3.{MqttClientPersistence, MqttPersistable, MqttPersistenceException}
import org.eclipse.paho.client.mqttv3.internal.MqttPersistentData
import scala.util.Try

import org.apache.bahir.utils.Logging
Expand All @@ -45,24 +49,16 @@ trait MessageStore {
}

/**
 * Adapter exposing a raw byte array through the `MqttPersistable` interface.
 *
 * The complete array is reported as the header section (offset 0, length equal to the
 * array length); the payload section is reported as absent (`null` bytes, zero length).
 *
 * @param bytes content returned as the header bytes
 */
private[mqtt] class MqttPersistableData(bytes: Array[Byte]) extends MqttPersistable {

  // Header section: the wrapped array, from its first byte to its last.
  override def getHeaderBytes: Array[Byte] = bytes

  override def getHeaderOffset: Int = 0

  override def getHeaderLength: Int = bytes.length

  // Payload section: always empty.
  override def getPayloadBytes: Array[Byte] = null

  override def getPayloadOffset: Int = 0

  override def getPayloadLength: Int = 0
}

/** Contract for converting values to a byte array and back. */
trait Serializer {

  /**
   * Encodes a value as bytes.
   *
   * @param x value to encode
   * @tparam T type of the value
   * @return encoded representation of `x`
   */
  def serialize[T](x: T): Array[Byte]

  /**
   * Decodes a value from bytes.
   *
   * @param x encoded representation
   * @tparam T type the caller expects back
   * @return decoded value
   */
  def deserialize[T](x: Array[Byte]): T
}

Expand Down Expand Up @@ -94,17 +90,14 @@ class JavaSerializer extends Serializer with Logging {
null
}
}

}

/** Companion object holding one shared `JavaSerializer`. */
object JavaSerializer {

  /** Shared serializer, constructed on first access. */
  private lazy val instance: JavaSerializer = new JavaSerializer()

  /** @return the shared `JavaSerializer` instance */
  def getInstance(): JavaSerializer = {
    instance
  }
}


/**
* A message store that persists received messages. It is not designed to be thread safe.
* It uses `MqttDefaultFilePersistence` for storing messages on disk locally on the client.
Expand Down Expand Up @@ -148,3 +141,113 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
}

}

/**
 * `MqttClientPersistence` implementation that keeps every persisted message in its own file
 * on a Hadoop file system.
 *
 * Files live under `mqtt/<clientId>/<sanitised serverURI>` and are named after the
 * persistence key. Each file has the layout
 * `[header length: Int][header bytes][payload length: Int][payload bytes]`.
 *
 * Every failure is reported to the caller as `MqttPersistenceException`.
 *
 * @param config Hadoop configuration used to obtain the file system
 */
private[mqtt] class HdfsMqttClientPersistence(config: Configuration)
  extends MqttClientPersistence {

  var rootPath: Path = _
  var fileSystem: FileSystem = _

  /**
   * Resolves the storage directory for the given client / broker pair and creates it when
   * it does not exist yet.
   *
   * @param clientId  MQTT client identifier, used as a directory name
   * @param serverURI broker URI; every non-alphanumeric character is replaced with `_`
   * @throws MqttPersistenceException if the file system cannot be reached or prepared
   */
  override def open(clientId: String, serverURI: String): Unit = {
    try {
      rootPath = new Path("mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_"))
      // Release a handle left over from an earlier open() so it is not leaked.
      if (fileSystem != null) {
        fileSystem.close()
      }
      // FileSystem.get() hands out a JVM-wide cached instance; closing that one in close()
      // would break every other user of the same file system. Use a private instance.
      fileSystem = FileSystem.newInstance(config)
      if (!fileSystem.exists(rootPath)) {
        fileSystem.mkdirs(rootPath)
      }
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Closes the underlying file system handle. Does nothing when `open` was never called.
   *
   * @throws MqttPersistenceException if closing the file system fails
   */
  override def close(): Unit = {
    try {
      if (fileSystem != null) {
        fileSystem.close()
      }
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Writes the header and payload sections of `persistable` to the file for `key`.
   *
   * @param key         persistence key, used as the file name
   * @param persistable message data to store
   * @throws MqttPersistenceException if the file cannot be written
   */
  override def put(key: String, persistable: MqttPersistable): Unit = {
    try {
      val output = fileSystem.create(getPath(key))
      try {
        // Only the [offset, offset + length) window of each backing array belongs to the
        // message, and the length prefix must match the number of bytes actually written.
        val headerLength = persistable.getHeaderLength
        output.writeInt(headerLength)
        if (headerLength > 0) {
          output.write(persistable.getHeaderBytes, persistable.getHeaderOffset, headerLength)
        }
        val payloadLength = persistable.getPayloadLength
        output.writeInt(payloadLength)
        if (payloadLength > 0) {
          output.write(persistable.getPayloadBytes, persistable.getPayloadOffset, payloadLength)
        }
      } finally {
        output.close()
      }
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Reads back the data stored under `key`.
   *
   * @param key persistence key
   * @return the stored header and payload wrapped in `MqttPersistentData`
   * @throws MqttPersistenceException if the file is missing, truncated or unreadable
   */
  override def get(key: String): MqttPersistable = {
    try {
      val input = fileSystem.open(getPath(key))
      try {
        // read() may return fewer bytes than requested; readFully() fills the whole buffer
        // or fails with EOFException.
        val headerBytes = new Array[Byte](input.readInt())
        input.readFully(headerBytes)
        val payloadBytes = new Array[Byte](input.readInt())
        input.readFully(payloadBytes)
        new MqttPersistentData(
          key, headerBytes, 0, headerBytes.length, payloadBytes, 0, payloadBytes.length
        )
      } finally {
        input.close()
      }
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Deletes the file stored under `key`.
   *
   * @param key persistence key
   * @throws MqttPersistenceException if the delete call fails
   */
  override def remove(key: String): Unit = {
    try {
      fileSystem.delete(getPath(key), false)
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Lists the keys currently stored, i.e. the names of the files directly under the
   * storage directory.
   *
   * @return enumeration over a snapshot of the stored keys
   * @throws MqttPersistenceException if the directory cannot be listed
   */
  override def keys(): util.Enumeration[String] = {
    try {
      // Drain the remote iterator here so listing errors surface as
      // MqttPersistenceException rather than escaping later from the enumeration.
      val names = new util.ArrayList[String]()
      val files = fileSystem.listFiles(rootPath, false)
      while (files.hasNext) {
        names.add(files.next().getPath.getName)
      }
      util.Collections.enumeration(names)
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Removes every stored entry.
   *
   * @throws MqttPersistenceException if the storage directory cannot be reset
   */
  override def clear(): Unit = {
    try {
      fileSystem.delete(rootPath, true)
      // Recreate the (now empty) directory so a following keys() call still works.
      fileSystem.mkdirs(rootPath)
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /**
   * Tells whether a file exists for `key`.
   *
   * @param key persistence key
   * @return `true` when a regular file is stored under `key`
   * @throws MqttPersistenceException if the file system cannot be queried
   */
  override def containsKey(key: String): Boolean = {
    try {
      fileSystem.isFile(getPath(key))
    } catch {
      case e: Exception => throw new MqttPersistenceException(e)
    }
  }

  /** Location of the file that stores `key`. */
  private def getPath(key: String): Path = new Path(rootPath + "/" + key)

}

This file was deleted.

Loading

0 comments on commit 9709a62

Please sign in to comment.