Skip to content

Commit

Permalink
[Flow Visibility] Add subsequent policy recommendation Spark job
Browse files Browse the repository at this point in the history
  • Loading branch information
Yongming Ding committed Dec 7, 2021
1 parent 5458b30 commit ee39314
Showing 1 changed file with 111 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import antrea_crd
import json
import kubernetes.client
import random
import string
import yaml
from ipaddress import ip_address, IPv4Address
from pyspark.sql import SparkSession
Expand All @@ -9,7 +11,7 @@
from re import sub

# Column names of flow record table in Clickhouse database used in recommendation job
FLOW_TABLE_COLUMNS = ["src_ip", "src_pod_ns", "src_pod_labels", "dst_ip", "dst_pod_ns", "dst_pod_labels", "dst_svc_name", "dst_svc_ns", "dst_port", "proto", "ingress_policy", "egress_policy"]
FLOW_TABLE_COLUMNS = ["src_ip", "src_pod_ns", "src_pod_labels", "dst_ip", "dst_pod_ns", "dst_pod_labels", "dst_svc_name", "dst_svc_ns", "dst_port", "proto", "ingress_policy", "egress_policy", "trusted"]

def get_flow_type(dst_svc_name, dst_pod_labels):
if dst_svc_name != "":
Expand Down Expand Up @@ -135,16 +137,8 @@ def generate_k8s_ingress_rule(ingress):
)
return ingress_rule

def generate_policy_name(info, applied_to):
policy_name = info
ns, labels = applied_to.split("#")
if ns:
policy_name = policy_name + "-" + ns
for k, v in json.loads(labels).items():
policy_name = policy_name + "-" + k + "-" + v
if len(policy_name) > 63:
policy_name = policy_name[:63]
return policy_name
def generate_policy_name(info):
return "-".join([info, "".join(random.sample(string.ascii_letters + string.digits, 5))])

def generate_k8s_np(x):
applied_to, (ingresses, egresses) = x
Expand All @@ -165,7 +159,7 @@ def generate_k8s_np(x):
if ingressRules:
policy_types.append("Ingress")
ns, labels = applied_to.split("#")
np_name = generate_policy_name("recommend-k8s-np", applied_to)
np_name = generate_policy_name("recommend-k8s-np")
np = kubernetes.client.V1NetworkPolicy(
api_version = "networking.k8s.io/v1",
kind = "NetworkPolicy",
Expand Down Expand Up @@ -258,7 +252,7 @@ def generate_anp(network_peers):
ingressRules.append(generate_anp_ingress_rule(ingress))
if egressRules or ingressRules:
ns, labels = applied_to.split("#")
np_name = generate_policy_name("recommend-anp", applied_to)
np_name = generate_policy_name("recommend-allow-anp")
np = antrea_crd.NetworkPolicy(
kind = "NetworkPolicy",
api_version = "crd.antrea.io/v1alpha1",
Expand Down Expand Up @@ -324,7 +318,7 @@ def generate_svc_acnp(x):
egressRules.append(generate_acnp_svc_egress_rule(egress))
if egressRules:
ns, labels = applied_to.split("#")
np_name = generate_policy_name("recommend-svc-acnp", applied_to)
np_name = generate_policy_name("recommend-svc-allow-acnp")
np = antrea_crd.ClusterNetworkPolicy(
kind = "ClusterNetworkPolicy",
api_version = "crd.antrea.io/v1alpha1",
Expand Down Expand Up @@ -357,7 +351,7 @@ def generate_reject_acnp(applied_to):
namespace_selector = kubernetes.client.V1LabelSelector()
)
else:
np_name = generate_policy_name("recommend-reject-acnp", applied_to)
np_name = generate_policy_name("recommend-reject-acnp")
ns, labels = applied_to.split("#")
applied_to = antrea_crd.NetworkPolicyPeer(
pod_selector = kubernetes.client.V1LabelSelector(
Expand Down Expand Up @@ -393,54 +387,39 @@ def generate_reject_acnp(applied_to):
)
return dict_to_yaml(np.to_dict())

def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1):
if option not in [1, 2, 3]:
print("Error: option {} is not valid".format(option))
return []
spark = SparkSession.builder.getOrCreate()
flow_df = spark.read \
.format("jdbc") \
.option("url", db_jdbc_address) \
.option("query", "select {} FROM {} LIMIT {}".format(", ".join(FLOW_TABLE_COLUMNS), table_name, limit)) \
.load()
# Filter out unprotected flows and aggregate them
unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
.groupBy(FLOW_TABLE_COLUMNS[:-2])\
.agg({})\
.withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
if option == 3:
# Recommend k8s native network policies for unprotected flows
egress_rdd = unprotected_flows_df.rdd.map(map_flow_to_egress)\
.reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
ingress_rdd = unprotected_flows_df.filter(unprotected_flows_df.flow_type != "pod_to_external")\
.rdd.map(map_flow_to_ingress)\
.reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
network_peers_rdd = ingress_rdd.union(egress_rdd)\
.reduceByKey(combine_network_peers)
k8s_np_rdd = network_peers_rdd.map(generate_k8s_np)
k8s_np_list = k8s_np_rdd.collect()
return k8s_np_list
else:
# Recommend allow Antrea Network Policies for unprotected Pod-to-Pod & Pod-to-External flows
unprotected_not_svc_flows_df = unprotected_flows_df.filter(unprotected_flows_df.flow_type != "pod_to_svc")
egress_rdd = unprotected_not_svc_flows_df.rdd.map(map_flow_to_egress)\
.reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
ingress_rdd = unprotected_not_svc_flows_df.filter(unprotected_not_svc_flows_df.flow_type != "pod_to_external")\
.rdd.map(map_flow_to_ingress)\
.reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
network_peers_rdd = ingress_rdd.union(egress_rdd)\
.reduceByKey(combine_network_peers)
anp_rdd = network_peers_rdd.map(generate_anp)
anp_list = anp_rdd.collect()
# Recommend allow Antrea Cluster Network Policies for unprotected Pod-to-Svc flows
unprotected_svc_flows_df = unprotected_flows_df.filter(unprotected_flows_df.flow_type == "pod_to_svc")
svc_df = unprotected_svc_flows_df.groupBy(["dst_svc_ns", "dst_svc_name"]).agg({})
svc_cg_list = svc_df.rdd.map(generate_svc_cg).collect()
egress_svc_rdd = unprotected_svc_flows_df.rdd.map(map_flow_to_egress_svc)\
.reduceByKey(lambda a, b: a+"|"+b)
svc_acnp_rdd = egress_svc_rdd.map(generate_svc_acnp)
svc_acnp_list = svc_acnp_rdd.collect()
# return svc_acnp_list
def recommend_k8s_policies(flows_df):
egress_rdd = flows_df.rdd.map(map_flow_to_egress)\
.reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
ingress_rdd = flows_df.filter(flows_df.flow_type != "pod_to_external")\
.rdd.map(map_flow_to_ingress)\
.reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
network_peers_rdd = ingress_rdd.union(egress_rdd)\
.reduceByKey(combine_network_peers)
k8s_np_rdd = network_peers_rdd.map(generate_k8s_np)
k8s_np_list = k8s_np_rdd.collect()
return k8s_np_list

def recommend_antrea_policies(flows_df, option=1, deny_rules=True):
# Recommend allow Antrea Network Policies for unprotected Pod-to-Pod & Pod-to-External flows
unprotected_not_svc_flows_df = flows_df.filter(flows_df.flow_type != "pod_to_svc")
egress_rdd = unprotected_not_svc_flows_df.rdd.map(map_flow_to_egress)\
.reduceByKey(lambda a, b: ("", a[1]+"|"+b[1]))
ingress_rdd = unprotected_not_svc_flows_df.filter(unprotected_not_svc_flows_df.flow_type != "pod_to_external")\
.rdd.map(map_flow_to_ingress)\
.reduceByKey(lambda a, b: (a[0]+"|"+b[0], ""))
network_peers_rdd = ingress_rdd.union(egress_rdd)\
.reduceByKey(combine_network_peers)
anp_rdd = network_peers_rdd.map(generate_anp)
anp_list = anp_rdd.collect()
# Recommend allow Antrea Cluster Network Policies for unprotected Pod-to-Svc flows
unprotected_svc_flows_df = flows_df.filter(flows_df.flow_type == "pod_to_svc")
svc_df = unprotected_svc_flows_df.groupBy(["dst_svc_ns", "dst_svc_name"]).agg({})
svc_cg_list = svc_df.rdd.map(generate_svc_cg).collect()
egress_svc_rdd = unprotected_svc_flows_df.rdd.map(map_flow_to_egress_svc)\
.reduceByKey(lambda a, b: a+"|"+b)
svc_acnp_rdd = egress_svc_rdd.map(generate_svc_acnp)
svc_acnp_list = svc_acnp_rdd.collect()
if deny_rules:
if option == 1:
# Recommend deny ANPs for the applied to groups of allow policies
applied_groups_rdd = network_peers_rdd.map(lambda x: x[0])\
Expand All @@ -453,13 +432,80 @@ def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1)
# Recommend deny ACNP for whole cluster
deny_all_policy = generate_reject_acnp("")
return anp_list + svc_cg_list + svc_acnp_list + [deny_all_policy]
else:
return anp_list + svc_cg_list + svc_acnp_list

def recommend_policies_for_unprotected_flows(unprotected_flows_df, option=1):
if option not in [1, 2, 3]:
print("Error: option {} is not valid".format(option))
return []
if option == 3:
# Recommend k8s native network policies for unprotected flows
return recommend_k8s_policies(unprotected_flows_df)
else:
return recommend_antrea_policies(unprotected_flows_df, option, True)

def recommend_policies_for_trusted_denied_flows(trusted_denied_flows_df):
return recommend_antrea_policies(trusted_denied_flows_df, deny_rules=False)

def generate_sql_query(table_name, limit, start_time, end_time):
sql_query = "SELECT {} FROM {}".format(", ".join(FLOW_TABLE_COLUMNS), table_name)
if start_time and end_time:
sql_query += " WHERE flow_end_seconds >= '{}' AND flow_end_seconds < '{}'".format(start_time, end_time)
elif start_time:
sql_query += " WHERE flow_end_seconds >= '{}'".format(start_time)
elif end_time:
sql_query += " WHERE flow_end_seconds < '{}'".format(end_time)
if limit:
sql_query += " LIMIT {}".format(limit)
return sql_query

def initial_recommendation_job(db_jdbc_address, table_name, limit=100, option=1, start_time=None, end_time=None):
spark = SparkSession.builder.getOrCreate()
sql_query = generate_sql_query(table_name, limit, start_time, end_time)
flow_df = spark.read \
.format("jdbc") \
.option("url", db_jdbc_address) \
.option("query", sql_query) \
.load()
# Filter out unprotected flows and aggregate them
unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
.groupBy(FLOW_TABLE_COLUMNS[:-3])\
.agg({})\
.withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
return recommend_policies_for_unprotected_flows(unprotected_flows_df, option)

def subsequent_recommendation_job(db_jdbc_address, table_name, limit=100, option=1, start_time=None, end_time=None):
recommend_policies = []
spark = SparkSession.builder.getOrCreate()
sql_query = generate_sql_query(table_name, limit, start_time, end_time)
flow_df = spark.read \
.format("jdbc") \
.option("url", db_jdbc_address) \
.option("query", sql_query)\
.load()
unprotected_flows_df = flow_df.filter((flow_df.ingress_policy == "") & (flow_df.egress_policy == ""))\
.groupBy(FLOW_TABLE_COLUMNS[:-3])\
.agg({})\
.withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
recommend_policies += recommend_policies_for_unprotected_flows(unprotected_flows_df, option)
if option in [1, 2]:
trusted_denied_flows_df = flow_df.filter(flow_df.trusted == 1)\
.groupBy(FLOW_TABLE_COLUMNS[:-3])\
.agg({})\
.withColumn('flow_type', udf(get_flow_type, StringType())("dst_svc_name", "dst_pod_labels"))
recommend_policies += recommend_policies_for_trusted_denied_flows(trusted_denied_flows_df)
return recommend_policies


def main():
# Argument values for development use only
db_jdbc_address = "jdbc:clickhouse://localhost:8123"
table_name = "default.flow_record"
result = initial_recommendation_job(db_jdbc_address, table_name, option=3)
print("Initial recommended completed, policy number: {}".format(len(result)))
# result = initial_recommendation_job(db_jdbc_address, table_name, option=3)
# print("Initial recommended completed, policy number: {}".format(len(result)))
result = subsequent_recommendation_job(db_jdbc_address, table_name, option=1, start_time="2021-12-01 00:00:00", end_time=None)
print("Subsequent recommended completed, policy number: {}".format(len(result)))
for policy in result:
print(policy)

Expand Down

0 comments on commit ee39314

Please sign in to comment.