[SPARK-12487][STREAMING][DOCUMENT] Add docs for Kafka message handler
Author: Shixiong Zhu <[email protected]>

Closes apache#10439 from zsxwing/kafka-message-handler-doc.
zsxwing authored and tdas committed Dec 22, 2015
1 parent b374a25 commit 93db50d
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/streaming-kafka-integration.md
@@ -104,6 +104,7 @@ Next, we discuss how to use this approach in your streaming application.
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata` object, which contains metadata about the current message, and transform it into any desired type.
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
</div>
@@ -115,6 +116,7 @@ Next, we discuss how to use this approach in your streaming application.
[key class], [value class], [key decoder class], [value decoder class],
[map of Kafka parameters], [set of topics to consume]);

You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata` object, which contains metadata about the current message, and transform it into any desired type.
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).

@@ -123,6 +125,7 @@ Next, we discuss how to use this approach in your streaming application.
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

You can also pass a `messageHandler` to `createDirectStream` to access the `KafkaMessageAndMetadata` object, which contains metadata about the current message, and transform it into any desired type.
By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
</div>
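Conceptually, the `messageHandler` is just a function from each consumed record's metadata wrapper to whatever type the stream should carry. A minimal, self-contained sketch (plain Python, no Spark; the `MessageAndMetadata` stand-in and its fields are hypothetical, mirroring the topic/partition/offset/key/message attributes the metadata objects described above expose):

```python
from collections import namedtuple

# Hypothetical stand-in for the metadata record Spark's Kafka direct stream
# passes to the handler; the real object is supplied by Spark at runtime.
MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["topic", "partition", "offset", "key", "message"])

def message_handler(record):
    # Instead of keeping only the decoded key/value pair (the default),
    # carry the topic and offset alongside the payload. Any return type works:
    # whatever this returns becomes the element type of the resulting DStream.
    return (record.topic, record.offset, record.message.decode("utf-8"))

record = MessageAndMetadata(topic="logs", partition=0, offset=42,
                            key=None, message=b"hello")
print(message_handler(record))  # ('logs', 42, 'hello')
```

In a real application this function would be supplied as the `messageHandler` argument to `createDirectStream`, so the stream's elements already contain the per-message metadata without a separate lookup.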
