diff --git a/zstor/src/actors/backends.rs b/zstor/src/actors/backends.rs index 6a0431d..d9d2929 100644 --- a/zstor/src/actors/backends.rs +++ b/zstor/src/actors/backends.rs @@ -7,7 +7,7 @@ use crate::actors::{ use crate::{ config::{Encryption, Meta}, encryption::AesGcm, - zdb::{SequentialZdb, UserKeyZdb, ZdbConnectionInfo, ZdbRunMode}, + zdb::{NsInfo, SequentialZdb, UserKeyZdb, ZdbConnectionInfo, ZdbRunMode}, zdb_meta::ZdbMetaStore, ZstorError, }; @@ -67,6 +67,45 @@ impl BackendManagerActor { fn check_backends(&mut self, ctx: &mut ::Context) { ctx.notify(CheckBackends); } + + /// check & returns new metadata backends if the cluster needs to be refreshed + fn check_new_metastore( + &self, + meta_info: Vec<( + ZdbConnectionInfo, + Option, + BackendState, + Option, + )>, + ) -> Option> { + let mut need_refresh = false; + for (ci, _, new_state, _) in &meta_info { + if let Some((_, old_state)) = self.managed_meta_dbs.get(ci) { + if old_state.is_writeable() != new_state.is_writeable() { + need_refresh = true; + break; + } + } + } + if !need_refresh { + return None; + } + // get new backends: + // - new_db + // - old_db with state == writable + let mut backends = Vec::new(); + let managed_db = self.managed_meta_dbs.clone(); + for (ci, new_db, new_state, _) in &meta_info { + if let Some(new_db) = new_db { + backends.push(new_db.clone()); + } else if new_state.is_writeable() { + if let Some((Some(db), _)) = managed_db.get(ci) { + backends.push(db.clone()); + } + } + } + Some(backends) + } } /// Message requesting the actor checks the backends, and updates the state of all managed @@ -120,6 +159,7 @@ impl Actor for BackendManagerActor { ); } } + impl Handler for BackendManagerActor { type Result = Result<(), ZstorError>; @@ -340,9 +380,51 @@ impl Handler for BackendManagerActor { } } }); - (data_info, join_all(futs).await) + let meta_info = join_all(futs).await; + (data_info, meta_info) } .into_actor(self) + .then(|(data_info, meta_info), actor, _| { + let new_meta_backends = actor.check_new_metastore(meta_info.clone()); + + let config_addr = actor.config_addr.clone(); + let metastore = actor.metastore.clone(); + async move { + if let Some(backends) = new_meta_backends { + let config = match config_addr.send(GetConfig).await { + Ok(cfg) => cfg, + Err(e) => { + error!("Failed to get running config: {}", e); + return (data_info, meta_info); + } + }; + let Meta::Zdb(meta_config) = config.meta(); + let encoder = meta_config.encoder(); + let encryptor = match config.encryption() { + Encryption::Aes(key) => AesGcm::new(key.clone()), + }; + let new_cluster = ZdbMetaStore::new( + backends, + encoder.clone(), + encryptor.clone(), + meta_config.prefix().to_owned(), + config.virtual_root().clone(), + ); + let writeable = new_cluster.writable(); + if let Err(e) = metastore + .send(ReplaceMetaStore { + new_store: Box::new(new_cluster), + writeable, + }) + .await + { + error!("Failed to send ReplaceMetaStore message: {}", e); + } + } + (data_info, meta_info) + } + .into_actor(actor) + }) .map(|res, actor, _ctx| { for (ci, new_db, new_state, info) in res.0.into_iter() { // update metrics @@ -565,9 +647,11 @@ impl Handler for BackendManagerActor { }; // TODO: send new nodes to config and save it + let metastore_writeable = new_cluster.writable(); if let Err(e) = metastore .send(ReplaceMetaStore { new_store: Box::new(new_cluster), + writeable: metastore_writeable, }) .await { diff --git a/zstor/src/actors/meta.rs b/zstor/src/actors/meta.rs index 714ef86..fbd6d5b 100644 --- a/zstor/src/actors/meta.rs +++ b/zstor/src/actors/meta.rs @@ -106,6 +106,9 @@ pub struct MarkWriteable { pub struct ReplaceMetaStore { /// The new metastore to set pub new_store: Box, + + /// writeable flag + pub writeable: bool, } /// Actor for a metastore @@ -272,6 +275,8 @@ impl Handler for MetaStoreActor { type Result = (); fn handle(&mut self, msg: ReplaceMetaStore, _: &mut Self::Context) -> Self::Result { + log::info!("ReplaceMetaStore writeable: {}", msg.writeable); self.meta_store = Arc::from(msg.new_store as Box); + self.writeable = msg.writeable; } } diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs index 2c20b7a..9717e20 100644 --- a/zstor/src/zdb.rs +++ b/zstor/src/zdb.rs @@ -806,7 +806,7 @@ impl UserKeyZdb { } /// Information about a 0-db namespace, as reported by the db itself. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NsInfo { /// The name of the namespace. pub name: String,