diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 073c40748618..13c155d6171d 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -442,6 +442,13 @@ jobs: minio: true kafka: true values: "with-remote-wal.yaml" + include: + - target: "fuzz_migrate_mito_regions" + mode: + name: "Local WAL" + minio: true + kafka: false + values: "with-minio.yaml" steps: - name: Remove unused software run: | @@ -530,7 +537,7 @@ jobs: with: image-registry: localhost:5001 values-filename: ${{ matrix.mode.values }} - enable-region-failover: true + enable-region-failover: ${{ matrix.mode.kafka }} - name: Port forward (mysql) run: | kubectl port-forward service/my-greptimedb-frontend 4002:4002 -n my-greptimedb& diff --git a/Cargo.lock b/Cargo.lock index 25ba1eb378e7..dbc29676819b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4413,7 +4413,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=36334744c7020734dcb4a6b8d24d52ae7ed53fe1#36334744c7020734dcb4a6b8d24d52ae7ed53fe1" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0b4f7c8ab06399f6b90e1626e8d5b9697cb33bb9#0b4f7c8ab06399f6b90e1626e8d5b9697cb33bb9" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 44b2cda1227f..63d7ad3ba739 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { version = "0.13" } fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "36334744c7020734dcb4a6b8d24d52ae7ed53fe1" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b4f7c8ab06399f6b90e1626e8d5b9697cb33bb9" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/catalog/src/system_schema/information_schema/region_peers.rs b/src/catalog/src/system_schema/information_schema/region_peers.rs index 5496879af0fb..50c2593f8621 100644 --- a/src/catalog/src/system_schema/information_schema/region_peers.rs +++ b/src/catalog/src/system_schema/information_schema/region_peers.rs @@ -224,8 +224,8 @@ impl InformationSchemaRegionPeersBuilder { let region_id = RegionId::new(table_id, route.region.id.region_number()).as_u64(); let peer_id = route.leader_peer.clone().map(|p| p.id); let peer_addr = route.leader_peer.clone().map(|p| p.addr); - let status = if let Some(status) = route.leader_status { - Some(status.as_ref().to_string()) + let state = if let Some(state) = route.leader_state { + Some(state.as_ref().to_string()) } else { // Alive by default Some("ALIVE".to_string()) @@ -242,7 +242,7 @@ impl InformationSchemaRegionPeersBuilder { self.peer_ids.push(peer_id); self.peer_addrs.push(peer_addr.as_deref()); self.is_leaders.push(Some("Yes")); - self.statuses.push(status.as_deref()); + self.statuses.push(state.as_deref()); self.down_seconds .push(route.leader_down_millis().map(|m| m / 1000)); } diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index bf5a6825f014..f3d1d0f8097f 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -158,7 +158,7 @@ fn create_region_routes(regions: Vec) -> Vec { addr: String::new(), }), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }); } diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index b4223b8ea05d..07563603954f 100644 --- 
a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -187,7 +187,7 @@ mod tests { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(1)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }]), HashMap::new(), diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index 06654cfe0f3d..36a1ff0ecece 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -107,21 +107,21 @@ async fn test_on_submit_alter_request() { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(2)), follower_peers: vec![Peer::empty(4)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), leader_peer: Some(Peer::empty(3)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), @@ -193,21 +193,21 @@ async fn test_on_submit_alter_request_with_outdated_request() { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(2)), follower_peers: vec![Peer::empty(4)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), leader_peer: Some(Peer::empty(3)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index aff123747223..c3a5f5875cad 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -119,21 +119,21 @@ async fn test_on_datanode_drop_regions() { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(2)), follower_peers: vec![Peer::empty(4)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), leader_peer: Some(Peer::empty(3)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 61e2811e72b2..4864f7562d10 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -137,14 +137,16 @@ pub struct DowngradeRegion { /// `None` stands for don't flush before downgrading the region. #[serde(default)] pub flush_timeout: Option, + /// Rejects all write requests after flushing. 
+ pub reject_write: bool, } impl Display for DowngradeRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "DowngradeRegion(region_id={}, flush_timeout={:?})", - self.region_id, self.flush_timeout, + "DowngradeRegion(region_id={}, flush_timeout={:?}, reject_write={})", + self.region_id, self.flush_timeout, self.reject_write ) } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d864882da6dd..0f703b9430a3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -140,7 +140,7 @@ use crate::key::table_route::TableRouteKey; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; -use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; +use crate::rpc::router::{region_distribution, LeaderState, RegionRoute}; use crate::rpc::store::BatchDeleteRequest; use crate::DatanodeId; @@ -1126,14 +1126,14 @@ impl TableMetadataManager { next_region_route_status: F, ) -> Result<()> where - F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>, + F: Fn(&RegionRoute) -> Option<Option<LeaderState>>, { let mut new_region_routes = current_table_route_value.region_routes()?.clone(); let mut updated = 0; for route in &mut new_region_routes { - if let Some(status) = next_region_route_status(route) { - if route.set_leader_status(status) { + if let Some(state) = next_region_route_status(route) { + if route.set_leader_state(state) { updated += 1; } } @@ -1280,7 +1280,7 @@ mod tests { use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue}; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; - use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus}; + use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute}; #[test] fn test_deserialized_value_with_bytes() { @@ -1324,7 +1324,7 @@ mod tests { }, leader_peer: Some(Peer::new(datanode, "a2")), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, } } @@ -1715,7 +1715,7 @@ mod tests { attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(datanode, "a2")), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), follower_peers: vec![], leader_down_since: Some(current_time_millis()), }, @@ -1727,7 +1727,7 @@ mod tests { attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(datanode, "a1")), - leader_status: None, + leader_state: None, follower_peers: vec![], leader_down_since: None, }, @@ -1750,10 +1750,10 @@ mod tests { table_metadata_manager .update_leader_region_status(table_id, &current_table_route_value, |region_route| { - if region_route.leader_status.is_some() { + if region_route.leader_state.is_some() { None } else { - Some(Some(RegionStatus::Downgraded)) + Some(Some(LeaderState::Downgrading)) } }) .await @@ -1768,8 +1768,8 @@ mod tests { .unwrap(); assert_eq!( - updated_route_value.region_routes().unwrap()[0].leader_status, - Some(RegionStatus::Downgraded) + updated_route_value.region_routes().unwrap()[0].leader_state, + Some(LeaderState::Downgrading) ); assert!(updated_route_value.region_routes().unwrap()[0] .leader_down_since .is_some()); assert_eq!( - updated_route_value.region_routes().unwrap()[1].leader_status, - Some(RegionStatus::Downgraded) + updated_route_value.region_routes().unwrap()[1].leader_state, + Some(LeaderState::Downgrading) ); assert!(updated_route_value.region_routes().unwrap()[1] .leader_down_since .is_some()); @@ -1943,21 +1943,21 @@ mod tests { region: 
Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(2)), follower_peers: vec![Peer::empty(4)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), leader_peer: Some(Peer::empty(3)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), @@ -1996,21 +1996,21 @@ mod tests { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(2)), follower_peers: vec![Peer::empty(4)], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), leader_peer: Some(Peer::empty(3)), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 0be0aab3aae0..c9990ab12129 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -744,6 +744,7 @@ mod tests { use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::{KvBackend, TxnService}; use crate::peer::Peer; + use crate::rpc::router::Region; use crate::rpc::store::PutRequest; #[test] @@ -751,11 +752,43 @@ mod tests { let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#; let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap(); - let new_raw_v = format!("{:?}", v); - assert_eq!( - new_raw_v, - r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"# - ); + let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue { + region_routes: vec![ + RegionRoute { + region: Region { + id: RegionId::new(0, 1), + name: "r1".to_string(), + partition: None, + attrs: Default::default(), + }, + leader_peer: Some(Peer { + id: 2, + addr: "a2".to_string(), + }), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region { + id: RegionId::new(0, 1), + name: "r1".to_string(), + partition: None, + attrs: Default::default(), + }, + leader_peer: Some(Peer { + id: 2, + addr: "a2".to_string(), + }), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + ], + version: 0, + }); + + assert_eq!(v, expected_table_route); } #[test] diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs index a0d53b847752..54d5d6cc11d3 100644 --- 
a/src/common/meta/src/region_keeper.rs +++ b/src/common/meta/src/region_keeper.rs @@ -58,7 +58,7 @@ impl MemoryRegionKeeper { Default::default() } - /// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist. + /// Returns [OperatingRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist. pub fn register( &self, datanode_id: DatanodeId, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 3e609e4af4d8..8dc409c8be31 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -108,16 +108,16 @@ pub fn convert_to_region_peer_map( .collect::>() } -/// Returns the HashMap<[RegionNumber], [RegionStatus]>; -pub fn convert_to_region_leader_status_map( +/// Returns the HashMap<[RegionNumber], [LeaderState]>; +pub fn convert_to_region_leader_state_map( region_routes: &[RegionRoute], -) -> HashMap { +) -> HashMap { region_routes .iter() .filter_map(|x| { - x.leader_status + x.leader_state .as_ref() - .map(|status| (x.region.id.region_number(), *status)) + .map(|state| (x.region.id.region_number(), *state)) }) .collect::>() } @@ -205,7 +205,7 @@ impl TableRoute { region, leader_peer, follower_peers, - leader_status: None, + leader_state: None, leader_down_since: None, }); } @@ -259,9 +259,13 @@ pub struct RegionRoute { pub follower_peers: Vec, /// `None` by default. #[builder(setter(into, strip_option), default)] - #[serde(default, skip_serializing_if = "Option::is_none")] - pub leader_status: Option, - /// The start time when the leader is in `Downgraded` status. + #[serde( + default, + alias = "leader_status", + skip_serializing_if = "Option::is_none" + )] + pub leader_state: Option, + /// The start time when the leader is in `Downgraded` state. #[serde(default)] #[builder(default = "self.default_leader_down_since()")] pub leader_down_since: Option, @@ -269,76 +273,78 @@ pub struct RegionRoute { impl RegionRouteBuilder { fn default_leader_down_since(&self) -> Option { - match self.leader_status { - Some(Some(RegionStatus::Downgraded)) => Some(current_time_millis()), + match self.leader_state { + Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()), _ => None, } } } -/// The Status of the [Region]. +/// The State of the [`Region`] Leader. /// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc. #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)] #[strum(serialize_all = "UPPERCASE")] -pub enum RegionStatus { - /// The following cases in which the [Region] will be downgraded. +pub enum LeaderState { + /// The following cases in which the [`Region`] will be downgraded. /// - /// - The [Region] is unavailable(e.g., Crashed, Network disconnected). - /// - The [Region] was planned to migrate to another [Peer]. - Downgraded, + /// - The [`Region`] may be unavailable (e.g., Crashed, Network disconnected). + /// - The [`Region`] was planned to migrate to another [`Peer`]. + Downgrading, } impl RegionRoute { - /// Returns true if the Leader [Region] is downgraded. + /// Returns true if the Leader [`Region`] is downgraded. /// - /// The following cases in which the [Region] will be downgraded. + /// The following cases in which the [`Region`] will be downgraded. /// - /// - The [Region] is unavailable(e.g., Crashed, Network disconnected). - /// - The [Region] was planned to migrate to another [Peer]. + /// - The [`Region`] is unavailable(e.g., Crashed, Network disconnected). 
+ /// - The [`Region`] was planned to migrate to another [`Peer`]. /// - pub fn is_leader_downgraded(&self) -> bool { - matches!(self.leader_status, Some(RegionStatus::Downgraded)) + pub fn is_leader_downgrading(&self) -> bool { + matches!(self.leader_state, Some(LeaderState::Downgrading)) } - /// Marks the Leader [Region] as downgraded. + /// Marks the Leader [`Region`] as [`LeaderState::Downgrading`]. /// - /// We should downgrade a [Region] before deactivating it: + /// We should downgrade a [`Region`] before deactivating it: /// - /// - During the [Region] Failover Procedure. - /// - Migrating a [Region]. + /// - During the [`Region`] Failover Procedure. + /// - Migrating a [`Region`]. /// - /// **Notes:** Meta Server will stop renewing the lease for the downgraded [Region]. + /// **Notes:** Meta Server will renew a special lease (`Downgrading`) for the downgrading [`Region`]. + /// + /// A downgrading region rejects all write requests and only allows the memtable to be flushed to object storage. /// pub fn downgrade_leader(&mut self) { self.leader_down_since = Some(current_time_millis()); - self.leader_status = Some(RegionStatus::Downgraded) + self.leader_state = Some(LeaderState::Downgrading) } - /// Returns how long since the leader is in `Downgraded` status. + /// Returns how long the leader has been in the `Downgrading` state. pub fn leader_down_millis(&self) -> Option<i64> { self.leader_down_since .map(|start| current_time_millis() - start) } - /// Sets the leader status. + /// Sets the leader state. /// /// Returns true if updated. - pub fn set_leader_status(&mut self, status: Option<RegionStatus>) -> bool { - let updated = self.leader_status != status; + pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool { + let updated = self.leader_state != state; - match (status, updated) { - (Some(RegionStatus::Downgraded), true) => { + match (state, updated) { + (Some(LeaderState::Downgrading), true) => { self.leader_down_since = Some(current_time_millis()); } - (Some(RegionStatus::Downgraded), false) => { - // Do nothing if leader is still in `Downgraded` status. + (Some(LeaderState::Downgrading), false) => { + // Do nothing if the leader is still in the `Downgrading` state. 
} _ => { self.leader_down_since = None; } } - self.leader_status = status; + self.leader_state = state; updated } } @@ -477,15 +483,15 @@ mod tests { }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], - leader_status: None, + leader_state: None, leader_down_since: None, }; - assert!(!region_route.is_leader_downgraded()); + assert!(!region_route.is_leader_downgrading()); region_route.downgrade_leader(); - assert!(region_route.is_leader_downgraded()); + assert!(region_route.is_leader_downgrading()); } #[test] @@ -499,7 +505,7 @@ mod tests { }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], - leader_status: None, + leader_state: None, leader_down_since: None, }; diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index c6ef6cb3f6db..a0ea2c0188bb 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -129,8 +129,10 @@ impl RegionAliveKeeper { let request = RegionRequest::Close(RegionCloseRequest {}); if let Err(e) = self.region_server.handle_request(region_id, request).await { if e.status_code() != StatusCode::RegionNotFound { - let _ = self.region_server.set_writable(region_id, false); - error!(e; "Failed to close staled region {}, set region to readonly.",region_id); + let _ = self + .region_server + .set_region_role(region_id, RegionRole::Follower); + error!(e; "Failed to close staled region {}, convert region to follower.", region_id); } } } @@ -378,7 +380,7 @@ impl CountdownTask { } }, Some(CountdownCommand::Reset((role, deadline))) => { - let _ = self.region_server.set_writable(self.region_id, role.writable()); + let _ = self.region_server.set_region_role(self.region_id, role); trace!( "Reset deadline of region {region_id} to approximately {} seconds later.", (deadline - Instant::now()).as_secs_f32(), @@ -399,8 +401,8 @@ impl CountdownTask { } } () = &mut countdown => { - warn!("The region {region_id} lease is expired, set region to readonly."); - let _ = self.region_server.set_writable(self.region_id, false); + warn!("The region {region_id} lease is expired, convert region to follower."); + let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower); // resets the countdown. let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); countdown.as_mut().reset(far_future); @@ -436,7 +438,9 @@ mod test { .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); - region_server.set_writable(region_id, true).unwrap(); + region_server + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); // Register a region before starting. 
alive_keeper.register_region(region_id).await; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f5d7bd9fc627..128a60ab9b7b 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -47,7 +47,7 @@ use servers::server::ServerHandlers; use servers::Mode; use snafu::{ensure, OptionExt, ResultExt}; use store_api::path_utils::{region_dir, WAL_DIR}; -use store_api::region_engine::RegionEngineRef; +use store_api::region_engine::{RegionEngineRef, RegionRole}; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; use tokio::fs; @@ -546,9 +546,9 @@ async fn open_all_regions( for region_id in open_regions { if open_with_writable { - if let Err(e) = region_server.set_writable(region_id, true) { + if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) { error!( - e; "failed to set writable for region {region_id}" + e; "failed to convert region {region_id} to leader" ); } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index d84552a8d215..ef9af0acdd02 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -126,7 +126,9 @@ impl HeartbeatTask { let mut follower_region_lease_count = 0; for lease in &lease.regions { match lease.role() { - RegionRole::Leader => leader_region_lease_count += 1, + RegionRole::Leader | RegionRole::DowngradingLeader => { + leader_region_lease_count += 1 + } RegionRole::Follower => follower_region_lease_count += 1, } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index d23615eb13d8..89b6991788cc 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -153,6 +153,7 @@ mod tests { use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; use store_api::path_utils::region_dir; + use store_api::region_engine::RegionRole; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Receiver}; @@ -213,6 +214,7 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id: RegionId::new(2048, 1), flush_timeout: Some(Duration::from_secs(1)), + reject_write: false, }); assert!(heartbeat_handler .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))); @@ -295,7 +297,9 @@ mod tests { } assert_matches!( - region_server.set_writable(region_id, true).unwrap_err(), + region_server + .set_region_role(region_id, RegionRole::Leader) + .unwrap_err(), error::Error::RegionNotFound { .. 
} ); } @@ -411,6 +415,7 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_secs(1)), + reject_write: false, }); let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); @@ -433,6 +438,7 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id: RegionId::new(2048, 1), flush_timeout: Some(Duration::from_secs(1)), + reject_write: false, }); let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index ac1179280376..fd85c75ba2dd 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -16,7 +16,7 @@ use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, Instructio use common_telemetry::tracing::info; use common_telemetry::warn; use futures_util::future::BoxFuture; -use store_api::region_engine::SetReadonlyResponse; +use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::RegionId; @@ -24,16 +24,20 @@ use crate::heartbeat::handler::HandlerContext; use crate::heartbeat::task_tracker::WaitResult; impl HandlerContext { - async fn set_readonly_gracefully(&self, region_id: RegionId) -> InstructionReply { - match self.region_server.set_readonly_gracefully(region_id).await { - Ok(SetReadonlyResponse::Success { last_entry_id }) => { + async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> InstructionReply { + match self + .region_server + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower) + .await + { + Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => { InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id, exists: true, error: None, }) } - Ok(SetReadonlyResponse::NotFound) => { + Ok(SetRegionRoleStateResponse::NotFound) => { InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, @@ -53,10 +57,12 @@ impl HandlerContext { DowngradeRegion { region_id, flush_timeout, + reject_write, }: DowngradeRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { - let Some(writable) = self.region_server.is_writable(region_id) else { + let Some(writable) = self.region_server.is_region_leader(region_id) else { + warn!("Region: {region_id} is not found"); return InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, @@ -64,61 +70,89 @@ impl HandlerContext { }); }; + let region_server_moved = self.region_server.clone(); + // Ignores flush request if !writable { - return self.set_readonly_gracefully(region_id).await; + return self.downgrade_to_follower_gracefully(region_id).await; } - let region_server_moved = self.region_server.clone(); - if let Some(flush_timeout) = flush_timeout { - let register_result = self - .downgrade_tasks - .try_register( + // If flush_timeout is not set, directly convert region to follower. + let Some(flush_timeout) = flush_timeout else { + return self.downgrade_to_follower_gracefully(region_id).await; + }; + + if reject_write { + // Sets region to downgrading, the downgrading region will reject all write requests. 
+ match self + .region_server + .set_region_role_state_gracefully( region_id, - Box::pin(async move { - info!("Flush region: {region_id} before downgrading region"); - region_server_moved - .handle_request( - region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), - ) - .await?; - - Ok(()) - }), + SettableRegionRoleState::DowngradingLeader, ) - .await; - - if register_result.is_busy() { - warn!("Another flush task is running for the region: {region_id}"); - } - - let mut watcher = register_result.into_watcher(); - let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await; - - match result { - WaitResult::Timeout => { - InstructionReply::DowngradeRegion(DowngradeRegionReply { + .await + { + Ok(SetRegionRoleStateResponse::Success { .. }) => {} + Ok(SetRegionRoleStateResponse::NotFound) => { + warn!("Region: {region_id} is not found"); + return InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, - exists: true, - error: Some(format!( - "Flush region: {region_id} before downgrading region is timeout" - )), - }) + exists: false, + error: None, + }); } - WaitResult::Finish(Ok(_)) => self.set_readonly_gracefully(region_id).await, - WaitResult::Finish(Err(err)) => { - InstructionReply::DowngradeRegion(DowngradeRegionReply { + Err(err) => { + warn!(err; "Failed to convert region to downgrading leader"); + return InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: true, error: Some(format!("{err:?}")), - }) + }); } } - } else { - self.set_readonly_gracefully(region_id).await + } + + let register_result = self + .downgrade_tasks + .try_register( + region_id, + Box::pin(async move { + info!("Flush region: {region_id} before converting region to follower"); + region_server_moved + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await?; + + Ok(()) + }), + ) + .await; + + if register_result.is_busy() { + warn!("Another flush task is running for the region: {region_id}"); + } + + let mut watcher = register_result.into_watcher(); + let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await; + + match result { + WaitResult::Timeout => InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + exists: true, + error: Some(format!("Flush region: {region_id} is timeout")), + }), + WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await, + WaitResult::Finish(Err(err)) => { + InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + exists: true, + error: Some(format!("{err:?}")), + }) + } } }) } @@ -131,7 +165,7 @@ mod tests { use common_meta::instruction::{DowngradeRegion, InstructionReply}; use mito2::engine::MITO_ENGINE_NAME; - use store_api::region_engine::{RegionRole, SetReadonlyResponse}; + use store_api::region_engine::{RegionRole, SetRegionRoleStateResponse}; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use tokio::time::Instant; @@ -155,6 +189,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -182,8 +217,9 @@ mod tests { Ok(0) })); - region_engine.handle_set_readonly_gracefully_mock_fn = - Some(Box::new(|_| Ok(SetReadonlyResponse::success(Some(1024))))) + region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { + 
Ok(SetRegionRoleStateResponse::success(Some(1024))) + })) }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); @@ -195,6 +231,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -215,8 +252,9 @@ mod tests { MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { region_engine.mock_role = Some(Some(RegionRole::Leader)); region_engine.handle_request_delay = Some(Duration::from_secs(100)); - region_engine.handle_set_readonly_gracefully_mock_fn = - Some(Box::new(|_| Ok(SetReadonlyResponse::success(Some(1024))))) + region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { + Ok(SetRegionRoleStateResponse::success(Some(1024))) + })) }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); @@ -227,6 +265,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(flush_timeout), + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -246,8 +285,9 @@ mod tests { MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { region_engine.mock_role = Some(Some(RegionRole::Leader)); region_engine.handle_request_delay = Some(Duration::from_millis(300)); - region_engine.handle_set_readonly_gracefully_mock_fn = - Some(Box::new(|_| Ok(SetReadonlyResponse::success(Some(1024))))) + region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { + Ok(SetRegionRoleStateResponse::success(Some(1024))) + })) }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); @@ -263,6 +303,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -277,6 +318,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_millis(500)), + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -304,8 +346,9 @@ mod tests { } .fail() })); - region_engine.handle_set_readonly_gracefully_mock_fn = - Some(Box::new(|_| Ok(SetReadonlyResponse::success(Some(1024))))) + region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { + Ok(SetRegionRoleStateResponse::success(Some(1024))) + })) }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); @@ -321,6 +364,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -335,6 +379,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_millis(500)), + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -356,7 +401,7 @@ mod tests { MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { region_engine.mock_role = Some(Some(RegionRole::Leader)); region_engine.handle_set_readonly_gracefully_mock_fn = - Some(Box::new(|_| Ok(SetReadonlyResponse::NotFound))); + 
Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound))); }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); @@ -365,6 +410,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: None, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); @@ -396,6 +442,7 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: None, + reject_write: false, }) .await; assert_matches!(reply, InstructionReply::DowngradeRegion(_)); diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index 0d1ef0476c95..9acb3da9c348 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -31,7 +31,7 @@ impl HandlerContext { }: UpgradeRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { - let Some(writable) = self.region_server.is_writable(region_id) else { + let Some(writable) = self.region_server.is_region_leader(region_id) else { return InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: false, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index aa80f52a5c24..0bac53e4d6a2 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -54,7 +54,10 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; -use store_api::region_engine::{RegionEngineRef, RegionRole, RegionStatistic, SetReadonlyResponse}; +use store_api::region_engine::{ + RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, +}; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, }; @@ -274,37 +277,47 @@ impl RegionServer { .collect() } - pub fn is_writable(&self, region_id: RegionId) -> Option { - // TODO(weny): Finds a better way. + pub fn is_region_leader(&self, region_id: RegionId) -> Option { self.inner.region_map.get(®ion_id).and_then(|engine| { engine.role(region_id).map(|role| match role { RegionRole::Follower => false, RegionRole::Leader => true, + RegionRole::DowngradingLeader => true, }) }) } - pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { let engine = self .inner .region_map .get(®ion_id) .with_context(|| RegionNotFoundSnafu { region_id })?; engine - .set_writable(region_id, writable) + .set_region_role(region_id, role) .with_context(|_| HandleRegionRequestSnafu { region_id }) } - pub async fn set_readonly_gracefully( + /// Set region role state gracefully. + /// + /// For [SettableRegionRoleState::Follower]: + /// After the call returns, the engine ensures that + /// no **further** write or flush operations will succeed in this region. + /// + /// For [SettableRegionRoleState::DowngradingLeader]: + /// After the call returns, the engine ensures that + /// no **further** write operations will succeed in this region. 
+ pub async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> Result { + state: SettableRegionRoleState, + ) -> Result { match self.inner.region_map.get(®ion_id) { Some(engine) => Ok(engine - .set_readonly_gracefully(region_id) + .set_region_role_state_gracefully(region_id, state) .await .with_context(|_| HandleRegionRequestSnafu { region_id })?), - None => Ok(SetReadonlyResponse::NotFound), + None => Ok(SetRegionRoleStateResponse::NotFound), } } @@ -842,7 +855,7 @@ impl RegionServerInner { info!("Region {region_id} is deregistered from engine {engine_type}"); self.region_map .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); + .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower)); self.event_listener.on_region_deregistered(region_id); } RegionChange::Catchup => { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 35f513bc8348..2acc66a5927d 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -32,7 +32,8 @@ use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -106,7 +107,7 @@ pub type MockRequestHandler = Box Result + Send + Sync>; pub type MockSetReadonlyGracefullyHandler = - Box Result + Send + Sync>; + Box Result + Send + Sync>; pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, @@ -220,14 +221,15 @@ impl RegionEngine for MockRegionEngine { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, _region_id: RegionId, _role: RegionRole) -> Result<(), BoxedError> { Ok(()) } - async fn set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> Result { + _region_role_state: SettableRegionRoleState, + ) -> Result { if let Some(mock_fn) = &self.handle_set_readonly_gracefully_mock_fn { return mock_fn(region_id).map_err(BoxedError::new); }; diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 32e1a1d58d0d..e6313f4322cc 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -26,8 +26,8 @@ use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, - SinglePartitionScanner, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, SinglePartitionScanner, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -113,22 +113,23 @@ impl RegionEngine for FileRegionEngine { None } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { self.inner - .set_writable(region_id, writable) + .set_region_role(region_id, role) .map_err(BoxedError::new) } - async fn set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> Result { + 
_region_role_state: SettableRegionRoleState, + ) -> Result { let exists = self.inner.get_region(region_id).await.is_some(); if exists { - Ok(SetReadonlyResponse::success(None)) + Ok(SetRegionRoleStateResponse::success(None)) } else { - Ok(SetReadonlyResponse::NotFound) + Ok(SetRegionRoleStateResponse::NotFound) } } @@ -189,7 +190,7 @@ impl EngineInner { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> EngineResult<()> { + fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> { // TODO(zhongzc): Improve the semantics and implementation of this API. Ok(()) } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 06cf818d237e..de491da37150 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -111,7 +111,7 @@ mod test { use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -297,7 +297,7 @@ mod test { region: Region::new_test(region_id), leader_peer: Some(peer.clone()), follower_peers: vec![follower_peer.clone()], - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), leader_down_since: Some(1), }, RegionRoute { @@ -352,7 +352,7 @@ mod test { assert_region_lease( acc, vec![ - GrantedRegion::new(region_id, RegionRole::Follower), + GrantedRegion::new(region_id, RegionRole::DowngradingLeader), GrantedRegion::new(another_region_id, RegionRole::Leader), ], ); diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 836ca4c53212..ec5114b9eb6a 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -22,8 +22,10 @@ use common_meta::instruction::{ }; use common_procedure::Status; use common_telemetry::{error, info, warn}; +use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use tokio::time::{sleep, Instant}; use super::update_metadata::UpdateMetadata; @@ -95,15 +97,32 @@ impl DowngradeLeaderRegion { &self, ctx: &Context, flush_timeout: Duration, + reject_write: bool, ) -> Instruction { let pc = &ctx.persistent_ctx; let region_id = pc.region_id; Instruction::DowngradeRegion(DowngradeRegion { region_id, flush_timeout: Some(flush_timeout), + reject_write, }) } + async fn should_reject_write(ctx: &mut Context, region_id: RegionId) -> Result { + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + if let Some(wal_option) = datanode_table_value + .region_info + .region_wal_options + .get(®ion_id.region_number()) + { + let options: WalOptions = serde_json::from_str(wal_option) + .with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?; + return Ok(matches!(options, WalOptions::RaftEngine)); + } + + Ok(true) + } + /// Tries to downgrade a leader region. /// /// Retry: @@ -118,16 +137,17 @@ impl DowngradeLeaderRegion { /// - [ExceededDeadline](error::Error::ExceededDeadline) /// - Invalid JSON. 
async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> { - let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; - let leader = &pc.from_peer; + let region_id = ctx.persistent_ctx.region_id; let operation_timeout = ctx.next_operation_timeout() .context(error::ExceededDeadlineSnafu { operation: "Downgrade region", })?; - let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout); + let reject_write = Self::should_reject_write(ctx, region_id).await?; + let downgrade_instruction = + self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write); + let leader = &ctx.persistent_ctx.from_peer; let msg = MailboxMessage::json_message( &format!("Downgrade leader region: {}", region_id), &format!("Meta@{}", ctx.server_addr()), @@ -240,8 +260,13 @@ impl DowngradeLeaderRegion { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; + use common_meta::key::table_route::TableRouteValue; + use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_wal::options::KafkaWalOptions; use store_api::storage::RegionId; use tokio::time::Instant; @@ -264,19 +289,73 @@ mod tests { } } + async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { + let table_info = + new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(ctx.persistent_ctx.region_id), + leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), + follower_peers: vec![ctx.persistent_ctx.to_peer.clone()], + ..Default::default() + }]; + ctx.table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + wal_options, + ) + .await + .unwrap(); + } + #[tokio::test] async fn test_datanode_is_unreachable() { let state = DowngradeLeaderRegion::default(); let persistent_context = new_persistent_context(); let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - + prepare_table_metadata(&ctx, HashMap::default()).await; let err = state.downgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::PusherNotFound { .. 
}); assert!(!err.is_retryable()); } + #[tokio::test] + async fn test_should_reject_writes() { + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let wal_options = + HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]); + prepare_table_metadata(&ctx, wal_options).await; + + let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) + .await + .unwrap(); + assert!(reject_write); + + // Remote WAL + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let wal_options = HashMap::from([( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "my_topic".to_string(), + })) + .unwrap(), + )]); + prepare_table_metadata(&ctx, wal_options).await; + + let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) + .await + .unwrap(); + assert!(!reject_write); + } + #[tokio::test] async fn test_pusher_dropped() { let state = DowngradeLeaderRegion::default(); @@ -285,6 +364,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -307,6 +387,7 @@ mod tests { let persistent_context = new_persistent_context(); let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); let err = state.downgrade_region(&mut ctx).await.unwrap_err(); @@ -330,6 +411,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -356,6 +438,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -383,6 +466,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -416,6 +500,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -508,6 +593,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index bb3eff80c0b3..01ea887ca9fd 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ 
b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -246,7 +246,7 @@ impl RegionMigrationManager { region_route: &RegionRoute, task: &RegionMigrationProcedureTask, ) -> Result { - if region_route.is_leader_downgraded() { + if region_route.is_leader_downgrading() { return Ok(false); } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 65e33ab3d99e..cb3b5a3dc3ab 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -449,7 +449,7 @@ impl ProcedureMigrationTestSuite { .find(|route| route.region.id == region_id) .unwrap(); - assert!(!region_route.is_leader_downgraded()); + assert!(!region_route.is_leader_downgrading()); assert_eq!( region_route.leader_peer.as_ref().unwrap().id, expected_leader_id diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 3b3f6a6c0c3b..d8bad44871d6 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_error::ext::BoxedError; -use common_meta::rpc::router::RegionStatus; +use common_meta::rpc::router::LeaderState; use snafu::ResultExt; use crate::error::{self, Result}; @@ -53,7 +53,7 @@ impl UpdateMetadata { .as_ref() .is_some_and(|leader_peer| leader_peer.id == from_peer_id) { - Some(Some(RegionStatus::Downgraded)) + Some(Some(LeaderState::Downgrading)) } else { None } @@ -81,7 +81,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use store_api::storage::RegionId; use crate::error::Error; @@ -155,7 +155,7 @@ mod tests { table_metadata_manager .update_leader_region_status(table_id, &original_table_route, |route| { if route.region.id == RegionId::new(1024, 2) { - Some(Some(RegionStatus::Downgraded)) + Some(Some(LeaderState::Downgrading)) } else { None } @@ -210,7 +210,7 @@ mod tests { // It should remain unchanged. 
assert_eq!(latest_table_route.version().unwrap(), 0); - assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); + assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading()); assert!(ctx.volatile_ctx.table_route.is_none()); } @@ -251,7 +251,7 @@ mod tests { .unwrap() .unwrap(); - assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); + assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading()); assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 4e6f20ef195f..0d568ab7b0bb 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -65,7 +65,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use store_api::storage::RegionId; use crate::error::Error; @@ -110,13 +110,13 @@ mod tests { RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(from_peer.clone()), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }, RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(4)), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }, RegionRoute { @@ -128,8 +128,8 @@ mod tests { let expected_region_routes = { let mut region_routes = region_routes.clone(); - region_routes[0].leader_status = None; - region_routes[1].leader_status = None; + region_routes[0].leader_state = None; + region_routes[1].leader_state = None; region_routes }; @@ -207,13 +207,13 @@ mod tests { RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(from_peer.clone()), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }, RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(4)), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }, RegionRoute { @@ -225,7 +225,7 @@ mod tests { let expected_region_routes = { let mut region_routes = region_routes.clone(); - region_routes[0].leader_status = None; + region_routes[0].leader_state = None; region_routes }; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 75f93f760e75..b710a0e1f3e0 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -43,7 +43,7 @@ impl UpdateMetadata { .context(error::RegionRouteNotFoundSnafu { region_id })?; // Removes downgraded status. 
- region_route.set_leader_status(None); + region_route.set_leader_state(None); let candidate = &ctx.persistent_ctx.to_peer; let expected_old_leader = &ctx.persistent_ctx.from_peer; @@ -106,7 +106,7 @@ impl UpdateMetadata { if leader_peer.id == candidate_peer_id { ensure!( - !region_route.is_leader_downgraded(), + !region_route.is_leader_downgrading(), error::UnexpectedSnafu { violated: format!("Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"), } @@ -190,7 +190,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use common_time::util::current_time_millis; use store_api::storage::RegionId; @@ -286,7 +286,7 @@ mod tests { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(2), Peer::empty(3)], - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), leader_down_since: Some(current_time_millis()), }]; @@ -298,7 +298,7 @@ mod tests { .await .unwrap(); - assert!(!new_region_routes[0].is_leader_downgraded()); + assert!(!new_region_routes[0].is_leader_downgrading()); assert!(new_region_routes[0].leader_down_since.is_none()); assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]); assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2); @@ -319,13 +319,13 @@ mod tests { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5), Peer::empty(3)], - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), leader_down_since: Some(current_time_millis()), }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), leader_peer: Some(Peer::empty(4)), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }, ]; @@ -382,7 +382,7 @@ mod tests { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(leader_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], - leader_status: None, + leader_state: None, leader_down_since: None, }]; @@ -406,7 +406,7 @@ mod tests { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], - leader_status: None, + leader_state: None, leader_down_since: None, }]; @@ -430,7 +430,7 @@ mod tests { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), leader_down_since: None, }]; @@ -455,7 +455,7 @@ mod tests { let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), - leader_status: Some(RegionStatus::Downgraded), + leader_state: Some(LeaderState::Downgrading), ..Default::default() }]; @@ -485,7 +485,7 @@ mod tests { assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); assert_eq!(region_routes.len(), 1); - assert!(!region_routes[0].is_leader_downgraded()); + assert!(!region_routes[0].is_leader_downgrading()); assert!(region_routes[0].follower_peers.is_empty()); 
assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2); } diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index a1065d4cbbc2..194f3710c853 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -62,8 +62,8 @@ fn renew_region_lease_via_region_route( // If it's a leader region on this datanode. if let Some(leader) = ®ion_route.leader_peer { if leader.id == datanode_id { - let region_role = if region_route.is_leader_downgraded() { - RegionRole::Follower + let region_role = if region_route.is_leader_downgrading() { + RegionRole::DowngradingLeader } else { RegionRole::Leader }; @@ -220,7 +220,7 @@ mod tests { use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; - use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionStatus}; + use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::RawTableInfo; @@ -265,11 +265,11 @@ mod tests { Some((region_id, RegionRole::Follower)) ); - region_route.leader_status = Some(RegionStatus::Downgraded); + region_route.leader_state = Some(LeaderState::Downgrading); // The downgraded leader region on the datanode. assert_eq!( renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), - Some((region_id, RegionRole::Follower)) + Some((region_id, RegionRole::DowngradingLeader)) ); } @@ -492,7 +492,7 @@ mod tests { .region(Region::new_test(region_id)) .leader_peer(Peer::empty(leader_peer_id)) .follower_peers(vec![Peer::empty(follower_peer_id)]) - .leader_status(RegionStatus::Downgraded) + .leader_state(LeaderState::Downgrading) .build() .unwrap(); diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 576d1aa92365..6b9ccc99a0fa 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -36,7 +36,7 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) region, leader_peer, follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, } } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 358da1d2167a..42948aa6cd7d 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -37,7 +37,8 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, }; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -201,14 +202,14 @@ impl RegionEngine for MetricEngine { Ok(()) } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { // ignore the region not found error for x in [ utils::to_metadata_region_id(region_id), utils::to_data_region_id(region_id), region_id, ] { - if let Err(e) = self.inner.mito.set_writable(x, writable) + if let Err(e) = self.inner.mito.set_region_role(x, role) && e.status_code() != StatusCode::RegionNotFound { return Err(e); @@ -217,11 +218,15 @@ impl RegionEngine for MetricEngine { Ok(()) } - async fn 
set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> std::result::Result { - self.inner.mito.set_readonly_gracefully(region_id).await + region_role_state: SettableRegionRoleState, + ) -> std::result::Result { + self.inner + .mito + .set_region_role_state_gracefully(region_id, region_role_state) + .await } /// Returns the physical region role. diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 12b9dd5fefb6..d919633ba964 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -39,8 +39,7 @@ use crate::read::Source; use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionRef}; -use crate::region::ManifestContext; -use crate::region::RegionState::Writable; +use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; use crate::sst::file::{FileMeta, IndexType}; use crate::sst::file_purger::LocalFilePurger; @@ -129,7 +128,10 @@ pub async fn open_compaction_region( let manifest = manifest_manager.manifest(); let region_metadata = manifest.metadata.clone(); - let manifest_ctx = Arc::new(ManifestContext::new(manifest_manager, Writable)); + let manifest_ctx = Arc::new(ManifestContext::new( + manifest_manager, + RegionRoleState::Leader(RegionLeaderState::Writable), + )); let file_purger = { let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs)); @@ -379,7 +381,7 @@ impl Compactor for DefaultCompactor { // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later. compaction_region .manifest_ctx - .update_manifest(Writable, action_list) + .update_manifest(RegionLeaderState::Writable, action_list) .await?; Ok(edit) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index e9177d40bf24..ed8cc9290906 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -53,7 +53,7 @@ mod prune_test; #[cfg(test)] mod row_selector_test; #[cfg(test)] -mod set_readonly_test; +mod set_role_state_test; #[cfg(test)] mod truncate_test; @@ -77,7 +77,7 @@ use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, - SetReadonlyResponse, + SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -436,22 +436,27 @@ impl EngineInner { Ok(scan_region) } - /// Set writable mode for a region. - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + /// Converts the [`RegionRole`]. + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { let region = self .workers .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - region.set_writable(writable); + region.set_role(role); Ok(()) } /// Sets read-only for a region and ensures no more writes in the region after it returns. - async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result { + async fn set_region_role_state_gracefully( + &self, + region_id: RegionId, + region_role_state: SettableRegionRoleState, + ) -> Result { // Notes: It acquires the mutable ownership to ensure no other threads, // Therefore, we submit it to the worker. 
- let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id); + let (request, receiver) = + WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state); self.workers.submit_to_worker(region_id, request).await?; receiver.await.context(RecvSnafu) @@ -459,7 +464,7 @@ impl EngineInner { fn role(&self, region_id: RegionId) -> Option { self.workers.get_region(region_id).map(|region| { - if region.is_readonly() { + if region.is_follower() { RegionRole::Follower } else { RegionRole::Leader @@ -547,22 +552,23 @@ impl RegionEngine for MitoEngine { self.get_region_statistic(region_id) } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { self.inner - .set_writable(region_id, writable) + .set_region_role(region_id, role) .map_err(BoxedError::new) } - async fn set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> Result { + region_role_state: SettableRegionRoleState, + ) -> Result { let _timer = HANDLE_REQUEST_ELAPSED - .with_label_values(&["set_readonly_gracefully"]) + .with_label_values(&["set_region_role_state_gracefully"]) .start_timer(); self.inner - .set_readonly_gracefully(region_id) + .set_region_role_state_gracefully(region_id, region_role_state) .await .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index b48dc2ccfb08..2e75bf19faa0 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -24,7 +24,7 @@ use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::ColumnMetadata; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest, }; @@ -213,8 +213,10 @@ async fn test_put_after_alter() { ) .await .unwrap(); - // Set writable. - engine.set_writable(region_id, true).unwrap(); + // Convert region to leader. + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); // Put with old schema. 
let rows = Rows { diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index 5f4dd3b15acf..a9de0d6008ff 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -22,7 +22,7 @@ use common_recordbatch::RecordBatches; use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; use rstest::rstest; use rstest_reuse::{self, apply}; -use store_api::region_engine::{RegionEngine, SetReadonlyResponse}; +use store_api::region_engine::{RegionEngine, RegionRole, SetRegionRoleStateResponse}; use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -34,8 +34,8 @@ use crate::test_util::{ }; use crate::wal::EntryId; -fn get_last_entry_id(resp: SetReadonlyResponse) -> Option { - if let SetReadonlyResponse::Success { last_entry_id } = resp { +fn get_last_entry_id(resp: SetRegionRoleStateResponse) -> Option { + if let SetRegionRoleStateResponse::Success { last_entry_id } = resp { last_entry_id } else { unreachable!(); @@ -45,6 +45,8 @@ fn get_last_entry_id(resp: SetReadonlyResponse) -> Option { #[apply(single_kafka_log_store_factory)] async fn test_catchup_with_last_entry_id(factory: Option) { + use store_api::region_engine::SettableRegionRoleState; + common_telemetry::init_default_ut_logging(); let Some(factory) = factory else { return; @@ -102,7 +104,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { put_rows(&leader_engine, region_id, rows).await; let resp = leader_engine - .set_readonly_gracefully(region_id) + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower) .await .unwrap(); @@ -159,6 +161,8 @@ async fn test_catchup_with_last_entry_id(factory: Option) { #[apply(single_kafka_log_store_factory)] async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { + use store_api::region_engine::SettableRegionRoleState; + common_telemetry::init_default_ut_logging(); let Some(factory) = factory else { return; @@ -217,7 +221,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), - RegionState { .. } => StatusCode::RegionNotReady, + RegionLeaderState { .. } => StatusCode::RegionNotReady, + &FlushableRegionState { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 05561b6080ff..9606e92d04db 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -36,7 +36,7 @@ use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH use crate::read::Source; use crate::region::options::IndexOptions; use crate::region::version::{VersionControlData, VersionControlRef}; -use crate::region::{ManifestContextRef, RegionState}; +use crate::region::{ManifestContextRef, RegionLeaderState}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -195,6 +195,8 @@ pub enum FlushReason { Alter, /// Flush periodically. Periodically, + /// Flush memtable during downgrading state. 
+ Downgrading, } impl FlushReason { @@ -407,11 +409,23 @@ impl RegionFlushTask { info!("Applying {edit:?} to region {}", self.region_id); let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + + let expected_state = if matches!(self.reason, FlushReason::Downgrading) { + RegionLeaderState::Downgrading + } else { + RegionLeaderState::Writable + }; // We will leak files if the manifest update fails, but we ignore them for simplicity. We can // add a cleanup job to remove them later. - self.manifest_ctx - .update_manifest(RegionState::Writable, action_list) + let version = self + .manifest_ctx + .update_manifest(expected_state, action_list) .await?; + info!( + "Successfully update manifest version to {version}, region: {}, reason: {}", + self.region_id, + self.reason.as_str() + ); Ok(edit) } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 8fc9095ae5fb..b05daf3da076 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -27,12 +27,16 @@ use common_telemetry::{error, info, warn}; use crossbeam_utils::atomic::AtomicCell; use snafu::{ensure, OptionExt}; use store_api::logstore::provider::Provider; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::RegionStatistic; +use store_api::region_engine::{RegionRole, RegionStatistic, SettableRegionRoleState}; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; -use crate::error::{RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result}; +use crate::error::{ + FlushableRegionStateSnafu, RegionLeaderStateSnafu, RegionNotFoundSnafu, RegionTruncatedSnafu, + Result, +}; use crate::manifest::action::{RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; @@ -59,11 +63,8 @@ impl RegionUsage { } } -/// State of the region. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum RegionState { - /// The region is opened but is still read-only. - ReadOnly, +pub enum RegionLeaderState { /// The region is opened and is writable. Writable, /// The region is altering. @@ -74,6 +75,14 @@ pub enum RegionState { Truncating, /// The region is handling a region edit. Editing, + /// The region is stepping down. + Downgrading, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegionRoleState { + Leader(RegionLeaderState), + Follower, } /// Metadata and runtime status of a region. @@ -172,83 +181,91 @@ impl MitoRegion { /// Returns whether the region is writable. pub(crate) fn is_writable(&self) -> bool { - self.manifest_ctx.state.load() == RegionState::Writable + self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable) + } + + /// Returns whether the region is flushable. + pub(crate) fn is_flushable(&self) -> bool { + matches!( + self.manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Writable) + | RegionRoleState::Leader(RegionLeaderState::Downgrading) + ) + } + + /// Returns whether the region is downgrading. + pub(crate) fn is_downgrading(&self) -> bool { + matches!( + self.manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Downgrading) + ) } /// Returns whether the region is readonly. - pub(crate) fn is_readonly(&self) -> bool { - self.manifest_ctx.state.load() == RegionState::ReadOnly + pub(crate) fn is_follower(&self) -> bool { + self.manifest_ctx.state.load() == RegionRoleState::Follower } /// Returns the state of the region. 
- pub(crate) fn state(&self) -> RegionState { + pub(crate) fn state(&self) -> RegionRoleState { self.manifest_ctx.state.load() } - /// Sets the writable state. - pub(crate) fn set_writable(&self, writable: bool) { - if writable { - // Only sets the region to writable if it is read only. - // This prevents others updating the manifest. - match self - .manifest_ctx - .state - .compare_exchange(RegionState::ReadOnly, RegionState::Writable) - { - Ok(state) => info!( - "Set region {} to writable, previous state: {:?}", - self.region_id, state - ), - Err(state) => { - if state != RegionState::Writable { - warn!( - "Failed to set region {} to writable, current state: {:?}", - self.region_id, state - ) - } - } - } - } else { - self.manifest_ctx.state.store(RegionState::ReadOnly); - } + /// Sets the region role state. + pub(crate) fn set_role(&self, next_role: RegionRole) { + self.manifest_ctx.set_role(next_role, self.region_id); } /// Sets the altering state. /// You should call this method in the worker loop. pub(crate) fn set_altering(&self) -> Result<()> { - self.compare_exchange_state(RegionState::Writable, RegionState::Altering) + self.compare_exchange_state( + RegionLeaderState::Writable, + RegionRoleState::Leader(RegionLeaderState::Altering), + ) } /// Sets the dropping state. /// You should call this method in the worker loop. pub(crate) fn set_dropping(&self) -> Result<()> { - self.compare_exchange_state(RegionState::Writable, RegionState::Dropping) + self.compare_exchange_state( + RegionLeaderState::Writable, + RegionRoleState::Leader(RegionLeaderState::Dropping), + ) } /// Sets the truncating state. /// You should call this method in the worker loop. pub(crate) fn set_truncating(&self) -> Result<()> { - self.compare_exchange_state(RegionState::Writable, RegionState::Truncating) + self.compare_exchange_state( + RegionLeaderState::Writable, + RegionRoleState::Leader(RegionLeaderState::Truncating), + ) } /// Sets the editing state. /// You should call this method in the worker loop. pub(crate) fn set_editing(&self) -> Result<()> { - self.compare_exchange_state(RegionState::Writable, RegionState::Editing) + self.compare_exchange_state( + RegionLeaderState::Writable, + RegionRoleState::Leader(RegionLeaderState::Editing), + ) } /// Sets the region to readonly gracefully. This acquires the manifest write lock. - pub(crate) async fn set_readonly_gracefully(&self) { + pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) { let _manager = self.manifest_ctx.manifest_manager.write().await; // We acquires the write lock of the manifest manager to ensure that no one is updating the manifest. // Then we change the state. - self.set_writable(false); + self.set_role(state.into()); } - /// Switches the region state to `RegionState::Writable` if the current state is `expect`. + /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`. /// Otherwise, logs an error. 
- pub(crate) fn switch_state_to_writable(&self, expect: RegionState) { - if let Err(e) = self.compare_exchange_state(expect, RegionState::Writable) { + pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) { + if let Err(e) = self + .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable)) + { error!(e; "failed to switch region state to writable, expect state is {:?}", expect); } } @@ -280,12 +297,16 @@ impl MitoRegion { /// Sets the state of the region to given state if the current state equals to /// the expected. - fn compare_exchange_state(&self, expect: RegionState, state: RegionState) -> Result<()> { + fn compare_exchange_state( + &self, + expect: RegionLeaderState, + state: RegionRoleState, + ) -> Result<()> { self.manifest_ctx .state - .compare_exchange(expect, state) + .compare_exchange(RegionRoleState::Leader(expect), state) .map_err(|actual| { - RegionStateSnafu { + RegionLeaderStateSnafu { region_id: self.region_id, state: actual, expect, @@ -303,17 +324,25 @@ pub(crate) struct ManifestContext { manifest_manager: tokio::sync::RwLock, /// The state of the region. The region checks the state before updating /// manifest. - state: AtomicCell, + state: AtomicCell, } impl ManifestContext { - pub(crate) fn new(manager: RegionManifestManager, state: RegionState) -> Self { + pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self { ManifestContext { manifest_manager: tokio::sync::RwLock::new(manager), state: AtomicCell::new(state), } } + pub(crate) async fn manifest_version(&self) -> ManifestVersion { + self.manifest_manager + .read() + .await + .manifest() + .manifest_version + } + pub(crate) async fn has_update(&self) -> Result { self.manifest_manager.read().await.has_update().await } @@ -321,9 +350,9 @@ impl ManifestContext { /// Updates the manifest if current state is `expect_state`. pub(crate) async fn update_manifest( &self, - expect_state: RegionState, + expect_state: RegionLeaderState, action_list: RegionMetaActionList, - ) -> Result<()> { + ) -> Result { // Acquires the write lock of the manifest manager. let mut manager = self.manifest_manager.write().await; // Gets current manifest. @@ -332,8 +361,8 @@ impl ManifestContext { // after `set_readonly_gracefully()` is called. let current_state = self.state.load(); ensure!( - current_state == expect_state, - RegionStateSnafu { + current_state == RegionRoleState::Leader(expect_state), + RegionLeaderStateSnafu { region_id: manifest.metadata.region_id, state: current_state, expect: expect_state, @@ -376,18 +405,92 @@ impl ManifestContext { } // Now we can update the manifest. - manager.update(action_list).await.inspect_err( + let version = manager.update(action_list).await.inspect_err( |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id), )?; - if self.state.load() == RegionState::ReadOnly { + if self.state.load() == RegionRoleState::Follower { warn!( - "Region {} becomes read-only while updating manifest which may cause inconsistency", + "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}", manifest.metadata.region_id ); } - Ok(()) + Ok(version) + } + + /// Sets the [`RegionRole`]. 
+ /// + /// ``` + /// +------------------------------------------+ + /// | +-----------------+ | + /// | | | | + /// +---+------+ +-------+-----+ +--v-v---+ + /// | Follower | | Downgrading | | Leader | + /// +---^-^----+ +-----+-^-----+ +--+-+---+ + /// | | | | | | + /// | +------------------+ +-----------------+ | + /// +------------------------------------------+ + /// + /// Transition: + /// - Follower -> Leader + /// - Downgrading Leader -> Leader + /// - Leader -> Follower + /// - Downgrading Leader -> Follower + /// - Leader -> Downgrading Leader + /// + /// ``` + pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) { + match next_role { + RegionRole::Follower => { + self.state.store(RegionRoleState::Follower); + } + RegionRole::Leader => { + match self.state.fetch_update(|state| { + if matches!( + state, + RegionRoleState::Follower + | RegionRoleState::Leader(RegionLeaderState::Downgrading) + ) { + Some(RegionRoleState::Leader(RegionLeaderState::Writable)) + } else { + None + } + }) { + Ok(state) => info!( + "Convert region {} to leader, previous role state: {:?}", + region_id, state + ), + Err(state) => { + if state != RegionRoleState::Leader(RegionLeaderState::Writable) { + warn!( + "Failed to convert region {} to leader, current role state: {:?}", + region_id, state + ) + } + } + } + } + RegionRole::DowngradingLeader => { + match self.state.compare_exchange( + RegionRoleState::Leader(RegionLeaderState::Writable), + RegionRoleState::Leader(RegionLeaderState::Downgrading), + ) { + Ok(state) => info!( + "Convert region {} to downgrading region, previous role state: {:?}", + region_id, state + ), + Err(state) => { + if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) { + warn!( + "Failed to convert region {} to downgrading leader, current role state: {:?}", + region_id, state + ) + } + } + } + } + } } } @@ -434,10 +537,10 @@ impl RegionMap { .context(RegionNotFoundSnafu { region_id })?; ensure!( region.is_writable(), - RegionStateSnafu { + RegionLeaderStateSnafu { region_id, state: region.state(), - expect: RegionState::Writable, + expect: RegionLeaderState::Writable, } ); Ok(region) @@ -460,6 +563,40 @@ impl RegionMap { } } + /// Gets flushable region by region id. + /// + /// Returns error if the region does not exist or is not operable. + fn flushable_region(&self, region_id: RegionId) -> Result { + let region = self + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + ensure!( + region.is_flushable(), + FlushableRegionStateSnafu { + region_id, + state: region.state(), + } + ); + Ok(region) + } + + /// Gets flushable region by region id. + /// + /// Calls the callback if the region does not exist or is not operable. + pub(crate) fn flushable_region_or( + &self, + region_id: RegionId, + cb: &mut F, + ) -> Option { + match self.flushable_region(region_id) { + Ok(region) => Some(region), + Err(e) => { + cb.on_failure(e); + None + } + } + } + /// Remove region by id. 
pub(crate) fn remove_region(&self, region_id: RegionId) { let mut regions = self.regions.write().unwrap(); @@ -548,12 +685,70 @@ impl ManifestStats { #[cfg(test)] mod tests { + use std::sync::Arc; + use crossbeam_utils::atomic::AtomicCell; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; - use crate::region::RegionState; + use crate::region::{RegionLeaderState, RegionRoleState}; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::version_util::VersionControlBuilder; #[test] fn test_region_state_lock_free() { - assert!(AtomicCell::::is_lock_free()); + assert!(AtomicCell::::is_lock_free()); + } + + #[tokio::test] + async fn test_set_region_state() { + let env = SchedulerEnv::new().await; + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + + let region_id = RegionId::new(1024, 0); + // Leader -> Follower + manifest_ctx.set_role(RegionRole::Follower, region_id); + assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower); + + // Follower -> Leader + manifest_ctx.set_role(RegionRole::Leader, region_id); + assert_eq!( + manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Writable) + ); + + // Leader -> Downgrading Leader + manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id); + assert_eq!( + manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Downgrading) + ); + + // Downgrading Leader -> Follower + manifest_ctx.set_role(RegionRole::Follower, region_id); + assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower); + + // Can't downgrade from follower (Follower -> Downgrading Leader) + manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id); + assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower); + + // Set region role too Downgrading Leader + manifest_ctx.set_role(RegionRole::Leader, region_id); + manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id); + assert_eq!( + manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Downgrading) + ); + + // Downgrading Leader -> Leader + manifest_ctx.set_role(RegionRole::Leader, region_id); + assert_eq!( + manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Writable) + ); } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 64272a183bc9..b2a76490cc27 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -28,6 +28,7 @@ use snafu::{ensure, OptionExt}; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::region_engine::RegionRole; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; @@ -42,7 +43,9 @@ use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; -use crate::region::{ManifestContext, ManifestStats, MitoRegion, RegionState}; +use crate::region::{ + ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState, +}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; @@ -169,8 +172,8 @@ impl RegionOpener { &expect.column_metadatas, &expect.primary_key, )?; - // To 
keep consistence with Create behavior, set the opened Region writable. - region.set_writable(true); + // To keep consistence with Create behavior, set the opened Region to RegionRole::Leader. + region.set_role(RegionRole::Leader); return Ok(region); } Ok(None) => { @@ -235,7 +238,7 @@ impl RegionOpener { // Region is writable after it is created. manifest_ctx: Arc::new(ManifestContext::new( manifest_manager, - RegionState::Writable, + RegionRoleState::Leader(RegionLeaderState::Writable), )), file_purger: Arc::new(LocalFilePurger::new( self.purge_scheduler, @@ -362,9 +365,10 @@ impl RegionOpener { let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { info!( - "Start replaying memtable at flushed_entry_id + 1 {} for region {}", + "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}", flushed_entry_id + 1, - region_id + region_id, + manifest.manifest_version ); replay_memtable( &provider, @@ -377,7 +381,10 @@ impl RegionOpener { ) .await?; } else { - info!("Skip the WAL replay for region: {}", region_id); + info!( + "Skip the WAL replay for region: {}, manifest version: {}", + region_id, manifest.manifest_version + ); } let now = self.time_provider.current_time_millis(); @@ -388,7 +395,7 @@ impl RegionOpener { // Region is always opened in read only mode. manifest_ctx: Arc::new(ManifestContext::new( manifest_manager, - RegionState::ReadOnly, + RegionRoleState::Follower, )), file_purger, provider: provider.clone(), diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d88bc994e97e..1e4c6b8dc986 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -31,7 +31,7 @@ use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; -use store_api::region_engine::SetReadonlyResponse; +use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{ AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, @@ -483,11 +483,13 @@ pub(crate) enum WorkerRequest { }, /// The internal commands. - SetReadonlyGracefully { + SetRegionRoleStateGracefully { /// Id of the region to send. region_id: RegionId, + /// The [SettableRegionRoleState]. + region_role_state: SettableRegionRoleState, /// The sender of [SetReadonlyResponse]. - sender: Sender, + sender: Sender, }, /// Notify a worker to stop. 
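The `WorkerRequest` hunk above carries the requested `SettableRegionRoleState` into the region worker and answers over a oneshot channel. A minimal, self-contained sketch of that request/response shape (types are simplified stand-ins for the ones in `store_api` and `mito2`; assumes a `tokio` dependency with the runtime and `macros` features, and elides the worker loop itself):

```rust
use tokio::sync::oneshot::{self, Receiver, Sender};

type RegionId = u64; // stand-in for store_api::storage::RegionId

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SettableRegionRoleState {
    Follower,
    DowngradingLeader,
}

#[derive(Debug)]
enum SetRegionRoleStateResponse {
    Success { last_entry_id: Option<u64> },
    NotFound,
}

enum WorkerRequest {
    SetRegionRoleStateGracefully {
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: Sender<SetRegionRoleStateResponse>,
    },
}

// Mirrors the constructor shape shown in the hunks around this point:
// it packages the request together with the receiver the caller awaits on.
fn new_set_readonly_gracefully(
    region_id: RegionId,
    region_role_state: SettableRegionRoleState,
) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
    let (sender, receiver) = oneshot::channel();
    (
        WorkerRequest::SetRegionRoleStateGracefully {
            region_id,
            region_role_state,
            sender,
        },
        receiver,
    )
}

#[tokio::main]
async fn main() {
    let (request, receiver) =
        new_set_readonly_gracefully(1024, SettableRegionRoleState::DowngradingLeader);
    // A worker would normally flush the region, switch its role state, and then reply.
    let WorkerRequest::SetRegionRoleStateGracefully { sender, .. } = request;
    let _ = sender.send(SetRegionRoleStateResponse::Success {
        last_entry_id: Some(42),
    });
    println!("{:?}", receiver.await);
}
```

In the change itself the reply is only sent after the region has taken the manifest write lock and switched its role state, which is what makes the call "graceful": no write can slip in after the response is observed.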
@@ -587,11 +589,16 @@ impl WorkerRequest { pub(crate) fn new_set_readonly_gracefully( region_id: RegionId, - ) -> (WorkerRequest, Receiver) { + region_role_state: SettableRegionRoleState, + ) -> (WorkerRequest, Receiver) { let (sender, receiver) = oneshot::channel(); ( - WorkerRequest::SetReadonlyGracefully { region_id, sender }, + WorkerRequest::SetRegionRoleStateGracefully { + region_id, + region_role_state, + sender, + }, receiver, ) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index cd449e53fae6..0bd85747c0f1 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -52,7 +52,7 @@ use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rstest_reuse::template; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, @@ -1114,6 +1114,8 @@ pub async fn reopen_region( .unwrap(); if writable { - engine.set_writable(region_id, true).unwrap(); + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); } } diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index c1b85279deda..ba777b157fc3 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -31,7 +31,7 @@ use crate::config::MitoConfig; use crate::error::Result; use crate::flush::FlushScheduler; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::region::{ManifestContext, ManifestContextRef, RegionState}; +use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState}; use crate::request::WorkerRequest; use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; @@ -124,7 +124,7 @@ impl SchedulerEnv { ) .await .unwrap(), - RegionState::Writable, + RegionRoleState::Leader(RegionLeaderState::Writable), )) } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index c2fbc8098203..e790ed08c1a9 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -39,7 +39,7 @@ use prometheus::IntGauge; use rand::{thread_rng, Rng}; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_engine::SetReadonlyResponse; +use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; @@ -734,8 +734,13 @@ impl RegionWorkerLoop { // For background notify, we handle it directly. self.handle_background_notify(region_id, notify).await; } - WorkerRequest::SetReadonlyGracefully { region_id, sender } => { - self.set_readonly_gracefully(region_id, sender).await; + WorkerRequest::SetRegionRoleStateGracefully { + region_id, + region_role_state, + sender, + } => { + self.set_role_state_gracefully(region_id, region_role_state, sender) + .await; } WorkerRequest::EditRegion(request) => { self.handle_region_edit(request).await; @@ -834,22 +839,23 @@ impl RegionWorkerLoop { } } - /// Handles `set_readonly_gracefully`. - async fn set_readonly_gracefully( + /// Handles `set_region_role_gracefully`. 
+ async fn set_role_state_gracefully( &mut self, region_id: RegionId, - sender: oneshot::Sender, + region_role_state: SettableRegionRoleState, + sender: oneshot::Sender, ) { if let Some(region) = self.regions.get_region(region_id) { // We need to do this in background as we need the manifest lock. common_runtime::spawn_global(async move { - region.set_readonly_gracefully().await; + region.set_role_state_gracefully(region_role_state).await; let last_entry_id = region.version_control.current().last_entry_id; - let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id))); + let _ = sender.send(SetRegionRoleStateResponse::success(Some(last_entry_id))); }); } else { - let _ = sender.send(SetReadonlyResponse::NotFound); + let _ = sender.send(SetRegionRoleStateResponse::NotFound); } } } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 505c994d3607..cacd563ed78e 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -20,6 +20,7 @@ use common_telemetry::info; use common_telemetry::tracing::warn; use snafu::ensure; use store_api::logstore::LogStore; +use store_api::region_engine::RegionRole; use store_api::region_request::{AffectedRows, RegionCatchupRequest}; use store_api::storage::RegionId; use tokio::time::Instant; @@ -47,7 +48,8 @@ impl RegionWorkerLoop { // Utilizes the short circuit evaluation. let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? { - info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}"); + let manifest_version = region.manifest_ctx.manifest_version().await; + info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}"); let reopened_region = Arc::new( RegionOpener::new( region_id, @@ -112,7 +114,7 @@ impl RegionWorkerLoop { } if request.set_writable { - region.set_writable(true); + region.set_role(RegionRole::Leader); } Ok(0) diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 51b42acb406f..a569f2236029 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -28,7 +28,7 @@ use store_api::storage::RegionId; use tokio::time::sleep; use crate::error::{OpenDalSnafu, Result}; -use crate::region::{RegionMapRef, RegionState}; +use crate::region::{RegionLeaderState, RegionMapRef}; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes @@ -62,7 +62,7 @@ where // Sets the state back to writable. It's possible that the marker file has been written. // We set the state back to writable so we can retry the drop operation. 
- region.switch_state_to_writable(RegionState::Dropping); + region.switch_state_to_writable(RegionLeaderState::Dropping); })?; region.stop().await; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 14a70225bbe1..b2bc5fd2e865 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -36,16 +36,18 @@ impl RegionWorkerLoop { request: RegionFlushRequest, mut sender: OptionOutputTx, ) { - let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { + let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else { return; }; - let mut task = self.new_flush_task( - ®ion, - FlushReason::Manual, - request.row_group_size, - self.config.clone(), - ); + let reason = if region.is_downgrading() { + FlushReason::Downgrading + } else { + FlushReason::Manual + }; + + let mut task = + self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone()); task.push_sender(sender); if let Err(e) = self.flush_scheduler diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index de5f4e563d43..e97b30afec76 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -27,7 +27,7 @@ use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; -use crate::region::{MitoRegionRef, RegionState}; +use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; use crate::request::{ BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, TruncateResult, WorkerRequest, @@ -84,7 +84,7 @@ impl RegionWorkerLoop { }; if !region.is_writable() { - if region.state() == RegionState::Editing { + if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) { self.region_edit_queues .entry(region_id) .or_insert_with(|| RegionEditQueue::new(region_id)) @@ -159,7 +159,7 @@ impl RegionWorkerLoop { } // Sets the region as writable. - region.switch_state_to_writable(RegionState::Editing); + region.switch_state_to_writable(RegionLeaderState::Editing); let _ = edit_result.sender.send(edit_result.result); @@ -199,8 +199,9 @@ impl RegionWorkerLoop { RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); let result = manifest_ctx - .update_manifest(RegionState::Truncating, action_list) - .await; + .update_manifest(RegionLeaderState::Truncating, action_list) + .await + .map(|_| ()); // Sends the result back to the request sender. let truncate_result = TruncateResult { @@ -241,8 +242,9 @@ impl RegionWorkerLoop { let result = region .manifest_ctx - .update_manifest(RegionState::Altering, action_list) - .await; + .update_manifest(RegionLeaderState::Altering, action_list) + .await + .map(|_| ()); let notify = WorkerRequest::Background { region_id: region.region_id, notify: BackgroundNotify::RegionChange(RegionChangeResult { @@ -291,7 +293,7 @@ impl RegionWorkerLoop { } // Sets the region as writable. 
- region.switch_state_to_writable(RegionState::Altering); + region.switch_state_to_writable(RegionLeaderState::Altering); change_result.sender.send(change_result.result.map(|_| 0)); } @@ -338,6 +340,7 @@ async fn edit_region( let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); region .manifest_ctx - .update_manifest(RegionState::Editing, action_list) + .update_manifest(RegionLeaderState::Editing, action_list) .await + .map(|_| ()) } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index da5b74e511f3..863b1961a34f 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -20,7 +20,7 @@ use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; use crate::manifest::action::RegionTruncate; -use crate::region::RegionState; +use crate::region::RegionLeaderState; use crate::request::{OptionOutputTx, TruncateResult}; use crate::worker::RegionWorkerLoop; @@ -63,7 +63,7 @@ impl RegionWorkerLoop { }; // We are already in the worker loop so we can set the state first. - region.switch_state_to_writable(RegionState::Truncating); + region.switch_state_to_writable(RegionLeaderState::Truncating); match truncate_result.result { Ok(()) => { diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index bc7907903681..2f1ffeffb587 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -142,7 +142,7 @@ pub(crate) async fn create_partition_rule_manager( }, leader_peer: Some(Peer::new(3, "")), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { @@ -173,7 +173,7 @@ pub(crate) async fn create_partition_rule_manager( }, leader_peer: Some(Peer::new(2, "")), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, RegionRoute { @@ -196,7 +196,7 @@ pub(crate) async fn create_partition_rule_manager( }, leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], - leader_status: None, + leader_state: None, leader_down_since: None, }, ]), diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 6b14a5af52e4..58e014affff7 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -28,7 +28,8 @@ use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, }; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest}; @@ -89,14 +90,15 @@ impl RegionEngine for MetaRegionEngine { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, _region_id: RegionId, _role: RegionRole) -> Result<(), BoxedError> { unimplemented!() } - async fn set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, _region_id: RegionId, - ) -> Result { + _region_role_state: SettableRegionRoleState, + ) -> Result { unimplemented!() } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 483d3cc1adbe..850b9ad3e2d6 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs 
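The handler hunks above (drop, edit, alter, truncate) all return a region to the writable leader state through `switch_state_to_writable(expect)`, which only succeeds when the region is still in the expected DDL state. A minimal sketch of that compare-and-swap guard, using `crossbeam_utils::atomic::AtomicCell` as the engine does; the enum shapes are simplified and `RegionStateCell` is an illustrative wrapper, not a type from the source:

```rust
use crossbeam_utils::atomic::AtomicCell;

// Simplified copies of the state enums introduced in this change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionLeaderState {
    Writable,
    Altering,
    Truncating,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

struct RegionStateCell(AtomicCell<RegionRoleState>);

impl RegionStateCell {
    /// Goes back to `Leader(Writable)` only if the region is still in the
    /// expected leader state; otherwise reports the state it actually found.
    fn switch_state_to_writable(&self, expect: RegionLeaderState) -> Result<(), RegionRoleState> {
        self.0
            .compare_exchange(
                RegionRoleState::Leader(expect),
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .map(|_| ())
    }
}

fn main() {
    let state = RegionStateCell(AtomicCell::new(RegionRoleState::Leader(
        RegionLeaderState::Truncating,
    )));
    // Succeeds: the region is still truncating, so it may become writable again.
    assert!(state
        .switch_state_to_writable(RegionLeaderState::Truncating)
        .is_ok());
    // Fails: the region is already writable, not altering; the CAS returns the actual state.
    assert_eq!(
        state.switch_state_to_writable(RegionLeaderState::Altering),
        Err(RegionRoleState::Leader(RegionLeaderState::Writable))
    );
}
```

The same guard protects manifest updates: `update_manifest(expect_state, ...)` refuses to write unless the current state still matches, so a region demoted to follower mid-operation cannot silently publish a new manifest.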
@@ -36,9 +36,32 @@ use crate::metadata::RegionMetadataRef; use crate::region_request::{RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; -/// The result of setting readonly for the region. +/// The settable region role state. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum SettableRegionRoleState { + Follower, + DowngradingLeader, +} + +impl From for RegionRole { + fn from(value: SettableRegionRoleState) -> Self { + match value { + SettableRegionRoleState::Follower => RegionRole::Follower, + SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader, + } + } +} + +/// The request to set region role state. +#[derive(Debug, PartialEq, Eq)] +pub struct SetRegionRoleStateRequest { + region_id: RegionId, + region_role_state: SettableRegionRoleState, +} + +/// The response of setting region role state. #[derive(Debug, PartialEq, Eq)] -pub enum SetReadonlyResponse { +pub enum SetRegionRoleStateResponse { Success { /// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine). last_entry_id: Option, @@ -46,8 +69,8 @@ pub enum SetReadonlyResponse { NotFound, } -impl SetReadonlyResponse { - /// Returns a [SetReadonlyResponse::Success] with the `last_entry_id`. +impl SetRegionRoleStateResponse { + /// Returns a [SetRegionRoleStateResponse::Success] with the `last_entry_id`. pub fn success(last_entry_id: Option) -> Self { Self::Success { last_entry_id } } @@ -58,6 +81,7 @@ pub struct GrantedRegion { pub region_id: RegionId, pub region_role: RegionRole, } + impl GrantedRegion { pub fn new(region_id: RegionId, region_role: RegionRole) -> Self { Self { @@ -85,12 +109,18 @@ impl From for GrantedRegion { } } +/// The role of the region. +/// TODO(weny): rename it to `RegionRoleState` #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2) Follower, // Writable region(mito2), Readonly region(file). Leader, + // Leader is downgrading to follower. + // + // This state is used to prevent new write requests. + DowngradingLeader, } impl Display for RegionRole { @@ -98,6 +128,7 @@ impl Display for RegionRole { match self { RegionRole::Follower => write!(f, "Follower"), RegionRole::Leader => write!(f, "Leader"), + RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"), } } } @@ -113,6 +144,7 @@ impl From for PbRegionRole { match value { RegionRole::Follower => PbRegionRole::Follower, RegionRole::Leader => PbRegionRole::Leader, + RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader, } } } @@ -122,6 +154,7 @@ impl From for RegionRole { match value { PbRegionRole::Leader => RegionRole::Leader, PbRegionRole::Follower => RegionRole::Follower, + PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader, } } } @@ -331,20 +364,21 @@ pub trait RegionEngine: Send + Sync { /// Stops the engine async fn stop(&self) -> Result<(), BoxedError>; - /// Sets writable mode for a region. + /// Sets [RegionRole] for a region. /// /// The engine checks whether the region is writable before writing to the region. Setting /// the region as readonly doesn't guarantee that write operations in progress will not /// take effect. - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>; + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>; - /// Sets readonly for a region gracefully. + /// Sets region role state gracefully. 
/// /// After the call returns, the engine ensures no more write operations will succeed in the region. - async fn set_readonly_gracefully( + async fn set_region_role_state_gracefully( &self, region_id: RegionId, - ) -> Result<SetReadonlyResponse, BoxedError>; + region_role_state: SettableRegionRoleState, + ) -> Result<SetRegionRoleStateResponse, BoxedError>; /// Indicates region role. ///
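Taken together with the transition diagram documented on `ManifestContext::set_role`, the new trait surface reduces to a small set of legal role moves plus the `SettableRegionRoleState` to `RegionRole` mapping shown above. A standalone sketch of those rules (simplified enums, no engine plumbing; `transition_allowed` is an illustrative helper, not a function from the source, and it does not model the extra "writable" requirement the engine enforces before downgrading):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionRole {
    Follower,
    Leader,
    DowngradingLeader,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SettableRegionRoleState {
    Follower,
    DowngradingLeader,
}

impl From<SettableRegionRoleState> for RegionRole {
    fn from(value: SettableRegionRoleState) -> Self {
        match value {
            SettableRegionRoleState::Follower => RegionRole::Follower,
            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
        }
    }
}

/// Mirrors the documented diagram: anything may demote to follower,
/// followers and downgrading leaders may (re)become leader,
/// and only a leader may start downgrading.
fn transition_allowed(current: RegionRole, next: RegionRole) -> bool {
    use RegionRole::*;
    match (current, next) {
        (_, Follower) => true,
        (Follower | DowngradingLeader, Leader) => true,
        (Leader, DowngradingLeader) => true,
        _ => false,
    }
}

fn main() {
    assert_eq!(
        RegionRole::from(SettableRegionRoleState::DowngradingLeader),
        RegionRole::DowngradingLeader
    );
    assert!(transition_allowed(RegionRole::Leader, RegionRole::DowngradingLeader));
    assert!(!transition_allowed(RegionRole::Follower, RegionRole::DowngradingLeader));
}
```

The mapping is what lets `set_region_role_state_gracefully` reuse `set_role` internally: the caller names a target state (follower or downgrading leader), and the engine converts it into a role transition it already knows how to validate.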