From 3468aed115d5f942b61e94a7905ce8d77c6b585e Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Thu, 10 Oct 2024 16:17:01 +0200
Subject: [PATCH 1/5] Add max_provers config.

---
 .../config/src/configs/prover_autoscaler.rs   |  2 +
 .../src/proto/config/prover_autoscaler.proto  |  6 +++
 .../protobuf_config/src/prover_autoscaler.rs  | 38 +++++++++++++++++++
 .../prover_autoscaler/src/global/scaler.rs    |  9 ++++-
 4 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs
index 41131fc1b8c..6f83f0d2d18 100644
--- a/core/lib/config/src/configs/prover_autoscaler.rs
+++ b/core/lib/config/src/configs/prover_autoscaler.rs
@@ -51,6 +51,8 @@ pub struct ProverAutoscalerScalerConfig {
     pub cluster_priorities: HashMap<String, u32>,
     /// Prover speed per GPU. Used to calculate desired number of provers for queue size.
     pub prover_speed: HashMap<Gpu, u32>,
+    /// Maximum number of provers which can be run per cluster/GPU.
+    pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
     /// Duration after which pending pod considered long pending.
     #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
     pub long_pending_duration: Duration,
diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
index e1d11b94d8f..8363b625119 100644
--- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
+++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
@@ -34,6 +34,11 @@ message ProverSpeed {
   optional uint32 speed = 2; // required
 }
 
+message MaxProver {
+  optional string cluster_and_gpu = 1; // required, format: <cluster>/<gpu>
+  optional uint32 max = 2; // required
+}
+
 message ProverAutoscalerScalerConfig {
   optional uint32 prometheus_port = 1; // required
   optional std.Duration scaler_run_interval = 2; // optional
@@ -43,4 +48,5 @@ message ProverAutoscalerScalerConfig {
   repeated ClusterPriority cluster_priorities = 6; // optional
   repeated ProverSpeed prover_speed = 7; // optional
   optional uint32 long_pending_duration_s = 8; // optional
+  repeated MaxProver max_provers = 9; // optional
 }
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
index f7da099cb82..8f2ecf4ac50 100644
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ b/core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use anyhow::Context as _;
 use time::Duration;
 use zksync_config::configs::{self, prover_autoscaler::Gpu};
@@ -92,6 +94,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 Some(s) => Duration::seconds(s.into()),
                 None => Self::Type::default_long_pending_duration(),
             },
+            max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| {
+                let (cluster_and_gpu, max) = e.read().expect("max_provers");
+                if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
+                    acc.entry(cluster.to_string())
+                        .or_insert_with(HashMap::new)
+                        .insert(gpu.parse().expect("max_provers/gpu"), max);
+                }
+                acc
+            }),
         })
     }
 
@@ -117,6 +128,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 .map(|(k, v)| proto::ProverSpeed::build(&(*k, *v)))
                 .collect(),
             long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32),
+            max_provers: this
+                .max_provers
+                .iter()
+                .flat_map(|(cluster, inner_map)| {
+                    inner_map.iter().map(move |(gpu, max)| {
+                        proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max))
+                    })
+                })
+                .collect(),
         }
     }
 }
@@ -170,3 +190,21 @@ impl ProtoRepr for proto::ProverSpeed {
         }
     }
 }
+
+impl ProtoRepr for proto::MaxProver {
+    type Type = (String, u32);
+    fn read(&self) -> anyhow::Result<Self::Type> {
+        Ok((
+            required(&self.cluster_and_gpu)
+                .context("cluster_and_gpu")?
+                .parse()?,
+            *required(&self.max).context("max")?,
+        ))
+    }
+    fn build(this: &Self::Type) -> Self {
+        Self {
+            cluster_and_gpu: Some(this.0.to_string()),
+            max: Some(this.1),
+        }
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index 9f37c4d1167..e003e156922 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -56,6 +56,7 @@ pub struct Scaler {
 
     /// Which cluster to use first.
     cluster_priorities: HashMap<String, u32>,
+    max_provers: HashMap<String, HashMap<Gpu, u32>>,
     prover_speed: HashMap<Gpu, u32>,
     long_pending_duration: chrono::Duration,
 }
@@ -87,6 +88,7 @@ impl Scaler {
             watcher,
             queuer,
             cluster_priorities: config.cluster_priorities,
+            max_provers: config.max_provers,
             prover_speed: config.prover_speed,
             long_pending_duration: chrono::Duration::seconds(
                 config.long_pending_duration.whole_seconds(),
@@ -112,7 +114,12 @@ impl Scaler {
         let e = gp_map.entry(gpu).or_insert(GPUPool {
             name: cluster.name.clone(),
             gpu,
-            max_pool_size: 100, // TODO: get from the agent.
+            max_pool_size: self
+                .max_provers
+                .get(&cluster.name)
+                .and_then(|inner_map| inner_map.get(&gpu))
+                .copied()
+                .unwrap_or(0),
             ..Default::default()
         });
 

From 903cb34b0179f56c1029b0b1909dc8bfe25413ac Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Thu, 10 Oct 2024 17:59:20 +0200
Subject: [PATCH 2/5] Add check if clusters data is ready.

---
 .../prover_autoscaler/src/cluster_types.rs   | 10 +---
 .../prover_autoscaler/src/global/scaler.rs   | 13 +++--
 .../prover_autoscaler/src/global/watcher.rs  | 49 +++++++++++++------
 3 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
index b074e0774c9..c25b624b5d4 100644
--- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
+++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -36,19 +36,11 @@ pub struct Namespace {
     pub pods: HashMap<String, Pod>,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct Cluster {
     pub name: String,
     pub namespaces: HashMap<String, Namespace>,
 }
-impl Default for Cluster {
-    fn default() -> Self {
-        Self {
-            name: "".to_string(),
-            namespaces: HashMap::new(),
-        }
-    }
-}
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
 pub struct Clusters {
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index e003e156922..99ec4436dbf 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -283,12 +283,13 @@ impl Task for Scaler {
     async fn invoke(&self) -> anyhow::Result<()> {
         let queue = self.queuer.get_queue().await.unwrap();
 
-        // TODO: Check that clusters data is ready.
-        let clusters = self.watcher.clusters.lock().await;
+        let guard = self.watcher.data.lock().await;
+        watcher::check_is_ready(&guard.is_ready)?;
+
         for (ns, ppv) in &self.namespaces {
             let q = queue.queue.get(ppv).cloned().unwrap_or(0);
             if q > 0 {
-                let provers = self.run(ns, q, &clusters);
+                let provers = self.run(ns, q, &guard.clusters);
                 for (k, num) in &provers {
                     AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
                         .set(*num as u64);
@@ -309,7 +310,7 @@ mod tests {
 
     use super::*;
    use crate::{
-        cluster_types::{self, Deployment, Namespace, Pod},
+        cluster_types::{Deployment, Namespace, Pod},
         global::{queuer, watcher},
     };
 
@@ -317,9 +318,7 @@ fn test_run() {
 
         let watcher = watcher::Watcher {
            cluster_agents: vec![],
-            clusters: Arc::new(Mutex::new(cluster_types::Clusters {
-                ..Default::default()
-            })),
+            data: Arc::new(Mutex::new(watcher::WatchedData::default())),
         };
         let queuer = queuer::Queuer {
             prover_job_monitor_url: "".to_string(),
diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
index ef3ebd3b819..01fa68c60f8 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -1,6 +1,6 @@
 use std::{collections::HashMap, sync::Arc};
 
-use anyhow::{Context, Ok};
+use anyhow::{anyhow, Context, Ok, Result};
 use futures::future;
 use reqwest::Method;
 use tokio::sync::Mutex;
@@ -12,15 +12,31 @@ use crate::{
     task_wiring::Task,
 };
 
+#[derive(Default)]
+pub struct WatchedData {
+    pub clusters: Clusters,
+    pub is_ready: Vec<bool>,
+}
+
+pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
+    for b in v {
+        if !b {
+            return Err(anyhow!("Clusters data is not ready"));
+        }
+    }
+    Ok(())
+}
+
 #[derive(Clone)]
 pub struct Watcher {
     /// List of base URLs of all agents.
     pub cluster_agents: Vec<Arc<Url>>,
-    pub clusters: Arc<Mutex<Clusters>>,
+    pub data: Arc<Mutex<WatchedData>>,
 }
 
 impl Watcher {
     pub fn new(agent_urls: Vec<String>) -> Self {
+        let size = agent_urls.len();
         Self {
             cluster_agents: agent_urls
                 .into_iter()
@@ -31,8 +47,11 @@ impl Watcher {
                     )
                 })
                 .collect(),
-            clusters: Arc::new(Mutex::new(Clusters {
-                clusters: HashMap::new(),
+            data: Arc::new(Mutex::new(WatchedData {
+                clusters: Clusters {
+                    clusters: HashMap::new(),
+                },
+                is_ready: vec![false; size],
             })),
         }
     }
@@ -45,7 +64,8 @@ impl Task for Watcher {
             .cluster_agents
             .clone()
             .into_iter()
-            .map(|a| {
+            .enumerate()
+            .map(|(i, a)| {
                 tracing::debug!("Getting cluster data from agent {}.", a);
                 tokio::spawn(async move {
                     let url: String = a
@@ -54,13 +74,14 @@ impl Task for Watcher {
                         .context("Failed to join URL with /cluster")?
                        .to_string();
                    let response = send_request_with_retries(&url, 5, Method::GET, None, None).await;
-                    response
+                    let res = response
                        .map_err(|err| {
                            anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                        })?
                        .json::<Cluster>()
                        .await
-                        .context("Failed to read response as json")
+                        .context("Failed to read response as json");
+                    Ok((i, res))
                })
            })
            .collect();
@@ -71,18 +92,16 @@ impl Task for Watcher {
                .await
                .into_iter()
                .map(|h| async move {
-                    let c = h.unwrap().unwrap();
-                    self.clusters
-                        .lock()
-                        .await
-                        .clusters
-                        .insert(c.name.clone(), c);
+                    let (i, res) = h??;
+                    let c = res?;
+                    let mut guard = self.data.lock().await;
+                    guard.clusters.clusters.insert(c.name.clone(), c);
+                    guard.is_ready[i] = true;
                    Ok(())
                })
                .collect::<Vec<_>>(),
        )
-        .await
-        .unwrap();
+        .await?;
 
        Ok(())
    }

From de35ec77e686f33e6fdbdf62724f58488951a21c Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Wed, 16 Oct 2024 18:57:02 +0200
Subject: [PATCH 3/5] Run evaluation if queue is 0, but there are some pods running in the namespace

---
 .../prover_autoscaler/src/global/scaler.rs    | 26 +++++++++++++++++--
 .../crates/bin/prover_autoscaler/src/main.rs  | 14 +++++-----
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index 99ec4436dbf..0dd3b91a792 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -272,12 +272,33 @@ impl Scaler {
             }
         }
 
-        tracing::debug!("run result: provers {:?}, total: {}", &provers, total);
+        tracing::debug!(
+            "run result for namespace {}: provers {:?}, total: {}",
+            namespace,
+            &provers,
+            total
+        );
 
         provers
     }
 }
 
+/// is_namespace_running returns true if there are some pods running in it.
+fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
+    clusters
+        .clusters
+        .values()
+        .flat_map(|v| v.namespaces.iter())
+        .filter_map(|(k, v)| if k == namespace { Some(v) } else { None })
+        .flat_map(|v| v.deployments.values())
+        .map(
+            |d| d.running + d.desired, // If there is something running or expected to run, we
+                                       // should consider the namespace.
+ ) + .sum::() + > 0 +} + #[async_trait::async_trait] impl Task for Scaler { async fn invoke(&self) -> anyhow::Result<()> { @@ -288,7 +309,8 @@ impl Task for Scaler { for (ns, ppv) in &self.namespaces { let q = queue.queue.get(ppv).cloned().unwrap_or(0); - if q > 0 { + tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}"); + if q > 0 || is_namespace_running(ns, &guard.clusters) { let provers = self.run(ns, q, &guard.clusters); for (k, num) in &provers { AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs index 196bd6deb81..e3aec1fbd39 100644 --- a/prover/crates/bin/prover_autoscaler/src/main.rs +++ b/prover/crates/bin/prover_autoscaler/src/main.rs @@ -80,24 +80,21 @@ async fn main() -> anyhow::Result<()> { let _ = rustls::crypto::ring::default_provider().install_default(); let client = kube::Client::try_default().await?; - tracing::info!("Starting ProverAutoscaler"); - let mut tasks = vec![]; match opt.job { AutoscalerType::Agent => { + let cluster = opt + .cluster_name + .context("cluster_name is required for Agent")?; + tracing::info!("Starting ProverAutoscaler Agent for cluster {}", cluster); let agent_config = general_config.agent_config.context("agent_config")?; let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port); tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone()))); // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google" // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name - let watcher = Watcher::new( - client.clone(), - opt.cluster_name - .context("cluster_name is required for Agent")?, - agent_config.namespaces, - ); + let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces); let scaler = Scaler { client }; tasks.push(tokio::spawn(watcher.clone().run())); tasks.push(tokio::spawn(agent::run_server( @@ -108,6 +105,7 @@ async fn main() -> anyhow::Result<()> { ))) } AutoscalerType::Scaler => { + tracing::info!("Starting ProverAutoscaler Scaler"); let scaler_config = general_config.scaler_config.context("scaler_config")?; let interval = scaler_config.scaler_run_interval.unsigned_abs(); let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port); From 0422797b46eac94f2303fade340473e80eadf2a0 Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Wed, 16 Oct 2024 19:21:59 +0200 Subject: [PATCH 4/5] Fix lint and test. 
---
 contracts                                             |  2 +-
 core/lib/protobuf_config/src/prover_autoscaler.rs     |  2 +-
 .../crates/bin/prover_autoscaler/src/global/scaler.rs | 11 +++++++++--
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/contracts b/contracts
index 84d5e3716f6..aafee035db8 160000
--- a/contracts
+++ b/contracts
@@ -1 +1 @@
-Subproject commit 84d5e3716f645909e8144c7d50af9dd6dd9ded62
+Subproject commit aafee035db892689df3f7afe4b89fd6467a39313
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
index 8f2ecf4ac50..e95e4003972 100644
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ b/core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -98,7 +98,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 let (cluster_and_gpu, max) = e.read().expect("max_provers");
                 if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
                     acc.entry(cluster.to_string())
-                        .or_insert_with(HashMap::new)
+                        .or_default()
                         .insert(gpu.parse().expect("max_provers/gpu"), max);
                 }
                 acc
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index 0dd3b91a792..75c9e2e3e42 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -345,7 +345,14 @@ mod tests {
         let queuer = queuer::Queuer {
             prover_job_monitor_url: "".to_string(),
         };
-        let scaler = Scaler::new(watcher, queuer, ProverAutoscalerScalerConfig::default());
+        let scaler = Scaler::new(
+            watcher,
+            queuer,
+            ProverAutoscalerScalerConfig {
+                max_provers: HashMap::from([("foo".to_string(), HashMap::from([(Gpu::L4, 100)]))]),
+                ..Default::default()
+            },
+        );
         let got = scaler.run(
             &"prover".to_string(),
             1499,
@@ -383,6 +390,6 @@ mod tests {
             },
             3,
         )]);
-        assert!(got == want);
+        assert_eq!(got, want);
     }
 }

From 40f077d9a2d20350e36182abfdf926469f4e9656 Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Wed, 16 Oct 2024 19:31:59 +0200
Subject: [PATCH 5/5] Submodule sync

---
 contracts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/contracts b/contracts
index aafee035db8..84d5e3716f6 160000
--- a/contracts
+++ b/contracts
@@ -1 +1 @@
-Subproject commit aafee035db892689df3f7afe4b89fd6467a39313
+Subproject commit 84d5e3716f645909e8144c7d50af9dd6dd9ded62
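
Note on the max_provers wiring introduced in PATCH 1/5: the limit travels through protobuf as flat MaxProver { cluster_and_gpu, max } entries keyed by a "<cluster>/<gpu>" string, is folded into a nested map in ProtoRepr::read, and is then looked up by the scaler per cluster and GPU, falling back to 0 when no limit is configured. The following is a minimal, self-contained Rust sketch of that key convention and folding step; the simplified Gpu enum, the parse_gpu helper, and the sample cluster names are illustrative assumptions, not code from the patches.

use std::collections::HashMap;

// Simplified stand-ins for illustration only; the real config uses
// zksync_config::configs::prover_autoscaler::Gpu and the protobuf-generated MaxProver type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Gpu {
    L4,
    T4,
}

fn parse_gpu(s: &str) -> Option<Gpu> {
    match s {
        "L4" => Some(Gpu::L4),
        "T4" => Some(Gpu::T4),
        _ => None,
    }
}

/// Folds flat ("cluster/gpu", max) pairs into the nested
/// HashMap<String, HashMap<Gpu, u32>> shape consumed by the scaler.
fn fold_max_provers(entries: &[(&str, u32)]) -> HashMap<String, HashMap<Gpu, u32>> {
    entries.iter().fold(HashMap::new(), |mut acc, (key, max)| {
        if let Some((cluster, gpu)) = key.split_once('/') {
            if let Some(gpu) = parse_gpu(gpu) {
                acc.entry(cluster.to_string()).or_default().insert(gpu, *max);
            }
        }
        acc
    })
}

fn main() {
    // Hypothetical cluster names ("foo", "bar", "baz") used only for this example.
    let max_provers = fold_max_provers(&[("foo/L4", 100), ("foo/T4", 16), ("bar/L4", 8)]);
    assert_eq!(max_provers["foo"][&Gpu::L4], 100);
    // A missing (cluster, GPU) pair falls back to 0, mirroring the `.unwrap_or(0)` in the scaler.
    let cap = max_provers
        .get("baz")
        .and_then(|m| m.get(&Gpu::L4))
        .copied()
        .unwrap_or(0);
    assert_eq!(cap, 0);
    println!("{max_provers:?}");
}

Keeping the proto schema as a flat repeated message while building the nested map on the Rust side avoids introducing a map-of-maps into the protobuf definition.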