From ae464e0c0f2f87bae309d7f6bdb1a5a4a3cd646b Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Thu, 17 Jul 2014 17:09:23 -0700 Subject: [PATCH] edit python sparkstreaming example --- examples/src/main/python/streaming/network_wordcount.py | 8 +++++++- examples/src/main/python/streaming/wordcount.py | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py index 67dc28f7bf7f0..77fca7ff7657d 100644 --- a/examples/src/main/python/streaming/network_wordcount.py +++ b/examples/src/main/python/streaming/network_wordcount.py @@ -1,6 +1,7 @@ import sys from operator import add +from pyspark.conf import SparkConf from pyspark.streaming.context import StreamingContext from pyspark.streaming.duration import * @@ -8,15 +9,20 @@ if len(sys.argv) != 3: print >> sys.stderr, "Usage: wordcount " exit(-1) - ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1)) + conf = SparkConf() + conf.setAppName("PythonStreamingNetworkWordCount") + conf.set("spark.default.parallelism", 1) + ssc = StreamingContext(conf=conf, duration=Seconds(1)) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) fm_lines = lines.flatMap(lambda x: x.split(" ")) filtered_lines = fm_lines.filter(lambda line: "Spark" in line) mapped_lines = fm_lines.map(lambda x: (x, 1)) + reduced_lines = mapped_lines.reduce(add) fm_lines.pyprint() filtered_lines.pyprint() mapped_lines.pyprint() + reduced_lines.pyprint() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py index 3996991109d60..9ff8bc5ac9ab2 100644 --- a/examples/src/main/python/streaming/wordcount.py +++ b/examples/src/main/python/streaming/wordcount.py @@ -13,6 +13,7 @@ conf.setAppName("PythonStreamingWordCount") conf.set("spark.default.parallelism", 1) +# still has a bug # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1)) ssc = StreamingContext(conf=conf, duration=Seconds(1))