- Docker
- on Linux
- on Windows with Ubuntu on WSL. Instructions are here
- 6 cores, 16 GB RAM for Spark cluster
- 6 cores, 32 GB RAM for Spark cluster + Airflow
Data transformation tasks (1-4) and schema description:
- data_transformation_task_description.txt
Inputs:
- data/input/tables/accounts/*.parquet
- data/input/tables/country_abbreviation/*.parquet
- data/input/tables/transactions/*.parquet
Outputs:
- data/output/df/task.../...
- data/output/sql/task.../...
Expected outputs:
- src/test/task1/expected_output/..
- src/test/task../expected_output/..
Code:
- src/main/pyspark_task.py - DataFrame and SQL definitions
- src/main/pyspark_task_validator.py - module to invoke and test the DataFrame and SQL definitions
- src/main/resources/sql/.. - SQL files with the same logic as the DataFrames
- src/main/web/.. - Flask web UI for task invocation
- src/test/test_app.py - definition of all tests
- docker/start-docker.sh - script to start the project using bash commands.
  - First parameter accepts spark, airflow, or all - start only Spark, only Airflow, or both.
  - Second parameter accepts y or n - whether to build the images.
  - Third parameter accepts y or n - whether to run the tests.
- bash/... - other files related to the Spark environment config
Summary: Use Spark SQL and the DataFrame API for data processing. Implement all tasks described in data_transformation_task_description.txt.
- Write SQL code in all src/main/resources/sql/task*/ folders
- Write PySpark code for all DataFrames in pyspark_task.py
- Check how you can invoke subsets of tests for:
  - DataFrames
  - SQLs
  - a task group
  - a particular task
- Make sure that all tests pass
- Option 1: Run all tests using the prepared bash script
  ./docker/start-docker.sh spark y y
- Option 2: Connect to the master node and run all tests
  docker container exec -it py_spark_test_tasks-spark-master-1 /bin/bash
  pytest /opt/spark-apps/test
- Option 3: Run a single test for a task (for debugging/development)
  pytest /opt/spark-apps/test/test_app.py --task_type sql --task_group_id 2 --task_id 1 --skip-xfail
  pytest /opt/spark-apps/test/test_app.py --task_type df --task_group_id 2 --task_id 1 --skip-xfail
- Understand the implementation of the pytest config and tests (conftest.py, test_app.py)
- Fix bugs in the current implementation, as it doesn't work as expected.
  The command below needs to run only the specified tests plus the technical ones (marked as failed + test_fn_run_task_group_sql).
  So in data/output you should find only data/output/sql/task2/2.1... and data/output/sql/task1 (executed by test_fn_run_task_group_sql).
  pytest /opt/spark-apps/test/test_app.py --task_type sql --task_group_id 2 --task_id 1
- Add a --skip-xfail parameter to skip tests marked with pytest.mark.xfail.
  pytest /opt/spark-apps/test/test_app.py --task_type sql --task_group_id 2 --task_id 1 --skip-xfail
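One way such a flag could be wired up in conftest.py is sketched below. The hook names are standard pytest; the rest (option wiring, messages) is illustrative rather than the project's actual implementation:

```python
# Sketch of a --skip-xfail option in conftest.py (illustrative, not the project's code).
import pytest


def pytest_addoption(parser):
    parser.addoption(
        "--skip-xfail",
        action="store_true",
        default=False,
        help="Skip tests marked with pytest.mark.xfail instead of running them.",
    )


def pytest_collection_modifyitems(config, items):
    if not config.getoption("--skip-xfail"):
        return
    skip_marker = pytest.mark.skip(reason="skipped because --skip-xfail was passed")
    for item in items:
        # Any test carrying the xfail marker gets an additional skip marker.
        if item.get_closest_marker("xfail") is not None:
            item.add_marker(skip_marker)
```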
- Mark all tests in test_app.py except test_task_data as technical (a conftest.py sketch covering the marker and the two parameters below follows after these items)
- Add a --skip-technical parameter to skip tests marked as technical.
  If everything is implemented correctly, you should find only one folder in data/output: data/output/sql/task2/2.1...
  pytest /opt/spark-apps/test/test_app.py --task_type sql --task_group_id 2 --task_id 1 --skip-technical
- Add a --deselect-technical parameter to deselect tests marked as technical.
  If everything is implemented correctly, you should find only one folder in data/output: data/output/sql/task2/2.1...
  pytest /opt/spark-apps/test/test_app.py --task_type sql --task_group_id 2 --task_id 1 --deselect-technical
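A possible conftest.py shape for the technical marker and the two flags is sketched below; only the marker name and the option names come from the task description, everything else is an assumption:

```python
# Sketch: a "technical" marker plus --skip-technical / --deselect-technical flags.
import pytest


def pytest_configure(config):
    # Register the custom marker so pytest does not warn about it.
    # In test_app.py every test except test_task_data would carry @pytest.mark.technical.
    config.addinivalue_line("markers", "technical: technical/service tests")


def pytest_addoption(parser):
    parser.addoption("--skip-technical", action="store_true", default=False)
    parser.addoption("--deselect-technical", action="store_true", default=False)


def pytest_collection_modifyitems(config, items):
    technical = [item for item in items if item.get_closest_marker("technical")]
    if config.getoption("--skip-technical"):
        # Skipped tests are still collected and reported as "skipped".
        for item in technical:
            item.add_marker(pytest.mark.skip(reason="--skip-technical was passed"))
    elif config.getoption("--deselect-technical"):
        # Deselected tests are dropped from the run entirely.
        items[:] = [item for item in items if item not in technical]
        config.hook.pytest_deselected(items=technical)
```

The practical difference between the two flags: skipped tests still show up in the report as skipped, while deselected tests are neither run nor reported.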
Summary: Run all tasks using an Airflow task-group DAG.
You need to run the main script /opt/spark-apps/main/pyspark_task.py using Airflow.
The user and password for the Airflow UI (http://localhost:8080/) are airflow/airflow.
- You need to implement Task 1 first (Spark API + Spark SQL + pytest)
- Start the Spark cluster and Airflow
  docker compose -f ./docker-compose-spark.yaml -f ./docker-compose-airflow-no-connection-with-spark.yaml up -d
  If Airflow doesn't start, clean up your Docker images and volumes:
  docker rm -f $(docker ps -a -q)
  docker volume rm $(docker volume ls -q)
  docker system prune
- Install and configure the SSH and Spark Submit providers
- Create a simple DAG that connects to the Spark master host and runs task 1.1
- Create 4 task groups (one per task group)
- The task groups need to be executed one by one
- Tasks inside a group need to be executed in parallel
- Add your code to airflow/dags/docker_spark_dag_with_task_groups.py or create your own DAG
- Check and understand the config, then write your own DAG (see the DAG sketch after the command list below)
- If you have issues with the config, use the commands below and check the solution.
- Command to create the Spark cluster + Airflow + SSH connection between them
  ./docker/start-docker.sh all y
- Command to connect to any container
  docker container exec -it [container_name] /bin/bash
- Command to get the list of container names
  docker compose -f ./docker-compose-spark.yaml -f ./docker-compose-airflow-no-connection-with-spark.yaml ps
- List of bash commands that you need to add to the DAG
# df
## group 1
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 2 -tt df
## group 2
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 3 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 4 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 5 -tt df
## group 3
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 3 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 4 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 5 -tt df
## group 4
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 3 -tt df
# sql
## group 1
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 2 -tt sql
## group 2
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 3 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 4 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 5 -tt sql
## group 3
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 3 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 4 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 5 -tt sql
## group 4
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 3 -tt sql
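A minimal sketch of such a DAG, assuming the SSH provider is installed and an Airflow SSH connection to the Spark master exists (the connection name ssh_spark_master and the dag_id are made up; the spark-submit commands are the ones listed above):

```python
# Sketch of a DAG with one TaskGroup per task group; groups run sequentially,
# tasks inside a group run in parallel. Illustrative, not the project's actual DAG.
from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.utils.task_group import TaskGroup

# Task ids per task group, taken from the spark-submit list above.
TASKS_PER_GROUP = {1: [1, 2], 2: [1, 2, 3, 4, 5], 3: [1, 2, 3, 4, 5], 4: [1, 2, 3]}

with DAG(
    dag_id="spark_task_groups_sketch",   # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    previous_group = None
    for group_id, task_ids in TASKS_PER_GROUP.items():
        with TaskGroup(group_id=f"task_group_{group_id}") as current_group:
            for task_id in task_ids:
                for task_type in ("df", "sql"):
                    SSHOperator(
                        task_id=f"task_{group_id}_{task_id}_{task_type}",
                        ssh_conn_id="ssh_spark_master",  # assumed Airflow connection name
                        command=(
                            "spark-submit /opt/spark-apps/main/pyspark_task.py "
                            f"-g {group_id} -t {task_id} -tt {task_type}"
                        ),
                    )
        if previous_group is not None:
            previous_group >> current_group  # chain the groups one after another
        previous_group = current_group
```

Chaining the TaskGroup objects with >> makes the groups run one by one, while the unchained operators inside each group run in parallel (subject to the scheduler's parallelism settings).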
Summary: Implement the modules specified below yourself.
- Create your own data comparison framework (write your own pyspark_task_validator.py); see the sketches after this list
- Test all the created transformations for the SQL and DataFrame APIs using pytest-spark (write your own test_app.py)
- Add the possibility to run tests only for particular criteria (group, task, and skip tests marked as failed)
- Add logging to all your functions using decorators (write your own project_logs.py)
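For orientation only, here is one possible shape of the comparison core of pyspark_task_validator.py (the helper name assert_dataframes_equal is an assumption, not the module's real API):

```python
# Sketch of an order-insensitive Spark DataFrame comparison helper.
def assert_dataframes_equal(actual_df, expected_df):
    """Fail if the two DataFrames differ in schema or in their multiset of rows."""
    assert actual_df.schema == expected_df.schema, "Schemas differ"
    missing_rows = expected_df.exceptAll(actual_df)  # rows expected but not produced
    extra_rows = actual_df.exceptAll(expected_df)    # rows produced but not expected
    assert missing_rows.count() == 0 and extra_rows.count() == 0, "Row sets differ"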
Summary: Create a UI using Flask for executing bash commands in PySpark.
- You need to write the code in src/main/web/app.py and src/main/web/templates/main.html (a sketch of app.py follows after the command list below)
- The Flask app needs to be accessible at http://localhost:8000/run_task
- You should be able to:
  - Choose a task from a drop-down list
  - Choose the method of execution (SQL, DataFrame, or both) from a drop-down list
  - Start the execution with a button
  - See the logs generated by your script in real time on the web page
- For details on how to run commands in Docker, refer to the "How to work with project" section below
- List of bash commands that you need to execute using the Flask UI (one at a time)
# df
## group 1
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 2 -tt df
## group 2
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 3 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 4 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 5 -tt df
## group 3
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 3 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 4 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 5 -tt df
## group 4
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 2 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 3 -tt df
# sql
## group 1
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 2 -tt sql
## group 2
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 3 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 4 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 5 -tt sql
## group 3
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 3 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 4 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 5 -tt sql
## group 4
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 2 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 4 -t 3 -tt sql
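A possible skeleton for src/main/web/app.py is sketched below (the form field names and template wiring are assumptions; only the route, the port, and the spark-submit command shape come from this README). It streams the spark-submit output line by line so the page can show the logs in real time:

```python
# Sketch of src/main/web/app.py: run a spark-submit command and stream its output.
import subprocess

from flask import Flask, Response, render_template, request

app = Flask(__name__)


@app.route("/run_task", methods=["GET", "POST"])
def run_task():
    if request.method == "GET":
        # main.html would render the task / execution-method drop-downs and a run button.
        return render_template("main.html")

    group_id = request.form["group_id"]      # assumed form field names
    task_id = request.form["task_id"]
    task_type = request.form["task_type"]    # "df" or "sql"
    cmd = [
        "spark-submit", "/opt/spark-apps/main/pyspark_task.py",
        "-g", group_id, "-t", task_id, "-tt", task_type,
    ]

    def stream_logs():
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        for line in proc.stdout:
            yield line  # sent to the browser as soon as it is produced
        proc.wait()

    return Response(stream_logs(), mimetype="text/plain")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
```

For the "both" execution method, the handler could simply run the df and sql commands one after another and stream both outputs.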
Summary: Implement tasks 1 and 2 on any cloud. The idea is to run Spark on EMR or DataProc using managed Airflow on AWS or GCP. You can do it in a few phases: first by using local Airflow on Docker with EMR/DataProc, then by using Cloud services for everything.
An example for GCP can be found in ./cloud/gcp/PySpark_on_GCP_Tutorial.pdf. It explains all the steps but uses a previous structure of the project, so you will need to amend it to make it work.
- How to initialize the project:
- Permissions set
chmod -R 755 ./*
- Docker image build
  - Using the prepared bash script
    ./docker/start-docker.sh spark y
  - Using docker commands
    docker build -f ./docker/DockerfileSpark --build-arg SPARK_VERSION=3.0.2 --build-arg HADOOP_VERSION=3.2 -t cluster-apache-spark:3.0.2 ./
    docker build -f ./docker/DockerfileAirflow -t airflow-with-spark:1.0.0 ./
- How to run only the Spark cluster without Airflow
  - Using the prepared bash script
./docker/start-docker.sh spark n
- Using docker commands
docker compose -f ./docker-compose-spark.yaml up -d
docker container exec -it py_spark_test_tasks-spark-master-1 /bin/bash
- How to run Spark and Airflow (already connected via SSH)
  - Using the prepared bash script
./docker/start-docker.sh all n
- Using docker commands
docker compose -f ./docker-compose-spark.yaml -f ./docker-compose-airflow.yaml up -d
- How to use the main script pyspark_task.py:
spark-submit /opt/spark-apps/main/pyspark_task.py -g <GROUP_ID> -t <TASK_ID> -tt <TASK_TYPE>
- GROUP_ID takes a value from [1,2,3,4]
- TASK_ID takes a value from 1 to 5, depending on the task; not every task group has 5 tasks
- TASK_TYPE takes a value from [df,sql]
# Examples
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 1 -t 1 -tt sql
spark-submit /opt/spark-apps/main/pyspark_task.py -g 2 -t 1 -tt df
spark-submit /opt/spark-apps/main/pyspark_task.py -g 3 -t 1 -tt sql
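For reference, the argument parsing inside pyspark_task.py could look roughly like this (the short flags and their allowed values come from the examples above; the long option names and function name are made up):

```python
# Sketch of the CLI parsing for pyspark_task.py (illustrative).
import argparse


def parse_args():
    parser = argparse.ArgumentParser(description="Run one data transformation task.")
    parser.add_argument("-g", "--group-id", type=int, choices=[1, 2, 3, 4], required=True)
    parser.add_argument("-t", "--task-id", type=int, choices=[1, 2, 3, 4, 5], required=True)
    parser.add_argument("-tt", "--task-type", choices=["df", "sql"], required=True)
    return parser.parse_args()
```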
- How to run all tests using the bash script:
  ./docker/start-docker.sh spark n y
- How to run all tests manually:
  ./docker/start-docker.sh spark n
  pytest /opt/spark-apps/test
- How to run all failed tests:
  ./docker/start-docker.sh spark n f
- Flask app to execute tasks from the UI: http://localhost:8000/run_task
- Spark Master UI
- Airflow UI: http://localhost:8080/ (login airflow/airflow)