Skip to content

Commit

Permalink
Filter instances using EC2 API instead of locally
Browse files Browse the repository at this point in the history
This change will allow EC2 API to filter by tags, AZ, and instance state.  In the situation where you have a large number of instances/reservations, this can be a performance boost.

Note that we still do the security group filter locally due to the different strategies (all or some must match).

Closes #39.
  • Loading branch information
joeslice authored and dadoonet committed Nov 26, 2013
1 parent 283d174 commit 712baa4
Showing 1 changed file with 53 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ public List<DiscoveryNode> buildDynamicNodes() {

DescribeInstancesResult descInstances;
try {
descInstances = client.describeInstances(new DescribeInstancesRequest());
// Query EC2 API based on AZ, instance state, and tag.

// NOTE: we don't filter by security group during the describe instances request for two reasons:
// 1. differences in VPCs require different parameters during query (ID vs Name)
// 2. We want to use two different strategies: (all security groups vs. any security groups)
descInstances = client.describeInstances(buildDescribeInstancesRequest());
} catch (AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
Expand All @@ -109,13 +114,6 @@ public List<DiscoveryNode> buildDynamicNodes() {
logger.trace("building dynamic unicast discovery nodes...");
for (Reservation reservation : descInstances.getReservations()) {
for (Instance instance : reservation.getInstances()) {
if (!availabilityZones.isEmpty()) {
if (!availabilityZones.contains(instance.getPlacement().getAvailabilityZone())) {
logger.trace("filtering out instance {} based on availability_zone {}, not part of {}", instance.getInstanceId(), instance.getPlacement().getAvailabilityZone(), availabilityZones);
continue;
}
}

// lets see if we can filter based on groups
if (!groups.isEmpty()) {
List<GroupIdentifier> instanceSecurityGroups = instance.getSecurityGroups();
Expand All @@ -138,66 +136,34 @@ public List<DiscoveryNode> buildDynamicNodes() {
}
}

// see if we need to filter by tags
boolean filterByTag = false;
if (!tags.isEmpty()) {
if (instance.getTags() == null) {
filterByTag = true;
} else {
// check that all tags listed are there on the instance
for (Map.Entry<String, String> entry : tags.entrySet()) {
boolean found = false;
for (Tag tag : instance.getTags()) {
if (entry.getKey().equals(tag.getKey()) && entry.getValue().equals(tag.getValue())) {
found = true;
break;
}
}
if (!found) {
filterByTag = true;
break;
}
}
}
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicDnsName();
break;
}
if (filterByTag) {
logger.trace("filtering out instance {} based tags {}, not part of {}", instance.getInstanceId(), tags, instance.getTags());
continue;
}

InstanceState state = instance.getState();
if (state.getName().equalsIgnoreCase("pending") || state.getName().equalsIgnoreCase("running")) {
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicDnsName();
break;
}
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
// we only limit to 1 address, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], Version.CURRENT));
}
} catch (Exception e) {
logger.warn("failed to add {}, address {}", e, instance.getInstanceId(), address);
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], Version.CURRENT));
}
} else {
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
} catch (Exception e) {
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);
}
} else {
logger.trace("not adding {}, state {} is not pending or running", instance.getInstanceId(), state.getName());
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
}
}
}
Expand All @@ -206,4 +172,27 @@ public List<DiscoveryNode> buildDynamicNodes() {

return discoNodes;
}

private DescribeInstancesRequest buildDescribeInstancesRequest() {
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.withFilters(
new Filter("instance-state-name").withValues("running", "pending")
);

for (Map.Entry<String, String> tagFilter : tags.entrySet()) {
// for a given tag key, OR relationship for multiple different values
describeInstancesRequest.withFilters(
new Filter("tag:" + tagFilter.getKey()).withValues(tagFilter.getValue())
);
}

if (!availabilityZones.isEmpty()) {
// OR relationship amongst multiple values of the availability-zone filter
describeInstancesRequest.withFilters(
new Filter("availability-zone").withValues(availabilityZones)
);
}

return describeInstancesRequest;
}
}

0 comments on commit 712baa4

Please sign in to comment.