diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 26e37498e591..ecc22b4935ec 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -66,6 +66,7 @@ impl LogHandler for Instance { } async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> { + // todo(qtang): impl delete todo!("delete_pipeline") } } diff --git a/src/pipeline/src/mng/pipeline_operator.rs b/src/pipeline/src/mng/pipeline_operator.rs index f137cdfd769f..8540fa797be6 100644 --- a/src/pipeline/src/mng/pipeline_operator.rs +++ b/src/pipeline/src/mng/pipeline_operator.rs @@ -48,7 +48,7 @@ impl PipelineOperator { catalog_name: catalog.to_string(), schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), table_name: PIPELINE_TABLE_NAME.to_string(), - desc: "GreptimeDB scripts table for Python".to_string(), + desc: "GreptimeDB pipeline table for Log".to_string(), column_defs, time_index, primary_keys, diff --git a/src/pipeline/src/mng/table.rs b/src/pipeline/src/mng/table.rs index b7574238f569..040924f799de 100644 --- a/src/pipeline/src/mng/table.rs +++ b/src/pipeline/src/mng/table.rs @@ -80,7 +80,6 @@ impl PipelineTable { let content_type = "content_type"; let pipeline_content = "pipeline"; let created_at = "created_at"; - let updated_at = "updated_at"; ( created_at.to_string(), @@ -135,15 +134,6 @@ impl PipelineTable { comment: "".to_string(), datatype_extension: None, }, - ColumnDef { - name: updated_at.to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "".to_string(), - datatype_extension: None, - }, ], ) } @@ -180,12 +170,6 @@ impl PipelineTable { semantic_type: SemanticType::Timestamp.into(), ..Default::default() }, - PbColumnSchema { - column_name: "updated_at".to_string(), - datatype: ColumnDataType::TimestampMillisecond.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, ] } @@ -241,7 +225,6 @@ impl PipelineTable { ValueData::StringValue(content_type.to_string()).into(), ValueData::StringValue(pipeline.to_string()).into(), ValueData::TimestampMillisecondValue(now).into(), - ValueData::TimestampMillisecondValue(now).into(), ], }], }), @@ -350,24 +333,26 @@ impl PipelineTable { .context(CollectRecordsSnafu)?; ensure!(!records.is_empty(), PipelineNotFoundSnafu { name }); + // assume schema + name is unique for now + ensure!( + records.len() == 1 && records[0].num_columns() == 1, + PipelineNotFoundSnafu { name } + ); - assert_eq!(records.len(), 1); - assert_eq!(records[0].num_columns(), 1); - - let script_column = records[0].column(0); - let script_column = script_column + let pipeline_column = records[0].column(0); + let pipeline_column = pipeline_column .as_any() .downcast_ref::() .with_context(|| CastTypeSnafu { msg: format!( "can't downcast {:?} array into string vector", - script_column.data_type() + pipeline_column.data_type() ), })?; - assert_eq!(script_column.len(), 1); + ensure!(pipeline_column.len() == 1, PipelineNotFoundSnafu { name }); // Safety: asserted above - Ok(script_column.get_data(0).unwrap().to_string()) + Ok(pipeline_column.get_data(0).unwrap().to_string()) } } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 4d1ebe2e2172..a08688b55309 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -17,10 +17,8 @@ use std::collections::HashMap; use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; use axum::extract::{Json, Query, State}; use axum::headers::ContentType; -use axum::Extension; -use common_telemetry::{error, info}; -use headers::HeaderMapExt; -use http::HeaderMap; +use axum::{Extension, TypedHeader}; +use common_telemetry::error; use mime_guess::mime; use pipeline::error::{CastTypeSnafu, ExecPipelineSnafu}; use pipeline::Value as PipelineValue; @@ -75,17 +73,9 @@ pub async fn log_ingester( State(handler): State, Query(query_params): Query, Extension(query_ctx): Extension, - // TypedHeader(content_type): TypedHeader, - headers: HeaderMap, + TypedHeader(content_type): TypedHeader, payload: String, ) -> Result { - // TODO(shuiyisong): remove debug log - info!("[log_header]: {:?}", headers); - info!("[log_payload]: {:?}", payload); - let content_type = headers - .typed_get::() - .unwrap_or(ContentType::text()); - let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { reason: "pipeline_name is required", })?; @@ -97,9 +87,6 @@ pub async fn log_ingester( let value = match m.subtype() { // TODO (qtang): we should decide json or jsonl mime::JSON => serde_json::from_str(&payload).context(ParseJsonSnafu)?, - // TODO (qtang): we should decide which content type to support - // form_url_cncoded type is only placeholder - mime::WWW_FORM_URLENCODED => parse_space_separated_log(payload)?, // add more content type support _ => UnsupportedContentTypeSnafu { content_type }.fail()?, }; @@ -107,13 +94,6 @@ pub async fn log_ingester( log_ingester_inner(handler, pipeline_name, table_name, value, query_ctx).await } -fn parse_space_separated_log(payload: String) -> Result { - // ToStructuredLogSnafu - let _log = payload.split_whitespace().collect::>(); - // TODO (qtang): implement this - todo!() -} - async fn log_ingester_inner( state: LogHandlerRef, pipeline_name: String,