-
Notifications
You must be signed in to change notification settings - Fork 930
Scala MQTT Client
Prabeesh K edited this page Jul 30, 2015
·
2 revisions
MQTT is a broker-based message queuing system, so to work with MQTT an MQTT message broker (server) is required. Mosquitto is an open source MQTT broker. On Ubuntu, Mosquitto can be installed using the command
$ sudo apt-get install mosquitto
Eclipse Paho is an MQTT client library that works well with Mosquitto. You may read more about it here.
MQTT Scala Publisher
package main.scala
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
/**
 * MQTT publisher.
 *
 * Connects to the broker at `tcp://localhost:1883` and publishes the same
 * UTF-8 encoded message to the topic `foo` every 100 ms until the process is
 * stopped or an [[org.eclipse.paho.client.mqttv3.MqttException]] occurs.
 *
 * @author Prabeesh K
 * @mail [email protected]
 */
object Publisher {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://localhost:1883"
    val topic = "foo"
    val msg = "Hello world test data"

    // Stays None until the client has actually been constructed, so the
    // cleanup in `finally` never dereferences a missing client.
    var client: Option[MqttClient] = None

    // Creating new persistence for mqtt client (files are stored under /tmp)
    val persistence = new MqttDefaultFilePersistence("/tmp")

    try {
      // mqtt client with specific url and client id
      val mqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
      client = Some(mqttClient)
      mqttClient.connect()

      val msgTopic = mqttClient.getTopic(topic)
      val message = new MqttMessage(msg.getBytes("utf-8"))

      // Publish forever; the loop only ends when an exception is thrown.
      while (true) {
        msgTopic.publish(message)
        println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message))
        Thread.sleep(100)
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      // Only disconnect a client that exists and is connected: the constructor
      // or connect() may have failed, and calling disconnect() in that state
      // would raise a second exception that hides the original failure.
      client.filter(_.isConnected).foreach(_.disconnect())
    }
  }
}
MQTT Scala Subscriber
package main.scala
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
/**
 * MQTT subscriber.
 *
 * Connects to the broker at `tcp://localhost:1883`, subscribes to the topic
 * `foo` and prints every message that arrives on it.
 *
 * @author Prabeesh K
 * @mail [email protected]
 */
object Subscriber {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://localhost:1883"
    val topic = "foo"

    // Set up persistence for messages
    val persistence = new MemoryPersistence

    // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistence
    val client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)

    // Callback automatically triggers as and when a new message arrives on the specified topic
    val callback = new MqttCallback {
      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        println("Receiving Data, Topic : %s, Message : %s".format(topic, message))
      }

      override def connectionLost(cause: Throwable): Unit = {
        println(cause)
      }

      // Nothing to do: this client only receives messages, it never publishes.
      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
      }
    }

    // Register the callback BEFORE connecting and subscribing, so that messages
    // delivered immediately after the subscription (for example retained
    // messages) already have a handler.
    client.setCallback(callback)

    // Connect to MqttBroker
    client.connect()

    // Subscribe to Mqtt topic
    client.subscribe(topic)
  }
}
SBT file
// Project name. NOTE(review): "Clinet" looks like a typo for "Client"; it is left
// unchanged here because renaming it would change the name of the built artifact.
name := "MQTTScalaClinet"
// Project version.
version := "0.2.0"
// Scala version used to compile the project.
scalaVersion := "2.10.3"
// Eclipse Paho MQTT client library, version 0.4.0.
libraryDependencies += "org.eclipse.paho" % "mqtt-client" % "0.4.0"
// Additional resolver pointing at the Eclipse Paho releases repository
// (presumably where the dependency above is resolved from).
resolvers += "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"
The MQTT Scala subscriber and publisher code, based on the Eclipse Paho library 0.4.0, is available on GitHub together with an SBT build.
MQTT Community Wiki by mqtt.org community members is licensed under a Creative Commons Attribution 4.0 International License and was made possible by the generosity of all of its contributors.