clean up code
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent 72b9738 commit bab31c1
Showing 8 changed files with 96 additions and 98 deletions.
41 changes: 21 additions & 20 deletions python/pyspark/streaming/context.py
@@ -22,15 +22,15 @@
from pyspark.storagelevel import *
from pyspark.rdd import RDD
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream

class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
connection to a Spark cluster, and can be used to create L{DStream}s and
broadcast variables on that cluster.
"""

@@ -71,34 +71,35 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)

def actorStream(self, props, name, storageLevel, supervisorStrategy):
raise NotImplementedError

def addStreamingListener(self, streamingListener):
raise NotImplementedError
def start(self):
"""
Start the execution of the streams.
"""
self._jssc.start()

def awaitTermination(self, timeout=None):
"""
Wait for the execution to stop.
"""
if timeout:
self._jssc.awaitTermination(timeout)
else:
self._jssc.awaitTermination()

# Start with the simple case: storageLevel is not passed for now.
def socketTextStream(self, hostname, port):
"""
Create an input stream from a TCP source hostname:port. Data is received using
a TCP socket and the received bytes are interpreted as UTF8-encoded, '\n'-delimited
lines.
"""
return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())

def start(self):
self._jssc.start()

def stop(self, stopSparkContext=True):
raise NotImplementedError

def textFileStream(self, directory):
"""
Create an input stream that monitors a Hadoop-compatible file system
for new files and reads them as text files. Files must be written to the
monitored directory by "moving" them from another location within the same
file system. File names starting with . are ignored.
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

def transform(self, seq):
raise NotImplementedError

def union(self, seq):
raise NotImplementedError
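
For orientation, here is a minimal usage sketch of the StreamingContext API touched in this file. It is a hypothetical driver script, not part of this commit: the constructor arguments, in particular a batch duration built from pyspark.streaming.duration.Duration, are assumptions about this work-in-progress branch rather than a confirmed signature.

# Hypothetical sketch (not part of this commit): wiring up a StreamingContext
# with the two input sources implemented above. The duration keyword argument
# is an assumption about this WIP branch.
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Duration

ssc = StreamingContext(master="local[2]", appName="NetworkWordCount",
                       duration=Duration(1000))  # assumed: 1000 ms batches

lines = ssc.socketTextStream("localhost", 9999)      # UTF8 '\n'-delimited text over TCP
files = ssc.textFileStream("/tmp/streaming-input")   # new files moved into this directory

lines.pyprint()            # print the first ten elements of each batch
ssc.start()
ssc.awaitTermination()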

74 changes: 52 additions & 22 deletions python/pyspark/streaming/dstream.py
@@ -2,8 +2,6 @@
from itertools import chain, ifilter, imap
import operator

import logging

from pyspark.serializers import NoOpSerializer,\
BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.rdd import _JavaStackTrace
@@ -25,64 +23,86 @@ def count(self):
"""
# TODO: check the count implementation; this is different from what PySpark does
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))

def _sum(self):
"""
"""
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self):
"""
Since print is a reserved name in Python, we cannot make a print method function.
This function prints the serialized data in the RDDs of this DStream, because Scala and Java
cannot deserialize pickled Python objects. Please use DStream.pyprint() instead to print results.
Call DStream.print().
"""
# print is a reserved name in Python, so we cannot use it as the method name.
# Hack to call the print function of the underlying JavaDStream.
getattr(self._jdstream, "print")()

def pyprint(self):
"""
Print the first ten elements of each RDD generated in this DStream. This is an output
operator, so this DStream will be registered as an output stream and there materialized.
"""
self._jdstream.pyprint()

def filter(self, f):
"""
Return a new DStream containing only the elements that satisfy the predicate.
"""
def func(iterator): return ifilter(f, iterator)
return self.mapPartitions(func)
return self._mapPartitions(func)

def flatMap(self, f, preservesPartitioning=False):
"""
Return a new DStream by first applying a function to all elements of this
DStream, and then flattening the results.
"""
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
return self._mapPartitionsWithIndex(func, preservesPartitioning)

def map(self, f, preservesPartitioning=False):
def map(self, f):
"""
Return a new DStream by applying a function to each element of this DStream.
"""
def func(iterator): return imap(f, iterator)
return self.mapPartitions(func)
#return PipelinedDStream(self, func, preservesPartitioning)
return self._mapPartitions(func)

def mapPartitions(self, f):
def _mapPartitions(self, f):
"""
Return a new DStream by applying a function to each partition of this DStream.
"""
def func(s, iterator): return f(iterator)
return self.mapPartitionsWithIndex(func)
return self._mapPartitionsWithIndex(func)

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
Return a new DStream by applying a function to each partition of this DStream,
while tracking the index of the original partition.
"""
return PipelinedDStream(self, f, preservesPartitioning)

def reduce(self, func, numPartitions=None):

def reduceByKey(self, func, numPartitions=None):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
Output will be hash-partitioned with C{numPartitions} partitions, or
the default parallelism level if C{numPartitions} is not specified.
"""
return self.combineByKey(lambda x:x, func, func, numPartitions)

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions = None):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
@@ -148,20 +168,20 @@ def add_shuffle_key(split, iterator):
dstream._partitionFunc = partitionFunc
return dstream

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
If spark.default.parallelism is set, then we'll use the value from SparkContext
defaultParallelism, otherwise we'll use the number of partitions in this RDD.
This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
be inherent.
"""
# hard code to avoid the error
if self.ctx._conf.contains("spark.default.parallelism"):
return self.ctx.defaultParallelism
else:
return self.getNumPartitions()

def getNumPartitions(self):
"""
Return the number of partitions in this RDD.
"""
# TODO: remove hardcoding. RDD has getNumPartitions but DStream does not.
return 2


class PipelinedDStream(DStream):
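
To make the operators defined in this file concrete, here is a small, hypothetical word-count sketch built only from the methods shown above (flatMap, map, filter, reduceByKey, pyprint). The socket source and the started StreamingContext ssc are assumed, as in the earlier sketch for context.py.

# Hypothetical sketch (not part of this commit): a streaming word count using
# only operators defined in dstream.py. 'ssc' is assumed to be a StreamingContext
# as in the earlier context.py example.
lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))            # one element per word
pairs = words.filter(lambda w: w != "").map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)                 # per-batch counts per key

counts.pyprint()   # registers an output operator; prints the first ten elements per batch
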
1 change: 1 addition & 0 deletions python/pyspark/streaming/duration.py
@@ -17,6 +17,7 @@

from pyspark.streaming import utils


class Duration(object):
"""
Duration for Spark Streaming application. Used to set duration
9 changes: 8 additions & 1 deletion python/pyspark/streaming/pyprint.py
@@ -21,16 +21,22 @@

from pyspark.serializers import PickleSerializer


def collect(binary_file_path):
"""
Read a pickled file written by Spark Streaming.
"""
dse = PickleSerializer()
with open(binary_file_path, 'rb') as tempFile:
for item in dse.load_stream(tempFile):
yield item


def main():
try:
binary_file_path = sys.argv[1]
except:
print "Missed FilePath in argement"
print "Missed FilePath in argements"

if not binary_file_path:
return
@@ -43,5 +49,6 @@ def main():
print "..."
break


if __name__ =="__main__":
exit(main())
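
The framing that collect() expects can be reproduced locally. The sketch below is hypothetical and not part of this commit: it writes a stream of length-prefixed pickled objects with PickleSerializer.dump_stream, which mirrors what the Scala side produces via PythonRDD.writeIteratorToStream, and then reads the file back through collect().

# Hypothetical round-trip sketch (not part of this commit): produce a temporary
# file with length-prefixed pickled objects, then read it back with collect().
import os
import tempfile

from pyspark.serializers import PickleSerializer
from pyspark.streaming.pyprint import collect

ser = PickleSerializer()
fd, path = tempfile.mkstemp(suffix=".tmp")
with os.fdopen(fd, 'wb') as f:
    ser.dump_stream(iter([(u"hello", 1), (u"world", 2)]), f)

for item in collect(path):
    print item

os.remove(path)
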
@@ -59,7 +59,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* operator, so this PythonDStream will be registered as an output stream and there materialized.
* This function is for the Python API.
*/

// TODO: move this function to PythonDStream
def pyprint() = dstream.pyprint()

/**
@@ -146,7 +146,9 @@ DStream[Array[Byte]](prev.ssc){
case Some(rdd)=>Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
/*
* This is equivalent to following python code
* Since Python operations are executed by Scala after StreamingContext.start,
* what PairwiseDStream does is equivalent to the following Python code in PySpark.
*
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
* partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
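
For readers unfamiliar with the protocol referenced in the comment above, the following is a hedged paraphrase in Python of the add_shuffle_key pattern that PairwiseRDD consumes; it illustrates the general PySpark shuffle framing and is not the exact code in this branch. Each partition yields an alternating stream of a packed bucket id followed by the pickled batch of key-value pairs destined for that bucket.

# Hedged paraphrase (assumed, not the exact code in this branch) of the
# add_shuffle_key pattern feeding PairwiseRDD.
from collections import defaultdict

from pyspark.serializers import PickleSerializer, pack_long

def add_shuffle_key(split, iterator, numPartitions, partitionFunc=hash):
    ser = PickleSerializer()
    buckets = defaultdict(list)
    for (k, v) in iterator:
        buckets[partitionFunc(k) % numPartitions].append((k, v))
    for (bucket, items) in buckets.items():
        yield pack_long(bucket)   # 8-byte partition id read on the Scala side
        yield ser.dumps(items)    # pickled batch of pairs for that partition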

This file was deleted.

@@ -623,37 +623,36 @@ abstract class DStream[T: ClassTag] (
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

//TODO move pyprint to PythonDStream
// TODO: move pyprint to PythonDStream and execute it via a Py4J callback function
/**
* Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
* operator, so this PythonDStream will be registered as an output stream and there materialized.
* Since the serialized Python objects are only readable by Python, pyprint writes the binary data to a
* temporary file and runs a Python script to deserialize and print the first ten elements.
*
* Currently the Python script is called directly; we should avoid this.
*/
private[streaming] def pyprint() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val iter = rdd.take(11).iterator

// make a temporary file
// Generate a temporary file
val prefix = "spark"
val suffix = ".tmp"
val tempFile = File.createTempFile(prefix, suffix)
val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
//write out serialized python object
// Write out the serialized Python objects to the temporary file
PythonRDD.writeIteratorToStream(iter, tempFileStream)
tempFileStream.close()

// This value has to be passed from python
// Python currently does not do cluster deployment. But what happened
// pythonExec should be passed from Python. Move pyprint to PythonDStream.
val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
//val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
//absolute path to the python script is needed to change because we do not use pysparkstreaming
// Call the Python script to deserialize and print the result to stdout
val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
val workerEnv = pb.environment()

//envVars also need to be pass
//workerEnv.putAll(envVars)
// envVars should also be passed from Python
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
@@ -665,7 +664,7 @@
println ("Time: " + time)
println ("-------------------------------------------")

//print value from python std out
// Print the values read from the Python process's stdout
var line = ""
breakable {
while (true) {
Expand All @@ -674,7 +673,7 @@ abstract class DStream[T: ClassTag] (
println(line)
}
}
//delete temporary file
// Delete temporary file
tempFile.delete()
println()

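
As a rough Python analogue of the foreachFunc above, and purely for illustration, the sketch below serializes a batch to a temporary file and hands the path to pyprint.py in a subprocess, with PYTHONPATH pointing at the Spark Python sources. It is hypothetical: the helper name, the use of PickleSerializer for framing, and the environment handling are assumptions, not code from this commit.

# Hypothetical Python analogue (not part of this commit) of the temp-file
# handoff performed by DStream.pyprint() in Scala.
import os
import subprocess
import tempfile

from pyspark.serializers import PickleSerializer

def pyprint_handoff(records, spark_home, python_exec="python"):
    fd, path = tempfile.mkstemp(prefix="spark", suffix=".tmp")
    with os.fdopen(fd, 'wb') as f:
        PickleSerializer().dump_stream(iter(records[:11]), f)   # first ten plus one, as in the Scala code

    env = dict(os.environ)
    env["PYTHONPATH"] = spark_home + "/python" + os.pathsep + env.get("PYTHONPATH", "")
    script = os.path.join(spark_home, "python/pyspark/streaming/pyprint.py")
    subprocess.call([python_exec, script, path], env=env)       # pyprint.py prints up to ten items, then "..."
    os.remove(path)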
