Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 30, 2024
1 parent 5b86ad9 commit 20b3e61
Show file tree
Hide file tree
Showing 23 changed files with 161 additions and 130 deletions.
14 changes: 7 additions & 7 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ use crate::key::table_route::TableRouteKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionState};
use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
use crate::rpc::store::BatchDeleteRequest;
use crate::DatanodeId;

Expand Down Expand Up @@ -1126,7 +1126,7 @@ impl TableMetadataManager {
next_region_route_status: F,
) -> Result<()>
where
F: Fn(&RegionRoute) -> Option<Option<RegionState>>,
F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
{
let mut new_region_routes = current_table_route_value.region_routes()?.clone();

Expand Down Expand Up @@ -1280,7 +1280,7 @@ mod tests {
use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionState};
use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};

#[test]
fn test_deserialized_value_with_bytes() {
Expand Down Expand Up @@ -1715,7 +1715,7 @@ mod tests {
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(datanode, "a2")),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
follower_peers: vec![],
leader_down_since: Some(current_time_millis()),
},
Expand Down Expand Up @@ -1753,7 +1753,7 @@ mod tests {
if region_route.leader_state.is_some() {
None
} else {
Some(Some(RegionState::Downgrading))
Some(Some(LeaderState::Downgrading))
}
})
.await
Expand All @@ -1769,7 +1769,7 @@ mod tests {

assert_eq!(
updated_route_value.region_routes().unwrap()[0].leader_state,
Some(RegionState::Downgrading)
Some(LeaderState::Downgrading)
);

assert!(updated_route_value.region_routes().unwrap()[0]
Expand All @@ -1778,7 +1778,7 @@ mod tests {

assert_eq!(
updated_route_value.region_routes().unwrap()[1].leader_state,
Some(RegionState::Downgrading)
Some(LeaderState::Downgrading)
);
assert!(updated_route_value.region_routes().unwrap()[1]
.leader_down_since
Expand Down
25 changes: 13 additions & 12 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub fn convert_to_region_peer_map(
/// Returns the HashMap<[RegionNumber], [RegionStatus]>;
pub fn convert_to_region_leader_status_map(
region_routes: &[RegionRoute],
) -> HashMap<RegionNumber, RegionState> {
) -> HashMap<RegionNumber, LeaderState> {
region_routes
.iter()
.filter_map(|x| {
Expand Down Expand Up @@ -264,7 +264,7 @@ pub struct RegionRoute {
alias = "leader_status",
skip_serializing_if = "Option::is_none"
)]
pub leader_state: Option<RegionState>,
pub leader_state: Option<LeaderState>,
/// The start time when the leader is in `Downgraded` status.
#[serde(default)]
#[builder(default = "self.default_leader_down_since()")]
Expand All @@ -274,17 +274,17 @@ pub struct RegionRoute {
impl RegionRouteBuilder {
fn default_leader_down_since(&self) -> Option<i64> {
match self.leader_state {
Some(Some(RegionState::Downgrading)) => Some(current_time_millis()),
Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
_ => None,
}
}
}

/// The State of the [`Region`].
/// The State of the [`Region`] Leader.
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum RegionState {
pub enum LeaderState {
/// The following cases in which the [`Region`] will be downgraded.
///
/// - The [`Region`] may be unavailable (e.g., Crashed, Network disconnected).
Expand All @@ -301,7 +301,7 @@ impl RegionRoute {
/// - The [`Region`] was planned to migrate to another [`Peer`].
///
pub fn is_leader_downgrading(&self) -> bool {
matches!(self.leader_state, Some(RegionState::Downgrading))
matches!(self.leader_state, Some(LeaderState::Downgrading))
}

/// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
Expand All @@ -311,12 +311,13 @@ impl RegionRoute {
/// - During the [`Region`] Failover Procedure.
/// - Migrating a [`Region`].
///
/// TODO(weny): Update comments
/// **Notes:** Meta Server will stop renewing the lease for the downgrading [`Region`].
/// **Notes:** Meta Server will renewing a special lease(`Downgrading`) for the downgrading [`Region`].
///
/// A downgrading region will reject any write requests, and only allow memetable to be flushed to object storage
///
pub fn downgrade_leader(&mut self) {
self.leader_down_since = Some(current_time_millis());
self.leader_state = Some(RegionState::Downgrading)
self.leader_state = Some(LeaderState::Downgrading)
}

/// Returns how long since the leader is in `Downgraded` status.
Expand All @@ -328,14 +329,14 @@ impl RegionRoute {
/// Sets the leader status.
///
/// Returns true if updated.
pub fn set_leader_status(&mut self, status: Option<RegionState>) -> bool {
pub fn set_leader_status(&mut self, status: Option<LeaderState>) -> bool {
let updated = self.leader_state != status;

match (status, updated) {
(Some(RegionState::Downgrading), true) => {
(Some(LeaderState::Downgrading), true) => {
self.leader_down_since = Some(current_time_millis());
}
(Some(RegionState::Downgrading), false) => {
(Some(LeaderState::Downgrading), false) => {
// Do nothing if leader is still in `Downgraded` status.
}
_ => {
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl RegionAliveKeeper {
let _ = self
.region_server
.set_region_role(region_id, RegionRole::Follower);
error!(e; "Failed to close staled region {}, convert region to readonly.",region_id);
error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
}
}
}
Expand Down Expand Up @@ -401,7 +401,7 @@ impl CountdownTask {
}
}
() = &mut countdown => {
warn!("The region {region_id} lease is expired, convert region to readonly.");
warn!("The region {region_id} lease is expired, convert region to follower.");
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl HandlerContext {
});
}
Err(err) => {
warn!("Failed to set region to downgrading: {err:?}");
warn!(err; "Failed to convert region to downgrading leader");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod test {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRoute, RegionState};
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -297,7 +297,7 @@ mod test {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
follower_peers: vec![follower_peer.clone()],
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(1),
},
RegionRoute {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionState;
use common_meta::rpc::router::LeaderState;
use snafu::ResultExt;

use crate::error::{self, Result};
Expand Down Expand Up @@ -53,7 +53,7 @@ impl UpdateMetadata {
.as_ref()
.is_some_and(|leader_peer| leader_peer.id == from_peer_id)
{
Some(Some(RegionState::Downgrading))
Some(Some(LeaderState::Downgrading))
} else {
None
}
Expand Down Expand Up @@ -81,7 +81,7 @@ mod tests {

use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, RegionState};
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;

use crate::error::Error;
Expand Down Expand Up @@ -155,7 +155,7 @@ mod tests {
table_metadata_manager
.update_leader_region_status(table_id, &original_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
Some(Some(RegionState::Downgrading))
Some(Some(LeaderState::Downgrading))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tests {

use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, RegionState};
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;

use crate::error::Error;
Expand Down Expand Up @@ -110,13 +110,13 @@ mod tests {
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
Expand Down Expand Up @@ -207,13 +207,13 @@ mod tests {
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRoute, RegionState};
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_time::util::current_time_millis;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -286,7 +286,7 @@ mod tests {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(current_time_millis()),
}];

Expand Down Expand Up @@ -319,13 +319,13 @@ mod tests {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5), Peer::empty(3)],
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(current_time_millis()),
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
];
Expand Down Expand Up @@ -430,7 +430,7 @@ mod tests {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(candidate_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
}];

Expand All @@ -455,7 +455,7 @@ mod tests {
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
leader_state: Some(RegionState::Downgrading),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
}];

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionState};
use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
Expand Down Expand Up @@ -265,7 +265,7 @@ mod tests {
Some((region_id, RegionRole::Follower))
);

region_route.leader_state = Some(RegionState::Downgrading);
region_route.leader_state = Some(LeaderState::Downgrading);
// The downgraded leader region on the datanode.
assert_eq!(
renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.follower_peers(vec![Peer::empty(follower_peer_id)])
.leader_state(RegionState::Downgrading)
.leader_state(LeaderState::Downgrading)
.build()
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::region::compact_request;
use common_telemetry::info;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(smallvec![RegionLeaderState::Writable], action_list)
.update_manifest(RegionLeaderState::Writable, action_list)
.await?;

Ok(edit)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl EngineInner {
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;

region.set_region_role_state(role);
region.set_role(role);
Ok(())
}

Expand Down
Loading

0 comments on commit 20b3e61

Please sign in to comment.