Skip to content
This repository has been archived by the owner on Oct 22, 2019. It is now read-only.

Commit

Permalink
Rework IPAMClient.random_blocks() to avoid collecting the whole list.
Browse files Browse the repository at this point in the history
Instead use _random_subnets_from_cidrs() to generate blocks from the
list of CIDRs.
  • Loading branch information
Shaun Crampton committed Apr 12, 2016
1 parent 775edb0 commit fab36c2
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 99 deletions.
261 changes: 168 additions & 93 deletions calico_containers/pycalico/ipam.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def _release_block_affinity(self, host, block_cidr):

raise RuntimeError("Max retries hit.") # pragma: no cover

def _random_blocks(self, excluded_ids, version, pool):
def _random_blocks(self, excluded_ids, version, pool, seed=None):
"""
Get a list of block CIDRs, in random order.
Expand All @@ -326,23 +326,12 @@ def _random_blocks(self, excluded_ids, version, pool):
"wrong attributes" % pool)
# Confine search to only the one pool.
ip_pools = [pool]

random_blocks = []
i = 0
for pool in ip_pools:
for block_cidr in pool.cidr.subnet(BLOCK_PREFIXLEN[version]):
if block_cidr not in excluded_ids:
# add this block. We use an "inside-out" Fisher-Yates
# shuffle to randomize the list as we create it. See
# http://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
j = random.randint(0, i)
if j != i:
random_blocks.append(random_blocks[j])
random_blocks[j] = block_cidr
else:
random_blocks.append(block_cidr)
i += 1
return random_blocks
cidrs = [p.cidr for p in ip_pools]
for block_cidr in _random_subnets_from_cidrs(cidrs,
BLOCK_PREFIXLEN[version],
seed=seed):
if block_cidr not in excluded_ids:
yield block_cidr

def _increment_handle(self, handle_id, block_cidr, amount):
"""
Expand Down Expand Up @@ -655,25 +644,91 @@ def _auto_assign(self, ip_version, num, handle_id,
:return:
"""
assert isinstance(handle_id, str) or handle_id is None

# Start by trying to assign from one of the host-affine blocks. We
# always do strict checking at this stage, so it doesn't matter whether
# globally we have strict_affinity or not.
block_list = self._get_affine_blocks(host,
ip_version,
pool)
block_ids = list(block_list)
_log.info("Looking for %s IPs in already-allocated affine blocks.",
num)
host_blocks = self._get_affine_blocks(host, ip_version, pool)
num_remaining = num
allocated_ips = self._allocate_ips_explicit_blocks(
host_blocks,
num_remaining,
attributes,
handle_id,
host
)
num_remaining = num - len(allocated_ips)
if len(allocated_ips) < num:
# Still addresses to allocate, we've run out of blocks with
# affinity. Before we can assign new blocks or assign in
# non-affine blocks, we need to check that our IPAM configuration
# allows that.
ipam_config = self.get_ipam_config()

# If we can auto allocate blocks, try to fulfill address request by
# allocating new blocks.
if ipam_config.auto_allocate_blocks:
_log.info("Attempt to allocate %s IPs from new affine blocks",
num_remaining)
ips_from_new_blocks = self._allocate_ips_from_new_blocks(
num_remaining,
attributes,
handle_id,
host,
ip_version,
pool,
ipam_config
)
allocated_ips.extend(ips_from_new_blocks)
num_remaining = num - len(allocated_ips)

if num_remaining > 0:
# We've run out of IPs in our blocks and failed to allocate new
# blocks. If we're allowed, try to grab IPs from random
# blocks.
if not ipam_config.strict_affinity:
_log.info("Still need to allocate %s IPs; strict affinity "
"disabled, trying random blocks.", num_remaining)
ips_from_random_blocks = self._allocate_ips_no_affinity(
num_remaining,
attributes,
handle_id,
host,
ip_version,
pool,
excluded_blocks=set(host_blocks)
)
allocated_ips.extend(ips_from_random_blocks)
_log.info("Allocated %s of %s requested IPs", len(allocated_ips), num)
return allocated_ips

def _allocate_ips_explicit_blocks(self, blocks, num, attributes, handle_id,
host):
"""Tries to allocate IPs from the explicitly-listed blocks.
:param list blocks: Blocks to allocate from (for example, the affine
blocks for a host).
:param num: Number to try to allocate.
:param attributes: Contents of this dict will be stored with the
assignment and can be queried using get_assignment_attributes(). Must
be JSON serializable.
:param handle_id: Handle ID to associate with the allocations.
:param host: The host ID to use for affinity in assigning IP addresses.
:return: list of allocated IPs or an empty list if none were available.
"""
# Copy the list so we can use it as a retry queue.
remaining_host_blocks = deque(blocks)
key_errors = 0
allocated_ips = []

num_remaining = num
while num_remaining > 0:
while len(allocated_ips) < num:
try:
block_id = block_ids.pop(0)
block_id = remaining_host_blocks.popleft()
except IndexError:
_log.info("Ran out of affine blocks for %s in pool %s",
host, pool)
_log.info("No free IPs in pre-existing affine blocks for "
"host %s", host)
break
num_remaining = num - len(allocated_ips)
try:
ips = self._auto_assign_ips_in_block(block_id,
num_remaining,
Expand All @@ -693,7 +748,7 @@ def _auto_assign(self, ip_version, num, handle_id,
key_errors += 1
if key_errors <= KEY_ERROR_RETRIES:
_log.debug("Queueing block %s for retry.", block_id)
block_ids.append(block_id)
remaining_host_blocks.append(block_id)
else:
_log.warning("Stopping retry of block %s.", block_id)
continue
Expand All @@ -706,78 +761,98 @@ def _auto_assign(self, ip_version, num, handle_id,
block_id)
continue
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)
return allocated_ips

# If there are still addresses to allocate, then we've run out of
# blocks with affinity. Before we can assign new blocks or assign in
# non-affine blocks, we need to check that our IPAM configuration
# allows that.
ipam_config = self.get_ipam_config()
def _allocate_ips_from_new_blocks(self, num, attributes, handle_id,
host, ip_version, pool, ipam_config):
"""Attempts to allocate new affine block(s) for the given host and
then to allocate IPs from them.
# If we can auto allocate blocks, try to fulfill address request by
# allocating new blocks.
if ipam_config.auto_allocate_blocks:
_log.debug("Attempt to allocate new affine blocks")
:param num: Number to try to allocate.
:param attributes: Contents of this dict will be stored with the
assignment and can be queried using get_assignment_attributes(). Must
be JSON serializable.
:param handle_id: Handle ID to associate with the allocations.
:param host: The host ID to use for affinity in assigning IP addresses.
:param ip_version: IP version to use when choosing a pool.
:param pool: IP pool to choose from, or None for "any pool".
:param ipam_config: Pre-loaded IPAM config object.
:return: list of allocated IPs or an empty list if none were available.
"""
retries = RETRIES
allocated_ips = []
while len(allocated_ips) < num and retries > 0:
retries -= 1
try:
new_block = self._new_affine_block(host,
ip_version,
pool,
ipam_config)
# If successful, this creates the block and registers it to
# us.
except NoFreeBlocksError:
_log.info("Could not get new host affinity block for %s in "
"pool %s", host, pool)
break
num_remaining = num - len(allocated_ips)
ips = self._auto_assign_ips_in_block(new_block,
num_remaining,
handle_id,
attributes,
host)
allocated_ips.extend(ips)
if retries == 0: # pragma: no cover
raise RuntimeError("Hit Max Retries.")
return allocated_ips

retries = RETRIES
while num_remaining > 0 and retries > 0:
retries -= 1
try:
new_block = self._new_affine_block(host,
ip_version,
pool,
ipam_config)
# If successful, this creates the block and registers it to
# us.
except NoFreeBlocksError:
_log.info("Could not get new host affinity block for %s in "
"pool %s", host, pool)
break
ips = self._auto_assign_ips_in_block(new_block,
num_remaining,
handle_id,
attributes,
host)
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)
if retries == 0: # pragma: no cover
raise RuntimeError("Hit Max Retries.")
def _allocate_ips_no_affinity(self, num, attributes, handle_id,
host, ip_version, pool,
excluded_blocks):
"""Tries to allocate IP addresses from any available block, without
affinity.
# If there are still addresses to allocate, we've now tried all blocks
# with some affinity to us, and tried (and failed) to allocate new
# ones. If we do not require strict host affinity, our last option is
# a random hunt through any blocks we haven't yet tried.
#
:param num: Number to try to allocate.
:param attributes: Contents of this dict will be stored with the
assignment and can be queried using get_assignment_attributes(). Must
be JSON serializable.
:param handle_id: Handle ID to associate with the allocations.
:param host: The host ID to use for affinity in assigning IP addresses.
:param ip_version: IP version to use when choosing a pool.
:param pool: IP pool to choose from, or None for "any pool".
:param excluded_blocks: set of blocks to exclude from the search, for
example, to exclude blocks that we've already looked in.
:return: list of allocated IPs or an empty list if none were available.
"""
# Note that this processing simply takes all of the IP pools and breaks
# them up into block-sized CIDRs, then shuffles and searches through each
# CIDR. This algorithm does not work if we disallow auto-allocation of
# blocks because the allocated blocks may be sparsely populated in the
# pools resulting in a very slow search for free addresses.
# them up into block-sized CIDRs, then searches through each CIDR in a
# random order. This algorithm does not work if we disallow
# auto-allocation of blocks because the allocated blocks may be
# sparsely populated in the pools resulting in a very slow search for
# free addresses.
#
# If we need to support non-strict affinity and no auto-allocation of
# blocks, then we should query the actual allocation blocks and assign
# from those.
if not ipam_config.strict_affinity:
_log.debug("Attempt to allocate from non-affine random block")
if num_remaining > 0:
random_blocks = iter(self._random_blocks(block_list,
ip_version,
pool))
while num_remaining > 0:
try:
block_id = random_blocks.next()
except StopIteration:
_log.warning("All addresses exhausted in pool %s", pool)
break
ips = self._auto_assign_ips_in_block(block_id,
num_remaining,
handle_id,
attributes,
host,
affinity_check=False)
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)

_log.debug("Attempt to allocate from non-affine random block")
random_blocks = self._random_blocks(excluded_ids=excluded_blocks,
version=ip_version,
pool=pool,
seed=host)
allocated_ips = []
while len(allocated_ips) < num:
try:
block_id = next(random_blocks)
except StopIteration:
_log.warning("All addresses exhausted in pool %s", pool)
break
num_remaining = num - len(allocated_ips)
ips = self._auto_assign_ips_in_block(block_id,
num_remaining,
handle_id,
attributes,
host,
affinity_check=False)
allocated_ips.extend(ips)
return allocated_ips

def _auto_assign_ips_in_block(self, block_cidr, num, handle_id, attributes,
Expand Down
18 changes: 12 additions & 6 deletions calico_containers/tests/unit/test_ipam.py
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,9 @@ def m_get_ip_pools(_self, version, ipam, include_disabled):

with patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools):
random_blocks = self.client._random_blocks(excluded_ids, 4, None)
random_blocks = list(
self.client._random_blocks(excluded_ids, 4, None)
)

# Excluded 3, but only 2 in the pool, so 1024 - 2 = 1022 blocks.
assert_equal(len(random_blocks), 1022)
Expand All @@ -1827,7 +1829,9 @@ def m_get_ip_pools(_self, version, ipam, include_disabled):

# check we aren't doing something stupid, like returning the same
# order every time.
random_blocks2 = self.client._random_blocks(excluded_ids, 4, None)
random_blocks2 = list(
self.client._random_blocks(excluded_ids, 4, None)
)
assert_equal(len(random_blocks2), 1022)

differs = False
Expand All @@ -1850,8 +1854,8 @@ def m_get_ip_pools(_self, version, ipam, include_disabled):

with patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools):
assert_raises(PoolNotFound, self.client._random_blocks,
[], 4, IPPool("10.1.0.0/16"))
blocks = self.client._random_blocks([], 4, IPPool("10.1.0.0/16"))
assert_raises(PoolNotFound, list, blocks)

def test_random_blocks_good_pool(self):
"""
Expand All @@ -1869,8 +1873,10 @@ def m_get_ip_pools(_self, version, ipam, include_disabled):

with patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools):
random_blocks = self.client._random_blocks(excluded_ids, 4,
IPPool("10.11.0.0/16"))
ip_pool = IPPool("10.11.0.0/16")
random_blocks = list(
self.client._random_blocks(excluded_ids, 4, ip_pool)
)

# Excluded 3, but only 2 in the pool, so 1024 - 2 = 1022 blocks.
assert_equal(len(random_blocks), 1022)
Expand Down

0 comments on commit fab36c2

Please sign in to comment.