Skip to content

Commit

Permalink
chore: modify the underlying data structure of the pipeline value map…
Browse files Browse the repository at this point in the history
… type from hashmap to btremap to keep key order
  • Loading branch information
paomian committed Sep 26, 2024
1 parent 0f78d35 commit b756f82
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 17 deletions.
3 changes: 3 additions & 0 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,19 @@ where
match val {
Value::Map(map) => {
let mut search_from = 0;
// because of the key in the json map is ordered
for (payload_key, payload_value) in map.values.into_iter() {
if search_from >= self.required_keys.len() {
break;
}

// because of map key is ordered, required_keys is ordered too
if let Some(pos) = self.required_keys[search_from..]
.iter()
.position(|k| k == &payload_key)
{
result[search_from + pos] = payload_value;
// next search from is always after the current key
search_from += pos;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/pipeline/src/etl/processor/cmcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ impl Processor for CmcdProcessor {

#[cfg(test)]
mod tests {
use ahash::HashMap;
use std::collections::BTreeMap;

use urlencoding::decode;

use super::{CmcdProcessorBuilder, CMCD_KEYS};
Expand Down Expand Up @@ -563,14 +564,14 @@ mod tests {
let values = vec
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<HashMap<String, Value>>();
.collect::<BTreeMap<String, Value>>();
let expected = Map { values };

let actual = processor.parse(0, &decoded).unwrap();
let actual = actual
.into_iter()
.map(|(index, value)| (intermediate_keys[index].clone(), value))
.collect::<HashMap<String, Value>>();
.collect::<BTreeMap<String, Value>>();
let actual = Map { values: actual };
assert_eq!(actual, expected);
}
Expand Down
6 changes: 4 additions & 2 deletions src/pipeline/src/etl/processor/regex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ impl Processor for RegexProcessor {
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use ahash::{HashMap, HashMapExt};
use itertools::Itertools;

Expand Down Expand Up @@ -475,14 +477,14 @@ ignore_missing: false"#;
.map(|k| k.to_string())
.collect_vec();
let processor = builder.build(&intermediate_keys).unwrap();
let mut result = HashMap::new();
let mut result = BTreeMap::new();
for (index, pattern) in processor.patterns.iter().enumerate() {
let r = processor
.process(&breadcrumbs_str, pattern, (0, index))
.unwrap()
.into_iter()
.map(|(k, v)| (intermediate_keys[k].clone(), v))
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();
result.extend(r);
}
let map = Map { values: result };
Expand Down
7 changes: 4 additions & 3 deletions src/pipeline/src/etl/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub mod array;
pub mod map;
pub mod time;

use ahash::{HashMap, HashMapExt};
use std::collections::BTreeMap;

pub use array::Array;
use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue};
pub use map::Map;
Expand Down Expand Up @@ -289,7 +290,7 @@ impl TryFrom<serde_json::Value> for Value {
Ok(Value::Array(Array { values }))
}
serde_json::Value::Object(v) => {
let mut values = HashMap::with_capacity(v.len());
let mut values = BTreeMap::new();
for (k, v) in v {
values.insert(k, Value::try_from(v)?);
}
Expand Down Expand Up @@ -320,7 +321,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
Ok(Value::Array(Array { values }))
}
yaml_rust::Yaml::Hash(v) => {
let mut values = HashMap::new();
let mut values = BTreeMap::new();
for (k, v) in v {
let key = k
.as_str()
Expand Down
16 changes: 11 additions & 5 deletions src/pipeline/src/etl/value/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use ahash::{HashMap, HashMapExt};
use std::collections::BTreeMap;

use ahash::HashMap;

use crate::etl::value::Value;

#[derive(Debug, Clone, PartialEq)]
pub struct Map {
pub values: HashMap<String, Value>,
pub values: BTreeMap<String, Value>,
}

impl Default for Map {
fn default() -> Self {
Self {
values: HashMap::with_capacity(30),
values: BTreeMap::default(),
}
}
}
Expand All @@ -47,12 +49,16 @@ impl Map {

impl From<HashMap<String, Value>> for Map {
fn from(values: HashMap<String, Value>) -> Self {
Map { values }
let mut map = Map::default();
for (k, v) in values.into_iter() {
map.insert(k, v);
}
map
}
}

impl std::ops::Deref for Map {
type Target = HashMap<String, Value>;
type Target = BTreeMap<String, Value>;

fn deref(&self) -> &Self::Target {
&self.values
Expand Down
7 changes: 3 additions & 4 deletions src/servers/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap as StdHashMap};

use ahash::{HashMap, HashMapExt};
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{
Expand Down Expand Up @@ -144,7 +143,7 @@ fn log_to_pipeline_value(
let log_attrs = PipelineValue::Map(Map {
values: key_value_to_map(log.attributes),
});
let mut map = HashMap::new();
let mut map = BTreeMap::new();
map.insert(
"Timestamp".to_string(),
PipelineValue::Uint64(log.time_unix_nano),
Expand Down Expand Up @@ -485,8 +484,8 @@ fn any_value_to_pipeline_value(value: any_value::Value) -> PipelineValue {
}

// convert otlp keyValue vec to map
fn key_value_to_map(key_values: Vec<KeyValue>) -> HashMap<String, PipelineValue> {
let mut map = HashMap::new();
fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<String, PipelineValue> {
let mut map = BTreeMap::new();
for kv in key_values {
let value = match kv.value {
Some(value) => match value.value {
Expand Down

0 comments on commit b756f82

Please sign in to comment.