Add unit test to spark_ec2 script #134

Closed
wants to merge 4 commits into from
Changes from 3 commits

1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
*.ipr
*.iml
*.iws
*.pyc
.idea/
sbt/*.jar
.settings
48 changes: 48 additions & 0 deletions LICENSE
@@ -236,6 +236,54 @@ WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.

=======================================================================
For the moto library (ec2/third_party/moto*.zip)
=======================================================================

Copyright 2012 Steve Pulec

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=======================================================================
For the mock library (ec2/third_party/mock*.zip)
=======================================================================

Copyright (c) 2003-2012, Michael Foord
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

========================================================================
For CloudPickle (pyspark/cloudpickle.py):
109 changes: 57 additions & 52 deletions ec2/spark_ec2.py
@@ -37,6 +37,8 @@
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2

AWS_EVENTUAL_CONSISTENCY = 30

class UsageError(Exception):
pass

@@ -223,7 +225,7 @@ def launch_cluster(conn, opts, cluster_name):
sys.exit(1)
if opts.key_pair is None:
print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
sys.exit(1)
sys.exit(1)
print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
@@ -459,7 +461,6 @@ def setup_spark_cluster(master, opts):
if opts.ganglia:
print "Ganglia started at http://%s:5080/ganglia" % master


# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
@@ -665,6 +666,59 @@ def get_partition(total, num_partitions, current_partitions):
return num_slaves_this_zone


def destroy_cluster(conn, opts, cluster_name):
    (master_nodes, slave_nodes) = get_existing_cluster(
        conn, opts, cluster_name, die_on_error=False)
    print "Terminating master..."
    for inst in master_nodes:
        inst.terminate()
    print "Terminating slaves..."
    for inst in slave_nodes:
        inst.terminate()

    # Delete security groups as well
    if opts.delete_groups:
        print "Deleting security groups (this will take some time)..."
        group_names = [cluster_name + "-master", cluster_name + "-slaves"]

        attempt = 1;
        while attempt <= 3:
            print "Attempt %d" % attempt
            groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
            success = True
            # Delete individual rules in all groups before deleting groups to
            # remove dependencies between them
            for group in groups:
                print "Deleting rules in security group " + group.name
                for rule in group.rules:
                    for grant in rule.grants:
                        success &= group.revoke(ip_protocol=rule.ip_protocol,
                                                from_port=rule.from_port,
                                                to_port=rule.to_port,
                                                src_group=grant)

            # Sleep for AWS eventual-consistency to catch up, and for instances
            # to terminate
            time.sleep(AWS_EVENTUAL_CONSISTENCY)  # Yes, it does have to be this long :-(
            for group in groups:
                try:
                    conn.delete_security_group(group.name)
                    print "Deleted security group " + group.name
                except boto.exception.EC2ResponseError:
                    success = False;
                    print "Failed to delete security group " + group.name

            # Unfortunately, group.revoke() returns True even if a rule was not
            # deleted, so this needs to be rerun if something fails
            if success: break;

            attempt += 1

        if not success:
            print "Failed to delete all security groups after 3 tries."
            print "Try re-running in a few minutes."

def real_main():
(opts, action, cluster_name) = parse_args()
try:
@@ -695,56 +749,7 @@ def real_main():
cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
    conn, opts, cluster_name, die_on_error=False)
print "Terminating master..."
for inst in master_nodes:
    inst.terminate()
print "Terminating slaves..."
for inst in slave_nodes:
    inst.terminate()

# Delete security groups as well
if opts.delete_groups:
    print "Deleting security groups (this will take some time)..."
    group_names = [cluster_name + "-master", cluster_name + "-slaves"]

    attempt = 1;
    while attempt <= 3:
        print "Attempt %d" % attempt
        groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
        success = True
        # Delete individual rules in all groups before deleting groups to
        # remove dependencies between them
        for group in groups:
            print "Deleting rules in security group " + group.name
            for rule in group.rules:
                for grant in rule.grants:
                    success &= group.revoke(ip_protocol=rule.ip_protocol,
                                            from_port=rule.from_port,
                                            to_port=rule.to_port,
                                            src_group=grant)

        # Sleep for AWS eventual-consistency to catch up, and for instances
        # to terminate
        time.sleep(30)  # Yes, it does have to be this long :-(
        for group in groups:
            try:
                conn.delete_security_group(group.name)
                print "Deleted security group " + group.name
            except boto.exception.EC2ResponseError:
                success = False;
                print "Failed to delete security group " + group.name

        # Unfortunately, group.revoke() returns True even if a rule was not
        # deleted, so this needs to be rerun if something fails
        if success: break;

        attempt += 1

    if not success:
        print "Failed to delete all security groups after 3 tries."
        print "Try re-running in a few minutes."
destroy_cluster(conn, opts, cluster_name)

elif action == "login":
(master_nodes, slave_nodes) = get_existing_cluster(
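The retry logic that this diff moves into destroy_cluster follows a common pattern: try an operation a bounded number of times and pause before each attempt so that an eventually consistent backend can catch up. Below is a minimal, self-contained sketch of that pattern. The names `retry_with_delay` and `FlakyDeleter` and the injectable `sleep` parameter are hypothetical and do not appear in spark_ec2.py; no AWS calls are made.

```python
import time


def retry_with_delay(operation, attempts=3, delay_secs=30, sleep=time.sleep):
    """Call operation() until it returns True, at most `attempts` times.

    Sleeps for delay_secs before every attempt, similar to the pause
    before security-group deletion in the diff. Returns True on success.
    """
    for attempt in range(1, attempts + 1):
        print("Attempt %d" % attempt)
        sleep(delay_secs)
        if operation():
            return True
    return False


class FlakyDeleter(object):
    """Hypothetical stand-in for a delete call that fails a few times first."""

    def __init__(self, failures_before_success):
        self.remaining_failures = failures_before_success
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return False
        return True


# Record requested delays instead of really sleeping, so the sketch runs fast.
recorded_sleeps = []
deleter = FlakyDeleter(failures_before_success=2)
succeeded = retry_with_delay(deleter, attempts=3, delay_secs=1,
                             sleep=recorded_sleeps.append)
```

Passing the sleep function as a parameter is one way to keep tests fast; the PR instead exposes the delay as the module-level constant AWS_EVENTUAL_CONSISTENCY so that a test can lower it.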
25 changes: 25 additions & 0 deletions ec2/tests.py
@@ -0,0 +1,25 @@
import unittest
Contributor

This should probably include

#!/usr/bin/env python
# -*- coding: utf-8 -*-

and definitely include the Apache license header

Author

Done
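For illustration, the top of a file that follows this request might look like the sketch below. The page does not show the revised file, so this is an assumption about the result rather than the author's actual change; the license text is the standard ASF source header.

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
```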


from boto import ec2
import mock
import moto

import spark_ec2

Contributor

extra newline

Author

The extra newline before classes is required by the flake8 tool in its default settings, which is what I am using to review Python style for this script. If you tell me this is not your enforced style, then I will change it. However, if you do not have any enforced style, please allow me to keep it and suggest flake8 defaults as the style standard.

Contributor

Actually, I checked the code in pyspark as well, and we do follow the flake8 default of an extra newline before classes. So keeping this in is absolutely fine.
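For reference, the check behind this thread is pycodestyle's E302 ("expected 2 blank lines"), which flake8 reports by default when a top-level class or function definition is not preceded by two blank lines. A minimal sketch of the compliant layout, using a hypothetical `ExampleTests` class:

```python
import unittest


class ExampleTests(unittest.TestCase):
    # The two blank lines between the import and this class satisfy E302.
    def test_truth(self):
        self.assertTrue(True)
```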


Contributor

Can you also add some class-level comments and instructions on how to run this test here?

Author

Done
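The revised file is not shown on this page, so the following is only a sketch of what a class-level docstring with run instructions could look like. The docstring wording and the placeholder test are assumptions; the `python tests.py` invocation relies on the file's `unittest.main()` entry point and assumes that boto, mock, and moto are importable.

```python
import unittest


class CommandTests(unittest.TestCase):
    """Tests for the cluster commands implemented in spark_ec2.py.

    To run (assumed invocation; adjust paths for your checkout):

        cd ec2
        python tests.py
    """

    def test_placeholder(self):
        # Placeholder so the sketch runs on its own; the real test in this
        # PR calls spark_ec2.destroy_cluster against a mocked EC2 backend.
        self.assertTrue(True)


# Run the suite programmatically so the example does not exit the interpreter.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(CommandTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```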

class CommandTests(unittest.TestCase):

    @moto.mock_ec2
    def test_destroy(self):
        spark_ec2.AWS_EVENTUAL_CONSISTENCY = 1
        opts = mock.MagicMock(name='opts')
        opts.region = "us-east-1"
        conn = ec2.connect_to_region(opts.region)
        cluster_name = "cluster_name"
        try:
            spark_ec2.destroy_cluster(conn, opts, cluster_name)
        except:
            self.fail("destroy_cluster raised unexpected exception")

if __name__ == '__main__':
    unittest.main()
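The test above shortens the wait by assigning to spark_ec2.AWS_EVENTUAL_CONSISTENCY directly, which leaves the module modified for any test that runs afterwards. One alternative is `patch.object`, which restores the original value when the block exits. The sketch below makes two assumptions: `fake_spark_ec2` is a hypothetical stand-in module built here so that the example runs without boto or moto, and the import uses Python 3's `unittest.mock` (the standalone `mock` package bundled in this PR offers the same `patch.object` call).

```python
import types
import unittest
from unittest import mock

# Hypothetical stand-in for the spark_ec2 module and its delay constant.
fake_spark_ec2 = types.ModuleType("fake_spark_ec2")
fake_spark_ec2.AWS_EVENTUAL_CONSISTENCY = 30


def wait_time():
    """Return the delay the stand-in module would currently sleep for."""
    return fake_spark_ec2.AWS_EVENTUAL_CONSISTENCY


class PatchConstantTests(unittest.TestCase):

    def test_constant_is_patched_and_restored(self):
        with mock.patch.object(fake_spark_ec2, "AWS_EVENTUAL_CONSISTENCY", 1):
            self.assertEqual(wait_time(), 1)
        # Outside the with-block the original value is back.
        self.assertEqual(wait_time(), 30)


# Run the suite programmatically so the example does not exit the interpreter.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(PatchConstantTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```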
Binary file added ec2/third_party/mock-1.0.1.zip
Binary file not shown.
Binary file added ec2/third_party/moto-0.2.11.zip
Binary file not shown.
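This page does not show how the tests locate the bundled archives. As general background, Python can import pure-Python modules directly from a zip file that is placed on `sys.path`. The sketch below demonstrates that mechanism with a throwaway archive; `example_lib` and its contents are hypothetical, and whether the moto and mock archives are laid out so that this works as-is is an assumption.

```python
import os
import sys
import tempfile
import zipfile

# Build a throwaway archive containing one hypothetical module.
workdir = tempfile.mkdtemp()
archive_path = os.path.join(workdir, "example_lib.zip")
with zipfile.ZipFile(archive_path, "w") as archive:
    archive.writestr("example_lib.py", "ANSWER = 42\n")

# Putting the archive itself on sys.path makes its top-level modules importable.
sys.path.insert(0, archive_path)
import example_lib
```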