diff --git a/core/pom.xml b/core/pom.xml
index 2a81f6df289c0..7eb0b48eaeebd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent
1.2.0-SNAPSHOT
../pom.xml
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9ff4ea6ca157..022e2891559d7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -288,7 +288,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override def compute(split: Partition, context: TaskContext) =
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index b66c3ba4d5fb0..dc68b1fbda8bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -57,6 +57,7 @@ object PythonRunner {
val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
+ env.put("PYSPARK_PYTHON", pythonExec)
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
new file mode 100644
index 0000000000000..f44cd696894ba
--- /dev/null
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -0,0 +1,22 @@
+import sys
+from operator import add
+
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.duration import *
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print >> sys.stderr, "Usage: wordcount <directory>"
+ exit(-1)
+ ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+
+ lines = ssc.textFileStream(sys.argv[1])
+ fm_lines = lines.flatMap(lambda x: x.split(" "))
+ filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
+ mapped_lines = fm_lines.map(lambda x: (x, 1))
+
+ fm_lines.pyprint()
+ filtered_lines.pyprint()
+ mapped_lines.pyprint()
+ ssc.start()
+ ssc.awaitTermination()
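Note that the example above filters and maps the stream but never aggregates the (word, 1) pairs. As a hedged sketch (not part of this patch, and untested), a per-batch word count could reuse the reduce() operation added in dstream.py below, which has reduceByKey semantics:

    # Hedged sketch: aggregate the (word, 1) pairs per batch; relies on the
    # reduce()/_combineByKey path from pyspark/streaming/dstream.py.
    counted = mapped_lines.reduce(add)
    counted.pyprint()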
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9c70fa5c16d0c..c3fef42d118bd 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -108,6 +108,9 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.streaming.*")
+ java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+ java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
new file mode 100644
index 0000000000000..719592912e80c
--- /dev/null
+++ b/python/pyspark/streaming/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ktakagiw'
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
new file mode 100644
index 0000000000000..c8ae9c4af85c9
--- /dev/null
+++ b/python/pyspark/streaming/context.py
@@ -0,0 +1,133 @@
+__author__ = 'ktakagiw'
+
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import shutil
+import sys
+from threading import Lock
+from tempfile import NamedTemporaryFile
+
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator
+from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
+from pyspark.files import SparkFiles
+from pyspark.java_gateway import launch_gateway
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.rdd import RDD
+from pyspark.context import SparkContext
+
+from py4j.java_collections import ListConverter
+
+from pyspark.streaming.dstream import DStream
+
+class StreamingContext(object):
+ """
+ Main entry point for Spark Streaming functionality. A StreamingContext
+ represents the connection to a Spark cluster, and can be used to create
+ L{DStream}s on that cluster.
+ """
+
+ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+ gateway=None, duration=None):
+ """
+ Create a new StreamingContext. At least the master, app name, and duration
+ should be set, either through the named parameters here or through C{conf}.
+
+ @param master: Cluster URL to connect to
+ (e.g. mesos://host:port, spark://host:port, local[4]).
+ @param appName: A name for your job, to display on the cluster web UI.
+ @param sparkHome: Location where Spark is installed on cluster nodes.
+ @param pyFiles: Collection of .zip or .py files to send to the cluster
+ and add to PYTHONPATH. These can be paths on the local file
+ system or HDFS, HTTP, HTTPS, or FTP URLs.
+ @param environment: A dictionary of environment variables to set on
+ worker nodes.
+ @param batchSize: The number of Python objects represented as a single
+ Java object. Set 1 to disable batching or -1 to use an
+ unlimited batch size.
+ @param serializer: The serializer for RDDs.
+ @param conf: A L{SparkConf} object setting Spark properties.
+ @param gateway: Use an existing gateway and JVM, otherwise a new JVM
+ will be instantiated.
+ @param duration: A L{Duration} object, the batch interval for Spark Streaming.
+
+ """
+ # Create the Python SparkContext
+ self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
+ pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+ serializer=serializer, conf=conf, gateway=gateway)
+ self._jvm = self._sc._jvm
+ self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
+
+ # Initialize the StreamingContext in a separate method to allow subclass-specific initialization
+ def _initialize_context(self, jspark_context, jduration):
+ return self._jvm.JavaStreamingContext(jspark_context, jduration)
+
+ def actorStream(self, props, name, storageLevel, supervisorStrategy):
+ raise NotImplementedError
+
+ def addStreamingListener(self, streamingListener):
+ raise NotImplementedError
+
+ def awaitTermination(self, timeout=None):
+ if timeout:
+ self._jssc.awaitTermination(timeout)
+ else:
+ self._jssc.awaitTermination()
+
+ def checkpoint(self, directory):
+ raise NotImplementedError
+
+ def fileStream(self, directory, filter=None, newFilesOnly=None):
+ raise NotImplementedError
+
+ def networkStream(self, receiver):
+ raise NotImplementedError
+
+ def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
+ raise NotImplementedError
+
+ def rawSocketStream(self, hostname, port, storagelevel):
+ raise NotImplementedError
+
+ def remember(self, duration):
+ raise NotImplementedError
+
+ def socketStream(self, hostname, port, converter, storageLevel):
+ raise NotImplementedError
+
+ def start(self):
+ self._jssc.start()
+
+ def stop(self, stopSparkContext=True):
+ raise NotImplementedError
+
+ def textFileStream(self, directory):
+ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+
+ def transform(self, seq):
+ raise NotImplementedError
+
+ def union(self, seq):
+ raise NotImplementedError
+
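For reference, a minimal driver built against this StreamingContext might look like the sketch below. This is a hedged example, not part of the patch: the master URL and input directory are hypothetical, and awaitTermination() is assumed to take a timeout in milliseconds like its Java counterpart.

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    ssc = StreamingContext(master="local[2]", appName="StreamingSketch",
                           duration=Seconds(1))         # 1 s batch interval
    lines = ssc.textFileStream("/tmp/stream_input")     # hypothetical directory
    lines.flatMap(lambda l: l.split(" ")).pyprint()
    ssc.start()
    ssc.awaitTermination(10000)                         # give up after ~10 s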
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
new file mode 100644
index 0000000000000..b422b147d11e1
--- /dev/null
+++ b/python/pyspark/streaming/dstream.py
@@ -0,0 +1,315 @@
+from base64 import standard_b64encode as b64enc
+import copy
+from collections import defaultdict
+from collections import namedtuple
+from itertools import chain, ifilter, imap
+import operator
+import os
+import sys
+import shlex
+import traceback
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+import warnings
+import heapq
+from random import Random
+
+from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
+ BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
+from pyspark.join import python_join, python_left_outer_join, \
+ python_right_outer_join, python_cogroup
+from pyspark.statcounter import StatCounter
+from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
+#from pyspark.resultiterable import ResultIterable
+from pyspark.rdd import _JavaStackTrace
+
+from py4j.java_collections import ListConverter, MapConverter
+
+__all__ = ["DStream"]
+
+class DStream(object):
+ def __init__(self, jdstream, ssc, jrdd_deserializer):
+ self._jdstream = jdstream
+ self._ssc = ssc
+ self.ctx = ssc._sc
+ self._jrdd_deserializer = jrdd_deserializer
+
+ def generatedRDDs(self):
+ """
+ RDDs generated by this DStream. On the Scala side this field is marked
+ private[streaming] so that test suites can access it; not exposed in Python yet.
+ """
+ pass
+
+ def print_(self):
+ """
+ """
+ # print is a resrved name of Python. We cannot give print to function name
+ getattr(self._jdstream, "print")()
+
+ def pyprint(self):
+ """
+ """
+ self._jdstream.pyprint()
+
+ def cache(self):
+ """
+ """
+ raise NotImplementedError
+
+ def checkpoint(self):
+ """
+ """
+ raise NotImplementedError
+
+ def compute(self, time):
+ """
+ """
+ raise NotImplementedError
+
+ def context(self):
+ """
+ """
+ raise NotImplementedError
+
+ def count(self):
+ """
+ """
+ raise NotImplementedError
+
+ def countByValue(self, numPartitions=None):
+ """
+ """
+ raise NotImplementedError
+
+ def countByValueAndWindow(self, duration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+
+ def countByWindow(self, duration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+
+ def dstream(self):
+ """
+ """
+ raise NotImplementedError
+
+ def filter(self, f):
+ """
+ """
+ def func(iterator): return ifilter(f, iterator)
+ return self.mapPartitions(func)
+
+ def flatMap(self, f, preservesPartitioning=False):
+ """
+ """
+ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
+
+ def foreachRDD(self, f, time):
+ """
+ """
+ raise NotImplementedError
+
+ def glom(self):
+ """
+ """
+ raise NotImplementedError
+
+ def map(self, f, preservesPartitioning=False):
+ """
+ """
+ def func(split, iterator): return imap(f, iterator)
+ return PipelinedDStream(self, func, preservesPartitioning)
+
+ def mapPartitions(self, f):
+ """
+ """
+ def func(s, iterator): return f(iterator)
+ return self.mapPartitionsWithIndex(func)
+
+ def persist(self, storageLevel):
+ """
+ """
+ raise NotImplementedError
+
+ def reduce(self, func, numPartitions=None):
+ """
+
+ """
+ return self._combineByKey(lambda x:x, func, func, numPartitions)
+
+ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
+ numPartitions = None):
+ """
+ """
+ if numPartitions is None:
+ numPartitions = self.ctx.defaultParallelism
+ def combineLocally(iterator):
+ combiners = {}
+ for x in iterator:
+ (k, v) = x
+ if k not in combiners:
+ combiners[k] = createCombiner(v)
+ else:
+ combiners[k] = mergeValue(combiners[k], v)
+ return combiners.iteritems()
+ locally_combined = self.mapPartitions(combineLocally)
+ shuffled = locally_combined.partitionBy(numPartitions)
+ def _mergeCombiners(iterator):
+ combiners = {}
+ for (k, v) in iterator:
+ if k not in combiners:
+ combiners[k] = v
+ else:
+ combiners[k] = mergeCombiners(combiners[k], v)
+ return combiners.iteritems()
+ return shuffled.mapPartitions(_mergeCombiners)
+
+
+ def partitionBy(self, numPartitions, partitionFunc=None):
+ """
+ Return a copy of the DStream partitioned using the specified partitioner.
+
+ """
+ if numPartitions is None:
+ numPartitions = self.ctx.defaultParallelism
+
+ if partitionFunc is None:
+ partitionFunc = lambda x: 0 if x is None else hash(x)
+ # Transferring O(n) objects to Java is too expensive. Instead, we'll
+ # form the hash buckets in Python, transferring O(numPartitions) objects
+ # to Java. Each object is a (splitNumber, [objects]) pair.
+ outputSerializer = self.ctx._unbatched_serializer
+ def add_shuffle_key(split, iterator):
+
+ buckets = defaultdict(list)
+
+ for (k, v) in iterator:
+ buckets[partitionFunc(k) % numPartitions].append((k, v))
+ for (split, items) in buckets.iteritems():
+ yield pack_long(split)
+ yield outputSerializer.dumps(items)
+ keyed = PipelinedDStream(self, add_shuffle_key)
+ keyed._bypass_serializer = True
+ with _JavaStackTrace(self.ctx) as st:
+ pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
+ partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+ id(partitionFunc))
+ jdstream = pairDStream.partitionBy(partitioner).values()
+ dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
+ # This is required so that id(partitionFunc) remains unique, even if
+ # partitionFunc is a lambda:
+ dstream._partitionFunc = partitionFunc
+ return dstream
+
+
+
+ def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, invReduceFunc):
+ """
+ """
+
+ raise NotImplementedError
+
+ def repartition(self, numPartitions):
+ """
+ """
+ raise NotImplementedError
+
+ def slice(self, fromTime, toTime):
+ """
+ """
+ raise NotImplementedError
+
+ def transform(self, transformFunc):
+ """
+ """
+ raise NotImplementedError
+
+ def transformWith(self, other, transformFunc):
+ """
+ """
+ raise NotImplementedError
+
+ def union(self, that):
+ """
+ """
+ raise NotImplementedError
+
+ def window(self, windowDuration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+
+ def wrapRDD(self, rdd):
+ """
+ """
+ raise NotImplementedError
+
+ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+ return PipelinedDStream(self, f, preservesPartitioning)
+
+
+class PipelinedDStream(DStream):
+ def __init__(self, prev, func, preservesPartitioning=False):
+ if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
+ # This transformation is the first in its stage:
+ self.func = func
+ self.preservesPartitioning = preservesPartitioning
+ self._prev_jdstream = prev._jdstream
+ self._prev_jrdd_deserializer = prev._jrdd_deserializer
+ else:
+ prev_func = prev.func
+ def pipeline_func(split, iterator):
+ return func(split, prev_func(split, iterator))
+ self.func = pipeline_func
+ self.preservesPartitioning = \
+ prev.preservesPartitioning and preservesPartitioning
+ self._prev_jdstream = prev._prev_jdstream # maintain the pipeline
+ self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
+ self.is_cached = False
+ self.is_checkpointed = False
+ self._ssc = prev._ssc
+ self.ctx = prev.ctx
+ self.prev = prev
+ self._jdstream_val = None
+ self._jrdd_deserializer = self.ctx.serializer
+ self._bypass_serializer = False
+
+ @property
+ def _jdstream(self):
+ if self._jdstream_val:
+ return self._jdstream_val
+ if self._bypass_serializer:
+ serializer = NoOpSerializer()
+ else:
+ serializer = self.ctx.serializer
+
+ command = (self.func, self._prev_jrdd_deserializer, serializer)
+ pickled_command = CloudPickleSerializer().dumps(command)
+ broadcast_vars = ListConverter().convert(
+ [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
+ self.ctx._gateway._gateway_client)
+ self.ctx._pickled_broadcast_vars.clear()
+ class_tag = self._prev_jdstream.classTag()
+ env = MapConverter().convert(self.ctx.environment,
+ self.ctx._gateway._gateway_client)
+ includes = ListConverter().convert(self.ctx._python_includes,
+ self.ctx._gateway._gateway_client)
+ python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(),
+ bytearray(pickled_command),
+ env, includes, self.preservesPartitioning,
+ self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
+ class_tag)
+ self._jdstream_val = python_dstream.asJavaDStream()
+ return self._jdstream_val
+
+ def _is_pipelinable(self):
+ return not (self.is_cached or self.is_checkpointed)
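PipelinedDStream mirrors PySpark's PipelinedRDD: successive narrow transformations are fused into one Python function, and a single JVM PythonDStream is only created when _jdstream is first accessed. A hedged illustration (assuming a `lines` DStream as in the word count example):

    words  = lines.flatMap(lambda line: line.split(" "))   # PipelinedDStream
    pairs  = words.map(lambda w: (w, 1))                    # still the same pipeline
    sparky = pairs.filter(lambda kv: "Spark" in kv[0])      # still the same pipeline
    sparky.pyprint()   # touching _jdstream here builds one PythonDStream for all three steps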
diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py
new file mode 100644
index 0000000000000..ef1b4f6cef237
--- /dev/null
+++ b/python/pyspark/streaming/duration.py
@@ -0,0 +1,171 @@
+__author__ = 'ktakagiw'
+
+from pyspark.streaming import utils
+
+class Duration(object):
+ """
+ Duration for a Spark Streaming application, i.e. a time interval
+ expressed in milliseconds.
+
+ A Duration wraps the corresponding JVM Duration object and is normally
+ created through the L{Milliseconds}, L{Seconds}, and L{Minutes} helpers
+ defined in this module.
+
+ """
+ def __init__(self, millis, _jvm=None):
+ """
+ Create new Duration.
+
+ @param millis: the duration in milliseconds
+ @param _jvm: internal parameter used to pass a handle to the
+ Java VM; does not need to be set by users
+
+ """
+ self._millis = millis
+
+ from pyspark.context import SparkContext
+ SparkContext._ensure_initialized()
+ _jvm = _jvm or SparkContext._jvm
+ self._jduration = _jvm.Duration(millis)
+
+ def toString(self):
+ """ Return duration as string """
+ return str(self._millis) + " ms"
+
+ def isZero(self):
+ """ Check if millis is zero """
+ return self._millis == 0
+
+ def prettyPrint(self):
+ """
+ Return a human-readable string representing a duration
+ """
+ return utils.msDurationToString(self._millis)
+
+ def milliseconds(self):
+ """ Return millisecond """
+ return self._millis
+
+ def toFormattedString(self):
+ """ Return millisecond """
+ return str(self._millis)
+
+ def max(self, other):
+ """ Return higher Duration """
+ Duration._is_duration(other)
+ if self > other:
+ return self
+ else:
+ return other
+
+ def min(self, other):
+ """ Return lower Durattion """
+ Duration._is_duration(other)
+ if self < other:
+ return self
+ else:
+ return other
+
+ def __str__(self):
+ return self.toString()
+
+ def __add__(self, other):
+ """ Add Duration and Duration """
+ Duration._is_duration(other)
+ return Duration(self._millis + other._millis)
+
+ def __sub__(self, other):
+ """ Subtract Duration by Duration """
+ Duration._is_duration(other)
+ return Duration(self._millis - other._millis)
+
+ def __mul__(self, other):
+ """ Multiple Duration by Duration """
+ Duration._is_duration(other)
+ return Duration(self._millis * other._millis)
+
+ def __div__(self, other):
+ """
+ Divide Duration by Duration
+ for Python 2.X
+ """
+ Duration._is_duration(other)
+ return Duration(self._millis / other._millis)
+
+ def __truediv__(self, other):
+ """
+ Divide Duration by Duration
+ for Python 3.0
+ """
+ Duration._is_duration(other)
+ return Duration(self._millis / other._millis)
+
+ def __floordiv__(self, other):
+ """ Divide Duration by Duration """
+ Duration._is_duration(other)
+ return Duration(self._millis // other._millis)
+
+ def __len__(self):
+ """ Length of miilisecond in Duration """
+ return len(self._millis)
+
+ def __lt__(self, other):
+ """ Duration < Duration """
+ Duration._is_duration(other)
+ return self._millis < other._millis
+
+ def __le__(self, other):
+ """ Duration <= Duration """
+ Duration._is_duration(other)
+ return self._millis <= other._millis
+
+ def __eq__(self, other):
+ """ Duration == Duration """
+ Duration._is_duration(other)
+ return self._millis == other._millis
+
+ def __ne__(self, other):
+ """ Duration != Duration """
+ Duration._is_duration(other)
+ return self._millis != other._millis
+
+ def __gt__(self, other):
+ """ Duration > Duration """
+ Duration._is_duration(other)
+ return self._millis > other._millis
+
+ def __ge__(self, other):
+ """ Duration >= Duration """
+ Duration._is_duration(other)
+ return self._millis >= other._millis
+
+ @classmethod
+ def _is_duration(cls, instance):
+ """ Check that instance is a Duration """
+ if not isinstance(instance, Duration):
+ raise TypeError("This should be a Duration")
+
+def Milliseconds(milliseconds):
+ """
+ Helper function that creates a L{Duration} representing
+ a given number of milliseconds.
+ """
+ return Duration(milliseconds)
+
+def Seconds(seconds):
+ """
+ Helper function that creates a L{Duration} representing
+ a given number of seconds.
+ """
+ return Duration(seconds * 1000)
+
+def Minutes(minutes):
+ """
+ Helper function that creates a L{Duration} representing
+ a given number of minutes.
+ """
+ return Duration(minutes * 60000)
+
+if __name__ == "__main__":
+ d = Duration(1)
+ print d
+ print d.milliseconds()
+
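A short, hedged illustration of the helpers and operators above (it assumes a SparkContext/JVM gateway can be initialized, since the constructor creates a Java Duration):

    from pyspark.streaming.duration import Seconds, Minutes

    batch  = Seconds(2)                      # 2000 ms
    window = Minutes(1)                      # 60000 ms
    print window.prettyPrint()               # "1.0 m", via utils.msDurationToString
    print (window - batch).milliseconds()    # 58000
    print batch.max(window) == window        # True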
diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py
new file mode 100644
index 0000000000000..41670af659ea3
--- /dev/null
+++ b/python/pyspark/streaming/jtime.py
@@ -0,0 +1,116 @@
+__author__ = 'ktakagiw'
+
+from pyspark.streaming import utils
+from pyspark.streaming.duration import Duration
+
+class Time(object):
+ """
+ Time for a Spark Streaming application, i.e. an absolute instant
+ expressed in milliseconds.
+
+ A Time wraps the corresponding JVM Time object and is normally created
+ with C{Time(millis)}.
+
+ """
+ def __init__(self, millis, _jvm=None):
+ """
+ Create new Time.
+
+ @param millis: the time in milliseconds
+
+ @param _jvm: internal parameter used to pass a handle to the
+ Java VM; does not need to be set by users
+
+ """
+ self._millis = millis
+
+ from pyspark.context import SparkContext
+ SparkContext._ensure_initialized()
+ _jvm = _jvm or SparkContext._jvm
+ self._jtime = _jvm.Time(millis)
+
+ def toString(self):
+ """ Return time as string """
+ return str(self._millis) + " ms"
+
+ def milliseconds(self):
+ """ Return millisecond """
+ return self._millis
+
+ def max(self, other):
+ """ Return higher Time """
+ Time._is_time(other)
+ if self > other:
+ return self
+ else:
+ return other
+
+ def min(self, other):
+ """ Return lower Time """
+ Time._is_time(other)
+ if self < other:
+ return self
+ else:
+ return other
+
+ def __add__(self, other):
+ """ Add Time and Time """
+ Duration._is_duration(other)
+ return Time(self._millis + other._millis)
+
+ def __sub__(self, other):
+ """ Subtract Time by Duration or Time """
+ if isinstance(other, Duration):
+ return Time(self._millis - other._millis)
+ elif isinstance(other, Time):
+ return Duration(self._millis - other._millis)
+ else:
+ raise TypeError
+
+ def __lt__(self, other):
+ """ Time < Time """
+ Time._is_time(other)
+ return self._millis < other._millis
+
+ def __le__(self, other):
+ """ Time <= Time """
+ Time._is_time(other)
+ return self._millis <= other._millis
+
+ def __eq__(self, other):
+ """ Time == Time """
+ Time._is_time(other)
+ return self._millis == other._millis
+
+ def __ne__(self, other):
+ """ Time != Time """
+ Time._is_time(other)
+ return self._millis != other._millis
+
+ def __gt__(self, other):
+ """ Time > Time """
+ Time._is_time(other)
+ return self._millis > other._millis
+
+ def __ge__(self, other):
+ """ Time >= Time """
+ Time._is_time(other)
+ return self._millis >= other._millis
+
+ def isMultipleOf(self, duration):
+ """ Check whether this Time is a multiple of the given Duration """
+ Duration._is_duration(duration)
+ return self._millis % duration._millis == 0
+
+ def until(self, time, interval):
+ raise NotImplementedError
+
+ def to(self, time, interval):
+ raise NotImplementedError
+
+ @classmethod
+ def _is_time(cls, instance):
+ """ Check that instance is a Time """
+ if not isinstance(instance, Time):
+ raise TypeError
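A hedged sketch of the Time arithmetic defined above (Time plus/minus a Duration yields a Time; Time minus a Time yields a Duration); it assumes a JVM gateway is available for the underlying Java objects:

    from pyspark.streaming.jtime import Time
    from pyspark.streaming.duration import Seconds

    t1 = Time(10000)
    t2 = t1 + Seconds(5)                 # Time at 15000 ms
    print (t2 - t1).milliseconds()       # 5000 (a Duration)
    print t2.isMultipleOf(Seconds(5))    # True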
diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py
new file mode 100644
index 0000000000000..fcdaca510812c
--- /dev/null
+++ b/python/pyspark/streaming/pyprint.py
@@ -0,0 +1,28 @@
+import sys
+from itertools import chain
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+
+def collect(binary_file_path):
+ dse = PickleSerializer()
+ with open(binary_file_path, 'rb') as tempFile:
+ for item in dse.load_stream(tempFile):
+ yield item
+
+def main():
+ try:
+ binary_file_path = sys.argv[1]
+ except IndexError:
+ print >> sys.stderr, "Missing file path argument"
+ return
+
+ counter = 0
+ for rdd in chain.from_iterable(collect(binary_file_path)):
+ print rdd
+ counter = counter + 1
+ if counter >= 10:
+ print "..."
+ break
+
+if __name__ =="__main__":
+ exit(main())
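pyprint.py is a small driver-side helper: the JVM side is expected to hand it a temporary file of pickled partitions, and it prints at most ten elements. A hedged way to exercise it by hand (the file path is hypothetical):

    from pyspark.serializers import PickleSerializer

    with open("/tmp/pyprint_sample.bin", "wb") as f:      # hypothetical path
        PickleSerializer().dump_stream([["hello", "world"], ["spark"]], f)
    # then run: python python/pyspark/streaming/pyprint.py /tmp/pyprint_sample.bin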
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
new file mode 100644
index 0000000000000..71aa3376c6578
--- /dev/null
+++ b/python/pyspark/streaming/utils.py
@@ -0,0 +1,18 @@
+__author__ = 'ktakagiw'
+
+def msDurationToString(ms):
+ """
+ Returns a human-readable string representing a duration such as "35ms"
+ """
+ second = 1000
+ minute = 60 * second
+ hour = 60 * minute
+
+ if ms < second:
+ return "%d ms" % ms
+ elif ms < minute:
+ return "%.1f s" % (float(ms) / second)
+ elif ms < hour:
+ return "%.1f m" % (float(ms) / minute)
+ else:
+ return "%.2f h" % (float(ms) / hour)
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 12f900c91eb98..483e200ff9f16 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -77,9 +77,9 @@
org.scalatest
scalatest-maven-plugin
-
-