diff --git a/core/pom.xml b/core/pom.xml
index 2a81f6df289c0..7eb0b48eaeebd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,11 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
+<<<<<<< HEAD
     <version>1.2.0-SNAPSHOT</version>
+=======
+    <version>1.0.0</version>
+>>>>>>> initial commit for pySparkStreaming
     <relativePath>../pom.xml</relativePath>
   </parent>

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9ff4ea6ca157..022e2891559d7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -288,7 +288,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
  */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index b66c3ba4d5fb0..dc68b1fbda8bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -57,6 +57,7 @@ object PythonRunner {
     val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
+    env.put("PYSPARK_PYTHON", pythonExec)
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     val process = builder.start()
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
new file mode 100644
index 0000000000000..f44cd696894ba
--- /dev/null
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -0,0 +1,22 @@
+import sys
+from operator import add
+
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.duration import *
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print >> sys.stderr, "Usage: wordcount <directory>"
+        exit(-1)
+    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+
+    lines = ssc.textFileStream(sys.argv[1])
+    fm_lines = lines.flatMap(lambda x: x.split(" "))
+    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
+    mapped_lines = fm_lines.map(lambda x: (x, 1))
+
+    fm_lines.pyprint()
+    filtered_lines.pyprint()
+    mapped_lines.pyprint()
+    ssc.start()
+    ssc.awaitTermination()
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9c70fa5c16d0c..c3fef42d118bd 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -108,6 +108,9 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
new file mode 100644
index 0000000000000..719592912e80c
--- /dev/null
+++ b/python/pyspark/streaming/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ktakagiw'
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
new file mode 100644
index 0000000000000..c8ae9c4af85c9
--- /dev/null
+++ b/python/pyspark/streaming/context.py
@@ -0,0 +1,133 @@
+__author__ = 'ktakagiw'
+
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import shutil
+import sys
+from threading import Lock
+from tempfile import NamedTemporaryFile
+
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator
+from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
+from pyspark.files import SparkFiles
+from pyspark.java_gateway import launch_gateway
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.rdd import RDD
+from pyspark.context import SparkContext
+
+from py4j.java_collections import ListConverter
+
+from pyspark.streaming.dstream import DStream
+
+class StreamingContext(object):
+    """
+    Main entry point for Spark Streaming functionality. A StreamingContext
+    represents the connection to a Spark cluster and can be used to create
+    L{DStream}s on that cluster.
+    """
+
+    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 gateway=None, duration=None):
+        """
+        Create a new StreamingContext. At least the master, app name, and batch
+        duration should be set, either through the named parameters here or
+        through C{conf}.
+
+        @param master: Cluster URL to connect to
+               (e.g. mesos://host:port, spark://host:port, local[4]).
+        @param appName: A name for your job, to display on the cluster web UI.
+        @param sparkHome: Location where Spark is installed on cluster nodes.
+        @param pyFiles: Collection of .zip or .py files to send to the cluster
+               and add to PYTHONPATH.  These can be paths on the local file
+               system or HDFS, HTTP, HTTPS, or FTP URLs.
+        @param environment: A dictionary of environment variables to set on
+               worker nodes.
+        @param batchSize: The number of Python objects represented as a single
+               Java object.  Set 1 to disable batching or -1 to use an
+               unlimited batch size.
+        @param serializer: The serializer for RDDs.
+        @param conf: A L{SparkConf} object setting Spark properties.
+        @param gateway: Use an existing gateway and JVM; otherwise a new JVM
+               will be instantiated.
+        @param duration: A L{Duration} giving the batch interval of this
+               StreamingContext.
+        """
+        # Create the Python SparkContext
+        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
+                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+                                serializer=serializer, conf=conf, gateway=gateway)
+        self._jvm = self._sc._jvm
+        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
+
+    # Initialize the JavaStreamingContext in a method to allow subclass-specific initialization
+    def _initialize_context(self, jspark_context, jduration):
+        return self._jvm.JavaStreamingContext(jspark_context, jduration)
+
+    def actorStream(self, props, name, storageLevel, supervisorStrategy):
+        raise NotImplementedError
+
+    def addStreamingListener(self, streamingListener):
+        raise NotImplementedError
+
+    def awaitTermination(self, timeout=None):
+        if timeout:
+            self._jssc.awaitTermination(timeout)
+        else:
+            self._jssc.awaitTermination()
+
+    def checkpoint(self, directory):
+        raise NotImplementedError
+
+    def fileStream(self, directory, filter=None, newFilesOnly=None):
+        raise NotImplementedError
+
+    def networkStream(self, receiver):
+        raise NotImplementedError
+
+    def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
+        raise NotImplementedError
+
+    def rawSocketStream(self, hostname, port, storagelevel):
+        raise NotImplementedError
+
+    def remember(self, duration):
+        raise NotImplementedError
+
+    def socketStream(self, hostname, port, converter, storageLevel):
+        raise NotImplementedError
+
+    def start(self):
+        self._jssc.start()
+
+    def stop(self, stopSparkContext=True):
+        raise NotImplementedError
+
+    def textFileStream(self, directory):
+        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+
+    def transform(self, seq):
+        raise NotImplementedError
+
+    def union(self, seq):
+        raise NotImplementedError
+
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
new file mode 100644
index 0000000000000..b422b147d11e1
--- /dev/null
+++ b/python/pyspark/streaming/dstream.py
@@ -0,0 +1,315 @@
+from base64 import standard_b64encode as b64enc
+import copy
+from collections import defaultdict
+from collections import namedtuple
+from itertools import chain, ifilter, imap
+import operator
+import os
+import sys
+import shlex
+import traceback
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+import warnings
+import heapq
+from random import Random
+
+from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
+    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
+from pyspark.join import python_join, python_left_outer_join, \
+    python_right_outer_join, python_cogroup
+from pyspark.statcounter import StatCounter
+from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
+#from pyspark.resultiterable import ResultIterable
+from pyspark.rdd import _JavaStackTrace
+
+from py4j.java_collections import ListConverter, MapConverter
+
+__all__ = ["DStream"]
+
+class DStream(object):
+    def __init__(self, jdstream, ssc, jrdd_deserializer):
+        self._jdstream = jdstream
+        self._ssc = ssc
+        self.ctx = ssc._sc
+        self._jrdd_deserializer = jrdd_deserializer
+
+    def generatedRDDs(self):
+        """
+        RDDs generated so far; in the Scala API this is marked
+        private[streaming] so that test suites can access it.
+        @transient
+        """
+        pass
+
+    def print_(self):
+        """
+        """
+        # print is a reserved name in Python, so we cannot name this method print.
+        getattr(self._jdstream, "print")()
+
+    def pyprint(self):
+        """
+        Print the elements of each generated RDD on the Python side (see pyprint.py).
+        """
+        self._jdstream.pyprint()
+
+    def cache(self):
+        """
+        """
+        raise NotImplementedError
+
+    def checkpoint(self):
+        """
+        """
+        raise NotImplementedError
+
+    def compute(self, time):
+        """
+        """
+        raise NotImplementedError
+
+    def context(self):
+        """
+        """
+        raise NotImplementedError
+
+    def count(self):
+        """
+        """
+        raise NotImplementedError
+
+    def countByValue(self, numPartitions=None):
+        """
+        """
+        raise NotImplementedError
+
+    def countByValueAndWindow(self, duration, slideDuration=None):
+        """
+        """
+        raise NotImplementedError
+
+    def countByWindow(self, duration, slideDuration=None):
+        """
+        """
+        raise NotImplementedError
+
+    def dstream(self):
+        """
+        """
+        raise NotImplementedError
+
+    def filter(self, f):
+        """
+        """
+        def func(iterator): return ifilter(f, iterator)
+        return self.mapPartitions(func)
+
+    def flatMap(self, f, preservesPartitioning=False):
+        """
+        """
+        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
+
+    def foreachRDD(self, f, time):
+        """
+        """
+        raise NotImplementedError
+
+    def glom(self):
+        """
+        """
+        raise NotImplementedError
+
+    def map(self, f, preservesPartitioning=False):
+        """
+        """
+        def func(split, iterator): return imap(f, iterator)
+        return PipelinedDStream(self, func, preservesPartitioning)
+
+    def mapPartitions(self, f):
+        """
+        """
+        def func(s, iterator): return f(iterator)
+        return self.mapPartitionsWithIndex(func)
+
+    def persist(self, storageLevel):
+        """
+        """
+        raise NotImplementedError
+
+    def reduce(self, func, numPartitions=None):
+        """
+        Note: despite the name, this currently performs a per-key reduction
+        (reduceByKey semantics) over (key, value) pairs in each batch.
+        """
+        return self._combineByKey(lambda x: x, func, func, numPartitions)
+
+    def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
+                      numPartitions=None):
+        """
+        """
+        if numPartitions is None:
+            numPartitions = self.ctx._defaultParallelism()
+
+        def combineLocally(iterator):
+            combiners = {}
+            for x in iterator:
+                (k, v) = x
+                if k not in combiners:
+                    combiners[k] = createCombiner(v)
+                else:
+                    combiners[k] = mergeValue(combiners[k], v)
+            return combiners.iteritems()
+        locally_combined = self.mapPartitions(combineLocally)
+        shuffled = locally_combined.partitionBy(numPartitions)
+
+        def _mergeCombiners(iterator):
+            combiners = {}
+            for (k, v) in iterator:
+                if k not in combiners:
+                    combiners[k] = v
+                else:
+                    combiners[k] = mergeCombiners(combiners[k], v)
+            return combiners.iteritems()
+        return shuffled.mapPartitions(_mergeCombiners)
+
+    def partitionBy(self, numPartitions, partitionFunc=None):
+        """
+        Return a copy of the DStream partitioned using the specified partitioner.
+        """
+        if numPartitions is None:
+            numPartitions = self.ctx._defaultReducePartitions()
+
+        if partitionFunc is None:
+            partitionFunc = lambda x: 0 if x is None else hash(x)
+        # Transferring O(n) objects to Java is too expensive.  Instead, we'll
+        # form the hash buckets in Python, transferring O(numPartitions) objects
+        # to Java.  Each object is a (splitNumber, [objects]) pair.
+        outputSerializer = self.ctx._unbatched_serializer
+
+        def add_shuffle_key(split, iterator):
+            buckets = defaultdict(list)
+            for (k, v) in iterator:
+                buckets[partitionFunc(k) % numPartitions].append((k, v))
+            for (split, items) in buckets.iteritems():
+                yield pack_long(split)
+                yield outputSerializer.dumps(items)
+        keyed = PipelinedDStream(self, add_shuffle_key)
+        keyed._bypass_serializer = True
+        with _JavaStackTrace(self.ctx) as st:
+            #JavaDStream
+            #pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD()
+            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
+            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+                                                          id(partitionFunc))
+            jdstream = pairDStream.partitionBy(partitioner).values()
+            dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
+        # This is required so that id(partitionFunc) remains unique, even if
+        # partitionFunc is a lambda:
+        dstream._partitionFunc = partitionFunc
+        return dstream
+
+    def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, invReduceFunc):
+        """
+        """
+        raise NotImplementedError
+
+    def repartition(self, numPartitions):
+        """
+        """
+        raise NotImplementedError
+
+    def slice(self, fromTime, toTime):
+        """
+        """
+        raise NotImplementedError
+
+    def transform(self, transformFunc):
+        """
+        """
+        raise NotImplementedError
+
+    def transformWith(self, other, transformFunc):
+        """
+        """
+        raise NotImplementedError
+
+    def union(self, that):
+        """
+        """
+        raise NotImplementedError
+
+    def window(self, windowDuration, slideDuration=None):
+        """
+        """
+        raise NotImplementedError
+
+    def wrapRDD(self, rdd):
+        """
+        """
+        raise NotImplementedError
+
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        return PipelinedDStream(self, f, preservesPartitioning)
+
+
+class PipelinedDStream(DStream):
+    def __init__(self, prev, func, preservesPartitioning=False):
+        if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
+            # This transformation is the first in its stage:
+            self.func = func
+            self.preservesPartitioning = preservesPartitioning
+            self._prev_jdstream = prev._jdstream
+            self._prev_jrdd_deserializer = prev._jrdd_deserializer
+        else:
+            prev_func = prev.func
+            def pipeline_func(split, iterator):
+                return func(split, prev_func(split, iterator))
+            self.func = pipeline_func
+            self.preservesPartitioning = \
+                prev.preservesPartitioning and preservesPartitioning
+            self._prev_jdstream = prev._prev_jdstream  # maintain the pipeline
+            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
+        self.is_cached = False
+        self.is_checkpointed = False
+        self._ssc = prev._ssc
+        self.ctx = prev.ctx
+        self.prev = prev
+        self._jdstream_val = None
+        self._jrdd_deserializer = self.ctx.serializer
+        self._bypass_serializer = False
+
+    @property
+    def _jdstream(self):
+        if self._jdstream_val:
+            return self._jdstream_val
+        if self._bypass_serializer:
+            serializer = NoOpSerializer()
+        else:
+            serializer = self.ctx.serializer
+
+        command = (self.func, self._prev_jrdd_deserializer, serializer)
+        pickled_command = CloudPickleSerializer().dumps(command)
+        broadcast_vars = ListConverter().convert(
+            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
+            self.ctx._gateway._gateway_client)
+        self.ctx._pickled_broadcast_vars.clear()
+        class_tag = self._prev_jdstream.classTag()
+        env = MapConverter().convert(self.ctx.environment,
+                                     self.ctx._gateway._gateway_client)
+        includes = ListConverter().convert(self.ctx._python_includes,
+                                           self.ctx._gateway._gateway_client)
+        python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(),
+                                                     bytearray(pickled_command),
+                                                     env, includes, self.preservesPartitioning,
+                                                     self.ctx.pythonExec, broadcast_vars,
+                                                     self.ctx._javaAccumulator, class_tag)
+        self._jdstream_val = python_dstream.asJavaDStream()
+        return self._jdstream_val
+
+    def _is_pipelinable(self):
+        return not (self.is_cached or self.is_checkpointed)
diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py
new file mode 100644
index 0000000000000..ef1b4f6cef237
--- /dev/null
+++ b/python/pyspark/streaming/duration.py
@@ -0,0 +1,171 @@
+__author__ = 'ktakagiw'
+
+from pyspark.streaming import utils
+
+class Duration(object):
+    """
+    Duration for a Spark Streaming application, used to set the batch
+    interval and window/slide durations. A C{Duration} is measured in
+    milliseconds and wraps the corresponding Java C{Duration} object.
+    """
+    def __init__(self, millis, _jvm=None):
+        """
+        Create a new Duration.
+
+        @param millis: duration in milliseconds
+        """
+        self._millis = millis
+
+        from pyspark.context import SparkContext
+        SparkContext._ensure_initialized()
+        _jvm = _jvm or SparkContext._jvm
+        self._jduration = _jvm.Duration(millis)
+
+    def toString(self):
+        """ Return the duration as a string """
+        return str(self._millis) + " ms"
+
+    def isZero(self):
+        """ Check whether the duration is zero milliseconds """
+        return self._millis == 0
+
+    def prettyPrint(self):
+        """
+        Return a human-readable string representing the duration
+        """
+        return utils.msDurationToString(self._millis)
+
+    def milliseconds(self):
+        """ Return the duration in milliseconds """
+        return self._millis
+
+    def toFormattedString(self):
+        """ Return the duration in milliseconds, as a string """
+        return str(self._millis)
+
+    def max(self, other):
+        """ Return the greater of two Durations """
+        Duration._is_duration(other)
+        if self > other:
+            return self
+        else:
+            return other
+
+    def min(self, other):
+        """ Return the lesser of two Durations """
+        Duration._is_duration(other)
+        if self < other:
+            return self
+        else:
+            return other
+
+    def __str__(self):
+        return self.toString()
+
+    def __add__(self, other):
+        """ Add a Duration to a Duration """
+        Duration._is_duration(other)
+        return Duration(self._millis + other._millis)
+
+    def __sub__(self, other):
+        """ Subtract a Duration from a Duration """
+        Duration._is_duration(other)
+        return Duration(self._millis - other._millis)
+
+    def __mul__(self, other):
+        """ Multiply a Duration by a Duration """
+        Duration._is_duration(other)
+        return Duration(self._millis * other._millis)
+
+    def __div__(self, other):
+        """
+        Divide a Duration by a Duration
+        for Python 2.X
+        """
+        Duration._is_duration(other)
+        return Duration(self._millis / other._millis)
+
+    def __truediv__(self, other):
+        """
+        Divide a Duration by a Duration
+        for Python 3.0
+        """
+        Duration._is_duration(other)
+        return Duration(self._millis / other._millis)
+
+    def __floordiv__(self, other):
+        """ Integer-divide a Duration by a Duration """
+        Duration._is_duration(other)
+        return Duration(self._millis // other._millis)
+
+    def __len__(self):
+        """ Length of the duration in milliseconds """
+        return self._millis
+
+    def __lt__(self, other):
+        """ Duration < Duration """
+        Duration._is_duration(other)
+        return self._millis < other._millis
+
+    def __le__(self, other):
+        """ Duration <= Duration """
+        Duration._is_duration(other)
+        return self._millis <= other._millis
+
+    def __eq__(self, other):
+        """ Duration == Duration """
+        Duration._is_duration(other)
+        return self._millis == other._millis
Duration """ + Duration._is_duration(other) + return self._millis == other._millis + + def __ne__(self, other): + """ Duration != Duration """ + Duration._is_duration(other) + return self._millis != other._millis + + def __gt__(self, other): + """ Duration > Duration """ + Duration._is_duration(other) + return self._millis > other._millis + + def __ge__(self, other): + """ Duration >= Duration """ + Duration._is_duration(other) + return self._millis >= other._millis + + @classmethod + def _is_duration(self, instance): + """ is instance Duration """ + if not isinstance(instance, Duration): + raise TypeError("This should be Duration") + +def Milliseconds(milliseconds): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of milliseconds. + """ + return Duration(milliseconds) + +def Seconds(seconds): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of seconds. + """ + return Duration(seconds * 1000) + +def Minites(minites): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of minutes. + """ + return Duration(minutes * 60000) + +if __name__ == "__main__": + d = Duration(1) + print d + print d.milliseconds() + diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py new file mode 100644 index 0000000000000..41670af659ea3 --- /dev/null +++ b/python/pyspark/streaming/jtime.py @@ -0,0 +1,116 @@ +__author__ = 'ktakagiw' + +from pyspark.streaming import utils +from pyspark.streaming.duration import Duration + +class Time(object): + """ + Time for Spark Streaming application. Used to set Time + + Most of the time, you would create a Duration object with + C{Time()}, which will load values from C{spark.streaming.*} Java system + properties as well. In this case, any parameters you set directly on + the C{Time} object take priority over system properties. + + """ + def __init__(self, millis, _jvm=None): + """ + Create new Time. 
+
+        @param millis: the time in milliseconds
+
+        @param _jvm: internal parameter used to pass a handle to the
+               Java VM; does not need to be set by users
+        """
+        self._millis = millis
+
+        from pyspark.context import SparkContext
+        SparkContext._ensure_initialized()
+        _jvm = _jvm or SparkContext._jvm
+        self._jtime = _jvm.Time(millis)
+
+    def toString(self):
+        """ Return the time as a string """
+        return str(self._millis) + " ms"
+
+    def milliseconds(self):
+        """ Return the time in milliseconds """
+        return self._millis
+
+    def max(self, other):
+        """ Return the later of two Times """
+        Time._is_time(other)
+        if self > other:
+            return self
+        else:
+            return other
+
+    def min(self, other):
+        """ Return the earlier of two Times """
+        Time._is_time(other)
+        if self < other:
+            return self
+        else:
+            return other
+
+    def __add__(self, other):
+        """ Add a Duration to a Time """
+        Duration._is_duration(other)
+        return Time(self._millis + other._millis)
+
+    def __sub__(self, other):
+        """ Subtract a Duration or a Time from this Time """
+        if isinstance(other, Duration):
+            return Time(self._millis - other._millis)
+        elif isinstance(other, Time):
+            return Duration(self._millis - other._millis)
+        else:
+            raise TypeError
+
+    def __lt__(self, other):
+        """ Time < Time """
+        Time._is_time(other)
+        return self._millis < other._millis
+
+    def __le__(self, other):
+        """ Time <= Time """
+        Time._is_time(other)
+        return self._millis <= other._millis
+
+    def __eq__(self, other):
+        """ Time == Time """
+        Time._is_time(other)
+        return self._millis == other._millis
+
+    def __ne__(self, other):
+        """ Time != Time """
+        Time._is_time(other)
+        return self._millis != other._millis
+
+    def __gt__(self, other):
+        """ Time > Time """
+        Time._is_time(other)
+        return self._millis > other._millis
+
+    def __ge__(self, other):
+        """ Time >= Time """
+        Time._is_time(other)
+        return self._millis >= other._millis
+
+    def isMultipleOf(self, duration):
+        """ Check whether this time is a whole multiple of the given Duration """
+        Duration._is_duration(duration)
+        return self._millis % duration._millis == 0
+
+    def until(self, time, interval):
+        raise NotImplementedError
+
+    def to(self, time, interval):
+        raise NotImplementedError
+
+    @classmethod
+    def _is_time(cls, instance):
+        """ Raise TypeError if instance is not a Time """
+        if not isinstance(instance, Time):
+            raise TypeError
diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py
new file mode 100644
index 0000000000000..fcdaca510812c
--- /dev/null
+++ b/python/pyspark/streaming/pyprint.py
@@ -0,0 +1,28 @@
+import sys
+from itertools import chain
+
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+
+def collect(binary_file_path):
+    """ Deserialize and yield the pickled items stored in the given file """
+    dse = PickleSerializer()
+    with open(binary_file_path, 'rb') as tempFile:
+        for item in dse.load_stream(tempFile):
+            yield item
+
+def main():
+    try:
+        binary_file_path = sys.argv[1]
+    except IndexError:
+        print >> sys.stderr, "Missing file path argument"
+        return 1
+
+    counter = 0
+    for rdd in chain.from_iterable(collect(binary_file_path)):
+        print rdd
+        counter = counter + 1
+        if counter >= 10:
+            print "..."
+            break
+
+if __name__ == "__main__":
+    exit(main())
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
new file mode 100644
index 0000000000000..71aa3376c6578
--- /dev/null
+++ b/python/pyspark/streaming/utils.py
@@ -0,0 +1,18 @@
+__author__ = 'ktakagiw'
+
+def msDurationToString(ms):
+    """
+    Returns a human-readable string representing a duration such as "35ms"
+    """
+    second = 1000
+    minute = 60 * second
+    hour = 60 * minute
+
+    if ms < second:
+        return "%d ms" % ms
+    elif ms < minute:
+        return "%.1f s" % (float(ms) / second)
+    elif ms < hour:
+        return "%.1f m" % (float(ms) / minute)
+    else:
+        return "%.2f h" % (float(ms) / hour)
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 12f900c91eb98..483e200ff9f16 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -77,9 +77,9 @@
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
-
-
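Usage sketch: the following assumes a build in which these Python changes are paired with the JVM-side streaming classes they call into (JavaStreamingContext, PythonDStream, PairwiseDStream, PythonPartitioner), which are not shown in this diff. The spark-submit command, master setting, input directory, and explicit partition count below are illustrative assumptions, not part of the change; reduce() follows the per-key semantics of _combineByKey in dstream.py.

    # Hypothetical invocation of the bundled example (paths and master are assumptions):
    #   bin/spark-submit --master local[2] \
    #       examples/src/main/python/streaming/wordcount.py /tmp/streaming-input
    #
    # Minimal program against the new API introduced above:
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
    counts = ssc.textFileStream("/tmp/streaming-input") \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduce(lambda a, b: a + b, 2)  # per-key reduction; partition count given explicitly
    counts.pyprint()
    ssc.start()
    ssc.awaitTermination()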