Fix region handling and allow using an existing cluster. #1053

Merged · 1 commit · Aug 7, 2017
11 changes: 8 additions & 3 deletions dataproc/README.md
@@ -35,14 +35,19 @@ To run list_clusters.py:
python list_clusters.py <YOUR-PROJECT-ID> --region=us-central1


-To run create_cluster_and_submit_job, first create a GCS bucket, from the Cloud Console or with
+To run submit_job_to_cluster.py, first create a GCS bucket, from the Cloud Console or with
gsutil:

gsutil mb gs://<your-input-bucket-name>

-Then run:
+Then, if you want to rely on an existing cluster, run:

-python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
+python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>

+Otherwise, if you want the script to create a new cluster for you:
+
+python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name> --create_new_cluster


This will setup a cluster, upload the PySpark file, submit the job, print the result, then
delete the cluster.
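Note that with this change the Dataproc region used for the API calls is derived from the --zone flag rather than hardcoded to 'global'. Illustratively, using the new get_region_from_zone helper shown later in this diff (zone names here are examples, not part of the change):

    get_region_from_zone('us-central1-b')   # -> 'us-central1'
    get_region_from_zone('europe-west1-d')  # -> 'europe-west1'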
4 changes: 2 additions & 2 deletions dataproc/dataproc_e2e_test.py
@@ -20,7 +20,7 @@

from gcp_devrel.testing.flaky import flaky

-import create_cluster_and_submit_job
+import submit_job_to_cluster

PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
@@ -30,6 +30,6 @@

@flaky
def test_e2e():
-    output = create_cluster_and_submit_job.main(
+    output = submit_job_to_cluster.main(
        PROJECT, ZONE, CLUSTER_NAME, BUCKET)
    assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
dataproc/{create_cluster_and_submit_job.py → submit_job_to_cluster.py}
@@ -19,8 +19,6 @@
from google.cloud import storage
import googleapiclient.discovery

-# Currently only the "global" region is supported
-REGION = 'global'
DEFAULT_FILENAME = 'pyspark_sort.py'


@@ -36,6 +34,14 @@ def get_pyspark_file(filename):
    return f, os.path.basename(filename)


+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split('-')[:-1]
+        return '-'.join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError('Invalid zone provided, please check your input.')
+
+
def upload_pyspark_file(project_id, bucket_name, filename, file):
    """Uploads the PySpark file in this directory to the configured
    input bucket."""
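A quick sanity check of the helper added above; these calls are illustrative and not part of the diff. The region is simply everything before the zone name's final dash:

    assert get_region_from_zone('us-central1-b') == 'us-central1'
    assert get_region_from_zone('asia-east1-a') == 'asia-east1'
    # A dash-free string such as 'global' yields '' rather than raising,
    # because str.split never fails here; only non-string input (e.g. None)
    # reaches the except clause and raises ValueError.
    assert get_region_from_zone('global') == ''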
@@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id):


# [START create_cluster]
-def create_cluster(dataproc, project, cluster_name, zone):
-    print('Creating cluster.')
+def create_cluster(dataproc, project, zone, region, cluster_name):
+    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
@@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone):
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
-        region=REGION,
+        region=region,
        body=cluster_data).execute()
    return result
# [END create_cluster]


-def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
-    print('Waiting for cluster creation')
+def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+    print('Waiting for cluster creation...')

    while True:
        result = dataproc.projects().regions().clusters().list(
            projectId=project_id,
-            region=REGION).execute()
+            region=region).execute()
        cluster_list = result['clusters']
        cluster = [c
                   for c in cluster_list
@@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):


# [START list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project):
+def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
-        region=REGION).execute()
+        region=region).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
@@ -120,7 +126,8 @@ def get_cluster_id_by_name(cluster_list, cluster_name):


# [START submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
+def submit_pyspark_job(dataproc, project, region,
+                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`"""
    job_details = {
@@ -136,7 +143,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
-        region=REGION,
+        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
@@ -145,29 +152,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):


# [START delete]
-def delete_cluster(dataproc, project, cluster):
+def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
-        region=REGION,
+        region=region,
        clusterName=cluster).execute()
    return result
# [END delete]


# [START wait]
-def wait_for_job(dataproc, project, job_id):
+def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
-            region=REGION,
+            region=region,
            jobId=job_id).execute()
        # Handle exceptions
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
-            print('Job finished')
+            print('Job finished.')
            return result
# [END wait]

@@ -181,34 +188,44 @@ def get_client():
# [END get_client]


-def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
+def main(project_id, zone, cluster_name, bucket_name,
+         pyspark_file=None, create_new_cluster=True):
    dataproc = get_client()
+    region = get_region_from_zone(zone)
    try:
        if pyspark_file:
            spark_file, spark_filename = get_pyspark_file(pyspark_file)
        else:
            spark_file, spark_filename = get_default_pyspark_file()

-        create_cluster(dataproc, project_id, cluster_name, zone)
-        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
-        upload_pyspark_file(project_id, bucket_name,
-                            spark_filename, spark_file)
+        if create_new_cluster:
+            create_cluster(
+                dataproc, project_id, zone, region, cluster_name)
+            wait_for_cluster_creation(
+                dataproc, project_id, region, cluster_name)
+
+        upload_pyspark_file(
+            project_id, bucket_name, spark_filename, spark_file)

        cluster_list = list_clusters_with_details(
-            dataproc, project_id)['clusters']
+            dataproc, project_id, region)['clusters']

        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))

        # [START call_submit_pyspark_job]
        job_id = submit_pyspark_job(
-            dataproc, project_id, cluster_name, bucket_name, spark_filename)
+            dataproc, project_id, region,
+            cluster_name, bucket_name, spark_filename)
        # [END call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, job_id)
+        wait_for_job(dataproc, project_id, region, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
        print('Received job output {}'.format(output))
        return output
    finally:
-        delete_cluster(dataproc, project_id, cluster_name)
+        if create_new_cluster:
+            delete_cluster(dataproc, project_id, region, cluster_name)
        spark_file.close()
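For reference, a minimal sketch (with placeholder names) of calling the new entry point against an existing cluster: with create_new_cluster=False, main() skips create_cluster and wait_for_cluster_creation, and the finally block skips the delete_cluster teardown.

    from submit_job_to_cluster import main

    output = main(
        'my-project-id',     # placeholder project ID
        'us-central1-b',     # region 'us-central1' is derived from this zone
        'testcluster',       # name of the already-running cluster
        'my-input-bucket',   # placeholder GCS bucket
        create_new_cluster=False)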


@@ -220,15 +237,19 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True),
    parser.add_argument(
-        '--zone', help='Region to create clusters in', required=True)
+        '--zone', help='Zone to create clusters in/connect to', required=True)
    parser.add_argument(
-        '--cluster_name', help='Name of the cluster to create', required=True)
+        '--cluster_name',
+        help='Name of the cluster to create/connect to', required=True)
    parser.add_argument(
        '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
    parser.add_argument(
        '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument(
+        '--create_new_cluster',
+        action='store_true', help='States if the cluster should be created')

    args = parser.parse_args()
    main(
-        args.project_id, args.zone,
-        args.cluster_name, args.gcs_bucket, args.pyspark_file)
+        args.project_id, args.zone, args.cluster_name,
+        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)
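One subtlety, as read from the diff above rather than from any documentation: because --create_new_cluster uses action='store_true', the flag defaults to False, so the command line reuses an existing cluster unless the flag is given, even though main() itself defaults create_new_cluster to True. The argument values below are placeholders:

    args = parser.parse_args([
        '--project_id=my-project', '--zone=us-central1-b',
        '--cluster_name=testcluster', '--gcs_bucket=my-bucket'])
    assert args.create_new_cluster is False  # flag omitted -> reuse cluster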