fix: block when stream insert (#1835)
* fix: stream insert blocking

* fix: example link

* chore: Increase the default channel size "1024" -> "65536"
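
A minimal sketch of the usage pattern this change introduces, condensed from the src/client/examples/stream_ingest.rs example added below; the endpoint localhost:4001 and the table name "demo" are illustrative assumptions, not part of the commit:

use api::v1::*;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};

#[tokio::main]
async fn main() {
    // Connect and pick a database (the default schema here).
    let grpc_client = Client::with_urls(vec!["localhost:4001"]);
    let client = Database::new_with_dbname(DEFAULT_SCHEMA_NAME.to_string(), grpc_client);

    // The inserter buffers requests in a bounded channel (default size 65536)
    // instead of blocking on a shared gRPC stream.
    let stream_inserter = client.streaming_inserter().unwrap();

    // A single-column insert request: one timestamp row for table "demo".
    let request = InsertRequest {
        table_name: "demo".to_owned(),
        columns: vec![Column {
            column_name: "ts".to_owned(),
            values: Some(column::Values {
                ts_millisecond_values: vec![1686109527000],
                ..Default::default()
            }),
            semantic_type: SemanticType::Timestamp as i32,
            datatype: ColumnDataType::TimestampMillisecond as i32,
            ..Default::default()
        }],
        row_count: 1,
        ..Default::default()
    };

    stream_inserter.insert(vec![request]).await.unwrap();

    // `finish` flushes the stream and reports how many rows were written.
    let rows = stream_inserter.finish().await.unwrap();
    println!("Rows written: {rows}");
}
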
fengys1996 authored Jun 27, 2023
1 parent fcff66e commit 313121f
Showing 6 changed files with 319 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/client/Cargo.toml
@@ -36,6 +36,7 @@ tonic.workspace = true

[dev-dependencies]
datanode = { path = "../datanode" }
derive-new = "0.5"
substrait = { path = "../common/substrait" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
183 changes: 183 additions & 0 deletions src/client/examples/stream_ingest.rs
@@ -0,0 +1,183 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::column::*;
use api::v1::*;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use derive_new::new;
use tracing::{error, info};

fn main() {
    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
        .unwrap();

    run();
}

#[tokio::main]
async fn run() {
    let greptimedb_endpoint =
        std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());

    let greptimedb_dbname =
        std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());

    let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);

    let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

    let stream_inserter = client.streaming_inserter().unwrap();

    if let Err(e) = stream_inserter
        .insert(vec![to_insert_request(weather_records_1())])
        .await
    {
        error!("Error: {e}");
    }

    if let Err(e) = stream_inserter
        .insert(vec![to_insert_request(weather_records_2())])
        .await
    {
        error!("Error: {e}");
    }

    let result = stream_inserter.finish().await;

    match result {
        Ok(rows) => {
            info!("Rows written: {rows}");
        }
        Err(e) => {
            error!("Error: {e}");
        }
    };
}

#[derive(new)]
struct WeatherRecord {
    timestamp_millis: i64,
    collector: String,
    temperature: f32,
    humidity: i32,
}

fn weather_records_1() -> Vec<WeatherRecord> {
    vec![
        WeatherRecord::new(1686109527000, "c1".to_owned(), 26.4, 15),
        WeatherRecord::new(1686023127000, "c1".to_owned(), 29.3, 20),
        WeatherRecord::new(1685936727000, "c1".to_owned(), 31.8, 13),
        WeatherRecord::new(1686109527000, "c2".to_owned(), 20.4, 67),
        WeatherRecord::new(1686023127000, "c2".to_owned(), 18.0, 74),
        WeatherRecord::new(1685936727000, "c2".to_owned(), 19.2, 81),
    ]
}

fn weather_records_2() -> Vec<WeatherRecord> {
    vec![
        WeatherRecord::new(1686109527001, "c3".to_owned(), 26.4, 15),
        WeatherRecord::new(1686023127002, "c3".to_owned(), 29.3, 20),
        WeatherRecord::new(1685936727003, "c3".to_owned(), 31.8, 13),
        WeatherRecord::new(1686109527004, "c4".to_owned(), 20.4, 67),
        WeatherRecord::new(1686023127005, "c4".to_owned(), 18.0, 74),
        WeatherRecord::new(1685936727006, "c4".to_owned(), 19.2, 81),
    ]
}

/// This function transposes the given weather records into columns and
/// bundles them into an `InsertRequest`.
///
/// Data structure:
///
/// - `ts`: a timestamp column
/// - `collector`: a tag column
/// - `temperature`: a value field of f32
/// - `humidity`: a value field of i32
///
fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
    // convert records into columns
    let rows = records.len();

    // transpose records into columns
    let (timestamp_millis, collectors, temp, humidity) = records.into_iter().fold(
        (
            Vec::with_capacity(rows),
            Vec::with_capacity(rows),
            Vec::with_capacity(rows),
            Vec::with_capacity(rows),
        ),
        |mut acc, rec| {
            acc.0.push(rec.timestamp_millis);
            acc.1.push(rec.collector);
            acc.2.push(rec.temperature);
            acc.3.push(rec.humidity);

            acc
        },
    );

    let columns = vec![
        // timestamp column: `ts`
        Column {
            column_name: "ts".to_owned(),
            values: Some(column::Values {
                ts_millisecond_values: timestamp_millis,
                ..Default::default()
            }),
            semantic_type: SemanticType::Timestamp as i32,
            datatype: ColumnDataType::TimestampMillisecond as i32,
            ..Default::default()
        },
        // tag column: collectors
        Column {
            column_name: "collector".to_owned(),
            values: Some(column::Values {
                string_values: collectors.into_iter().collect(),
                ..Default::default()
            }),
            semantic_type: SemanticType::Tag as i32,
            datatype: ColumnDataType::String as i32,
            ..Default::default()
        },
        // field column: temperature
        Column {
            column_name: "temperature".to_owned(),
            values: Some(column::Values {
                f32_values: temp,
                ..Default::default()
            }),
            semantic_type: SemanticType::Field as i32,
            datatype: ColumnDataType::Float32 as i32,
            ..Default::default()
        },
        // field column: humidity
        Column {
            column_name: "humidity".to_owned(),
            values: Some(column::Values {
                i32_values: humidity,
                ..Default::default()
            }),
            semantic_type: SemanticType::Field as i32,
            datatype: ColumnDataType::Int32 as i32,
            ..Default::default()
        },
    ];

    InsertRequest {
        table_name: "weather_demo".to_owned(),
        columns,
        row_count: rows as u32,
        ..Default::default()
    }
}
44 changes: 17 additions & 27 deletions src/client/src/database.rs
@@ -29,14 +29,11 @@ use common_telemetry::{logging, timer};
use futures_util::{TryFutureExt, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, OnceCell};
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
};
use crate::{error, metrics, Client, Result};
use crate::{error, metrics, Client, Result, StreamInserter};

#[derive(Clone, Debug, Default)]
pub struct Database {
@@ -50,7 +47,6 @@ pub struct Database {
    dbname: String,

    client: Client,
    streaming_client: OnceCell<Sender<GreptimeRequest>>,
    ctx: FlightContext,
}

@@ -62,7 +58,6 @@ impl Database {
            schema: schema.into(),
            dbname: "".to_string(),
            client,
            streaming_client: OnceCell::new(),
            ctx: FlightContext::default(),
        }
    }
@@ -80,7 +75,6 @@ impl Database {
schema: "".to_string(),
dbname: dbname.into(),
client,
streaming_client: OnceCell::new(),
ctx: FlightContext::default(),
}
}
@@ -120,20 +114,24 @@ impl Database {
        self.handle(Request::Inserts(requests)).await
    }

    pub async fn insert_to_stream(&self, requests: InsertRequests) -> Result<()> {
        let streaming_client = self
            .streaming_client
            .get_or_try_init(|| self.client_stream())
            .await?;
    pub fn streaming_inserter(&self) -> Result<StreamInserter> {
        self.streaming_inserter_with_channel_size(65536)
    }

    pub fn streaming_inserter_with_channel_size(
        &self,
        channel_size: usize,
    ) -> Result<StreamInserter> {
        let client = self.client.make_database_client()?.inner;

        let request = self.to_rpc_request(Request::Inserts(requests));
        let stream_inserter = StreamInserter::new(
            client,
            self.dbname().to_string(),
            self.ctx.auth_header.clone(),
            channel_size,
        );

        streaming_client.send(request).await.map_err(|e| {
            error::ClientStreamingSnafu {
                err_msg: e.to_string(),
            }
            .build()
        })
        Ok(stream_inserter)
    }

    pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
@@ -169,14 +167,6 @@ impl Database {
}
}

    async fn client_stream(&self) -> Result<Sender<GreptimeRequest>> {
        let mut client = self.client.make_database_client()?.inner;
        let (sender, receiver) = mpsc::channel::<GreptimeRequest>(65536);
        let receiver = ReceiverStream::new(receiver);
        let _ = client.handle_requests(receiver).await?;
        Ok(sender)
    }

    pub async fn sql(&self, sql: &str) -> Result<Output> {
        let _timer = timer!(metrics::METRIC_GRPC_SQL);
        self.do_get(Request::Query(QueryRequest {
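
The 65536 above is only the new default channel size; callers that want tighter back-pressure can pass their own bound through streaming_inserter_with_channel_size. A minimal sketch under the same client setup as the example above; the helper name and the 1024 bound are illustrative assumptions, not part of the commit:

use api::v1::InsertRequest;
use client::{Database, Result};

// Stream a batch through an inserter with a caller-chosen buffer; a smaller
// channel makes insert() apply back-pressure to the producer sooner.
async fn ingest_with_small_buffer(db: &Database, batch: Vec<InsertRequest>) -> Result<()> {
    let inserter = db.streaming_inserter_with_channel_size(1024)?;
    inserter.insert(batch).await?;
    let rows = inserter.finish().await?;
    println!("Rows written: {rows}");
    Ok(())
}
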
2 changes: 2 additions & 0 deletions src/client/src/lib.rs
@@ -18,10 +18,12 @@ mod database
mod error;
pub mod load_balance;
mod metrics;
mod stream_insert;

pub use api;
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

pub use self::client::Client;
pub use self::database::Database;
pub use self::error::{Error, Result};
pub use self::stream_insert::StreamInserter;
