Skip to content

Commit

Permalink
chore: fix by pr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Sep 27, 2024
1 parent b756f82 commit 58bf15f
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 78 deletions.
25 changes: 22 additions & 3 deletions src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};

use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
use crate::prelude::*;
use crate::schema::ColumnSchema;
use crate::type_id::LogicalTypeId;
use crate::types::{IntervalType, ListType};
use crate::vectors::ListVector;
Expand Down Expand Up @@ -1237,10 +1238,16 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
}
}

impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
pub struct ColumnPair<'a> {
pub value: ValueRef<'a>,
pub schema: &'a ColumnSchema,
}

impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
type Error = serde_json::Error;

fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
fn try_from(value: ColumnPair<'a>) -> serde_json::Result<serde_json::Value> {
let ColumnPair { value, schema } = value;
let json_value = match value {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(v) => serde_json::Value::Bool(v),
Expand All @@ -1255,7 +1262,19 @@ impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
ValueRef::Float32(v) => serde_json::Value::from(v.0),
ValueRef::Float64(v) => serde_json::Value::from(v.0),
ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
ValueRef::Binary(bytes) => {
if let ConcreteDataType::Json(_) = schema.data_type {
match jsonb::from_slice(bytes) {
Ok(json) => json.into(),
Err(e) => {
error!(e; "Failed to parse jsonb");
serde_json::Value::Null
}
}
} else {
serde_json::to_value(bytes)?
}
}
ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
ValueRef::List(v) => serde_json::to_value(v)?,
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineWay};
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;

Expand Down Expand Up @@ -113,10 +114,10 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
OTLP_LOGS_ROWS.inc_by(rows as u64);

self.handle_row_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
Expand Down
5 changes: 5 additions & 0 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}

pub enum PipelineWay {
Identity,
Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
}

#[cfg(test)]
mod tests {

Expand Down
12 changes: 12 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("can not coerce nested type to {ty}"))]
CoerceNestedType {
ty: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("can not coerce {ty} to nested type"))]
CoerceTypeToNested {
ty: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
Expand Down
18 changes: 11 additions & 7 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;

use crate::etl::error::{
CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceNestedTypeSnafu, CoerceStringToTypeSnafu, CoerceTypeToNestedSnafu,
CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
};
use crate::etl::transform::index::Index;
Expand Down Expand Up @@ -414,14 +415,14 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<Value
}
}

fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
match &transform.type_ {
Value::Array(_) | Value::Map(_) => (),
t => {
return Err(format!(
"nested value type not supported {}",
t.to_str_type()
))
return CoerceNestedTypeSnafu {
ty: t.to_str_type(),
}
.fail();
}
}
match v {
Expand All @@ -433,7 +434,10 @@ fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueD
let data: jsonb::Value = v.into();
Ok(Some(ValueData::BinaryValue(data.to_vec())))
}
_ => Err(format!("nested type not support {}", v.to_str_type())),
_ => CoerceTypeToNestedSnafu {
ty: v.to_str_type(),
}
.fail(),
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/pipeline/src/etl/value/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,11 @@ use ahash::HashMap;

use crate::etl::value::Value;

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Map {
pub values: BTreeMap<String, Value>,
}

impl Default for Map {
fn default() -> Self {
Self {
values: BTreeMap::default(),
}
}
}

impl Map {
pub fn one(key: impl Into<String>, value: Value) -> Map {
let mut map = Map::default();
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{parse, Content, Pipeline};
pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,9 @@ pub enum Error {
location: Location,
},

#[snafu(display("OpenTelemetry log error: {}", error))]
#[snafu(display("OpenTelemetry log error"))]
OpenTelemetryLog {
error: String,
source: pipeline::etl_error::Error,
#[snafu(implicit)]
location: Location,
},
Expand Down
12 changes: 10 additions & 2 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::ColumnPair;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use schemars::JsonSchema;
Expand Down Expand Up @@ -239,14 +240,21 @@ impl HttpRecordsOutput {
} else {
let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
let mut rows = Vec::with_capacity(num_rows);
let schemas = schema.column_schemas();
let num_cols = schema.column_schemas().len();
rows.resize_with(num_rows, || Vec::with_capacity(num_cols));

let mut finished_row_cursor = 0;
for recordbatch in recordbatches {
for col in recordbatch.columns() {
for (col_idx, col) in recordbatch.columns().iter().enumerate() {
// safety here: schemas length is equal to the number of columns in the recordbatch
let schema = &schemas[col_idx];
for row_idx in 0..recordbatch.num_rows() {
let value = Value::try_from(col.get_ref(row_idx)).context(ToJsonSnafu)?;
let column_pair = ColumnPair {
value: col.get_ref(row_idx),
schema,
};
let value = Value::try_from(column_pair).context(ToJsonSnafu)?;
rows[row_idx + finished_row_cursor].push(value);
}
}
Expand Down
48 changes: 34 additions & 14 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::str;
use std::result::Result as StdResult;
use std::sync::Arc;

Expand All @@ -33,13 +34,18 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::PipelineWay;
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;

use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, Result};
use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineWay};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

const OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
const OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
const OTLP_GREPTIME_TABLE_NAME_HEADER_NAME: &str = "x-greptime-table-name";

#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
Expand Down Expand Up @@ -102,12 +108,14 @@ pub struct PipelineInfo {

fn pipeline_header_error(
header: &HeaderValue,
) -> StdResult<String, (http::StatusCode, &'static str)> {
match header.to_str() {
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
StatusCode::BAD_REQUEST,
"`X-Pipeline-Name` or `X-Pipeline-Version` header is not string type.",
format!("`{}` header is not valid UTF-8 string type.", key),
)),
}
}
Expand All @@ -117,22 +125,33 @@ impl<S> FromRequestParts<S> for PipelineInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, &'static str);
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let pipeline_name = parts.headers.get("X-Pipeline-Name");
let pipeline_version = parts.headers.get("X-Pipeline-Version");
let pipeline_name = parts.headers.get(OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME);
let pipeline_version = parts
.headers
.get(OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME);
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(name)?),
pipeline_version: Some(pipeline_header_error(version)?),
pipeline_name: Some(pipeline_header_error(
name,
OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_version: Some(pipeline_header_error(
version,
OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
)?),
}),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(name)?),
pipeline_name: Some(pipeline_header_error(
name,
OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_version: None,
}),
}
Expand All @@ -148,13 +167,14 @@ impl<S> FromRequestParts<S> for TableInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, &'static str);
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = parts.headers.get("X-Table-Name");
let table_name = parts.headers.get(OTLP_GREPTIME_TABLE_NAME_HEADER_NAME);

match table_name {
Some(name) => Ok(TableInfo {
table_name: pipeline_header_error(name)?,
table_name: pipeline_header_error(name, OTLP_GREPTIME_TABLE_NAME_HEADER_NAME)?,
}),
None => Ok(TableInfo {
table_name: "opentelemetry_logs".to_string(),
Expand Down Expand Up @@ -185,7 +205,7 @@ pub async fn logs(
let pipeline_version =
to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
error::InvalidParameterSnafu {
reason: "X-Pipeline-Version".to_string(),
reason: OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
}
.build()
})?;
Expand Down
Loading

0 comments on commit 58bf15f

Please sign in to comment.