From 000df8cf1e37828845f615ce0f1d1a92a5fc5d6a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 4 Jul 2023 20:32:02 +0900 Subject: [PATCH] feat: add ddl client (#1856) * feat: add ddl client * chore: apply suggestions from CR * chore: apply suggestions from CR --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/meta-client/src/client.rs | 16 ++- src/meta-client/src/client/ddl.rs | 145 ++++++++++++++++++++++++ src/meta-client/src/client/heartbeat.rs | 21 +++- 5 files changed, 179 insertions(+), 7 deletions(-) create mode 100644 src/meta-client/src/client/ddl.rs diff --git a/Cargo.lock b/Cargo.lock index 5c407f4d7d84..cf6085bdd8eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4108,7 +4108,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b#7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" +source = "git+https://github.com/WenyXu/greptime-proto.git?rev=aab7d9a35900f995f9328c8588781e4d75253cba#aab7d9a35900f995f9328c8588781e4d75253cba" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index be1554ce6795..415cc378d446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" } +greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "aab7d9a35900f995f9328c8588781e4d75253cba" } itertools = "0.10" parquet = "40.0" paste = "1.0" diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 85a5051d8b0d..efd8fe6ec5cf 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod ddl; mod heartbeat; mod load_balance; mod lock; @@ -29,6 +30,7 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_telemetry::info; +use ddl::Client as DdlClient; use heartbeat::Client as HeartbeatClient; use lock::Client as LockClient; use router::Client as RouterClient; @@ -49,6 +51,7 @@ pub struct MetaClientBuilder { enable_router: bool, enable_store: bool, enable_lock: bool, + enable_ddl: bool, channel_manager: Option, } @@ -89,6 +92,13 @@ impl MetaClientBuilder { } } + pub fn enable_ddl(self) -> Self { + Self { + enable_ddl: true, + ..self + } + } + pub fn channel_manager(self, channel_manager: ChannelManager) -> Self { Self { channel_manager: Some(channel_manager), @@ -119,7 +129,10 @@ impl MetaClientBuilder { client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); } if self.enable_lock { - client.lock = Some(LockClient::new(self.id, self.role, mgr)); + client.lock = Some(LockClient::new(self.id, self.role, mgr.clone())); + } + if self.enable_ddl { + client.ddl = Some(DdlClient::new(self.id, self.role, mgr)); } client @@ -134,6 +147,7 @@ pub struct MetaClient { router: Option, store: Option, lock: Option, + ddl: Option, } impl MetaClient { diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs new file mode 100644 index 000000000000..510e0e6f2f8f --- /dev/null +++ b/src/meta-client/src/client/ddl.rs @@ -0,0 +1,145 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::ddl_task_client::DdlTaskClient; +use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_grpc::channel_manager::ChannelManager; +use snafu::{ensure, ResultExt}; +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use crate::client::heartbeat::Inner as HeartbeatInner; +use crate::client::Id; +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug)] +// TODO(weny): removes this in following PRs. +#[allow(unused)] +pub struct Client { + inner: Arc>, +} + +// TODO(weny): removes this in following PRs. +#[allow(dead_code)] +impl Client { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, + role, + channel_manager: channel_manager.clone(), + heartbeat_inner: HeartbeatInner::new(id, role, channel_manager), + })); + + Self { inner } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn is_started(&self) -> bool { + let inner = self.inner.read().await; + inner.is_started() + } + + pub async fn submit_ddl_task( + &self, + req: SubmitDdlTaskRequest, + ) -> Result { + let mut inner = self.inner.write().await; + inner.submit_ddl_task(req).await + } +} + +#[derive(Debug)] +// TODO(weny): removes this in following PRs. +#[allow(unused)] +struct Inner { + id: Id, + role: Role, + channel_manager: ChannelManager, + heartbeat_inner: HeartbeatInner, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Router client already started", + } + ); + + self.heartbeat_inner.start(urls).await?; + Ok(()) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?; + + Ok(DdlTaskClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + self.heartbeat_inner.is_started() + } + + pub async fn submit_ddl_task( + &mut self, + mut req: SubmitDdlTaskRequest, + ) -> Result { + req.set_header(self.id, self.role); + + loop { + if let Some(leader) = &self.heartbeat_inner.get_leader() { + let mut client = self.make_client(leader)?; + let res = client + .submit_ddl_task(req.clone()) + .await + .context(error::TonicStatusSnafu)?; + + let res = res.into_inner(); + + if let Some(header) = res.header.as_ref() { + if let Some(err) = header.error.as_ref() { + if err.code == ErrorCode::NotLeader as i32 { + self.heartbeat_inner.ask_leader().await?; + continue; + } + } + } + + return Ok(res); + } else if let Err(err) = self.heartbeat_inner.ask_leader().await { + return Err(err); + } + } + } +} diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 1b563bc691b8..108ca85de110 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -133,7 +133,7 @@ impl Client { } #[derive(Debug)] -struct Inner { +pub(crate) struct Inner { id: Id, role: Role, channel_manager: ChannelManager, @@ -142,7 +142,16 @@ struct Inner { } impl Inner { - async fn start(&mut self, urls: A) -> Result<()> + pub(crate) fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + Self { + id, + role, + channel_manager, + peers: HashSet::new(), + leader: None, + } + } + pub(crate) async fn start(&mut self, urls: A) -> Result<()> where U: AsRef, A: AsRef<[U]>, @@ -163,7 +172,11 @@ impl Inner { Ok(()) } - async fn ask_leader(&mut self) -> Result<()> { + pub(crate) fn get_leader(&self) -> Option { + self.leader.clone() + } + + pub(crate) async fn ask_leader(&mut self) -> Result<()> { ensure!( self.is_started(), error::IllegalGrpcClientStateSnafu { @@ -242,7 +255,7 @@ impl Inner { } #[inline] - fn is_started(&self) -> bool { + pub(crate) fn is_started(&self) -> bool { !self.peers.is_empty() } }