diff --git a/Cargo.lock b/Cargo.lock index cdc2f2717..9b34a6d66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2681,6 +2681,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "hostname" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" +dependencies = [ + "cfg-if", + "libc", + "windows", +] + [[package]] name = "http" version = "0.2.12" @@ -4982,7 +4993,7 @@ dependencies = [ "enumset", "futures", "googletest", - "hostname", + "hostname 0.3.1", "humantime", "metrics", "rand", @@ -5842,10 +5853,12 @@ dependencies = [ "bytestring", "cfg_eval", "clap", + "derive_builder", "derive_more", "enum-map", "enumset", "googletest", + "hostname 0.4.0", "http 0.2.12", "humantime", "num-traits", @@ -5864,6 +5877,7 @@ dependencies = [ "test-log", "thiserror", "tokio", + "toml", "tracing", "tracing-opentelemetry", "ulid", @@ -7711,6 +7725,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.4", +] + [[package]] name = "windows-core" version = "0.52.0" diff --git a/crates/invoker-impl/src/options.rs b/crates/invoker-impl/src/options.rs index d80835aee..06f83d582 100644 --- a/crates/invoker-impl/src/options.rs +++ b/crates/invoker-impl/src/options.rs @@ -97,7 +97,9 @@ impl Default for Options { retry_policy: RetryPolicy::exponential( Duration::from_millis(50), 2.0, - usize::MAX, + // Practically no limit, but toml fails on loading anything higher than i64::MAX + // See https://github.com/toml-rs/toml/issues/705 for details + i64::MAX as usize, Some(Duration::from_secs(10)), ), inactivity_timeout: Duration::from_secs(60).into(), diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 224356761..1ac50d6b5 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -11,7 +11,6 @@ publish = false default = [] test-util = [] -schemars = ["dep:schemars"] local_loglet = [] memory_loglet = [] @@ -23,10 +22,12 @@ arc-swap = { workspace = true } base64 = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } -clap = { workspace = true, optional = true } +clap = { workspace = true, features = ["std", "derive", "env"], optional = true } +derive_builder = { workspace = true } derive_more = { workspace = true } enum-map = { workspace = true } enumset = { workspace = true, features = ["serde"] } +hostname = { version = "0.4" } http = { workspace = true } humantime = { workspace = true } once_cell = { workspace = true } @@ -41,6 +42,7 @@ strum_macros = { workspace = true } sync_wrapper = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, default-features = false, features = ["time", "sync"]} +toml = { version = "0.8.12" } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs new file mode 100644 index 000000000..918982372 --- /dev/null +++ b/crates/types/src/config/admin.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use std::net::SocketAddr; +use std::path::PathBuf; + +use super::{data_dir, QueryEngineOptions}; + +/// # Admin server options +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "AdminOptions", default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct AdminOptions { + /// # Endpoint address + /// + /// Address to bind for the Admin APIs. + pub bind_address: SocketAddr, + + /// # Concurrency limit + /// + /// Concurrency limit for the Admin APIs. + pub concurrent_api_requests_limit: usize, + pub query_engine: QueryEngineOptions, +} + +impl AdminOptions { + pub fn data_dir(&self) -> PathBuf { + data_dir("registry") + } +} + +impl Default for AdminOptions { + fn default() -> Self { + Self { + bind_address: "0.0.0.0:9070".parse().unwrap(), + concurrent_api_requests_limit: i64::MAX as usize, + query_engine: Default::default(), + } + } +} diff --git a/crates/types/src/config/aws.rs b/crates/types/src/config/aws.rs new file mode 100644 index 000000000..97ae7d65b --- /dev/null +++ b/crates/types/src/config/aws.rs @@ -0,0 +1,33 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +/// # AWS options +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "AwsClientOptions", default))] +#[builder(default)] +pub struct AwsOptions { + /// # AWS Profile + /// + /// Name of the AWS profile to select. Defaults to 'AWS_PROFILE' env var, or otherwise + /// the `default` profile. + pub aws_profile: Option, + + /// # AssumeRole external ID + /// + /// An external ID to apply to any AssumeRole operations taken by this client. + /// https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html + /// Can be overridden by the `AWS_EXTERNAL_ID` environment variable. + pub aws_assume_role_external_id: Option, +} diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs new file mode 100644 index 000000000..897820982 --- /dev/null +++ b/crates/types/src/config/bifrost.rs @@ -0,0 +1,112 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; +use std::time::Duration; + +use enum_map::Enum; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +use super::{data_dir, RocksDbOptions, RocksDbOptionsBuilder}; + +/// # Bifrost options +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "BifrostOptions", default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct BifrostOptions { + /// # The default kind of loglet to be used + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub default_provider: ProviderKind, + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + /// Configuration of local loglet provider + pub local: LocalLogletOptions, +} + +impl Default for BifrostOptions { + fn default() -> Self { + Self { + default_provider: ProviderKind::Local, + local: LocalLogletOptions::default(), + } + } +} + +/// An enum with the list of supported loglet providers. +/// For each variant we must have a corresponding implementation of the +/// [`crate::loglet::Loglet`] trait +#[derive( + Debug, + Clone, + Hash, + Eq, + PartialEq, + Copy, + serde::Serialize, + serde::Deserialize, + Enum, + strum_macros::EnumIter, + strum_macros::Display, +)] +#[serde(rename_all = "kebab-case")] +pub enum ProviderKind { + /// A local rocksdb-backed loglet. + Local, + /// An in-memory loglet, primarily for testing. + InMemory, +} + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "LocalLoglet", default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct LocalLogletOptions { + #[serde(flatten)] + pub rocksdb: RocksDbOptions, + + /// Trigger a commit when the batch size exceeds this threshold. Set to 0 or 1 to commit the + /// write batch on every command. + pub writer_commit_batch_size_threshold: usize, + /// Trigger a commit when the time since the last commit exceeds this threshold. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub writer_commit_time_interval: humantime::Duration, + /// The maximum number of write commands that can be queued. + pub writer_queue_len: usize, + /// If true, rocksdb flushes follow writing record batches, otherwise, we + /// fallback to rocksdb automatic WAL flushes. + pub flush_wal_on_commit: bool, +} + +impl LocalLogletOptions { + pub fn data_dir(&self) -> PathBuf { + data_dir("local-loglet") + } +} + +impl Default for LocalLogletOptions { + fn default() -> Self { + let rocksdb = RocksDbOptionsBuilder::default() + .rocksdb_disable_wal(Some(false)) + .build() + .unwrap(); + Self { + rocksdb, + writer_commit_batch_size_threshold: 200, + writer_commit_time_interval: Duration::from_millis(13).into(), + writer_queue_len: 200, + flush_wal_on_commit: true, + } + } +} diff --git a/crates/types/src/config/cli_option_overrides.rs b/crates/types/src/config/cli_option_overrides.rs new file mode 100644 index 000000000..9463ae2e1 --- /dev/null +++ b/crates/types/src/config/cli_option_overrides.rs @@ -0,0 +1,137 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; + +use humantime::Duration; +use serde::Serialize; +use serde_with::serde_as; + +use crate::net::{AdvertisedAddress, BindAddress}; +use crate::nodes_config::Role; +use crate::PlainNodeId; + +use super::LogFormat; + +#[serde_as] +#[derive(Debug, Clone, clap::Parser, Serialize, Default)] +/// A subset of CommonOptions that can be parsed via the CLI. This **must** remain +/// parse-compatible with CommonOptions. +pub struct CommonOptionCliOverride { + /// Defines the roles which this Restate node should run, by default the node + /// starts with all roles. + #[clap(long, alias = "role")] + #[serde(skip_serializing_if = "Option::is_none")] + roles: Option>, + + /// Unique name for this node in the cluster. The node must not change unless + /// it's started with empty local store. It defaults to the node hostname. + #[clap(long, env = "RESTATE_NODE_NAME")] + #[serde(skip_serializing_if = "Option::is_none")] + node_name: Option, + + /// If set, the node insists on acquiring this node ID. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + force_node_id: Option, + + /// A unique identifier for the cluster. All nodes in the same cluster should + /// have the same. + #[clap(long, env = "RESTATE_CLUSTER_NAME")] + #[serde(skip_serializing_if = "Option::is_none")] + cluster_name: Option, + + /// If true, then a new cluster is bootstrapped. This node *must* have an admin + /// role and a new nodes configuration will be created that includes this node. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + allow_bootstrap: Option, + + /// The working directory which this Restate node should use for relative paths. The default is + /// `restate-data` under the current working directory. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + base_dir: Option, + + /// Address of the metadata store server to bootstrap the node from. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + metadata_store_address: Option, + + /// Address to bind for the Node server. e.g. `0.0.0.0:5122` + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + bind_address: Option, + + /// Address that other nodes will use to connect to this node. Defaults to use bind_address if + /// unset. e.g. `http://127.0.0.1:5122/` + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + advertise_address: Option, + + /// This timeout is used when shutting down the various Restate components to drain all the internal queues. + #[serde_as(as = "Option")] + #[serde(skip_serializing_if = "Option::is_none")] + #[clap(long)] + shutdown_timeout: Option, + + /// # Tracing Endpoint + /// + /// Specify the tracing endpoint to send traces to. + /// Traces will be exported using [OTLP gRPC](https://opentelemetry.io/docs/specs/otlp/#otlpgrpc) + /// through [opentelemetry_otlp](https://docs.rs/opentelemetry-otlp/0.12.0/opentelemetry_otlp/). + #[clap(long, env = "RESTATE_TRACING_ENDPOINT")] + #[serde(skip_serializing_if = "Option::is_none")] + tracing_endpoint: Option, + + /// # Distributed Tracing JSON Export Path + /// + /// If set, an exporter will be configured to write traces to files using the Jaeger JSON format. + /// Each trace file will start with the `trace` prefix. + /// + /// If unset, no traces will be written to file. + /// + /// It can be used to export traces in a structured format without configuring a Jaeger agent. + /// + /// To inspect the traces, open the Jaeger UI and use the Upload JSON feature to load and inspect them. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + tracing_json_path: Option, + + /// # Tracing Filter + /// + /// Distributed tracing exporter filter. + /// Check the [`RUST_LOG` documentation](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) for more details how to configure it. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + tracing_filter: Option, + + /// # Logging Filter + /// + /// Log filter configuration. Can be overridden by the `RUST_LOG` environment variable. + /// Check the [`RUST_LOG` documentation](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) for more details how to configure it. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + log_filter: Option, + + /// # Logging format + /// + /// Format to use when logging. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + log_format: Option, + + /// # Disable ANSI in log output + /// + /// Disable ANSI terminal codes for logs. This is useful when the log collector doesn't support processing ANSI terminal codes. + #[clap(long)] + #[serde(skip_serializing_if = "Option::is_none")] + log_disable_ansi_codes: Option, +} diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs new file mode 100644 index 000000000..a7287e724 --- /dev/null +++ b/crates/types/src/config/common.rs @@ -0,0 +1,229 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; +use std::str::FromStr; + +use enumset::EnumSet; +use humantime::Duration; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +use crate::net::{AdvertisedAddress, BindAddress}; +use crate::nodes_config::Role; +use crate::PlainNodeId; + +use super::{AwsOptions, HttpOptions, RocksDbOptions}; + +const DEFAULT_STORAGE_DIRECTORY: &str = "restate-data"; + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct CommonOptions { + /// Defines the roles which this Restate node should run, by default the node + /// starts with all roles. + #[cfg_attr(feature = "schemars", schemars(with = "Vec"))] + pub roles: EnumSet, + + /// # Node Name + /// + /// Unique name for this node in the cluster. The node must not change unless + /// it's started with empty local store. It defaults to the node hostname. + pub node_name: String, + + /// If set, the node insists on acquiring this node ID. + pub force_node_id: Option, + + /// # Cluster Name + /// + /// A unique identifier for the cluster. All nodes in the same cluster should + /// have the same. + pub cluster_name: String, + + /// If true, then a new cluster is bootstrapped. This node *must* be has an admin + /// role and a new nodes configuration will be created that includes this node. + pub allow_bootstrap: bool, + + /// The working directory which this Restate node should use for relative paths. The default is + /// `restate-data` under the current working directory. + pub base_dir: PathBuf, + + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + /// Address of the metadata store server to bootstrap the node from. + pub metadata_store_address: AdvertisedAddress, + + /// Address to bind for the Node server. Default is `0.0.0.0:5122` + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub bind_address: BindAddress, + + /// Address that other nodes will use to connect to this node. Default is `http://127.0.0.1:5122/` + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub advertised_address: AdvertisedAddress, + + /// # Shutdown grace timeout + /// + /// This timeout is used when shutting down the various Restate components to drain all the internal queues. + /// + /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub shutdown_timeout: Duration, + + /// # Default async runtime thread pool + /// + /// Size of the default thread pool used to perform internal tasks. + /// If not set, it defaults to the number of CPU cores. + pub default_thread_pool_size: Option, + + /// # Tracing Endpoint + /// + /// Specify the tracing endpoint to send traces to. + /// Traces will be exported using [OTLP gRPC](https://opentelemetry.io/docs/specs/otlp/#otlpgrpc) + /// through [opentelemetry_otlp](https://docs.rs/opentelemetry-otlp/0.12.0/opentelemetry_otlp/). + /// + /// To configure the sampling, please refer to the [opentelemetry autoconfigure docs](https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#sampler). + pub tracing_endpoint: Option, + + /// # Distributed Tracing JSON Export Path + /// + /// If set, an exporter will be configured to write traces to files using the Jaeger JSON format. + /// Each trace file will start with the `trace` prefix. + /// + /// If unset, no traces will be written to file. + /// + /// It can be used to export traces in a structured format without configuring a Jaeger agent. + /// + /// To inspect the traces, open the Jaeger UI and use the Upload JSON feature to load and inspect them. + pub tracing_json_path: Option, + + /// # Tracing Filter + /// + /// Distributed tracing exporter filter. + /// Check the [`RUST_LOG` documentation](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) for more details how to configure it. + pub tracing_filter: String, + /// # Logging Filter + /// + /// Log filter configuration. Can be overridden by the `RUST_LOG` environment variable. + /// Check the [`RUST_LOG` documentation](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) for more details how to configure it. + pub log_filter: String, + + /// # Logging format + /// + /// Format to use when logging. + pub log_format: LogFormat, + + /// # Disable ANSI in log output + /// + /// Disable ANSI terminal codes for logs. This is useful when the log collector doesn't support processing ANSI terminal codes. + pub log_disable_ansi_codes: bool, + + /// Timeout for idle histograms. + /// + /// The duration after which a histogram is considered idle and will be removed from + /// metric responses to save memory. Unsetting means that histograms will never be removed. + #[serde(with = "serde_with::As::>")] + #[cfg_attr(feature = "schemars", schemars(with = "Option"))] + pub histogram_inactivity_timeout: Option, + + #[serde(flatten)] + pub service_client: ServiceClientOptions, + + /// Disable prometheus metric recording and reporting. Default is `false`. + pub disable_prometheus: bool, + + /// RocksDb general settings and memory limits + #[serde(flatten)] + pub rocksdb: RocksDbOptions, +} + +impl CommonOptions { + pub fn shutdown_grace_period(&self) -> std::time::Duration { + self.shutdown_timeout.into() + } +} + +impl Default for CommonOptions { + fn default() -> Self { + Self { + roles: EnumSet::all(), + node_name: hostname::get() + .map(|h| h.into_string().expect("hostname is valid unicode")) + .unwrap_or("localhost".to_owned()), + force_node_id: None, + cluster_name: "localcluster".to_owned(), + // boot strap the cluster by default. This is very likely to change in the future to be + // false by default. For now, this is true to make the converged deployment backward + // compatible and easy for users. + allow_bootstrap: true, + base_dir: std::env::current_dir() + .unwrap() + .join(DEFAULT_STORAGE_DIRECTORY), + metadata_store_address: "http://127.0.0.1:5123" + .parse() + .expect("valid metadata store address"), + bind_address: "0.0.0.0:5122".parse().unwrap(), + advertised_address: AdvertisedAddress::from_str("http://127.0.0.1:5122/").unwrap(), + histogram_inactivity_timeout: None, + disable_prometheus: false, + service_client: Default::default(), + shutdown_timeout: std::time::Duration::from_secs(60).into(), + tracing_endpoint: None, + tracing_json_path: None, + tracing_filter: "info".to_owned(), + log_filter: "warn,restate=info".to_string(), + log_format: Default::default(), + log_disable_ansi_codes: false, + default_thread_pool_size: None, + rocksdb: Default::default(), + } + } +} + +/// # Service Client options +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr( + feature = "schemars", + schemars(rename = "ServiceClientOptions", default) +)] +#[builder(default)] +#[derive(Default)] +pub struct ServiceClientOptions { + #[serde(flatten)] + pub http: HttpOptions, + #[serde(flatten)] + pub lambda: AwsOptions, +} + +/// # Log format +#[cfg_attr(feature = "clap", derive(clap::ValueEnum))] +#[derive(Debug, Clone, Copy, Hash, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub enum LogFormat { + /// # Pretty + /// + /// Enables verbose logging. Not recommended in production. + #[default] + Pretty, + /// # Compact + /// + /// Enables compact logging. + Compact, + /// # Json + /// + /// Enables json logging. You can use a json log collector to ingest these logs and further process them. + Json, +} diff --git a/crates/types/src/config/http.rs b/crates/types/src/config/http.rs new file mode 100644 index 000000000..3b03ab286 --- /dev/null +++ b/crates/types/src/config/http.rs @@ -0,0 +1,166 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::fmt; +use std::str::FromStr; +use std::time::Duration; + +use http::uri::{InvalidUri, Scheme}; +use http::Uri; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +/// # HTTP client options +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "HttpClientOptions", default))] +#[builder(default)] +#[serde(rename_all = "kebab-case")] +pub struct HttpOptions { + /// # HTTP/2 Keep-alive + /// + /// Configuration for the HTTP/2 keep-alive mechanism, using PING frames. + /// If unset, HTTP/2 keep-alive are disabled. + pub http_keep_alive_options: Http2KeepAliveOptions, + /// # Proxy URI + /// + /// A URI, such as `http://127.0.0.1:10001`, of a server to which all invocations should be sent, with the `Host` header set to the deployment URI. + /// HTTPS proxy URIs are supported, but only HTTP endpoint traffic will be proxied currently. + /// Can be overridden by the `HTTP_PROXY` environment variable. + #[cfg_attr(feature = "schemars", schemars(with = "Option"))] + pub http_proxy: Option, +} + +/// # HTTP/2 Keep alive options +/// +/// Configuration for the HTTP/2 keep-alive mechanism, using PING frames. +/// +/// Please note: most gateways don't propagate the HTTP/2 keep-alive between downstream and upstream hosts. +/// In those environments, you need to make sure the gateway can detect a broken connection to the upstream deployment(s). +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[serde(rename_all = "kebab-case")] +pub struct Http2KeepAliveOptions { + /// # HTTP/2 Keep-alive interval + /// + /// Sets an interval for HTTP/2 PING frames should be sent to keep a + /// connection alive. + /// + /// You should set this timeout with a value lower than the `abort_timeout`. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub interval: humantime::Duration, + + /// # Timeout + /// + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub timeout: humantime::Duration, +} + +impl Default for Http2KeepAliveOptions { + fn default() -> Self { + Self { + interval: Http2KeepAliveOptions::default_interval(), + timeout: Http2KeepAliveOptions::default_timeout(), + } + } +} + +impl Http2KeepAliveOptions { + #[inline] + fn default_interval() -> humantime::Duration { + (Duration::from_secs(40)).into() + } + + #[inline] + fn default_timeout() -> humantime::Duration { + (Duration::from_secs(20)).into() + } +} + +#[derive(Clone, Debug, thiserror::Error)] +#[error("invalid proxy Uri (must have scheme, authority, and path): {0}")] +pub struct InvalidProxyUri(Uri); + +#[derive(Clone, Debug, Hash, serde::Serialize, serde::Deserialize)] +#[serde(try_from = "String", into = "String")] +pub struct ProxyUri { + uri: Uri, +} + +impl fmt::Display for ProxyUri { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.uri.fmt(f) + } +} + +impl TryFrom for ProxyUri { + type Error = ProxyFromStrError; + + fn try_from(value: String) -> Result { + ProxyUri::from_str(&value) + } +} + +impl From for String { + fn from(value: ProxyUri) -> Self { + value.to_string() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ProxyFromStrError { + #[error(transparent)] + InvalidUri(#[from] InvalidUri), + #[error(transparent)] + InvalidProxyUri(#[from] InvalidProxyUri), +} + +impl FromStr for ProxyUri { + type Err = ProxyFromStrError; + fn from_str(s: &str) -> Result { + Ok(Self::new(Uri::from_str(s)?)?) + } +} + +impl ProxyUri { + pub fn new(proxy_uri: Uri) -> Result { + match proxy_uri.clone().into_parts() { + // all three must be present + http::uri::Parts { + scheme: Some(_), + authority: Some(_), + path_and_query: Some(_), + .. + } => Ok(Self { uri: proxy_uri }), + _ => Err(InvalidProxyUri(proxy_uri)), + } + } + + pub fn dst(&self, dst: Uri) -> Uri { + // only proxy non TLS traffic, otherwise just pass through directly to underlying connector + if dst.scheme() != Some(&Scheme::HTTPS) { + let mut parts = self.clone().uri.into_parts(); + parts.path_and_query = dst.path_and_query().cloned(); + + Uri::from_parts(parts).unwrap() + } else { + dst + } + } +} diff --git a/crates/types/src/config/ingress.rs b/crates/types/src/config/ingress.rs new file mode 100644 index 000000000..0b5d2bb7d --- /dev/null +++ b/crates/types/src/config/ingress.rs @@ -0,0 +1,46 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::net::SocketAddr; + +use serde::{Deserialize, Serialize}; + +use super::KafkaIngressOptions; + +/// # Ingress options +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "IngressOptions"))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct IngressOptions { + /// # Bind address + /// + /// The address to bind for the ingress. + pub bind_address: SocketAddr, + + /// # Concurrency limit + /// + /// Local concurrency limit to use to limit the amount of concurrent requests. If exceeded, the ingress will reply immediately with an appropriate status code. + pub concurrent_api_requests_limit: usize, + + pub kafka: KafkaIngressOptions, +} + +impl Default for IngressOptions { + fn default() -> Self { + Self { + bind_address: "0.0.0.0:8080".parse().unwrap(), + concurrent_api_requests_limit: i64::MAX as usize, + kafka: Default::default(), + } + } +} diff --git a/crates/types/src/config/kafka.rs b/crates/types/src/config/kafka.rs new file mode 100644 index 000000000..b81555ef1 --- /dev/null +++ b/crates/types/src/config/kafka.rs @@ -0,0 +1,47 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// # Kafka cluster options +/// +/// Configuration options to connect to a Kafka cluster. +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub struct KafkaClusterOptions { + /// # Servers + /// + /// Initial list of brokers (host or host:port). + pub(crate) brokers: Vec, + + /// # Additional options + /// + /// Free floating list of kafka options in the same form of rdkafka. For more details on all the available options: + /// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + #[serde(flatten, skip_serializing_if = "HashMap::is_empty")] + pub(crate) additional_options: HashMap, +} + +/// # Subscription options +#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "SubscriptionOptions"))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct KafkaIngressOptions { + /// # Kafka clusters + /// + /// Configuration parameters for the known kafka clusters + #[serde(flatten)] + pub(crate) clusters: HashMap, +} diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs new file mode 100644 index 000000000..a8c72f827 --- /dev/null +++ b/crates/types/src/config/metadata_store.rs @@ -0,0 +1,49 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use super::{data_dir, RocksDbOptions}; +use crate::net::BindAddress; + +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr( + feature = "schemars", + schemars(rename = "LocalMetadataStoreOptions", default) +)] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct MetadataStoreOptions { + /// Address to which the metadata store will bind to. + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub bind_address: BindAddress, + /// Number of in-flight metadata store requests. + pub request_queue_length: usize, + pub rocksdb: RocksDbOptions, +} + +impl MetadataStoreOptions { + pub fn data_dir(&self) -> PathBuf { + data_dir("local-metadata-store") + } +} + +impl Default for MetadataStoreOptions { + fn default() -> Self { + Self { + bind_address: "0.0.0.0:5123".parse().expect("valid bind address"), + request_queue_length: 32, + rocksdb: Default::default(), + } + } +} diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs new file mode 100644 index 000000000..4021f9ef2 --- /dev/null +++ b/crates/types/src/config/mod.rs @@ -0,0 +1,168 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +mod util; + +pub use util::*; +mod admin; +mod aws; +mod bifrost; +#[cfg(feature = "clap")] +mod cli_option_overrides; +mod common; +mod http; +mod ingress; +mod kafka; +mod metadata_store; +mod query_engine; +mod rocksdb; +mod worker; + +pub use admin::*; +pub use aws::*; +pub use bifrost::*; +#[cfg(feature = "clap")] +pub use cli_option_overrides::*; +pub use common::*; +pub use http::*; +pub use ingress::*; +pub use kafka::*; +pub use metadata_store::*; +pub use query_engine::*; +pub use rocksdb::*; +pub use worker::*; + +use std::path::PathBuf; +use std::sync::{Arc, OnceLock}; + +use arc_swap::ArcSwap; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +use super::arc_util::{ArcSwapExt, Pinned, Updateable}; +use crate::errors::GenericError; + +static CONFIGURATION: Lazy> = Lazy::new(ArcSwap::default); +static NODE_BASE_DIR: OnceLock = OnceLock::new(); + +fn data_dir(dir: &str) -> PathBuf { + NODE_BASE_DIR + .get() + .expect("base_dir is initialized") + .join(dir) +} + +/// Set the current configuration, this is temporary until we have a dedicated configuration loader +/// thread. +pub fn set_current_config(config: Configuration) { + let proposed_cwd = config.common.base_dir.clone(); + // todo: potentially validate the config + CONFIGURATION.store(Arc::new(config)); + NODE_BASE_DIR.get_or_init(|| { + if let Err(e) = std::env::set_current_dir(&proposed_cwd) { + eprintln!("[WARN] Failed to set current working directory: {}", e); + }; + proposed_cwd + }); + notify_config_update(); +} + +/// # Restate configuration file +/// +/// Configuration for the Restate single binary deployment. +/// +/// You can specify the configuration file to use through the `--config-file ` argument or +/// with `RESTATE_CONFIG=` environment variable. +/// +/// Each configuration entry can be overridden using environment variables, +/// prefixing them with `RESTATE_` and separating nested structs with `__` (double underscore). +/// +/// For example, to configure `admin.bind_address`, the corresponding environment variable is `RESTATE_ADMIN__BIND_ADDRESS`. +#[serde_as] +#[derive(Debug, Clone, Default, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[builder(default)] +#[serde(rename_all = "kebab-case")] +pub struct Configuration { + #[serde(flatten)] + pub common: CommonOptions, + pub worker: WorkerOptions, + pub admin: AdminOptions, + pub ingress: IngressOptions, + pub bifrost: BifrostOptions, + pub metadata_store: MetadataStoreOptions, +} + +impl Configuration { + // Access the raw swappable configuration + pub fn current() -> &'static impl ArcSwapExt { + &CONFIGURATION + } + + /// Potentially fast access to a snapshot, should be used if an Updateable + /// isn't possible (Updateable trait is not object-safe, and requires mut to load()). + /// Guard acquired doesn't track config updates. ~10x slower than Updateable's load(). + /// + /// There’s only limited number of “fast” slots for borrowing from the underlying ArcSwap + /// for each single thread (currently 8, but this might change). If these run out, the + /// algorithm falls back to slower path (fallback to `snapshot()`). + /// + /// If too many Guards are kept around, the performance might be poor. These are not intended + /// to be stored in data structures or used across async yield points. + pub fn pinned() -> Pinned { + CONFIGURATION.pinned() + } + + /// The best way to access an updateable when holding a mutable Updateable is + /// viable. + /// + /// ~10% slower than `snapshot()` to create (YMMV), load() is as fast as accessing local objects, + /// and will always load the latest configuration reference. The downside is that `load()` requires + /// exclusive reference. This should be the preferred method for accessing the updateable, but + /// avoid using `to_updateable()` or `snapshot()` in tight loops. Instead, get a new updateable, + /// and pass it down to the loop by value for very efficient access. + pub fn updateable() -> impl Updateable { + CONFIGURATION.to_updateable() + } + + pub fn updateable_common() -> impl Updateable { + CONFIGURATION.map_as_updateable(|c| &c.common) + } + + pub fn updateable_worker() -> impl Updateable { + CONFIGURATION.map_as_updateable(|c| &c.worker) + } + + /// Create an updateable that projects a part of the config + pub fn mapped_updateable(f: F) -> impl Updateable + where + F: FnMut(&Arc) -> &U, + { + Configuration::current().map_as_updateable(f) + } + + pub fn apply_rocksdb_common(mut self) -> Self { + self.worker.rocksdb.apply_common(&self.common.rocksdb); + self.bifrost + .local + .rocksdb + .apply_common(&self.common.rocksdb); + self.metadata_store + .rocksdb + .apply_common(&self.common.rocksdb); + self + } + + /// Dumps the configuration to a string + pub fn dump(&self) -> Result { + Ok(toml::to_string_pretty(self)?) + } +} diff --git a/crates/types/src/config/query_engine.rs b/crates/types/src/config/query_engine.rs new file mode 100644 index 000000000..3d03ad525 --- /dev/null +++ b/crates/types/src/config/query_engine.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::net::SocketAddr; + +use serde::{Deserialize, Serialize}; + +/// # Storage query engine options +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "QueryEngineOptions"))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[serde(rename_all = "kebab-case")] +pub struct QueryEngineOptions { + /// # Memory limit + /// + /// The total memory in bytes that can be used to preform sql queries + pub memory_limit: Option, + + /// # Temp folder to use for spill + /// + /// The path to spill to + pub tmp_dir: Option, + + /// # Default query parallelism + /// + /// The number of parallel partitions to use for a query execution + pub query_parallelism: Option, + + /// # Pgsql Bind address + /// + /// The address to bind for the psql service. + pub pgsql_bind_address: SocketAddr, +} + +impl Default for QueryEngineOptions { + fn default() -> Self { + Self { + memory_limit: None, + tmp_dir: None, + query_parallelism: None, + pgsql_bind_address: "0.0.0.0:9071".parse().unwrap(), + } + } +} diff --git a/crates/types/src/config/rocksdb.rs b/crates/types/src/config/rocksdb.rs new file mode 100644 index 000000000..fa920e4a4 --- /dev/null +++ b/crates/types/src/config/rocksdb.rs @@ -0,0 +1,107 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Default, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr( + feature = "schemars", + schemars(rename = "WorkerRocksDbOptions", default) +)] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +// NOTE: Prefix with rocksdb_ +pub struct RocksDbOptions { + /// # Threads + /// + /// The number of threads to reserve to Rocksdb background tasks. + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_num_threads: Option, + + /// # Write Buffer size + /// + /// The size of a single memtable. Once memtable exceeds this size, it is marked immutable and a new one is created. + /// The default is set such that 3 column families per table will use a total of 50% of the global memory limit + /// (`MEMORY_LIMIT`), which defaults to 3GiB, leading to a value of 64MiB with 8 tables. + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_write_buffer_size: Option, + + /// # Maximum total WAL size + /// + /// Max WAL size, that after this Rocksdb start flushing mem tables to disk. + /// Default is 2GB. + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_max_total_wal_size: Option, + + /// # Disable WAL + /// + /// The default depends on the different rocksdb use-cases at Restate. + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_disable_wal: Option, + + /// # Maximum cache size + /// + /// The memory size used for rocksdb caches. Default is 1GB. + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_cache_size: Option, + + /// Disable rocksdb statistics collection + #[serde(skip_serializing_if = "Option::is_none")] + rocksdb_disable_statistics: Option, +} + +impl RocksDbOptions { + pub fn apply_common(&mut self, common: &RocksDbOptions) { + // apply memory limits? + if self.rocksdb_num_threads.is_none() { + self.rocksdb_num_threads = Some(common.rocksdb_num_threads()); + } + if self.rocksdb_write_buffer_size.is_none() { + self.rocksdb_write_buffer_size = Some(common.rocksdb_write_buffer_size()); + } + if self.rocksdb_max_total_wal_size.is_none() { + self.rocksdb_max_total_wal_size = Some(common.rocksdb_max_total_wal_size()); + } + if self.rocksdb_disable_wal.is_none() { + self.rocksdb_disable_wal = Some(common.rocksdb_disable_wal()); + } + if self.rocksdb_cache_size.is_none() { + self.rocksdb_cache_size = Some(common.rocksdb_cache_size()); + } + if self.rocksdb_disable_statistics.is_none() { + self.rocksdb_disable_statistics = Some(common.rocksdb_disable_statistics()); + } + } + + pub fn rocksdb_num_threads(&self) -> usize { + self.rocksdb_num_threads.unwrap_or(10) + } + + pub fn rocksdb_write_buffer_size(&self) -> usize { + self.rocksdb_write_buffer_size.unwrap_or(0) + } + + pub fn rocksdb_max_total_wal_size(&self) -> u64 { + self.rocksdb_max_total_wal_size.unwrap_or(2 * (1 << 30)) + } + + pub fn rocksdb_disable_wal(&self) -> bool { + self.rocksdb_disable_wal.unwrap_or(true) + } + + pub fn rocksdb_cache_size(&self) -> usize { + self.rocksdb_cache_size.unwrap_or(1 << 30) + } + + pub fn rocksdb_disable_statistics(&self) -> bool { + self.rocksdb_disable_statistics.unwrap_or(false) + } +} diff --git a/crates/types/src/config.rs b/crates/types/src/config/util.rs similarity index 100% rename from crates/types/src/config.rs rename to crates/types/src/config/util.rs diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs new file mode 100644 index 000000000..fe745ce15 --- /dev/null +++ b/crates/types/src/config/worker.rs @@ -0,0 +1,163 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +use crate::retries::RetryPolicy; + +use super::{data_dir, RocksDbOptions, RocksDbOptionsBuilder}; + +/// # Worker options +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "WorkerOptions", default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct WorkerOptions { + /// # Internal queue for partition processor communication + pub internal_queue_length: usize, + + /// # Num timers in memory limit + /// + /// The number of timers in memory limit is used to bound the amount of timers loaded in memory. If this limit is set, when exceeding it, the timers farther in the future will be spilled to disk. + pub num_timers_in_memory_limit: Option, + + #[serde(flatten)] + pub rocksdb: RocksDbOptions, + + pub invoker: InvokerOptions, + + /// # Partitions + /// + /// Number of partitions that will be provisioned during cluster bootstrap, + /// partitions used to process messages. + /// + /// NOTE: This config entry only impacts the initial number of partitions, the + /// value of this entry is ignored for bootstrapped nodes/clusters. + /// + /// Cannot be higher than `4611686018427387903` (You should almost never need as many partitions anyway) + pub bootstrap_num_partitions: u64, +} + +impl WorkerOptions { + pub fn data_dir(&self) -> PathBuf { + data_dir("db") + } +} + +impl Default for WorkerOptions { + fn default() -> Self { + let rocksdb = RocksDbOptionsBuilder::default() + .rocksdb_disable_wal(Some(true)) + .build() + .unwrap(); + + Self { + internal_queue_length: 64, + num_timers_in_memory_limit: None, + rocksdb, + invoker: Default::default(), + bootstrap_num_partitions: 64, + } + } +} + +/// # Invoker options +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "InvokerOptions", default))] +#[builder(default)] +#[serde(rename_all = "kebab-case")] +pub struct InvokerOptions { + /// # Retry policy + /// + /// Retry policy to use for all the invocations handled by this invoker. + retry_policy: RetryPolicy, + + /// # Inactivity timeout + /// + /// This timer guards against stalled service/handler invocations. Once it expires, + /// Restate triggers a graceful termination by asking the service invocation to + /// suspend (which preserves intermediate progress). + /// + /// The 'abort timeout' is used to abort the invocation, in case it doesn't react to + /// the request to suspend. + /// + /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + inactivity_timeout: humantime::Duration, + + /// # Abort timeout + /// + /// This timer guards against stalled service/handler invocations that are supposed to + /// terminate. The abort timeout is started after the 'inactivity timeout' has expired + /// and the service/handler invocation has been asked to gracefully terminate. Once the + /// timer expires, it will abort the service/handler invocation. + /// + /// This timer potentially **interrupts** user code. If the user code needs longer to + /// gracefully terminate, then this value needs to be set accordingly. + /// + /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + abort_timeout: humantime::Duration, + + /// # Message size warning + /// + /// Threshold to log a warning in case protocol messages coming from a service are larger than the specified amount. + message_size_warning: usize, + + /// # Message size limit + /// + /// Threshold to fail the invocation in case protocol messages coming from a service are larger than the specified amount. + message_size_limit: Option, + + /// # Temporary directory + /// + /// Temporary directory to use for the invoker temporary files. + /// If empty, the system temporary directory will be used instead. + tmp_dir: PathBuf, + + /// # Limit number of concurrent invocations from this node + /// + /// Number of concurrent invocations that can be processed by the invoker. + concurrent_invocations_limit: Option, + + // -- Private config options (not exposed in the schema) + #[cfg_attr(feature = "schemars", schemars(skip))] + disable_eager_state: bool, +} + +impl Default for InvokerOptions { + fn default() -> Self { + Self { + retry_policy: RetryPolicy::exponential( + Duration::from_millis(50), + 2.0, + // see https://github.com/toml-rs/toml/issues/705 + i64::MAX as usize, + Some(Duration::from_secs(10)), + ), + inactivity_timeout: Duration::from_secs(60).into(), + abort_timeout: Duration::from_secs(60).into(), + message_size_warning: 1024 * 1024 * 10, // 10mb + message_size_limit: None, + tmp_dir: std::env::temp_dir().join(format!("{}-{}", "invoker", ulid::Ulid::new())), + concurrent_invocations_limit: None, + disable_eager_state: false, + } + } +} diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index e16161373..e0fb5f6a1 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -34,9 +34,9 @@ pub enum NodesConfigError { // PartialEq+Eq+Clone+Copy are implemented by EnumSetType #[derive(Debug, Hash, EnumSetType, strum_macros::Display, serde::Serialize, serde::Deserialize)] #[enumset(serialize_repr = "list")] -#[serde(rename_all = "snake_case")] +#[serde(rename_all = "kebab-case")] #[cfg_attr(feature = "clap", derive(clap::ValueEnum))] -#[cfg_attr(feature = "clap", clap(rename_all = "snake_case"))] +#[cfg_attr(feature = "clap", clap(rename_all = "kebab-case"))] pub enum Role { /// A worker runs partition processor (journal, state, and drives invocations) Worker, diff --git a/crates/types/src/retries.rs b/crates/types/src/retries.rs index 4733762ba..4cd35363e 100644 --- a/crates/types/src/retries.rs +++ b/crates/types/src/retries.rs @@ -53,7 +53,11 @@ const DEFAULT_JITTER_MULTIPLIER: f32 = 0.3; /// } /// ``` #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[serde(tag = "type")] +#[serde( + tag = "type", + rename_all = "kebab-case", + rename_all_fields = "kebab-case" +)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr( feature = "schemars",