diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py index cb338ced5f228..084902b6a2f0d 100644 --- a/examples/src/main/python/streaming/test_oprations.py +++ b/examples/src/main/python/streaming/test_oprations.py @@ -15,10 +15,11 @@ lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) words = lines.flatMap(lambda line: line.split(" ")) +# ssc.checkpoint("checkpoint") mapped_words = words.map(lambda word: (word, 1)) count = mapped_words.reduceByKey(add) count.pyprint() ssc.start() -# ssc.awaitTermination() - ssc.stop() + ssc.awaitTermination() +# ssc.stop() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 1ad2e36ad21cf..be6da7f2aad68 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -419,6 +419,7 @@ def saveAsTextFile(rdd, time): # TODO: implemtnt rightOuterJoin + class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():