Allow spark to load config from different version (#2196)
interskh authored and Tarrasch committed Aug 25, 2017
1 parent 6e084b5 commit 89e7287
Showing 1 changed file with 29 additions and 22 deletions: luigi/contrib/spark.py
@@ -61,93 +61,100 @@ def app_options(self):
         """
         return []
 
+    @property
+    def spark_version(self):
+        return "spark"
+
     @property
     def spark_submit(self):
-        return configuration.get_config().get('spark', 'spark-submit', 'spark-submit')
+        return configuration.get_config().get(self.spark_version, 'spark-submit', 'spark-submit')
 
     @property
     def master(self):
-        return configuration.get_config().get("spark", "master", None)
+        return configuration.get_config().get(self.spark_version, "master", None)
 
     @property
     def deploy_mode(self):
-        return configuration.get_config().get("spark", "deploy-mode", None)
+        return configuration.get_config().get(self.spark_version, "deploy-mode", None)
 
     @property
     def jars(self):
-        return self._list_config(configuration.get_config().get("spark", "jars", None))
+        return self._list_config(configuration.get_config().get(self.spark_version, "jars", None))
 
     @property
     def packages(self):
-        return self._list_config(configuration.get_config().get("spark", "packages", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "packages", None))
 
     @property
     def py_files(self):
-        return self._list_config(configuration.get_config().get("spark", "py-files", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "py-files", None))
 
     @property
     def files(self):
-        return self._list_config(configuration.get_config().get("spark", "files", None))
+        return self._list_config(configuration.get_config().get(self.spark_version, "files", None))
 
     @property
     def conf(self):
-        return self._dict_config(configuration.get_config().get("spark", "conf", None))
+        return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None))
 
     @property
     def properties_file(self):
-        return configuration.get_config().get("spark", "properties-file", None)
+        return configuration.get_config().get(self.spark_version, "properties-file", None)
 
     @property
     def driver_memory(self):
-        return configuration.get_config().get("spark", "driver-memory", None)
+        return configuration.get_config().get(self.spark_version, "driver-memory", None)
 
     @property
     def driver_java_options(self):
-        return configuration.get_config().get("spark", "driver-java-options", None)
+        return configuration.get_config().get(self.spark_version, "driver-java-options", None)
 
     @property
     def driver_library_path(self):
-        return configuration.get_config().get("spark", "driver-library-path", None)
+        return configuration.get_config().get(self.spark_version, "driver-library-path", None)
 
     @property
     def driver_class_path(self):
-        return configuration.get_config().get("spark", "driver-class-path", None)
+        return configuration.get_config().get(self.spark_version, "driver-class-path", None)
 
     @property
     def executor_memory(self):
-        return configuration.get_config().get("spark", "executor-memory", None)
+        return configuration.get_config().get(self.spark_version, "executor-memory", None)
 
     @property
     def driver_cores(self):
-        return configuration.get_config().get("spark", "driver-cores", None)
+        return configuration.get_config().get(self.spark_version, "driver-cores", None)
 
     @property
     def supervise(self):
-        return bool(configuration.get_config().get("spark", "supervise", False))
+        return bool(configuration.get_config().get(self.spark_version, "supervise", False))
 
     @property
     def total_executor_cores(self):
-        return configuration.get_config().get("spark", "total-executor-cores", None)
+        return configuration.get_config().get(self.spark_version, "total-executor-cores", None)
 
     @property
     def executor_cores(self):
-        return configuration.get_config().get("spark", "executor-cores", None)
+        return configuration.get_config().get(self.spark_version, "executor-cores", None)
 
     @property
     def queue(self):
-        return configuration.get_config().get("spark", "queue", None)
+        return configuration.get_config().get(self.spark_version, "queue", None)
 
     @property
     def num_executors(self):
-        return configuration.get_config().get("spark", "num-executors", None)
+        return configuration.get_config().get(self.spark_version, "num-executors", None)
 
     @property
     def archives(self):
-        return self._list_config(configuration.get_config().get("spark", "archives", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "archives", None))
 
     @property
     def hadoop_conf_dir(self):
-        return configuration.get_config().get("spark", "hadoop-conf-dir", None)
+        return configuration.get_config().get(self.spark_version, "hadoop-conf-dir", None)
 
     def get_environment(self):
         env = os.environ.copy()
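The effect of the change is that every spark-submit option above is now read from the config section named by the new spark_version property (default "spark"), so a task subclass can point all of these settings at a different section, for example a second Spark installation. A minimal sketch of how a task might use this; the Spark2SubmitTask name, the my_job.py application file, and the [spark2] section are illustrative assumptions, not part of the commit:

# A minimal sketch, assuming luigi with this commit applied. The class name,
# application file, and [spark2] section name below are illustrative only.
from luigi.contrib.spark import SparkSubmitTask


class Spark2SubmitTask(SparkSubmitTask):
    # Application submitted via spark-submit (illustrative file name).
    app = 'my_job.py'

    @property
    def spark_version(self):
        # Name of the config section from which spark-submit settings
        # (spark-submit, master, deploy-mode, jars, ...) are read.
        return "spark2"


# Corresponding luigi configuration (illustrative):
#
# [spark]
# spark-submit: /usr/bin/spark-submit
# master: yarn
#
# [spark2]
# spark-submit: /opt/spark2/bin/spark-submit
# master: yarn

Tasks that keep the default spark_version continue to read from [spark], so existing configurations are unaffected.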
