diff --git a/.gitignore b/.gitignore index 2d11335c..573b4f8c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,8 @@ dist venv env .idea -*.pyc +**/*.pyc +**/*.sw[pon] *~ .coverage cover diff --git a/calico_containers/pycalico/util.py b/calico_containers/pycalico/util.py index 795aaa27..aefd2e88 100755 --- a/calico_containers/pycalico/util.py +++ b/calico_containers/pycalico/util.py @@ -1,15 +1,19 @@ -from netaddr.core import AddrFormatError +#!/usr/bin/python +"""General utility functions""" -import netaddr import socket import sys import os import re import logging -from netaddr import IPNetwork, IPAddress from subprocess import check_output, CalledProcessError -_log = logging.getLogger(__name__) +import netaddr +from netaddr import IPNetwork, IPAddress +from netaddr.core import AddrFormatError + + +_log = logging.getLogger(__name__) # pylint: disable=invalid-name _log.addHandler(logging.NullHandler()) HOSTNAME_ENV = "HOSTNAME" @@ -28,9 +32,51 @@ IPV6_RE = re.compile(r'inet6 ([a-fA-F\d:]+)/\d{1,3}') +class CharValidationError(ValueError): + """ + Error when passed resource string includes incompatible characters + or is missing required ones. + """ + pass + + +class RangeValidationError(ValueError): + """Error when passed resource is outside the range of valid values""" + pass + + +class TypeValidationError(TypeError): + """Error when passed resource is an invalid type""" + pass + + +class VersionMismatchError(ValueError): + """Error when passed IP Version does not match the CIDR/IP""" + pass + + +def _return_bool(func, *args, **kwargs): + """ + Simple function to catch exceptions and return a Bool + + :param func: The function being run + :param *args: All positional args being passed to func + :param **kwargs: All key-value args being passed to func + :return: True if function succeeds, False if exception is raised + :rtype: bool + """ + try: + func(*args, **kwargs) + except (AddrFormatError, CharValidationError, TypeValidationError, + RangeValidationError, VersionMismatchError): + return False + else: + return True + + def generate_cali_interface_name(prefix, ep_id): - """Helper method to generate a name for a calico veth, given the endpoint - ID + """ + Helper method to generate a name for a calico veth, given the endpoint ID This takes a prefix, and then truncates the EP ID. @@ -67,7 +113,7 @@ def get_host_ips(version=4, exclude=None): try: ip_addr_output = check_output(["ip", "-%d" % version, "addr"]) except (CalledProcessError, OSError): - print "Call to 'ip addr' Failed" + print("Call to 'ip addr' Failed") sys.exit(1) # Separate interface blocks from ip addr output and iterate. @@ -104,211 +150,505 @@ def get_hostname(): return socket.gethostname() -def validate_port_str(port_str): - """ - Checks whether the command line word specifying a set of ports is valid. +def validate_asn(asn): """ - return validate_ports(port_str.split(",")) + DEPRECATED (use verify_asn) + Validate the format of a 2-byte or 4-byte autonomous system number -def validate_ports(port_list): + :param asn: User input of AS number + :type asn: str + :return: True if valid format, False if invalid format + :rtype: bool """ - Checks whether a list of ports are within range of 0 and 65535. - The port list must include a number or a number range. + return _return_bool(verify_asn, asn) - A valid number range must be two numbers delimited by a colon with the - second number higher than the first. Both numbers must be within range. - If a number range is invalid, the function will return False. - :param port_list: - :return: a Boolean: True if in range, False if not in range +def verify_asn(asn): """ - in_range = True - for port in port_list: - if ":" in str(port): - ports = port.split(":") - in_range = (len(ports) == 2) and (int(ports[0]) < int(ports[1])) \ - and validate_ports(ports) - else: - try: - in_range = 0 <= int(port) < 65536 - except ValueError: - in_range = False - if not in_range: - break + Validate the format of a 2-byte or 4-byte autonomous system number + + :param asn: User input of AS number + :type asn: str + :return: None + :rtype: None + """ + + try: + asn_str = str(asn) + except ValueError: + raise TypeValidationError("AS Number cannot be converted to a " + "string (''{0}'' given)".format(asn)) + + if "." in asn_str: + left_asn, right_asn = asn_str.split(".") + + try: + left_asn_int = int(left_asn) + right_asn_int = int(right_asn) + except ValueError: + raise TypeValidationError("ASDOT notation incorrect. ASDOT " + "should consist of two intergers " + "separated by a period " + "('{0}' given)".format(asn)) + + if not 0 <= left_asn_int <= 65535: + raise RangeValidationError("Left side of as.dot not in range " + "('{0}' given)".format(left_asn)) + elif not 0 <= right_asn_int <= 65535: + raise RangeValidationError("Right side of as.dot not in range " + "('{0}' given)".format(right_asn)) + else: + try: + asn_int = int(asn) + # Passing a tuple == TypeError, passing "a" == ValueError + except (TypeError, ValueError): + raise TypeValidationError("ASPLAIN number could not be " + "converted to an int. " + "('{0}' given)".format(asn)) - return in_range + if not 0 <= asn_int <= 4294967295: + raise RangeValidationError("ASPLAIN number not in range ('{0}' " + "given)".format(asn)) def validate_characters(input_string): + """ + DEPRECATED (use verify_characters) + + Validate that characters in string are supported by Felix. + Felix supports letters a-z, numbers 0-9, and symbols _.- + + :param input_string: to be validated + :type input_string: str + :return: returns True if valid, False if invalid + :rtype: bool + """ + return _return_bool(verify_characters, input_string) + + +def verify_characters(input_string): """ Validate that characters in string are supported by Felix. Felix supports letters a-z, numbers 0-9, and symbols _.- :param input_string: string to be validated - :return: Boolean: True if valid, False if invalid + :type input_string: str + :return: None + :rtype: None """ # List of valid characters that Felix permits - valid_chars = '[a-zA-Z0-9_\.\-]' + valid_chars = r'[a-zA-Z0-9_\.\-]' # Check for invalid characters if not re.match("^%s+$" % valid_chars, input_string): - return False - else: - return True + raise CharValidationError("Invalid string. Felix only supports " + "alphanumeric and the symbols '_', '.', " + "and '-' ('{0}' given)".format(input_string)) -def validate_icmp_type(icmp_type): +def validate_cidr(cidr): """ - Validate that icmp_type is an integer between 0 and 255. - If not return False. + DEPRECATED (use verify_cidr) - :param icmp_type: int value representing an icmp type - :return: Boolean: True if valid icmp type, False if not + Validate cidr is in correct CIDR notation + + :param cidr: IP addr and associated routing prefix + :type cidr: str + :return: True if valid IP, False if invalid + :rtype: bool + """ + return _return_bool(verify_cidr, cidr) + + +def verify_cidr(cidr): + """ + Validate cidr is in correct CIDR notation + + :param cidr: IP addr and associated routing prefix + :type cidr: str + :return: None + :rtype: None """ try: - valid = 0 <= int(icmp_type) < 255 - except ValueError: - valid = False - return valid + netaddr.IPNetwork(cidr) + except (AddrFormatError, ValueError) as exc: + # Some versions of Netaddr have a bug causing them to return a + # ValueError rather than an AddrFormatError, so catch both. + raise AddrFormatError("CIDR is invalid. " + str(exc).capitalize()) -def validate_hostname_port(hostname_port): +def validate_cidr_versions(cidrs, ip_version=None): """ - Validate the hostname and port format. (:) - An IPv4 address is a valid hostname. + DEPRECATED (use verify_cidr_versions) + + Validate CIDR versions match each other and (if specified) the given IP + version. + + :param cidrs: List of CIDRs whose versions need verification + :param ip_version: Expected IP version that CIDRs should use (4, 6, None) + If None, CIDRs should all have same IP version + :type cidrs: list, tuple + :type ip_version: int, str, None + :return: True if versions match each other and ip_version, False otherwise + :rtype: bool + """ + return _return_bool(verify_cidr_versions, cidrs, ip_version) - :param hostname_port: The string to verify - :return: Boolean: True if valid, False if invalid + +def verify_cidr_versions(cidrs, ip_version=None): """ - # Should contain a single ":" separating hostname and port - if not isinstance(hostname_port, str): - _log.error("Must provide string for hostname:port validation, not: %s" - % type(hostname_port)) - return False + Validate CIDR versions match each other and (if specified) the given IP + version. - try: - (hostname, port) = hostname_port.split(":") - except ValueError: - _log.error("Must provide a string splittable by ':' for hostname-port.") - return False + :param cidrs: List of CIDRs whose versions need verification + :param ip_version: Expected IP version that CIDRs should use (4, 6, None) + If None, CIDRs should all have same IP version + :type cidrs: list, tuple + :type ip_version: int, str, None + :return: None + :rtype: None + """ + for cidr in cidrs: + try: + network = netaddr.IPNetwork(cidr) + except (AddrFormatError, ValueError) as exc: + # Some versions of Netaddr have a bug causing them to return a + # ValueError rather than an AddrFormatError, so catch both. + raise AddrFormatError("CIDR is invalid. " + str(exc).capitalize()) - # Check the hostname format. - if not validate_hostname(hostname): - return False + if ip_version is None: + ip_version = network.version + else: + try: + ip_version_int = int(ip_version) + except (TypeError, ValueError): + TypeValidationError("IP Version could not be converted to an " + "int ('{0}' given)".format(ip_version)) + if ip_version_int not in (4, 6): + raise RangeValidationError("IP Version invalid. Only 4 and 6 " + "are valid versions, and '{0}' " + "was given.".format(ip_version)) - # Check port range. - try: - port = int(port) - except ValueError: - _log.error("Port must be an integer.") - return False - if port < 1 or port > 65535: - _log.error("Provided port (%d) must be between 1 and 65535." % port) - return False - return True + if ip_version_int != network.version: + raise VersionMismatchError("IP Version does not match " + "CIDR(s).") def validate_hostname(hostname): """ - Validate a hostname string. This allows standard hostnames and IPv4 + DEPRECATED (use verify_hostname) + + Validates a hostname string. This allows standard hostnames and IP addresses. :param hostname: The hostname to validate. - :return: Boolean: True if valid, False if invalid + :type hostname: str + :return: True if valid, False if invalid + :rtype: bool + """ + return _return_bool(verify_hostname, hostname) + + +def verify_hostname(hostname): + """ + Validates a hostname string. This allows standard hostnames and IP + addresses. + + :param hostname: The hostname to validate. + :type hostname: str + :return: None + :rtype: None """ # Hostname length is limited. if not isinstance(hostname, str): - _log.error("Hostname must be a string, not %s" % type(hostname)) - return False + err_mess = "Hostname must be a string, not {0}".format(hostname) + _log.error(err_mess) + raise TypeValidationError(err_mess) + hostname_len = len(hostname) - if hostname_len > 255: - _log.error("Hostname length (%d) should be less than 255 characters." - % hostname_len) - return False + + if not 0 < hostname_len < 255: + err_mess = ("Hostname length can only be 1 to 254 chars long (length " + "{0} given)".format(hostname_len)) + _log.error(err_mess) + raise RangeValidationError(err_mess) + + # NOTE: The real limit in DNS is 255 octets (253 chars) or 254 chars + # if you include the root domain (i.e. a period on the end). RFC1035 + if hostname_len == 254 and not hostname.endswith('.'): + err_mess = ("Hostname length can only be 1 to 254 chars long, and " + "only 254 chars if it includes the root domain. Passed " + "hostname is 254 chars with no trailing dot. ('{0}' given)" + "".format(hostname)) + _log.error(err_mess) + raise RangeValidationError(err_mess) + + if ':' in hostname: + try: + IPAddress(hostname) + except (AddrFormatError, ValueError): + err_mess = ("Hostname '{0}' has a colon in it, but is not a valid " + "IPv6 address. Thus, it is not DNS resolvable or " + "routable.".format(hostname)) + _log.error(err_mess) + raise CharValidationError(err_mess) + else: + # Hostname is a valid IP Address. Skipping regex + return # Hostname labels may consist of numbers, letters and hyphens, but may not # end or begin with a hyphen. - allowed = re.compile("(?!-)[a-z\d-]{1,63}(?:) + An IPv4 address is a valid hostname. + + :param hostname_port: The hostname:port to verify + :type hostname_port: str + :return: True if valid, False if invalid + :rtype: bool + """ + return _return_bool(verify_hostname_port, hostname_port) + + +def verify_hostname_port(hostname_port): """ + Validate the hostname and port format. (:) + An IPv4 address is a valid hostname. + + :param hostname_port: The hostname:port to verify + :type hostname_port: str + :return: None + :rtype: None + """ + # Should contain a single ":" separating hostname and port + if not isinstance(hostname_port, str): + err_mess = ("Must provide string for hostname:port validation, not " + "{0}".format(type(hostname_port))) + _log.error(err_mess) + raise TypeValidationError(err_mess) + try: - if "." in str(asn): - left_asn, right_asn = str(asn).split(".") - asn_ok = (0 <= int(left_asn) <= 65535) and \ - (0 <= int(right_asn) <= 65535) - else: - asn_ok = 0 <= int(asn) <= 4294967295 + hostname, port = hostname_port.rsplit(":", 1) + except ValueError: + err_mess = ("Must provide a string splittable by ':' for " + "hostname-port. ('{0} given')".format(hostname_port)) + _log.error(err_mess) + raise CharValidationError(err_mess) + + # Check the hostname format. + verify_hostname(hostname) + + # Check port range. + try: + port_int = int(port) except ValueError: - asn_ok = False + err_mess = ("Port must be able to convert to an integer. ('{0}' given)" + "".format(port)) + _log.error(err_mess) + raise TypeValidationError(err_mess) - return asn_ok + if not 1 <= port_int <= 65535: + err_mess = ("Provided port {0} must be from 1 to 65535." + "".format(port)) + _log.error(err_mess) + raise RangeValidationError(err_mess) -def validate_cidr(cidr): +def validate_icmp_type(icmp_type): """ - Validate cidr is in correct CIDR notation + DEPRECATED (use verify_icmp_type) - :param cidr: IP addr and associated routing prefix - :return: Boolean: True if valid IP, False if invalid + Validate that icmp_type is an integer from 0 to 255. + If not return False. + + :param icmp_type: int value representing an icmp type + :type icmp_type: str, int + :return: True if valid icmp type, False if not + :rtype: bool """ - try: - netaddr.IPNetwork(cidr) - return True - except (AddrFormatError, ValueError): - # Some versions of Netaddr have a bug causing them to return a - # ValueError rather than an AddrFormatError, so catch both. - return False + return _return_bool(verify_icmp_type, icmp_type) -def validate_cidr_versions(cidrs, ip_version=None): +def verify_icmp_type(icmp_type): """ - Validate CIDR versions match each other and (if specified) the given IP - version. + Validate that icmp_type is an integer from 0 to 255. + If not return False. - :param cidrs: List of CIDRs whose versions need verification - :param ip_version: Expected IP version that CIDRs should use (4, 6, None) - If None, CIDRs should all have same IP version - :return: Boolean: True if versions match each other and ip_version, - False otherwise + :param icmp_type: int value representing an icmp type + :type icmp_type: int, str + :return: None + :rtype: None """ try: - for cidr in cidrs: - network = netaddr.IPNetwork(cidr) - if ip_version is None: - ip_version = network.version - elif ip_version != network.version: - return False - except (AddrFormatError, ValueError): - # Some versions of Netaddr have a bug causing them to return a - # ValueError rather than an AddrFormatError, so catch both. - return False - return True + icmp_type_int = int(icmp_type) + except (TypeError, ValueError): + raise TypeValidationError("ICMP type is invalid. '{0}' could not " + "be converted to an int.".format(icmp_type)) + if not 0 <= icmp_type_int <= 255: + raise RangeValidationError("ICMP type is invalid. Value must be " + "between 0 and 255 ('{0}' given)." + "".format(icmp_type)) -def validate_ip(ip_addr, version): +def validate_ip(ip_addr, version=None): """ + DEPRECATED (use verify_ip) + Validate that ip_addr is a valid IPv4 or IPv6 address :param ip_addr: IP address to be validated :param version: 4 or 6 - :return: Boolean: True if valid, False if invalid. + :type ip_addr: str + :type version: int, str, None + :return: True if valid, False if invalid. + :rtype: bool + """ + assert version in (4, 6) # For backward compatibility + + return _return_bool(verify_ip, ip_addr, version) + + +def verify_ip(ip_addr, version=None): + """ + Validate that ip_addr is a valid IPv4 or IPv6 address + + :param ip_addr: IP address to be validated + :param version: 4 or 6 + :type ip_addr: str + :type version: int, str, None + :return: None + :rtype: None + """ + if version: + try: + version_int = int(version) + except (TypeError, ValueError): + raise TypeValidationError("Version could not be converted to " + "an integer. ('{0}' as given)." + "".format(version)) + + if version_int not in (4, 6): + raise RangeValidationError("Version is invalid. Should be 4 or " + "6, but '{0}' was given." + "".format(version)) + + # NOTE: Most integers will work here, due to netaddr's internal + # index, which might be misleading. + try: + address = IPAddress(ip_addr) + except (AddrFormatError, ValueError): + raise AddrFormatError("'{0}' is not a valid IP address.") + + if version and address.version != version_int: + raise VersionMismatchError("'{0}' is not a valid IPv{1} address." + "".format(ip_addr, version)) + + +def validate_port_str(port_str): + """ + DEPRECATED (use verify_port_str) + + Checks whether the command line word specifying a set of ports is valid. + + :param port_str: A comma delimited list of ports and port ranges + :type port_str: str + :return: returns True if ports are in range, False if not in range + :rtype: bool + """ + return validate_ports(port_str.split(",")) + + +def verify_port_str(port_str): + """ + Checks whether the command line word specifying a set of ports is valid + and raises if not. + + :param port_str: A comma delimited list of ports and port ranges + :type port_str: str + :return: None + :rtype: None + """ + return verify_ports(port_str.split(",")) + + +def validate_ports(port_list): + """ + DEPRECATED (use verify_ports) + + Checks whether a list of ports are within range of 0 and 65535. + The port list must include a number or a number range. + + A valid number range must be two numbers delimited by a colon with the + second number higher than the first. Both numbers must be within range. + If a number range is invalid, the function will raise ValueError. + + :param port_list: A collection of ports and port ranges + :type port_list: list, tuple + :return: returns True if passed, False if exception raised + :rtype: bool + """ + return _return_bool(verify_ports, port_list) + + +def verify_ports(port_list): + """ + Checks whether a list of ports are within range of 0 and 65535. + The port list must include a number or a number range. + + A valid number range must be two numbers delimited by a colon with the + second number higher than the first. Both numbers must be within range. + If a number range is invalid, the function will raise ValueError. + + :param port_list: A collection of ports and port ranges + :type port_list: list, tuple + :return: None + :rtype: None """ - assert version in (4, 6) + for port in port_list: + if ":" in str(port): + ports = port.split(":") - if version == 4: - return netaddr.valid_ipv4(ip_addr) - if version == 6: - return netaddr.valid_ipv6(ip_addr) + try: + port1_int = int(ports[0]) + except ValueError: + raise TypeValidationError("Port on left side of range could " + "not be converted to an int ('{0}' " + "given).".format(port)) + try: + port2_int = int(ports[1]) + except ValueError: + raise TypeValidationError("Port on right side of range could " + "not be converted to an int ('{0}' " + "given).".format(port)) + + if not ((len(ports) == 2) and port1_int < port2_int and + validate_ports(ports)): + raise RangeValidationError("Port range is invalid. Values " + "must be from 0 to 65535 ('{0}' " + "given).".format(port)) + else: + try: + port_int = int(port) + except: + raise TypeValidationError("Unable to convert port to an " + "integer ('{0}' given)".format(port)) + if not 0 <= port_int <= 65535: + raise RangeValidationError("Port is invalid. Value must be " + "from 0 to 65535 ('{0}' given)." + "".format(port)) diff --git a/calico_containers/tests/unit/test_util.py b/calico_containers/tests/unit/test_util.py index 10700713..4e55dc72 100644 --- a/calico_containers/tests/unit/test_util.py +++ b/calico_containers/tests/unit/test_util.py @@ -1,3 +1,5 @@ +#!/usr/bin/python +"""Test module for pycalico/util.py""" # Copyright 2015 Metaswitch Networks # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,16 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. + +from subprocess import CalledProcessError, check_output import unittest from mock import patch + from netaddr import IPAddress -from subprocess import CalledProcessError, check_output -from pycalico.util import get_host_ips, validate_characters, validate_ports, validate_icmp_type, validate_hostname_port, \ - validate_cidr_versions, validate_ip, validate_cidr, validate_port_str +from netaddr.core import AddrFormatError from nose_parameterized import parameterized -MOCK_IP_ADDR = \ -""" +import pycalico.util as util + + +MOCK_IP_ADDR = """ 1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo @@ -46,8 +51,7 @@ valid_lft forever preferred_lft forever """ -MOCK_IP_ADDR_DOCKER_NONE = \ -""" +MOCK_IP_ADDR_DOCKER_NONE = """ 1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo @@ -72,8 +76,7 @@ valid_lft forever preferred_lft forever """ -MOCK_IP_ADDR_LOOPBACK = \ -""" +MOCK_IP_ADDR_LOOPBACK = """ 1: lo: mtu 65536 inet6 ::1/128 scope host valid_lft forever preferred_lft forever @@ -83,158 +86,158 @@ class TestUtil(unittest.TestCase): - + """Test Case Class for pycalico/util.py""" @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_standard(self, m_check_output): '''Test general case for get_host_ips''' # Test IPv4 m_check_output.return_value = MOCK_IP_ADDR - addrs = get_host_ips(version=4) + addrs = util.get_host_ips(version=4) m_check_output.assert_called_once_with(["ip", "-4", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, [IPAddress('172.24.114.18'), - IPAddress('172.17.42.1')]) + self.assertEqual(addrs, [IPAddress('172.24.114.18'), + IPAddress('172.17.42.1')]) # Test IPv6 - addrs = get_host_ips(version=6) + addrs = util.get_host_ips(version=6) m_check_output.assert_called_once_with(["ip", "-6", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, - [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), - IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), - IPAddress('fe80::a00:27ff:fe73:c8d0'), - IPAddress('fe80::188f:d6ff:fe1f:1482')]) + self.assertEqual(addrs, + [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), + IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), + IPAddress('fe80::a00:27ff:fe73:c8d0'), + IPAddress('fe80::188f:d6ff:fe1f:1482')]) @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_loopback_only(self, m_check_output): '''Test get_host_ips with loopback''' # Test IPv4 m_check_output.return_value = MOCK_IP_ADDR_LOOPBACK - addrs = get_host_ips(version=4) + addrs = util.get_host_ips(version=4) m_check_output.assert_called_once_with(["ip", "-4", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, []) + self.assertEqual(addrs, []) # Test IPv6 - addrs = get_host_ips(version=6) + addrs = util.get_host_ips(version=6) m_check_output.assert_called_once_with(["ip", "-6", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, []) + self.assertEqual(addrs, []) @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_exclude_docker(self, m_check_output): '''Test get_host_ips exclude "docker0"''' # Test IPv4 m_check_output.return_value = MOCK_IP_ADDR - addrs = get_host_ips(version=4, exclude=["docker0"]) + addrs = util.get_host_ips(version=4, exclude=["docker0"]) m_check_output.assert_called_once_with(["ip", "-4", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, [IPAddress('172.24.114.18')]) + self.assertEqual(addrs, [IPAddress('172.24.114.18')]) # Test IPv6 - addrs = get_host_ips(version=6, exclude=["docker0"]) + addrs = util.get_host_ips(version=6, exclude=["docker0"]) m_check_output.assert_called_once_with(["ip", "-6", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, - [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), - IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), - IPAddress('fe80::a00:27ff:fe73:c8d0')]) + self.assertEqual(addrs, + [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), + IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), + IPAddress('fe80::a00:27ff:fe73:c8d0')]) @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_exclude_empty(self, m_check_output): '''Test get_host_ips exclude empty list''' # Test IPv4 m_check_output.return_value = MOCK_IP_ADDR - addrs = get_host_ips(version=4, exclude=["^$"]) + addrs = util.get_host_ips(version=4, exclude=["^$"]) m_check_output.assert_called_once_with(["ip", "-4", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, [IPAddress('172.24.114.18'), - IPAddress('172.17.42.1')]) + self.assertEqual(addrs, [IPAddress('172.24.114.18'), + IPAddress('172.17.42.1')]) # Test IPv6 - addrs = get_host_ips(version=6, exclude=["^$"]) + addrs = util.get_host_ips(version=6, exclude=["^$"]) m_check_output.assert_called_once_with(["ip", "-6", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, - [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), - IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), - IPAddress('fe80::a00:27ff:fe73:c8d0'), - IPAddress('fe80::188f:d6ff:fe1f:1482')]) + self.assertEqual(addrs, + [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), + IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), + IPAddress('fe80::a00:27ff:fe73:c8d0'), + IPAddress('fe80::188f:d6ff:fe1f:1482')]) @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_exclude_docker_prefix(self, m_check_output): '''Test get_host_ips exclude "docker0.*''' # Test IPv4 m_check_output.return_value = MOCK_IP_ADDR_DOCKER_NONE - addrs = get_host_ips(version=4, exclude=["docker0.*"]) + addrs = util.get_host_ips(version=4, exclude=["docker0.*"]) m_check_output.assert_called_once_with(["ip", "-4", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, [IPAddress('172.24.114.18')]) + self.assertEqual(addrs, [IPAddress('172.24.114.18')]) # Test IPv6 - addrs = get_host_ips(version=6, exclude=["docker0.*"]) + addrs = util.get_host_ips(version=6, exclude=["docker0.*"]) m_check_output.assert_called_once_with(["ip", "-6", "addr"]) m_check_output.reset_mock() - self.assertEquals(addrs, - [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), - IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), - IPAddress('fe80::a00:27ff:fe73:c8d0')]) + self.assertEqual(addrs, + [IPAddress('2620:104:4008:69:8d7c:499f:2f04:9e55'), + IPAddress('2620:104:4008:69:a00:27ff:fe73:c8d0'), + IPAddress('fe80::a00:27ff:fe73:c8d0')]) @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_fail_check_output(self, m_check_output): '''Test get_host_ip failing to check output of ip addr''' - m_check_output.side_effect = CalledProcessError(returncode=1, cmd=check_output(["ip", "-4", "addr"])) + m_check_output.side_effect = CalledProcessError( + returncode=1, cmd=check_output(["ip", "-4", "addr"])) with self.assertRaises(SystemExit): - addrs = get_host_ips(version=4) + util.get_host_ips(version=4) @parameterized.expand([ - ([2, 5, '114'], True), - (['89:133', 19], True), - ([15, 66, -144], False), - (['-1:5'], False), - (['15:77:66'], False), - (['one', 'two'], False) + (1, True), + ("1", True), + (1.1, True), + ("1.1", True), + (-1, False), + (-1.1, False), + ("a", False), + ("1.a", False), + ("a.1", False), + ((1, 2, 3), False), + (5000000000, False), + ("66000.1", False), + ("1.66000", False) ]) - def test_validate_ports(self, input_list, expected_result): + def test_validate_asn(self, input_str, expected_result): """ - Test validate_ports function + Test validate_asn function """ - test_result = validate_ports(input_list) - self.assertEqual(expected_result, test_result) + test_result = util.validate_asn(input_str) - # Each input parameter for this test is the command line word following 'to - # ports' or 'from ports'. - @parameterized.expand([ - ('2,5,114', True), - ('89:133,19', True), - ('15,66,-144', False), - ('-1:5', False), - ('15:77:66', False), - ('39040:39080', True), - ('39080:39040', False), - ('one,two', False) - ]) - def test_validate_port_str(self, input_word, expected_result): - """ - Test validate_port_str function - """ - test_result = validate_port_str(input_word) - self.assertEqual(expected_result, test_result) + self.assertEqual(test_result, expected_result) @parameterized.expand([ - (300, False), - (15, True), - (255, False), - (-7, False), - ('one', False), - ('43', True) + (1, None), + ("1", None), + (1.1, None), + ("1.1", None), + (-1, util.RangeValidationError), + (-1.1, util.RangeValidationError), + ("a", util.TypeValidationError), + ("1.a", util.TypeValidationError), + ("a.1", util.TypeValidationError), + ((1, 2, 3), util.TypeValidationError), + (5000000000, util.RangeValidationError), + ("66000.1", util.RangeValidationError), + ("1.66000", util.RangeValidationError) ]) - def test_validate_icmp_type(self, input_list, expected_result): + def test_verify_asn(self, input_str, expected_result): """ - Test validate_icmp_type function + Test verify_asn function """ - test_result = validate_icmp_type(input_list) - self.assertEqual(expected_result, test_result) + if expected_result: + with self.assertRaises(expected_result): + util.verify_asn(input_str) + else: + self.assertIsNone(util.verify_asn(input_str)) @parameterized.expand([ ('abcdefghijklmnopqrstuvwxyz', True), @@ -257,18 +260,142 @@ def test_validate_characters(self, input_string, expected_result): """ Test validate_characters function """ - with patch('sys.exit', autospec=True) as m_sys_exit: - # Call method under test - test_result = validate_characters(input_string) + # Call method under test + test_result = util.validate_characters(input_string) - # Assert expected result - self.assertEqual(expected_result, test_result) + # Assert expected result + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('abcdefghijklmnopqrstuvwxyz', None), + ('0123456789', None), + ('profile_1', None), + ('profile-1', None), + ('profile 1', util.CharValidationError), + ('profile.1', None), + ('!', util.CharValidationError), + ('@', util.CharValidationError), + ('#', util.CharValidationError), + ('$', util.CharValidationError), + ('%', util.CharValidationError), + ('^', util.CharValidationError), + ('&', util.CharValidationError), + ('*', util.CharValidationError), + ('()', util.CharValidationError) + ]) + def test_verify_characters(self, input_string, expected_result): + """ + Test verify_characters function + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_characters(input_string) + else: + self.assertIsNone(util.verify_characters(input_string)) + + @parameterized.expand([ + ('127.a.0.1', False), + ('aa:bb::zz', False), + ('1.2.3.4', True), + ('1.2.3.0/24', True), + ('aa:bb::ff', True), + ('1111:2222:3333:4444:5555:6666:7777:8888', True), + ('4294967295', False) + ]) + def test_validate_cidr(self, cidr, expected_result): + """ + Test validate_cidr function in calico_ctl utils + """ + # Call method under test + test_result = util.validate_cidr(cidr) + + # Assert + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('127.a.0.1', AddrFormatError), + ('aa:bb::zz', AddrFormatError), + ('1.2.3.4', None), + ('1.2.3.0/24', None), + ('aa:bb::ff', None), + ('1111:2222:3333:4444:5555:6666:7777:8888', None), + ('4294967295', AddrFormatError) + ]) + def test_verify_cidr(self, cidr, expected_result): + """ + Test verify_cidr function in calico_ctl utils + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_cidr(cidr) + else: + self.assertIsNone(util.verify_cidr(cidr)) + + @parameterized.expand([ + (["1.2.3.4"], 4, True), + (["1.2.3.4"], None, True), + (["aa:bb::zz"], 6, False), + (["aa:bb::zz"], None, False), + (["10.0.0.1", "11.0.0.1", "11.0.0.1"], 4, True), + (["10.0.0.1", "11.0.0.1", "11.0.0.1"], None, True), + (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b"], 6, True), + (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b", "1234::1"], + None, True), + (["127.1.0.1", "dead:beef"], None, False), + (["aa:bb::cc"], 4, False), + (["1.2.3.4"], 6, False), + (["1.2.3.4"], 8, False), + (["0bad::beef", "1.2.3.4"], 4, False), + (["0bad::beef", "1.2.3.4"], 6, False), + (["0bad::beef", "1.2.3.4"], None, False), + ]) + def test_validate_cidr_versions(self, cidr_list, ip_version, + expected_result): + """ + Test validate_cidr_versions function in calico_ctl utils + """ + # Call method under test + test_result = util.validate_cidr_versions(cidr_list, + ip_version=ip_version) + + # Assert + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + (["1.2.3.4"], 4, None), + (["1.2.3.4"], None, None), + (["aa:bb::zz"], 6, AddrFormatError), + (["aa:bb::zz"], None, AddrFormatError), + (["10.0.0.1", "11.0.0.1", "11.0.0.1"], 4, None), + (["10.0.0.1", "11.0.0.1", "11.0.0.1"], None, None), + (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b"], 6, None), + (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b", "1234::1"], + None, None), + (["127.1.0.1", "dead::beef"], None, util.VersionMismatchError), + (["aa:bb::cc"], 4, util.VersionMismatchError), + (["1.2.3.4"], 6, util.VersionMismatchError), + (["1.2.3.4"], 8, util.RangeValidationError), + (["0bad::beef", "1.2.3.4"], 4, util.VersionMismatchError), + (["0bad::beef", "1.2.3.4"], 6, util.VersionMismatchError), + (["0bad::beef", "1.2.3.4"], None, util.VersionMismatchError), + ]) + def test_verify_cidr_versions(self, cidr_list, ip_version, + expected_result): + """ + Test verify_cidr_versions function in calico_ctl utils + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_cidr_versions(cidr_list, ip_version) + else: + self.assertIsNone(util.verify_cidr_versions(cidr_list, + ip_version)) @parameterized.expand([ ('1.2.3.4', False), ('', False), ('abcde', False), - ('aa:bb::cc:1234', False), + ('aa:bb::cc:1234', True), ('aa::256', False), (':1234', False), ('aa...bb:256', False), @@ -280,7 +407,6 @@ def test_validate_characters(self, input_string, expected_result): ('asr-temp-test.thr.yes-33:100', True), ('asr-temp-test.-thr.yes-33:100', False), ('asr-temp-test.thr-.yes-33:100', False), - ('asr-temp-test.thr-.yes-33:100', False), ('validhostname:0', False), ('validhostname:65536', False), ('validhostname:1', True), @@ -298,73 +424,199 @@ def test_validate_hostname_port(self, input_string, expected_result): This also tests validate_hostname which is invoked from validate_hostname_port. """ - test_result = validate_hostname_port(input_string) + test_result = util.validate_hostname_port(input_string) # Assert expected result self.assertEqual(expected_result, test_result) @parameterized.expand([ - ('127.a.0.1', False), - ('aa:bb::zz', False), - ('1.2.3.4', True), - ('1.2.3.0/24', True), - ('aa:bb::ff', True), - ('1111:2222:3333:4444:5555:6666:7777:8888', True), - ('4294967295', False) + ('1.2.3.4', util.CharValidationError), + ('', util.CharValidationError), + ('abcde', util.CharValidationError), + ('aa:bb::cc:1234', None), + ('aa::256', util.CharValidationError), + (':1234', util.RangeValidationError), + ('aa...bb:256', util.CharValidationError), + ('aa:256', None), + ('1.2.3.244:256', None), + ('1.2.a.244:256', None), + ('-asr:100', util.CharValidationError), + ('asr-:100', util.CharValidationError), + ('asr-temp-test.thr.yes-33:100', None), + ('asr-temp-test.-thr.yes-33:100', util.CharValidationError), + ('asr-temp-test.thr-.yes-33:100', util.CharValidationError), + ('validhostname:0', util.RangeValidationError), + ('validhostname:65536', util.RangeValidationError), + ('validhostname:1', None), + ('validhostname:65535', None), + ('#notvalidhostname:65535', util.CharValidationError), + ('verylong' * 100 + ':200', util.RangeValidationError), + ('12.256.122.43:aaa', util.TypeValidationError), + (12345, util.TypeValidationError), + (("1.2.3.244:256",), util.TypeValidationError) ]) - def test_validate_cidr(self, cidr, expected_result): + def test_verify_hostname_port(self, input_string, expected_result): """ - Test validate_cidr function in calico_ctl utils + Test verify_hostname_port function. + + This also tests verify_hostname which is invoked from + verify_hostname_port. """ - # Call method under test - test_result = validate_cidr(cidr) + if expected_result: + with self.assertRaises(expected_result): + util.verify_hostname_port(input_string) + else: + self.assertIsNone(util.verify_hostname_port(input_string)) - # Assert + @parameterized.expand([ + (300, False), + (15, True), + (255, True), + (-7, False), + ('one', False), + ('43', True) + ]) + def test_validate_icmp_type(self, input_list, expected_result): + """ + Test validate_icmp_type function + """ + test_result = util.validate_icmp_type(input_list) self.assertEqual(expected_result, test_result) @parameterized.expand([ - (["1.2.3.4"], 4, True), - (["1.2.3.4"], None, True), - (["aa:bb::zz"], 6, False), - (["aa:bb::zz"], None, False), - (["10.0.0.1", "11.0.0.1", "11.0.0.1"], 4, True), - (["10.0.0.1", "11.0.0.1", "11.0.0.1"], None, True), - (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b"], 6, True), - (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b", "1234::1"], - None, True), - (["127.1.0.1", "dead:beef"], None, False), - (["aa:bb::zz"], 4, False), - (["1.2.3.4"], 6, False), - (["0bad:beef", "1.2.3.4"], 4, False), - (["0bad:beef", "1.2.3.4"], 6, False), - (["0bad:beef", "1.2.3.4"], None, False), + (300, util.RangeValidationError), + (15, None), + (255, None), + (-7, util.RangeValidationError), + ('one', util.TypeValidationError), + ('43', None) ]) - def test_validate_cidr_versions(self, cidr_list, ip_version, expected_result): + def test_verify_icmp_type(self, input_type, expected_result): """ - Test validate_cidr_versions function in calico_ctl utils + Test verify_icmp_type function """ - # Call method under test - test_result = validate_cidr_versions(cidr_list, - ip_version=ip_version) - - # Assert - self.assertEqual(expected_result, test_result) + if expected_result: + with self.assertRaises(expected_result): + util.verify_icmp_type(input_type) + else: + self.assertIsNone(util.verify_icmp_type(input_type)) @parameterized.expand([ ('1.2.3.4', 4, True), ('1.2.3.4', 6, False), ('1.2.3.4', 4, True), + ('1.2.3.4', None, True), + ('1.2.3.4', 'z', False), ('1.2.3.0/24', 4, False), ('aa:bb::ff', 4, False), ('aa:bb::ff', 6, True), + ('aa:bb::ff', None, True), ('1111:2222:3333:4444:5555:6666:7777:8888', 6, True), + ('zzz', None, False) ]) - def test_validate_ip(self, ip, version, expected_result): + def test_validate_ip(self, ip_addr, version, expected_result): """ Test validate_ip function in calico_ctl utils """ - # Call method under test - test_result = validate_ip(ip, version) + if version not in (4, 6): + with self.assertRaises(AssertionError): + util.validate_ip(ip_addr, version) + else: + test_result = util.validate_ip(ip_addr, version) - # Assert + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('1.2.3.4', 4, None), + ('1.2.3.4', 6, util.VersionMismatchError), + ('1.2.3.4', 4, None), + ('1.2.3.4', None, None), + ('1.2.3.4', 'z', util.TypeValidationError), + ('1.2.3.0/24', 4, AddrFormatError), + ('aa:bb::ff', 4, util.VersionMismatchError), + ('aa:bb::ff', 6, None), + ('aa:bb::ff', None, None), + ('1111:2222:3333:4444:5555:6666:7777:8888', 6, None), + ('zzz', None, AddrFormatError) + ]) + def test_verify_ip(self, ip_addr, version, expected_result): + """ + Test verify_ip function in calico_ctl utils + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_ip(ip_addr, version) + else: + self.assertIsNone(util.verify_ip(ip_addr, version)) + + # Each input parameter for this test is the command line word following 'to + # ports' or 'from ports'. + @parameterized.expand([ + ('2,5,114', True), + ('89:133,19', True), + ('15,66,-144', False), + ('-1:5', False), + ('15:77:66', False), + ('39040:39080', True), + ('39080:39040', False), + ('one,two', False) + ]) + def test_validate_port_str(self, input_word, expected_result): + """ + Tests validate_port_str function + """ + test_result = util.validate_port_str(input_word) self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('2,5,114', None), + ('89:133,19', None), + ('15,66,-144', util.RangeValidationError), + ('-1:5', util.RangeValidationError), + ('15:77:66', util.RangeValidationError), + ('39040:39080', None), + ('39080:39040', util.RangeValidationError), + ('one,two', util.TypeValidationError) + ]) + def test_verify_port_str(self, input_str, expected_result): + """ + Tests verify_port_str function + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_port_str(input_str) + else: + self.assertIsNone(util.verify_port_str(input_str)) + + @parameterized.expand([ + ([2, 5, '114'], True), + (['89:133', 19], True), + ([15, 66, -144], False), + (['-1:5'], False), + (['15:77:66'], False), + (['one', 'two'], False) + ]) + def test_validate_ports(self, input_list, expected_result): + """ + Test validate_ports function + """ + test_result = util.validate_ports(input_list) + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ([2, 5, '114'], None), + (['89:133', 19], None), + ([15, 66, -144], util.RangeValidationError), + (['-1:5'], util.RangeValidationError), + (['15:77:66'], util.RangeValidationError), + (['one', 'two'], util.TypeValidationError) + ]) + def test_verify_ports(self, input_list, expected_result): + """ + Test verify_ports function with return_error flag + """ + if expected_result: + with self.assertRaises(expected_result): + util.verify_ports(input_list) + else: + self.assertIsNone(util.verify_ports(input_list))