From 15d7756492aea9d843a09353aaa25dafb95c9dae Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Sun, 7 Jul 2024 13:47:37 +0200
Subject: [PATCH] Introduce UpdateProcessors command

This commit introduces the UpdateProcessors command, which allows an external
entity to instruct the PartitionProcessorManager to start and stop
PartitionProcessors.

This fixes #1695.
---
 crates/types/protobuf/restate/common.proto     |   1 +
 .../src/net/partition_processor_manager.rs     |  27 +-
 .../worker/src/partition_processor_manager.rs  | 237 ++++++++++++++----
 3 files changed, 217 insertions(+), 48 deletions(-)

diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto
index 769e0e120..96e8ef217 100644
--- a/crates/types/protobuf/restate/common.proto
+++ b/crates/types/protobuf/restate/common.proto
@@ -42,6 +42,7 @@ enum TargetName {
   ATTACH_RESPONSE = 6;
   GET_PROCESSORS_STATE_REQUEST = 7;
   PROCESSORS_STATE_RESPONSE = 8;
+  UPDATE_PROCESSORS = 9;
 }
 
 enum NodeStatus {
diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs
index d62f5ef65..6a6d16f2d 100644
--- a/crates/types/src/net/partition_processor_manager.rs
+++ b/crates/types/src/net/partition_processor_manager.rs
@@ -15,9 +15,10 @@ use serde_with::serde_as;
 
 use crate::cluster::cluster_state::PartitionProcessorStatus;
 use crate::identifiers::PartitionId;
-use crate::net::{RequestId, TargetName};
+use crate::net::{define_message, RequestId, TargetName};
 
 use crate::net::define_rpc;
+use crate::Version;
 
 define_rpc! {
     @request = GetProcessorsState,
@@ -38,3 +39,27 @@ pub struct ProcessorsStateResponse {
     #[serde_as(as = "serde_with::Seq<(_, _)>")]
     pub state: BTreeMap<PartitionId, PartitionProcessorStatus>,
 }
+
+define_message! {
+    @message = UpdateProcessors,
+    @target = TargetName::UpdateProcessors,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateProcessors {
+    pub min_partition_table_version: Version,
+    pub commands: Vec<UpdateProcessor>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct UpdateProcessor {
+    pub partition_id: PartitionId,
+    pub command: ProcessorCommand,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum ProcessorCommand {
+    Stop,
+    Follower,
+    Leader,
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 6472a8466..aad9d1123 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -10,18 +10,19 @@
 
 use std::collections::BTreeMap;
 use std::ops::RangeInclusive;
+use std::pin::Pin;
 use std::time::Duration;
 
 use anyhow::Context;
 use futures::future::OptionFuture;
-use futures::stream::BoxStream;
 use futures::stream::StreamExt;
+use futures::Stream;
 use metrics::gauge;
 use restate_types::live::Live;
 use tokio::sync::{mpsc, watch};
 use tokio::time;
 use tokio::time::MissedTickBehavior;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
 
 use restate_bifrost::Bifrost;
 use restate_core::network::rpc_router::{RpcError, RpcRouter};
@@ -46,10 +47,13 @@
 use restate_types::logs::{Lsn, SequenceNumber};
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::net::cluster_controller::AttachRequest;
 use restate_types::net::cluster_controller::{Action, AttachResponse};
-use restate_types::net::partition_processor_manager::GetProcessorsState;
 use restate_types::net::partition_processor_manager::ProcessorsStateResponse;
+use restate_types::net::partition_processor_manager::{
+    GetProcessorsState, ProcessorCommand, UpdateProcessor, UpdateProcessors,
+};
 use restate_types::net::MessageEnvelope;
 use restate_types::net::RpcMessage;
+use restate_types::partition_table::FixedPartitionTable;
 use restate_types::time::MillisSinceEpoch;
 use restate_types::GenerationalNodeId;
@@ -76,7 +80,10 @@ pub struct PartitionProcessorManager {
     metadata_store_client: MetadataStoreClient,
     partition_store_manager: PartitionStoreManager,
     attach_router: RpcRouter<AttachRequest>,
-    incoming_get_state: BoxStream<'static, MessageEnvelope<GetProcessorsState>>,
+    incoming_get_state:
+        Pin<Box<dyn Stream<Item = MessageEnvelope<GetProcessorsState>> + Send + Sync + 'static>>,
+    incoming_update_processors:
+        Pin<Box<dyn Stream<Item = MessageEnvelope<UpdateProcessors>> + Send + Sync + 'static>>,
     networking: Networking,
     bifrost: Bifrost,
     invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -87,6 +94,14 @@ pub struct PartitionProcessorManager {
     persisted_lsns_rx: Option<watch::Receiver<BTreeMap<PartitionId, Lsn>>>,
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error("failed updating the metadata store: {0}")]
+    MetadataStore(#[from] ReadModifyWriteError),
+}
+
 #[derive(Debug, thiserror::Error)]
 enum AttachError {
     #[error("No cluster controller found in nodes configuration")]
@@ -98,9 +113,33 @@ enum AttachError {
 struct State {
     _created_at: MillisSinceEpoch,
     _key_range: RangeInclusive<PartitionKey>,
-    _control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
+    handle: PartitionProcessorHandle,
     watch_rx: watch::Receiver<PartitionProcessorStatus>,
-    _task_id: TaskId,
+    task_id: TaskId,
+}
+
+struct PartitionProcessorHandle {
+    control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
+}
+
+impl PartitionProcessorHandle {
+    fn new(control_tx: mpsc::Sender<PartitionProcessorControlCommand>) -> Self {
+        Self { control_tx }
+    }
+
+    async fn step_down(&self) -> Result<(), ShutdownError> {
+        self.control_tx
+            .send(PartitionProcessorControlCommand::StepDown)
+            .await
+            .map_err(|_| ShutdownError)
+    }
+
+    async fn run_for_leader(&self, leader_epoch: LeaderEpoch) -> Result<(), ShutdownError> {
+        self.control_tx
+            .send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))
+            .await
+            .map_err(|_| ShutdownError)
+    }
 }
 
 impl PartitionProcessorManager {
@@ -118,6 +157,7 @@ impl PartitionProcessorManager {
     ) -> Self {
         let attach_router = RpcRouter::new(networking.clone(), router_builder);
         let incoming_get_state = router_builder.subscribe_to_stream(2);
+        let incoming_update_processors = router_builder.subscribe_to_stream(2);
 
         let (tx, rx) = mpsc::channel(updateable_config.pinned().worker.internal_queue_length());
         Self {
@@ -129,6 +169,7 @@ impl PartitionProcessorManager {
             metadata_store_client,
             partition_store_manager,
             incoming_get_state,
+            incoming_update_processors,
             networking,
             bifrost,
             invoker_handle,
@@ -195,7 +236,6 @@ impl PartitionProcessorManager {
             .context("Timeout waiting to attach to a cluster controller")??;
 
         let (from, msg) = response.split();
-        // We ignore errors due to shutdown
         self.apply_plan(&msg.actions).await?;
         self.latest_attach_response = Some((from, msg));
         info!("Plan applied from attaching to controller {}", from);
@@ -225,7 +265,12 @@ impl PartitionProcessorManager {
                 Some(get_state) = self.incoming_get_state.next() => {
                     self.on_get_state(get_state);
                 }
-                _ = &mut shutdown => {
+                Some(update_processors) = self.incoming_update_processors.next() => {
+                    if let Err(err) = self.on_update_processors(update_processors).await {
+                        warn!("failed processing update processors command: {err}");
+                    }
+                }
+                _ = &mut shutdown => {
                     return Ok(());
                 }
             }
@@ -311,7 +356,103 @@ impl PartitionProcessorManager {
         }
     }
 
-    pub async fn apply_plan(&mut self, actions: &[Action]) -> anyhow::Result<()> {
+    async fn on_update_processors(
+        &mut self,
+        update_processor: MessageEnvelope<UpdateProcessors>,
+    ) -> Result<(), Error> {
+        let (_, update_processors) = update_processor.split();
+
+        let partition_table = self
+            .metadata
+            .wait_for_partition_table(update_processors.min_partition_table_version)
+            .await?;
+
+        for update_processor in update_processors.commands {
+            self.on_update_processor(update_processor, &partition_table)
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "debug", skip_all, fields(partition_id = %update_processor.partition_id))]
+    async fn on_update_processor(
+        &mut self,
+        update_processor: UpdateProcessor,
+        partition_table: &FixedPartitionTable,
+    ) -> Result<(), Error> {
+        let partition_id = update_processor.partition_id;
+
+        match update_processor.command {
+            ProcessorCommand::Stop => {
+                if let Some(processor) = self.running_partition_processors.remove(&partition_id) {
+                    if let Some(handle) = self.task_center.cancel_task(processor.task_id) {
+                        if let Err(err) = handle.await {
+                            warn!("Partition processor crashed while shutting down: {err}");
+                        }
+                    }
+                } else {
+                    debug!("No running partition processor. Ignoring stop command.");
+                }
+            }
+            ProcessorCommand::Follower => {
+                if let Some(processor) = self.running_partition_processors.get(&partition_id) {
+                    // if we error here, then the system is shutting down
+                    let _ = processor.handle.step_down().await;
+                } else if let Some(partition_key_range) =
+                    partition_table.partition_range(partition_id)
+                {
+                    self.start_partition_processor(
+                        partition_id,
+                        partition_key_range,
+                        RunMode::Follower,
+                    )
+                    .await?;
+                } else {
+                    debug!("Unknown partition id '{partition_id}'. Ignoring follower command.");
+                }
+            }
+            ProcessorCommand::Leader => {
+                if let Some(processor) = self.running_partition_processors.get(&partition_id) {
+                    self.make_processor_run_as_leader(partition_id, &processor.handle)
+                        .await?;
+                } else if let Some(partition_key_range) =
+                    partition_table.partition_range(partition_id)
+                {
+                    self.start_partition_processor(
+                        partition_id,
+                        partition_key_range,
+                        RunMode::Leader,
+                    )
+                    .await?;
+                } else {
+                    debug!("Unknown partition id '{partition_id}'. Ignoring leader command.");
+                    return Ok(());
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn make_processor_run_as_leader(
+        &self,
+        partition_id: PartitionId,
+        handle: &PartitionProcessorHandle,
+    ) -> Result<(), Error> {
+        let leader_epoch = Self::obtain_next_epoch(
+            self.metadata_store_client.clone(),
+            partition_id,
+            self.metadata.my_node_id(),
+        )
+        .await?;
+
+        handle.run_for_leader(leader_epoch).await?;
+
+        Ok(())
+    }
+
+    pub async fn apply_plan(&mut self, actions: &[Action]) -> Result<(), Error> {
         for action in actions {
             match action {
                 Action::RunPartition(action) => {
@@ -320,39 +461,12 @@ impl PartitionProcessorManager {
                         .running_partition_processors
                         .contains_key(&action.partition_id)
                     {
-                        let (control_tx, control_rx) = mpsc::channel(2);
-                        let status = PartitionProcessorStatus::new(action.mode);
-                        let (watch_tx, watch_rx) = watch::channel(status.clone());
-
-                        let _task_id = self.spawn_partition_processor(
+                        self.start_partition_processor(
                             action.partition_id,
                             action.key_range_inclusive.clone().into(),
-                            status,
-                            control_rx,
-                            watch_tx,
-                        )?;
-
-                        if RunMode::Leader == action.mode {
-                            let leader_epoch = Self::obtain_next_epoch(
-                                self.metadata_store_client.clone(),
-                                action.partition_id,
-                                self.metadata.my_node_id(),
-                            )
-                            .await?;
-                            let _ = control_tx
-                                .send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))
-                                .await;
-                        }
-
-                        let state = State {
-                            _created_at: MillisSinceEpoch::now(),
-                            _key_range: action.key_range_inclusive.clone().into(),
-                            _task_id,
-                            _control_tx: control_tx,
-                            watch_rx,
-                        };
-                        self.running_partition_processors
-                            .insert(action.partition_id, state);
+                            action.mode,
+                        )
+                        .await?;
                     } else {
                         debug!(
                             "Partition processor for partition id '{}' is already running.",
@@ -367,14 +481,34 @@ impl PartitionProcessorManager {
         Ok(())
     }
 
+    async fn start_partition_processor(
+        &mut self,
+        partition_id: PartitionId,
+        key_range: RangeInclusive<PartitionKey>,
+        mode: RunMode,
+    ) -> Result<(), Error> {
+        let state = self.spawn_partition_processor(partition_id, key_range.clone(), mode)?;
+
+        if RunMode::Leader == mode {
+            self.make_processor_run_as_leader(partition_id, &state.handle)
+                .await?;
+        }
+
+        self.running_partition_processors
+            .insert(partition_id, state);
+        Ok(())
+    }
+
     fn spawn_partition_processor(
         &mut self,
         partition_id: PartitionId,
         key_range: RangeInclusive<PartitionKey>,
-        status: PartitionProcessorStatus,
-        control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
-        watch_tx: watch::Sender<PartitionProcessorStatus>,
-    ) -> Result<TaskId, ShutdownError> {
+        mode: RunMode,
+    ) -> Result<State, ShutdownError> {
+        let (control_tx, control_rx) = mpsc::channel(2);
+        let status = PartitionProcessorStatus::new(mode);
+        let (watch_tx, watch_rx) = watch::channel(status.clone());
+
         let config = self.updateable_config.pinned();
         let options = &config.worker;
 
@@ -401,18 +535,19 @@ impl PartitionProcessorManager {
             .entry(partition_id)
             .or_insert_with(|| Box::leak(Box::new(format!("pp-{}", partition_id))));
 
-        self.task_center.spawn_child(
+        let task_id = self.task_center.spawn_child(
            TaskKind::PartitionProcessor,
            task_name,
            Some(pp_builder.partition_id),
            {
                let storage_manager = self.partition_store_manager.clone();
                let options = options.clone();
+               let key_range = key_range.clone();
                async move {
                    let partition_store = storage_manager
                        .open_partition_store(
                            partition_id,
-                            key_range.clone(),
+                            key_range,
                            OpenMode::CreateIfMissing,
                            &options.storage.rocksdb,
                        )
@@ -425,7 +560,15 @@ impl PartitionProcessorManager {
                        .await
                }
            },
-        )
+        )?;
+
+        Ok(State {
+            _created_at: MillisSinceEpoch::now(),
+            _key_range: key_range,
+            task_id,
+            handle: PartitionProcessorHandle::new(control_tx),
+            watch_rx,
+        })
     }
 
     async fn obtain_next_epoch(
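Note (not part of the patch): the UpdateProcessors, UpdateProcessor, and ProcessorCommand types
added in crates/types/src/net/partition_processor_manager.rs are plain serializable structs with
public fields, so the sending side (for example the cluster controller) can construct the command
directly. The sketch below is purely illustrative: the helper name leader_command_for is made up,
and the networking call that would actually deliver the message to the target node is omitted.

use restate_types::identifiers::PartitionId;
use restate_types::net::partition_processor_manager::{
    ProcessorCommand, UpdateProcessor, UpdateProcessors,
};
use restate_types::Version;

// Build a command asking the receiving PartitionProcessorManager to run the given
// partition as leader. The manager waits until its partition table has reached
// `min_partition_table_version` before applying the contained commands.
fn leader_command_for(
    partition_id: PartitionId,
    min_partition_table_version: Version,
) -> UpdateProcessors {
    UpdateProcessors {
        min_partition_table_version,
        commands: vec![UpdateProcessor {
            partition_id,
            command: ProcessorCommand::Leader,
        }],
    }
}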