Skip to content

Commit

Permalink
fix: ts conversion during transform phase (#4790)
Browse files Browse the repository at this point in the history
* fix: allow ts conversion during transform phase

* chore: replace `unimplemented` with snafu
  • Loading branch information
shuiyisong authored Oct 8, 2024
1 parent 71a66d1 commit 5f0a83b
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 49 deletions.
11 changes: 11 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,17 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to coerce complex value, not supported"))]
CoerceComplexType {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to coerce value: {msg}"))]
CoerceIncompatibleTypes {
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
Expand Down
57 changes: 30 additions & 27 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;

use crate::etl::error::{
CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceComplexTypeSnafu, CoerceIncompatibleTypesSnafu, CoerceStringToTypeSnafu,
CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
};
use crate::etl::transform::index::Index;
Expand Down Expand Up @@ -61,8 +62,7 @@ impl TryFrom<Value> for ValueData {
}
Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
}
}
}
Expand Down Expand Up @@ -134,8 +134,7 @@ fn coerce_type(transform: &Transform) -> Result<ColumnDataType> {
Value::Timestamp(Timestamp::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond),
Value::Timestamp(Timestamp::Second(_)) => Ok(ColumnDataType::TimestampSecond),

Value::Array(_) => unimplemented!("Array"),
Value::Map(_) => unimplemented!("Object"),
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),

Value::Null => CoerceUnsupportedNullTypeToSnafu {
ty: transform.type_.to_str_type(),
Expand Down Expand Up @@ -176,19 +175,28 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<
Value::Boolean(b) => coerce_bool_value(*b, transform),
Value::String(s) => coerce_string_value(s, transform),

Value::Timestamp(Timestamp::Nanosecond(ns)) => {
Ok(Some(ValueData::TimestampNanosecondValue(*ns)))
}
Value::Timestamp(Timestamp::Microsecond(us)) => {
Ok(Some(ValueData::TimestampMicrosecondValue(*us)))
}
Value::Timestamp(Timestamp::Millisecond(ms)) => {
Ok(Some(ValueData::TimestampMillisecondValue(*ms)))
}
Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
Value::Timestamp(input_timestamp) => match &transform.type_ {
Value::Timestamp(target_timestamp) => match target_timestamp {
Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue(
input_timestamp.timestamp_nanos(),
))),
Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue(
input_timestamp.timestamp_micros(),
))),
Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue(
input_timestamp.timestamp_millis(),
))),
Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue(
input_timestamp.timestamp(),
))),
},
_ => CoerceIncompatibleTypesSnafu {
msg: "Timestamp can only be coerced to another timestamp",
}
.fail(),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
}
}

Expand Down Expand Up @@ -220,8 +228,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
}
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),

Value::Null => return Ok(None),
};
Expand Down Expand Up @@ -257,8 +264,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>>
}
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),

Value::Null => return Ok(None),
};
Expand Down Expand Up @@ -294,8 +300,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>>
}
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),

Value::Null => return Ok(None),
};
Expand Down Expand Up @@ -331,8 +336,7 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>>
}
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),

Value::Null => return Ok(None),
};
Expand Down Expand Up @@ -407,8 +411,7 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<Value
None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),

Value::Null => Ok(None),
}
Expand Down
24 changes: 13 additions & 11 deletions src/pipeline/tests/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ transform:

#[test]
fn test_default_wrong_resolution() {
// given a bare number, we have no way to guess its resolution,
// but we can convert between resolutions during the transform phase
let test_input = r#"
{
"input_s": "1722580862",
Expand All @@ -209,28 +211,30 @@ fn test_default_wrong_resolution() {
let pipeline_yaml = r#"
processors:
- epoch:
fields:
- input_s
- input_nano
field: input_s
resolution: s
- epoch:
field: input_nano
resolution: ns
transform:
- fields:
- input_s
type: epoch, s
type: epoch, ms
- fields:
- input_nano
type: epoch, nano
type: epoch, ms
"#;

let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
Expand All @@ -242,14 +246,12 @@ transform:

let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
Some(ValueData::TimestampMillisecondValue(1722580862000))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}
23 changes: 12 additions & 11 deletions src/pipeline/tests/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ transform:

#[test]
fn test_timestamp_default_wrong_resolution() {
// same as test_default_wrong_resolution from epoch tests
let test_input = r#"
{
"input_s": "1722580862",
Expand All @@ -327,28 +328,30 @@ fn test_timestamp_default_wrong_resolution() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_s
- input_nano
field: input_s
resolution: s
- timestamp:
field: input_nano
resolution: ns
transform:
- fields:
- input_s
type: timestamp, s
type: timestamp, ms
- fields:
- input_nano
type: timestamp, nano
type: timestamp, ms
"#;

let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
Expand All @@ -360,14 +363,12 @@ transform:

let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
Some(ValueData::TimestampMillisecondValue(1722580862000))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}

0 comments on commit 5f0a83b

Please sign in to comment.