Allow spark task to load config from different version #2196

Merged 1 commit on Aug 25, 2017
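The patch parameterizes the config section that SparkSubmitTask's properties read from: instead of the hard-coded "spark" section, each lookup now goes through a new spark_version property that defaults to "spark", so a subclass can redirect its settings to another section. For reference, a minimal sketch of the luigi config call the diff relies on, using only the section, option, and default values that appear in the diff itself:

    from luigi import configuration

    conf = configuration.get_config()
    # get(section, option, default): read an option from the named config
    # section, falling back to the default when it is missing.
    submit = conf.get("spark", "spark-submit", "spark-submit")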
51 changes: 29 additions & 22 deletions luigi/contrib/spark.py
@@ -61,93 +61,100 @@ def app_options(self):
         """
         return []
 
+    @property
+    def spark_version(self):
+        return "spark"
+
     @property
     def spark_submit(self):
-        return configuration.get_config().get('spark', 'spark-submit', 'spark-submit')
+        return configuration.get_config().get(self.spark_version, 'spark-submit', 'spark-submit')
 
     @property
     def master(self):
-        return configuration.get_config().get("spark", "master", None)
+        return configuration.get_config().get(self.spark_version, "master", None)
 
     @property
     def deploy_mode(self):
-        return configuration.get_config().get("spark", "deploy-mode", None)
+        return configuration.get_config().get(self.spark_version, "deploy-mode", None)
 
     @property
     def jars(self):
-        return self._list_config(configuration.get_config().get("spark", "jars", None))
+        return self._list_config(configuration.get_config().get(self.spark_version, "jars", None))
 
     @property
     def packages(self):
-        return self._list_config(configuration.get_config().get("spark", "packages", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "packages", None))
 
     @property
     def py_files(self):
-        return self._list_config(configuration.get_config().get("spark", "py-files", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "py-files", None))
 
     @property
     def files(self):
-        return self._list_config(configuration.get_config().get("spark", "files", None))
+        return self._list_config(configuration.get_config().get(self.spark_version, "files", None))
 
     @property
     def conf(self):
-        return self._dict_config(configuration.get_config().get("spark", "conf", None))
+        return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None))
 
     @property
     def properties_file(self):
-        return configuration.get_config().get("spark", "properties-file", None)
+        return configuration.get_config().get(self.spark_version, "properties-file", None)
 
     @property
     def driver_memory(self):
-        return configuration.get_config().get("spark", "driver-memory", None)
+        return configuration.get_config().get(self.spark_version, "driver-memory", None)
 
     @property
     def driver_java_options(self):
-        return configuration.get_config().get("spark", "driver-java-options", None)
+        return configuration.get_config().get(self.spark_version, "driver-java-options", None)
 
     @property
     def driver_library_path(self):
-        return configuration.get_config().get("spark", "driver-library-path", None)
+        return configuration.get_config().get(self.spark_version, "driver-library-path", None)
 
     @property
     def driver_class_path(self):
-        return configuration.get_config().get("spark", "driver-class-path", None)
+        return configuration.get_config().get(self.spark_version, "driver-class-path", None)
 
     @property
     def executor_memory(self):
-        return configuration.get_config().get("spark", "executor-memory", None)
+        return configuration.get_config().get(self.spark_version, "executor-memory", None)
 
     @property
     def driver_cores(self):
-        return configuration.get_config().get("spark", "driver-cores", None)
+        return configuration.get_config().get(self.spark_version, "driver-cores", None)
 
     @property
     def supervise(self):
-        return bool(configuration.get_config().get("spark", "supervise", False))
+        return bool(configuration.get_config().get(self.spark_version, "supervise", False))
 
     @property
     def total_executor_cores(self):
-        return configuration.get_config().get("spark", "total-executor-cores", None)
+        return configuration.get_config().get(self.spark_version, "total-executor-cores", None)
 
     @property
     def executor_cores(self):
-        return configuration.get_config().get("spark", "executor-cores", None)
+        return configuration.get_config().get(self.spark_version, "executor-cores", None)
 
     @property
     def queue(self):
-        return configuration.get_config().get("spark", "queue", None)
+        return configuration.get_config().get(self.spark_version, "queue", None)
 
     @property
     def num_executors(self):
-        return configuration.get_config().get("spark", "num-executors", None)
+        return configuration.get_config().get(self.spark_version, "num-executors", None)
 
     @property
     def archives(self):
-        return self._list_config(configuration.get_config().get("spark", "archives", None))
+        return self._list_config(configuration.get_config().get(
+            self.spark_version, "archives", None))
 
     @property
     def hadoop_conf_dir(self):
-        return configuration.get_config().get("spark", "hadoop-conf-dir", None)
+        return configuration.get_config().get(self.spark_version, "hadoop-conf-dir", None)
 
     def get_environment(self):
         env = os.environ.copy()
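A hedged sketch of how a task might use the new hook. The subclass name, the "spark2" section, and the paths below are illustrative assumptions, not part of this PR; only SparkSubmitTask, its app attribute, and the spark_version property come from luigi itself.

    from luigi.contrib.spark import SparkSubmitTask


    class Spark2WordCount(SparkSubmitTask):
        """Illustrative task that reads its settings from a hypothetical
        [spark2] section of luigi.cfg instead of the default [spark], e.g.:

            [spark2]
            spark-submit: /opt/spark-2.2/bin/spark-submit
            master: yarn
        """

        app = 'wordcount.py'  # hypothetical PySpark script

        @property
        def spark_version(self):
            # Redirect every config lookup (spark-submit, master, jars,
            # packages, ...) from [spark] to the [spark2] section.
            return "spark2"

Because every property in the diff routes through self.spark_version, a single override is enough to point one task at a second Spark installation while unmodified tasks keep reading the default [spark] section.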