From b50adf73c19fddf7cee4ccacd785b5765f301bdb Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 22 Nov 2023 13:10:41 +0900 Subject: [PATCH] Fixed the serialize deserialization of the indexing state to include shard lists. Before this, there was no notion of shards and indexers were just exposing the number of pipelines per source_uid. With ingest v2, the list of shard is part of indexing tasks, and the control plane wants to diff against it. The serialization format goes `[1,2][3]` to express two pipelines with respectively shard [1,2] and `[3]`. For a source that does not have notion of shards, like kafka, two pipelines simply look as follows: `[][]` Also took it as an opportunity to clean up the code and increase the test coverage. Closes #4174 --- quickwit/quickwit-cluster/src/cluster.rs | 261 ++++++++++++++++++-- quickwit/quickwit-cluster/src/member.rs | 47 +--- quickwit/quickwit-cluster/src/node.rs | 12 +- quickwit/quickwit-proto/src/indexing/mod.rs | 71 ------ 4 files changed, 252 insertions(+), 139 deletions(-) diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 2c10fc48b0b..96c9ab9d4ac 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -349,20 +349,7 @@ impl Cluster { let chitchat = self.chitchat().await; let mut chitchat_guard = chitchat.lock().await; let node_state = chitchat_guard.self_node_state(); - let mut current_indexing_tasks_keys: HashSet = node_state - .iter_prefix(INDEXING_TASK_PREFIX) - .map(|(key, _)| key.to_string()) - .collect(); - for (indexing_task, indexing_tasks_group) in - indexing_tasks.iter().group_by(|&task| task).into_iter() - { - let key = format!("{INDEXING_TASK_PREFIX}{indexing_task}"); - current_indexing_tasks_keys.remove(&key); - node_state.set(key, indexing_tasks_group.count().to_string()); - } - for obsolete_task_key in current_indexing_tasks_keys { - node_state.mark_for_deletion(&obsolete_task_key); - } + set_indexing_tasks_in_node_state(indexing_tasks, node_state); Ok(()) } @@ -407,6 +394,137 @@ fn spawn_ready_members_task( tokio::spawn(fut); } +/// Parsed indexing tasks from the chitchat node state. +pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec { + node_state + .iter_prefix(INDEXING_TASK_PREFIX) + .flat_map(|(key, versioned_value)| { + // We want to skip the tombstoned keys. + if versioned_value.tombstone.is_none() { + Some((key, versioned_value.value.as_str())) + } else { + None + } + }) + .flat_map(|(indexing_task_key, pipeline_shard_str)| { + parse_indexing_task_key_value(indexing_task_key, pipeline_shard_str) + }) + .collect() +} + +/// Parses indexing task key into the IndexingTask. +/// Malformed keys and values are ignored, just warnings are emitted. +fn parse_indexing_task_key_value( + indexing_task_key: &str, + pipeline_shards_str: &str, +) -> Vec { + let Some(index_uid_source_id) = indexing_task_key.strip_prefix(INDEXING_TASK_PREFIX) else { + warn!( + indexing_task_key = indexing_task_key, + "indexing task must start by the prefix `{INDEXING_TASK_PREFIX}`" + ); + return Vec::new(); + }; + let Some((index_uid_str, source_id_str)) = index_uid_source_id.rsplit_once(':') else { + warn!(index_uid_source_id=%index_uid_source_id, "invalid index task format, cannot find index_uid and source_id"); + return Vec::new(); + }; + match deserialize_pipeline_shards(pipeline_shards_str) { + Ok(pipeline_shards) => pipeline_shards + .into_iter() + .map(|shard_ids| IndexingTask { + index_uid: index_uid_str.to_string(), + source_id: source_id_str.to_string(), + shard_ids, + }) + .collect(), + Err(error) => { + warn!(error=%error, "failed to parse pipeline shard list"); + Vec::new() + } + } +} + +/// Writes the given indexing tasks in the given node state. +/// +/// If previous indexing tasks were present in the node state, they are marked for deletion. +pub fn set_indexing_tasks_in_node_state( + indexing_tasks: &[IndexingTask], + node_state: &mut NodeState, +) { + let mut current_indexing_tasks_keys: HashSet = node_state + .iter_prefix(INDEXING_TASK_PREFIX) + .map(|(key, _)| key.to_string()) + .collect(); + let mut indexing_tasks_grouped_by_source: HashMap<(&str, &str), Vec<&IndexingTask>> = + HashMap::new(); + for indexing_task in indexing_tasks { + indexing_tasks_grouped_by_source + .entry(( + indexing_task.index_uid.as_str(), + indexing_task.source_id.as_str(), + )) + .or_default() + .push(indexing_task); + } + for ((index_uid, source_id), indexing_tasks) in indexing_tasks_grouped_by_source { + let shards_per_pipeline = indexing_tasks + .iter() + .map(|indexing_task| &indexing_task.shard_ids[..]); + let pipeline_shards_str: String = serialize_pipeline_shards(shards_per_pipeline); + let key = format!("{INDEXING_TASK_PREFIX}{index_uid}:{source_id}"); + current_indexing_tasks_keys.remove(&key); + node_state.set(key, pipeline_shards_str); + } + for obsolete_task_key in current_indexing_tasks_keys { + node_state.mark_for_deletion(&obsolete_task_key); + } +} + +/// Given a list of list of shards (one list per pipeline), serializes it as a string to be stored +/// as a value in the chitchat state. +/// +/// The format is as follows `[1,2,3][4,5]`. +fn serialize_pipeline_shards<'a>(pipeline_shards: impl Iterator) -> String { + let mut pipeline_shards_str = String::new(); + for shards in pipeline_shards { + pipeline_shards_str.push('['); + pipeline_shards_str.push_str(&shards.iter().join(",")); + pipeline_shards_str.push(']'); + } + pipeline_shards_str +} + +/// Deserializes the list of shards from a string stored in the chitchat state, as +/// serialized by `serialize_pipeline_shards`. +/// +/// This function will make sure the pipeline shard lists are sorted. +pub fn deserialize_pipeline_shards(pipeline_shards_str: &str) -> anyhow::Result>> { + if pipeline_shards_str.is_empty() { + return Ok(Vec::new()); + } + let mut pipeline_shards: Vec> = Vec::new(); + for single_pipeline_shard_str in pipeline_shards_str.split(']') { + if single_pipeline_shard_str.is_empty() { + continue; + } + let Some(comma_sep_shards_str) = single_pipeline_shard_str.strip_prefix('[') else { + anyhow::bail!("invalid pipeline shards string: `{pipeline_shards_str}`"); + }; + let mut shards: Vec = Vec::new(); + if !comma_sep_shards_str.is_empty() { + for shard_str in comma_sep_shards_str.split(',') { + let shard_id: u64 = shard_str.parse::()?; + shards.push(shard_id); + } + } + shards.sort(); + pipeline_shards.push(shards); + } + pipeline_shards.sort_by_key(|shards| shards.first().copied()); + Ok(pipeline_shards) +} + async fn spawn_ready_nodes_change_stream_task(cluster: Cluster) { let cluster_guard = cluster.inner.read().await; let cluster_id = cluster_guard.cluster_id.clone(); @@ -983,7 +1101,7 @@ mod tests { let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"), - "2".to_string(), + "[][]".to_string(), ); chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"), @@ -1080,4 +1198,117 @@ mod tests { Ok(()) } + + fn test_serialize_pipeline_shards_aux(pipeline_shards: &[&[u64]], expected_str: &str) { + let pipeline_shards_str = serialize_pipeline_shards(pipeline_shards.iter().map(|val| *val)); + assert_eq!(pipeline_shards_str, expected_str); + let ser_deser_pipeline_shards = deserialize_pipeline_shards(&pipeline_shards_str).unwrap(); + assert_eq!(pipeline_shards, ser_deser_pipeline_shards); + } + + #[test] + fn test_serialize_pipeline_shards() { + test_serialize_pipeline_shards_aux(&[], ""); + test_serialize_pipeline_shards_aux(&[&[]], "[]"); + test_serialize_pipeline_shards_aux(&[&[1]], "[1]"); + test_serialize_pipeline_shards_aux(&[&[1, 2]], "[1,2]"); + test_serialize_pipeline_shards_aux(&[&[], &[1, 2]], "[][1,2]"); + test_serialize_pipeline_shards_aux(&[&[], &[]], "[][]"); + test_serialize_pipeline_shards_aux(&[&[1], &[3, 4, 5, 6]], "[1][3,4,5,6]"); + } + + #[test] + fn test_deserialize_pipeline_shards_sorts() { + assert_eq!( + deserialize_pipeline_shards("[2,1]").unwrap(), + vec![vec![1, 2]] + ); + assert_eq!( + deserialize_pipeline_shards("[1][]").unwrap(), + vec![vec![], vec![1]] + ); + assert_eq!( + deserialize_pipeline_shards("[3][2]").unwrap(), + vec![vec![2], vec![3]] + ); + } + + fn test_serialize_indexing_tasks_aux( + indexing_tasks: &[IndexingTask], + node_state: &mut NodeState, + ) { + set_indexing_tasks_in_node_state(indexing_tasks, node_state); + let ser_deser_indexing_tasks = parse_indexing_tasks(node_state); + assert_eq!(indexing_tasks, ser_deser_indexing_tasks); + } + + #[test] + fn test_serialize_indexing_tasks() { + let mut node_state = NodeState::default(); + test_serialize_indexing_tasks_aux(&[], &mut node_state); + test_serialize_indexing_tasks_aux( + &[IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![1, 2], + }], + &mut node_state, + ); + // change in the set of shards + test_serialize_indexing_tasks_aux( + &[IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![1, 2, 3], + }], + &mut node_state, + ); + test_serialize_indexing_tasks_aux( + &[ + IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![1, 2], + }, + IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![3, 4], + }, + ], + &mut node_state, + ); + // different index. + test_serialize_indexing_tasks_aux( + &[ + IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![1, 2], + }, + IndexingTask { + index_uid: "test:test2".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![3, 4], + }, + ], + &mut node_state, + ); + // same index, different source. + test_serialize_indexing_tasks_aux( + &[ + IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source1".to_string(), + shard_ids: vec![1, 2], + }, + IndexingTask { + index_uid: "test:test1".to_string(), + source_id: "my-source2".to_string(), + shard_ids: vec![3, 4], + }, + ], + &mut node_state, + ); + } } diff --git a/quickwit/quickwit-cluster/src/member.rs b/quickwit/quickwit-cluster/src/member.rs index da35296379b..daf2b560f85 100644 --- a/quickwit/quickwit-cluster/src/member.rs +++ b/quickwit/quickwit-cluster/src/member.rs @@ -21,13 +21,13 @@ use std::collections::HashSet; use std::net::SocketAddr; use std::str::FromStr; -use anyhow::{anyhow, Context}; +use anyhow::Context; use chitchat::{ChitchatId, NodeState}; -use itertools::Itertools; use quickwit_proto::indexing::{CpuCapacity, IndexingTask}; use quickwit_proto::types::NodeId; use tracing::{error, warn}; +use crate::cluster::parse_indexing_tasks; use crate::{GenerationId, QuickwitService}; // Keys used to store member's data in chitchat state. @@ -146,7 +146,7 @@ pub(crate) fn build_cluster_member( parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id) })?; let grpc_advertise_addr = node_state.grpc_advertise_addr()?; - let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id); + let indexing_tasks = parse_indexing_tasks(node_state); let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state); let member = ClusterMember { node_id: chitchat_id.node_id.into(), @@ -161,47 +161,6 @@ pub(crate) fn build_cluster_member( Ok(member) } -// Parses indexing task key into the IndexingTask. -fn parse_indexing_task_key(key: &str) -> anyhow::Result { - let reminder = key.strip_prefix(INDEXING_TASK_PREFIX).ok_or_else(|| { - anyhow!( - "indexing task must contain the delimiter character `:`: `{}`", - key - ) - })?; - IndexingTask::try_from(reminder) -} - -/// Parses indexing tasks serialized in keys formatted as -/// `INDEXING_TASK_PREFIX:index_id:index_incarnation:source_id`. Malformed keys and values are -/// ignored, just warnings are emitted. -pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec { - node_state - .iter_prefix(INDEXING_TASK_PREFIX) - .map(|(key, versioned_value)| { - let indexing_task = parse_indexing_task_key(key)?; - let num_tasks: usize = versioned_value.value.parse()?; - Ok((0..num_tasks).map(move |_| indexing_task.clone())) - }) - .flatten_ok() - .filter_map( - |indexing_task_parsing_result: anyhow::Result| { - match indexing_task_parsing_result { - Ok(indexing_task) => Some(indexing_task), - Err(error) => { - warn!( - node_id=%node_id, - error=%error, - "Malformated indexing task key and value on node." - ); - None - } - } - }, - ) - .collect() -} - fn parse_enabled_services_str( enabled_services_str: &str, node_id: &str, diff --git a/quickwit/quickwit-cluster/src/node.rs b/quickwit/quickwit-cluster/src/node.rs index 0b7d39dc7ce..c3c9c7a2874 100644 --- a/quickwit/quickwit-cluster/src/node.rs +++ b/quickwit/quickwit-cluster/src/node.rs @@ -67,10 +67,10 @@ impl ClusterNode { enabled_services: &[&str], indexing_tasks: &[IndexingTask], ) -> Self { - use itertools::Itertools; use quickwit_common::tower::make_channel; - use crate::member::{ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX}; + use crate::cluster::set_indexing_tasks_in_node_state; + use crate::member::{ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY}; let gossip_advertise_addr = ([127, 0, 0, 1], port).into(); let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into(); @@ -79,13 +79,7 @@ impl ClusterNode { let mut node_state = NodeState::default(); node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(",")); node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string()); - - for (indexing_task, indexing_tasks_group) in - indexing_tasks.iter().group_by(|&task| task).into_iter() - { - let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task); - node_state.set(key, indexing_tasks_group.count().to_string()); - } + set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state); Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap() } diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index f55c4a83ebd..b98353306d9 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -174,40 +174,6 @@ impl Hash for IndexingTask { self.source_id.hash(state); } } - -impl TryFrom<&str> for IndexingTask { - type Error = anyhow::Error; - - fn try_from(index_task_str: &str) -> anyhow::Result { - let mut iter = index_task_str.rsplit(':'); - let source_id = iter.next().ok_or_else(|| { - anyhow!( - "invalid index task format, cannot find source_id in `{}`", - index_task_str - ) - })?; - let part1 = iter.next().ok_or_else(|| { - anyhow!( - "invalid index task format, cannot find index_id in `{}`", - index_task_str - ) - })?; - if let Some(part2) = iter.next() { - Ok(IndexingTask { - index_uid: format!("{part2}:{part1}"), - source_id: source_id.to_string(), - shard_ids: Vec::new(), - }) - } else { - Ok(IndexingTask { - index_uid: part1.to_string(), - source_id: source_id.to_string(), - shard_ids: Vec::new(), - }) - } - } -} - #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, utoipa::ToSchema)] pub struct PipelineMetrics { pub cpu_millis: CpuCapacity, @@ -387,41 +353,4 @@ mod tests { assert_eq!(CpuCapacity::from_cpu_millis(2500).to_string(), "2500m"); assert_eq!(serde_json::to_string(&mcpu(2500)).unwrap(), "\"2500m\""); } - - #[test] - fn test_indexing_task_serialization() { - let original = IndexingTask { - index_uid: "test-index:123456".to_string(), - source_id: "test-source".to_string(), - shard_ids: Vec::new(), - }; - - let serialized = original.to_string(); - let deserialized: IndexingTask = serialized.as_str().try_into().unwrap(); - assert_eq!(original, deserialized); - } - - #[test] - fn test_indexing_task_serialization_bwc() { - assert_eq!( - IndexingTask::try_from("foo:bar").unwrap(), - IndexingTask { - index_uid: "foo".to_string(), - source_id: "bar".to_string(), - shard_ids: Vec::new(), - } - ); - } - - #[test] - fn test_indexing_task_serialization_errors() { - assert_eq!( - "invalid index task format, cannot find index_id in ``", - IndexingTask::try_from("").unwrap_err().to_string() - ); - assert_eq!( - "invalid index task format, cannot find index_id in `foo`", - IndexingTask::try_from("foo").unwrap_err().to_string() - ); - } }