Update controller and agent to kube-rs client 0.91.0 #702

Open · wants to merge 7 commits into base: main
Changes from 1 commit
525 changes: 390 additions & 135 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions agent/Cargo.toml
@@ -23,9 +23,9 @@ env_logger = "0.10.0"
futures = { version = "0.3.1", package = "futures" }
hyper = "0.14.2"
itertools = "0.12.0"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = { version = "0.87.1", features = ["unstable-runtime-reconcile-on"] }
k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] }
kube = { version = "0.91.0", features = [ "derive", "runtime"] }
kube-runtime = { version = "0.91.0", features = ["unstable-runtime-reconcile-on"] }
lazy_static = "1.4"
log = "0.4"
mockall_double = "0.3.1"
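
With kube 0.91, enabling the `runtime` feature re-exports the runtime machinery as `kube::runtime`, which is why the import paths in the hunks below move from `kube_runtime::` to `kube::runtime::` (the agent keeps a direct `kube-runtime` dependency only for the `unstable-runtime-reconcile-on` feature flag). A minimal sketch of the path change, using `Pod` as a stand-in resource:

```rust
// Before (kube 0.87 with the separate kube-runtime crate):
// use kube_runtime::reflector::ObjectRef;

// After (kube 0.91 with the "runtime" feature enabled):
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::reflector::ObjectRef;

fn main() {
    // ObjectRef itself is unchanged; only the import path moved under kube::runtime.
    let obj_ref: ObjectRef<Pod> = ObjectRef::new("example-pod").within("default");
    println!("{obj_ref:?}");
}
```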
@@ -15,7 +15,7 @@ use futures::future::try_join_all;
use futures::FutureExt;
use itertools::Itertools;
use kube::core::ObjectMeta;
use kube_runtime::reflector::ObjectRef;
use kube::runtime::reflector::ObjectRef;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;
2 changes: 1 addition & 1 deletion agent/src/discovery_handler_manager/mod.rs
@@ -9,7 +9,7 @@ use std::{collections::HashMap, sync::Arc};
use akri_shared::{akri::configuration::Configuration, k8s::api::IntoApi};
use k8s_openapi::api::core::v1::{ConfigMap, Secret};

use kube_runtime::reflector::ObjectRef;
use kube::runtime::reflector::ObjectRef;
use thiserror::Error;
use tokio::sync::{mpsc, watch};

2 changes: 1 addition & 1 deletion agent/src/util/discovery_configuration_controller.rs
@@ -19,7 +19,7 @@ use crate::discovery_handler_manager::{
};

use kube::{Resource, ResourceExt};
use kube_runtime::{
use kube::runtime::{
controller::Action,
reflector::{ObjectRef, Store},
Controller,
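
For orientation, here is a minimal reconcile loop wired through the kube 0.91 paths used in this hunk (`kube::runtime::{controller::Action, Controller}`). It is a generic sketch with `Pod` standing in for the watched resource, not the Akri discovery configuration controller itself:

```rust
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::{controller::Action, watcher, Controller};
use kube::{Api, Client, ResourceExt};

#[derive(Debug, thiserror::Error)]
#[error("reconcile failed")]
struct Error;

// Called with a shared copy of the changed object and a user-supplied context.
async fn reconcile(obj: Arc<Pod>, _ctx: Arc<()>) -> Result<Action, Error> {
    println!("reconciling {}", obj.name_any());
    Ok(Action::await_change())
}

// Called when reconcile fails; decides how to requeue the object.
fn error_policy(_obj: Arc<Pod>, _err: &Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::default_namespaced(client);
    Controller::new(pods, watcher::Config::default())
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|res| async move {
            if let Err(e) = res {
                eprintln!("reconcile error: {e:?}");
            }
        })
        .await;
    Ok(())
}
```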
13 changes: 8 additions & 5 deletions controller/Cargo.toml
@@ -13,14 +13,17 @@ akri-shared = { path = "../shared" }
anyhow = "1.0.38"
async-std = "1.5.0"
chrono = "0.4.10"
env_logger = "0.10.0"
either = "1.13"
env_logger = "0.11.5"
futures = "0.3.1"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = "0.87.1"
k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] }
kube = { version = "0.91.0", features = ["runtime", "client", "derive" ] }
lazy_static = "1.4"
log = "0.4"
prometheus = { version = "0.12.0", features = ["process"] }
prometheus = { version = "0.13.4", features = ["process"] }
# Used for patch API
serde_json = "1.0.45"
thiserror = "1"
tokio = { version = "1.0.2", features = ["full"] }

[dev-dependencies]
49 changes: 19 additions & 30 deletions controller/src/main.rs
@@ -3,10 +3,12 @@ extern crate lazy_static;
mod util;

use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE};
use async_std::sync::Mutex;
use prometheus::IntGaugeVec;
use std::sync::Arc;
use util::{instance_action, node_watcher, pod_watcher};
use util::{
controller_ctx::{ControllerContext, CONTROLLER_FIELD_MANAGER_ID},
instance_action, node_watcher, pod_watcher,
};

/// Length of time to sleep between controller system validation checks
pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30;
@@ -32,43 +34,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

log::info!("{} Controller logging started", API_NAMESPACE);

let synchronization = Arc::new(Mutex::new(()));
let instance_watch_synchronization = synchronization.clone();
let mut tasks = Vec::new();

// Start server for prometheus metrics
tasks.push(tokio::spawn(async move {
run_metrics_server().await.unwrap();
}));
tokio::spawn(run_metrics_server());

let controller_ctx = Arc::new(ControllerContext::new(
Arc::new(kube::Client::try_default().await?),
CONTROLLER_FIELD_MANAGER_ID,
));
let instance_water_ctx = controller_ctx.clone();
let node_watcher_ctx = controller_ctx.clone();
let pod_watcher_ctx = controller_ctx.clone();

// Handle existing instances
tasks.push(tokio::spawn({
async move {
instance_action::handle_existing_instances().await.unwrap();
}
}));
// Handle instance changes
tasks.push(tokio::spawn({
async move {
instance_action::do_instance_watch(instance_watch_synchronization)
.await
.unwrap();
}
tasks.push(tokio::spawn(async {
instance_action::run(instance_water_ctx).await;
}));
// Watch for node disappearance
tasks.push(tokio::spawn({
async move {
let mut node_watcher = node_watcher::NodeWatcher::new();
node_watcher.watch().await.unwrap();
}
tasks.push(tokio::spawn(async {
node_watcher::run(node_watcher_ctx).await;
}));
// Watch for broker Pod state changes
tasks.push(tokio::spawn({
async move {
let mut broker_pod_watcher = pod_watcher::BrokerPodWatcher::new();
broker_pod_watcher.watch().await.unwrap();
}
tasks.push(tokio::spawn(async {
pod_watcher::run(pod_watcher_ctx).await;
}));

futures::future::try_join_all(tasks).await?;
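
A side note on the task handling above: each `tokio::spawn` returns a `JoinHandle`, and `futures::future::try_join_all` resolves once every handle completes, or returns early with the first `JoinError` (for example, if one of the watchers panics). A small self-contained sketch:

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let tasks = vec![
        tokio::spawn(async { /* e.g. instance watcher */ }),
        tokio::spawn(async { /* e.g. pod watcher */ }),
    ];
    // JoinHandle<T> is a Future<Output = Result<T, JoinError>>, so try_join_all
    // propagates the first JoinError through `?`.
    futures::future::try_join_all(tasks).await?;
    Ok(())
}
```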
106 changes: 106 additions & 0 deletions controller/src/util/controller_ctx.rs
@@ -0,0 +1,106 @@
use std::collections::HashMap;
use std::sync::Arc;

use akri_shared::akri::configuration::Configuration;
use akri_shared::akri::instance::Instance;
use akri_shared::k8s::api::IntoApi;

use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::{Node, Pod, Service};

use tokio::sync::RwLock;

// Identifier for the controller to be set as the field manager for server-side apply
pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";

/// Pod states that BrokerPodWatcher is interested in
///
/// PodState describes the various states that the controller can
/// react to for Pods.
#[derive(Clone, Debug, PartialEq)]
pub enum PodState {
/// Pod is in Pending state and no action is needed.
Pending,
/// Pod is in Running state and needs to ensure that
/// instance and configuration services are running
Running,
/// Pod is in Failed/Completed/Succeeded state and
/// needs to remove any instance and configuration
/// services that are not supported by other Running
/// Pods. Also, at this point, if an Instance still
/// exists, instance_action::handle_instance_change
/// needs to be called to ensure that Pods are
/// restarted
Ended,
/// Pod is in Deleted state and needs to remove any
/// instance and configuration services that are not
/// supported by other Running Pods. Also, at this
/// point, if an Instance still exists, and the Pod is
/// owned by the Instance,
/// instance_action::handle_instance_change needs to be
/// called to ensure that Pods are restarted. Akri
/// places an Instance OwnerReference on all the Pods it
/// deploys. This declares that the Instance owns that
/// Pod and Akri's Controller explicitly manages its
/// deployment. However, if the Pod is not owned by the
/// Instance, Akri should not assume retry logic and
/// should cease action. The owning object (ie Job) will
/// handle retries as necessary.
Deleted,
}

/// Node states that NodeWatcher is interested in
///
/// NodeState describes the various states that the controller can
/// react to for Nodes.
#[derive(Clone, Debug, PartialEq)]
pub enum NodeState {
/// Node has been seen, but not Running yet
Known,
/// Node has been seen Running
Running,
/// A previously Running Node has been seen as not Running
/// and the Instances have been cleaned of references to that
/// vanished Node
InstancesCleaned,
}

pub trait ControllerKubeClient:
IntoApi<Instance>
+ IntoApi<Configuration>
+ IntoApi<Pod>
+ IntoApi<Job>
+ IntoApi<Service>
+ IntoApi<Node>
{
}

impl<
T: IntoApi<Instance>
+ IntoApi<Configuration>
+ IntoApi<Pod>
+ IntoApi<Job>
+ IntoApi<Service>
+ IntoApi<Node>,
> ControllerKubeClient for T
{
}

pub struct ControllerContext {
/// Kubernetes client
pub client: Arc<dyn ControllerKubeClient>,
pub known_pods: Arc<RwLock<HashMap<String, PodState>>>,
pub known_nodes: Arc<RwLock<HashMap<String, NodeState>>>,
pub identifier: String,
}

impl ControllerContext {
pub fn new(client: Arc<dyn ControllerKubeClient>, identifier: &str) -> Self {
ControllerContext {
client,
known_pods: Arc::new(RwLock::new(HashMap::new())),
known_nodes: Arc::new(RwLock::new(HashMap::new())),
identifier: identifier.to_string(),
}
}
}
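
The field manager constant above is meant for server-side apply. Below is a sketch of how such an identifier is typically passed through `PatchParams::apply` in kube 0.91; the Service name, selector, and port are placeholders rather than values taken from this PR:

```rust
use k8s_openapi::api::core::v1::Service;
use kube::api::{Api, Patch, PatchParams};
use kube::Client;
use serde_json::json;

// Server-side apply: the API server records "akri.sh/controller" (the value of
// CONTROLLER_FIELD_MANAGER_ID) as the manager that owns the applied fields.
async fn apply_placeholder_service(client: Client) -> kube::Result<Service> {
    let api: Api<Service> = Api::default_namespaced(client);
    let svc = json!({
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": { "name": "example-broker-svc" },
        "spec": {
            "selector": { "app": "example-broker" },
            "ports": [{ "port": 80 }]
        }
    });
    api.patch(
        "example-broker-svc",
        &PatchParams::apply("akri.sh/controller"),
        &Patch::Apply(&svc),
    )
    .await
}
```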