From ee393143f397ed9a49421a2a42c8403de9c50be8 Mon Sep 17 00:00:00 2001
From: Yongming Ding
Date: Tue, 7 Dec 2021 11:06:19 -0800
Subject: [PATCH] [Flow Visibility] Add subsequent policy recommendation Spark job

---
 .../policy_recommendation_job.py | 176 +++++++++++-------
 1 file changed, 111 insertions(+), 65 deletions(-)

diff --git a/plugins/flow-visibility/policy-recommendation/policy_recommendation_job.py b/plugins/flow-visibility/policy-recommendation/policy_recommendation_job.py
index 1a83acb516f..342f8905566 100644
--- a/plugins/flow-visibility/policy-recommendation/policy_recommendation_job.py
+++ b/plugins/flow-visibility/policy-recommendation/policy_recommendation_job.py
@@ -1,6 +1,8 @@
 import antrea_crd
 import json
 import kubernetes.client
+import random
+import string
 import yaml
 from ipaddress import ip_address, IPv4Address
 from pyspark.sql import SparkSession
@@ -9,7 +11,7 @@
 from re import sub
 
 # Column names of flow record table in Clickhouse database used in recommendation job
-FLOW_TABLE_COLUMNS = ["src_ip", "src_pod_ns", "src_pod_labels", "dst_ip", "dst_pod_ns", "dst_pod_labels", "dst_svc_name", "dst_svc_ns", "dst_port", "proto", "ingress_policy", "egress_policy"]
+FLOW_TABLE_COLUMNS = ["src_ip", "src_pod_ns", "src_pod_labels", "dst_ip", "dst_pod_ns", "dst_pod_labels", "dst_svc_name", "dst_svc_ns", "dst_port", "proto", "ingress_policy", "egress_policy", "trusted"]
 
 def get_flow_type(dst_svc_name, dst_pod_labels):
     if dst_svc_name != "":
@@ -135,16 +137,8 @@ def generate_k8s_ingress_rule(ingress):
     )
     return ingress_rule
 
-def generate_policy_name(info, applied_to):
-    policy_name = info
-    ns, labels = applied_to.split("#")
-    if ns:
-        policy_name = policy_name + "-" + ns
-    for k, v in json.loads(labels).items():
-        policy_name = policy_name + "-" + k + "-" + v
-    if len(policy_name) > 63:
-        policy_name = policy_name[:63]
-    return policy_name
+def generate_policy_name(info):
+    return "-".join([info, "".join(random.sample(string.ascii_letters + string.digits, 5))])
 
 def generate_k8s_np(x):
     applied_to, (ingresses, egresses) = x
@@ -165,7 +159,7 @@ def generate_k8s_np(x):
     if ingressRules:
         policy_types.append("Ingress")
     ns, labels = applied_to.split("#")
-    np_name = generate_policy_name("recommend-k8s-np", applied_to)
+    np_name = generate_policy_name("recommend-k8s-np")
     np = kubernetes.client.V1NetworkPolicy(
         api_version = "networking.k8s.io/v1",
         kind = "NetworkPolicy",
@@ -258,7 +252,7 @@ def generate_anp(network_peers):
             ingressRules.append(generate_anp_ingress_rule(ingress))
     if egressRules or ingressRules:
         ns, labels = applied_to.split("#")
-        np_name = generate_policy_name("recommend-anp", applied_to)
+        np_name = generate_policy_name("recommend-allow-anp")
        np = antrea_crd.NetworkPolicy(
            kind = "NetworkPolicy",
            api_version = "crd.antrea.io/v1alpha1",
@@ -324,7 +318,7 @@ def generate_svc_acnp(x):
             egressRules.append(generate_acnp_svc_egress_rule(egress))
     if egressRules:
         ns, labels = applied_to.split("#")
-        np_name = generate_policy_name("recommend-svc-acnp", applied_to)
+        np_name = generate_policy_name("recommend-svc-allow-acnp")
         np = antrea_crd.ClusterNetworkPolicy(
             kind = "ClusterNetworkPolicy",
             api_version = "crd.antrea.io/v1alpha1",
@@ -357,7 +351,7 @@ def generate_reject_acnp(applied_to):
             namespace_selector = kubernetes.client.V1LabelSelector()
         )
     else:
-        np_name = generate_policy_name("recommend-reject-acnp", applied_to)
+        np_name = generate_policy_name("recommend-reject-acnp")
         ns, labels = applied_to.split("#")
         applied_to = antrea_crd.NetworkPolicyPeer(
             pod_selector = kubernetes.client.V1LabelSelector(
@@ -393,54 +387,39 @@ def generate_reject_acnp(applied_to):
     )
     return dict_to_yaml(np.to_dict())
 
-def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1):
-    if option not in [1, 2, 3]:
-        print("Error: option {} is not valid".format(option))
-        return []
-    spark = SparkSession.builder.getOrCreate()
-    flow_df = spark.read \
-        .format("jdbc") \
-        .option("url", db_jdbc_address) \
-        .option("query", "select {} FROM {} LIMIT {}".format(", ".join(FLOW_TABLE_COLUMNS), table_name, limit)) \
-        .load()
-    # Filter out unprotected flows and aggregate them
-    unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
-        .groupBy(FLOW_TABLE_COLUMNS[:-2])\
-        .agg({})\
-        .withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
-    if option == 3:
-        # Recommend k8s native network policies for unprotected flows
-        egress_rdd = unprotected_flows_df.rdd.map(map_flow_to_egress)\
-            .reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
-        ingress_rdd = unprotected_flows_df.filter(unprotected_flows_df.flow_type != "pod_to_external")\
-            .rdd.map(map_flow_to_ingress)\
-            .reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
-        network_peers_rdd = ingress_rdd.union(egress_rdd)\
-            .reduceByKey(combine_network_peers)
-        k8s_np_rdd = network_peers_rdd.map(generate_k8s_np)
-        k8s_np_list = k8s_np_rdd.collect()
-        return k8s_np_list
-    else:
-        # Recommend allow Antrea Network Policies for unprotected Pod-to-Pod & Pod-to-External flows
-        unprotected_not_svc_flows_df = unprotected_flows_df.filter(unprotected_flows_df.flow_type != "pod_to_svc")
-        egress_rdd = unprotected_not_svc_flows_df.rdd.map(map_flow_to_egress)\
-            .reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
-        ingress_rdd = unprotected_not_svc_flows_df.filter(unprotected_not_svc_flows_df.flow_type != "pod_to_external")\
-            .rdd.map(map_flow_to_ingress)\
-            .reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
-        network_peers_rdd = ingress_rdd.union(egress_rdd)\
-            .reduceByKey(combine_network_peers)
-        anp_rdd = network_peers_rdd.map(generate_anp)
-        anp_list = anp_rdd.collect()
-        # Recommend allow Antrea Cluster Network Policies for unprotected Pod-to-Svc flows
-        unprotected_svc_flows_df = unprotected_flows_df.filter(unprotected_flows_df.flow_type == "pod_to_svc")
-        svc_df = unprotected_svc_flows_df.groupBy(["dst_svc_ns", "dst_svc_name"]).agg({})
-        svc_cg_list = svc_df.rdd.map(generate_svc_cg).collect()
-        egress_svc_rdd = unprotected_svc_flows_df.rdd.map(map_flow_to_egress_svc)\
-            .reduceByKey(lambda a, b: a+"|"+b)
-        svc_acnp_rdd = egress_svc_rdd.map(generate_svc_acnp)
-        svc_acnp_list = svc_acnp_rdd.collect()
-        # return svc_acnp_list
+def recommend_k8s_policies(flows_df):
+    egress_rdd = flows_df.rdd.map(map_flow_to_egress)\
+        .reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
+    ingress_rdd = flows_df.filter(flows_df.flow_type != "pod_to_external")\
+        .rdd.map(map_flow_to_ingress)\
+        .reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
+    network_peers_rdd = ingress_rdd.union(egress_rdd)\
+        .reduceByKey(combine_network_peers)
+    k8s_np_rdd = network_peers_rdd.map(generate_k8s_np)
+    k8s_np_list = k8s_np_rdd.collect()
+    return k8s_np_list
+
+def recommend_antrea_policies(flows_df, option=1, deny_rules=True):
+    # Recommend allow Antrea Network Policies for unprotected Pod-to-Pod & Pod-to-External flows
+    unprotected_not_svc_flows_df = flows_df.filter(flows_df.flow_type != "pod_to_svc")
+    egress_rdd = unprotected_not_svc_flows_df.rdd.map(map_flow_to_egress)\
+        .reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
+    ingress_rdd = unprotected_not_svc_flows_df.filter(unprotected_not_svc_flows_df.flow_type != "pod_to_external")\
+        .rdd.map(map_flow_to_ingress)\
+        .reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
+    network_peers_rdd = ingress_rdd.union(egress_rdd)\
+        .reduceByKey(combine_network_peers)
+    anp_rdd = network_peers_rdd.map(generate_anp)
+    anp_list = anp_rdd.collect()
+    # Recommend allow Antrea Cluster Network Policies for unprotected Pod-to-Svc flows
+    unprotected_svc_flows_df = flows_df.filter(flows_df.flow_type == "pod_to_svc")
+    svc_df = unprotected_svc_flows_df.groupBy(["dst_svc_ns", "dst_svc_name"]).agg({})
+    svc_cg_list = svc_df.rdd.map(generate_svc_cg).collect()
+    egress_svc_rdd = unprotected_svc_flows_df.rdd.map(map_flow_to_egress_svc)\
+        .reduceByKey(lambda a, b: a+"|"+b)
+    svc_acnp_rdd = egress_svc_rdd.map(generate_svc_acnp)
+    svc_acnp_list = svc_acnp_rdd.collect()
+    if deny_rules:
         if option == 1:
             # Recommend deny ANPs for the applied to groups of allow policies
             applied_groups_rdd = network_peers_rdd.map(lambda x: x[0])\
@@ -453,13 +432,80 @@ def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1)
             # Recommend deny ACNP for whole cluster
             deny_all_policy = generate_reject_acnp("")
             return anp_list + svc_cg_list + svc_acnp_list + [deny_all_policy]
+    else:
+        return anp_list + svc_cg_list + svc_acnp_list
+
+def recommend_policies_for_unprotected_flows(unprotected_flows_df, option=1):
+    if option not in [1, 2, 3]:
+        print("Error: option {} is not valid".format(option))
+        return []
+    if option == 3:
+        # Recommend k8s native network policies for unprotected flows
+        return recommend_k8s_policies(unprotected_flows_df)
+    else:
+        return recommend_antrea_policies(unprotected_flows_df, option, True)
+
+def recommend_policies_for_trusted_denied_flows(trusted_denied_flows_df):
+    return recommend_antrea_policies(trusted_denied_flows_df, deny_rules=False)
+
+def generate_sql_query(table_name, limit, start_time, end_time):
+    sql_query = "SELECT {} FROM {}".format(", ".join(FLOW_TABLE_COLUMNS), table_name)
+    if start_time and end_time:
+        sql_query += " WHERE flow_end_seconds >= '{}' AND flow_end_seconds < '{}'".format(start_time, end_time)
+    elif start_time:
+        sql_query += " WHERE flow_end_seconds >= '{}'".format(start_time)
+    elif end_time:
+        sql_query += " WHERE flow_end_seconds < '{}'".format(end_time)
+    if limit:
+        sql_query += " LIMIT {}".format(limit)
+    return sql_query
+
+def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1, start_time=None, end_time=None):
+    spark = SparkSession.builder.getOrCreate()
+    sql_query = generate_sql_query(table_name, limit, start_time, end_time)
+    flow_df = spark.read \
+        .format("jdbc") \
+        .option("url", db_jdbc_address) \
+        .option("query", sql_query) \
+        .load()
+    # Filter out unprotected flows and aggregate them
+    unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
+        .groupBy(FLOW_TABLE_COLUMNS[:-3])\
+        .agg({})\
+        .withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
+    return recommend_policies_for_unprotected_flows(unprotected_flows_df, option)
+
+def subsequent_recommendation_job(db_jdbc_address, table_name, limit=100, option=1, start_time=None, end_time=None):
+    recommend_policies = []
+    spark = SparkSession.builder.getOrCreate()
+    sql_query = generate_sql_query(table_name, limit, start_time, end_time)
+    flow_df = spark.read \
+        .format("jdbc") \
+        .option("url", db_jdbc_address) \
+        .option("query", sql_query)\
+        .load()
+    unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
+        .groupBy(FLOW_TABLE_COLUMNS[:-3])\
+        .agg({})\
+        .withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
+    recommend_policies += recommend_policies_for_unprotected_flows(unprotected_flows_df, option)
+    if option in [1, 2]:
+        trusted_denied_flows_df = flow_df.filter(flow_df.trusted == 1)\
+            .groupBy(FLOW_TABLE_COLUMNS[:-3])\
+            .agg({})\
+            .withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
+        recommend_policies += recommend_policies_for_trusted_denied_flows(trusted_denied_flows_df)
+    return recommend_policies
+
 
 def main():
     # Argument values for development use only
     db_jdbc_address = "jdbc:clickhouse://localhost:8123"
     table_name = "default.flow_record"
-    result = initial_recommendation_job(db_jdbc_address, table_name, option=3)
-    print("Initial recommended completed, policy number: {}".format(len(result)))
+    # result = initial_recommendation_job(db_jdbc_address, table_name, option=3)
+    # print("Initial recommended completed, policy number: {}".format(len(result)))
+    result = subsequent_recommendation_job(db_jdbc_address, table_name, option=1, start_time="2021-12-01 00:00:00", end_time=None)
+    print("Subsequent recommended completed, policy number: {}".format(len(result)))
     for policy in result:
         print(policy)
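
A usage sketch of the new entry points (not part of the patch): it assumes the development
defaults from main() above (a local ClickHouse JDBC endpoint and the default.flow_record table),
that policy_recommendation_job.py is importable from the working directory, and that the Spark
session has a ClickHouse JDBC driver available. Everything else is illustrative only.

    # Hypothetical driver script exercising the helpers added by this patch.
    from policy_recommendation_job import generate_sql_query, subsequent_recommendation_job

    # generate_sql_query only builds the query string, so it can be checked without a database.
    # With FLOW_TABLE_COLUMNS as defined above, this prints (columns abbreviated here):
    #   SELECT src_ip, ..., egress_policy, trusted FROM default.flow_record
    #   WHERE flow_end_seconds >= '2021-12-01 00:00:00' LIMIT 100
    print(generate_sql_query("default.flow_record", 100, "2021-12-01 00:00:00", None))

    # Subsequent run: recommends policies for flows that are still unprotected and, for
    # options 1 and 2, additional allow policies for flows marked trusted == 1.
    policies = subsequent_recommendation_job(
        "jdbc:clickhouse://localhost:8123",  # development default from main()
        "default.flow_record",
        option=1,
        start_time="2021-12-01 00:00:00",
        end_time=None,
    )
    for policy in policies:
        print(policy)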