# =============================================================================
# build/yamls/flow-visibility/policy-recommendation/antrea_crd.py  (new file)
# =============================================================================
# This library is used to define Antrea Network Policy related CRDs in Python.
# Code structure follows the Kubernetes Python Client library
# (https://github.com/kubernetes-client/python).
# Could be improved by using an openAPI generator in the future.


class _CRDModel(object):
    """Common base class for the CRD models below.

    Each subclass declares ``attribute_types``, a mapping from attribute name
    to a human-readable type description (documentation only). ``to_dict``
    walks that mapping and recursively serializes nested models, lists and
    dicts, mirroring the Kubernetes Python client's model serialization.
    """

    # attribute name -> type description; subclasses override.
    attribute_types = {}

    def to_dict(self):
        """Returns the model properties as a dict."""
        result = {}
        for attr in self.attribute_types:
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = [
                    v.to_dict() if hasattr(v, "to_dict") else v for v in value
                ]
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = {
                    k: v.to_dict() if hasattr(v, "to_dict") else v
                    for k, v in value.items()
                }
            else:
                result[attr] = value
        return result


class NetworkPolicy(_CRDModel):
    """Antrea-native NetworkPolicy (crd.antrea.io)."""

    attribute_types = {
        "kind": "string",
        "api_version": "string",
        "metadata": "kubernetes.client.V1ObjectMeta",
        "spec": "NetworkPolicySpec",
        "status": "NetworkPolicyStatus",
    }

    def __init__(self, kind=None, api_version=None, metadata=None, spec=None, status=None):
        self.kind = kind
        self.api_version = api_version
        self.metadata = metadata
        self.spec = spec
        self.status = status


class NetworkPolicySpec(_CRDModel):
    """Spec of an Antrea-native, namespaced NetworkPolicy."""

    attribute_types = {
        "tier": "string",
        "priority": "float",
        "applied_to": "list[NetworkPolicyPeer]",
        "ingress": "list[Rule]",
        "egress": "list[Rule]",
    }

    def __init__(self, tier=None, priority=None, applied_to=None, ingress=None, egress=None):
        self.tier = tier
        self.priority = priority
        self.applied_to = applied_to
        self.ingress = ingress
        self.egress = egress


class NetworkPolicyPeer(_CRDModel):
    """A source or destination peer of a policy rule."""

    attribute_types = {
        "ip_block": "IPBlock",
        # NOTE: fixed typo "kubernete.client" -> "kubernetes.client".
        "pod_selector": "kubernetes.client.V1LabelSelector",
        "namespace_selector": "kubernetes.client.V1LabelSelector",
        "namespaces": "PeerNamespaces",
        "external_entity_selector": "kubernetes.client.V1LabelSelector",
        "group": "string",
        "FQDN": "string",
    }

    def __init__(self, ip_block=None, pod_selector=None, namespace_selector=None,
                 namespaces=None, external_entity_selector=None, group=None, FQDN=None):
        self.ip_block = ip_block
        self.pod_selector = pod_selector
        self.namespace_selector = namespace_selector
        self.namespaces = namespaces
        self.external_entity_selector = external_entity_selector
        self.group = group
        self.FQDN = FQDN


class IPBlock(_CRDModel):
    """A CIDR-based peer."""

    attribute_types = {
        "CIDR": "string",
    }

    def __init__(self, CIDR=None):
        self.CIDR = CIDR


class PeerNamespaces(_CRDModel):
    """Namespace matching strategy of a peer (e.g. "Self")."""

    attribute_types = {
        "Match": "string",
    }

    def __init__(self, Match=None):
        self.Match = Match


class Rule(_CRDModel):
    """A single ingress or egress rule of an Antrea-native policy."""

    attribute_types = {
        "action": "string",
        "ports": "list[NetworkPolicyPort]",
        # "_from" keeps the Python-keyword-safe name used by the k8s client.
        "_from": "list[NetworkPolicyPeer]",
        "to": "list[NetworkPolicyPeer]",
        "name": "string",
        "enable_logging": "bool",
        # NOTE: fixed typo "ist[...]" -> "list[...]".
        "applied_to": "list[NetworkPolicyPeer]",
    }

    def __init__(self, action=None, ports=None, _from=None, to=None, name=None,
                 enable_logging=None, applied_to=None):
        self.action = action
        self.ports = ports
        self._from = _from
        self.to = to
        self.name = name
        self.enable_logging = enable_logging
        self.applied_to = applied_to


class NetworkPolicyPort(_CRDModel):
    """A port (or port range) a rule applies to."""

    attribute_types = {
        "protocol": "string",
        "port": "int or string",
        "endport": "int",
    }

    def __init__(self, protocol=None, port=None, endport=None):
        self.protocol = protocol
        self.port = port
        self.endport = endport


class ClusterGroup(_CRDModel):
    """Antrea ClusterGroup CRD."""

    attribute_types = {
        "kind": "string",
        "api_version": "string",
        "metadata": "kubernetes.client.V1ObjectMeta",
        "spec": "GroupSpec",
        "status": "GroupStatus",
    }

    def __init__(self, kind=None, api_version=None, metadata=None, spec=None, status=None):
        self.kind = kind
        self.api_version = api_version
        self.metadata = metadata
        self.spec = spec
        self.status = status


class GroupSpec(_CRDModel):
    """Spec of a ClusterGroup."""

    attribute_types = {
        # NOTE: fixed typo "kubernete.client" -> "kubernetes.client".
        "pod_selector": "kubernetes.client.V1LabelSelector",
        "namespace_selector": "kubernetes.client.V1LabelSelector",
        "ip_blocks": "list[IPBlock]",
        "service_reference": "ServiceReference",
        "external_entity_selector": "kubernetes.client.V1LabelSelector",
        "child_groups": "list[string]",
    }

    def __init__(self, pod_selector=None, namespace_selector=None, ip_blocks=None,
                 service_reference=None, external_entity_selector=None, child_groups=None):
        self.pod_selector = pod_selector
        self.namespace_selector = namespace_selector
        self.ip_blocks = ip_blocks
        self.service_reference = service_reference
        self.external_entity_selector = external_entity_selector
        self.child_groups = child_groups


class ServiceReference(_CRDModel):
    """Reference to a Service by name and namespace."""

    attribute_types = {
        "name": "string",
        "namespace": "string",
    }

    def __init__(self, name=None, namespace=None):
        self.name = name
        self.namespace = namespace


class GroupStatus(_CRDModel):
    """Status of a ClusterGroup."""

    attribute_types = {
        "conditions": "list[GroupCondition]",
    }

    def __init__(self, conditions=None):
        self.conditions = conditions


class GroupCondition(_CRDModel):
    """A single condition in a ClusterGroup status."""

    attribute_types = {
        "type": "string",
        "status": "string",
        "last_transition_time": "datetime",
    }

    # "type" shadows the builtin but is kept for API compatibility with the
    # CRD field name.
    def __init__(self, type=None, status=None, last_transition_time=None):
        self.type = type
        self.status = status
        self.last_transition_time = last_transition_time


class ClusterNetworkPolicy(_CRDModel):
    """Antrea ClusterNetworkPolicy CRD (cluster-scoped)."""

    attribute_types = {
        "kind": "string",
        "api_version": "string",
        "metadata": "kubernetes.client.V1ObjectMeta",
        "spec": "ClusterNetworkPolicySpec",
        "status": "NetworkPolicyStatus",
    }

    def __init__(self, kind=None, api_version=None, metadata=None, spec=None, status=None):
        self.kind = kind
        self.api_version = api_version
        self.metadata = metadata
        self.spec = spec
        self.status = status


class ClusterNetworkPolicySpec(_CRDModel):
    """Spec of a ClusterNetworkPolicy; same shape as NetworkPolicySpec."""

    attribute_types = {
        "tier": "string",
        "priority": "float",
        "applied_to": "list[NetworkPolicyPeer]",
        "ingress": "list[Rule]",
        "egress": "list[Rule]",
    }

    def __init__(self, tier=None, priority=None, applied_to=None, ingress=None, egress=None):
        self.tier = tier
        self.priority = priority
        self.applied_to = applied_to
        self.ingress = ingress
        self.egress = egress


class NetworkPolicyStatus(_CRDModel):
    """Realization status of an Antrea-native policy."""

    attribute_types = {
        "phase": "string",
        "observed_generation": "int",
        "current_nodes_realized": "int",
        "desired_nodes_realized": "int",
    }

    def __init__(self, phase=None, observed_generation=None,
                 current_nodes_realized=None, desired_nodes_realized=None):
        self.phase = phase
        self.observed_generation = observed_generation
        self.current_nodes_realized = current_nodes_realized
        self.desired_nodes_realized = desired_nodes_realized


# =============================================================================
# build/yamls/flow-visibility/policy-recommendation/policy_recommendation_job.py
# (new file)
# =============================================================================
import antrea_crd
import json
import kubernetes.client
import yaml
from ipaddress import ip_address, IPv4Address
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from re import sub

# Column names of the flow record table in the ClickHouse database used by the
# recommendation job. ingress_policy / egress_policy must remain last: they
# are only used to filter out already-protected flows (FLOW_TABLE_COLUMNS[:-2]
# is used as the aggregation key).
FLOW_TABLE_COLUMNS = [
    "src_ip",
    "src_pod_ns",
    "src_pod_labels",
    "dst_ip",
    "dst_pod_ns",
    "dst_pod_labels",
    "dst_svc_name",
    "dst_svc_ns",
    "dst_port",
    "proto",
    "ingress_policy",
    "egress_policy",
]


def get_flow_type(dst_svc_name, dst_pod_labels):
    """Classify a flow as pod_to_svc, pod_to_pod or pod_to_external."""
    if dst_svc_name != "":
        return "pod_to_svc"
    elif dst_pod_labels != "":
        return "pod_to_pod"
    else:
        return "pod_to_external"


def map_flow_to_egress(flow):
    """Map a flow row to (applied-to key, ("", egress tuple)) for reduceByKey."""
    src = "#".join([flow.src_pod_ns, flow.src_pod_labels])
    if flow.flow_type == "pod_to_external":
        dst = "#".join([flow.dst_ip, str(flow.dst_port), flow.proto])
    else:
        dst = "#".join([flow.dst_pod_ns, flow.dst_pod_labels, str(flow.dst_port), flow.proto])
    return (src, ("", dst))


def map_flow_to_egress_svc(flow):
    """Map a pod-to-Service flow row to (applied-to key, service egress tuple)."""
    src = "#".join([flow.src_pod_ns, flow.src_pod_labels])
    dst = "#".join([flow.dst_svc_ns, flow.dst_svc_name, str(flow.dst_port), flow.proto])
    return (src, dst)


def map_flow_to_ingress(flow):
    """Map a flow row to (applied-to key, (ingress tuple, "")) for reduceByKey."""
    src = "#".join([flow.src_pod_ns, flow.src_pod_labels, str(flow.dst_port), flow.proto])
    dst = "#".join([flow.dst_pod_ns, flow.dst_pod_labels])
    return (dst, (src, ""))


def combine_network_peers(a, b):
    """Merge two (ingress, egress) '|'-joined tuple strings, skipping empties."""
    if a[0] != "" and b[0] != "":
        new_src = "|".join([a[0], b[0]])
    elif a[0] != "":
        new_src = a[0]
    else:
        new_src = b[0]
    if a[1] != "" and b[1] != "":
        new_dst = "|".join([a[1], b[1]])
    elif a[1] != "":
        new_dst = a[1]
    else:
        new_dst = b[1]
    return (new_src, new_dst)


def get_IP_version(IP):
    """Return "v4" for an IPv4 address string, "v6" otherwise.

    Raises ValueError (from ipaddress.ip_address) on a malformed address.
    """
    return "v4" if isinstance(ip_address(IP), IPv4Address) else "v6"


def camel(s):
    """Convert snake_case/kebab-case to lowerCamelCase (api_version -> apiVersion)."""
    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
    if not s:
        return s
    return s[0].lower() + s[1:]


def camel_dict(d):
    """Recursively camelCase dict keys, dropping None values.

    Keys under "match_labels" are user label keys and are left untouched.
    """
    result = {}
    for key, value in d.items():
        if isinstance(value, list):
            result[camel(key)] = [
                camel_dict(v) if isinstance(v, dict) else v for v in value
            ]
        elif isinstance(value, dict) and key != "match_labels":
            result[camel(key)] = camel_dict(value)
        elif value is not None:
            result[camel(key)] = value
    return result


def dict_to_yaml(d):
    """Serialize a model dict as YAML (JSON round-trip normalizes types)."""
    return yaml.dump(yaml.load(json.dumps(camel_dict(d)), Loader=yaml.FullLoader))


def generate_k8s_egress_rule(egress):
    """Build a V1NetworkPolicyEgressRule from an egress tuple string.

    The tuple is either "ns#labels#port#proto" (Pod destination) or
    "ip#port#proto" (external destination). Returns None (instead of the
    previous empty-string sentinel) when the tuple is malformed, so callers
    can skip it cleanly.
    """
    fields = egress.split("#")
    if len(fields) == 4:
        ns, labels, port, proto = fields
        egress_peer = kubernetes.client.V1NetworkPolicyPeer(
            namespace_selector=kubernetes.client.V1LabelSelector(
                match_labels={"name": ns}
            ),
            pod_selector=kubernetes.client.V1LabelSelector(
                match_labels=json.loads(labels)
            ),
        )
    elif len(fields) == 3:
        dst_ip, port, proto = fields
        # A single address is expressed as a host-length CIDR.
        suffix = "/32" if get_IP_version(dst_ip) == "v4" else "/128"
        egress_peer = kubernetes.client.V1NetworkPolicyPeer(
            ip_block=kubernetes.client.V1IPBlock(cidr=dst_ip + suffix)
        )
    else:
        print("Warning: egress tuple {} has wrong format".format(egress))
        return None
    ports = kubernetes.client.V1NetworkPolicyPort(port=int(port), protocol=proto)
    return kubernetes.client.V1NetworkPolicyEgressRule(to=[egress_peer], ports=[ports])


def generate_k8s_ingress_rule(ingress):
    """Build a V1NetworkPolicyIngressRule from an "ns#labels#port#proto" tuple."""
    ns, labels, port, proto = ingress.split("#")
    ingress_peer = kubernetes.client.V1NetworkPolicyPeer(
        namespace_selector=kubernetes.client.V1LabelSelector(
            match_labels={"name": ns}
        ),
        pod_selector=kubernetes.client.V1LabelSelector(
            match_labels=json.loads(labels)
        ),
    )
    ports = kubernetes.client.V1NetworkPolicyPort(port=int(port), protocol=proto)
    return kubernetes.client.V1NetworkPolicyIngressRule(_from=[ingress_peer], ports=[ports])


def generate_policy_name(info, applied_to):
    """Derive a policy name from a prefix and an "ns#labels" applied-to key.

    Truncated to 63 characters to satisfy the Kubernetes name length limit.
    """
    policy_name = info
    ns, labels = applied_to.split("#")
    if ns:
        policy_name = policy_name + "-" + ns
    for k, v in json.loads(labels).items():
        policy_name = policy_name + "-" + k + "-" + v
    if len(policy_name) > 63:
        policy_name = policy_name[:63]
    return policy_name


def generate_k8s_np(x):
    """Generate a K8s-native NetworkPolicy YAML string for one applied-to group.

    Returns None when no valid ingress or egress rules can be derived.
    """
    applied_to, (ingresses, egresses) = x
    egress_rules = [
        rule
        for rule in (generate_k8s_egress_rule(e) for e in egresses.split("|") if "#" in e)
        if rule is not None
    ]
    ingress_rules = [
        generate_k8s_ingress_rule(i) for i in ingresses.split("|") if "#" in i
    ]
    if not egress_rules and not ingress_rules:
        return None
    policy_types = []
    if egress_rules:
        policy_types.append("Egress")
    if ingress_rules:
        policy_types.append("Ingress")
    ns, labels = applied_to.split("#")
    np = kubernetes.client.V1NetworkPolicy(
        api_version="networking.k8s.io/v1",
        kind="NetworkPolicy",
        metadata=kubernetes.client.V1ObjectMeta(
            name=generate_policy_name("recommend-k8s-np", applied_to),
            namespace=ns,
        ),
        spec=kubernetes.client.V1NetworkPolicySpec(
            egress=egress_rules,
            ingress=ingress_rules,
            pod_selector=kubernetes.client.V1LabelSelector(
                match_labels=json.loads(labels)
            ),
            policy_types=policy_types,
        ),
    )
    return dict_to_yaml(np.to_dict())


def generate_anp_egress_rule(egress):
    """Build an Antrea Rule (action Allow) from an egress tuple string.

    Returns None when the tuple is malformed. (Previously this fell through
    after the warning and raised NameError on the undefined egress_peer.)
    """
    fields = egress.split("#")
    if len(fields) == 4:
        ns, labels, port, proto = fields
        egress_peer = antrea_crd.NetworkPolicyPeer(
            namespace_selector=kubernetes.client.V1LabelSelector(
                match_labels={"kubernetes.io/metadata.name": ns}
            ),
            pod_selector=kubernetes.client.V1LabelSelector(
                match_labels=json.loads(labels)
            ),
        )
    elif len(fields) == 3:
        dst_ip, port, proto = fields
        suffix = "/32" if get_IP_version(dst_ip) == "v4" else "/128"
        egress_peer = antrea_crd.NetworkPolicyPeer(
            ip_block=antrea_crd.IPBlock(CIDR=dst_ip + suffix)
        )
    else:
        print("Warning: egress tuple {} has wrong format".format(egress))
        return None
    ports = antrea_crd.NetworkPolicyPort(protocol=proto, port=int(port))
    return antrea_crd.Rule(action="Allow", to=[egress_peer], ports=[ports])


def generate_anp_ingress_rule(ingress):
    """Build an Antrea Rule (action Allow) from an "ns#labels#port#proto" tuple."""
    ns, labels, port, proto = ingress.split("#")
    ingress_peer = antrea_crd.NetworkPolicyPeer(
        namespace_selector=kubernetes.client.V1LabelSelector(
            match_labels={"kubernetes.io/metadata.name": ns}
        ),
        pod_selector=kubernetes.client.V1LabelSelector(
            match_labels=json.loads(labels)
        ),
    )
    ports = antrea_crd.NetworkPolicyPort(protocol=proto, port=int(port))
    return antrea_crd.Rule(action="Allow", _from=[ingress_peer], ports=[ports])


def generate_anp(network_peers):
    """Generate an Antrea NetworkPolicy YAML string for one applied-to group.

    Returns None when no valid ingress or egress rules can be derived.
    """
    applied_to, (ingresses, egresses) = network_peers
    egress_rules = [
        rule
        for rule in (generate_anp_egress_rule(e) for e in egresses.split("|") if "#" in e)
        if rule is not None
    ]
    ingress_rules = [
        generate_anp_ingress_rule(i) for i in ingresses.split("|") if "#" in i
    ]
    if not egress_rules and not ingress_rules:
        return None
    ns, labels = applied_to.split("#")
    np = antrea_crd.NetworkPolicy(
        kind="NetworkPolicy",
        api_version="crd.antrea.io/v1alpha1",
        metadata=kubernetes.client.V1ObjectMeta(
            name=generate_policy_name("recommend-anp", applied_to),
            namespace=ns,
        ),
        spec=antrea_crd.NetworkPolicySpec(
            tier="Application",
            priority=5,
            applied_to=[
                antrea_crd.NetworkPolicyPeer(
                    pod_selector=kubernetes.client.V1LabelSelector(
                        match_labels=json.loads(labels)
                    ),
                )
            ],
            egress=egress_rules,
            ingress=ingress_rules,
        ),
    )
    return dict_to_yaml(np.to_dict())


def get_svc_cg_name(namespace, name):
    """Deterministic ClusterGroup name for a Service."""
    return "-".join(["cg", namespace, name])


def generate_svc_cg(x):
    """Generate a ClusterGroup YAML string referencing a (namespace, name) Service."""
    namespace, name = x
    svc_cg = antrea_crd.ClusterGroup(
        kind="ClusterGroup",
        api_version="crd.antrea.io/v1alpha2",
        metadata=kubernetes.client.V1ObjectMeta(
            name=get_svc_cg_name(namespace, name)
        ),
        spec=antrea_crd.GroupSpec(
            service_reference=antrea_crd.ServiceReference(
                name=name,
                namespace=namespace,
            )
        ),
    )
    return dict_to_yaml(svc_cg.to_dict())


def generate_acnp_svc_egress_rule(egress):
    """Build an Allow Rule targeting a Service ClusterGroup from "ns#svc#port#proto"."""
    ns, svc, port, proto = egress.split("#")
    egress_peer = antrea_crd.NetworkPolicyPeer(group=get_svc_cg_name(ns, svc))
    ports = antrea_crd.NetworkPolicyPort(protocol=proto, port=int(port))
    return antrea_crd.Rule(action="Allow", to=[egress_peer], ports=[ports])


def generate_svc_acnp(x):
    """Generate an Antrea ClusterNetworkPolicy YAML allowing pod-to-Service egress.

    Returns None when no egress rules can be derived.
    """
    applied_to, egresses = x
    egress_rules = [generate_acnp_svc_egress_rule(e) for e in egresses.split("|")]
    if not egress_rules:
        return None
    ns, labels = applied_to.split("#")
    np = antrea_crd.ClusterNetworkPolicy(
        kind="ClusterNetworkPolicy",
        api_version="crd.antrea.io/v1alpha1",
        metadata=kubernetes.client.V1ObjectMeta(
            name=generate_policy_name("recommend-svc-acnp", applied_to),
        ),
        # Use ClusterNetworkPolicySpec (was NetworkPolicySpec) to match the
        # ClusterNetworkPolicy CRD; the serialized fields are identical.
        spec=antrea_crd.ClusterNetworkPolicySpec(
            tier="Application",
            priority=5,
            applied_to=[
                antrea_crd.NetworkPolicyPeer(
                    pod_selector=kubernetes.client.V1LabelSelector(
                        match_labels=json.loads(labels)
                    ),
                    namespace_selector=kubernetes.client.V1LabelSelector(
                        match_labels={"kubernetes.io/metadata.name": ns}
                    ),
                )
            ],
            egress=egress_rules,
        ),
    )
    return dict_to_yaml(np.to_dict())


def generate_reject_acnp(applied_to):
    """Generate a baseline Reject-all ClusterNetworkPolicy YAML string.

    With an empty applied_to, the policy applies to every Pod in the cluster;
    otherwise applied_to is an "ns#labels" key selecting one group.
    """
    if not applied_to:
        np_name = "recommend-reject-all-acnp"
        applied_to_peer = antrea_crd.NetworkPolicyPeer(
            pod_selector=kubernetes.client.V1LabelSelector(),
            namespace_selector=kubernetes.client.V1LabelSelector(),
        )
    else:
        np_name = generate_policy_name("recommend-reject-acnp", applied_to)
        ns, labels = applied_to.split("#")
        applied_to_peer = antrea_crd.NetworkPolicyPeer(
            pod_selector=kubernetes.client.V1LabelSelector(
                match_labels=json.loads(labels)
            ),
            namespace_selector=kubernetes.client.V1LabelSelector(
                match_labels={"kubernetes.io/metadata.name": ns}
            ),
        )
    np = antrea_crd.ClusterNetworkPolicy(
        kind="ClusterNetworkPolicy",
        api_version="crd.antrea.io/v1alpha1",
        metadata=kubernetes.client.V1ObjectMeta(
            name=np_name,
        ),
        # Use ClusterNetworkPolicySpec (was NetworkPolicySpec) to match the
        # ClusterNetworkPolicy CRD; the serialized fields are identical.
        spec=antrea_crd.ClusterNetworkPolicySpec(
            tier="Baseline",
            priority=5,
            applied_to=[applied_to_peer],
            egress=[
                antrea_crd.Rule(
                    action="Reject",
                    to=[antrea_crd.NetworkPolicyPeer(
                        pod_selector=kubernetes.client.V1LabelSelector())],
                )
            ],
            ingress=[
                antrea_crd.Rule(
                    action="Reject",
                    _from=[antrea_crd.NetworkPolicyPeer(
                        pod_selector=kubernetes.client.V1LabelSelector())],
                )
            ],
        ),
    )
    return dict_to_yaml(np.to_dict())


def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1):
    """Recommend policies for the unprotected flows recorded in ClickHouse.

    option 1: allow ANPs/ACNPs plus per-group baseline Reject ACNPs.
    option 2: allow ANPs/ACNPs plus one cluster-wide baseline Reject ACNP.
    option 3: K8s-native NetworkPolicies only.
    Returns a list of YAML policy strings (empty list on an invalid option).
    """
    if option not in [1, 2, 3]:
        print("Error: option {} is not valid".format(option))
        return []
    spark = SparkSession.builder.getOrCreate()
    flow_df = spark.read \
        .format("jdbc") \
        .option("url", db_jdbc_address) \
        .option("query", "select {} FROM {} LIMIT {}".format(
            ", ".join(FLOW_TABLE_COLUMNS), table_name, limit)) \
        .load()
    # Keep only flows matched by no policy, deduplicate them, and tag each
    # aggregated flow with its type.
    unprotected_flows_df = flow_df.filter(
        (flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
        .groupBy(FLOW_TABLE_COLUMNS[:-2])\
        .agg({})\
        .withColumn("flow_type", udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
    if option == 3:
        # Recommend K8s-native NetworkPolicies for unprotected flows.
        egress_rdd = unprotected_flows_df.rdd.map(map_flow_to_egress)\
            .reduceByKey(lambda a, b: ("", a[1] + "|" + b[1]))
        ingress_rdd = unprotected_flows_df.filter(
            unprotected_flows_df.flow_type != "pod_to_external")\
            .rdd.map(map_flow_to_ingress)\
            .reduceByKey(lambda a, b: (a[0] + "|" + b[0], ""))
        network_peers_rdd = ingress_rdd.union(egress_rdd)\
            .reduceByKey(combine_network_peers)
        return network_peers_rdd.map(generate_k8s_np).collect()
    else:
        # Recommend allow ANPs for unprotected Pod-to-Pod & Pod-to-External flows.
        unprotected_not_svc_flows_df = unprotected_flows_df.filter(
            unprotected_flows_df.flow_type != "pod_to_svc")
        egress_rdd = unprotected_not_svc_flows_df.rdd.map(map_flow_to_egress)\
            .reduceByKey(lambda a, b: ("", a[1] + "|" + b[1]))
        ingress_rdd = unprotected_not_svc_flows_df.filter(
            unprotected_not_svc_flows_df.flow_type != "pod_to_external")\
            .rdd.map(map_flow_to_ingress)\
            .reduceByKey(lambda a, b: (a[0] + "|" + b[0], ""))
        network_peers_rdd = ingress_rdd.union(egress_rdd)\
            .reduceByKey(combine_network_peers)
        anp_list = network_peers_rdd.map(generate_anp).collect()
        # Recommend allow ACNPs for unprotected Pod-to-Svc flows, each backed
        # by a ClusterGroup referencing the destination Service.
        unprotected_svc_flows_df = unprotected_flows_df.filter(
            unprotected_flows_df.flow_type == "pod_to_svc")
        svc_df = unprotected_svc_flows_df.groupBy(["dst_svc_ns", "dst_svc_name"]).agg({})
        svc_cg_list = svc_df.rdd.map(generate_svc_cg).collect()
        egress_svc_rdd = unprotected_svc_flows_df.rdd.map(map_flow_to_egress_svc)\
            .reduceByKey(lambda a, b: a + "|" + b)
        svc_acnp_list = egress_svc_rdd.map(generate_svc_acnp).collect()
        if option == 1:
            # Recommend baseline Reject ACNPs for the applied-to groups of the
            # allow policies above.
            applied_groups_rdd = network_peers_rdd.map(lambda x: x[0])\
                .union(egress_svc_rdd.map(lambda x: x[0]))\
                .distinct()
            deny_anp_list = applied_groups_rdd.map(generate_reject_acnp).collect()
            return anp_list + svc_cg_list + svc_acnp_list + deny_anp_list
        else:
            # Recommend a single cluster-wide baseline Reject ACNP.
            deny_all_policy = generate_reject_acnp("")
            return anp_list + svc_cg_list + svc_acnp_list + [deny_all_policy]


def main():
    # Argument values for development use only.
    db_jdbc_address = "jdbc:clickhouse://localhost:8123"
    table_name = "default.flow_record"
    result = initial_recommendation_job(db_jdbc_address, table_name, option=3)
    print("Initial recommended completed, policy number: {}".format(len(result)))
    for policy in result:
        print(policy)


if __name__ == '__main__':
    main()