Skip to content

Commit

Permalink
feat: add extension field to HeartbeatRequest (GreptimeTeam#4688)
Browse files Browse the repository at this point in the history
* feat: add extension field to HeartbeatRequest

* chore: extension to extensions

* chore: upgrade proto
  • Loading branch information
fengjiachun authored Sep 6, 2024
1 parent 67d95d2 commit e884658
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
6 changes: 3 additions & 3 deletions src/api/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
#[derive(Debug)]
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
pub extensions: HashMap<String, Vec<u8>>,
}

impl RegionResponse {
pub fn from_region_response(region_response: RegionResponseV1) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
extensions: region_response.extensions,
}
}

/// Creates one response without extension
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
extensions: Default::default(),
}
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl AlterLogicalTablesProcedure {
let phy_raw_schemas = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

if phy_raw_schemas.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl CreateLogicalTablesProcedure {
let phy_raw_schemas = join_all(create_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

if phy_raw_schemas.is_empty() {
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,12 @@ impl HeartbeatTask {
region_id: stat.region_id.as_u64(),
engine: stat.engine,
role: RegionRole::from(stat.role).into(),
// TODO(jeremy): w/rcus
// TODO(weny): w/rcus
rcus: 0,
wcus: 0,
approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0),
// TODO(weny): add extensions
extensions: Default::default(),
})
.collect()
}
Expand Down
8 changes: 4 additions & 4 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,10 @@ impl RegionServerHandler for RegionServer {

// merge results by sum up affected rows and merge extensions.
let mut affected_rows = 0;
let mut extension = HashMap::new();
let mut extensions = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extension.extend(result.extension);
extensions.extend(result.extensions);
}

Ok(RegionResponseV1 {
Expand All @@ -380,7 +380,7 @@ impl RegionServerHandler for RegionServer {
}),
}),
affected_rows: affected_rows as _,
extension,
extensions,
})
}
}
Expand Down Expand Up @@ -708,7 +708,7 @@ impl RegionServerInner {
.await?;
Ok(RegionResponse {
affected_rows: result.affected_rows,
extension: result.extension,
extensions: result.extensions,
})
}
Err(err) => {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ mod tests {
approximate_bytes: 0,
engine: default_engine().to_string(),
role: RegionRole::Follower,
extensions: Default::default(),
}
}
acc.stat = Some(Stat {
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use api::v1::meta::HeartbeatRequest;
use common_meta::ClusterId;
Expand Down Expand Up @@ -57,6 +57,8 @@ pub struct RegionStat {
pub engine: String,
/// The region role.
pub role: RegionRole,
/// The extension info of this region
pub extensions: HashMap<String, Vec<u8>>,
}

impl Stat {
Expand Down Expand Up @@ -142,6 +144,7 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
approximate_bytes: value.approximate_bytes,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
extensions: value.extensions,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ mod test {
wcus: 0,
approximate_bytes: 0,
engine: String::new(),
extensions: Default::default(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub mod mock {
}),
}),
affected_rows: 0,
extension: Default::default(),
extensions: Default::default(),
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/selector/weight_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}
Expand All @@ -215,6 +216,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}
Expand All @@ -231,6 +233,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl RegionEngine for MetricEngine {

result.map_err(BoxedError::new).map(|rows| RegionResponse {
affected_rows: rows,
extension: extension_return_value,
extensions: extension_return_value,
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl FlowServiceOperator {
if let Some(prev) = &mut final_result {
prev.affected_rows = res.affected_rows;
prev.affected_flows.extend(res.affected_flows);
prev.extension.extend(res.extension);
prev.extensions.extend(res.extensions);
} else {
final_result = Some(res);
}
Expand Down

0 comments on commit e884658

Please sign in to comment.