edited the comment to add more precise description
giwa committed Sep 20, 2014
1 parent af610d3 commit 953deb0
Showing 2 changed files with 0 additions and 495 deletions.
python/pyspark/streaming/context.py: 36 changes (0 additions & 36 deletions)
@@ -146,7 +146,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
         This implementation is inspired by the QueueStream implementation.
         Given a list of RDDs, generate a DStream which contains those RDDs.
         """
-<<<<<<< HEAD
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
@@ -158,38 +157,3 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
-=======
-        self._jssc.checkpoint(directory)
-
-    def _testInputStream(self, test_inputs, numSlices=None):
-        """
-        Generate multiple files to make a "stream" on the Scala side for testing.
-        Scala chooses one of the files and generates an RDD using PythonRDD.readRDDFromFile.
-        """
-        numSlices = numSlices or self._sc.defaultParallelism
-        # Calling the Java parallelize() method with an ArrayList is too slow,
-        # because it sends O(n) Py4J commands. As an alternative, serialized
-        # objects are written to a file and loaded through PythonRDD.readRDDFromFile.
-
-        tempFiles = list()
-        for test_input in test_inputs:
-            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-
-            # Make sure we distribute data evenly if it's smaller than self.batchSize
-            if "__len__" not in dir(test_input):
-                test_input = list(test_input)  # make it a list so we can compute its length
-            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
-            if batchSize > 1:
-                serializer = BatchedSerializer(self._sc._unbatched_serializer,
-                                               batchSize)
-            else:
-                serializer = self._sc._unbatched_serializer
-            serializer.dump_stream(test_input, tempFile)
-            tempFiles.append(tempFile.name)
-
-        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
-        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
-                                                        jtempFiles,
-                                                        numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, PickleSerializer())
->>>>>>> added basic operation test cases
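For context on the QueueStream idea the surviving docstring refers to: released PySpark later exposed the same pattern publicly as StreamingContext.queueStream (Spark 1.2+), where each RDD in a queue becomes one batch of the resulting DStream. A minimal sketch using that public API, not the PythonTestInputStream helper in this branch:

    # Sketch of the queue-based test-input pattern using the public
    # queueStream API (Spark 1.2+). This only illustrates the QueueStream
    # idea; it is not the PythonTestInputStream path used in this commit.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "queue-stream-demo")
    ssc = StreamingContext(sc, batchDuration=1)

    # One RDD per batch, mirroring test_inputs above: each inner list
    # becomes the contents of a single streaming batch.
    rdd_queue = [sc.parallelize(batch) for batch in [[1, 2, 3], [4, 5], [6]]]
    stream = ssc.queueStream(rdd_queue)
    stream.pprint()  # print each batch as it is emitted

    ssc.start()
    ssc.awaitTermination(5)  # let the three batches drain, then time out
    ssc.stop()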
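The deleted branch's comment describes the same serialize-to-tempfile trick that PySpark's parallelize() uses: dump a batch of Python objects to a temp file with a (possibly batched) serializer instead of issuing one Py4J call per element. A minimal sketch of just the Python-side round trip; the JVM read-back via PythonRDD.readRDDFromFile is omitted, and 1024 stands in for self._sc._batchSize:

    # Python-side half of the serialize-to-tempfile pattern from the
    # deleted branch. 1024 stands in for self._sc._batchSize; the JVM
    # read-back (PythonRDD.readRDDFromFile) is not shown.
    from tempfile import NamedTemporaryFile
    from pyspark.serializers import BatchedSerializer, PickleSerializer

    data = list(range(10))
    numSlices = 2

    # Same batching rule as the diff: batch only when each slice would
    # hold more than one element.
    batchSize = min(len(data) // numSlices, 1024)
    if batchSize > 1:
        serializer = BatchedSerializer(PickleSerializer(), batchSize)
    else:
        serializer = PickleSerializer()

    tempFile = NamedTemporaryFile(delete=False)
    serializer.dump_stream(data, tempFile)
    tempFile.close()

    # Reading the stream back yields the original elements, unbatched.
    with open(tempFile.name, "rb") as f:
        assert list(serializer.load_stream(f)) == data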