diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index a8e59da51355..a02e6178ab0c 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};
 use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
 use crate::prelude::*;
+use crate::schema::ColumnSchema;
 use crate::type_id::LogicalTypeId;
 use crate::types::{IntervalType, ListType};
 use crate::vectors::ListVector;
@@ -1237,10 +1238,16 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
     }
 }
 
-impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
+pub struct ColumnPair<'a> {
+    pub value: ValueRef<'a>,
+    pub schema: &'a ColumnSchema,
+}
+
+impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
     type Error = serde_json::Error;
 
-    fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
+    fn try_from(value: ColumnPair<'a>) -> serde_json::Result<serde_json::Value> {
+        let ColumnPair { value, schema } = value;
         let json_value = match value {
             ValueRef::Null => serde_json::Value::Null,
             ValueRef::Boolean(v) => serde_json::Value::Bool(v),
@@ -1255,7 +1262,19 @@ impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
             ValueRef::Float32(v) => serde_json::Value::from(v.0),
             ValueRef::Float64(v) => serde_json::Value::from(v.0),
             ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
-            ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
+            ValueRef::Binary(bytes) => {
+                if let ConcreteDataType::Json(_) = schema.data_type {
+                    match jsonb::from_slice(bytes) {
+                        Ok(json) => json.into(),
+                        Err(e) => {
+                            error!(e; "Failed to parse jsonb");
+                            serde_json::Value::Null
+                        }
+                    }
+                } else {
+                    serde_json::to_value(bytes)?
+                }
+            }
             ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::List(v) => serde_json::to_value(v)?,
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 0e0419688e5b..30d6c6cb42d9 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -20,11 +20,12 @@ use common_telemetry::tracing;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use pipeline::PipelineWay;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
 use servers::otlp::plugin::TraceParserRef;
-use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineWay};
+use servers::query_handler::OpenTelemetryProtocolHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 
@@ -113,10 +114,10 @@ impl OpenTelemetryProtocolHandler for Instance {
             .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
         interceptor_ref.pre_execute(ctx.clone())?;
         let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
-        OTLP_LOGS_ROWS.inc_by(rows as u64);
-
         self.handle_row_inserts(requests, ctx)
             .await
+            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
     }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 748493331c93..bec06cab8bfb 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -317,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
         .context(IntermediateKeyIndexSnafu { kind, key })
 }
 
+pub enum PipelineWay {
+    Identity,
+    Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
+}
+
 #[cfg(test)]
 mod tests {
diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs
index 3680053ba0d7..57d3d52ea7e9 100644
--- a/src/pipeline/src/etl/error.rs
+++ b/src/pipeline/src/etl/error.rs
@@ -438,6 +438,18 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+    #[snafu(display("can not coerce nested type to {ty}"))]
+    CoerceNestedType {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+    #[snafu(display("can not coerce {ty} to nested type"))]
+    CoerceTypeToNested {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     #[snafu(display(
         "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
index 489b2e147b6d..48356bc96e67 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
 use snafu::ResultExt;
 
 use crate::etl::error::{
-    CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
+    CoerceNestedTypeSnafu, CoerceStringToTypeSnafu, CoerceTypeToNestedSnafu,
+    CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
     CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
 };
 use crate::etl::transform::index::Index;
@@ -414,14 +415,14 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
     }
 }
 
-fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
+fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
     match &transform.type_ {
         Value::Array(_) | Value::Map(_) => (),
         t => {
-            return Err(format!(
-                "nested value type not supported {}",
-                t.to_str_type()
-            ))
+            return CoerceNestedTypeSnafu {
+                ty: t.to_str_type(),
+            }
+            .fail();
         }
     }
     match v {
@@ -433,7 +434,10 @@ fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
             let data: jsonb::Value = v.into();
             Ok(Some(ValueData::BinaryValue(data.to_vec())))
         }
-        _ => Err(format!("nested type not support {}", v.to_str_type())),
+        _ => CoerceTypeToNestedSnafu {
+            ty: v.to_str_type(),
+        }
+        .fail(),
     }
 }
diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs
index c9d9cd25f79c..7450fed20d70 100644
--- a/src/pipeline/src/etl/value/map.rs
+++ b/src/pipeline/src/etl/value/map.rs
@@ -18,19 +18,11 @@ use ahash::HashMap;
 
 use crate::etl::value::Value;
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Default)]
 pub struct Map {
     pub values: BTreeMap<String, Value>,
 }
 
-impl Default for Map {
-    fn default() -> Self {
-        Self {
-            values: BTreeMap::default(),
-        }
-    }
-}
-
 impl Map {
     pub fn one(key: impl Into<String>, value: Value) -> Map {
         let mut map = Map::default();
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 8fc72c584484..a4d9767804d3 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -20,7 +20,7 @@ pub use etl::error::Result;
 pub use etl::processor::Processor;
 pub use etl::transform::{GreptimeTransformer, Transformer};
 pub use etl::value::{Array, Map, Value};
-pub use etl::{parse, Content, Pipeline};
+pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
 pub use manager::{
     error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
     PipelineVersion,
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 2f616f18f954..ebee059e1ddf 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -534,9 +534,9 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("OpenTelemetry log error: {}", error))]
+    #[snafu(display("OpenTelemetry log error"))]
     OpenTelemetryLog {
-        error: String,
+        source: pipeline::etl_error::Error,
         #[snafu(implicit)]
         location: Location,
     },
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 61b17a69fc05..12db3346e6d1 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
 use datatypes::schema::SchemaRef;
+use datatypes::value::ColumnPair;
 use event::{LogState, LogValidatorRef};
 use futures::FutureExt;
 use schemars::JsonSchema;
@@ -239,14 +240,21 @@ impl HttpRecordsOutput {
         } else {
             let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
             let mut rows = Vec::with_capacity(num_rows);
+            let schemas = schema.column_schemas();
             let num_cols = schema.column_schemas().len();
             rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
 
             let mut finished_row_cursor = 0;
             for recordbatch in recordbatches {
-                for col in recordbatch.columns() {
+                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
+                    // safety here: schemas length is equal to the number of columns in the recordbatch
+                    let schema = &schemas[col_idx];
                     for row_idx in 0..recordbatch.num_rows() {
-                        let value = Value::try_from(col.get_ref(row_idx)).context(ToJsonSnafu)?;
+                        let column_pair = ColumnPair {
+                            value: col.get_ref(row_idx),
+                            schema,
+                        };
+                        let value = Value::try_from(column_pair).context(ToJsonSnafu)?;
                         rows[row_idx + finished_row_cursor].push(value);
                     }
                 }
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 53883cc5e705..349bb2a3b9eb 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::str;
 use std::result::Result as StdResult;
 use std::sync::Arc;
 
@@ -33,13 +34,18 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
     ExportTraceServiceRequest, ExportTraceServiceResponse,
 };
 use pipeline::util::to_pipeline_version;
+use pipeline::PipelineWay;
 use prost::Message;
 use session::context::{Channel, QueryContext};
 use snafu::prelude::*;
 
 use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
 use crate::error::{self, Result};
-use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineWay};
+use crate::query_handler::OpenTelemetryProtocolHandlerRef;
+
+const OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
+const OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
+const OTLP_GREPTIME_TABLE_NAME_HEADER_NAME: &str = "x-greptime-table-name";
 
 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
@@ -102,12 +108,14 @@ pub struct PipelineInfo {
 
 fn pipeline_header_error(
     header: &HeaderValue,
-) -> StdResult<String, (StatusCode, &'static str)> {
-    match header.to_str() {
+    key: &str,
+) -> StdResult<String, (StatusCode, String)> {
+    let header_utf8 = str::from_utf8(header.as_bytes());
+    match header_utf8 {
        Ok(s) => Ok(s.to_string()),
        Err(_) => Err((
            StatusCode::BAD_REQUEST,
-            "`X-Pipeline-Name` or `X-Pipeline-Version` header is not string type.",
+            format!("`{}` header is not valid UTF-8 string type.", key),
        )),
    }
 }
@@ -117,22 +125,33 @@ impl<S> FromRequestParts<S> for PipelineInfo
 where
     S: Send + Sync,
 {
-    type Rejection = (StatusCode, &'static str);
+    type Rejection = (StatusCode, String);
 
     async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
-        let pipeline_name = parts.headers.get("X-Pipeline-Name");
-        let pipeline_version = parts.headers.get("X-Pipeline-Version");
+        let pipeline_name = parts.headers.get(OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME);
+        let pipeline_version = parts
+            .headers
+            .get(OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME);
         match (pipeline_name, pipeline_version) {
             (Some(name), Some(version)) => Ok(PipelineInfo {
-                pipeline_name: Some(pipeline_header_error(name)?),
-                pipeline_version: Some(pipeline_header_error(version)?),
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
+                pipeline_version: Some(pipeline_header_error(
+                    version,
+                    OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
+                )?),
             }),
             (None, _) => Ok(PipelineInfo {
                 pipeline_name: None,
                 pipeline_version: None,
             }),
             (Some(name), None) => Ok(PipelineInfo {
-                pipeline_name: Some(pipeline_header_error(name)?),
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
                 pipeline_version: None,
             }),
         }
@@ -148,13 +167,14 @@ impl<S> FromRequestParts<S> for TableInfo
 where
     S: Send + Sync,
 {
-    type Rejection = (StatusCode, &'static str);
+    type Rejection = (StatusCode, String);
 
     async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
-        let table_name = parts.headers.get("X-Table-Name");
+        let table_name = parts.headers.get(OTLP_GREPTIME_TABLE_NAME_HEADER_NAME);
+
         match table_name {
             Some(name) => Ok(TableInfo {
-                table_name: pipeline_header_error(name)?,
+                table_name: pipeline_header_error(name, OTLP_GREPTIME_TABLE_NAME_HEADER_NAME)?,
             }),
             None => Ok(TableInfo {
                 table_name: "opentelemetry_logs".to_string(),
@@ -185,7 +205,7 @@ pub async fn logs(
     let pipeline_version =
         to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
             error::InvalidParameterSnafu {
-                reason: "X-Pipeline-Version".to_string(),
+                reason: OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
             }
             .build()
         })?;
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
index b9c653e51551..b8dd0bafe16d 100644
--- a/src/servers/src/otlp/logs.rs
+++ b/src/servers/src/otlp/logs.rs
@@ -24,12 +24,12 @@ use jsonb::{Number as JsonbNumber, Value as JsonbValue};
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
-use pipeline::{Array, Map, Value as PipelineValue};
+use pipeline::{Array, Map, PipelineWay, Value as PipelineValue};
+use snafu::ResultExt;
 
 use super::trace::attributes::OtlpAnyValue;
 use crate::error::{OpenTelemetryLogSnafu, Result};
 use crate::otlp::trace::span::bytes_to_hex_string;
-use crate::query_handler::PipelineWay;
 
 /// Normalize otlp instrumentation, metric and attribute names
 ///
@@ -74,10 +74,10 @@ pub fn to_grpc_insert_requests(
             let mut intermediate_state = p.init_intermediate_state();
             for v in request {
                 p.prepare_pipeline_value(v, &mut intermediate_state)
-                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                    .context(OpenTelemetryLogSnafu)?;
                 let r = p
                     .exec_mut(&mut intermediate_state)
-                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                    .context(OpenTelemetryLogSnafu)?;
                 result.push(r);
             }
             let len = result.len();
@@ -215,20 +215,6 @@ fn build_identity_schema() -> Vec<ColumnSchema> {
             }),
             None,
         ),
-        (
-            "scope_schemaUrl",
-            ColumnDataType::String,
-            SemanticType::Field,
-            None,
-            None,
-        ),
-        (
-            "resource_schema_url",
-            ColumnDataType::String,
-            SemanticType::Field,
-            None,
-            None,
-        ),
         (
             "resource_attributes",
             ColumnDataType::Binary,
@@ -324,9 +310,7 @@ fn build_identity_schema() -> Vec<ColumnSchema> {
 
 fn build_identity_row(
     log: LogRecord,
-    resource_schema_url: String,
     resource_attr: JsonbValue<'_>,
-    scope_schema_url: String,
     scope_name: Option<String>,
     scope_version: Option<String>,
     scope_attrs: JsonbValue<'_>,
@@ -341,12 +325,6 @@ fn build_identity_row(
         GreptimeValue {
             value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())),
         },
-        GreptimeValue {
-            value_data: Some(ValueData::StringValue(scope_schema_url)),
-        },
-        GreptimeValue {
-            value_data: Some(ValueData::StringValue(resource_schema_url)),
-        },
         GreptimeValue {
             value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())),
         },
@@ -398,16 +376,12 @@ fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest)
             .resource
             .map(|x| key_value_to_jsonb(x.attributes))
             .unwrap_or(JsonbValue::Null);
-        let resource_schema_url = r.schema_url;
         for scope_logs in r.scope_logs {
             let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
-            let scope_schema_url = scope_logs.schema_url;
             for log in scope_logs.log_records {
                 let value = build_identity_row(
                     log,
-                    resource_schema_url.clone(),
                     resource_attr.clone(),
-                    scope_schema_url.clone(),
                     scope_name.clone(),
                     scope_version.clone(),
                     scope_attrs.clone(),
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 5415ee33d9ba..a1ad9997ba7b 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -36,7 +36,7 @@ use headers::HeaderValue;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
+use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
 use serde_json::Value;
 use session::context::QueryContextRef;
 
@@ -105,10 +105,6 @@ pub trait PromStoreProtocolHandler {
     async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
 }
 
-pub enum PipelineWay {
-    Identity,
-    Custom(Arc<Pipeline<GreptimeTransformer>>),
-}
-
 #[async_trait]
 pub trait OpenTelemetryProtocolHandler: LogHandler {
     /// Handling opentelemetry metrics request
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index a9ca8d0598d9..8d56d384c76e 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1510,7 +1510,7 @@ pub async fn test_otlp_logs(store_type: StorageType) {
     let res = send_req(
         &client,
         vec![(
-            HeaderName::from_static("x-table-name"),
+            HeaderName::from_static("x-greptime-table-name"),
             HeaderValue::from_static("logs"),
         )],
         "/v1/otlp/v1/logs?db=public",
@@ -1520,8 +1520,7 @@ pub async fn test_otlp_logs(store_type: StorageType) {
     .await;
     assert_eq!(StatusCode::OK, res.status());
 
-    // TODO(qtang): we show convert jsonb to json string in http sql API
-    let expected = r#"[{"records":{"schema":{"column_schemas":[{"name":"scope_name","data_type":"String"},{"name":"scope_version","data_type":"String"},{"name":"scope_attributes","data_type":"Json"},{"name":"scope_schemaUrl","data_type":"String"},{"name":"resource_schema_url","data_type":"String"},{"name":"resource_attributes","data_type":"Json"},{"name":"log_attributes","data_type":"Json"},{"name":"timestamp","data_type":"TimestampNanosecond"},{"name":"observed_timestamp","data_type":"TimestampNanosecond"},{"name":"trace_id","data_type":"String"},{"name":"span_id","data_type":"String"},{"name":"trace_flags","data_type":"UInt32"},{"name":"severity_text","data_type":"String"},{"name":"severity_number","data_type":"Int32"},{"name":"body","data_type":"String"}]},"rows":[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]}}]"#;
+    let expected = r#"[["","",{},{"resource_attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource_attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#;
 
     validate_data(&client, "select * from logs;", expected).await;
 
     guard.remove_all().await;
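Usage sketch (not part of the patch): the user-visible effect of the ColumnPair change is that a Binary column whose schema is Json is now decoded from jsonb into a real JSON value in the HTTP output, instead of being serialized as a raw byte array; that is why the test's expected string changes from jsonb byte arrays like [64,0,0,2,...] to objects like {"customer":"acme","env":"dev"}. The following is a minimal, hedged illustration assuming GreptimeDB's internal datatypes crate and the jsonb crate; json_datatype() is an assumed constructor name for ConcreteDataType::Json.

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::{ColumnPair, ValueRef};

fn main() -> serde_json::Result<()> {
    // Encode a JSON object into jsonb bytes, the storage representation of a Json column.
    let jsonb_bytes = jsonb::parse_value(br#"{"customer":"acme","env":"dev"}"#)
        .expect("valid JSON")
        .to_vec();

    // A schema whose data type is Json (constructor name is an assumption) selects
    // the jsonb-decoding branch of the new TryFrom<ColumnPair<'_>> impl.
    let schema = ColumnSchema::new("log_attributes", ConcreteDataType::json_datatype(), true);
    let pair = ColumnPair {
        value: ValueRef::Binary(jsonb_bytes.as_slice()),
        schema: &schema,
    };

    // Decodes to {"customer":"acme","env":"dev"}; a Binary column without a Json
    // schema still falls back to serde_json::to_value(bytes), i.e. a numeric byte array.
    let json = serde_json::Value::try_from(pair)?;
    assert_eq!(json["customer"], "acme");
    Ok(())
}

The identity pipeline in build_identity_schema() declares resource_attributes, scope_attributes, and log_attributes as jsonb-encoded Binary columns with Json column type, so those columns take the decoding branch above when rows are rendered by the HTTP SQL API.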