From e88465840d2e11bf65eb4f3907aaf95ee04c0878 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 6 Sep 2024 16:29:20 +0800 Subject: [PATCH] feat: add extension field to HeartbeatRequest (#4688) * feat: add extension field to HeartbeatRequest * chore: extension to extensions * chore: upgrade proto --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/region.rs | 6 +++--- src/common/meta/src/ddl/alter_logical_tables.rs | 2 +- src/common/meta/src/ddl/create_logical_tables.rs | 2 +- src/datanode/src/heartbeat.rs | 4 +++- src/datanode/src/region_server.rs | 8 ++++---- src/meta-srv/src/handler/failure_handler.rs | 1 + src/meta-srv/src/handler/node_stat.rs | 5 ++++- src/meta-srv/src/handler/region_lease_handler.rs | 1 + src/meta-srv/src/procedure/utils.rs | 2 +- src/meta-srv/src/selector/weight_compute.rs | 3 +++ src/metric-engine/src/engine.rs | 2 +- src/operator/src/flow.rs | 2 +- 14 files changed, 26 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a433b418417d..6391920f4748 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4300,7 +4300,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c437b55725b7f5224fe9d46db21072b4a682ee4b#c437b55725b7f5224fe9d46db21072b4a682ee4b" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=157cfdb52709e489cf1f3ce8e3042ed4ee8a524a#157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index e4a04c1f474c..93ea8db134a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { version = "0.13" } fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/region.rs b/src/api/src/region.rs index 0493378213c7..d75238253499 100644 --- a/src/api/src/region.rs +++ b/src/api/src/region.rs @@ -21,14 +21,14 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1; #[derive(Debug)] pub struct RegionResponse { pub affected_rows: AffectedRows, - pub extension: HashMap>, + pub extensions: HashMap>, } impl RegionResponse { pub fn from_region_response(region_response: RegionResponseV1) -> Self { Self { affected_rows: region_response.affected_rows as _, - extension: region_response.extension, + extensions: region_response.extensions, } } @@ -36,7 +36,7 @@ impl RegionResponse { pub fn new(affected_rows: AffectedRows) -> Self { Self { affected_rows, - extension: Default::default(), + extensions: Default::default(), } } } diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 48d34b4307c3..3af359ef6e81 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -131,7 +131,7 @@ impl AlterLogicalTablesProcedure { let phy_raw_schemas = future::join_all(alter_region_tasks) .await .into_iter() - .map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))) + .map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; if phy_raw_schemas.is_empty() { diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 5095b7c32e1a..4b867147be61 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -157,7 +157,7 @@ impl CreateLogicalTablesProcedure { let phy_raw_schemas = join_all(create_region_tasks) .await .into_iter() - .map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))) + .map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; if phy_raw_schemas.is_empty() { diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 04e9d9ac5b6f..68b4637fce2c 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -324,10 +324,12 @@ impl HeartbeatTask { region_id: stat.region_id.as_u64(), engine: stat.engine, role: RegionRole::from(stat.role).into(), - // TODO(jeremy): w/rcus + // TODO(weny): w/rcus rcus: 0, wcus: 0, approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0), + // TODO(weny): add extensions + extensions: Default::default(), }) .collect() } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index f6cc479d6a17..56068a38c3aa 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -366,10 +366,10 @@ impl RegionServerHandler for RegionServer { // merge results by sum up affected rows and merge extensions. let mut affected_rows = 0; - let mut extension = HashMap::new(); + let mut extensions = HashMap::new(); for result in results { affected_rows += result.affected_rows; - extension.extend(result.extension); + extensions.extend(result.extensions); } Ok(RegionResponseV1 { @@ -380,7 +380,7 @@ impl RegionServerHandler for RegionServer { }), }), affected_rows: affected_rows as _, - extension, + extensions, }) } } @@ -708,7 +708,7 @@ impl RegionServerInner { .await?; Ok(RegionResponse { affected_rows: result.affected_rows, - extension: result.extension, + extensions: result.extensions, }) } Err(err) => { diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index f8acdd75c255..ebeeaf6b7fde 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -93,6 +93,7 @@ mod tests { approximate_bytes: 0, engine: default_engine().to_string(), role: RegionRole::Follower, + extensions: Default::default(), } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index b7fe55a0f47f..5f1ec1cc2b7a 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use api::v1::meta::HeartbeatRequest; use common_meta::ClusterId; @@ -57,6 +57,8 @@ pub struct RegionStat { pub engine: String, /// The region role. pub role: RegionRole, + /// The extension info of this region + pub extensions: HashMap>, } impl Stat { @@ -142,6 +144,7 @@ impl TryFrom for RegionStat { approximate_bytes: value.approximate_bytes, engine: value.engine.to_string(), role: RegionRole::from(value.role()), + extensions: value.extensions, }) } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 2481e86c8f5a..28ddb436e0ad 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -135,6 +135,7 @@ mod test { wcus: 0, approximate_bytes: 0, engine: String::new(), + extensions: Default::default(), } } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 09f0400ba118..c4e1688de07c 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -100,7 +100,7 @@ pub mod mock { }), }), affected_rows: 0, - extension: Default::default(), + extensions: Default::default(), }) } } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index a87a1b3b7f47..c8c555d20445 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -199,6 +199,7 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, + extensions: Default::default(), }], ..Default::default() } @@ -215,6 +216,7 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, + extensions: Default::default(), }], ..Default::default() } @@ -231,6 +233,7 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, + extensions: Default::default(), }], ..Default::default() } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index f4e386a0531a..08414a97e4c5 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -162,7 +162,7 @@ impl RegionEngine for MetricEngine { result.map_err(BoxedError::new).map(|rows| RegionResponse { affected_rows: rows, - extension: extension_return_value, + extensions: extension_return_value, }) } diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs index d6344e278d10..1c82fcf00af5 100644 --- a/src/operator/src/flow.rs +++ b/src/operator/src/flow.rs @@ -119,7 +119,7 @@ impl FlowServiceOperator { if let Some(prev) = &mut final_result { prev.affected_rows = res.affected_rows; prev.affected_flows.extend(res.affected_flows); - prev.extension.extend(res.extension); + prev.extensions.extend(res.extensions); } else { final_result = Some(res); }