Skip to content

Commit

Permalink
refactor: Change the error type in the pipeline crate from String to …
Browse files Browse the repository at this point in the history
…Error (#4763)

* chore: in process

* chore: change pipeline crate error type

* chore: improve event error

* chore: fix by pr comment

* chore: use snafu context replace ok_or_else

* refactor: update snafu usage

---------

Co-authored-by: Ning Sun <[email protected]>
  • Loading branch information
paomian and sunng87 authored Sep 25, 2024
1 parent 0274e75 commit 627a326
Show file tree
Hide file tree
Showing 26 changed files with 1,276 additions and 542 deletions.
8 changes: 4 additions & 4 deletions src/pipeline/benches/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result};
use serde_json::{Deserializer, Value};

fn processor_mut(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> Result<Vec<greptime_proto::v1::Row>, String> {
) -> Result<Vec<greptime_proto::v1::Row>> {
let mut payload = pipeline.init_intermediate_state();
let mut result = Vec::with_capacity(input_values.len());

Expand All @@ -30,7 +30,7 @@ fn processor_mut(
pipeline.reset_intermediate_state(&mut payload);
}

Ok::<Vec<greptime_proto::v1::Row>, String>(result)
Ok(result)
}

fn prepare_pipeline() -> Pipeline<GreptimeTransformer> {
Expand Down Expand Up @@ -230,7 +230,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let input_value_str = include_str!("./data.log");
let input_value = Deserializer::from_str(input_value_str)
.into_iter::<serde_json::Value>()
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
let pipeline = prepare_pipeline();
let mut group = c.benchmark_group("pipeline");
Expand Down
30 changes: 14 additions & 16 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@

#![allow(dead_code)]

pub mod error;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;

use ahash::HashSet;
use common_telemetry::debug;
use error::{IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu};
use itertools::Itertools;
use processor::{Processor, ProcessorBuilder, Processors};
use snafu::{OptionExt, ResultExt};
use transform::{TransformBuilders, Transformer, Transforms};
use value::Value;
use yaml_rust::YamlLoader;

use crate::etl::error::Result;

const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
Expand All @@ -37,13 +42,13 @@ pub enum Content {
Yaml(String),
}

pub fn parse<T>(input: &Content) -> Result<Pipeline<T>, String>
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
where
T: Transformer,
{
match input {
Content::Yaml(str) => {
let docs = YamlLoader::load_from_str(str).map_err(|e| e.to_string())?;
let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;

let doc = &docs[0];

Expand Down Expand Up @@ -124,7 +129,7 @@ where
.processor_builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys))
.collect::<Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>>>()?;
let processors = Processors {
processors: processors_kind_list,
required_keys: processors_required_keys.clone(),
Expand All @@ -136,7 +141,7 @@ where
.builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys, &output_keys))
.collect::<Result<Vec<_>, String>>()?;
.collect::<Result<Vec<_>>>()?;

let transformers = Transforms {
transforms: transfor_list,
Expand Down Expand Up @@ -197,15 +202,15 @@ impl<T> Pipeline<T>
where
T: Transformer,
{
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput, String> {
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput> {
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}

self.transformer.transform_mut(val)
}

pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<(), String> {
pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
match val {
serde_json::Value::Object(map) => {
let mut search_from = 0;
Expand All @@ -230,7 +235,7 @@ where
result[0] = val.try_into()?;
}
_ => {
return Err("expect object".to_string());
return PrepareValueMustBeObjectSnafu.fail();
}
}
Ok(())
Expand Down Expand Up @@ -274,18 +279,11 @@ where
}
}

pub(crate) fn find_key_index(
intermediate_keys: &[String],
key: &str,
kind: &str,
) -> Result<usize, String> {
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
intermediate_keys
.iter()
.position(|k| k == key)
.ok_or(format!(
"{} processor.{} not found in intermediate keys",
kind, key
))
.context(IntermediateKeyIndexSnafu { kind, key })
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 627a326

Please sign in to comment.