Skip to content

Commit

Permalink
Fixed the serialize deserialization of the indexing state to include (#…
Browse files Browse the repository at this point in the history
…4180)

* Fixed the serialize deserialization of the indexing state to include
shard lists.

Before this, there was no notion of shards and indexers were just
exposing the number of pipelines per source_uid.

With ingest v2, the list of shard is part of indexing tasks, and the
control plane wants to diff against it.

The serialization format goes
`[1,2][3]` to express two pipelines with respectively shard [1,2] and
`[3]`.

For a source that does not have notion of shards, like kafka, two
pipelines simply look as follows: `[][]`

Also took it as an opportunity to clean up the code
and increase the test coverage.

Related #4174

Co-authored-by: François Massot <[email protected]>
  • Loading branch information
fulmicoton and fmassot committed Nov 22, 2023
1 parent 4348ba8 commit 645658e
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 145 deletions.
270 changes: 253 additions & 17 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use tracing::{info, warn};
use crate::change::{compute_cluster_change_events, ClusterChange};
use crate::member::{
build_cluster_member, ClusterMember, NodeStateExt, ENABLED_SERVICES_KEY,
GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, PIPELINE_METRICS_PREFIX, READINESS_KEY,
READINESS_VALUE_NOT_READY, READINESS_VALUE_READY,
GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
use crate::ClusterNode;

Expand All @@ -60,6 +60,10 @@ const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "tes
5_000 // ~ HEARTBEAT * 5_000 ~ 4 hours.
};

// An indexing task key is formatted as
// `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`.
const INDEXING_TASK_PREFIX: &str = "indexing_task:";

#[derive(Clone)]
pub struct Cluster {
cluster_id: String,
Expand Down Expand Up @@ -349,20 +353,7 @@ impl Cluster {
let chitchat = self.chitchat().await;
let mut chitchat_guard = chitchat.lock().await;
let node_state = chitchat_guard.self_node_state();
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
for (indexing_task, indexing_tasks_group) in
indexing_tasks.iter().group_by(|&task| task).into_iter()
{
let key = format!("{INDEXING_TASK_PREFIX}{indexing_task}");
current_indexing_tasks_keys.remove(&key);
node_state.set(key, indexing_tasks_group.count().to_string());
}
for obsolete_task_key in current_indexing_tasks_keys {
node_state.mark_for_deletion(&obsolete_task_key);
}
set_indexing_tasks_in_node_state(indexing_tasks, node_state);
Ok(())
}

Expand Down Expand Up @@ -407,6 +398,138 @@ fn spawn_ready_members_task(
tokio::spawn(fut);
}

/// Parses indexing tasks from the chitchat node state.
pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec<IndexingTask> {
node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.flat_map(|(key, versioned_value)| {
// We want to skip the tombstoned keys.
if versioned_value.tombstone.is_none() {
Some((key, versioned_value.value.as_str()))
} else {
None
}
})
.flat_map(|(indexing_task_key, pipeline_shard_str)| {
parse_indexing_task_key_value(indexing_task_key, pipeline_shard_str)
})
.collect()
}

/// Parses indexing task key into the IndexingTask.
/// Malformed keys and values are ignored, just warnings are emitted.
fn parse_indexing_task_key_value(
indexing_task_key: &str,
pipeline_shards_str: &str,
) -> Vec<IndexingTask> {
let Some(index_uid_source_id) = indexing_task_key.strip_prefix(INDEXING_TASK_PREFIX) else {
warn!(
indexing_task_key = indexing_task_key,
"indexing task must start by the prefix `{INDEXING_TASK_PREFIX}`"
);
return Vec::new();
};
let Some((index_uid_str, source_id_str)) = index_uid_source_id.rsplit_once(':') else {
warn!(index_uid_source_id=%index_uid_source_id, "invalid index task format, cannot find index_uid and source_id");
return Vec::new();
};
match deserialize_pipeline_shards(pipeline_shards_str) {
Ok(pipeline_shards) => pipeline_shards
.into_iter()
.map(|shard_ids| IndexingTask {
index_uid: index_uid_str.to_string(),
source_id: source_id_str.to_string(),
shard_ids,
})
.collect(),
Err(error) => {
warn!(error=%error, "failed to parse pipeline shard list");
Vec::new()
}
}
}

/// Writes the given indexing tasks in the given node state.
///
/// If previous indexing tasks were present in the node state but were not in the given tasks, they
/// are marked for deletion.
pub fn set_indexing_tasks_in_node_state(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
let mut indexing_tasks_grouped_by_source: HashMap<(&str, &str), Vec<&IndexingTask>> =
HashMap::new();
for indexing_task in indexing_tasks {
indexing_tasks_grouped_by_source
.entry((
indexing_task.index_uid.as_str(),
indexing_task.source_id.as_str(),
))
.or_default()
.push(indexing_task);
}
for ((index_uid, source_id), indexing_tasks) in indexing_tasks_grouped_by_source {
let shards_per_pipeline = indexing_tasks
.iter()
.map(|indexing_task| &indexing_task.shard_ids[..]);
let pipeline_shards_str: String = serialize_pipeline_shards(shards_per_pipeline);
let key = format!("{INDEXING_TASK_PREFIX}{index_uid}:{source_id}");
current_indexing_tasks_keys.remove(&key);
node_state.set(key, pipeline_shards_str);
}
for obsolete_task_key in current_indexing_tasks_keys {
node_state.mark_for_deletion(&obsolete_task_key);
}
}

/// Given a list of list of shards (one list per pipeline), serializes it as a string to be stored
/// as a value in the chitchat state.
///
/// The format is as follows `[1,2,3][4,5]`.
fn serialize_pipeline_shards<'a>(pipeline_shards: impl Iterator<Item = &'a [u64]>) -> String {
let mut pipeline_shards_str = String::new();
for shards in pipeline_shards {
pipeline_shards_str.push('[');
pipeline_shards_str.push_str(&shards.iter().join(","));
pipeline_shards_str.push(']');
}
pipeline_shards_str
}

/// Deserializes the list of shards from a string stored in the chitchat state, as
/// serialized by `serialize_pipeline_shards`.
///
/// This function will make sure the pipeline shard lists are sorted.
pub fn deserialize_pipeline_shards(pipeline_shards_str: &str) -> anyhow::Result<Vec<Vec<u64>>> {
if pipeline_shards_str.is_empty() {
return Ok(Vec::new());
}
let mut pipeline_shards: Vec<Vec<u64>> = Vec::new();
for single_pipeline_shard_str in pipeline_shards_str.split(']') {
if single_pipeline_shard_str.is_empty() {
continue;
}
let Some(comma_sep_shards_str) = single_pipeline_shard_str.strip_prefix('[') else {
anyhow::bail!("invalid pipeline shards string: `{pipeline_shards_str}`");
};
let mut shards: Vec<u64> = Vec::new();
if !comma_sep_shards_str.is_empty() {
for shard_str in comma_sep_shards_str.split(',') {
let shard_id: u64 = shard_str.parse::<u64>()?;
shards.push(shard_id);
}
}
shards.sort();
pipeline_shards.push(shards);
}
pipeline_shards.sort_by_key(|shards| shards.first().copied());
Ok(pipeline_shards)
}

async fn spawn_ready_nodes_change_stream_task(cluster: Cluster) {
let cluster_guard = cluster.inner.read().await;
let cluster_id = cluster_guard.cluster_id.clone();
Expand Down Expand Up @@ -983,7 +1106,7 @@ mod tests {
let mut chitchat_guard = chitchat_handle.lock().await;
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"),
"2".to_string(),
"[][]".to_string(),
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"),
Expand Down Expand Up @@ -1080,4 +1203,117 @@ mod tests {

Ok(())
}

fn test_serialize_pipeline_shards_aux(pipeline_shards: &[&[u64]], expected_str: &str) {
let pipeline_shards_str = serialize_pipeline_shards(pipeline_shards.iter().copied());
assert_eq!(pipeline_shards_str, expected_str);
let ser_deser_pipeline_shards = deserialize_pipeline_shards(&pipeline_shards_str).unwrap();
assert_eq!(pipeline_shards, ser_deser_pipeline_shards);
}

#[test]
fn test_serialize_pipeline_shards() {
test_serialize_pipeline_shards_aux(&[], "");
test_serialize_pipeline_shards_aux(&[&[]], "[]");
test_serialize_pipeline_shards_aux(&[&[1]], "[1]");
test_serialize_pipeline_shards_aux(&[&[1, 2]], "[1,2]");
test_serialize_pipeline_shards_aux(&[&[], &[1, 2]], "[][1,2]");
test_serialize_pipeline_shards_aux(&[&[], &[]], "[][]");
test_serialize_pipeline_shards_aux(&[&[1], &[3, 4, 5, 6]], "[1][3,4,5,6]");
}

#[test]
fn test_deserialize_pipeline_shards_sorts() {
assert_eq!(
deserialize_pipeline_shards("[2,1]").unwrap(),
vec![vec![1, 2]]
);
assert_eq!(
deserialize_pipeline_shards("[1][]").unwrap(),
vec![vec![], vec![1]]
);
assert_eq!(
deserialize_pipeline_shards("[3][2]").unwrap(),
vec![vec![2], vec![3]]
);
}

fn test_serialize_indexing_tasks_aux(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
set_indexing_tasks_in_node_state(indexing_tasks, node_state);
let ser_deser_indexing_tasks = parse_indexing_tasks(node_state);
assert_eq!(indexing_tasks, ser_deser_indexing_tasks);
}

#[test]
fn test_serialize_indexing_tasks() {
let mut node_state = NodeState::default();
test_serialize_indexing_tasks_aux(&[], &mut node_state);
test_serialize_indexing_tasks_aux(
&[IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
}],
&mut node_state,
);
// change in the set of shards
test_serialize_indexing_tasks_aux(
&[IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2, 3],
}],
&mut node_state,
);
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
// different index.
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test2".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
// same index, different source.
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
},
IndexingTask {
index_uid: "test:test1".to_string(),
source_id: "my-source2".to_string(),
shard_ids: vec![3, 4],
},
],
&mut node_state,
);
}
}
51 changes: 3 additions & 48 deletions quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,20 @@ use std::collections::HashSet;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use anyhow::Context;
use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};

use crate::cluster::parse_indexing_tasks;
use crate::{GenerationId, QuickwitService};

// Keys used to store member's data in chitchat state.
pub(crate) const GRPC_ADVERTISE_ADDR_KEY: &str = "grpc_advertise_addr";
pub(crate) const ENABLED_SERVICES_KEY: &str = "enabled_services";
pub(crate) const PIPELINE_METRICS_PREFIX: &str = "pipeline_metrics:";

// An indexing task key is formatted as
// `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`.
pub(crate) const INDEXING_TASK_PREFIX: &str = "indexing_task:";

// Readiness key and values used to store node's readiness in Chitchat state.
pub(crate) const READINESS_KEY: &str = "readiness";
pub(crate) const READINESS_VALUE_READY: &str = "READY";
Expand Down Expand Up @@ -146,7 +142,7 @@ pub(crate) fn build_cluster_member(
parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id)
})?;
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id);
let indexing_tasks = parse_indexing_tasks(node_state);
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
Expand All @@ -161,47 +157,6 @@ pub(crate) fn build_cluster_member(
Ok(member)
}

// Parses indexing task key into the IndexingTask.
fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> {
let reminder = key.strip_prefix(INDEXING_TASK_PREFIX).ok_or_else(|| {
anyhow!(
"indexing task must contain the delimiter character `:`: `{}`",
key
)
})?;
IndexingTask::try_from(reminder)
}

/// Parses indexing tasks serialized in keys formatted as
/// `INDEXING_TASK_PREFIX:index_id:index_incarnation:source_id`. Malformed keys and values are
/// ignored, just warnings are emitted.
pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec<IndexingTask> {
node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, versioned_value)| {
let indexing_task = parse_indexing_task_key(key)?;
let num_tasks: usize = versioned_value.value.parse()?;
Ok((0..num_tasks).map(move |_| indexing_task.clone()))
})
.flatten_ok()
.filter_map(
|indexing_task_parsing_result: anyhow::Result<IndexingTask>| {
match indexing_task_parsing_result {
Ok(indexing_task) => Some(indexing_task),
Err(error) => {
warn!(
node_id=%node_id,
error=%error,
"Malformated indexing task key and value on node."
);
None
}
}
},
)
.collect()
}

fn parse_enabled_services_str(
enabled_services_str: &str,
node_id: &str,
Expand Down
Loading

0 comments on commit 645658e

Please sign in to comment.