diff --git a/dataproc/quickstart/quickstart.py b/dataproc/quickstart/quickstart.py
index fcbda8827d3b..4159e2815202 100644
--- a/dataproc/quickstart/quickstart.py
+++ b/dataproc/quickstart/quickstart.py
@@ -15,6 +15,18 @@
 # limitations under the License.
 
 # [START dataproc_quickstart]
+"""
+This quickstart sample walks a user through creating a Cloud Dataproc
+cluster, submitting a PySpark job from Google Cloud Storage to the
+cluster, reading the output of the job and deleting the cluster, all
+using the Python client library.
+
+Usage:
+    python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
+        --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
+"""
+
+import argparse
 import time
 
 from google.cloud import dataproc_v1 as dataproc
@@ -22,18 +34,6 @@
 
 
 def quickstart(project_id, region, cluster_name, job_file_path):
-    """This quickstart sample walks a user through creating a Cloud Dataproc
-    cluster, submitting a PySpark job from Google Cloud Storage to the
-    cluster, reading the output of the job and deleting the cluster, all
-    using the Python client library.
-
-    Args:
-        project_id (string): Project to use for creating resources.
-        region (string): Region where the resources should live.
-        cluster_name (string): Name to use for creating a cluster.
-        job_file_path (string): Job in GCS to execute against the cluster.
-    """
-
     # Create the cluster client.
     cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
@@ -125,4 +125,35 @@ def quickstart(project_id, region, cluster_name, job_file_path):
     operation.result()
 
     print('Cluster {} successfully deleted.'.format(cluster_name))
-    # [END dataproc_quickstart]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        '--project_id',
+        type=str,
+        required=True,
+        help='Project to use for creating resources.')
+    parser.add_argument(
+        '--region',
+        type=str,
+        required=True,
+        help='Region where the resources should live.')
+    parser.add_argument(
+        '--cluster_name',
+        type=str,
+        required=True,
+        help='Name to use for creating a cluster.')
+    parser.add_argument(
+        '--job_file_path',
+        type=str,
+        required=True,
+        help='Job in GCS to execute against the cluster.')
+
+    args = parser.parse_args()
+    quickstart(args.project_id, args.region,
+               args.cluster_name, args.job_file_path)
+# [END dataproc_quickstart]
diff --git a/dataproc/quickstart/quickstart_test.py b/dataproc/quickstart/quickstart_test.py
index df488d0abc6f..b7fe0576676d 100644
--- a/dataproc/quickstart/quickstart_test.py
+++ b/dataproc/quickstart/quickstart_test.py
@@ -15,12 +15,11 @@
 
 import os
 import uuid
 import pytest
+import subprocess
 
 from google.cloud import dataproc_v1 as dataproc
 from google.cloud import storage
 
-import quickstart
-
 PROJECT_ID = os.environ['GCLOUD_PROJECT']
 REGION = 'us-central1'
@@ -29,10 +28,10 @@
 JOB_FILE_NAME = 'sum.py'
 JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
 SORT_CODE = (
-        "import pyspark\n"
-        "sc = pyspark.SparkContext()\n"
-        "rdd = sc.parallelize((1,2,3,4,5))\n"
-        "sum = rdd.reduce(lambda x, y: x + y)\n"
+    "import pyspark\n"
+    "sc = pyspark.SparkContext()\n"
+    "rdd = sc.parallelize((1,2,3,4,5))\n"
+    "sum = rdd.reduce(lambda x, y: x + y)\n"
 )
 
 
@@ -60,10 +59,16 @@ def setup_teardown():
         blob.delete()
 
 
-def test_quickstart(capsys):
-    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
+def test_quickstart():
+    command = [
+        'python', 'quickstart/quickstart.py',
+        '--project_id', PROJECT_ID,
+        '--region', REGION,
+        '--cluster_name', CLUSTER_NAME,
+        '--job_file_path', JOB_FILE_PATH
+    ]
+    out = subprocess.check_output(command).decode("utf-8")
 
-    out, _ = capsys.readouterr()
     assert 'Cluster created successfully' in out
     assert 'Submitted job' in out
    assert 'finished with state DONE:' in out
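
A minimal sketch of how the new command-line surface can be exercised outside pytest, mirroring what test_quickstart now does through subprocess. The project, cluster, and bucket values below are placeholders (not values defined in this change), and the relative script path assumes the working directory is the dataproc/ sample root, as in the test:

    import subprocess

    # Placeholder values; substitute a real project, region, cluster name,
    # and a GCS path that points at an existing PySpark job file.
    command = [
        'python', 'quickstart/quickstart.py',
        '--project_id', 'your-project-id',
        '--region', 'us-central1',
        '--cluster_name', 'your-cluster-name',
        '--job_file_path', 'gs://your-bucket/sum.py',
    ]

    # check_output raises CalledProcessError on a non-zero exit code, so a
    # failure inside the quickstart surfaces as a script (or test) failure.
    out = subprocess.check_output(command).decode('utf-8')
    print(out)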