From dfc6b3ca10fe5762f31910d90d2cac44e64f80a1 Mon Sep 17 00:00:00 2001
From: Kyle Sun
Date: Sat, 20 May 2017 22:51:39 -0700
Subject: [PATCH 1/3] Support cluster mode in PySpark

Tested w/ Spark 2.1
---
 luigi/contrib/spark.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/luigi/contrib/spark.py b/luigi/contrib/spark.py
index 2e139da970..bd8a53710e 100644
--- a/luigi/contrib/spark.py
+++ b/luigi/contrib/spark.py
@@ -237,8 +237,6 @@ class PySparkTask(SparkSubmitTask):
 
     # Path to the pyspark program passed to spark-submit
     app = os.path.join(os.path.dirname(__file__), 'pyspark_runner.py')
-    # Python only supports the client deploy mode, force it
-    deploy_mode = "client"
 
     @property
     def name(self):
@@ -250,6 +248,11 @@ def py_packages(self):
         if packages:
             return map(lambda s: s.strip(), packages.split(','))
 
+    @property
+    def files(self):
+        if self.deploy_mode == "cluster":
+            return [self.run_pickle]
+
     def setup(self, conf):
         """
         Called by the pyspark_runner with a SparkConf instance that will be used to instantiate the SparkContext
@@ -269,11 +272,12 @@ def main(self, sc, *args):
         """
         raise NotImplementedError("subclass should define a main method")
 
-    def program_args(self):
-        return self.spark_command() + self.app_command()
-
     def app_command(self):
-        return [self.app, self.run_pickle] + self.app_options()
+        if self.deploy_mode == "cluster":
+            pickle_loc = os.path.basename(self.run_pickle)
+        else:
+            pickle_loc = self.run_pickle
+        return [self.app, pickle_loc] + self.app_options()
 
     def run(self):
         self.run_path = tempfile.mkdtemp(prefix=self.name)

From e34e3db9e915b3af43459a95424b9f07de8b2b3b Mon Sep 17 00:00:00 2001
From: Kyle Sun
Date: Tue, 1 Aug 2017 22:30:18 -0700
Subject: [PATCH 2/3] Fix tests

---
 test/contrib/spark_test.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/test/contrib/spark_test.py b/test/contrib/spark_test.py
index 75dc1e38f2..ad28ac475b 100644
--- a/test/contrib/spark_test.py
+++ b/test/contrib/spark_test.py
@@ -188,7 +188,7 @@ def interrupt():
 class PySparkTaskTest(unittest.TestCase):
     ss = 'ss-stub'
 
-    @with_config({'spark': {'spark-submit': ss, 'master': "spark://host:7077"}})
+    @with_config({'spark': {'spark-submit': ss, 'master': "spark://host:7077", 'deploy-mode': 'client'}})
     @patch('luigi.contrib.external_program.subprocess.Popen')
     def test_run(self, proc):
         setup_run_process(proc)
@@ -199,7 +199,7 @@ def test_run(self, proc):
         self.assertTrue(os.path.exists(proc_arg_list[7]))
         self.assertTrue(proc_arg_list[8].endswith('TestPySparkTask.pickle'))
 
-    @with_config({'spark': {'spark-submit': ss, 'master': "spark://host:7077"}})
+    @with_config({'spark': {'spark-submit': ss, 'master': "spark://host:7077", 'deploy-mode': 'client'}})
     @patch('luigi.contrib.external_program.subprocess.Popen')
     def test_run_with_pickle_dump(self, proc):
         setup_run_process(proc)
@@ -211,6 +211,17 @@ def test_run_with_pickle_dump(self, proc):
         self.assertTrue(os.path.exists(proc_arg_list[7]))
         self.assertTrue(proc_arg_list[8].endswith('TestPySparkTask.pickle'))
 
+    @with_config({'spark': {'spark-submit': ss, 'master': "spark://host:7077", 'deploy-mode': 'cluster'}})
+    @patch('luigi.contrib.external_program.subprocess.Popen')
+    def test_run_with_cluster(self, proc):
+        setup_run_process(proc)
+        job = TestPySparkTask()
+        job.run()
+        proc_arg_list = proc.call_args[0][0]
+        self.assertEqual(proc_arg_list[0:7], ['ss-stub', '--master', 'spark://host:7077', '--deploy-mode', 'cluster', '--name', 'TestPySparkTask'])
+        self.assertTrue(os.path.exists(proc_arg_list[7]))
+        self.assertTrue(proc_arg_list[8].endswith('TestPySparkTask.pickle'))
+
     @patch.dict('sys.modules', {'pyspark': MagicMock()})
     @patch('pyspark.SparkContext')
     def test_pyspark_runner(self, spark_context):

From 7bb74436f34857e17e9c7de94fe1e1ec07fa6c41 Mon Sep 17 00:00:00 2001
From: Kyle Sun
Date: Wed, 2 Aug 2017 11:31:23 -0700
Subject: [PATCH 3/3] Fix test for cluster mode

---
 test/contrib/spark_test.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/test/contrib/spark_test.py b/test/contrib/spark_test.py
index ad28ac475b..3bdaa04407 100644
--- a/test/contrib/spark_test.py
+++ b/test/contrib/spark_test.py
@@ -218,9 +218,10 @@ def test_run_with_cluster(self, proc):
         job = TestPySparkTask()
         job.run()
         proc_arg_list = proc.call_args[0][0]
-        self.assertEqual(proc_arg_list[0:7], ['ss-stub', '--master', 'spark://host:7077', '--deploy-mode', 'cluster', '--name', 'TestPySparkTask'])
-        self.assertTrue(os.path.exists(proc_arg_list[7]))
+        self.assertEqual(proc_arg_list[0:8], ['ss-stub', '--master', 'spark://host:7077', '--deploy-mode', 'cluster', '--name', 'TestPySparkTask', '--files'])
         self.assertTrue(proc_arg_list[8].endswith('TestPySparkTask.pickle'))
+        self.assertTrue(os.path.exists(proc_arg_list[9]))
+        self.assertEqual('TestPySparkTask.pickle', proc_arg_list[10])
 
     @patch.dict('sys.modules', {'pyspark': MagicMock()})
     @patch('pyspark.SparkContext')
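For context, here is how a job might use cluster mode once this series is applied. This is a minimal sketch, not part of the patches: the WordCount task, the paths, and the config values are illustrative; only PySparkTask, deploy_mode, main(self, sc, *args), and the --files pickle-shipping behavior come from PATCH 1/3 above.

A hypothetical [spark] configuration section, using the same keys the tests stub via with_config:

    [spark]
    spark-submit: /usr/local/bin/spark-submit
    master: spark://host:7077
    deploy-mode: cluster

A hypothetical task. In cluster mode the pickled task is distributed to the driver via --files, and app_command passes pyspark_runner.py the pickle's basename rather than its local temp path:

    import luigi
    from luigi.contrib.hdfs import HdfsTarget
    from luigi.contrib.spark import PySparkTask


    class WordCount(PySparkTask):
        # With this series applied, deploy_mode is no longer forced to
        # "client"; it can be set here or via the [spark] config section.
        deploy_mode = "cluster"

        def output(self):
            return HdfsTarget('/tmp/wordcount')  # illustrative path

        def main(self, sc, *args):
            # sc is the SparkContext handed over by pyspark_runner.py after
            # it unpickles this task on the driver inside the cluster.
            counts = (sc.textFile('/tmp/input.txt')  # illustrative path
                        .flatMap(lambda line: line.split())
                        .map(lambda word: (word, 1))
                        .reduceByKey(lambda a, b: a + b))
            counts.saveAsTextFile(self.output().path)


    if __name__ == '__main__':
        luigi.build([WordCount()], local_scheduler=True)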