From da0976807efe2a1ebec24cfa7838621d07a2404a Mon Sep 17 00:00:00 2001
From: giwa
Date: Sun, 31 Aug 2014 16:19:38 +0900
Subject: [PATCH] added StreamingContext.remember

---
 python/pyspark/streaming/context.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 6b6cc653dfa6f..a1a9be1eae439 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,6 +21,7 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.context import SparkContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.duration import Duration
 
 from py4j.java_collections import ListConverter
 
@@ -107,6 +108,20 @@ def awaitTermination(self, timeout=None):
         else:
             self._jssc.awaitTermination(timeout)
 
+    def remember(self, duration):
+        """
+        Set each DStream in this context to remember the RDDs it generated in the last given duration.
+        DStreams remember RDDs only for a limited duration of time and release them for garbage
+        collection. This method allows the developer to specify how long to remember the RDDs
+        (if the developer wishes to query old data outside the DStream computation).
+        @param duration: pyspark.streaming.duration.Duration object.
+               Minimum duration that each DStream should remember its RDDs.
+        """
+        if not isinstance(duration, Duration):
+            raise TypeError("Input should be pyspark.streaming.duration.Duration object")
+
+        self._jssc.remember(duration._jduration)
+
     # TODO: add storageLevel
     def socketTextStream(self, hostname, port):
         """
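
Usage sketch (illustrative, not part of the patch): the snippet below shows how the
new remember() call might be used from a driver program. It assumes a Seconds helper
exists in pyspark.streaming.duration alongside Duration, and that this branch's
StreamingContext(sparkContext, duration) constructor is available; the app name and
interval values are placeholders.

    from pyspark.context import SparkContext
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    sc = SparkContext(appName="RememberExample")  # hypothetical app name
    ssc = StreamingContext(sc, Seconds(1))        # 1-second batch interval

    # Keep the RDDs generated by every DStream in this context for at least
    # 60 seconds, so recent batches can still be queried outside the
    # DStream computation.
    ssc.remember(Seconds(60))

    # Anything other than a Duration is rejected up front:
    # ssc.remember(60)  # raises TypeError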