Skip to content

Commit

Permalink
feat: add ddl client (#1856)
Browse files Browse the repository at this point in the history
* feat: add ddl client

* chore: apply suggestions from CR

* chore: apply suggestions from CR
  • Loading branch information
WenyXu committed Jul 4, 2023
1 parent 884731a commit 000df8c
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" }
greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "aab7d9a35900f995f9328c8588781e4d75253cba" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"
Expand Down
16 changes: 15 additions & 1 deletion src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod ddl;
mod heartbeat;
mod load_balance;
mod lock;
Expand All @@ -29,6 +30,7 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_telemetry::info;
use ddl::Client as DdlClient;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use router::Client as RouterClient;
Expand All @@ -49,6 +51,7 @@ pub struct MetaClientBuilder {
enable_router: bool,
enable_store: bool,
enable_lock: bool,
enable_ddl: bool,
channel_manager: Option<ChannelManager>,
}

Expand Down Expand Up @@ -89,6 +92,13 @@ impl MetaClientBuilder {
}
}

pub fn enable_ddl(self) -> Self {
Self {
enable_ddl: true,
..self
}
}

pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
Expand Down Expand Up @@ -119,7 +129,10 @@ impl MetaClientBuilder {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, self.role, mgr));
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_ddl {
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
}

client
Expand All @@ -134,6 +147,7 @@ pub struct MetaClient {
router: Option<RouterClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
ddl: Option<DdlClient>,
}

impl MetaClient {
Expand Down
145 changes: 145 additions & 0 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;

use crate::client::heartbeat::Inner as HeartbeatInner;
use crate::client::Id;
use crate::error;
use crate::error::Result;

#[derive(Clone, Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}

// TODO(weny): removes this in following PRs.
#[allow(dead_code)]
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager: channel_manager.clone(),
heartbeat_inner: HeartbeatInner::new(id, role, channel_manager),
}));

Self { inner }
}

pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}

pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}

pub async fn submit_ddl_task(
&self,
req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let mut inner = self.inner.write().await;
inner.submit_ddl_task(req).await
}
}

#[derive(Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
heartbeat_inner: HeartbeatInner,
}

impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Router client already started",
}
);

self.heartbeat_inner.start(urls).await?;
Ok(())
}

fn make_client(&self, addr: impl AsRef<str>) -> Result<DdlTaskClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;

Ok(DdlTaskClient::new(channel))
}

#[inline]
fn is_started(&self) -> bool {
self.heartbeat_inner.is_started()
}

pub async fn submit_ddl_task(
&mut self,
mut req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
req.set_header(self.id, self.role);

loop {
if let Some(leader) = &self.heartbeat_inner.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
.await
.context(error::TonicStatusSnafu)?;

let res = res.into_inner();

if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
self.heartbeat_inner.ask_leader().await?;
continue;
}
}
}

return Ok(res);
} else if let Err(err) = self.heartbeat_inner.ask_leader().await {
return Err(err);
}
}
}
}
21 changes: 17 additions & 4 deletions src/meta-client/src/client/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Client {
}

#[derive(Debug)]
struct Inner {
pub(crate) struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
Expand All @@ -142,7 +142,16 @@ struct Inner {
}

impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
pub(crate) fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
channel_manager,
peers: HashSet::new(),
leader: None,
}
}
pub(crate) async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
Expand All @@ -163,7 +172,11 @@ impl Inner {
Ok(())
}

async fn ask_leader(&mut self) -> Result<()> {
pub(crate) fn get_leader(&self) -> Option<String> {
self.leader.clone()
}

pub(crate) async fn ask_leader(&mut self) -> Result<()> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
Expand Down Expand Up @@ -242,7 +255,7 @@ impl Inner {
}

#[inline]
fn is_started(&self) -> bool {
pub(crate) fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}
Expand Down

0 comments on commit 000df8c

Please sign in to comment.