Fixed the serialization/deserialization of the indexing state to include shard lists.

Before this, there was no notion of shards and indexers were just
exposing the number of pipelines per source_uid.

With ingest v2, the list of shards is part of the indexing tasks, and the
control plane wants to diff against it.

The serialization format is `[1,2][3]`, which expresses two pipelines with
shards `[1,2]` and `[3]` respectively.

For a source that has no notion of shards, like Kafka, two pipelines are
simply serialized as `[][]`.
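
For illustration, here is a minimal, self-contained sketch of how that format round-trips. It is not the commit's code — the real helpers added below in `quickwit-cluster/src/cluster.rs` are `serialize_pipeline_shards` and `deserialize_pipeline_shards`, which additionally sort the shard lists — but it captures the same encoding:

```rust
// Hypothetical stand-alone sketch of the `[1,2][3]` pipeline-shards format.
// The actual implementation lives in quickwit-cluster/src/cluster.rs.

fn serialize(pipeline_shards: &[Vec<u64>]) -> String {
    let mut out = String::new();
    for shards in pipeline_shards {
        out.push('[');
        let csv: Vec<String> = shards.iter().map(|shard| shard.to_string()).collect();
        out.push_str(&csv.join(","));
        out.push(']');
    }
    out
}

fn deserialize(pipeline_shards_str: &str) -> Option<Vec<Vec<u64>>> {
    let mut pipelines = Vec::new();
    for chunk in pipeline_shards_str.split(']').filter(|chunk| !chunk.is_empty()) {
        // Each non-empty chunk must start with '['; the rest is a comma-separated shard list.
        let inner = chunk.strip_prefix('[')?;
        let shards: Vec<u64> = if inner.is_empty() {
            Vec::new()
        } else {
            inner
                .split(',')
                .map(|shard_str| shard_str.parse().ok())
                .collect::<Option<_>>()?
        };
        pipelines.push(shards);
    }
    Some(pipelines)
}

fn main() {
    // Two pipelines: one with shards 1 and 2, one with shard 3.
    assert_eq!(serialize(&[vec![1, 2], vec![3]]), "[1,2][3]");
    // A shard-less source (e.g. Kafka) running two pipelines.
    assert_eq!(serialize(&[vec![], vec![]]), "[][]");
    assert_eq!(deserialize("[1,2][3]"), Some(vec![vec![1, 2], vec![3]]));
}
```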

Also took it as an opportunity to clean up the code
and increase the test coverage.

Closes #4174
fulmicoton committed Nov 22, 2023
1 parent 3f18626 commit b50adf7
Showing 4 changed files with 252 additions and 139 deletions.
261 changes: 246 additions & 15 deletions quickwit/quickwit-cluster/src/cluster.rs
@@ -349,20 +349,7 @@ impl Cluster {
let chitchat = self.chitchat().await;
let mut chitchat_guard = chitchat.lock().await;
let node_state = chitchat_guard.self_node_state();
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
for (indexing_task, indexing_tasks_group) in
indexing_tasks.iter().group_by(|&task| task).into_iter()
{
let key = format!("{INDEXING_TASK_PREFIX}{indexing_task}");
current_indexing_tasks_keys.remove(&key);
node_state.set(key, indexing_tasks_group.count().to_string());
}
for obsolete_task_key in current_indexing_tasks_keys {
node_state.mark_for_deletion(&obsolete_task_key);
}
set_indexing_tasks_in_node_state(indexing_tasks, node_state);
Ok(())
}

@@ -407,6 +394,137 @@ fn spawn_ready_members_task(
tokio::spawn(fut);
}

/// Parses the indexing tasks from the chitchat node state.
pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec<IndexingTask> {
node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.flat_map(|(key, versioned_value)| {
// We want to skip the tombstoned keys.
if versioned_value.tombstone.is_none() {
Some((key, versioned_value.value.as_str()))
} else {
None
}
})
.flat_map(|(indexing_task_key, pipeline_shard_str)| {
parse_indexing_task_key_value(indexing_task_key, pipeline_shard_str)
})
.collect()
}

/// Parses an indexing task key and value into a list of `IndexingTask`s.
/// Malformed keys and values are ignored; only warnings are emitted.
fn parse_indexing_task_key_value(
indexing_task_key: &str,
pipeline_shards_str: &str,
) -> Vec<IndexingTask> {
let Some(index_uid_source_id) = indexing_task_key.strip_prefix(INDEXING_TASK_PREFIX) else {
warn!(
indexing_task_key = indexing_task_key,
"indexing task must start by the prefix `{INDEXING_TASK_PREFIX}`"
);
return Vec::new();
};
let Some((index_uid_str, source_id_str)) = index_uid_source_id.rsplit_once(':') else {
warn!(index_uid_source_id=%index_uid_source_id, "invalid index task format, cannot find index_uid and source_id");
return Vec::new();
};
match deserialize_pipeline_shards(pipeline_shards_str) {
Ok(pipeline_shards) => pipeline_shards
.into_iter()
.map(|shard_ids| IndexingTask {
index_uid: index_uid_str.to_string(),
source_id: source_id_str.to_string(),
shard_ids,
})
.collect(),
Err(error) => {
warn!(error=%error, "failed to parse pipeline shard list");
Vec::new()
}
}
}

/// Writes the given indexing tasks in the given node state.
///
/// If previous indexing tasks were present in the node state, they are marked for deletion.
pub fn set_indexing_tasks_in_node_state(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
let mut indexing_tasks_grouped_by_source: HashMap<(&str, &str), Vec<&IndexingTask>> =
HashMap::new();
for indexing_task in indexing_tasks {
indexing_tasks_grouped_by_source
.entry((
indexing_task.index_uid.as_str(),
indexing_task.source_id.as_str(),
))
.or_default()
.push(indexing_task);
}
for ((index_uid, source_id), indexing_tasks) in indexing_tasks_grouped_by_source {
let shards_per_pipeline = indexing_tasks
.iter()
.map(|indexing_task| &indexing_task.shard_ids[..]);
let pipeline_shards_str: String = serialize_pipeline_shards(shards_per_pipeline);
let key = format!("{INDEXING_TASK_PREFIX}{index_uid}:{source_id}");
current_indexing_tasks_keys.remove(&key);
node_state.set(key, pipeline_shards_str);
}
for obsolete_task_key in current_indexing_tasks_keys {
node_state.mark_for_deletion(&obsolete_task_key);
}
}

/// Given a list of lists of shards (one list per pipeline), serializes it as a string to be stored
/// as a value in the chitchat state.
///
/// The format is as follows `[1,2,3][4,5]`.
fn serialize_pipeline_shards<'a>(pipeline_shards: impl Iterator<Item = &'a [u64]>) -> String {
let mut pipeline_shards_str = String::new();
for shards in pipeline_shards {
pipeline_shards_str.push('[');
pipeline_shards_str.push_str(&shards.iter().join(","));
pipeline_shards_str.push(']');
}
pipeline_shards_str
}

/// Deserializes the list of shards from a string stored in the chitchat state, as
/// serialized by `serialize_pipeline_shards`.
///
/// This function will make sure the pipeline shard lists are sorted.
pub fn deserialize_pipeline_shards(pipeline_shards_str: &str) -> anyhow::Result<Vec<Vec<u64>>> {
if pipeline_shards_str.is_empty() {
return Ok(Vec::new());
}
let mut pipeline_shards: Vec<Vec<u64>> = Vec::new();
for single_pipeline_shard_str in pipeline_shards_str.split(']') {
if single_pipeline_shard_str.is_empty() {
continue;
}
let Some(comma_sep_shards_str) = single_pipeline_shard_str.strip_prefix('[') else {
anyhow::bail!("invalid pipeline shards string: `{pipeline_shards_str}`");
};
let mut shards: Vec<u64> = Vec::new();
if !comma_sep_shards_str.is_empty() {
for shard_str in comma_sep_shards_str.split(',') {
let shard_id: u64 = shard_str.parse::<u64>()?;
shards.push(shard_id);
}
}
shards.sort();
pipeline_shards.push(shards);
}
pipeline_shards.sort_by_key(|shards| shards.first().copied());
Ok(pipeline_shards)
}

async fn spawn_ready_nodes_change_stream_task(cluster: Cluster) {
let cluster_guard = cluster.inner.read().await;
let cluster_id = cluster_guard.cluster_id.clone();
@@ -983,7 +1101,7 @@ mod tests {
let mut chitchat_guard = chitchat_handle.lock().await;
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"),
"2".to_string(),
"[][]".to_string(),
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"),
@@ -1080,4 +1198,117 @@ mod tests {

Ok(())
}

fn test_serialize_pipeline_shards_aux(pipeline_shards: &[&[u64]], expected_str: &str) {
let pipeline_shards_str = serialize_pipeline_shards(pipeline_shards.iter().map(|val| *val));
assert_eq!(pipeline_shards_str, expected_str);
let ser_deser_pipeline_shards = deserialize_pipeline_shards(&pipeline_shards_str).unwrap();
assert_eq!(pipeline_shards, ser_deser_pipeline_shards);
}

#[test]
fn test_serialize_pipeline_shards() {
test_serialize_pipeline_shards_aux(&[], "");
test_serialize_pipeline_shards_aux(&[&[]], "[]");
test_serialize_pipeline_shards_aux(&[&[1]], "[1]");
test_serialize_pipeline_shards_aux(&[&[1, 2]], "[1,2]");
test_serialize_pipeline_shards_aux(&[&[], &[1, 2]], "[][1,2]");
test_serialize_pipeline_shards_aux(&[&[], &[]], "[][]");
test_serialize_pipeline_shards_aux(&[&[1], &[3, 4, 5, 6]], "[1][3,4,5,6]");
}

#[test]
fn test_deserialize_pipeline_shards_sorts() {
assert_eq!(
deserialize_pipeline_shards("[2,1]").unwrap(),
vec![vec![1, 2]]
);
assert_eq!(
deserialize_pipeline_shards("[1][]").unwrap(),
vec![vec![], vec![1]]
);
assert_eq!(
deserialize_pipeline_shards("[3][2]").unwrap(),
vec![vec![2], vec![3]]
);
}

fn test_serialize_indexing_tasks_aux(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
set_indexing_tasks_in_node_state(indexing_tasks, node_state);
let ser_deser_indexing_tasks = parse_indexing_tasks(node_state);
assert_eq!(indexing_tasks, ser_deser_indexing_tasks);
}

#[test]
fn test_serialize_indexing_tasks() {
let mut node_state = NodeState::default();
test_serialize_indexing_tasks_aux(&[], &mut node_state);
test_serialize_indexing_tasks_aux(
&[IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
}],
&mut node_state,
);
// change in the set of shards
test_serialize_indexing_tasks_aux(
&[IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2, 3],
}],
&mut node_state,
);
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
// different index.
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test2".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
// same index, different source.
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source2".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
}
}
47 changes: 3 additions & 44 deletions quickwit/quickwit-cluster/src/member.rs
@@ -21,13 +21,13 @@ use std::collections::HashSet;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use anyhow::Context;
use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};

use crate::cluster::parse_indexing_tasks;
use crate::{GenerationId, QuickwitService};

// Keys used to store member's data in chitchat state.
@@ -146,7 +146,7 @@ pub(crate) fn build_cluster_member(
parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id)
})?;
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id);
let indexing_tasks = parse_indexing_tasks(node_state);
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
@@ -161,47 +161,6 @@
Ok(member)
}

// Parses indexing task key into the IndexingTask.
fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> {
let reminder = key.strip_prefix(INDEXING_TASK_PREFIX).ok_or_else(|| {
anyhow!(
"indexing task must contain the delimiter character `:`: `{}`",
key
)
})?;
IndexingTask::try_from(reminder)
}

/// Parses indexing tasks serialized in keys formatted as
/// `INDEXING_TASK_PREFIX:index_id:index_incarnation:source_id`. Malformed keys and values are
/// ignored, just warnings are emitted.
pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec<IndexingTask> {
node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, versioned_value)| {
let indexing_task = parse_indexing_task_key(key)?;
let num_tasks: usize = versioned_value.value.parse()?;
Ok((0..num_tasks).map(move |_| indexing_task.clone()))
})
.flatten_ok()
.filter_map(
|indexing_task_parsing_result: anyhow::Result<IndexingTask>| {
match indexing_task_parsing_result {
Ok(indexing_task) => Some(indexing_task),
Err(error) => {
warn!(
node_id=%node_id,
error=%error,
"Malformated indexing task key and value on node."
);
None
}
}
},
)
.collect()
}

fn parse_enabled_services_str(
enabled_services_str: &str,
node_id: &str,
12 changes: 3 additions & 9 deletions quickwit/quickwit-cluster/src/node.rs
@@ -67,10 +67,10 @@ impl ClusterNode {
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
) -> Self {
use itertools::Itertools;
use quickwit_common::tower::make_channel;

use crate::member::{ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX};
use crate::cluster::set_indexing_tasks_in_node_state;
use crate::member::{ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY};

let gossip_advertise_addr = ([127, 0, 0, 1], port).into();
let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
@@ -79,13 +79,7 @@
let mut node_state = NodeState::default();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());

for (indexing_task, indexing_tasks_group) in
indexing_tasks.iter().group_by(|&task| task).into_iter()
{
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task);
node_state.set(key, indexing_tasks_group.count().to_string());
}
set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap()
}
