Skip to content

Commit

Permalink
New config types in restate-types
Browse files Browse the repository at this point in the history
This introduces a revised configuration structure with the following attributes:
- Revised names
- kebab-case serialization
- Toml dump()
- new base_dir option (override-able via cli) to set the root for data directories
- individual data directories are not configurable.
- More liberal defaults (in some cases)
- Unifying rocksdb options with ability to override


Default config dump example:
```toml
roles = [
    "worker",
    "admin",
    "metadata-store",
]
node-name = "localhost"
cluster-name = "localcluster"
allow-bootstrap = true
base-dir = "/Users/asoli/workspace/restatedev/restate/restate-data"
metadata-store-address = "http://127.0.0.1:5123/"
bind-address = "0.0.0.0:5122"
advertise-address = "http://127.0.0.1:5122/"
shutdown-timeout = "1m"
tracing-filter = "info"
log-filter = "warn,restate=info"
log-format = "pretty"
log-disable-ansi-codes = false
disable-prometheus = false

[http-keep-alive-options]
interval = "40s"
timeout = "20s"

[worker]
internal-queue-length = 64
rocksdb-disable-wal = true
bootstrap-num-partitions = 64

[worker.invoker]
inactivity-timeout = "1m"
abort-timeout = "1m"
message-size-warning = 10485760
tmp-dir = "/tmp/invoker-01HTHYN4JEVVSZDR13SZ1RWWK6"
disable-eager-state = false

[worker.invoker.retry-policy]
type = "exponential"
initial_interval = "50ms"
factor = 2.0
max_attempts = 9223372036854775807
max_interval = "10s"

[admin]
bind-address = "0.0.0.0:9070"
concurrent-api-requests-limit = 9223372036854775807

[admin.query-engine]
pgsql-bind-address = "0.0.0.0:9071"

[ingress]
bind-address = "0.0.0.0:8080"
concurrent-api-requests-limit = 9223372036854775807

[ingress.kafka]

[bifrost]
default-provider = "local"

[bifrost.local]
rocksdb-disable-wal = false
writer-commit-batch-size-threshold = 200
writer-commit-time-interval = "13ms"
writer-queue-len = 200
flush-wal-on-commit = true

[metadata-store]
bind-address = "0.0.0.0:5123"
request-queue-length = 32

```
  • Loading branch information
AhmedSoliman committed Apr 4, 2024
1 parent ae19c06 commit 4c5c347
Show file tree
Hide file tree
Showing 19 changed files with 1,400 additions and 7 deletions.
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/invoker-impl/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ impl Default for Options {
retry_policy: RetryPolicy::exponential(
Duration::from_millis(50),
2.0,
usize::MAX,
// Practically no limit, but toml fails on loading anything higher than i64::MAX
// See https://github.com/toml-rs/toml/issues/705 for details
i64::MAX as usize,
Some(Duration::from_secs(10)),
),
inactivity_timeout: Duration::from_secs(60).into(),
Expand Down
6 changes: 4 additions & 2 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ publish = false
default = []

test-util = []
schemars = ["dep:schemars"]
local_loglet = []
memory_loglet = []

Expand All @@ -23,10 +22,12 @@ arc-swap = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
clap = { workspace = true, optional = true }
clap = { workspace = true, features = ["std", "derive", "env"], optional = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
enumset = { workspace = true, features = ["serde"] }
hostname = { version = "0.4" }
http = { workspace = true }
humantime = { workspace = true }
once_cell = { workspace = true }
Expand All @@ -41,6 +42,7 @@ strum_macros = { workspace = true }
sync_wrapper = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["time", "sync"]}
toml = { version = "0.8.12" }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ulid = { workspace = true }
Expand Down
52 changes: 52 additions & 0 deletions crates/types/src/config/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::net::SocketAddr;
use std::path::PathBuf;

use super::{data_dir, QueryEngineOptions};

/// # Admin server options
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "AdminOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct AdminOptions {
/// # Endpoint address
///
/// Address to bind for the Admin APIs.
pub bind_address: SocketAddr,

/// # Concurrency limit
///
/// Concurrency limit for the Admin APIs.
pub concurrent_api_requests_limit: usize,
pub query_engine: QueryEngineOptions,
}

impl AdminOptions {
pub fn data_dir(&self) -> PathBuf {
data_dir("registry")
}
}

impl Default for AdminOptions {
fn default() -> Self {
Self {
bind_address: "0.0.0.0:9070".parse().unwrap(),
concurrent_api_requests_limit: i64::MAX as usize,
query_engine: Default::default(),
}
}
}
33 changes: 33 additions & 0 deletions crates/types/src/config/aws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use serde::{Deserialize, Serialize};
use serde_with::serde_as;

/// # AWS options
#[serde_as]
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "AwsClientOptions", default))]
#[builder(default)]
pub struct AwsOptions {
/// # AWS Profile
///
/// Name of the AWS profile to select. Defaults to 'AWS_PROFILE' env var, or otherwise
/// the `default` profile.
pub aws_profile: Option<String>,

/// # AssumeRole external ID
///
/// An external ID to apply to any AssumeRole operations taken by this client.
/// https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
/// Can be overridden by the `AWS_EXTERNAL_ID` environment variable.
pub aws_assume_role_external_id: Option<String>,
}
112 changes: 112 additions & 0 deletions crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;
use std::time::Duration;

use enum_map::Enum;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use super::{data_dir, RocksDbOptions, RocksDbOptionsBuilder};

/// # Bifrost options
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "BifrostOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct BifrostOptions {
/// # The default kind of loglet to be used
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub default_provider: ProviderKind,
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
/// Configuration of local loglet provider
pub local: LocalLogletOptions,
}

impl Default for BifrostOptions {
fn default() -> Self {
Self {
default_provider: ProviderKind::Local,
local: LocalLogletOptions::default(),
}
}
}

/// An enum with the list of supported loglet providers.
/// For each variant we must have a corresponding implementation of the
/// [`crate::loglet::Loglet`] trait
#[derive(
Debug,
Clone,
Hash,
Eq,
PartialEq,
Copy,
serde::Serialize,
serde::Deserialize,
Enum,
strum_macros::EnumIter,
strum_macros::Display,
)]
#[serde(rename_all = "kebab-case")]
pub enum ProviderKind {
/// A local rocksdb-backed loglet.
Local,
/// An in-memory loglet, primarily for testing.
InMemory,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "LocalLoglet", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct LocalLogletOptions {
#[serde(flatten)]
pub rocksdb: RocksDbOptions,

/// Trigger a commit when the batch size exceeds this threshold. Set to 0 or 1 to commit the
/// write batch on every command.
pub writer_commit_batch_size_threshold: usize,
/// Trigger a commit when the time since the last commit exceeds this threshold.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub writer_commit_time_interval: humantime::Duration,
/// The maximum number of write commands that can be queued.
pub writer_queue_len: usize,
/// If true, rocksdb flushes follow writing record batches, otherwise, we
/// fallback to rocksdb automatic WAL flushes.
pub flush_wal_on_commit: bool,
}

impl LocalLogletOptions {
pub fn data_dir(&self) -> PathBuf {
data_dir("local-loglet")
}
}

impl Default for LocalLogletOptions {
fn default() -> Self {
let rocksdb = RocksDbOptionsBuilder::default()
.rocksdb_disable_wal(Some(false))
.build()
.unwrap();
Self {
rocksdb,
writer_commit_batch_size_threshold: 200,
writer_commit_time_interval: Duration::from_millis(13).into(),
writer_queue_len: 200,
flush_wal_on_commit: true,
}
}
}
Loading

0 comments on commit 4c5c347

Please sign in to comment.