
chore: minor update
shuiyisong committed Jun 5, 2024
1 parent ea548b0 commit 6c88b89
Showing 3 changed files with 14 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/instance/log_handler.rs
@@ -66,6 +66,7 @@ impl LogHandler for Instance {
     }
 
     async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
+        // todo(qtang): impl delete
         todo!("delete_pipeline")
     }
 }
18 changes: 10 additions & 8 deletions src/pipeline/src/mng/table.rs
@@ -350,24 +350,26 @@ impl PipelineTable {
             .context(CollectRecordsSnafu)?;
 
         ensure!(!records.is_empty(), PipelineNotFoundSnafu { name });
+        // assume schema + name is unique for now
+        ensure!(
+            records.len() == 1 && records[0].num_columns() == 1,
+            PipelineNotFoundSnafu { name }
+        );
 
-        assert_eq!(records.len(), 1);
-        assert_eq!(records[0].num_columns(), 1);
-
-        let script_column = records[0].column(0);
-        let script_column = script_column
+        let pipeline_column = records[0].column(0);
+        let pipeline_column = pipeline_column
             .as_any()
             .downcast_ref::<StringVector>()
             .with_context(|| CastTypeSnafu {
                 msg: format!(
                     "can't downcast {:?} array into string vector",
-                    script_column.data_type()
+                    pipeline_column.data_type()
                 ),
             })?;
 
-        assert_eq!(script_column.len(), 1);
+        ensure!(pipeline_column.len() == 1, PipelineNotFoundSnafu { name });
 
         // Safety: asserted above
-        Ok(script_column.get_data(0).unwrap().to_string())
+        Ok(pipeline_column.get_data(0).unwrap().to_string())
     }
 }
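The table.rs change above swaps the assert_eq! panics for snafu's ensure! macro, so a missing or duplicated pipeline record is reported to the caller as a PipelineNotFound error instead of aborting the server. A minimal standalone sketch of that pattern (the error enum, lookup function, and snafu 0.7 dependency here are assumptions for illustration, not GreptimeDB's actual types):

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("pipeline not found: {}", name))]
    PipelineNotFound { name: String },
}

type Result<T> = std::result::Result<T, Error>;

fn find_pipeline(records: &[String], name: &str) -> Result<String> {
    // Unlike assert_eq!, ensure! builds an Err from the context selector and
    // returns early, leaving the caller to decide how to handle the failure.
    ensure!(records.len() == 1, PipelineNotFoundSnafu { name });
    Ok(records[0].clone())
}

fn main() {
    match find_pipeline(&[], "my_pipeline") {
        Ok(p) => println!("found pipeline: {p}"),
        Err(e) => println!("lookup failed: {e}"),
    }
}

Running this prints the error's display message ("pipeline not found: my_pipeline") rather than a panic backtrace, which appears to be the point of the swap for the HTTP lookup path.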
26 changes: 3 additions & 23 deletions src/servers/src/http/event.rs
@@ -17,10 +17,8 @@ use std::collections::HashMap;
 use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
 use axum::extract::{Json, Query, State};
 use axum::headers::ContentType;
-use axum::Extension;
-use common_telemetry::{error, info};
-use headers::HeaderMapExt;
-use http::HeaderMap;
+use axum::{Extension, TypedHeader};
+use common_telemetry::error;
 use mime_guess::mime;
 use pipeline::error::{CastTypeSnafu, ExecPipelineSnafu};
 use pipeline::Value as PipelineValue;
@@ -75,17 +73,9 @@ pub async fn log_ingester(
     State(handler): State<LogHandlerRef>,
     Query(query_params): Query<LogIngesterQueryParams>,
     Extension(query_ctx): Extension<QueryContextRef>,
-    // TypedHeader(content_type): TypedHeader<ContentType>,
-    headers: HeaderMap,
+    TypedHeader(content_type): TypedHeader<ContentType>,
     payload: String,
 ) -> Result<HttpResponse> {
-    // TODO(shuiyisong): remove debug log
-    info!("[log_header]: {:?}", headers);
-    info!("[log_payload]: {:?}", payload);
-    let content_type = headers
-        .typed_get::<ContentType>()
-        .unwrap_or(ContentType::text());
-
     let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
         reason: "pipeline_name is required",
     })?;
@@ -97,23 +87,13 @@ pub async fn log_ingester(
     let value = match m.subtype() {
         // TODO (qtang): we should decide json or jsonl
         mime::JSON => serde_json::from_str(&payload).context(ParseJsonSnafu)?,
-        // TODO (qtang): we should decide which content type to support
-        // form_url_cncoded type is only placeholder
-        mime::WWW_FORM_URLENCODED => parse_space_separated_log(payload)?,
-        // add more content type support
         _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
     };
 
     log_ingester_inner(handler, pipeline_name, table_name, value, query_ctx).await
 }
 
-fn parse_space_separated_log(payload: String) -> Result<Value> {
-    // ToStructuredLogSnafu
-    let _log = payload.split_whitespace().collect::<Vec<&str>>();
-    // TODO (qtang): implement this
-    todo!()
-}
 
 async fn log_ingester_inner(
     state: LogHandlerRef,
     pipeline_name: String,
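In event.rs the handler stops pulling Content-Type out of a raw HeaderMap (and drops the temporary debug logging) in favor of axum's TypedHeader extractor, which parses the header before the handler body runs. A small self-contained sketch of that extractor pattern; the route path, port, and handler body are made up for illustration, and it assumes axum 0.6 with the "headers" feature, tokio, and mime_guess:

use axum::{headers::ContentType, routing::post, Router, TypedHeader};
use mime_guess::mime;

// TypedHeader<ContentType> rejects the request before this body runs
// if the Content-Type header is missing or unparsable.
async fn ingest(
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: String,
) -> String {
    let m: mime::Mime = content_type.into();
    match m.subtype() {
        mime::JSON => format!("got {} bytes of JSON", payload.len()),
        _ => format!("unsupported content type: {m}"),
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/v1/events", post(ingest));
    axum::Server::bind(&"127.0.0.1:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

The fallback arm plays the same role as the real handler's UnsupportedContentTypeSnafu { content_type }.fail()? branch, just without GreptimeDB's own error type.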
