Skip to content

Commit

Permalink
feat(flow): query table schema&refactor (#3943)
Browse files Browse the repository at this point in the history
* feat: get table info

* feat: remove new&unwrap

* chore: per PR advices

* chore: per review
  • Loading branch information
discord9 authored May 15, 2024
1 parent cfae276 commit 97eb196
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 76 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl TableInfoValue {
}

pub type TableInfoManagerRef = Arc<TableInfoManager>;
#[derive(Clone)]
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}
Expand Down
9 changes: 8 additions & 1 deletion src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

pub(crate) mod error;
pub(crate) mod node_context;
mod table_source;
mod util;

pub(crate) use node_context::{FlowId, FlownodeContext, TableName};
pub(crate) use node_context::FlownodeContext;
pub(crate) use table_source::TableSource;

mod worker;

pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u32;
pub type TableName = [String; 3];
18 changes: 1 addition & 17 deletions src/flow/src/adapter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid query plan: {source}"))]
InvalidQueryPlan {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
InvalidQueryProst {
inner: api::DecodeError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query, can't transform to substrait: {source}"))]
InvalidQuerySubstrait {
source: substrait::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: {reason}"))]
InvalidQuery {
reason: String,
Expand Down Expand Up @@ -193,9 +179,7 @@ impl ErrorExt for Error {
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. } => StatusCode::TableNotFound,
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }
Self::InvalidQueryProst { .. }
| &Self::InvalidQuery { .. }
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
Expand Down
134 changes: 84 additions & 50 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,18 @@
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;

use common_telemetry::debug;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};

use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};

// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
pub type TableName = [String; 3];

pub struct TableSource {}

impl TableSource {
pub async fn get_table_name_schema(
&self,
_table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
todo!()
}
}

/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlownodeContext {
Expand All @@ -53,7 +39,7 @@ pub struct FlownodeContext {
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
pub source_sender: BTreeMap<TableId, SourceSender>,
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
Expand All @@ -74,38 +60,90 @@ pub struct FlownodeContext {
pub query_context: Option<Arc<QueryContext>>,
}

/// a simple broadcast sender with backpressure and unbound capacity
///
/// receiver still use tokio broadcast channel, since only sender side need to know
/// backpressure and adjust dataflow running duration to avoid blocking
pub struct SourceSender {
sender: broadcast::Sender<DiffRow>,
send_buf: VecDeque<DiffRow>,
}

impl Default for SourceSender {
fn default() -> Self {
Self {
sender: broadcast::Sender::new(BROADCAST_CAP),
send_buf: Default::default(),
}
}
}

impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}

/// send as many as possible rows from send buf
/// until send buf is empty or broadchannel is full
pub fn try_send_all(&mut self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
// if inner sender channel is empty or send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
break;
}
if let Some(row) = self.send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
}

Ok(row_cnt)
}

/// return number of rows it actual send(including what's in the buffer)
pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.extend(rows);

let row_cnt = self.try_send_all()?;

Ok(row_cnt)
}
}

impl FlownodeContext {
// return number of rows it actual send(including what's in the buffer)
/// return number of rows it actual send(including what's in the buffer)
///
/// TODO(discord9): make this concurrent
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.get_mut(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
let send_buffer = self.send_buffer.entry(table_id).or_default();
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!(
"Failed to send row to table_id = {:?}, error = {:?}",
table_id, err
),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows)
}

Ok(row_cnt)
/// flush all sender's buf
pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
self.source_sender
.iter_mut()
.map(|(_table_id, src_sender)| src_sender.try_send_all())
.try_fold(0, |acc, x| x.map(|x| x + acc))
}
}

Expand All @@ -120,7 +158,7 @@ impl FlownodeContext {
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
self.add_source_sender_if_not_exist(*source_table_id);
self.source_to_tasks
.entry(*source_table_id)
.or_default()
Expand All @@ -131,10 +169,9 @@ impl FlownodeContext {
self.flow_to_sink.insert(task_id, sink_table_name);
}

pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
/// try add source sender, if already exist, do nothing
pub fn add_source_sender_if_not_exist(&mut self, table_id: TableId) {
let _sender = self.source_sender.entry(table_id).or_default();
}

pub fn add_sink_receiver(&mut self, table_name: TableName) {
Expand All @@ -143,10 +180,7 @@ impl FlownodeContext {
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}

pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
let table_id = self
.table_repr
.get_by_global_id(id)
Expand Down
Loading

0 comments on commit 97eb196

Please sign in to comment.