Truncate ingester queues on assign shards (#3850)
guilload authored Sep 26, 2023
1 parent b87d82e commit ade57d3
Showing 1 changed file with 107 additions and 49 deletions.
156 changes: 107 additions & 49 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -178,6 +178,59 @@ impl IngestSource {
assigned_shard.current_position_inclusive = to_position_inclusive;
Ok(())
}

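/// Truncates the assigned shards at the given positions. Truncation is
/// best-effort: requests are spawned in the background and failures are only
/// logged, so ingestion is never blocked.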
async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_leader_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
HashMap::new();

for (shard_id, truncate_position) in truncation_point {
let Some(leader_id) = self
.assigned_shards
.get(shard_id)
.map(|shard| &shard.leader_id)
else {
warn!(
"failed to truncate shard: shard `{}` is no longer assigned",
shard_id
);
continue;
};
let to_position_inclusive = truncate_position
.as_u64()
.expect("position should be a u64");

let truncate_subrequest = TruncateSubrequest {
index_uid: self.client_id.index_uid.clone().into(),
source_id: self.client_id.source_id.clone(),
shard_id: *shard_id,
to_position_inclusive,
};
per_leader_truncate_subrequests
.entry(leader_id)
.or_default()
.push(truncate_subrequest);
}
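// Send at most one truncate request per leader; unavailable ingesters are
// skipped with a warning since truncation is best-effort.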
for (leader_id, truncate_subrequests) in per_leader_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(leader_id).await else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
leader_id
);
continue;
};
let truncate_request = TruncateRequest {
leader_id: leader_id.clone().into(),
subrequests: truncate_subrequests,
};
let truncate_future = async move {
if let Err(error) = ingester.truncate(truncate_request).await {
warn!("failed to truncate shard(s): {error}");
}
};
// Truncation is best-effort, so fire and forget.
tokio::spawn(truncate_future);
}
}
}

#[async_trait]
@@ -288,6 +341,9 @@ impl Source for IngestSource {
self.assigned_shards.clear();
self.fetch_stream.reset();

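// Collect the current position of each newly assigned shard so that the
// corresponding ingester queues can be truncated right away.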
let mut truncation_point =
Vec::with_capacity(acquire_shards_subresponse.acquired_shards.len());

for acquired_shard in acquire_shards_subresponse.acquired_shards {
let leader_id: NodeId = acquired_shard.leader_id.into();
let follower_id: Option<NodeId> = acquired_shard
Expand Down Expand Up @@ -317,13 +373,16 @@ impl Source for IngestSource {
error!(error=%error, "failed to subscribe to shard");
continue;
}
truncation_point.push((shard_id, current_position_inclusive.clone()));

let assigned_shard = AssignedShard {
leader_id,
partition_id,
current_position_inclusive,
};
self.assigned_shards.insert(shard_id, assigned_shard);
}
self.truncate(&truncation_point).await;
Ok(())
}

@@ -332,49 +391,13 @@
checkpoint: SourceCheckpoint,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
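// Convert the checkpoint into (shard ID, position) pairs; `truncate` groups
// them per leader and fires the requests off.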
let mut truncation_point = Vec::with_capacity(checkpoint.num_partitions());

for (partition_id, position) in checkpoint.iter() {
let shard_id = partition_id.as_u64().expect("shard ID should be a u64");
truncation_point.push((shard_id, position));
}
self.truncate(&truncation_point).await;
Ok(())
}

@@ -435,15 +458,26 @@ mod tests {
subresponses: vec![AcquireShardsSubresponse {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
acquired_shards: vec![
Shard {
leader_id: "test-ingester-0".to_string(),
follower_id: None,
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
publish_position_inclusive: "00000000000000000011".to_string(),
..Default::default()
},
Shard {
leader_id: "test-ingester-1".to_string(),
follower_id: None,
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 2,
publish_position_inclusive: "00000000000000000022".to_string(),
..Default::default()
},
],
}],
};
Ok(response)
@@ -469,6 +503,23 @@
let (_service_stream_tx, service_stream) = ServiceStream::new_bounded(1);
Ok(service_stream)
});
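// Assigning the shard should trigger a single truncate request at the
// publish position (11) on the leader, `test-ingester-0`.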
ingester_mock_0
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.shard_id, 1);
assert_eq!(subrequest.to_position_inclusive, 11);

let response = TruncateResponse {};
Ok(response)
});

let ingester_0: IngesterServiceClient = ingester_mock_0.into();
ingester_pool
.insert("test-ingester-0".into(), ingester_0.clone())
@@ -525,13 +576,17 @@
assert_eq!(source.publish_token, publish_token);

assert_eq!(source.assigned_shards.len(), 1);

let assigned_shard = source.assigned_shards.get(&1).unwrap();
let expected_assigned_shard = AssignedShard {
leader_id: "test-ingester-0".into(),
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
};
assert_eq!(assigned_shard, &expected_assigned_shard);

// Wait for the truncate future to complete.
time::sleep(Duration::from_millis(1)).await;
}

#[tokio::test]
@@ -755,8 +810,11 @@ mod tests {
(2u64.into(), 22u64.into()),
(3u64.into(), 33u64.into()),
(4u64.into(), 44u64.into()),
(5u64.into(), 55u64.into()),
]);
source.suggest_truncate(checkpoint, &ctx).await.unwrap();

// Wait for the truncate future to complete.
time::sleep(Duration::from_millis(1)).await;
}
}
