Skip to content

Commit

Permalink
Feat(pisa-proxy, sharding): add database_table sharding rewrite (#380)
Browse files Browse the repository at this point in the history
* fix(pisa-proxy, sharding): fix don't merge when contains `count` func
Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix count,sum field idx

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix columns incorrect when contains `avg`

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix columns when contains avg func.

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix columns when contains avg func.

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix columns when contains avg func in prepare.

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix columns when contains avg func in prepare.

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): add get_meta_detail macro to simplify the code
Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite

Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite

Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite

Signed-off-by: xuanyuan300 <[email protected]>

* refactor(pisa-proxy, sharding): WIP refactor sharding rewrite

Signed-off-by: xuanyuan300 <[email protected]>

* chore(pisa-proxy, runtime): remove comments
Signed-off-by: xuanyuan300 <[email protected]>

Signed-off-by: xuanyuan300 <[email protected]>
  • Loading branch information
xuanyuan300 authored Nov 21, 2022
1 parent b0c66d8 commit 1de659f
Show file tree
Hide file tree
Showing 14 changed files with 1,890 additions and 901 deletions.
2 changes: 1 addition & 1 deletion pisa-proxy/protocol/mysql/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use bytes::{Buf, BufMut, BytesMut};

use crate::{mysql_const::ColumnType, util::{ BufExt, BufMutExt, get_length }};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ColumnInfo {
pub schema: Option<String>,
pub table_name: Option<String>,
Expand Down
41 changes: 31 additions & 10 deletions pisa-proxy/protocol/mysql/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{sync::Arc, str::FromStr};

use crate::{
column::ColumnInfo,
Expand All @@ -37,8 +37,8 @@ pub enum RowDataTyp<T: AsRef<[u8]>> {
pub struct RowPartData {
pub data: Box<[u8]>,
pub start_idx: usize,
pub start_part_idx: usize,
pub end_part_idx: usize,
pub part_encode_length: usize,
pub part_data_length: usize,
}

crate::gen_row_data!(RowDataTyp, Text(RowDataText), Binary(RowDataBinary));
Expand Down Expand Up @@ -83,8 +83,7 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataText<T> {
fn decode_with_name<V: Value>(&mut self, name: &str) -> value::Result<V> {
let row_data = self.get_row_data_with_name(name)?;
match row_data {
Some(data) => Value::from(&data.data[data.start_part_idx..data.end_part_idx]),

Some(data) => Value::from(&data.data),
_ => Ok(None),
}
}
Expand All @@ -104,10 +103,10 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataText<T> {

return Ok(Some(
RowPartData {
data: self.buf.as_ref()[idx..idx + (pos + length) as usize].into(),
data: self.buf.as_ref()[idx + pos as usize .. idx + (pos + length) as usize].into(),
start_idx: idx,
start_part_idx: pos as usize,
end_part_idx: (pos + length) as usize,
part_encode_length: pos as usize,
part_data_length: length as usize,
}
));
}
Expand Down Expand Up @@ -239,12 +238,13 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataBinary<T> {

// Need to add packet header and null_map to returnd data
let raw_data = &self.buf.as_ref()[start_pos + pos as usize..(start_pos + pos as usize + length as usize)];
println!("eeeeeeeeeeeee {:?}", &raw_data[..]);
return Ok(Some(
RowPartData {
data: raw_data.into(),
start_idx: start_pos,
start_part_idx: pos as usize,
end_part_idx: (pos + length) as usize,
part_encode_length: pos as usize,
part_data_length: length as usize,
}
))
}
Expand All @@ -254,6 +254,27 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataBinary<T> {
}
}


// Box has default 'static bound, use `'e` lifetime relax bound.
pub fn decode_with_name<'e, T: AsRef<[u8]>, V: Value + std::str::FromStr>(row_data: &mut RowDataTyp<T>, name: &str, is_binary: bool) -> Result<Option<V>, Box<dyn std::error::Error + Send + Sync + 'e> >
where
T: AsRef<[u8]>,
V: Value + std::str::FromStr,
<V as FromStr>::Err: std::error::Error + Sync + Send + 'e
{
if is_binary {
row_data.decode_with_name::<V>(name)
} else {
let new_value = row_data.decode_with_name::<String>(name)?;
if let Some(new_value) = new_value {
let new_value = new_value.parse::<V>()?;
Ok(Some(new_value))
} else {
Ok(None)
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
3 changes: 2 additions & 1 deletion pisa-proxy/protocol/mysql/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use chrono::{Duration, NaiveDateTime, NaiveDate, NaiveTime};

use crate::err::DecodeRowError;

pub type Result<T> = std::result::Result<Option<T>, Box<dyn std::error::Error + Send + Sync>>;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<Option<T>, BoxError>;

pub trait Value: Sized {
type Item: Convert<Self>;
Expand Down
1 change: 1 addition & 0 deletions pisa-proxy/proxy/strategy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ aho-corasick = "0.7.19"
itertools = "0.10.4"
thiserror = "1.0"
crc32fast = "1.3.2"
paste = "1.0.9"
135 changes: 96 additions & 39 deletions pisa-proxy/proxy/strategy/src/sharding_rewrite/generic_meta.rs
Original file line number Diff line number Diff line change
@@ -1,128 +1,185 @@
// Copyright 2022 SphereEx Authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::config::{StrategyType, Sharding, ShardingAlgorithmName};
use endpoint::endpoint::Endpoint;

use crate::config::{Sharding, ShardingAlgorithmName, StrategyType};

#[derive(Debug)]
pub(crate) struct ShardingMetaBaseInfo<'a> {
pub column: (Option<&'a str>, Option<&'a str>),
pub count: (Option<u32>, Option<u32>),
pub algo: (Option<&'a ShardingAlgorithmName>, Option<&'a ShardingAlgorithmName>),
}

pub trait ShardingMeta {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>);
fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>);
fn get_sharding_count(&self) -> (Option<u64>, Option<u64>);
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>);
fn get_actual_schema<'a>(
&'a self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<&'a str>;
fn get_endpoint<'a>(
&'a self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<Endpoint>;
fn get_strategy_typ(&self) -> super::StrategyTyp;
}

/// Todo: use macro generate
impl ShardingMeta for Sharding {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>) {
if let Some(strategy) = &self.database_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

if let Some(strategy) = &self.table_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

if let Some(strategy) = &self.database_table_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

(None, None)
}

fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>) {
if let Some(strategy) = &self.database_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

if let Some(strategy) = &self.table_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

if let Some(strategy) = &self.database_table_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

(None, None)
}

fn get_sharding_count(&self) -> (Option<u64>, Option<u64>) {
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>) {
if let Some(_) = &self.database_strategy {
return (Some(self.actual_datanodes.len() as u64), None)
return (Some(self.actual_datanodes.len() as u32), None);
}

if let Some(strategy) = &self.table_strategy {
return (None, strategy.get_sharding_count().1)
return (None, strategy.get_sharding_count().1);
}

if let Some(strategy) = &self.database_table_strategy {
return (Some(self.actual_datanodes.len() as u64), strategy.get_sharding_count().1)
return (Some(self.actual_datanodes.len() as u32), strategy.get_sharding_count().1);
}

(None, None)
}

fn get_actual_schema<'a>(
&self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<&'a str> {
if self.database_strategy.is_some() || self.database_table_strategy.is_some() {
let ep = endpoints.iter().find(|ep| ep.name == self.actual_datanodes[idx.unwrap()]);
return ep.map(|x| x.db.as_str());
}

None
}

fn get_endpoint(&self, endpoints: &[Endpoint], idx: Option<usize>) -> Option<Endpoint> {
let idx = if self.table_strategy.is_some() { 0 } else { idx.unwrap() };
endpoints.iter().find(|ep| ep.name == self.actual_datanodes[idx]).map(|x| x.clone())
}

fn get_strategy_typ(&self) -> super::StrategyTyp {
if self.database_strategy.is_some() {
super::StrategyTyp::Database
} else if self.table_strategy.is_some() {
super::StrategyTyp::Table
} else {
super::StrategyTyp::DatabaseTable
}
}
}

impl ShardingMeta for StrategyType {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>) {
match self {
Self::DatabaseStrategyConfig(config) => {
(Some(&config.database_sharding_column), None)
},
Self::DatabaseStrategyConfig(config) => (Some(&config.database_sharding_column), None),

Self::DatabaseTableStrategyConfig(config) => {
(Some(&config.database_sharding_column), Some(&config.table_sharding_column))
},
}

Self::TableStrategyConfig(config) => {
(None, Some(&config.table_sharding_column))
},
Self::TableStrategyConfig(config) => (None, Some(&config.table_sharding_column)),

_ => (None, None)
_ => (None, None),
}
}

fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>) {
match self {
Self::DatabaseStrategyConfig(config) => {
(Some(&config.database_sharding_algorithm_name), None)
},
}

Self::DatabaseTableStrategyConfig(config) => {
(Some(&config.database_sharding_algorithm_name), Some(&config.table_sharding_algorithm_name))
},
Self::DatabaseTableStrategyConfig(config) => (
Some(&config.database_sharding_algorithm_name),
Some(&config.table_sharding_algorithm_name),
),

Self::TableStrategyConfig(config) => {
(None, Some(&config.table_sharding_algorithm_name))
},
}

_ => (None, None)
_ => (None, None),
}
}

fn get_sharding_count(&self) -> (Option<u64>, Option<u64>) {
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>) {
match self {
Self::DatabaseStrategyConfig(_) => {
unimplemented!()
},
}

Self::DatabaseTableStrategyConfig(config) => {
(None, Some(config.shading_count.into()))
},
Self::DatabaseTableStrategyConfig(config) => (None, Some(config.shading_count.into())),

Self::TableStrategyConfig(config) => {
(None, Some(config.sharding_count.into()))
},
Self::TableStrategyConfig(config) => (None, Some(config.sharding_count.into())),

_ => (None, None)
_ => (None, None),
}
}
}

fn get_actual_schema<'a>(
&'a self,
_endpoints: &'a [Endpoint],
_idx: Option<usize>,
) -> Option<&'a str> {
None
}

fn get_endpoint(&self, _endpoints: &[Endpoint], _idx: Option<usize>) -> Option<Endpoint> {
None
}

fn get_strategy_typ(&self) -> super::StrategyTyp {
unimplemented!()
}
}
23 changes: 23 additions & 0 deletions pisa-proxy/proxy/strategy/src/sharding_rewrite/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 SphereEx Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[macro_export]
macro_rules! get_meta_detail {
($meta:ident, $($meta_typ:ident),*) => {
paste! {
$(let $meta_typ = $meta.[<get_ $meta_typ>]();)*
}

}
}
Loading

0 comments on commit 1de659f

Please sign in to comment.