Skip to content

Commit

Permalink
fix: allow ts conversion during transform phase
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Sep 30, 2024
1 parent e39a9e6 commit 60f3a76
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 32 deletions.
27 changes: 17 additions & 10 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,23 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<
Value::Boolean(b) => coerce_bool_value(*b, transform),
Value::String(s) => coerce_string_value(s, transform),

Value::Timestamp(Timestamp::Nanosecond(ns)) => {
Ok(Some(ValueData::TimestampNanosecondValue(*ns)))
}
Value::Timestamp(Timestamp::Microsecond(us)) => {
Ok(Some(ValueData::TimestampMicrosecondValue(*us)))
}
Value::Timestamp(Timestamp::Millisecond(ms)) => {
Ok(Some(ValueData::TimestampMillisecondValue(*ms)))
}
Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
Value::Timestamp(input_timestamp) => match &transform.type_ {
Value::Timestamp(target_timestamp) => match target_timestamp {
Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue(
input_timestamp.timestamp_nanos(),
))),
Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue(
input_timestamp.timestamp_micros(),
))),
Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue(
input_timestamp.timestamp_millis(),
))),
Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue(
input_timestamp.timestamp(),
))),
},
_ => unimplemented!("Timestamp can only be coerced to another timestamp"),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Expand Down
24 changes: 13 additions & 11 deletions src/pipeline/tests/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ transform:

#[test]
fn test_default_wrong_resolution() {
// given a number, we have no way to guess its resolution,
// but we can convert the resolution during the transform phase
let test_input = r#"
{
"input_s": "1722580862",
Expand All @@ -209,28 +211,30 @@ fn test_default_wrong_resolution() {
let pipeline_yaml = r#"
processors:
- epoch:
fields:
- input_s
- input_nano
field: input_s
resolution: s
- epoch:
field: input_nano
resolution: ns
transform:
- fields:
- input_s
type: epoch, s
type: epoch, ms
- fields:
- input_nano
type: epoch, nano
type: epoch, ms
"#;

let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
Expand All @@ -242,14 +246,12 @@ transform:

let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
Some(ValueData::TimestampMillisecondValue(1722580862000))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}
23 changes: 12 additions & 11 deletions src/pipeline/tests/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ transform:

#[test]
fn test_timestamp_default_wrong_resolution() {
// same as test_default_wrong_resolution from epoch tests
let test_input = r#"
{
"input_s": "1722580862",
Expand All @@ -327,28 +328,30 @@ fn test_timestamp_default_wrong_resolution() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_s
- input_nano
field: input_s
resolution: s
- timestamp:
field: input_nano
resolution: ns
transform:
- fields:
- input_s
type: timestamp, s
type: timestamp, ms
- fields:
- input_nano
type: timestamp, nano
type: timestamp, ms
"#;

let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
Expand All @@ -360,14 +363,12 @@ transform:

let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
Some(ValueData::TimestampMillisecondValue(1722580862000))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}

0 comments on commit 60f3a76

Please sign in to comment.