Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add http write metrics #1045

Merged
merged 3 commits into the base branch on Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion proxy/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

mod metrics;
mod prom_query;
mod route;
mod sql_query;
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tonic::{transport::Channel, IntoRequest};
use crate::{
error::{self, ErrNoCause, ErrWithCause, Error, Result},
forward::{ForwardRequest, ForwardResult},
grpc::metrics::GRPC_HANDLER_COUNTER_VEC,
metrics::GRPC_HANDLER_COUNTER_VEC,
read::SqlResponse,
Context, Proxy,
};
Expand Down
4 changes: 1 addition & 3 deletions proxy/src/grpc/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
use ceresdbproto::storage::{WriteRequest, WriteResponse};
use query_engine::executor::Executor as QueryExecutor;

use crate::{
error, error::build_ok_header, grpc::metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy,
};
use crate::{error, error::build_ok_header, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse {
Expand Down
39 changes: 31 additions & 8 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
context::RequestContext,
error::{build_ok_header, ErrNoCause, ErrWithCause, Error, Internal, InternalNoCause, Result},
forward::ForwardResult,
metrics::HTTP_HANDLER_COUNTER_VEC,
Context as ProxyContext, Proxy,
};

Expand All @@ -52,6 +53,14 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
/// Handle write samples to remote storage with remote storage protocol.
async fn handle_prom_write(&self, ctx: RequestContext, req: WriteRequest) -> Result<()> {
let write_table_requests = convert_write_request(req)?;

let mut num_rows = 0;
for write_table_request in &write_table_requests {
for entry in &write_table_request.entries {
num_rows += entry.field_groups.len();
}
}

let table_request = GrpcWriteRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
Expand All @@ -65,16 +74,30 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
forwarded_from: None,
};

let result = self.handle_write_internal(ctx, table_request).await?;
if result.failed != 0 {
ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to write storage, failed rows:{:?}", result.failed),
match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
if result.failed != 0 {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(result.failed as u64);
ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to write storage, failed rows:{:?}", result.failed),
}
.fail()?;
}

Ok(())
}
Err(e) => {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(num_rows as u64);
Err(e)
}
.fail()?;
}

Ok(())
}

/// Handle one query with remote storage protocol.
Expand Down
53 changes: 43 additions & 10 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse,
WriteRequest, WriteResponse,
},
metrics::HTTP_HANDLER_COUNTER_VEC,
Context, Proxy,
};

Expand All @@ -48,28 +49,60 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
ctx: RequestContext,
req: WriteRequest,
) -> Result<WriteResponse> {
let write_table_requests = convert_write_request(req)?;

let mut num_rows = 0;
for write_table_request in &write_table_requests {
for entry in &write_table_request.entries {
num_rows += entry.field_groups.len();
}
}

let table_request = GrpcWriteRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
table_requests: convert_write_request(req)?,
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let result = self
.handle_write_internal(proxy_context, table_request)
.await?;

debug!(
"Influxdb write finished, catalog:{}, schema:{}, result:{result:?}",
ctx.catalog, ctx.schema
);

Ok(())
match self
.handle_write_internal(proxy_context, table_request)
.await
{
Ok(result) => {
if result.failed != 0 {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(result.failed as u64);
ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to write storage, failed rows:{:?}", result.failed),
}
.fail()?;
}

debug!(
"Influxdb write finished, catalog:{}, schema:{}, result:{result:?}",
ctx.catalog, ctx.schema
);

Ok(())
}
Err(e) => {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(num_rows as u64);
Err(e)
}
}
}

async fn fetch_influxdb_query_output(
Expand Down
1 change: 1 addition & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod http;
pub mod influxdb;
pub mod instance;
pub mod limiter;
mod metrics;
pub mod opentsdb;
mod read;
pub mod schema_config_provider;
Expand Down
14 changes: 14 additions & 0 deletions proxy/src/grpc/metrics.rs → proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,29 @@ make_auto_flush_static_metric! {
pub struct GrpcHandlerCounterVec: LocalIntCounter {
"type" => GrpcTypeKind,
}

pub label_enum HttpTypeKind {
write_failed,
write_failed_row,
}

pub struct HttpHandlerCounterVec: LocalIntCounter {
"type" => HttpTypeKind,
}
}

lazy_static! {
pub static ref GRPC_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!("grpc_handler_counter", "Grpc handler counter", &["type"])
.unwrap();
pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"])
.unwrap();
}

lazy_static! {
pub static ref GRPC_HANDLER_COUNTER_VEC: GrpcHandlerCounterVec =
auto_flush_from!(GRPC_HANDLER_COUNTER_VEC_GLOBAL, GrpcHandlerCounterVec);
pub static ref HTTP_HANDLER_COUNTER_VEC: HttpHandlerCounterVec =
auto_flush_from!(HTTP_HANDLER_COUNTER_VEC_GLOBAL, HttpHandlerCounterVec);
}
52 changes: 43 additions & 9 deletions proxy/src/opentsdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
use ceresdbproto::storage::{
RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
};
use http::StatusCode;
use log::debug;
use query_engine::executor::Executor as QueryExecutor;

use crate::{
context::RequestContext,
error::Result,
error::{ErrNoCause, Result},
metrics::HTTP_HANDLER_COUNTER_VEC,
opentsdb::types::{convert_put_request, PutRequest, PutResponse},
Context, Proxy,
};
Expand All @@ -24,27 +26,59 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
ctx: RequestContext,
req: PutRequest,
) -> Result<PutResponse> {
let write_table_requests = convert_put_request(req)?;

let mut num_rows = 0;
for write_table_request in &write_table_requests {
for entry in &write_table_request.entries {
num_rows += entry.field_groups.len();
}
}

let table_request = GrpcWriteRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
table_requests: convert_put_request(req)?,
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let result = self

match self
.handle_write_internal(proxy_context, table_request)
.await?;
.await
{
Ok(result) => {
if result.failed != 0 {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(result.failed as u64);
ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to write storage, failed rows:{:?}", result.failed),
}
.fail()?;
}

debug!(
"OpenTSDB write finished, catalog:{}, schema:{}, result:{result:?}",
ctx.catalog, ctx.schema
);
debug!(
"OpenTSDB write finished, catalog:{}, schema:{}, result:{result:?}",
ctx.catalog, ctx.schema
);

Ok(())
Ok(())
}
Err(e) => {
HTTP_HANDLER_COUNTER_VEC.write_failed.inc();
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(num_rows as u64);
Err(e)
}
}
}
}
Loading