diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs index 3680053ba0d7..3b5519e2f730 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/etl/error.rs @@ -438,6 +438,17 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("failed to coerce complex value, not supported"))] + CoerceComplexType { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("failed to coerce value: {msg}"))] + CoerceIncompatibleTypes { + msg: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display( "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}" diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 827613b02b60..8d07b34d462e 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use snafu::ResultExt; use crate::etl::error::{ - CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, + CoerceComplexTypeSnafu, CoerceIncompatibleTypesSnafu, CoerceStringToTypeSnafu, + CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result, }; use crate::etl::transform::index::Index; @@ -61,8 +62,7 @@ impl TryFrom for ValueData { } Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)), - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), } } } @@ -134,8 +134,7 @@ fn coerce_type(transform: &Transform) -> Result { Value::Timestamp(Timestamp::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond), Value::Timestamp(Timestamp::Second(_)) => Ok(ColumnDataType::TimestampSecond), - Value::Array(_) => unimplemented!("Array"), - Value::Map(_) => unimplemented!("Object"), + Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), Value::Null => CoerceUnsupportedNullTypeToSnafu { ty: transform.type_.to_str_type(), @@ -176,19 +175,28 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result coerce_bool_value(*b, transform), Value::String(s) => coerce_string_value(s, transform), - Value::Timestamp(Timestamp::Nanosecond(ns)) => { - Ok(Some(ValueData::TimestampNanosecondValue(*ns))) - } - Value::Timestamp(Timestamp::Microsecond(us)) => { - Ok(Some(ValueData::TimestampMicrosecondValue(*us))) - } - Value::Timestamp(Timestamp::Millisecond(ms)) => { - Ok(Some(ValueData::TimestampMillisecondValue(*ms))) - } - Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))), + Value::Timestamp(input_timestamp) => match &transform.type_ { + Value::Timestamp(target_timestamp) => match target_timestamp { + Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue( + input_timestamp.timestamp_nanos(), + ))), + Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue( + input_timestamp.timestamp_micros(), + ))), + Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue( + input_timestamp.timestamp_millis(), + ))), + Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue( + input_timestamp.timestamp(), + ))), + }, + _ => CoerceIncompatibleTypesSnafu { + msg: "Timestamp can only be coerced to another timestamp", + } + .fail(), + }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), } } @@ -220,8 +228,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result } }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), Value::Null => return Ok(None), }; @@ -257,8 +264,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result> } }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), Value::Null => return Ok(None), }; @@ -294,8 +300,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result> } }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), Value::Null => return Ok(None), }; @@ -331,8 +336,7 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result> } }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), Value::Null => return Ok(None), }; @@ -407,8 +411,7 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(), }, - Value::Array(_) => unimplemented!("Array type not supported"), - Value::Map(_) => unimplemented!("Object type not supported"), + Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), Value::Null => Ok(None), } diff --git a/src/pipeline/tests/epoch.rs b/src/pipeline/tests/epoch.rs index 35a2ab635c00..84662793b937 100644 --- a/src/pipeline/tests/epoch.rs +++ b/src/pipeline/tests/epoch.rs @@ -200,6 +200,8 @@ transform: #[test] fn test_default_wrong_resolution() { + // given a number, we have no ways to guess its resolution + // but we can convert resolution during transform phase let test_input = r#" { "input_s": "1722580862", @@ -209,28 +211,30 @@ fn test_default_wrong_resolution() { let pipeline_yaml = r#" processors: - epoch: - fields: - - input_s - - input_nano + field: input_s + resolution: s + - epoch: + field: input_nano + resolution: ns transform: - fields: - input_s - type: epoch, s + type: epoch, ms - fields: - input_nano - type: epoch, nano + type: epoch, ms "#; let expected_schema = vec![ common::make_column_schema( "input_s".to_string(), - ColumnDataType::TimestampSecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( "input_nano".to_string(), - ColumnDataType::TimestampNanosecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( @@ -242,14 +246,12 @@ transform: let output = common::parse_and_exec(test_input, pipeline_yaml); assert_eq!(output.schema, expected_schema); - // this is actually wrong - // TODO(shuiyisong): add check for type when converting epoch assert_eq!( output.rows[0].values[0].value_data, - Some(ValueData::TimestampMillisecondValue(1722580862)) + Some(ValueData::TimestampMillisecondValue(1722580862000)) ); assert_eq!( output.rows[0].values[1].value_data, - Some(ValueData::TimestampMillisecondValue(1722583122284583936)) + Some(ValueData::TimestampMillisecondValue(1722583122284)) ); } diff --git a/src/pipeline/tests/timestamp.rs b/src/pipeline/tests/timestamp.rs index cba68d6cb15c..85cf0643fcb5 100644 --- a/src/pipeline/tests/timestamp.rs +++ b/src/pipeline/tests/timestamp.rs @@ -318,6 +318,7 @@ transform: #[test] fn test_timestamp_default_wrong_resolution() { + // same as test_default_wrong_resolution from epoch tests let test_input = r#" { "input_s": "1722580862", @@ -327,28 +328,30 @@ fn test_timestamp_default_wrong_resolution() { let pipeline_yaml = r#" processors: - timestamp: - fields: - - input_s - - input_nano + field: input_s + resolution: s + - timestamp: + field: input_nano + resolution: ns transform: - fields: - input_s - type: timestamp, s + type: timestamp, ms - fields: - input_nano - type: timestamp, nano + type: timestamp, ms "#; let expected_schema = vec![ common::make_column_schema( "input_s".to_string(), - ColumnDataType::TimestampSecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( "input_nano".to_string(), - ColumnDataType::TimestampNanosecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( @@ -360,14 +363,12 @@ transform: let output = common::parse_and_exec(test_input, pipeline_yaml); assert_eq!(output.schema, expected_schema); - // this is actually wrong - // TODO(shuiyisong): add check for type when converting epoch assert_eq!( output.rows[0].values[0].value_data, - Some(ValueData::TimestampMillisecondValue(1722580862)) + Some(ValueData::TimestampMillisecondValue(1722580862000)) ); assert_eq!( output.rows[0].values[1].value_data, - Some(ValueData::TimestampMillisecondValue(1722583122284583936)) + Some(ValueData::TimestampMillisecondValue(1722583122284)) ); }