Skip to content

Commit

Permalink
Instrumentation and separate pp, ingress, and default runtimes
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 29, 2024
1 parent 93c9f42 commit 90e9fc7
Show file tree
Hide file tree
Showing 23 changed files with 297 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 95 additions & 11 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use restate_types::identifiers::PartitionId;
use crate::metric_definitions::{TC_FINISHED, TC_SPAWN, TC_STATUS_COMPLETED, TC_STATUS_FAILED};
use crate::{metric_definitions, Metadata, TaskId, TaskKind};

static WORKER_ID: AtomicUsize = AtomicUsize::new(0);
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0);
const EXIT_CODE_FAILURE: i32 = 1;

Expand Down Expand Up @@ -62,6 +63,8 @@ pub enum TaskCenterBuildError {
pub struct TaskCenterBuilder {
default_runtime_handle: Option<tokio::runtime::Handle>,
default_runtime: Option<tokio::runtime::Runtime>,
ingress_runtime_handle: Option<tokio::runtime::Handle>,
ingress_runtime: Option<tokio::runtime::Runtime>,
options: Option<CommonOptions>,
#[cfg(any(test, feature = "test-util"))]
pause_time: bool,
Expand All @@ -74,6 +77,12 @@ impl TaskCenterBuilder {
self
}

/// Configures the builder to run ingress tasks through an externally supplied
/// tokio runtime `handle`.
///
/// Any ingress runtime the builder was holding is released, so after this call
/// the builder stores only the handle and no owned ingress runtime.
pub fn ingress_runtime_handle(mut self, handle: tokio::runtime::Handle) -> Self {
    self.ingress_runtime_handle = Some(handle);
    // Let go of a previously stored owned runtime, if there was one.
    drop(self.ingress_runtime.take());
    self
}

pub fn options(mut self, options: CommonOptions) -> Self {
self.options = Some(options);
self
Expand All @@ -85,6 +94,12 @@ impl TaskCenterBuilder {
self
}

/// Hands ownership of `runtime` to the builder for use as the ingress runtime.
///
/// A clone of the runtime's handle is stored next to the runtime itself, so
/// both the handle and the owned runtime are set after this call.
pub fn ingress_runtime(mut self, runtime: tokio::runtime::Runtime) -> Self {
    // Take the handle first; `runtime` is moved into the builder right after.
    let handle = runtime.handle().clone();
    self.ingress_runtime_handle = Some(handle);
    self.ingress_runtime = Some(runtime);
    self
}

#[cfg(any(test, feature = "test-util"))]
pub fn pause_time(mut self, pause_time: bool) -> Self {
self.pause_time = pause_time;
Expand All @@ -94,7 +109,7 @@ impl TaskCenterBuilder {
pub fn build(mut self) -> Result<TaskCenter, TaskCenterBuildError> {
let options = self.options.unwrap_or_default();
if self.default_runtime_handle.is_none() {
let mut default_runtime_builder = tokio_builder(&options);
let mut default_runtime_builder = tokio_builder("worker", &options);
#[cfg(any(test, feature = "test-util"))]
if self.pause_time {
default_runtime_builder.start_paused(self.pause_time);
Expand All @@ -104,34 +119,63 @@ impl TaskCenterBuilder {
self.default_runtime = Some(default_runtime);
}

if self.ingress_runtime_handle.is_none() {
let mut ingress_runtime_builder = tokio_builder("ingress", &options);
#[cfg(any(test, feature = "test-util"))]
if self.pause_time {
ingress_runtime_builder.start_paused(self.pause_time);
}
let ingress_runtime = ingress_runtime_builder.build()?;
self.ingress_runtime_handle = Some(ingress_runtime.handle().clone());
self.ingress_runtime = Some(ingress_runtime);
}

metric_definitions::describe_metrics();
Ok(TaskCenter {
inner: Arc::new(TaskCenterInner {
default_runtime_handle: self.default_runtime_handle.unwrap(),
default_runtime: self.default_runtime,
ingress_runtime_handle: self.ingress_runtime_handle.unwrap(),
ingress_runtime: self.ingress_runtime,
global_cancel_token: CancellationToken::new(),
shutdown_requested: AtomicBool::new(false),
current_exit_code: AtomicI32::new(0),
tasks: Mutex::new(HashMap::new()),
global_metadata: OnceLock::new(),
pp_runtimes: Mutex::new(HashMap::with_capacity(64)),
}),
})
}
}

fn tokio_builder(common_opts: &CommonOptions) -> tokio::runtime::Builder {
fn tokio_builder(prefix: &'static str, common_opts: &CommonOptions) -> tokio::runtime::Builder {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.enable_all().thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
format!("rs:worker-{}", id)
builder.enable_all().thread_name_fn(move || {
let id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
format!("rs:{}-{}", prefix, id)
});

builder.worker_threads(common_opts.default_thread_pool_size());

builder
}

/// Prepares a multi-threaded tokio runtime builder for a runtime named `pp_name`.
///
/// Threads are named `rs:{pp_name}-{id}`, where `id` is taken from the shared
/// `WORKER_ID` counter. The builder is configured with two worker threads, at
/// most two blocking threads, an event interval of 21 and the LIFO slot
/// disabled. The caller is responsible for calling `build()` on the result.
fn tokio_pp_builder(pp_name: &'static str) -> tokio::runtime::Builder {
    // Produces a unique thread name each time the runtime starts a thread.
    let thread_name = move || {
        let seq = WORKER_ID.fetch_add(1, Ordering::Relaxed);
        format!("rs:{}-{}", pp_name, seq)
    };

    let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
    rt_builder
        .enable_all()
        .thread_name_fn(thread_name)
        .worker_threads(2)
        .max_blocking_threads(2)
        .event_interval(21)
        .disable_lifo_slot();

    rt_builder
}

/// Task center is used to manage long-running and background tasks and their lifecycle.
#[derive(Clone)]
pub struct TaskCenter {
Expand All @@ -145,6 +189,15 @@ impl TaskCenter {
self.inner.default_runtime_handle.metrics()
}

/// Returns the tokio runtime metrics of the ingress runtime.
pub fn ingress_runtime_metrics(&self) -> RuntimeMetrics {
    let ingress = &self.inner.ingress_runtime_handle;
    ingress.metrics()
}

/// Collects the tokio runtime metrics of every runtime currently stored in
/// `pp_runtimes`, paired with the name the runtime is registered under.
///
/// The map's lock is held while the metrics are gathered.
///
/// # Panics
///
/// Panics if acquiring the `pp_runtimes` lock fails.
pub fn partition_processors_runtime_metrics(&self) -> Vec<(&'static str, RuntimeMetrics)> {
    let runtimes = self.inner.pp_runtimes.lock().unwrap();

    let mut snapshot = Vec::with_capacity(runtimes.len());
    for (name, runtime) in runtimes.iter() {
        snapshot.push((*name, runtime.metrics()));
    }
    snapshot
}

/// Use to monitor an on-going shutdown when requested
pub fn watch_shutdown(&self) -> WaitForCancellationFutureOwned {
self.inner.global_cancel_token.clone().cancelled_owned()
Expand All @@ -160,6 +213,14 @@ impl TaskCenter {
self.inner.current_exit_code.load(Ordering::Relaxed)
}

/// Picks the shared runtime handle for `kind`, according to the runtime
/// reported by `TaskKind::runtime()`.
#[inline]
fn runtime_for_kind(&self, kind: TaskKind) -> &tokio::runtime::Handle {
    let inner = &self.inner;
    let target = kind.runtime();
    match target {
        crate::AsyncRuntime::Ingress => &inner.ingress_runtime_handle,
        crate::AsyncRuntime::Default => &inner.default_runtime_handle,
    }
}

/// Triggers a shutdown of the system. All running tasks will be asked gracefully
/// to cancel but we will only wait for tasks with a TaskKind that has the property
/// "OnCancel" set to "wait".
Expand Down Expand Up @@ -238,13 +299,31 @@ impl TaskCenter {
metadata,
future,
);
let join_handle = tokio_task
.spawn_on(fut, &inner.default_runtime_handle)
.expect("default runtime can spawn tasks");

let kind_str: &'static str = kind.into();
let join_handle = {
if matches!(kind, TaskKind::PartitionProcessor) {
let mut guard = self.inner.pp_runtimes.lock().unwrap();
// special case.
let runtime = guard
.entry(name)
.or_insert_with(|| tokio_pp_builder(name).build().unwrap())
.handle();

counter!(TC_SPAWN, "kind" => kind_str, "runtime" => name).increment(1);
tokio_task
.spawn_on(fut, runtime)
.expect("pp dedicated runtime can be spawned")
} else {
let runtime_name: &'static str = kind.runtime().into();
counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1);
tokio_task
.spawn_on(fut, self.runtime_for_kind(kind))
.expect("runtime can spawn tasks")
}
};
*handle_mut = Some(join_handle);
drop(handle_mut);
let kind_str: &'static str = kind.into();
counter!(TC_SPAWN, "kind" => kind_str).increment(1);
// Task is ready
id
}
Expand Down Expand Up @@ -601,11 +680,15 @@ impl TaskCenter {

struct TaskCenterInner {
default_runtime_handle: tokio::runtime::Handle,
ingress_runtime_handle: tokio::runtime::Handle,
pp_runtimes: Mutex<HashMap<&'static str, tokio::runtime::Runtime>>,
/// We hold on to the owned Runtime to ensure it's dropped when task center is dropped. If this
/// is None, it means that it's the responsibility of the Handle owner to correctly drop
/// tokio's runtime after dropping the task center.
#[allow(dead_code)]
default_runtime: Option<tokio::runtime::Runtime>,
#[allow(dead_code)]
ingress_runtime: Option<tokio::runtime::Runtime>,
global_cancel_token: CancellationToken,
shutdown_requested: AtomicBool,
current_exit_code: AtomicI32,
Expand Down Expand Up @@ -764,6 +847,7 @@ mod tests {
let tc = TaskCenterBuilder::default()
.options(common_opts)
.default_runtime_handle(tokio::runtime::Handle::current())
.ingress_runtime_handle(tokio::runtime::Handle::current())
.build()?;
let start = tokio::time::Instant::now();
tc.spawn(TaskKind::RoleRunner, "worker-role", None, async {
Expand Down
16 changes: 15 additions & 1 deletion crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub enum TaskKind {
RpcServer,
/// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
/// to shutdown cleanly without waiting indefinitely.
#[strum(props(OnCancel = "abort"))]
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
IngressServer,
RoleRunner,
SystemService,
Expand Down Expand Up @@ -105,8 +105,22 @@ impl TaskKind {
fn on_error(&self) -> &'static str {
self.get_str("OnError").unwrap_or("shutdown")
}

/// Resolves the async runtime for this task kind from its `runtime` property.
///
/// A kind without a `runtime` property, or with the value `"default"`, maps to
/// [`AsyncRuntime::Default`]; the value `"ingress"` maps to
/// [`AsyncRuntime::Ingress`].
///
/// # Panics
///
/// Panics if the `runtime` property holds any other value.
pub fn runtime(&self) -> AsyncRuntime {
    match self.get_str("runtime") {
        None | Some("default") => AsyncRuntime::Default,
        Some("ingress") => AsyncRuntime::Ingress,
        Some(_) => panic!("Invalid runtime for task kind: {}", self),
    }
}
}

/// Reaction to a task failure.
///
/// NOTE(review): presumably the typed counterpart of the `OnError` task-kind
/// property, whose default is "shutdown" — confirm against the users of this enum.
pub enum FailureBehaviour {
    /// NOTE(review): presumably requests a system shutdown on failure — confirm.
    Shutdown,
}

/// Names the shared runtime that a task kind is assigned to.
///
/// `TaskKind::runtime()` produces this value from the kind's `runtime` property.
/// The derived `IntoStaticStr`/`Display` implementations give the variant a
/// string form.
#[derive(Clone, Copy, Debug, Eq, PartialEq, strum_macros::IntoStaticStr, strum_macros::Display)]
pub enum AsyncRuntime {
    /// Used when a task kind has no `runtime` property or sets it to "default".
    Default,
    /// Used when a task kind sets its `runtime` property to "ingress".
    Ingress,
}
1 change: 1 addition & 0 deletions crates/core/src/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ where
) -> Self {
let tc = TaskCenterBuilder::default()
.default_runtime_handle(tokio::runtime::Handle::current())
.ingress_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ where
.attach_to_span(&invocation_task_span);

info!(
invocation.id = %self.invocation_task.invocation_id,
deployment.address = %deployment.metadata.address_display(),
deployment.service_protocol_version = %self.service_protocol_version.as_repr(),
path = %path,
Expand Down
Loading

0 comments on commit 90e9fc7

Please sign in to comment.