airflow.sh (executable shell script, from a fork of mozilla/telemetry-airflow)
#!/bin/bash
# to be copied to $AIRFLOW_BUCKET/steps/airflow.sh
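# Echo each script line as it is read (useful when reading the job logs)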
set -o verbose
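# Run with the hadoop user's home directory and shell environment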
HOME=/home/hadoop
source $HOME/.bashrc
# Print an error message to stderr
error_msg ()
{
    echo 1>&2 "Error: $1"
}
# Parse arguments
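# Recognized options: --job-name, --user, --uri, --arguments, --runner-arguments,
# --data-bucket and --environment; unrecognized options are reported but not fatal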
while [ $# -gt 0 ]; do
    case "$1" in
        --job-name)
            shift
            job_name=$1
            ;;
        --user)
            shift
            user=$1
            ;;
        --uri)
            shift
            uri=$1
            ;;
        --arguments)
            shift
            args=$1
            ;;
        --runner-arguments)
            shift
            runner_args=$1
            ;;
        --data-bucket)
            shift
            data_bucket=$1
            ;;
        --environment)
            shift
            environment=$1
            ;;
        -*)
            # do not exit out, just note failure
            error_msg "unrecognized option: $1"
            ;;
        *)
            break
            ;;
    esac
    shift
done
if [ -z "$job_name" ] || [ -z "$user" ] || [ -z "$uri" ] || [ -z "$data_bucket" ]; then
error_msg "missing argument(s)"
exit -1
fi
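# Everything the job writes to ./output is uploaded under this S3 prefix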
s3_base="s3://$data_bucket/data/$user/$job_name"
# Wait for Parquet datasets to be loaded: block while hive_config.sh is still running
while pgrep -f hive_config.sh > /dev/null; do sleep 1; done
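# Work out of a scratch directory; anything the job leaves in ./output is uploaded at the end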
wd=/mnt/analyses
mkdir -p $wd && cd $wd
mkdir -p output
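# Decode a URL-encoded string: '+' becomes a space and %XX escapes become the corresponding bytes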
urldecode() {
    local url_encoded="${1//+/ }"
    printf '%b' "${url_encoded//%/\\x}"
}
# Download the job artifact: from S3 via the AWS CLI, or over HTTP(S) via wget after URL-decoding
if [[ $uri == s3://* ]]; then
    aws s3 cp "$uri" .
elif [[ $uri =~ ^https?.*$ ]]; then
    uri=$(urldecode "$uri")
    wget -N "$uri"
fi
# Run job
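# The job file is the basename of the URI; how it is run depends on its extension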
job="${uri##*/}"
cd $wd
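# JAR artifacts are submitted directly to Spark on YARN in client mode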
if [[ $uri == *.jar ]]; then
    time env $environment spark-submit $runner_args --master yarn-client "./$job" $args
    rc=$?
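# Notebooks are executed headlessly with nbconvert, using pyspark as the driver so a
# SparkContext is available; the executed copy is written to ./output/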
elif [[ $uri == *.ipynb ]]; then
    time env $environment \
        PYSPARK_DRIVER_PYTHON=jupyter \
        PYSPARK_DRIVER_PYTHON_OPTS="nbconvert --ExecutePreprocessor.kernel_name=python2 --ExecutePreprocessor.timeout=-1 --to notebook --log-level=10 --execute \"./${job}\" --allow-errors --output-dir ./output/ " \
        pyspark
    rc=$?
    # When nbconvert is called with --allow-errors there's no way to detect if a cell raised an exception.
    # Grepping the output notebook for an 'error' output type does the trick.
    if [ "$rc" != 0 ] || [ "$(grep '"output_type": "error"' ./output/${job})" ]; then
        # If an error is detected, print out the notebook in a human readable (markdown) format.
        PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="nbconvert --to markdown --stdout \"./output/${job}\"" pyspark
        rc=1
    fi
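# Plain Python scripts go through spark-submit, with the Anaconda interpreter as the driver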
elif [[ $uri == *.py ]]; then
    time env $environment \
        PYSPARK_DRIVER_PYTHON=/mnt/anaconda2/bin/python PYSPARK_DRIVER_PYTHON_OPTS= spark-submit \
        $runner_args --master yarn-client "./$job" $args
    rc=$?
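# Anything else is treated as an arbitrary executable and run directly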
else
    chmod +x "./$job"
    time env $environment "./$job" $args
    rc=$?
fi
# Upload output files to S3; gzipped files are tagged with Content-Encoding: gzip
cd $wd/output
find . -type f | while read -r f
do
    # Remove the leading "./"
    f=${f#./}
    echo "$f"
    upload_cmd="aws s3 cp './$f' '$s3_base/$f'"
    if [[ "$f" == *.gz ]]; then
        upload_cmd="$upload_cmd --content-encoding gzip"
    fi
    eval $upload_cmd
done
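# Propagate the job's exit status to the caller so failures are visible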
if [[ $rc != 0 ]]; then
    exit $rc
fi