diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 827613b02b60..35613d00958a 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -176,16 +176,23 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result coerce_bool_value(*b, transform), Value::String(s) => coerce_string_value(s, transform), - Value::Timestamp(Timestamp::Nanosecond(ns)) => { - Ok(Some(ValueData::TimestampNanosecondValue(*ns))) - } - Value::Timestamp(Timestamp::Microsecond(us)) => { - Ok(Some(ValueData::TimestampMicrosecondValue(*us))) - } - Value::Timestamp(Timestamp::Millisecond(ms)) => { - Ok(Some(ValueData::TimestampMillisecondValue(*ms))) - } - Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))), + Value::Timestamp(input_timestamp) => match &transform.type_ { + Value::Timestamp(target_timestamp) => match target_timestamp { + Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue( + input_timestamp.timestamp_nanos(), + ))), + Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue( + input_timestamp.timestamp_micros(), + ))), + Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue( + input_timestamp.timestamp_millis(), + ))), + Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue( + input_timestamp.timestamp(), + ))), + }, + _ => unimplemented!("Timestamp can only be coerced to another timestamp"), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), diff --git a/src/pipeline/tests/epoch.rs b/src/pipeline/tests/epoch.rs index 35a2ab635c00..84662793b937 100644 --- a/src/pipeline/tests/epoch.rs +++ b/src/pipeline/tests/epoch.rs @@ -200,6 +200,8 @@ transform: #[test] fn test_default_wrong_resolution() { + // given a number, we have no ways to guess its resolution + // but we can convert resolution during transform phase let test_input = r#" { "input_s": "1722580862", @@ -209,28 +211,30 @@ fn test_default_wrong_resolution() { let pipeline_yaml = r#" processors: - epoch: - fields: - - input_s - - input_nano + field: input_s + resolution: s + - epoch: + field: input_nano + resolution: ns transform: - fields: - input_s - type: epoch, s + type: epoch, ms - fields: - input_nano - type: epoch, nano + type: epoch, ms "#; let expected_schema = vec![ common::make_column_schema( "input_s".to_string(), - ColumnDataType::TimestampSecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( "input_nano".to_string(), - ColumnDataType::TimestampNanosecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( @@ -242,14 +246,12 @@ transform: let output = common::parse_and_exec(test_input, pipeline_yaml); assert_eq!(output.schema, expected_schema); - // this is actually wrong - // TODO(shuiyisong): add check for type when converting epoch assert_eq!( output.rows[0].values[0].value_data, - Some(ValueData::TimestampMillisecondValue(1722580862)) + Some(ValueData::TimestampMillisecondValue(1722580862000)) ); assert_eq!( output.rows[0].values[1].value_data, - Some(ValueData::TimestampMillisecondValue(1722583122284583936)) + Some(ValueData::TimestampMillisecondValue(1722583122284)) ); } diff --git a/src/pipeline/tests/timestamp.rs b/src/pipeline/tests/timestamp.rs index cba68d6cb15c..85cf0643fcb5 100644 --- a/src/pipeline/tests/timestamp.rs +++ b/src/pipeline/tests/timestamp.rs @@ -318,6 +318,7 @@ transform: #[test] fn test_timestamp_default_wrong_resolution() { + // same as test_default_wrong_resolution from epoch tests let test_input = r#" { "input_s": "1722580862", @@ -327,28 +328,30 @@ fn test_timestamp_default_wrong_resolution() { let pipeline_yaml = r#" processors: - timestamp: - fields: - - input_s - - input_nano + field: input_s + resolution: s + - timestamp: + field: input_nano + resolution: ns transform: - fields: - input_s - type: timestamp, s + type: timestamp, ms - fields: - input_nano - type: timestamp, nano + type: timestamp, ms "#; let expected_schema = vec![ common::make_column_schema( "input_s".to_string(), - ColumnDataType::TimestampSecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( "input_nano".to_string(), - ColumnDataType::TimestampNanosecond, + ColumnDataType::TimestampMillisecond, SemanticType::Field, ), common::make_column_schema( @@ -360,14 +363,12 @@ transform: let output = common::parse_and_exec(test_input, pipeline_yaml); assert_eq!(output.schema, expected_schema); - // this is actually wrong - // TODO(shuiyisong): add check for type when converting epoch assert_eq!( output.rows[0].values[0].value_data, - Some(ValueData::TimestampMillisecondValue(1722580862)) + Some(ValueData::TimestampMillisecondValue(1722580862000)) ); assert_eq!( output.rows[0].values[1].value_data, - Some(ValueData::TimestampMillisecondValue(1722583122284583936)) + Some(ValueData::TimestampMillisecondValue(1722583122284)) ); }