Skip to content

Commit

Permalink
Added quickstart test
Browse files Browse the repository at this point in the history
  • Loading branch information
bradmiro committed Dec 17, 2019
1 parent 194ba5b commit 19a4997
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions dataproc/quickstart/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid
import pytest

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

import quickstart


# Project under test; the test requires GCLOUD_PROJECT to be set and will
# raise KeyError at import time otherwise (fail-fast by design).
PROJECT_ID = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'
# UUID suffixes keep concurrently-running test invocations from colliding
# on cluster and bucket names.
CLUSTER_NAME = 'test-cluster-{}'.format(str(uuid.uuid4()))
STAGING_BUCKET = 'test-bucket-{}'.format(str(uuid.uuid4()))
JOB_FILE_NAME = 'sum.py'
# GCS URI of the PySpark job file uploaded by the setup fixture.
JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
# Minimal PySpark job body: sums 1..5 on the cluster. Uploaded verbatim
# as sum.py to the staging bucket.
SORT_CODE = (
    "import pyspark\n"
    "sc = pyspark.SparkContext()\n"
    "rdd = sc.parallelize((1,2,3,4,5))\n"
    "sum = rdd.reduce(lambda x, y: x + y)\n"
)


@pytest.fixture(autouse=True)
def setup_teardown():
    """Provision test resources around each test.

    Setup: create the staging bucket and upload the PySpark job file.
    Teardown: delete the cluster if the test left it behind, then delete
    the job file and the staging bucket itself.
    """
    storage_client = storage.Client()
    bucket = storage_client.create_bucket(STAGING_BUCKET)
    blob = bucket.blob(JOB_FILE_NAME)
    blob.upload_from_string(SORT_CODE)

    yield

    cluster_client = dataproc.ClusterControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
    })

    # The quickstart sample deletes the cluster, but in the event that the
    # test fails before cluster deletion occurs, it can be manually deleted here.
    clusters = cluster_client.list_clusters(PROJECT_ID, REGION)

    for cluster in clusters:
        if cluster.cluster_name == CLUSTER_NAME:
            cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)

    # Delete the job file first so the bucket is empty, then delete the
    # bucket itself — the original teardown leaked one bucket per test run.
    blob.delete()
    bucket.delete()


def test_quickstart(capsys):
    """Run the quickstart end to end and check each stage reported success."""
    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)

    output, _ = capsys.readouterr()

    # Each phrase is printed by one stage of the sample: cluster creation,
    # job submission, job completion, and cluster deletion.
    expected_phrases = (
        'Cluster created successfully',
        'Submitted job',
        'finished with state DONE:',
        'successfully deleted',
    )
    for phrase in expected_phrases:
        assert phrase in output

0 comments on commit 19a4997

Please sign in to comment.