Skip to content

Commit

Permalink
[4/n] Partial refactor of worker and admin role configs
Browse files Browse the repository at this point in the history
More re-org and cleanup. We need to move away from the `Options::build()` style:
- We should treat configuration as data-only structures that won't be consumed on service creation
- We pave the way for updateable config, components will own a "projection" to the config in the future instead of consuming it on startup

This is not the final structure, many changes are transitional
  • Loading branch information
AhmedSoliman committed Mar 28, 2024
1 parent 7553ccc commit 5984b44
Show file tree
Hide file tree
Showing 24 changed files with 237 additions and 232 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 5 additions & 17 deletions crates/admin/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,19 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_meta::{FileMetaReader, MetaHandle};
use restate_schema_impl::Schemas;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::net::SocketAddr;

use crate::service::AdminService;

/// # Admin server options
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "options_schema", schemars(rename = "AdminOptions", default))]
#[builder(default)]
pub struct Options {
#[serde(flatten)]
pub meta: restate_meta::Options,
/// # Endpoint address
///
/// Address to bind for the Admin APIs.
Expand All @@ -31,25 +29,15 @@ pub struct Options {
/// # Concurrency limit
///
/// Concurrency limit for the Admin APIs.
pub concurrency_limit: usize,
pub concurrent_api_requests_limit: usize,
}

impl Default for Options {
fn default() -> Self {
Self {
meta: Default::default(),
bind_address: "0.0.0.0:9070".parse().unwrap(),
concurrency_limit: 1000,
concurrent_api_requests_limit: 1000,
}
}
}

impl Options {
pub fn build(
self,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> AdminService {
AdminService::new(self, schemas, meta_handle, schema_reader)
}
}
11 changes: 10 additions & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ impl AdminService {
}
}

pub fn from_options(
options: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> Self {
Self::new(options, schemas, meta_handle, schema_reader)
}

pub async fn run(
self,
node_svc_client: NodeSvcClient<Channel>,
Expand Down Expand Up @@ -76,7 +85,7 @@ impl AdminService {
}))
.layer(tower::load_shed::LoadShedLayer::new())
.layer(tower::limit::GlobalConcurrencyLimitLayer::new(
self.opts.concurrency_limit,
self.opts.concurrent_api_requests_limit,
)),
);

Expand Down
12 changes: 9 additions & 3 deletions crates/benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use tokio::runtime::Runtime;
use restate_core::{TaskCenter, TaskCenterFactory};
use restate_node::Node;
use restate_node::{
MetaOptionsBuilder, NodeOptionsBuilder, RocksdbOptionsBuilder, WorkerOptionsBuilder,
AdminOptionsBuilder, MetaOptionsBuilder, NodeOptionsBuilder, RocksdbOptionsBuilder,
WorkerOptionsBuilder,
};
use restate_server::config::ConfigurationBuilder;
use restate_server::Configuration;
Expand Down Expand Up @@ -91,7 +92,7 @@ pub fn flamegraph_options<'a>() -> Options<'a> {

pub fn restate_configuration() -> Configuration {
let meta_options = MetaOptionsBuilder::default()
.storage_path(tempfile::tempdir().expect("tempdir failed").into_path())
.schema_storage_path(tempfile::tempdir().expect("tempdir failed").into_path())
.build()
.expect("building meta options should work");

Expand All @@ -106,9 +107,14 @@ pub fn restate_configuration() -> Configuration {
.build()
.expect("building worker options should work");

let admin_options = AdminOptionsBuilder::default()
.meta(meta_options)
.build()
.expect("building admin options should work");

let node_options = NodeOptionsBuilder::default()
.worker(worker_options)
.meta(meta_options)
.admin(admin_options)
.build()
.expect("building the configuration should work");

Expand Down
2 changes: 0 additions & 2 deletions crates/cluster-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod options;
mod service;

pub use options::Options;
pub use service::{ClusterControllerHandle, Error, Service};
19 changes: 0 additions & 19 deletions crates/cluster-controller/src/options.rs

This file was deleted.

12 changes: 2 additions & 10 deletions crates/cluster-controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::options::Options;
use codederror::CodedError;
use restate_core::cancellation_watcher;

Expand All @@ -19,20 +18,13 @@ pub enum Error {
Error,
}

#[derive(Debug)]
pub struct Service {
#[allow(dead_code)]
options: Options,
}
#[derive(Debug, Default)]
pub struct Service {}

// todo: Replace with proper handle
pub struct ClusterControllerHandle;

impl Service {
pub fn new(options: Options) -> Self {
Service { options }
}

pub fn handle(&self) -> ClusterControllerHandle {
ClusterControllerHandle
}
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ restate-types = { workspace = true, features = ["serde"] }
anyhow = { workspace = true }
bytes = { workspace = true }
codederror = { workspace = true }
derive-getters = { workspace = true }
derive_builder = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
Expand Down
46 changes: 43 additions & 3 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod state_machine_manager;
mod status_store;

use codederror::CodedError;
use futures::Stream;
use input_command::{InputCommand, InvokeCommand};
use invocation_state_machine::InvocationStateMachine;
use invocation_task::InvocationTask;
Expand All @@ -36,6 +37,7 @@ use restate_types::errors::InvocationError;
use restate_types::identifiers::{DeploymentId, FullInvocationId, PartitionKey, WithPartitionKey};
use restate_types::identifiers::{EntryIndex, PartitionLeaderEpoch};
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::raw::PlainRawEntry;
use restate_types::journal::Completion;
use restate_types::retries::RetryPolicy;
use status_store::InvocationStatusStore;
Expand All @@ -57,7 +59,7 @@ pub use options::{
Options, OptionsBuilder, OptionsBuilderError, ServiceClientOptionsBuilder,
ServiceClientOptionsBuilderError,
};
use restate_service_client::ServiceClient;
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_service_protocol::RESTATE_SERVICE_PROTOCOL_VERSION;

use crate::metric_definitions::{
Expand Down Expand Up @@ -210,6 +212,42 @@ impl<JR, SR, EE, DMR> Service<JR, SR, EE, DMR> {
},
}
}

pub fn from_options<JS>(
options: Options,
journal_reader: JR,
state_reader: SR,
entry_enricher: EE,
deployment_registry: DMR,
) -> Service<JR, SR, EE, DMR>
where
JR: JournalReader<JournalStream = JS> + Clone + Send + Sync + 'static,
JS: Stream<Item = PlainRawEntry> + Unpin + Send + 'static,
EE: EntryEnricher,
DMR: DeploymentResolver,
{
metric_definitions::describe_metrics();
let client = ServiceClient::from_options(
options.service_client().clone(),
AssumeRoleCacheMode::Unbounded,
);

Service::new(
deployment_registry,
options.retry_policy().clone(),
**options.inactivity_timeout(),
(*options.abort_timeout()).into(),
*options.disable_eager_state(),
*options.message_size_warning(),
*options.message_size_limit(),
client,
options.tmp_dir().clone(),
*options.concurrency_limit(),
journal_reader,
state_reader,
entry_enricher,
)
}
}

impl<JR, SR, EE, EMR> Service<JR, SR, EE, EMR>
Expand Down Expand Up @@ -1039,8 +1077,10 @@ mod tests {
false,
1024,
None,
ServiceClientOptions::default()
.build(restate_service_client::AssumeRoleCacheMode::None),
ServiceClient::from_options(
ServiceClientOptions::default(),
restate_service_client::AssumeRoleCacheMode::None,
),
tempdir.into_path(),
None,
journal_reader::mocks::EmptyJournalReader,
Expand Down
47 changes: 2 additions & 45 deletions crates/invoker-impl/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions;

use super::Service;

use futures::Stream;
use restate_invoker_api::{EntryEnricher, JournalReader};
use restate_schema_api::deployment::DeploymentResolver;
use restate_service_client::AssumeRoleCacheMode;
use restate_types::journal::raw::PlainRawEntry;
use derive_getters::Getters;
use restate_types::retries::RetryPolicy;
use serde_with::serde_as;
use std::path::PathBuf;
Expand All @@ -29,7 +21,7 @@ pub use restate_service_client::{

/// # Invoker options
#[serde_as]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder)]
#[derive(Debug, Getters, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))]
#[cfg_attr(
feature = "options_schema",
Expand Down Expand Up @@ -119,38 +111,3 @@ impl Default for Options {
}
}
}

impl Options {
pub fn build<JR, JS, SR, EE, DMR>(
self,
journal_reader: JR,
state_reader: SR,
entry_enricher: EE,
deployment_registry: DMR,
) -> Service<JR, SR, EE, DMR>
where
JR: JournalReader<JournalStream = JS> + Clone + Send + Sync + 'static,
JS: Stream<Item = PlainRawEntry> + Unpin + Send + 'static,
EE: EntryEnricher,
DMR: DeploymentResolver,
{
metric_definitions::describe_metrics();
let client = self.service_client.build(AssumeRoleCacheMode::Unbounded);

Service::new(
deployment_registry,
self.retry_policy,
*self.inactivity_timeout,
*self.abort_timeout,
self.disable_eager_state,
self.message_size_warning,
self.message_size_limit,
client,
self.tmp_dir,
self.concurrency_limit,
journal_reader,
state_reader,
entry_enricher,
)
}
}
Loading

0 comments on commit 5984b44

Please sign in to comment.