Skip to content

Commit

Permalink
Merge pull request #9 from oli-hall/improve-blocking-create
Browse files Browse the repository at this point in the history
Improve blocking create
  • Loading branch information
oli-hall authored Nov 17, 2017
2 parents 285a8e7 + feb9538 commit ad35d6e
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions pydataproc/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ def create_cluster(self, cluster_name, num_masters=1, num_workers=2,
"""Creates a DataProc cluster with the provided settings, returning a dict
of the results returned from the API. It can wait for cluster creation if desired.
N.B. the cluster creation currently waits for the cluster to reach a 'RUNNING' state.
If there is an initialisation error, it may never reach this state, which currently
isn't handled.
If block is set to True, the method will block until the cluster reaches either
a RUNNING or an ERROR state. If the cluster errors, an Exception will be raised.
:param cluster_name: the name of the cluster
:param num_masters: the number of master instances to use (default: 1)
Expand Down Expand Up @@ -162,12 +161,16 @@ def create_cluster(self, cluster_name, num_masters=1, num_workers=2,
return result


is_running = self.is_running(cluster_name)
state = self.cluster_state(cluster_name)
log.info("Waiting for cluster to be ready...")
log.warn("N.B. This may get stuck if the cluster never reaches a RUNNING state")
while not is_running:
while not state in ['RUNNING', 'ERROR']:
time.sleep(5)
is_running = self.is_running(cluster_name)
state = self.cluster_state(cluster_name)

if state == 'ERROR':
cluster_info = self.cluster_info(cluster_name)
status_detail = cluster_info['status']['detail']
raise Exception("Cluster encountered an error: {}".format(status_detail))

return self.cluster_info(cluster_name)

Expand All @@ -178,6 +181,7 @@ def delete_cluster(self, cluster_name):
:param cluster_name: the name of the cluster to delete
:return: the (dict) results of the deletion
"""
# TODO this doesn't deal with clusters that aren't running (e.g. that failed on startup)
log.info('Tearing down cluster {}...'.format(cluster_name))
result = self.dataproc.projects().regions().clusters().delete(
projectId=self.project,
Expand Down

0 comments on commit ad35d6e

Please sign in to comment.