Skip to content

Commit

Permalink
chore: add json path for pipeline (#4925)
Browse files Browse the repository at this point in the history
* chore: add json path for pipeline

* chore: change jsonpath lib verion

* chore: remove useless doc

* chore: fix json path test

* chore: fix pipeline json path test
  • Loading branch information
paomian authored Nov 6, 2024
1 parent cccd25d commit dfe8cf2
Show file tree
Hide file tree
Showing 7 changed files with 760 additions and 4 deletions.
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
jsonb.workspace = true
jsonpath-rust = "0.7.3"
lazy_static.workspace = true
moka = { workspace = true, features = ["sync"] }
once_cell.workspace = true
Expand Down
12 changes: 12 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Parse json path error"))]
JsonPathParse {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: jsonpath_rust::JsonPathParserError,
},
#[snafu(display("Json path result index not number"))]
JsonPathParseResultIndex {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
9 changes: 9 additions & 0 deletions src/pipeline/src/etl/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod dissect;
pub mod epoch;
pub mod gsub;
pub mod join;
pub mod json_path;
pub mod letter;
pub mod regex;
pub mod timestamp;
Expand All @@ -34,6 +35,7 @@ use epoch::{EpochProcessor, EpochProcessorBuilder};
use gsub::{GsubProcessor, GsubProcessorBuilder};
use itertools::Itertools;
use join::{JoinProcessor, JoinProcessorBuilder};
use json_path::{JsonPathProcessor, JsonPathProcessorBuilder};
use letter::{LetterProcessor, LetterProcessorBuilder};
use regex::{RegexProcessor, RegexProcessorBuilder};
use snafu::{OptionExt, ResultExt};
Expand All @@ -56,6 +58,8 @@ const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";

// const IF_NAME: &str = "if";
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
Expand Down Expand Up @@ -94,6 +98,7 @@ pub enum ProcessorKind {
UrlEncoding(UrlEncodingProcessor),
Epoch(EpochProcessor),
Date(DateProcessor),
JsonPath(JsonPathProcessor),
}

/// ProcessorBuilder trait defines the interface for all processor builders
Expand Down Expand Up @@ -122,6 +127,7 @@ pub enum ProcessorBuilders {
UrlEncoding(UrlEncodingProcessorBuilder),
Epoch(EpochProcessorBuilder),
Date(DateProcessorBuilder),
JsonPath(JsonPathProcessorBuilder),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -266,6 +272,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
urlencoding::PROCESSOR_URL_ENCODING => {
ProcessorBuilders::UrlEncoding(UrlEncodingProcessorBuilder::try_from(value)?)
}
json_path::PROCESSOR_JSON_PATH => {
ProcessorBuilders::JsonPath(json_path::JsonPathProcessorBuilder::try_from(value)?)
}
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

Expand Down
231 changes: 231 additions & 0 deletions src/pipeline/src/etl/processor/json_path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use ahash::HashSet;
use jsonpath_rust::JsonPath;
use snafu::{OptionExt, ResultExt};

use super::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
};
use crate::etl::error::{Error, Result};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::ProcessorKind;
use crate::etl_error::{
JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu,
};
use crate::Value;

pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";

#[derive(Debug)]
pub struct JsonPathProcessorBuilder {
fields: Fields,
json_path: JsonPath<Value>,
ignore_missing: bool,
result_idex: Option<usize>,
}

impl JsonPathProcessorBuilder {
fn build(self, intermediate_keys: &[String]) -> Result<JsonPathProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
JSON_PATH_NAME,
intermediate_keys,
field.input_field(),
field.target_or_input_field(),
)?;
real_fields.push(input);
}

Ok(JsonPathProcessor {
fields: real_fields,
json_path: self.json_path,
ignore_missing: self.ignore_missing,
result_idex: self.result_idex,
})
}
}

impl ProcessorBuilder for JsonPathProcessorBuilder {
fn output_keys(&self) -> HashSet<&str> {
self.fields
.iter()
.map(|f| f.target_or_input_field())
.collect()
}

fn input_keys(&self) -> HashSet<&str> {
self.fields.iter().map(|f| f.input_field()).collect()
}

fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::JsonPath)
}
}

impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessorBuilder {
type Error = Error;

fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
let mut fields = Fields::default();
let mut ignore_missing = false;
let mut json_path = None;
let mut result_idex = None;

for (k, v) in value.iter() {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
}
FIELDS_NAME => {
fields = yaml_new_fields(v, FIELDS_NAME)?;
}

IGNORE_MISSING_NAME => {
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
}
JSON_PATH_RESULT_INDEX_NAME => {
result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
}

JSON_PATH_NAME => {
let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
json_path = Some(
JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
);
}

_ => {}
}
}
if let Some(json_path) = json_path {
let processor = JsonPathProcessorBuilder {
fields,
json_path,
ignore_missing,
result_idex,
};

Ok(processor)
} else {
ProcessorMissingFieldSnafu {
processor: PROCESSOR_JSON_PATH,
field: JSON_PATH_NAME,
}
.fail()
}
}
}

#[derive(Debug)]
pub struct JsonPathProcessor {
fields: Vec<OneInputOneOutputField>,
json_path: JsonPath<Value>,
ignore_missing: bool,
result_idex: Option<usize>,
}

impl Default for JsonPathProcessor {
fn default() -> Self {
JsonPathProcessor {
fields: vec![],
json_path: JsonPath::try_from("$").unwrap(),
ignore_missing: false,
result_idex: None,
}
}
}

impl JsonPathProcessor {
fn process_field(&self, val: &Value) -> Result<Value> {
let processed = self.json_path.find(val);
match processed {
Value::Array(arr) => {
if let Some(index) = self.result_idex {
Ok(arr.get(index).cloned().unwrap_or(Value::Null))
} else {
Ok(Value::Array(arr))
}
}
v => Ok(v),
}
}
}

impl Processor for JsonPathProcessor {
fn kind(&self) -> &str {
PROCESSOR_JSON_PATH
}

fn ignore_missing(&self) -> bool {
self.ignore_missing
}

fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;

let output_index = field.output_index();
val[output_index] = processed;
}
None => {
if !self.ignore_missing {
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
}
}
Ok(())
}
}

#[cfg(test)]
mod test {
use crate::Map;

#[test]
fn test_json_path() {
use super::*;
use crate::Value;

let json_path = JsonPath::try_from("$.hello").unwrap();
let processor = JsonPathProcessor {
json_path,
result_idex: Some(0),
..Default::default()
};

let result = processor
.process_field(&Value::Map(Map::one(
"hello",
Value::String("world".to_string()),
)))
.unwrap();
assert_eq!(result, Value::String("world".to_string()));
}
}
Loading

0 comments on commit dfe8cf2

Please sign in to comment.