diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 8fa159857..bebb9509b 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -28,7 +28,9 @@ use restate_types::partition_table::{FixedPartitionTable, KeyRange}; use restate_bifrost::Bifrost; use restate_core::network::{MessageRouterBuilder, NetworkSender}; -use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskCenter, TaskKind}; +use restate_core::{ + cancellation_watcher, Metadata, ShutdownError, TargetVersion, TaskCenter, TaskKind, +}; use restate_types::cluster::cluster_state::RunMode; use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState}; use restate_types::identifiers::PartitionId; @@ -370,7 +372,12 @@ async fn signal_all_partitions_started( || metadata.partition_table_version() == Version::INVALID { // syncing of PartitionTable since we obviously don't have up-to-date information - metadata.sync(MetadataKind::PartitionTable).await?; + metadata + .sync( + MetadataKind::PartitionTable, + TargetVersion::Version(cluster_state.partition_table_version.max(Version::MIN)), + ) + .await?; } else { let partition_table = metadata .partition_table() diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 4472037be..d8923bc51 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -18,7 +18,7 @@ use enum_map::EnumMap; use smallvec::SmallVec; use tracing::instrument; -use restate_core::{Metadata, MetadataKind}; +use restate_core::{Metadata, MetadataKind, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; use restate_types::storage::StorageCodec; @@ -377,7 +377,7 @@ impl BifrostInner { pub async fn sync_metadata(&self) -> Result<()> { self.fail_if_shutting_down()?; self.metadata - .sync(MetadataKind::Logs) + .sync(MetadataKind::Logs, TargetVersion::Latest) .await .map_err(Arc::new)?; Ok(()) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 76130c419..7e443aa1c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -18,7 +18,7 @@ pub mod worker_api; pub use metadata::{ spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager, - MetadataWriter, SyncError, + MetadataWriter, SyncError, TargetVersion, }; pub use task_center::*; pub use task_center_types::*; diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index eb75a7816..c3d0998ea 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -43,9 +43,34 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver; #[derive(Debug, thiserror::Error)] pub enum SyncError {} +#[derive(Debug)] +pub enum TargetVersion { + Latest, + Version(Version), +} + +impl Default for TargetVersion { + fn default() -> Self { + Self::Latest + } +} + +impl From> for TargetVersion { + fn from(value: Option) -> Self { + match value { + Some(version) => TargetVersion::Version(version), + None => TargetVersion::Latest, + } + } +} + pub(super) enum Command { UpdateMetadata(MetadataContainer, Option>), - SyncMetadata(MetadataKind, oneshot::Sender>), + SyncMetadata( + MetadataKind, + TargetVersion, + oneshot::Sender>, + ), } /// A handler for processing network messages targeting metadata manager @@ -260,8 +285,8 @@ where async fn handle_command(&mut self, cmd: Command) { match cmd { Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback), - Command::SyncMetadata(kind, callback) => { - let result = self.sync_metadata(kind).await; + Command::SyncMetadata(kind, target_version, callback) => { + let result = self.sync_metadata(kind, target_version).await; if callback.send(result).is_err() { trace!("Couldn't send synce metadata reply back. System is probably shutting down."); } @@ -290,7 +315,15 @@ where } } - async fn sync_metadata(&mut self, metadata_kind: MetadataKind) -> Result<(), ReadError> { + async fn sync_metadata( + &mut self, + metadata_kind: MetadataKind, + target_version: TargetVersion, + ) -> Result<(), ReadError> { + if self.has_target_version(metadata_kind, target_version) { + return Ok(()); + } + match metadata_kind { MetadataKind::NodesConfiguration => { if let Some(nodes_config) = self @@ -333,6 +366,26 @@ where Ok(()) } + fn has_target_version( + &self, + metadata_kind: MetadataKind, + target_version: TargetVersion, + ) -> bool { + match target_version { + TargetVersion::Latest => false, + TargetVersion::Version(target_version) => { + let version = match metadata_kind { + MetadataKind::NodesConfiguration => self.metadata.nodes_config_version(), + MetadataKind::Schema => self.metadata.schema_version(), + MetadataKind::PartitionTable => self.metadata.partition_table_version(), + MetadataKind::Logs => self.metadata.logs_version(), + }; + + version >= target_version + } + } + } + fn update_nodes_configuration(&mut self, config: NodesConfiguration) { let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config); diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index 40a8d10f7..7b3666977 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -12,7 +12,7 @@ #![allow(dead_code)] mod manager; -pub use manager::MetadataManager; +pub use manager::{MetadataManager, TargetVersion}; use restate_types::live::{Live, Pinned}; use restate_types::schema::Schema; @@ -175,11 +175,23 @@ impl Metadata { self.inner.write_watches[metadata_kind].receive.clone() } - /// Syncs the given metadata_kind from the underlying metadata store. - pub async fn sync(&self, metadata_kind: MetadataKind) -> Result<(), SyncError> { + /// Syncs the given metadata_kind from the underlying metadata store if the current version is + /// lower than target version. + /// + /// Note: If the target version does not exist, then a lower version will be available after + /// this call completes. + pub async fn sync( + &self, + metadata_kind: MetadataKind, + target_version: TargetVersion, + ) -> Result<(), SyncError> { let (result_tx, result_rx) = oneshot::channel(); self.sender - .send(Command::SyncMetadata(metadata_kind, result_tx)) + .send(Command::SyncMetadata( + metadata_kind, + target_version, + result_tx, + )) .map_err(|_| ShutdownError)?; result_rx.await.map_err(|_| ShutdownError)??; diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index daf88293a..9aa58d447 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -21,7 +21,10 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tracing::{debug, info, trace, warn, Instrument, Span}; +use restate_types::live::Pinned; +use restate_types::net::metadata::MetadataKind; use restate_types::net::AdvertisedAddress; +use restate_types::nodes_config::NodesConfiguration; use restate_types::protobuf::node::message::{self, ConnectionControl}; use restate_types::protobuf::node::{Header, Hello, Message, Welcome}; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; @@ -36,8 +39,8 @@ use super::metric_definitions::{ }; use super::protobuf::node_svc::node_svc_client::NodeSvcClient; use super::{Handler, MessageRouter}; -use crate::Metadata; use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind}; +use crate::{Metadata, TargetVersion}; // todo: make this configurable const SEND_QUEUE_SIZE: usize = 1; @@ -136,7 +139,7 @@ impl ConnectionManager { // The client can retry with an exponential backoff on handshake timeout. debug!("Accepting incoming connection"); let (header, hello) = wait_for_hello(&mut incoming).await?; - let nodes_config = self.metadata.nodes_config_snapshot(); + let nodes_config = self.metadata.nodes_config_ref(); let my_node_id = self.metadata.my_node_id(); // NodeId **must** be generational at this layer let peer_node_id = hello @@ -176,33 +179,9 @@ impl ConnectionManager { selected_protocol_version ); - // If nodeId is unrecognized and peer is at higher nodes configuration version, - // TODO: issue a sync to the higher version - let peer_is_in_the_future = header - .my_nodes_config_version - .as_ref() - .is_some_and(|v| v.value > nodes_config.version().into()); - - if let Err(e) = nodes_config.find_node_by_id(peer_node_id) { - if peer_is_in_the_future { - info!( - "Rejecting a connection from an unrecognized node v{}, the peer is at a higher \ - nodes configuration version {:?}, mine is {}", - peer_node_id, - header.my_nodes_config_version, - nodes_config.version() - ); - // TODO: notify metadata about higher version. - // let _ = self - // .metadata - // .notify_observed_version( - // MetadataKind::NodesConfiguration, - // header.my_nodes_config_version.unwrap().into(), - // ) - // .await?; - } - return Err(NetworkError::UnknownNode(e)); - } + let nodes_config = self + .verify_node_id(peer_node_id, header, nodes_config) + .await?; let (tx, rx) = mpsc::channel(SEND_QUEUE_SIZE); // Enqueue the welcome message @@ -230,6 +209,44 @@ impl ConnectionManager { Ok(Box::pin(transformed)) } + async fn verify_node_id( + &self, + peer_node_id: GenerationalNodeId, + header: Header, + mut nodes_config: Pinned, + ) -> Result, NetworkError> { + if let Err(e) = nodes_config.find_node_by_id(peer_node_id) { + // If nodeId is unrecognized and peer is at higher nodes configuration version, + // then we have to update our NodesConfiguration + let peer_is_in_the_future = header + .my_nodes_config_version + .as_ref() + .is_some_and(|v| v.value > nodes_config.version().into()); + + if peer_is_in_the_future { + // don't keep pinned nodes configuration beyond await point + drop(nodes_config); + // todo: Replace with notifying metadata manager about newer version + self.metadata + .sync( + MetadataKind::NodesConfiguration, + TargetVersion::from(header.my_nodes_config_version.clone().map(Into::into)), + ) + .await?; + nodes_config = self.metadata.nodes_config_ref(); + + if let Err(e) = nodes_config.find_node_by_id(peer_node_id) { + warn!("Could not find remote node {} after syncing nodes configuration. Local version '{}', remote version '{:?}'.", peer_node_id, nodes_config.version(), header.my_nodes_config_version.expect("must be present")); + return Err(NetworkError::UnknownNode(e)); + } + } else { + return Err(NetworkError::UnknownNode(e)); + } + } + + Ok(nodes_config) + } + /// Always attempts to create a new connection with peer pub async fn enforced_new_node_sender( &self, diff --git a/crates/core/src/network/error.rs b/crates/core/src/network/error.rs index 0ee0f33fe..b4fb689ae 100644 --- a/crates/core/src/network/error.rs +++ b/crates/core/src/network/error.rs @@ -12,7 +12,7 @@ use restate_types::net::{CodecError, MIN_SUPPORTED_PROTOCOL_VERSION}; use restate_types::nodes_config::NodesConfigError; use restate_types::NodeId; -use crate::ShutdownError; +use crate::{ShutdownError, SyncError}; #[derive(Debug, thiserror::Error)] pub enum RouterError { @@ -42,6 +42,8 @@ pub enum NetworkError { ConnectionClosed, #[error("cannot send messages to this node: {0}")] Unavailable(String), + #[error("failed syncing metadata: {0}")] + Metadata(#[from] SyncError), } #[derive(Debug, thiserror::Error)] diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2b6f0967a..cc5914469 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -27,7 +27,9 @@ use restate_bifrost::BifrostService; use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError}; use restate_core::network::MessageRouterBuilder; use restate_core::network::Networking; -use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager}; +use restate_core::{ + spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion, +}; use restate_core::{task_center, TaskKind}; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; @@ -296,8 +298,12 @@ impl Node { metadata_writer.update(logs).await?; } else { // otherwise, just sync the required metadata - metadata.sync(MetadataKind::PartitionTable).await?; - metadata.sync(MetadataKind::Logs).await?; + metadata + .sync(MetadataKind::PartitionTable, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await?; // safety check until we can tolerate missing partition table and logs configuration if metadata.partition_table_version() == Version::INVALID @@ -312,7 +318,9 @@ impl Node { } // fetch the latest schema information - metadata.sync(MetadataKind::Schema).await?; + metadata + .sync(MetadataKind::Schema, TargetVersion::Latest) + .await?; let nodes_config = metadata.nodes_config_ref(); diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index 769e0e120..e3be4a52f 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -42,6 +42,7 @@ enum TargetName { ATTACH_RESPONSE = 6; GET_PROCESSORS_STATE_REQUEST = 7; PROCESSORS_STATE_RESPONSE = 8; + CONTROL_PROCESSORS = 9; } enum NodeStatus { diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs index d62f5ef65..47d73d80e 100644 --- a/crates/types/src/net/partition_processor_manager.rs +++ b/crates/types/src/net/partition_processor_manager.rs @@ -15,9 +15,10 @@ use serde_with::serde_as; use crate::cluster::cluster_state::PartitionProcessorStatus; use crate::identifiers::PartitionId; -use crate::net::{RequestId, TargetName}; +use crate::net::{define_message, RequestId, TargetName}; use crate::net::define_rpc; +use crate::Version; define_rpc! { @request = GetProcessorsState, @@ -38,3 +39,27 @@ pub struct ProcessorsStateResponse { #[serde_as(as = "serde_with::Seq<(_, _)>")] pub state: BTreeMap, } + +define_message! { + @message = ControlProcessors, + @target = TargetName::ControlProcessors, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ControlProcessors { + pub min_partition_table_version: Version, + pub commands: Vec, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ControlProcessor { + pub partition_id: PartitionId, + pub command: ProcessorCommand, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ProcessorCommand { + Stop, + Follower, + Leader, +} diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index b6024ada9..136b3d672 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -10,19 +10,21 @@ use std::collections::BTreeMap; use std::ops::RangeInclusive; +use std::pin::Pin; use std::time::Duration; use anyhow::Context; use futures::future::OptionFuture; -use futures::stream::BoxStream; use futures::stream::StreamExt; +use futures::Stream; use metrics::gauge; use restate_types::live::Live; use restate_types::logs::SequenceNumber; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::{mpsc, watch}; use tokio::time; use tokio::time::MissedTickBehavior; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, instrument, trace, warn}; use restate_bifrost::Bifrost; use restate_core::network::rpc_router::{RpcError, RpcRouter}; @@ -47,10 +49,13 @@ use restate_types::logs::Lsn; use restate_types::metadata_store::keys::partition_processor_epoch_key; use restate_types::net::cluster_controller::AttachRequest; use restate_types::net::cluster_controller::{Action, AttachResponse}; -use restate_types::net::partition_processor_manager::GetProcessorsState; use restate_types::net::partition_processor_manager::ProcessorsStateResponse; +use restate_types::net::partition_processor_manager::{ + ControlProcessor, ControlProcessors, GetProcessorsState, ProcessorCommand, +}; use restate_types::net::MessageEnvelope; use restate_types::net::RpcMessage; +use restate_types::partition_table::FixedPartitionTable; use restate_types::time::MillisSinceEpoch; use restate_types::GenerationalNodeId; @@ -70,14 +75,17 @@ use crate::PartitionProcessorBuilder; pub struct PartitionProcessorManager { task_center: TaskCenter, updateable_config: Live, - running_partition_processors: BTreeMap, + running_partition_processors: BTreeMap, name_cache: BTreeMap, metadata: Metadata, metadata_store_client: MetadataStoreClient, partition_store_manager: PartitionStoreManager, attach_router: RpcRouter, - incoming_get_state: BoxStream<'static, MessageEnvelope>, + incoming_get_state: + Pin> + Send + Sync + 'static>>, + incoming_update_processors: + Pin> + Send + Sync + 'static>>, networking: Networking, bifrost: Bifrost, invoker_handle: InvokerHandle>, @@ -88,6 +96,16 @@ pub struct PartitionProcessorManager { persisted_lsns_rx: Option>>, } +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error("failed updating the metadata store: {0}")] + MetadataStore(#[from] ReadModifyWriteError), + #[error("could not send command to partition processor since it is busy")] + PartitionProcessorBusy, +} + #[derive(Debug, thiserror::Error)] enum AttachError { #[error("No cluster controller found in nodes configuration")] @@ -96,12 +114,128 @@ enum AttachError { ShutdownError(#[from] ShutdownError), } -struct State { +struct ProcessorState { + partition_id: PartitionId, + task_id: TaskId, _created_at: MillisSinceEpoch, _key_range: RangeInclusive, - _control_tx: mpsc::Sender, + planned_mode: RunMode, + handle: PartitionProcessorHandle, watch_rx: watch::Receiver, - _task_id: TaskId, +} + +impl ProcessorState { + fn new( + partition_id: PartitionId, + task_id: TaskId, + key_range: RangeInclusive, + handle: PartitionProcessorHandle, + watch_rx: watch::Receiver, + ) -> Self { + Self { + partition_id, + task_id, + _created_at: MillisSinceEpoch::now(), + _key_range: key_range, + planned_mode: RunMode::Follower, + handle, + watch_rx, + } + } + + fn step_down(&mut self) -> Result<(), Error> { + if self.planned_mode != RunMode::Follower { + self.handle.step_down()?; + } + + self.planned_mode = RunMode::Follower; + + Ok(()) + } + + async fn run_for_leader( + &mut self, + metadata_store_client: MetadataStoreClient, + node_id: GenerationalNodeId, + ) -> Result<(), Error> { + if self.planned_mode != RunMode::Leader { + let leader_epoch = + Self::obtain_next_epoch(metadata_store_client, self.partition_id, node_id).await?; + self.handle.run_for_leader(leader_epoch)?; + } + + self.planned_mode = RunMode::Leader; + + Ok(()) + } + + async fn obtain_next_epoch( + metadata_store_client: MetadataStoreClient, + partition_id: PartitionId, + node_id: GenerationalNodeId, + ) -> Result { + let epoch: EpochMetadata = metadata_store_client + .read_modify_write(partition_processor_epoch_key(partition_id), |epoch| { + let next_epoch = epoch + .map(|epoch: EpochMetadata| epoch.claim_leadership(node_id, partition_id)) + .unwrap_or_else(|| EpochMetadata::new(node_id, partition_id)); + + Ok(next_epoch) + }) + .await?; + Ok(epoch.epoch()) + } +} + +#[derive(Debug, thiserror::Error)] +enum PartitionProcessorHandleError { + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error("command could not be sent")] + FailedSend, +} + +impl From> for PartitionProcessorHandleError { + fn from(value: TrySendError) -> Self { + match value { + TrySendError::Full(_) => PartitionProcessorHandleError::FailedSend, + TrySendError::Closed(_) => PartitionProcessorHandleError::Shutdown(ShutdownError), + } + } +} + +impl From for Error { + fn from(value: PartitionProcessorHandleError) -> Self { + match value { + PartitionProcessorHandleError::Shutdown(err) => Error::Shutdown(err), + PartitionProcessorHandleError::FailedSend => Error::PartitionProcessorBusy, + } + } +} + +struct PartitionProcessorHandle { + control_tx: mpsc::Sender, +} + +impl PartitionProcessorHandle { + fn new(control_tx: mpsc::Sender) -> Self { + Self { control_tx } + } + + fn step_down(&self) -> Result<(), PartitionProcessorHandleError> { + self.control_tx + .try_send(PartitionProcessorControlCommand::StepDown)?; + Ok(()) + } + + fn run_for_leader( + &self, + leader_epoch: LeaderEpoch, + ) -> Result<(), PartitionProcessorHandleError> { + self.control_tx + .try_send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))?; + Ok(()) + } } impl PartitionProcessorManager { @@ -119,6 +253,7 @@ impl PartitionProcessorManager { ) -> Self { let attach_router = RpcRouter::new(networking.clone(), router_builder); let incoming_get_state = router_builder.subscribe_to_stream(2); + let incoming_update_processors = router_builder.subscribe_to_stream(2); let (tx, rx) = mpsc::channel(updateable_config.pinned().worker.internal_queue_length()); Self { @@ -130,6 +265,7 @@ impl PartitionProcessorManager { metadata_store_client, partition_store_manager, incoming_get_state, + incoming_update_processors, networking, bifrost, invoker_handle, @@ -225,7 +361,12 @@ impl PartitionProcessorManager { Some(get_state) = self.incoming_get_state.next() => { self.on_get_state(get_state); } - _ = &mut shutdown => { + Some(update_processors) = self.incoming_update_processors.next() => { + if let Err(err) = self.on_control_processors(update_processors).await { + warn!("failed processing control processors command: {err}"); + } + } + _ = &mut shutdown => { return Ok(()); } } @@ -280,6 +421,9 @@ impl PartitionProcessorManager { .set(last_record_applied_at.elapsed()); } + // it is a bit unfortunate that we share PartitionProcessorStatus between the + // PP and the PPManager :-(. Maybe at some point we want to split the struct for it. + status.planned_mode = state.planned_mode; status.last_persisted_log_lsn = persisted_lsns .as_ref() .and_then(|lsns| lsns.get(partition_id).cloned()); @@ -311,7 +455,89 @@ impl PartitionProcessorManager { } } - pub async fn apply_plan(&mut self, actions: &[Action]) -> anyhow::Result<()> { + async fn on_control_processors( + &mut self, + control_processor: MessageEnvelope, + ) -> Result<(), Error> { + let (_, control_processors) = control_processor.split(); + + let partition_table = self + .metadata + .wait_for_partition_table(control_processors.min_partition_table_version) + .await?; + + for control_processor in control_processors.commands { + self.on_control_processor(control_processor, &partition_table) + .await?; + } + + Ok(()) + } + + #[instrument(level = "debug", skip_all, fields(partition_id = %control_processor.partition_id))] + async fn on_control_processor( + &mut self, + control_processor: ControlProcessor, + partition_table: &FixedPartitionTable, + ) -> Result<(), Error> { + let partition_id = control_processor.partition_id; + + match control_processor.command { + ProcessorCommand::Stop => { + if let Some(processor) = self.running_partition_processors.remove(&partition_id) { + if let Some(handle) = self.task_center.cancel_task(processor.task_id) { + if let Err(err) = handle.await { + warn!("Partition processor crashed while shutting down: {err}"); + } + } + } else { + debug!("No running partition processor. Ignoring stop command."); + } + } + ProcessorCommand::Follower => { + if let Some(state) = self.running_partition_processors.get_mut(&partition_id) { + // if we error here, then the system is shutting down + state.step_down()?; + } else if let Some(partition_key_range) = + partition_table.partition_range(partition_id) + { + self.start_partition_processor( + partition_id, + partition_key_range, + RunMode::Follower, + ) + .await?; + } else { + debug!("Unknown partition id '{partition_id}'. Ignoring follower command."); + } + } + ProcessorCommand::Leader => { + if let Some(state) = self.running_partition_processors.get_mut(&partition_id) { + state + .run_for_leader( + self.metadata_store_client.clone(), + self.metadata.my_node_id(), + ) + .await?; + } else if let Some(partition_key_range) = + partition_table.partition_range(partition_id) + { + self.start_partition_processor( + partition_id, + partition_key_range, + RunMode::Leader, + ) + .await?; + } else { + debug!("Unknown partition id '{partition_id}'. Ignoring leader command."); + } + } + } + + Ok(()) + } + + pub async fn apply_plan(&mut self, actions: &[Action]) -> Result<(), Error> { for action in actions { match action { Action::RunPartition(action) => { @@ -320,39 +546,12 @@ impl PartitionProcessorManager { .running_partition_processors .contains_key(&action.partition_id) { - let (control_tx, control_rx) = mpsc::channel(2); - let status = PartitionProcessorStatus::new(); - let (watch_tx, watch_rx) = watch::channel(status.clone()); - - let _task_id = self.spawn_partition_processor( + self.start_partition_processor( action.partition_id, action.key_range_inclusive.clone().into(), - status, - control_rx, - watch_tx, - )?; - - if RunMode::Leader == action.mode { - let leader_epoch = Self::obtain_next_epoch( - self.metadata_store_client.clone(), - action.partition_id, - self.metadata.my_node_id(), - ) - .await?; - let _ = control_tx - .send(PartitionProcessorControlCommand::RunForLeader(leader_epoch)) - .await; - } - - let state = State { - _created_at: MillisSinceEpoch::now(), - _key_range: action.key_range_inclusive.clone().into(), - _task_id, - _control_tx: control_tx, - watch_rx, - }; - self.running_partition_processors - .insert(action.partition_id, state); + action.mode, + ) + .await?; } else { debug!( "Partition processor for partition id '{}' is already running.", @@ -367,14 +566,37 @@ impl PartitionProcessorManager { Ok(()) } + async fn start_partition_processor( + &mut self, + partition_id: PartitionId, + key_range: RangeInclusive, + mode: RunMode, + ) -> Result<(), Error> { + let mut state = self.spawn_partition_processor(partition_id, key_range.clone())?; + + if RunMode::Leader == mode { + state + .run_for_leader( + self.metadata_store_client.clone(), + self.metadata.my_node_id(), + ) + .await? + } + + self.running_partition_processors + .insert(partition_id, state); + Ok(()) + } + fn spawn_partition_processor( &mut self, partition_id: PartitionId, key_range: RangeInclusive, - status: PartitionProcessorStatus, - control_rx: mpsc::Receiver, - watch_tx: watch::Sender, - ) -> Result { + ) -> Result { + let (control_tx, control_rx) = mpsc::channel(2); + let status = PartitionProcessorStatus::new(); + let (watch_tx, watch_rx) = watch::channel(status.clone()); + let config = self.updateable_config.pinned(); let options = &config.worker; @@ -401,18 +623,19 @@ impl PartitionProcessorManager { .entry(partition_id) .or_insert_with(|| Box::leak(Box::new(format!("pp-{}", partition_id)))); - self.task_center.spawn_child( + let task_id = self.task_center.spawn_child( TaskKind::PartitionProcessor, task_name, Some(pp_builder.partition_id), { let storage_manager = self.partition_store_manager.clone(); let options = options.clone(); + let key_range = key_range.clone(); async move { let partition_store = storage_manager .open_partition_store( partition_id, - key_range.clone(), + key_range, OpenMode::CreateIfMissing, &options.storage.rocksdb, ) @@ -425,24 +648,15 @@ impl PartitionProcessorManager { .await } }, - ) - } - - async fn obtain_next_epoch( - metadata_store_client: MetadataStoreClient, - partition_id: PartitionId, - node_id: GenerationalNodeId, - ) -> Result { - let epoch: EpochMetadata = metadata_store_client - .read_modify_write(partition_processor_epoch_key(partition_id), |epoch| { - let next_epoch = epoch - .map(|epoch: EpochMetadata| epoch.claim_leadership(node_id, partition_id)) - .unwrap_or_else(|| EpochMetadata::new(node_id, partition_id)); + )?; - Ok(next_epoch) - }) - .await?; - Ok(epoch.epoch()) + Ok(ProcessorState::new( + partition_id, + task_id, + key_range, + PartitionProcessorHandle::new(control_tx), + watch_rx, + )) } }