Ensures the load per pipeline does not exceed 80%.
The allocation attempts to match the previous
plan as much as possible.

It also includes a hysteresis effect to avoid
flip-flopping between N and N+1 pipelines.

Closes #4010
fulmicoton committed Oct 26, 2023
1 parent dd8bc1d commit b493b59
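
For intuition, here is a minimal sketch of the hysteresis band the commit message describes. It is illustration only and assumes PIPELINE_FULL_LOAD = 1_000, a value not shown in this diff:

// Sketch only: the 80% / 30% thresholds, assuming PIPELINE_FULL_LOAD = 1_000.
fn main() {
    const PIPELINE_FULL_LOAD: u32 = 1_000; // assumed for illustration
    let max_load_per_pipeline = PIPELINE_FULL_LOAD * 8 / 10; // 800
    let low_threshold_per_pipeline = PIPELINE_FULL_LOAD * 3 / 10; // 300
    // Growing: a single pipeline whose load exceeds 80% triggers a second pipeline.
    assert!(850 > max_load_per_pipeline);
    // Shrinking: two pipelines merge back only when each is below 30%,
    // i.e. when the overall load is below 60% of one full pipeline.
    assert_eq!(2 * low_threshold_per_pipeline, PIPELINE_FULL_LOAD * 6 / 10);
}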
quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs (192 additions, 12 deletions)
@@ -34,8 +34,25 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
SchedulingProblem, SchedulingSolution,
};
use crate::indexing_scheduler::PIPELINE_FULL_LOAD;
use crate::SourceUid;

/// If we have several pipelines and the load per pipeline falls below this
/// threshold, we reduce the number of pipelines.
///
/// Note that even for 2 pipelines, this creates a hysteresis effect.
///
/// Starting from a single pipeline, an overall load above 80% is enough
/// to trigger the creation of a second pipeline.
///
/// Coming back to a single pipeline requires the load per pipeline to
/// drop below 30%, which translates into an overall load of 60%.
const LOAD_PER_PIPELINE_LOW_THRESHOLD: u32 = PIPELINE_FULL_LOAD * 3 / 10;

/// That's 80% of a full pipeline load.
const MAX_LOAD_PER_PIPELINE: u32 = PIPELINE_FULL_LOAD * 8 / 10;

fn indexing_task(source_uid: SourceUid, shard_ids: Vec<ShardId>) -> IndexingTask {
IndexingTask {
index_uid: source_uid.index_uid.to_string(),
@@ -239,8 +256,100 @@ pub enum SourceToScheduleType {
IngestV1,
}

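/// Groups the shards of one source, on one node, into indexing pipelines.
///
/// Each pipeline is kept under `MAX_LOAD_PER_PIPELINE`, and shards are matched
/// to their previous pipeline whenever possible.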
fn group_shards_into_pipelines(
source_uid: &SourceUid,
shard_ids: &[ShardId],
previous_indexing_tasks: &[IndexingTask],
load_per_shard: Load,
) -> Vec<IndexingTask> {
let num_shards = shard_ids.len() as u32;
if num_shards == 0 {
return Vec::new();
}
// A load_per_shard of zero, or one exceeding MAX_LOAD_PER_PIPELINE, would cause a
// division by zero here or below; clamp both the divisor and the quotient to 1.
let max_num_shards_per_pipeline = (MAX_LOAD_PER_PIPELINE / load_per_shard.max(1)).max(1);

// We compute the number of pipelines to create, cooking in some hysteresis effect.
// We use two different thresholds for increasing and decreasing the number of pipelines.
let min_num_pipelines: u32 =
(num_shards + max_num_shards_per_pipeline - 1) / max_num_shards_per_pipeline;
let max_num_pipelines: u32 = (num_shards * load_per_shard) / LOAD_PER_PIPELINE_LOW_THRESHOLD;
let previous_num_pipelines = previous_indexing_tasks.len() as u32;
let num_pipelines: u32 = if previous_num_pipelines > min_num_pipelines {
// Clamp between the two thresholds so that MAX_LOAD_PER_PIPELINE is respected.
previous_num_pipelines.min(max_num_pipelines).max(min_num_pipelines)
} else {
min_num_pipelines
};

let mut pipelines: Vec<Vec<ShardId>> = std::iter::repeat_with(Vec::new)
.take((previous_num_pipelines as usize).max(num_pipelines as usize))
.collect();

let mut unassigned_shard_ids: Vec<ShardId> = Vec::new();
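// Maps each shard to the ordinal of the pipeline that hosted it in the previous plan.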
let previous_pipeline_map: FnvHashMap<ShardId, usize> = previous_indexing_tasks
.iter()
.enumerate()
.flat_map(|(pipeline_ord, indexing_task)| {
indexing_task
.shard_ids
.iter()
.map(move |shard_id| (*shard_id, pipeline_ord))
})
.collect();

for &shard in shard_ids {
if let Some(pipeline_ord) = previous_pipeline_map.get(&shard).copied() {
// Whenever possible we allocate to the previous pipeline.
let best_pipeline_for_shard = &mut pipelines[pipeline_ord];
if best_pipeline_for_shard.len() < max_num_shards_per_pipeline as usize {
best_pipeline_for_shard.push(shard);
} else {
unassigned_shard_ids.push(shard);
}
} else {
unassigned_shard_ids.push(shard);
}
}

// If needed, we remove some pipelines, dropping those with the fewest shards first.
pipelines.sort_by_key(|shards| std::cmp::Reverse(shards.len()));
for removed_pipelines in pipelines.drain(num_pipelines as usize..) {
unassigned_shard_ids.extend(removed_pipelines);
}

// Now we allocate the remaining unassigned shards, assigning each to the
// pipeline that currently has the fewest shards (and hence the lowest load).
for shard in unassigned_shard_ids {
let best_pipeline_for_shard: &mut Vec<ShardId> = pipelines
.iter_mut()
.min_by_key(|shards| shards.len())
.unwrap();
best_pipeline_for_shard.push(shard);
}

let mut indexing_tasks: Vec<IndexingTask> = pipelines
.into_iter()
// A pipeline may end up empty when hysteresis keeps more pipelines than there are shards.
.filter(|shard_ids| !shard_ids.is_empty())
.map(|mut shard_ids| {
shard_ids.sort();
IndexingTask {
index_uid: source_uid.index_uid.to_string(),
source_id: source_uid.source_id.clone(),
shard_ids,
}
})
.collect();

indexing_tasks.sort_by_key(|indexing_task| indexing_task.shard_ids[0]);

indexing_tasks
}

/// This function takes a scheduling solution (which abstracts away the notions
/// of pipelines and shard ids) and builds a physical plan.
fn convert_scheduling_solution_to_physical_plan(
-solution: SchedulingSolution,
solution: &SchedulingSolution,
problem: &SchedulingProblem,
id_to_ord_map: &IdToOrdMap,
sources: &[SourceToSchedule],
previous_plan_opt: Option<&PhysicalIndexingPlan>,
@@ -269,14 +378,27 @@ fn convert_scheduling_solution_to_physical_plan(
let shard_to_node_ord = previous_shard_to_node_map
.remove(&source_ord)
.unwrap_or_default();
-let shard_ids_per_node =

let load_per_shard = problem.source_load_per_shard(source_ord);
let shard_ids_per_node: FnvHashMap<NodeOrd, Vec<ShardId>> =
spread_shards_optimally(shards, node_num_shards, shard_to_node_ord);

for (node_ord, shard_ids_for_node) in shard_ids_per_node {
let node_id = id_to_ord_map.indexer_id(node_ord);
-let indexing_task =
-indexing_task(source.source_uid.clone(), shard_ids_for_node);
-physical_indexing_plan.add_indexing_task(node_id, indexing_task);
let indexing_tasks: &[IndexingTask] = previous_plan_opt
.and_then(|previous_plan| previous_plan.node(node_id))
.unwrap_or(&[]);
let indexing_tasks = group_shards_into_pipelines(
&source.source_uid,
&shard_ids_for_node,
indexing_tasks,
load_per_shard,
);
for indexing_task in indexing_tasks {
physical_indexing_plan.add_indexing_task(node_id, indexing_task);
}
}
}
SourceToScheduleType::NonSharded { .. } => {
@@ -352,7 +474,6 @@ pub fn build_physical_indexing_plan(
}

let mut problem = SchedulingProblem::with_node_maximum_load(indexer_max_loads);

for source in sources {
if let Some(source_id) = populate_problem(source, &mut problem) {
let source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
@@ -378,7 +499,8 @@

// Convert the new scheduling solution back to a physical plan.
convert_scheduling_solution_to_physical_plan(
-new_solution,
&new_solution,
&problem,
&id_to_ord_map,
sources,
previous_plan_opt,
Expand All @@ -392,12 +514,10 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

use fnv::FnvHashMap;
-use quickwit_proto::types::IndexUid;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::types::{IndexUid, ShardId};

-use super::{
-build_physical_indexing_plan, indexing_task, spread_shards_optimally, SourceToSchedule,
-SourceToScheduleType,
-};
use super::{
build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
};
use crate::SourceUid;

#[test]
@@ -519,4 +639,64 @@
)
}
}

#[test]
fn test_group_shards_empty() {
let source_uid = source_id();
let indexing_tasks = group_shards_into_pipelines(&source_uid, &[], &[], 250);
assert!(indexing_tasks.is_empty());
}

fn make_indexing_tasks(
source_uid: &SourceUid,
shard_ids_grp: &[&[ShardId]],
) -> Vec<IndexingTask> {
shard_ids_grp
.iter()
.copied()
.map(|shard_ids| {
IndexingTask {
index_uid: source_uid.index_uid.to_string(),
source_id: source_uid.source_id.clone(),
shard_ids: shard_ids.to_vec(),
}
})
.collect::<Vec<IndexingTask>>()
}

#[test]
fn test_group_shards_into_pipeline_simple() {
let source_uid = source_id();
let previous_indexing_tasks: Vec<IndexingTask> =
make_indexing_tasks(&source_uid, &[&[1, 2], &[3, 4, 5]]);
let indexing_tasks =
group_shards_into_pipelines(&source_uid, &[0, 1, 3, 4, 5], &previous_indexing_tasks, 250);
assert_eq!(indexing_tasks.len(), 2);
assert_eq!(&indexing_tasks[0].shard_ids, &[0, 1]);
assert_eq!(&indexing_tasks[1].shard_ids, &[3, 4, 5]);
}

#[test]
fn test_group_shards_into_pipeline_hysteresis() {
let source_uid = source_id();
let shard_ids: &[ShardId] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let previous_indexing_tasks: Vec<IndexingTask> = make_indexing_tasks(&source_uid, &[]);
let indexing_tasks_1 =
group_shards_into_pipelines(&source_uid, shard_ids, &previous_indexing_tasks, 100);
assert_eq!(indexing_tasks_1.len(), 2);
assert_eq!(&indexing_tasks_1[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
assert_eq!(&indexing_tasks_1[1].shard_ids, &[1, 3, 5, 7, 9]);
// With the same set of shards, an increase in load triggers the creation of a new task.
let indexing_tasks_2 = group_shards_into_pipelines(&source_uid, shard_ids, &indexing_tasks_1, 150);
assert_eq!(indexing_tasks_2.len(), 3);
assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 2, 4, 6, 8]);
assert_eq!(&indexing_tasks_2[1].shard_ids, &[1, 3, 5, 7, 9]);
assert_eq!(&indexing_tasks_2[2].shard_ids, &[10]);
// Now the load comes back to its initial value.
// The hysteresis takes effect: we do not switch back to 2 pipelines.
let indexing_tasks_3 = group_shards_into_pipelines(&source_uid, shard_ids, &indexing_tasks_2, 100);
assert_eq!(indexing_tasks_3.len(), 3);
assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 2, 4, 6, 8]);
assert_eq!(&indexing_tasks_3[1].shard_ids, &[1, 3, 5, 7, 9]);
assert_eq!(&indexing_tasks_3[2].shard_ids, &[10]);
// An even lower load finally triggers the removal of a pipeline.
let indexing_tasks_4 = group_shards_into_pipelines(&source_uid, shard_ids, &indexing_tasks_3, 80);
assert_eq!(indexing_tasks_4.len(), 2);
assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
assert_eq!(&indexing_tasks_4[1].shard_ids, &[1, 3, 5, 7, 9]);
}

}
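
To double-check the hysteresis test above, here is a self-contained re-derivation of the asserted pipeline counts. It is a sketch only: the num_pipelines helper and the PIPELINE_FULL_LOAD = 1_000 value are assumptions for illustration, not part of the crate.

// Hypothetical re-derivation of the pipeline counts asserted in
// test_group_shards_into_pipeline_hysteresis; assumes PIPELINE_FULL_LOAD = 1_000.
fn num_pipelines(num_shards: u32, load_per_shard: u32, previous: u32) -> u32 {
    const MAX_LOAD_PER_PIPELINE: u32 = 800; // 80% of the assumed full load
    const LOW_THRESHOLD: u32 = 300; // 30% of the assumed full load
    let max_shards_per_pipeline = (MAX_LOAD_PER_PIPELINE / load_per_shard).max(1);
    let min_pipelines = (num_shards + max_shards_per_pipeline - 1) / max_shards_per_pipeline;
    let max_pipelines = num_shards * load_per_shard / LOW_THRESHOLD;
    if previous > min_pipelines {
        previous.min(max_pipelines).max(min_pipelines)
    } else {
        min_pipelines
    }
}

fn main() {
    assert_eq!(num_pipelines(11, 100, 0), 2); // initial plan: 2 pipelines
    assert_eq!(num_pipelines(11, 150, 2), 3); // load rises: grow to 3 pipelines
    assert_eq!(num_pipelines(11, 100, 3), 3); // back to 100: hysteresis keeps 3
    assert_eq!(num_pipelines(11, 80, 3), 2); // load drops further: shrink to 2
}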
