diff --git a/calico_containers/pycalico/util.py b/calico_containers/pycalico/util.py index 795aaa27..138adab3 100755 --- a/calico_containers/pycalico/util.py +++ b/calico_containers/pycalico/util.py @@ -104,14 +104,14 @@ def get_hostname(): return socket.gethostname() -def validate_port_str(port_str): +def validate_port_str(port_str, return_error=False): """ Checks whether the command line word specifying a set of ports is valid. """ - return validate_ports(port_str.split(",")) + return validate_ports(port_str.split(","), return_error=return_error) -def validate_ports(port_list): +def validate_ports(port_list, return_error=False): """ Checks whether a list of ports are within range of 0 and 65535. The port list must include a number or a number range. @@ -123,7 +123,9 @@ def validate_ports(port_list): :param port_list: :return: a Boolean: True if in range, False if not in range """ + err = None in_range = True + for port in port_list: if ":" in str(port): ports = port.split(":") @@ -135,145 +137,202 @@ def validate_ports(port_list): except ValueError: in_range = False if not in_range: + err = ValueError("Port is invalid. Value must be between 0 and " + "65536 ('{0}' given).".format(port)) break - return in_range + return err if return_error else in_range -def validate_characters(input_string): +def validate_characters(input_string, return_error=False): """ Validate that characters in string are supported by Felix. Felix supports letters a-z, numbers 0-9, and symbols _.- :param input_string: string to be validated + :param return_error: When True, any errors produced during validation will + be returned and None if there are no errors :return: Boolean: True if valid, False if invalid """ + err = None # List of valid characters that Felix permits valid_chars = '[a-zA-Z0-9_\.\-]' # Check for invalid characters + valid = True if not re.match("^%s+$" % valid_chars, input_string): - return False - else: - return True + valid = False + err = ValueError("Invalid string. Felix only supports alphanumeric " + "and the symbols '_', '.', and '-' ('{0}' given)" + "".format(input_string)) + + return err if return_error else valid -def validate_icmp_type(icmp_type): +def validate_icmp_type(icmp_type, return_error=False): """ Validate that icmp_type is an integer between 0 and 255. If not return False. :param icmp_type: int value representing an icmp type + :param return_error: When True, any errors produced during validation will + be returned and None if there are no errors :return: Boolean: True if valid icmp type, False if not """ + err = None + try: valid = 0 <= int(icmp_type) < 255 except ValueError: valid = False - return valid + + if not valid and return_error: + err = ValueError("ICMP type is invalid. Value must be between 0 and " + "255 ('{0}' given).".format(icmp_type)) + + return err if return_error else valid -def validate_hostname_port(hostname_port): +def validate_hostname_port(hostname_port, return_error=False): """ Validate the hostname and port format. (:) An IPv4 address is a valid hostname. :param hostname_port: The string to verify + :param return_error: When True, any errors produced during validation will + be returned and None if there are no errors :return: Boolean: True if valid, False if invalid """ # Should contain a single ":" separating hostname and port if not isinstance(hostname_port, str): - _log.error("Must provide string for hostname:port validation, not: %s" - % type(hostname_port)) - return False + err_mess = ("Must provide string for hostname:port validation, not: " + "{0}".format(type(hostname_port))) + _log.error(err_mess) + return TypeError(err_mess) if return_error else False try: (hostname, port) = hostname_port.split(":") except ValueError: - _log.error("Must provide a string splittable by ':' for hostname-port.") - return False + err_mess = "Must provide a string splittable by ':' for hostname-port." + _log.error(err_mess) + return ValueError(err_mess) if return_error else False # Check the hostname format. - if not validate_hostname(hostname): - return False + result = validate_hostname(hostname, return_error=return_error) + if return_error and result: + return result + elif not return_error and not result: + return result # Check port range. try: port = int(port) except ValueError: - _log.error("Port must be an integer.") - return False + err_mess = "Port must be an integer." + _log.error(err_mess) + return TypeError(err_mess) if return_error else False if port < 1 or port > 65535: - _log.error("Provided port (%d) must be between 1 and 65535." % port) - return False - return True + err_mess = ("Provided port {0} must be between 1 and 65535." + "".format(port)) + _log.error(err_mess) + return ValueError(err_mess) if return_error else False + return None if return_error else True -def validate_hostname(hostname): +def validate_hostname(hostname, return_error=False): """ Validate a hostname string. This allows standard hostnames and IPv4 addresses. :param hostname: The hostname to validate. + :param return_error: When True, any errors produced during validation will + be returned and None if there are no errors :return: Boolean: True if valid, False if invalid """ # Hostname length is limited. if not isinstance(hostname, str): - _log.error("Hostname must be a string, not %s" % type(hostname)) - return False + err_mess = "Hostname must be a string, not {0}".format(hostname) + _log.error(err_mess) + return TypeError(err_mess) if return_error else False hostname_len = len(hostname) if hostname_len > 255: - _log.error("Hostname length (%d) should be less than 255 characters." - % hostname_len) - return False + err_mess = ("Hostname length (%d) should be less than 255 characters." + "".format(hostname_len)) + _log.error(err_mess) + return ValueError(err_mess) if return_error else False # Hostname labels may consist of numbers, letters and hyphens, but may not # end or begin with a hyphen. allowed = re.compile("(?!-)[a-z\d-]{1,63}(? mtu 65536 qdisc noqueue state UNKNOWN group default link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo @@ -46,8 +49,7 @@ valid_lft forever preferred_lft forever """ -MOCK_IP_ADDR_DOCKER_NONE = \ -""" +MOCK_IP_ADDR_DOCKER_NONE = """ 1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo @@ -72,8 +74,7 @@ valid_lft forever preferred_lft forever """ -MOCK_IP_ADDR_LOOPBACK = \ -""" +MOCK_IP_ADDR_LOOPBACK = """ 1: lo: mtu 65536 inet6 ::1/128 scope host valid_lft forever preferred_lft forever @@ -183,9 +184,10 @@ def test_get_host_ips_exclude_docker_prefix(self, m_check_output): @patch("pycalico.util.check_output", autospec=True) def test_get_host_ips_fail_check_output(self, m_check_output): '''Test get_host_ip failing to check output of ip addr''' - m_check_output.side_effect = CalledProcessError(returncode=1, cmd=check_output(["ip", "-4", "addr"])) + m_check_output.side_effect = CalledProcessError( + returncode=1, cmd=check_output(["ip", "-4", "addr"])) with self.assertRaises(SystemExit): - addrs = get_host_ips(version=4) + get_host_ips(version=4) @parameterized.expand([ ([2, 5, '114'], True), @@ -202,6 +204,24 @@ def test_validate_ports(self, input_list, expected_result): test_result = validate_ports(input_list) self.assertEqual(expected_result, test_result) + @parameterized.expand([ + ([1, 2, "123"], None), + (["1:2", 3], None), + (["-1"], ValueError), + (["one", "two"], ValueError), + (["1:2:3:4"], ValueError), + (["4:1"], ValueError) + ]) + def test_validate_ports_return_error(self, input_list, expected_result): + """ + Test validate_ports function with return_error flag + """ + test_result = validate_ports(input_list, return_error=True) + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + # Each input parameter for this test is the command line word following 'to # ports' or 'from ports'. @parameterized.expand([ @@ -221,6 +241,25 @@ def test_validate_port_str(self, input_word, expected_result): test_result = validate_port_str(input_word) self.assertEqual(expected_result, test_result) + @parameterized.expand([ + ("1,2,123", None), + ("1:2,3", None), + ("-1", ValueError), + ("one,two", ValueError), + ("1:2:3:4", ValueError), + ("4:1", ValueError) + ]) + def test_validate_ports_str_return_error(self, input_str, + expected_result): + """ + Test validate_ports_str function with return_error_flag + """ + test_result = validate_port_str(input_str, return_error=True) + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + @parameterized.expand([ (300, False), (15, True), @@ -236,6 +275,25 @@ def test_validate_icmp_type(self, input_list, expected_result): test_result = validate_icmp_type(input_list) self.assertEqual(expected_result, test_result) + @parameterized.expand([ + (300, ValueError), + (15, None), + (255, ValueError), + (-7, ValueError), + ('one', ValueError), + ('43', None) + ]) + def test_validate_icmp_type_return_error(self, input_list, + expected_result): + """ + Test validate_icmp_type function with return_error flag + """ + test_result = validate_icmp_type(input_list, return_error=True) + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + @parameterized.expand([ ('abcdefghijklmnopqrstuvwxyz', True), ('0123456789', True), @@ -257,12 +315,30 @@ def test_validate_characters(self, input_string, expected_result): """ Test validate_characters function """ - with patch('sys.exit', autospec=True) as m_sys_exit: - # Call method under test - test_result = validate_characters(input_string) + # Call method under test + test_result = validate_characters(input_string) - # Assert expected result - self.assertEqual(expected_result, test_result) + # Assert expected result + self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('name_1', None), + ('name-1', None), + ('name.1', None), + ('name 1', ValueError), + ('name*1', ValueError) + ]) + def test_validate_characters_return_error(self, input_string, + expected_result): + """ + Test validate_characters function with return_error flag + """ + test_result = validate_characters(input_string, return_error=True) + + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) @parameterized.expand([ ('1.2.3.4', False), @@ -303,6 +379,30 @@ def test_validate_hostname_port(self, input_string, expected_result): # Assert expected result self.assertEqual(expected_result, test_result) + @parameterized.expand([ + ("domain.com:1234", None), + ("123.123.123.123:123", None), + (1234, TypeError), + ("abcd", ValueError), + ("a b:123", ValueError), + ("domain.com:aa", TypeError), + ("domain.com:999999", ValueError) + ]) + def test_validate_hostname_port_return_error(self, input_string, + expected_result): + """ + Test validate_hostname_port function with return_error flag + + This also tests validate_hostname which is invoked from + validate_hostname_port. + """ + test_result = validate_hostname_port(input_string, return_error=True) + + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + @parameterized.expand([ ('127.a.0.1', False), ('aa:bb::zz', False), @@ -322,6 +422,23 @@ def test_validate_cidr(self, cidr, expected_result): # Assert self.assertEqual(expected_result, test_result) + @parameterized.expand([ + ("127.0.0.0/32", None), + ("dead::beef", None), + ("127.0.0.0/33", ValueError), + ("abcd", ValueError) + ]) + def test_validate_cidr_return_error(self, cidr, expected_result): + """ + Test validate_cidr function in calico_ctl utils + """ + test_result = validate_cidr(cidr, return_error=True) + + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + @parameterized.expand([ (["1.2.3.4"], 4, True), (["1.2.3.4"], None, True), @@ -331,7 +448,7 @@ def test_validate_cidr(self, cidr, expected_result): (["10.0.0.1", "11.0.0.1", "11.0.0.1"], None, True), (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b"], 6, True), (["1111:2222:3333:4444:5555:6666:7777:8888", "a::b", "1234::1"], - None, True), + None, True), (["127.1.0.1", "dead:beef"], None, False), (["aa:bb::zz"], 4, False), (["1.2.3.4"], 6, False), @@ -339,25 +456,52 @@ def test_validate_cidr(self, cidr, expected_result): (["0bad:beef", "1.2.3.4"], 6, False), (["0bad:beef", "1.2.3.4"], None, False), ]) - def test_validate_cidr_versions(self, cidr_list, ip_version, expected_result): + def test_validate_cidr_versions(self, cidr_list, ip_version, + expected_result): """ Test validate_cidr_versions function in calico_ctl utils """ # Call method under test test_result = validate_cidr_versions(cidr_list, - ip_version=ip_version) + ip_version=ip_version) # Assert self.assertEqual(expected_result, test_result) + @parameterized.expand([ + (["1.2.3.4"], 4, None), + (["1.2.3.4"], None, None), + (["1.2.3.4"], 8, ValueError), + (["1.2.3.4/33"], None, ValueError), + (["dead::beef"], 6, None), + (["dead::beef"], None, None) + ]) + def test_validate_cidr_versions_return_error(self, cidr_list, ip_version, + expected_result): + """ + Test validate_cidr_versions function in calico_ctl utils + """ + # Call method under test + test_result = validate_cidr_versions(cidr_list, + ip_version=ip_version, + return_error=True) + + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result) + @parameterized.expand([ ('1.2.3.4', 4, True), ('1.2.3.4', 6, False), ('1.2.3.4', 4, True), + ('1.2.3.4', None, True), ('1.2.3.0/24', 4, False), ('aa:bb::ff', 4, False), ('aa:bb::ff', 6, True), + ('aa:bb::ff', None, True), ('1111:2222:3333:4444:5555:6666:7777:8888', 6, True), + ('zzz', None, False) ]) def test_validate_ip(self, ip, version, expected_result): """ @@ -368,3 +512,26 @@ def test_validate_ip(self, ip, version, expected_result): # Assert self.assertEqual(expected_result, test_result) + + @parameterized.expand([ + ('1.2.3.4', 4, None), + ('1.2.3.4', None, None), + ('1.2.3.4', 'a', TypeError), + ('1.2.3.4', 5, ValueError), + ('1.2.3.4', 6, ValueError), + ('dead::beef', 6, None), + ('dead::beef', None, None), + ('aa:bb::ff', 4, ValueError), + ('zzz', None, ValueError) + ]) + def test_validate_ip_return_error(self, ip, version, expected_result): + """ + Test validate_ip function in calico_ctl utils + """ + # Call method under test + test_result = validate_ip(ip, version, return_error=True) + + if expected_result: + self.assertIsInstance(test_result, expected_result) + else: + self.assertIsNone(test_result)