Skip to content

Commit

Permalink
short-circuit SSH check; linear backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
nchammas committed Sep 14, 2014
1 parent 9a9e035 commit 43a69f0
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def parse_args():
help="Number of slaves to launch (default: %default)")
parser.add_option(
"-w", "--wait", type="int",
help="DEPRECATED - Seconds to wait for nodes to start")
help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option(
"-k", "--key-pair",
help="Key pair to use on instances")
Expand Down Expand Up @@ -602,11 +602,8 @@ def is_ssh_available(host, opts):
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
ssh_command(opts) +
['-t', '-t',
'-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host),
stringify_command('true')],
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host), stringify_command('true')],
stdout=devnull,
stderr=devnull
)
Expand All @@ -615,6 +612,14 @@ def is_ssh_available(host, opts):
return False


def is_cluster_ssh_available(cluster_instances, opts):
    """
    Check whether is_ssh_available() succeeds for every instance in the cluster.

    cluster_instances: an iterable of instances; each must expose `ip_address`
    opts: the parsed command-line options, passed through to is_ssh_available()

    Returns True when every instance passes the check (vacuously True for an
    empty cluster), and False as soon as one instance fails.
    """
    # all() over a generator short-circuits on the first failing host, so the
    # remaining instances are not probed once the answer is known to be False.
    return all(
        is_ssh_available(host=i.ip_address, opts=opts)
        for i in cluster_instances
    )


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
"""
cluster_instances: a list of boto.ec2.instance.Instance
Expand All @@ -628,20 +633,26 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
)
sys.stdout.flush()

num_attempts = 0

while True:
time.sleep(3 * num_attempts)

for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto
# print "{instance}: {state}".format(instance=i.id, state=i.state)

if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
all(is_ssh_available(host=i.ip_address, opts=opts) for i in cluster_instances):
is_cluster_ssh_available(cluster_instances, opts):
break
else:
if all(i.state == cluster_state for i in cluster_instances):
break

num_attempts += 1

sys.stdout.write(".")
sys.stdout.flush()
time.sleep(3)

sys.stdout.write("\n")

Expand Down

0 comments on commit 43a69f0

Please sign in to comment.