Skip to content

Commit

Permalink
Make policy controller watches optional
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong committed Jul 30, 2024
1 parent 99fd7e1 commit 7d5ddd8
Showing 1 changed file with 52 additions and 21 deletions.
73 changes: 52 additions & 21 deletions policy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,33 +247,51 @@ async fn main() -> Result<()> {
.instrument(info_span!("networkauthentications")),
);

let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(watcher::Config::default());
let http_routes_indexes = IndexList::new(inbound_index.clone())
.push(outbound_index.clone())
.push(status_index.clone())
.shared();
tokio::spawn(
kubert::index::namespaced(http_routes_indexes.clone(), http_routes)
.instrument(info_span!("httproutes.policy.linkerd.io")),
);

let gateway_http_routes =
runtime.watch_all::<k8s_gateway_api::HttpRoute>(watcher::Config::default());
tokio::spawn(
kubert::index::namespaced(http_routes_indexes, gateway_http_routes)
.instrument(info_span!("httproutes.gateway.networking.k8s.io")),
);
if api_resource_exists::<k8s::policy::HttpRoute>(&runtime.client()).await {
let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(watcher::Config::default());

let gateway_grpc_routes =
runtime.watch_all::<k8s_gateway_api::GrpcRoute>(watcher::Config::default());
let gateway_grpc_routes_indexes = IndexList::new(outbound_index.clone())
.push(inbound_index.clone())
.push(status_index.clone())
.shared();
tokio::spawn(
kubert::index::namespaced(gateway_grpc_routes_indexes.clone(), gateway_grpc_routes)
.instrument(info_span!("grpcroutes.gateway.networking.k8s.io")),
);
tokio::spawn(
kubert::index::namespaced(http_routes_indexes.clone(), http_routes)
.instrument(info_span!("httproutes.policy.linkerd.io")),
);
} else {
tracing::warn!("httproutes.policy.linkerd.io resource kind not found, skipping watches");
}

if api_resource_exists::<k8s_gateway_api::HttpRoute>(&runtime.client()).await {
let gateway_http_routes =
runtime.watch_all::<k8s_gateway_api::HttpRoute>(watcher::Config::default());
tokio::spawn(
kubert::index::namespaced(http_routes_indexes, gateway_http_routes)
.instrument(info_span!("httproutes.gateway.networking.k8s.io")),
);
} else {
tracing::warn!(
"httproutes.gateway.networking.k8s.io resource kind not found, skipping watches"
);
}

if api_resource_exists::<k8s_gateway_api::GrpcRoute>(&runtime.client()).await {
let gateway_grpc_routes =
runtime.watch_all::<k8s_gateway_api::GrpcRoute>(watcher::Config::default());
let gateway_grpc_routes_indexes = IndexList::new(outbound_index.clone())
.push(inbound_index.clone())
.push(status_index.clone())
.shared();
tokio::spawn(
kubert::index::namespaced(gateway_grpc_routes_indexes.clone(), gateway_grpc_routes)
.instrument(info_span!("grpcroutes.gateway.networking.k8s.io")),
);
} else {
tracing::warn!(
"grpcroutes.gateway.networking.k8s.io resource kind not found, skipping watches"
);
}

let services = runtime.watch_all::<k8s::Service>(watcher::Config::default());
let services_indexes = IndexList::new(outbound_index.clone())
Expand Down Expand Up @@ -435,3 +453,16 @@ async fn init_lease(client: Client, ns: &str, deployment_name: &str) -> Result<L
.await
.map_err(Into::into)
}

async fn api_resource_exists<T>(client: &Client) -> bool
where
T: Resource,
T::DynamicType: Default,
{
let dt = Default::default();
let resources = client
.list_api_group_resources(&T::api_version(&dt))
.await
.expect("Failed to list API group resources");
resources.resources.iter().any(|r| r.kind == T::kind(&dt))
}

0 comments on commit 7d5ddd8

Please sign in to comment.