Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Change the error type in the pipeline crate from String to Error #4763

Merged
merged 6 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/pipeline/benches/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result};
use serde_json::{Deserializer, Value};

fn processor_mut(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> Result<Vec<greptime_proto::v1::Row>, String> {
) -> Result<Vec<greptime_proto::v1::Row>> {
let mut payload = pipeline.init_intermediate_state();
let mut result = Vec::with_capacity(input_values.len());

Expand All @@ -30,7 +30,7 @@ fn processor_mut(
pipeline.reset_intermediate_state(&mut payload);
}

Ok::<Vec<greptime_proto::v1::Row>, String>(result)
Ok(result)
}

fn prepare_pipeline() -> Pipeline<GreptimeTransformer> {
Expand Down Expand Up @@ -230,7 +230,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let input_value_str = include_str!("./data.log");
let input_value = Deserializer::from_str(input_value_str)
.into_iter::<serde_json::Value>()
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
let pipeline = prepare_pipeline();
let mut group = c.benchmark_group("pipeline");
Expand Down
30 changes: 14 additions & 16 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@

#![allow(dead_code)]

pub mod error;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;

use ahash::HashSet;
use common_telemetry::debug;
use error::{IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu};
use itertools::Itertools;
use processor::{Processor, ProcessorBuilder, Processors};
use snafu::{OptionExt, ResultExt};
use transform::{TransformBuilders, Transformer, Transforms};
use value::Value;
use yaml_rust::YamlLoader;

use crate::etl::error::Result;

const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
Expand All @@ -37,13 +42,13 @@ pub enum Content {
Yaml(String),
}

pub fn parse<T>(input: &Content) -> Result<Pipeline<T>, String>
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
where
T: Transformer,
{
match input {
Content::Yaml(str) => {
let docs = YamlLoader::load_from_str(str).map_err(|e| e.to_string())?;
let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;

let doc = &docs[0];

Expand Down Expand Up @@ -124,7 +129,7 @@ where
.processor_builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys))
.collect::<Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>>>()?;
let processors = Processors {
processors: processors_kind_list,
required_keys: processors_required_keys.clone(),
Expand All @@ -136,7 +141,7 @@ where
.builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys, &output_keys))
.collect::<Result<Vec<_>, String>>()?;
.collect::<Result<Vec<_>>>()?;

let transformers = Transforms {
transforms: transfor_list,
Expand Down Expand Up @@ -197,15 +202,15 @@ impl<T> Pipeline<T>
where
T: Transformer,
{
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput, String> {
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput> {
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}

self.transformer.transform_mut(val)
}

pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<(), String> {
pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
match val {
serde_json::Value::Object(map) => {
let mut search_from = 0;
Expand All @@ -230,7 +235,7 @@ where
result[0] = val.try_into()?;
}
_ => {
return Err("expect object".to_string());
return PrepareValueMustBeObjectSnafu.fail();
}
}
Ok(())
Expand Down Expand Up @@ -274,18 +279,11 @@ where
}
}

pub(crate) fn find_key_index(
intermediate_keys: &[String],
key: &str,
kind: &str,
) -> Result<usize, String> {
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
intermediate_keys
.iter()
.position(|k| k == key)
.ok_or(format!(
"{} processor.{} not found in intermediate keys",
kind, key
))
.context(IntermediateKeyIndexSnafu { kind, key })
}

#[cfg(test)]
Expand Down
Loading