Spark Job Server supports Python jobs through a Python-specific context factory,
spark.jobserver.python.PythonSparkContextFactory. See the Contexts documentation for information on contexts.
The PythonSparkContextFactory class is part of job-server-extras, so the assembly jar of that sub-module
should be used (this is the default for deployment anyway).
The application config can list paths to be added to the PYTHONPATH environment variable when the python
subprocess which runs the jobs is executed. At a minimum, it should include the path of the Job Server API Egg file and
the location of the Spark Python libraries (which are most conveniently referenced relative to SPARK_HOME).
By default, Python jobs are executed by calling an executable named python. In some cases, an alternative executable
may be desired, for example to use Python 3 or to use an executable with an absolute path rather than assuming the
executable can be located on the PATH.
Python binaries are only supported in the SQL flavour of JobDAO, so it is also necessary to specify this in the config.
A basic config supporting Python might look like:
spark {
  jobserver {
    jobdao = spark.jobserver.io.JobSqlDAO
  }
  context-settings {
    python {
      paths = [
        ${SPARK_HOME}/python,
        "/home/user/spark-jobserver/job-server-extras/job-server-python/target/python/spark_jobserver_python-0.7.0_SNAPSHOT-py2.7.egg"
      ]
      # The default value in application.conf is "python"
      executable = "python3"
    }
  }
}
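To make the effect of the paths setting concrete, the sketch below shows one way a list of configured paths could be turned into the environment of a Python subprocess. It is an illustration only, not the job server's actual launch code: the helper name build_python_env and the choice to place the configured paths ahead of any existing PYTHONPATH are assumptions.

```python
import os


def build_python_env(paths, base_env=None):
    """Return a copy of base_env with the given paths on PYTHONPATH.

    Hypothetical helper: putting the configured paths first is an
    assumption, not documented job server behaviour.
    """
    env = dict(os.environ if base_env is None else base_env)
    existing = env.get("PYTHONPATH")
    # Keep any PYTHONPATH that was already set, after the configured paths.
    entries = list(paths) + ([existing] if existing else [])
    env["PYTHONPATH"] = os.pathsep.join(entries)
    return env


# Example values only; a real setup would use the paths from the config.
example_env = build_python_env(["/opt/spark/python"], {"PYTHONPATH": "/extra/libs"})
print(example_env["PYTHONPATH"])
```

An environment built this way could be passed as the env argument of subprocess.Popen together with the configured executable.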
There are two Python packages which need to be provided, py4j and pyhocon. These can either be installed into the
Python instance to be used for running the jobs, or added to the list of paths in the config.
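A quick way to confirm that both packages are visible is to ask the import system directly. The following is a minimal sketch, assuming it is run with the same executable (and the same PYTHONPATH) that the job server is configured to use; the helper name missing_packages is hypothetical.

```python
import importlib.util


def missing_packages(names):
    """Return the subset of top-level package names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages(["py4j", "pyhocon"])
if missing:
    print("Not importable from this interpreter:", ", ".join(missing))
else:
    print("py4j and pyhocon are both importable")
```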
In development, you can run a Python-ready job server locally with the following reStart task:
job-server-extras/reStart ../config/python-example.conf.template
When developing a Java/Scala job, it is necessary to implement a specific Job Trait so that the ContextFactory receives an object of the correct class. However, Python does not have the same strong typing, so any Python class which implements the methods expected by the Python bootstrapping code is sufficient.
The interface to conform to is shown by the SparkJob Python class:
class SparkJob:

    def __init__(self):
        pass

    def validate(self, context, runtime, config):
        """
        This method is called by the job server to allow jobs to validate their
        input and reject invalid job requests.
        :param context: the context to be used for the job. Could be a
        SparkContext, SQLContext, HiveContext etc.
        May be reused across jobs.
        :param runtime: the JobEnvironment containing run time information
        pertaining to the job and context.
        :param config: the HOCON config object passed into the job request
        :return: either JobData, which is parsed from config, or a list of
        validation problems.
        """
        raise NotImplementedError("Concrete implementations should override validate")

    def run_job(self, context, runtime, data):
        """
        Entry point for the execution of a job
        :param context: the context to be used for the job.
        SparkContext, SQLContext, HiveContext etc.
        May be reused across jobs
        :param runtime: the JobEnvironment containing run time information
        pertaining to the job and context.
        :param data: the JobData returned by the validate method
        :return: the job result OR a list of ValidationProblem objects.
        """
        raise NotImplementedError("Concrete implementations should override run_job")
It is possible but not necessary to extend this class when providing a conforming job class. When returning a list of
validation problems, it is necessary to return instances of sparkjobserver.api.ValidationProblem, since otherwise there
is no way to differentiate between validation errors and valid job data. Instances of ValidationProblem can be built
with the build_problems utility method. Consider the following basic implementation of a Python job:
from sparkjobserver.api import SparkJob, build_problems


class WordCountSparkJob(SparkJob):

    def validate(self, context, runtime, config):
        if config.get('input.strings', None):
            return config.get('input.strings')
        else:
            return build_problems(['config input.strings not found'])

    def run_job(self, context, runtime, data):
        return context.parallelize(data).countByValue()
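Because the job only needs the expected methods, its logic can be exercised locally without a running job server or Spark. The sketch below is one way to do that under stated assumptions: FakeContext and FakeRDD are hypothetical stand-ins that support only the two calls this job makes, a plain dict stands in for the HOCON config object, and the fallback build_problems is used only when the Job Server API Egg is not importable.

```python
from collections import Counter

try:
    from sparkjobserver.api import build_problems
except ImportError:
    # Stand-in used only when the job server API Egg is not on the path.
    def build_problems(problems):
        return list(problems)


class FakeRDD:
    """Hypothetical stand-in for an RDD, supporting only countByValue."""

    def __init__(self, items):
        self._items = list(items)

    def countByValue(self):
        return dict(Counter(self._items))


class FakeContext:
    """Hypothetical stand-in for a SparkContext, supporting only parallelize."""

    def parallelize(self, data):
        return FakeRDD(data)


class WordCountJob:
    """Same logic as WordCountSparkJob, but without extending SparkJob."""

    def validate(self, context, runtime, config):
        if config.get('input.strings', None):
            return config.get('input.strings')
        return build_problems(['config input.strings not found'])

    def run_job(self, context, runtime, data):
        return context.parallelize(data).countByValue()


job = WordCountJob()
ctx = FakeContext()
data = job.validate(ctx, None, {'input.strings': ['a', 'b', 'a', 'b']})
print(job.run_job(ctx, None, data))
```

A check like this only covers the job's own logic; it says nothing about how the real context or the server-side conversion of results behaves.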
Because job results need to be converted from Python objects to objects which are serializable by Spray JSON, only a defined set of return types is supported:
- Primitive types of boolean, byte, char, short, int, long, float, double and string.
- Python dicts, which end up as Scala Maps.
- Python lists which end up as Scala Lists.
The supported collection types of Map and List can be nested, i.e. a return type of Map[String, List[Int]] would be fine.
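One way to stay within these types is to normalise a result before returning it from run_job. The helper below is a hypothetical sketch, not part of the job server API: it copies dict subclasses into plain dicts, turns tuples into lists, and raises for anything outside the list above.

```python
def to_supported(value):
    """Recursively convert a result into dicts, lists and primitives.

    Hypothetical helper, not part of the job server API.
    """
    if isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, dict):
        # Also covers dict subclasses such as defaultdict.
        return {to_supported(k): to_supported(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_supported(item) for item in value]
    raise TypeError("Unsupported result type: %s" % type(value).__name__)


print(to_supported({"counts": (1, 2), "ok": True}))
```

A job could then end run_job with a line such as return to_supported(result).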
In order to push a job to the job server, it must be packaged into a Python Egg file. Similar to a Jar, this is just a Zip file with a particular internal structure. Eggs can be built using the setuptools library.
In the most basic setup, a job ready for packaging would be structured as:
- setup.py: contains the configuration for packaging the job.
- my_job_package: a directory, the name of which is the name of the module containing your job(s).
  - __init__.py: a file inside the module directory, which contains the Python implementation of one or more job classes.
setup.py would contain something like:
from setuptools import setup

setup(
    name='my_job_package',
    packages=['my_job_package']
)
Then, running python setup.py bdist_egg will create a file dist/my_job_package-0.0.0-py2.7.egg.
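Since an Egg is a Zip file, its contents can be inspected with the standard zipfile module before uploading. The following is a minimal sketch; the helper name egg_has_package is hypothetical, and it only checks that some file sits under the package directory.

```python
import zipfile


def egg_has_package(egg_path, package):
    """Return True if the Egg (a Zip archive) contains files under package/."""
    with zipfile.ZipFile(egg_path) as egg:
        return any(name.startswith(package + "/") for name in egg.namelist())
```

For the layout above, a call such as egg_has_package("dist/my_job_package-0.0.0-py2.7.egg", "my_job_package") would be expected to return True; the exact file name depends on the Python version used to build the Egg.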
If Spark Job Server is running with Python support, a Python context can be started with, for example:
curl -X POST "localhost:8090/contexts/py-context?context-factory=spark.jobserver.python.PythonSparkContextFactory"
Whereas Java and Scala jobs are packaged as Jar files, Python jobs need to be packaged as Egg files. A set of example
jobs can be built using the sbt task job-server-python/buildPyExamples, which builds an examples Egg in
job-server-python/target/python. An Egg is pushed to the server as a job binary; for instance, an Egg built from the
my_job_package layout described earlier can be uploaded with:
curl --data-binary @dist/my_job_package-0.0.0-py2.7.egg \
-H 'Content-Type: application/python-archive' localhost:8090/binaries/my_py_job
Then, running a Python job is similar to running other job types:
curl -d 'input.strings = ["a", "b", "a", "b" ]' \
"localhost:8090/jobs?appName=my_py_job&classPath=my_job_package.WordCountSparkJob&context=py-context"
curl "localhost:8090/jobs/<job-id>"
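The same submission can be scripted from Python with the standard library. The sketch below mirrors the curl call above and is offered under assumptions: the server address matches the examples here, the helper names are hypothetical, and the response body is returned as raw text because its exact shape for a submission is not described in this document.

```python
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8090"  # assumed local job server address


def job_submit_url(app_name, class_path, context, base_url=BASE_URL):
    """Build the /jobs URL with the same query parameters as the curl call."""
    query = urllib.parse.urlencode(
        {"appName": app_name, "classPath": class_path, "context": context}
    )
    return "%s/jobs?%s" % (base_url, query)


def submit_job(app_name, class_path, context, job_conf, base_url=BASE_URL):
    """POST the job config text and return the raw response body as text."""
    request = urllib.request.Request(
        job_submit_url(app_name, class_path, context, base_url),
        data=job_conf.encode("utf-8"),
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

With a server running, submit_job("my_py_job", "my_job_package.WordCountSparkJob", "py-context", 'input.strings = ["a", "b", "a", "b"]') would send the same request as the first curl command.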
Python support is also available for SQLContext and HiveContext. Simply launch a context using
spark.jobserver.python.PythonSQLContextFactory or spark.jobserver.python.PythonHiveContextFactory. For example:
curl -X POST "localhost:8090/contexts/pysql-context?context-factory=spark.jobserver.python.PythonSQLContextFactory"
When implementing the Python job, you can simply assume that the context argument to validate and run_job
is of the appropriate type. Due to dynamic typing in Python, this is not enforced in the method definitions. For example:
from pyspark.sql import SQLContext
from sparkjobserver.api import SparkJob, build_problems


class SQLAverageJob(SparkJob):

    def validate(self, context, runtime, config):
        problems = []
        job_data = None
        if not isinstance(context, SQLContext):
            problems.append('Expected a SQL context')
        if config.get('input.data', None):
            job_data = config.get('input.data')
        else:
            problems.append('config input.data not found')
        if len(problems) == 0:
            return job_data
        else:
            return build_problems(problems)

    def run_job(self, context, runtime, data):
        # Each input row is expected to be [name, age, salary].
        rdd = context._sc.parallelize(data)
        df = context.createDataFrame(rdd, ['name', 'age', 'salary'])
        df.registerTempTable('people')
        query = context.sql("SELECT age, AVG(salary) from people GROUP BY age ORDER BY age")
        results = query.collect()
        return [(r[0], r[1]) for r in results]
The above job implementation checks during the validate stage that the context object is of the correct type.
Then, in run_job, dataframe operations are used, which exist on SQLContext.
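To make clear what the SQL query computes, here is a plain-Python equivalent that needs no Spark. It is only an illustration of the grouping and averaging; the function name and the sample rows of the form [name, age, salary] are chosen for this sketch.

```python
from collections import defaultdict


def average_salary_by_age(rows):
    """Plain-Python equivalent of the GROUP BY age / AVG(salary) query."""
    salaries = defaultdict(list)
    for _name, age, salary in rows:
        salaries[age].append(salary)
    # Sorting the items by age mirrors the ORDER BY age clause.
    return [(age, sum(vals) / len(vals)) for age, vals in sorted(salaries.items())]


rows = [["bob", 20, 1200], ["jon", 21, 1400], ["mary", 20, 1300], ["sue", 21, 1600]]
print(average_salary_by_age(rows))  # [(20, 1250.0), (21, 1500.0)]
```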
This job is one of the examples, so running the sbt task job-server-python/buildPyExamples and uploading the resulting
Egg makes this job available:
curl --data-binary @job-server-python/target/python/sjs_python_examples-0.7.0_SNAPSHOT-py2.7.egg \
-H 'Content-Type: application/python-archive' localhost:8090/binaries/example_jobs
The input to the job can be provided as a conf file, e.g. with the contents:
input.data = [
  ["bob", 20, 1200],
  ["jon", 21, 1400],
  ["mary", 20, 1300],
  ["sue", 21, 1600]
]
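When the input rows already live in Python, a conf like this can be generated rather than written by hand. The sketch below relies on HOCON accepting JSON array syntax for values; the helper name render_input_conf is hypothetical.

```python
import json


def render_input_conf(rows, key="input.data"):
    """Render rows as a one-line HOCON assignment; JSON arrays are valid HOCON."""
    return "%s = %s\n" % (key, json.dumps(rows))


rows = [["bob", 20, 1200], ["jon", 21, 1400], ["mary", 20, 1300], ["sue", 21, 1600]]
print(render_input_conf(rows), end="")
```

The returned text can be written to a file and passed to curl in place of a hand-written conf.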
Then we can submit the SQLContext-based job:
curl -d @sqlinput.conf \
"localhost:8090/jobs?appName=example_jobs&classPath=example_jobs.sql_average.SQLAverageJob&context=pysql-context"
curl "localhost:8090/jobs/<job-id>"
When complete, we get output such as:
{
  "duration": "4.685 secs",
  "classPath": "example_jobs.sql_average.SQLAverageJob",
  "startTime": "2016-08-01T15:15:52.250+01:00",
  "context": "pysql-context",
  "result": [[20, 1250.0], [21, 1500.0]],
  "status": "FINISHED",
  "jobId": "05346d12-a84b-40f7-a88d-15765bdd23a4"
}
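Waiting for a job to complete can be automated by polling the job URL until the status field reads FINISHED. The following is a rough sketch under assumptions: only the FINISHED status is taken from the sample output above, every other status is treated as "not done yet" (so a failed job would simply run into the timeout), and the helper names and server address are illustrative.

```python
import json
import time
import urllib.request


def fetch_job(job_id, base_url="http://localhost:8090"):
    """GET /jobs/<job-id> and parse the JSON body."""
    with urllib.request.urlopen("%s/jobs/%s" % (base_url, job_id)) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_result(job_id, fetch=fetch_job, interval=1.0, attempts=30):
    """Poll until the job reports status FINISHED, then return its result.

    Any status other than FINISHED is treated as pending; that is an
    assumption of this sketch, not documented server behaviour.
    """
    for _ in range(attempts):
        job = fetch(job_id)
        if job.get("status") == "FINISHED":
            return job.get("result")
        time.sleep(interval)
    raise TimeoutError("job %s did not finish in time" % job_id)
```

The fetch parameter is injectable so that the polling logic can be tried out with canned responses instead of a live server.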
Python support can handle arbitrary context types as long as they are based on an implementation of the
spark.jobserver.python.PythonContextFactory Trait. As well as implementing a version of this Trait which yields
contexts of your custom type, your Python jobs which use this context must implement an additional method,
build_context(self, gateway, jvmContext, sparkConf), which returns the Python equivalent of the JVM Context object.
For a simple example, see CustomContextJob in the job-server-python sub-module.
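The shape of such a job can be sketched as follows. This is not the CustomContextJob from the sub-module: CustomContext is a hypothetical Python-side wrapper invented for this illustration, and the only detail taken from the text above is the build_context signature.

```python
class CustomContext:
    """Hypothetical Python-side wrapper around a custom JVM context."""

    def __init__(self, gateway, jvm_context, spark_conf):
        self.gateway = gateway
        self.jvm_context = jvm_context
        self.spark_conf = spark_conf


class CustomContextExampleJob:

    def build_context(self, gateway, jvmContext, sparkConf):
        # Return the Python equivalent of the JVM context object.
        return CustomContext(gateway, jvmContext, sparkConf)

    def validate(self, context, runtime, config):
        # Nothing to validate in this sketch; pass the config through as job data.
        return config

    def run_job(self, context, runtime, data):
        # Report which wrapper type the job received.
        return type(context).__name__
```

In a real job, the wrapper would expose whatever operations the custom JVM context offers, typically by calling through the gateway.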