diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto
index 769e0e120..e3be4a52f 100644
--- a/crates/types/protobuf/restate/common.proto
+++ b/crates/types/protobuf/restate/common.proto
@@ -42,6 +42,7 @@ enum TargetName {
   ATTACH_RESPONSE = 6;
   GET_PROCESSORS_STATE_REQUEST = 7;
   PROCESSORS_STATE_RESPONSE = 8;
+  CONTROL_PROCESSORS = 9;
 }
 
 enum NodeStatus {
diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs
index d62f5ef65..47d73d80e 100644
--- a/crates/types/src/net/partition_processor_manager.rs
+++ b/crates/types/src/net/partition_processor_manager.rs
@@ -15,9 +15,10 @@
 use serde_with::serde_as;
 
 use crate::cluster::cluster_state::PartitionProcessorStatus;
 use crate::identifiers::PartitionId;
-use crate::net::{RequestId, TargetName};
+use crate::net::{define_message, RequestId, TargetName};
 use crate::net::define_rpc;
+use crate::Version;
 
 define_rpc! {
     @request = GetProcessorsState,
@@ -38,3 +39,27 @@ pub struct ProcessorsStateResponse {
     #[serde_as(as = "serde_with::Seq<(_, _)>")]
     pub state: BTreeMap<PartitionId, PartitionProcessorStatus>,
 }
+
+define_message! {
+    @message = ControlProcessors,
+    @target = TargetName::ControlProcessors,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ControlProcessors {
+    pub min_partition_table_version: Version,
+    pub commands: Vec<ControlProcessor>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct ControlProcessor {
+    pub partition_id: PartitionId,
+    pub command: ProcessorCommand,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum ProcessorCommand {
+    Stop,
+    Follower,
+    Leader,
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index b6024ada9..136b3d672 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -10,19 +10,21 @@
 
 use std::collections::BTreeMap;
 use std::ops::RangeInclusive;
+use std::pin::Pin;
 use std::time::Duration;
 
 use anyhow::Context;
 use futures::future::OptionFuture;
-use futures::stream::BoxStream;
 use futures::stream::StreamExt;
+use futures::Stream;
 use metrics::gauge;
 use restate_types::live::Live;
 use restate_types::logs::SequenceNumber;
+use tokio::sync::mpsc::error::TrySendError;
 use tokio::sync::{mpsc, watch};
 use tokio::time;
 use tokio::time::MissedTickBehavior;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
 
 use restate_bifrost::Bifrost;
 use restate_core::network::rpc_router::{RpcError, RpcRouter};
@@ -47,10 +49,13 @@
 use restate_types::logs::Lsn;
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::net::cluster_controller::AttachRequest;
 use restate_types::net::cluster_controller::{Action, AttachResponse};
-use restate_types::net::partition_processor_manager::GetProcessorsState;
 use restate_types::net::partition_processor_manager::ProcessorsStateResponse;
+use restate_types::net::partition_processor_manager::{
+    ControlProcessor, ControlProcessors, GetProcessorsState, ProcessorCommand,
+};
 use restate_types::net::MessageEnvelope;
 use restate_types::net::RpcMessage;
+use restate_types::partition_table::FixedPartitionTable;
 use restate_types::time::MillisSinceEpoch;
 use restate_types::GenerationalNodeId;
@@ -70,14 +75,17 @@ use crate::PartitionProcessorBuilder;
 pub struct PartitionProcessorManager {
     task_center: TaskCenter,
     updateable_config: Live<Configuration>,
-    running_partition_processors: BTreeMap<PartitionId, State>,
+    running_partition_processors: BTreeMap<PartitionId, ProcessorState>,
     name_cache: BTreeMap<PartitionId, &'static str>,
 
     metadata: Metadata,
     metadata_store_client: MetadataStoreClient,
     partition_store_manager: PartitionStoreManager,
     attach_router: RpcRouter<AttachRequest>,
-    incoming_get_state: BoxStream<'static, MessageEnvelope<GetProcessorsState>>,
+    incoming_get_state:
+        Pin<Box<dyn Stream<Item = MessageEnvelope<GetProcessorsState>> + Send + Sync + 'static>>,
+    incoming_update_processors:
+        Pin<Box<dyn Stream<Item = MessageEnvelope<ControlProcessors>> + Send + Sync + 'static>>,
     networking: Networking,
     bifrost: Bifrost,
     invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -88,6 +96,16 @@ pub struct PartitionProcessorManager {
     persisted_lsns_rx: Option<watch::Receiver<BTreeMap<PartitionId, Lsn>>>,
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error("failed updating the metadata store: {0}")]
+    MetadataStore(#[from] ReadModifyWriteError),
+    #[error("could not send command to partition processor since it is busy")]
+    PartitionProcessorBusy,
+}
+
 #[derive(Debug, thiserror::Error)]
 enum AttachError {
     #[error("No cluster controller found in nodes configuration")]
     NotFound,
     #[error(transparent)]
@@ -96,12 +114,128 @@
     ShutdownError(#[from] ShutdownError),
 }
 
-struct State {
+struct ProcessorState {
+    partition_id: PartitionId,
+    task_id: TaskId,
     _created_at: MillisSinceEpoch,
     _key_range: RangeInclusive<PartitionKey>,
-    _control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
+    planned_mode: RunMode,
+    handle: PartitionProcessorHandle,
     watch_rx: watch::Receiver<PartitionProcessorStatus>,
-    _task_id: TaskId,
+}
+
+impl ProcessorState {
+    fn new(
+        partition_id: PartitionId,
+        task_id: TaskId,
+        key_range: RangeInclusive<PartitionKey>,
+        handle: PartitionProcessorHandle,
+        watch_rx: watch::Receiver<PartitionProcessorStatus>,
+    ) -> Self {
+        Self {
+            partition_id,
+            task_id,
+            _created_at: MillisSinceEpoch::now(),
+            _key_range: key_range,
+            planned_mode: RunMode::Follower,
+            handle,
+            watch_rx,
+        }
+    }
+
+    fn step_down(&mut self) -> Result<(), Error> {
+        if self.planned_mode != RunMode::Follower {
+            self.handle.step_down()?;
+        }
+
+        self.planned_mode = RunMode::Follower;
+
+        Ok(())
+    }
+
+    async fn run_for_leader(
+        &mut self,
+        metadata_store_client: MetadataStoreClient,
+        node_id: GenerationalNodeId,
+    ) -> Result<(), Error> {
+        if self.planned_mode != RunMode::Leader {
+            let leader_epoch =
+                Self::obtain_next_epoch(metadata_store_client, self.partition_id, node_id).await?;
+            self.handle.run_for_leader(leader_epoch)?;
+        }
+
+        self.planned_mode = RunMode::Leader;
+
+        Ok(())
+    }
+
+    async fn obtain_next_epoch(
+        metadata_store_client: MetadataStoreClient,
+        partition_id: PartitionId,
+        node_id: GenerationalNodeId,
+    ) -> Result<LeaderEpoch, Error> {
+        let epoch: EpochMetadata = metadata_store_client
+            .read_modify_write(partition_processor_epoch_key(partition_id), |epoch| {
+                let next_epoch = epoch
+                    .map(|epoch: EpochMetadata| epoch.claim_leadership(node_id, partition_id))
+                    .unwrap_or_else(|| EpochMetadata::new(node_id, partition_id));
+
+                Ok(next_epoch)
+            })
+            .await?;
+        Ok(epoch.epoch())
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum PartitionProcessorHandleError {
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error("command could not be sent")]
+    FailedSend,
+}
+
+impl<T> From<TrySendError<T>> for PartitionProcessorHandleError {
+    fn from(value: TrySendError<T>) -> Self {
+        match value {
+            TrySendError::Full(_) => PartitionProcessorHandleError::FailedSend,
+            TrySendError::Closed(_) => PartitionProcessorHandleError::Shutdown(ShutdownError),
+        }
+    }
+}
+
+impl From<PartitionProcessorHandleError> for Error {
+    fn from(value: PartitionProcessorHandleError) -> Self {
+        match value {
+            PartitionProcessorHandleError::Shutdown(err) => Error::Shutdown(err),
+            PartitionProcessorHandleError::FailedSend => Error::PartitionProcessorBusy,
+        }
+    }
+}
+
+struct PartitionProcessorHandle {
+    control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
+}
+
+impl PartitionProcessorHandle {
+    fn new(control_tx: mpsc::Sender<PartitionProcessorControlCommand>) -> Self {
+        Self { control_tx }
+    }
+
+    fn step_down(&self) -> Result<(), PartitionProcessorHandleError> {
+        self.control_tx
+            .try_send(PartitionProcessorControlCommand::StepDown)?;
+        Ok(())
+    }
+
+    fn run_for_leader(
+        &self,
+        leader_epoch: LeaderEpoch,
+    ) -> Result<(), PartitionProcessorHandleError> {
+        self.control_tx
+            .try_send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))?;
+        Ok(())
+    }
 }
 
 impl PartitionProcessorManager {
@@ -119,6 +253,7 @@
     ) -> Self {
         let attach_router = RpcRouter::new(networking.clone(), router_builder);
         let incoming_get_state = router_builder.subscribe_to_stream(2);
+        let incoming_update_processors = router_builder.subscribe_to_stream(2);
         let (tx, rx) = mpsc::channel(updateable_config.pinned().worker.internal_queue_length());
 
         Self {
@@ -130,6 +265,7 @@
             metadata_store_client,
             partition_store_manager,
             incoming_get_state,
+            incoming_update_processors,
             networking,
             bifrost,
             invoker_handle,
@@ -225,7 +361,12 @@
                 Some(get_state) = self.incoming_get_state.next() => {
                     self.on_get_state(get_state);
                 }
-                _ = &mut shutdown => {
+                Some(update_processors) = self.incoming_update_processors.next() => {
+                    if let Err(err) = self.on_control_processors(update_processors).await {
+                        warn!("failed processing control processors command: {err}");
+                    }
+                }
+                _ = &mut shutdown => {
                     return Ok(());
                 }
             }
@@ -280,6 +421,9 @@
                     .set(last_record_applied_at.elapsed());
             }
 
+            // it is a bit unfortunate that we share PartitionProcessorStatus between the
+            // PP and the PPManager :-(. Maybe at some point we want to split the struct for it.
+            status.planned_mode = state.planned_mode;
             status.last_persisted_log_lsn = persisted_lsns
                 .as_ref()
                 .and_then(|lsns| lsns.get(partition_id).cloned());
@@ -311,7 +455,89 @@
         }
     }
 
-    pub async fn apply_plan(&mut self, actions: &[Action]) -> anyhow::Result<()> {
+    async fn on_control_processors(
+        &mut self,
+        control_processor: MessageEnvelope<ControlProcessors>,
+    ) -> Result<(), Error> {
+        let (_, control_processors) = control_processor.split();
+
+        let partition_table = self
+            .metadata
+            .wait_for_partition_table(control_processors.min_partition_table_version)
+            .await?;
+
+        for control_processor in control_processors.commands {
+            self.on_control_processor(control_processor, &partition_table)
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "debug", skip_all, fields(partition_id = %control_processor.partition_id))]
+    async fn on_control_processor(
+        &mut self,
+        control_processor: ControlProcessor,
+        partition_table: &FixedPartitionTable,
+    ) -> Result<(), Error> {
+        let partition_id = control_processor.partition_id;
+
+        match control_processor.command {
+            ProcessorCommand::Stop => {
+                if let Some(processor) = self.running_partition_processors.remove(&partition_id) {
+                    if let Some(handle) = self.task_center.cancel_task(processor.task_id) {
+                        if let Err(err) = handle.await {
+                            warn!("Partition processor crashed while shutting down: {err}");
+                        }
+                    }
+                } else {
+                    debug!("No running partition processor. Ignoring stop command.");
+                }
+            }
+            ProcessorCommand::Follower => {
+                if let Some(state) = self.running_partition_processors.get_mut(&partition_id) {
+                    // if we error here, then the system is shutting down
+                    state.step_down()?;
+                } else if let Some(partition_key_range) =
+                    partition_table.partition_range(partition_id)
+                {
+                    self.start_partition_processor(
+                        partition_id,
+                        partition_key_range,
+                        RunMode::Follower,
+                    )
+                    .await?;
+                } else {
+                    debug!("Unknown partition id '{partition_id}'. Ignoring follower command.");
+                }
+            }
+            ProcessorCommand::Leader => {
+                if let Some(state) = self.running_partition_processors.get_mut(&partition_id) {
+                    state
+                        .run_for_leader(
+                            self.metadata_store_client.clone(),
+                            self.metadata.my_node_id(),
+                        )
+                        .await?;
+                } else if let Some(partition_key_range) =
+                    partition_table.partition_range(partition_id)
+                {
+                    self.start_partition_processor(
+                        partition_id,
+                        partition_key_range,
+                        RunMode::Leader,
+                    )
+                    .await?;
+                } else {
+                    debug!("Unknown partition id '{partition_id}'. Ignoring leader command.");
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn apply_plan(&mut self, actions: &[Action]) -> Result<(), Error> {
         for action in actions {
             match action {
                 Action::RunPartition(action) => {
@@ -320,39 +546,12 @@ impl PartitionProcessorManager {
                         .running_partition_processors
                         .contains_key(&action.partition_id)
                     {
-                        let (control_tx, control_rx) = mpsc::channel(2);
-                        let status = PartitionProcessorStatus::new();
-                        let (watch_tx, watch_rx) = watch::channel(status.clone());
-
-                        let _task_id = self.spawn_partition_processor(
+                        self.start_partition_processor(
                             action.partition_id,
                             action.key_range_inclusive.clone().into(),
-                            status,
-                            control_rx,
-                            watch_tx,
-                        )?;
-
-                        if RunMode::Leader == action.mode {
-                            let leader_epoch = Self::obtain_next_epoch(
-                                self.metadata_store_client.clone(),
-                                action.partition_id,
-                                self.metadata.my_node_id(),
-                            )
-                            .await?;
-                            let _ = control_tx
-                                .send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))
-                                .await;
-                        }
-
-                        let state = State {
-                            _created_at: MillisSinceEpoch::now(),
-                            _key_range: action.key_range_inclusive.clone().into(),
-                            _task_id,
-                            _control_tx: control_tx,
-                            watch_rx,
-                        };
-                        self.running_partition_processors
-                            .insert(action.partition_id, state);
+                            action.mode,
+                        )
+                        .await?;
                     } else {
                         debug!(
                             "Partition processor for partition id '{}' is already running.",
@@ -367,14 +566,37 @@
         }
         Ok(())
     }
 
+    async fn start_partition_processor(
+        &mut self,
+        partition_id: PartitionId,
+        key_range: RangeInclusive<PartitionKey>,
+        mode: RunMode,
+    ) -> Result<(), Error> {
+        let mut state = self.spawn_partition_processor(partition_id, key_range.clone())?;
+
+        if RunMode::Leader == mode {
+            state
+                .run_for_leader(
+                    self.metadata_store_client.clone(),
+                    self.metadata.my_node_id(),
+                )
+                .await?
+        }
+
+        self.running_partition_processors
+            .insert(partition_id, state);
+        Ok(())
+    }
+
     fn spawn_partition_processor(
         &mut self,
         partition_id: PartitionId,
         key_range: RangeInclusive<PartitionKey>,
-        status: PartitionProcessorStatus,
-        control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
-        watch_tx: watch::Sender<PartitionProcessorStatus>,
-    ) -> Result<TaskId, ShutdownError> {
+    ) -> Result<ProcessorState, Error> {
+        let (control_tx, control_rx) = mpsc::channel(2);
+        let status = PartitionProcessorStatus::new();
+        let (watch_tx, watch_rx) = watch::channel(status.clone());
+
         let config = self.updateable_config.pinned();
         let options = &config.worker;
@@ -401,18 +623,19 @@
             .entry(partition_id)
             .or_insert_with(|| Box::leak(Box::new(format!("pp-{}", partition_id))));
 
-        self.task_center.spawn_child(
+        let task_id = self.task_center.spawn_child(
             TaskKind::PartitionProcessor,
             task_name,
             Some(pp_builder.partition_id),
             {
                 let storage_manager = self.partition_store_manager.clone();
                 let options = options.clone();
+                let key_range = key_range.clone();
                 async move {
                     let partition_store = storage_manager
                         .open_partition_store(
                             partition_id,
-                            key_range.clone(),
+                            key_range,
                             OpenMode::CreateIfMissing,
                             &options.storage.rocksdb,
                         )
@@ -425,24 +648,15 @@
                         .await
                 }
             },
-        )
-    }
-
-    async fn obtain_next_epoch(
-        metadata_store_client: MetadataStoreClient,
-        partition_id: PartitionId,
-        node_id: GenerationalNodeId,
-    ) -> Result<LeaderEpoch, ReadModifyWriteError> {
-        let epoch: EpochMetadata = metadata_store_client
-            .read_modify_write(partition_processor_epoch_key(partition_id), |epoch| {
-                let next_epoch = epoch
-                    .map(|epoch: EpochMetadata| epoch.claim_leadership(node_id, partition_id))
-                    .unwrap_or_else(|| EpochMetadata::new(node_id, partition_id));
+        )?;
 
-                Ok(next_epoch)
-            })
-            .await?;
-        Ok(epoch.epoch())
+        Ok(ProcessorState::new(
+            partition_id,
+            task_id,
+            key_range,
+            PartitionProcessorHandle::new(control_tx),
+            watch_rx,
+        ))
     }
 }
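
Note (reviewer sketch, not part of the patch): the new ControlProcessors message is plain data, so a cluster controller can build it with only the types added in crates/types. A minimal example follows; the helper name is hypothetical and how the message is ultimately routed to the CONTROL_PROCESSORS target is outside this diff.

use restate_types::identifiers::PartitionId;
use restate_types::net::partition_processor_manager::{
    ControlProcessor, ControlProcessors, ProcessorCommand,
};
use restate_types::Version;

// Hypothetical helper: ask one node to run `leader` as leader and `follower`
// as follower. `min_partition_table_version` is the partition table version
// the sender based its decision on; the manager waits for at least this
// version before applying the commands (see on_control_processors above).
fn control_processors_request(
    min_partition_table_version: Version,
    leader: PartitionId,
    follower: PartitionId,
) -> ControlProcessors {
    ControlProcessors {
        min_partition_table_version,
        commands: vec![
            ControlProcessor {
                partition_id: leader,
                command: ProcessorCommand::Leader,
            },
            ControlProcessor {
                partition_id: follower,
                command: ProcessorCommand::Follower,
            },
        ],
    }
}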