Commit 3b68ef6
Fix region handling and allow using an existing cluster.
Gioia Ballin committed Aug 4, 2017
1 parent dbfb560 commit 3b68ef6
Showing 3 changed files with 59 additions and 35 deletions.
11 changes: 8 additions & 3 deletions dataproc/README.md
@@ -35,14 +35,19 @@ To run list_clusters.py:
    python list_clusters.py <YOUR-PROJECT-ID> --region=us-central1


-To run create_cluster_and_submit_job, first create a GCS bucket, from the Cloud Console or with
+To run submit_job_to_cluster.py, first create a GCS bucket, from the Cloud Console or with
gsutil:

    gsutil mb gs://<your-input-bucket-name>

-Then run:
+Then, if you want to rely on an existing cluster, run:

-    python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
+    python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
+
+Otherwise, if you want the script to create a new cluster for you:
+
+    python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name> --create_new_cluster
+

This will set up a cluster, upload the PySpark file, submit the job, print the result, then
delete the cluster.
4 changes: 2 additions & 2 deletions dataproc/dataproc_e2e_test.py
@@ -20,7 +20,7 @@

from gcp_devrel.testing.flaky import flaky

-import create_cluster_and_submit_job
+import submit_job_to_cluster

PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
@@ -30,6 +30,6 @@

@flaky
def test_e2e():
-    output = create_cluster_and_submit_job.main(
+    output = submit_job_to_cluster.main(
        PROJECT, ZONE, CLUSTER_NAME, BUCKET)
    assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
79 changes: 49 additions & 30 deletions dataproc/create_cluster_and_submit_job.py → dataproc/submit_job_to_cluster.py
@@ -19,8 +19,6 @@
from google.cloud import storage
import googleapiclient.discovery

-# Currently only the "global" region is supported
-REGION = 'global'
DEFAULT_FILENAME = 'pyspark_sort.py'


@@ -36,6 +34,14 @@ def get_pyspark_file(filename):
    return f, os.path.basename(filename)


+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split('-')[:-1]
+        return '-'.join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError('Invalid zone provided, please check your input.')
+
+
def upload_pyspark_file(project_id, bucket_name, filename, file):
    """Uploads the PySpark file in this directory to the configured
    input bucket."""
@@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id):


# [START create_cluster]
-def create_cluster(dataproc, project, cluster_name, zone):
-    print('Creating cluster.')
+def create_cluster(dataproc, project, zone, region, cluster_name):
+    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
@@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone):
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
-        region=REGION,
+        region=region,
        body=cluster_data).execute()
    return result
# [END create_cluster]


-def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
-    print('Waiting for cluster creation')
+def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+    print('Waiting for cluster creation...')

    while True:
        result = dataproc.projects().regions().clusters().list(
            projectId=project_id,
-            region=REGION).execute()
+            region=region).execute()
        cluster_list = result['clusters']
        cluster = [c
                   for c in cluster_list
@@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):


# [START list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project):
+def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
-        region=REGION).execute()
+        region=region).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
@@ -120,7 +126,8 @@ def get_cluster_id_by_name(cluster_list, cluster_name):


# [START submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
+def submit_pyspark_job(dataproc, project, region,
+                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`"""
    job_details = {
@@ -136,7 +143,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
-        region=REGION,
+        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
@@ -145,29 +152,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):


# [START delete]
-def delete_cluster(dataproc, project, cluster):
+def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
-        region=REGION,
+        region=region,
        clusterName=cluster).execute()
    return result
# [END delete]


# [START wait]
-def wait_for_job(dataproc, project, job_id):
+def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
-            region=REGION,
+            region=region,
            jobId=job_id).execute()
        # Handle exceptions
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
-            print('Job finished')
+            print('Job finished.')
            return result
# [END wait]

@@ -181,34 +188,42 @@ def get_client():
# [END get_client]


-def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
+def main(project_id, zone, cluster_name, bucket_name,
+         pyspark_file=None, create_new_cluster=True):
    dataproc = get_client()
+    region = get_region_from_zone(zone)
    try:
        if pyspark_file:
            spark_file, spark_filename = get_pyspark_file(pyspark_file)
        else:
            spark_file, spark_filename = get_default_pyspark_file()

-        create_cluster(dataproc, project_id, cluster_name, zone)
-        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
-        upload_pyspark_file(project_id, bucket_name,
-                            spark_filename, spark_file)
+        if create_new_cluster:
+            create_cluster(dataproc, project_id, zone, region, cluster_name)
+            wait_for_cluster_creation(dataproc, project_id, region, cluster_name)
+
+        upload_pyspark_file(
+            project_id, bucket_name, spark_filename, spark_file)

        cluster_list = list_clusters_with_details(
-            dataproc, project_id)['clusters']
+            dataproc, project_id, region)['clusters']

        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))

        # [START call_submit_pyspark_job]
        job_id = submit_pyspark_job(
-            dataproc, project_id, cluster_name, bucket_name, spark_filename)
+            dataproc, project_id, region,
+            cluster_name, bucket_name, spark_filename)
        # [END call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, job_id)
+        wait_for_job(dataproc, project_id, region, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
        print('Received job output {}'.format(output))
        return output
    finally:
-        delete_cluster(dataproc, project_id, cluster_name)
+        if create_new_cluster:
+            delete_cluster(dataproc, project_id, region, cluster_name)
        spark_file.close()


@@ -220,15 +235,19 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True),
    parser.add_argument(
-        '--zone', help='Region to create clusters in', required=True)
+        '--zone', help='Zone to create clusters in/connect to', required=True)
    parser.add_argument(
-        '--cluster_name', help='Name of the cluster to create', required=True)
+        '--cluster_name',
+        help='Name of the cluster to create/connect to', required=True)
    parser.add_argument(
        '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
    parser.add_argument(
        '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument(
+        '--create_new_cluster',
+        action='store_true', help='States if the cluster should be created or not')

    args = parser.parse_args()
    main(
-        args.project_id, args.zone,
-        args.cluster_name, args.gcs_bucket, args.pyspark_file)
+        args.project_id, args.zone, args.cluster_name,
+        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)
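
Because create_new_cluster is now a keyword argument of main(), the sample can also be driven from other Python code. A hedged sketch that reuses an existing cluster; the project, bucket, and cluster names below are placeholders, not values from the commit:

    # Sketch only: 'my-project-id', 'my-input-bucket' and 'testcluster'
    # are placeholder values.
    import submit_job_to_cluster

    output = submit_job_to_cluster.main(
        'my-project-id', 'us-central1-b', 'testcluster', 'my-input-bucket',
        pyspark_file=None,           # falls back to pyspark_sort.py
        create_new_cluster=False)    # reuse the running cluster; no teardown
    print(output)

With create_new_cluster=False, the finally block skips delete_cluster, so the existing cluster keeps running after the job completes.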
