Commit eed6e2a: rollback not needed changes

davies committed Sep 30, 2014
1 parent e00136b
Showing 5 changed files with 64 additions and 71 deletions.
bin/pyspark (6 changes: 5 additions & 1 deletion)
@@ -87,7 +87,11 @@ export PYSPARK_SUBMIT_ARGS
 if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
-  exec "$PYSPARK_PYTHON" $1
+  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
+    exec "$PYSPARK_PYTHON" -m doctest $1
+  else
+    exec "$PYSPARK_PYTHON" $1
+  fi
   exit
 fi

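For illustration, a rough Python rendering of the launcher branch above (a sketch with a hypothetical run_file helper, not part of this commit): when PYSPARK_DOC_TEST is set, the test file is handed to the doctest runner instead of being executed as a script.

import os
import subprocess

def run_file(path):
    # Mirrors the SPARK_TESTING branch in bin/pyspark (sketch only).
    python = os.environ.get("PYSPARK_PYTHON", "python")
    if os.environ.get("PYSPARK_DOC_TEST"):
        # `python -m doctest file.py` imports the file and runs the doctest
        # examples in its docstrings, including the module-level one.
        cmd = [python, "-m", "doctest", path]
    else:
        cmd = [python, path]
    return subprocess.call(cmd)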
python/pyspark/accumulators.py (5 changes: 0 additions & 5 deletions)
@@ -256,8 +256,3 @@ def _start_update_server():
     thread.daemon = True
     thread.start()
     return server
-
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
python/pyspark/serializers.py (5 changes: 0 additions & 5 deletions)
@@ -526,8 +526,3 @@ def write_int(value, stream):
 def write_with_length(obj, stream):
     write_int(len(obj), stream)
     stream.write(obj)
-
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
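The hook removed here (and, identically, from accumulators.py above) is the standard self-test pattern. A minimal standalone module (hypothetical, for illustration) shows what it did: running the file directly executed the docstring examples via testmod(), while `python -m doctest example.py` reaches the same examples without executing the file as a script. Per the comment added to run-tests below, these modules' tests live in the module-level docs, so the harness now drives them from outside the file.

"""Module-level docstring with a doctest (hypothetical example module).

>>> add(2, 3)
5
"""


def add(a, b):
    """Return the sum of a and b.

    >>> add(1, 1)
    2
    """
    return a + b


if __name__ == "__main__":
    # The pattern this commit removes: `python example.py` runs the
    # examples above through doctest.testmod().
    import doctest
    doctest.testmod()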
python/pyspark/streaming/tests.py (38 changes: 19 additions & 19 deletions)
@@ -341,6 +341,25 @@ def func(a, b):
         expected = [[('a', (1, None)), ('b', (2, 3)), ('c', (None, 4))]]
         self._test_func(input, func, expected, True, input2)

+    def update_state_by_key(self):
+
+        def updater(it):
+            for k, vs, s in it:
+                if not s:
+                    s = vs
+                else:
+                    s.extend(vs)
+                yield (k, s)
+
+        input = [[('k', i)] for i in range(5)]
+
+        def func(dstream):
+            return dstream.updateStateByKey(updater)
+
+        expected = [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
+        expected = [[('k', v)] for v in expected]
+        self._test_func(input, func, expected)
+

 class TestWindowFunctions(PySparkStreamingTestCase):

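As a plain-Python illustration of the updater contract exercised by the test above (assumption: the updater receives (key, new_values, previous_state) triples and yields updated (key, state) pairs), the state for each key simply accumulates every value seen so far:

def updater(it):
    for k, vs, s in it:
        s = (s or []) + list(vs)  # same effect as the extend/assign above
        yield (k, s)

# Hypothetical driver: five single-value batches for key 'k', as in the test.
state = None
for i in range(5):
    [(_, state)] = list(updater([('k', [i], state)]))
print(state)  # -> [0, 1, 2, 3, 4]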
@@ -398,25 +417,6 @@ def test_reduce_by_invalid_window(self):
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))

-    def update_state_by_key(self):
-
-        def updater(it):
-            for k, vs, s in it:
-                if not s:
-                    s = vs
-                else:
-                    s.extend(vs)
-                yield (k, s)
-
-        input = [[('k', i)] for i in range(5)]
-
-        def func(dstream):
-            return dstream.updateStateByKey(updater)
-
-        expected = [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
-        expected = [[('k', v)] for v in expected]
-        self._test_func(input, func, expected)
-

 class TestStreamingContext(PySparkStreamingTestCase):

python/run-tests (81 changes: 40 additions & 41 deletions)
@@ -48,39 +48,6 @@ function run_test() {
     fi
 }

-function run_core_tests() {
-    run_test "pyspark/conf.py"
-    run_test "pyspark/context.py"
-    run_test "pyspark/broadcast.py"
-    run_test "pyspark/accumulators.py"
-    run_test "pyspark/serializers.py"
-    run_test "pyspark/shuffle.py"
-    run_test "pyspark/rdd.py"
-    run_test "pyspark/tests.py"
-}
-
-function run_sql_tests() {
-    run_test "pyspark/sql.py"
-}
-
-function run_mllib_tests() {
-    run_test "pyspark/mllib/util.py"
-    run_test "pyspark/mllib/linalg.py"
-    run_test "pyspark/mllib/classification.py"
-    run_test "pyspark/mllib/clustering.py"
-    run_test "pyspark/mllib/random.py"
-    run_test "pyspark/mllib/recommendation.py"
-    run_test "pyspark/mllib/regression.py"
-    run_test "pyspark/mllib/stat.py"
-    run_test "pyspark/mllib/tree.py"
-    run_test "pyspark/mllib/tests.py"
-}
-
-function run_streaming_tests() {
-    run_test "pyspark/streaming/util.py"
-    run_test "pyspark/streaming/tests.py"
-}
-
 echo "Running PySpark tests. Output is in python/unit-tests.log."

 export PYSPARK_PYTHON="python"
@@ -93,21 +60,53 @@ fi
 echo "Testing with Python version:"
 $PYSPARK_PYTHON --version

-run_core_tests
-run_sql_tests
-run_mllib_tests
-run_streaming_tests
+run_test "pyspark/rdd.py"
+run_test "pyspark/context.py"
+run_test "pyspark/conf.py"
+run_test "pyspark/sql.py"
+# These tests are included in the module-level docs, and so must
+# be handled on a higher level rather than within the python file.
+export PYSPARK_DOC_TEST=1
+run_test "pyspark/broadcast.py"
+run_test "pyspark/accumulators.py"
+run_test "pyspark/serializers.py"
+unset PYSPARK_DOC_TEST
+run_test "pyspark/shuffle.py"
+run_test "pyspark/tests.py"
+run_test "pyspark/mllib/classification.py"
+run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/linalg.py"
+run_test "pyspark/mllib/random.py"
+run_test "pyspark/mllib/recommendation.py"
+run_test "pyspark/mllib/regression.py"
+run_test "pyspark/mllib/stat.py"
+run_test "pyspark/mllib/tests.py"
+run_test "pyspark/mllib/tree.py"
+run_test "pyspark/mllib/util.py"
+run_test "pyspark/streaming/util.py"
+run_test "pyspark/streaming/tests.py"

 # Try to test with PyPy
 if [ $(which pypy) ]; then
     export PYSPARK_PYTHON="pypy"
     echo "Testing with PyPy version:"
     $PYSPARK_PYTHON --version

-    run_core_tests
-    run_sql_tests
-    run_mllib_tests
-    run_streaming_tests
+    run_test "pyspark/rdd.py"
+    run_test "pyspark/context.py"
+    run_test "pyspark/conf.py"
+    run_test "pyspark/sql.py"
+    # These tests are included in the module-level docs, and so must
+    # be handled on a higher level rather than within the python file.
+    export PYSPARK_DOC_TEST=1
+    run_test "pyspark/broadcast.py"
+    run_test "pyspark/accumulators.py"
+    run_test "pyspark/serializers.py"
+    unset PYSPARK_DOC_TEST
+    run_test "pyspark/shuffle.py"
+    run_test "pyspark/tests.py"
+    run_test "pyspark/streaming/util.py"
+    run_test "pyspark/streaming/tests.py"
 fi

 if [[ $FAILED == 0 ]]; then
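To tie the pieces together, a rough Python analogue of the harness flow (assumptions, since run_test's body is truncated above: it runs the file through bin/pyspark, appends output to python/unit-tests.log, and records failures in the FAILED flag checked at the end of the script):

import os
import subprocess
import sys

failed = False

def run_test(path, doc_test=False):
    # Assumed stand-in for the shell run_test function.
    global failed
    env = dict(os.environ)
    env["SPARK_TESTING"] = "1"         # bin/pyspark takes its testing branch
    if doc_test:
        env["PYSPARK_DOC_TEST"] = "1"  # bin/pyspark then uses `-m doctest`
    with open("unit-tests.log", "ab") as log:
        rc = subprocess.call(["bin/pyspark", path], env=env,
                             stdout=log, stderr=log)
    failed = failed or rc != 0

run_test("pyspark/rdd.py")
run_test("pyspark/serializers.py", doc_test=True)
sys.exit(1 if failed else 0)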
