From b2fb909896c7ef526521b0dd83d5819822dcb101 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 27 Mar 2024 14:25:27 +0000 Subject: [PATCH] [4/n] Partial refactor of worker and admin role configs --- Cargo.lock | 2 + crates/admin/src/options.rs | 22 +++------- crates/admin/src/service.rs | 11 ++++- crates/benchmarks/src/lib.rs | 12 ++++-- crates/cluster-controller/src/lib.rs | 2 - crates/cluster-controller/src/options.rs | 19 --------- crates/cluster-controller/src/service.rs | 12 +----- crates/invoker-impl/Cargo.toml | 1 + crates/invoker-impl/src/lib.rs | 46 ++++++++++++++++++-- crates/invoker-impl/src/options.rs | 47 +-------------------- crates/meta/src/lib.rs | 42 ++++-------------- crates/meta/src/service.rs | 31 +++++++++++++- crates/node/src/lib.rs | 9 +++- crates/node/src/options.rs | 5 ++- crates/node/src/roles/admin.rs | 22 ++++++---- crates/node/src/roles/worker.rs | 18 +++++--- crates/service-client/Cargo.toml | 3 +- crates/service-client/src/http.rs | 54 ++++++++++++------------ crates/service-client/src/lambda.rs | 23 +++++----- crates/service-client/src/lib.rs | 8 ++++ crates/service-client/src/options.rs | 15 +++---- crates/worker/src/lib.rs | 41 ++++++++++-------- server/src/main.rs | 2 +- tools/xtask/src/main.rs | 15 ++++--- 24 files changed, 237 insertions(+), 225 deletions(-) delete mode 100644 crates/cluster-controller/src/options.rs diff --git a/Cargo.lock b/Cargo.lock index 7d3ec4ed9..0e50f6c59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5177,6 +5177,7 @@ dependencies = [ "anyhow", "bytes", "codederror", + "derive-getters", "derive_builder", "drain", "futures", @@ -5577,6 +5578,7 @@ dependencies = [ "aws-smithy-runtime", "base64 0.21.7", "bytestring", + "derive-getters", "derive_builder", "futures", "http-serde", diff --git a/crates/admin/src/options.rs b/crates/admin/src/options.rs index f139ca496..aa73b6184 100644 --- a/crates/admin/src/options.rs +++ b/crates/admin/src/options.rs @@ -8,14 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_meta::{FileMetaReader, MetaHandle}; -use restate_schema_impl::Schemas; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::net::SocketAddr; -use crate::service::AdminService; - /// # Admin server options #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] @@ -23,6 +19,8 @@ use crate::service::AdminService; #[cfg_attr(feature = "options_schema", schemars(rename = "AdminOptions", default))] #[builder(default)] pub struct Options { + #[serde(flatten)] + pub meta: restate_meta::Options, /// # Endpoint address /// /// Address to bind for the Admin APIs. @@ -31,25 +29,15 @@ pub struct Options { /// # Concurrency limit /// /// Concurrency limit for the Admin APIs. - pub concurrency_limit: usize, + pub concurrent_api_requests_limit: usize, } impl Default for Options { fn default() -> Self { Self { + meta: Default::default(), bind_address: "0.0.0.0:9070".parse().unwrap(), - concurrency_limit: 1000, + concurrent_api_requests_limit: 1000, } } } - -impl Options { - pub fn build( - self, - schemas: Schemas, - meta_handle: MetaHandle, - schema_reader: FileMetaReader, - ) -> AdminService { - AdminService::new(self, schemas, meta_handle, schema_reader) - } -} diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index c6dc29679..ef73f12f7 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -49,6 +49,15 @@ impl AdminService { } } + pub fn from_options( + options: Options, + schemas: Schemas, + meta_handle: MetaHandle, + schema_reader: FileMetaReader, + ) -> Self { + Self::new(options, schemas, meta_handle, schema_reader) + } + pub async fn run( self, node_svc_client: NodeSvcClient, @@ -76,7 +85,7 @@ impl AdminService { })) .layer(tower::load_shed::LoadShedLayer::new()) .layer(tower::limit::GlobalConcurrencyLimitLayer::new( - self.opts.concurrency_limit, + self.opts.concurrent_api_requests_limit, )), ); diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index 2e4fb1b8e..cb4dfab84 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -23,7 +23,8 @@ use tokio::runtime::Runtime; use restate_core::{TaskCenter, TaskCenterFactory}; use restate_node::Node; use restate_node::{ - MetaOptionsBuilder, NodeOptionsBuilder, RocksdbOptionsBuilder, WorkerOptionsBuilder, + AdminOptionsBuilder, MetaOptionsBuilder, NodeOptionsBuilder, RocksdbOptionsBuilder, + WorkerOptionsBuilder, }; use restate_server::config::ConfigurationBuilder; use restate_server::Configuration; @@ -91,7 +92,7 @@ pub fn flamegraph_options<'a>() -> Options<'a> { pub fn restate_configuration() -> Configuration { let meta_options = MetaOptionsBuilder::default() - .storage_path(tempfile::tempdir().expect("tempdir failed").into_path()) + .schema_storage_path(tempfile::tempdir().expect("tempdir failed").into_path()) .build() .expect("building meta options should work"); @@ -106,9 +107,14 @@ pub fn restate_configuration() -> Configuration { .build() .expect("building worker options should work"); + let admin_options = AdminOptionsBuilder::default() + .meta(meta_options) + .build() + .expect("building admin options should work"); + let node_options = NodeOptionsBuilder::default() .worker(worker_options) - .meta(meta_options) + .admin(admin_options) .build() .expect("building the configuration should work"); diff --git a/crates/cluster-controller/src/lib.rs b/crates/cluster-controller/src/lib.rs index aaf91bc5d..ac9ac141f 100644 --- a/crates/cluster-controller/src/lib.rs +++ b/crates/cluster-controller/src/lib.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod options; mod service; -pub use options::Options; pub use service::{ClusterControllerHandle, Error, Service}; diff --git a/crates/cluster-controller/src/options.rs b/crates/cluster-controller/src/options.rs deleted file mode 100644 index a59ada264..000000000 --- a/crates/cluster-controller/src/options.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -/// # Controller service options -#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, derive_builder::Builder)] -#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))] -#[cfg_attr( - feature = "options_schema", - schemars(rename = "ClusterControllerOptions") -)] -#[cfg_attr(feature = "options_schema", schemars(default))] -pub struct Options {} diff --git a/crates/cluster-controller/src/service.rs b/crates/cluster-controller/src/service.rs index 08613c741..d5c287ec5 100644 --- a/crates/cluster-controller/src/service.rs +++ b/crates/cluster-controller/src/service.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::options::Options; use codederror::CodedError; use restate_core::cancellation_watcher; @@ -19,20 +18,13 @@ pub enum Error { Error, } -#[derive(Debug)] -pub struct Service { - #[allow(dead_code)] - options: Options, -} +#[derive(Debug, Default)] +pub struct Service {} // todo: Replace with proper handle pub struct ClusterControllerHandle; impl Service { - pub fn new(options: Options) -> Self { - Service { options } - } - pub fn handle(&self) -> ClusterControllerHandle { ClusterControllerHandle } diff --git a/crates/invoker-impl/Cargo.toml b/crates/invoker-impl/Cargo.toml index 40c7ccf32..6e1f89c3b 100644 --- a/crates/invoker-impl/Cargo.toml +++ b/crates/invoker-impl/Cargo.toml @@ -27,6 +27,7 @@ restate-types = { workspace = true, features = ["serde"] } anyhow = { workspace = true } bytes = { workspace = true } codederror = { workspace = true } +derive-getters = { workspace = true } derive_builder = { workspace = true } drain = { workspace = true } futures = { workspace = true } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 8629febe5..733448176 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -18,6 +18,7 @@ mod state_machine_manager; mod status_store; use codederror::CodedError; +use futures::Stream; use input_command::{InputCommand, InvokeCommand}; use invocation_state_machine::InvocationStateMachine; use invocation_task::InvocationTask; @@ -36,6 +37,7 @@ use restate_types::errors::InvocationError; use restate_types::identifiers::{DeploymentId, FullInvocationId, PartitionKey, WithPartitionKey}; use restate_types::identifiers::{EntryIndex, PartitionLeaderEpoch}; use restate_types::journal::enriched::EnrichedRawEntry; +use restate_types::journal::raw::PlainRawEntry; use restate_types::journal::Completion; use restate_types::retries::RetryPolicy; use status_store::InvocationStatusStore; @@ -57,7 +59,7 @@ pub use options::{ Options, OptionsBuilder, OptionsBuilderError, ServiceClientOptionsBuilder, ServiceClientOptionsBuilderError, }; -use restate_service_client::ServiceClient; +use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::RESTATE_SERVICE_PROTOCOL_VERSION; use crate::metric_definitions::{ @@ -210,6 +212,42 @@ impl Service { }, } } + + pub fn from_options( + options: Options, + journal_reader: JR, + state_reader: SR, + entry_enricher: EE, + deployment_registry: DMR, + ) -> Service + where + JR: JournalReader + Clone + Send + Sync + 'static, + JS: Stream + Unpin + Send + 'static, + EE: EntryEnricher, + DMR: DeploymentResolver, + { + metric_definitions::describe_metrics(); + let client = ServiceClient::from_options( + options.service_client().clone(), + AssumeRoleCacheMode::Unbounded, + ); + + Service::new( + deployment_registry, + options.retry_policy().clone(), + **options.inactivity_timeout(), + (*options.abort_timeout()).into(), + *options.disable_eager_state(), + *options.message_size_warning(), + *options.message_size_limit(), + client, + options.tmp_dir().clone(), + *options.concurrency_limit(), + journal_reader, + state_reader, + entry_enricher, + ) + } } impl Service @@ -1039,8 +1077,10 @@ mod tests { false, 1024, None, - ServiceClientOptions::default() - .build(restate_service_client::AssumeRoleCacheMode::None), + ServiceClient::from_options( + ServiceClientOptions::default(), + restate_service_client::AssumeRoleCacheMode::None, + ), tempdir.into_path(), None, journal_reader::mocks::EmptyJournalReader, diff --git a/crates/invoker-impl/src/options.rs b/crates/invoker-impl/src/options.rs index 360abe67b..d80835aee 100644 --- a/crates/invoker-impl/src/options.rs +++ b/crates/invoker-impl/src/options.rs @@ -8,15 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions; - -use super::Service; - -use futures::Stream; -use restate_invoker_api::{EntryEnricher, JournalReader}; -use restate_schema_api::deployment::DeploymentResolver; -use restate_service_client::AssumeRoleCacheMode; -use restate_types::journal::raw::PlainRawEntry; +use derive_getters::Getters; use restate_types::retries::RetryPolicy; use serde_with::serde_as; use std::path::PathBuf; @@ -29,7 +21,7 @@ pub use restate_service_client::{ /// # Invoker options #[serde_as] -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder)] +#[derive(Debug, Getters, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder)] #[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))] #[cfg_attr( feature = "options_schema", @@ -119,38 +111,3 @@ impl Default for Options { } } } - -impl Options { - pub fn build( - self, - journal_reader: JR, - state_reader: SR, - entry_enricher: EE, - deployment_registry: DMR, - ) -> Service - where - JR: JournalReader + Clone + Send + Sync + 'static, - JS: Stream + Unpin + Send + 'static, - EE: EntryEnricher, - DMR: DeploymentResolver, - { - metric_definitions::describe_metrics(); - let client = self.service_client.build(AssumeRoleCacheMode::Unbounded); - - Service::new( - deployment_registry, - self.retry_policy, - *self.inactivity_timeout, - *self.abort_timeout, - self.disable_eager_state, - self.message_size_warning, - self.message_size_limit, - client, - self.tmp_dir, - self.concurrency_limit, - journal_reader, - state_reader, - entry_enricher, - ) - } -} diff --git a/crates/meta/src/lib.rs b/crates/meta/src/lib.rs index 0b1ca503f..6f7818e92 100644 --- a/crates/meta/src/lib.rs +++ b/crates/meta/src/lib.rs @@ -12,9 +12,6 @@ mod error; mod service; mod storage; -use restate_schema_impl::Schemas; -use restate_service_client::AssumeRoleCacheMode; -use restate_types::retries::RetryPolicy; use std::path::{Path, PathBuf}; pub use error::Error; @@ -25,10 +22,7 @@ pub use restate_service_client::{ pub use service::{ApplyMode, Force, MetaHandle, MetaService}; pub use storage::{FileMetaReader, FileMetaStorage, MetaReader, MetaStorage}; -use std::time::Duration; - use codederror::CodedError; -use restate_schema_api::subscription::SubscriptionValidator; use restate_types::DEFAULT_STORAGE_DIRECTORY; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -48,46 +42,26 @@ pub struct BuildError( #[cfg_attr(feature = "options_schema", schemars(rename = "MetaOptions", default))] #[builder(default)] pub struct Options { - /// # Storage path + // todo: remove after moving schema to metadata store + /// # [DEPRECATED] Storage path /// - /// Root path for Meta storage. - storage_path: PathBuf, + /// Root path for Schema storage. + schema_storage_path: PathBuf, - service_client: ServiceClientOptions, + discovery: ServiceClientOptions, } impl Default for Options { fn default() -> Self { Self { - storage_path: Path::new(DEFAULT_STORAGE_DIRECTORY).join("meta"), - service_client: Default::default(), + schema_storage_path: Path::new(DEFAULT_STORAGE_DIRECTORY).join("meta"), + discovery: Default::default(), } } } impl Options { pub fn storage_path(&self) -> &Path { - self.storage_path.as_path() - } - - pub fn build( - self, - subscription_validator: SV, - ) -> Result, BuildError> { - let schemas = Schemas::default(); - let client = self.service_client.build(AssumeRoleCacheMode::None); - Ok(MetaService::new( - schemas.clone(), - FileMetaStorage::new(self.storage_path)?, - subscription_validator, - // Total duration roughly 66 seconds - RetryPolicy::exponential( - Duration::from_millis(100), - 2.0, - 10, - Some(Duration::from_secs(20)), - ), - client, - )) + self.schema_storage_path.as_path() } } diff --git a/crates/meta/src/service.rs b/crates/meta/src/service.rs index bf7ce8a0d..7e2d8eeab 100644 --- a/crates/meta/src/service.rs +++ b/crates/meta/src/service.rs @@ -8,11 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::{BuildError, FileMetaStorage, Options}; + use super::error::Error; use super::storage::MetaStorage; use std::collections::HashMap; use std::future::Future; +use std::time::Duration; use http::Uri; use tokio::sync::mpsc; @@ -28,7 +31,7 @@ use restate_schema_impl::{Schemas, SchemasUpdateCommand}; use restate_types::identifiers::{DeploymentId, SubscriptionId}; use restate_types::retries::RetryPolicy; -use restate_service_client::{Endpoint, ServiceClient}; +use restate_service_client::{AssumeRoleCacheMode, Endpoint, ServiceClient}; use restate_service_protocol::discovery; use restate_service_protocol::discovery::ComponentDiscovery; @@ -212,6 +215,32 @@ pub struct MetaService { reloaded: bool, } +impl MetaService +where + SV: SubscriptionValidator, +{ + pub fn from_options( + options: Options, + subscription_validator: SV, + ) -> Result, BuildError> { + let schemas = Schemas::default(); + let client = ServiceClient::from_options(options.discovery, AssumeRoleCacheMode::None); + Ok(MetaService::new( + schemas.clone(), + FileMetaStorage::new(options.schema_storage_path)?, + subscription_validator, + // Total duration roughly 66 seconds + RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + 10, + Some(Duration::from_secs(20)), + ), + client, + )) + } +} + impl MetaService where Storage: MetaStorage, diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2a167063d..1317fb0ba 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -116,14 +116,19 @@ impl Node { metadata_manager.register_in_message_router(&mut router_builder); let admin_role = if common_opts.roles().contains(Role::Admin) { - Some(AdminRole::new(options.clone(), networking.clone())?) + Some(AdminRole::new( + options.admin.clone(), + options.kafka.clone(), + networking.clone(), + )?) } else { None }; let worker_role = if common_opts.roles().contains(Role::Worker) { Some(WorkerRole::new( - options.clone(), + options.worker.clone(), + options.kafka.clone(), &mut router_builder, networking.clone(), bifrost.handle(), diff --git a/crates/node/src/options.rs b/crates/node/src/options.rs index 0f4a45504..743c96cfa 100644 --- a/crates/node/src/options.rs +++ b/crates/node/src/options.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use restate_types::net::AdvertisedAddress; +use restate_worker::KafkaIngressOptions; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -18,13 +19,13 @@ use serde_with::serde_as; #[cfg_attr(feature = "options_schema", schemars(default))] #[builder(default)] pub struct Options { - pub meta: restate_meta::Options, pub worker: restate_worker::Options, pub admin: restate_admin::Options, pub bifrost: restate_bifrost::Options, - pub cluster_controller: restate_cluster_controller::Options, pub metadata_store: restate_metadata_store::local::Options, + pub kafka: KafkaIngressOptions, + /// todo: remove. /// Configures the admin address. If it is not specified, then this /// node needs to run the admin role #[serde_as(as = "serde_with::NoneAsEmptyString")] diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 798777397..58b96be88 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -10,6 +10,7 @@ use anyhow::Context; use codederror::CodedError; +use restate_admin::Options as AdminOptions; use restate_network::Networking; use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use tonic::transport::Channel; @@ -22,8 +23,6 @@ use restate_core::{task_center, TaskKind}; use restate_meta::{FileMetaReader, FileMetaStorage, MetaService}; use restate_worker::KafkaIngressOptions; -use crate::Options; - #[derive(Debug, thiserror::Error, CodedError)] pub enum AdminRoleBuildError { #[error("failed creating meta: {0}")] @@ -42,14 +41,21 @@ pub struct AdminRole { } impl AdminRole { - pub fn new(options: Options, _networking: Networking) -> Result { - let meta = options.meta.build(options.worker.kafka.clone())?; - let admin = options - .admin - .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); + pub fn new( + admin_options: AdminOptions, + kafka_options: KafkaIngressOptions, + _networking: Networking, + ) -> Result { + let meta = MetaService::from_options(admin_options.meta.clone(), kafka_options)?; + let admin = AdminService::from_options( + admin_options, + meta.schemas(), + meta.meta_handle(), + meta.schema_reader(), + ); Ok(AdminRole { - controller: restate_cluster_controller::Service::new(options.cluster_controller), + controller: restate_cluster_controller::Service::default(), admin, meta, }) diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 9b7f35850..ad7684e98 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -30,11 +30,11 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_types::net::AdvertisedAddress; use restate_types::retries::RetryPolicy; -use restate_worker::{SubscriptionControllerHandle, Worker}; +use restate_worker::{KafkaIngressOptions, SubscriptionControllerHandle, Worker}; use restate_worker_api::SubscriptionController; use tracing::info; -use crate::Options; +use restate_worker::Options as WorkerOptions; #[derive(Debug, thiserror::Error, CodedError)] pub enum WorkerRoleError { @@ -100,15 +100,21 @@ pub struct WorkerRole { impl WorkerRole { pub fn new( - options: Options, + options: WorkerOptions, + kafka_options: KafkaIngressOptions, router_builder: &mut MessageRouterBuilder, networking: Networking, bifrost: Bifrost, ) -> Result { let schemas = Schemas::default(); - let worker = options - .worker - .build(networking, bifrost, router_builder, schemas.clone())?; + let worker = Worker::from_options( + options, + kafka_options, + networking, + bifrost, + router_builder, + schemas.clone(), + )?; Ok(WorkerRole { schemas, worker }) } diff --git a/crates/service-client/Cargo.toml b/crates/service-client/Cargo.toml index 0b46980bc..44d01c48c 100644 --- a/crates/service-client/Cargo.toml +++ b/crates/service-client/Cargo.toml @@ -21,8 +21,9 @@ schemars = { workspace = true, optional = true } arc-swap = { workspace = true } base64 = { workspace = true } bytestring = { workspace = true } +derive-getters = { workspace = true } futures = { workspace = true } -http-serde = "1.1.2" +http-serde = { version = "1.1.2" } humantime = { workspace = true } hyper = { workspace = true } hyper-rustls = { workspace = true } diff --git a/crates/service-client/src/http.rs b/crates/service-client/src/http.rs index cc44b8b2d..a2199ae5b 100644 --- a/crates/service-client/src/http.rs +++ b/crates/service-client/src/http.rs @@ -37,49 +37,25 @@ pub struct Options { /// /// Configuration for the HTTP/2 keep-alive mechanism, using PING frames. /// If unset, HTTP/2 keep-alive are disabled. - keep_alive_options: Option, + http_keep_alive_options: Option, /// # Proxy URI /// /// A URI, such as `http://127.0.0.1:10001`, of a server to which all invocations should be sent, with the `Host` header set to the deployment URI. /// HTTPS proxy URIs are supported, but only HTTP endpoint traffic will be proxied currently. /// Can be overridden by the `HTTP_PROXY` environment variable. #[cfg_attr(feature = "options_schema", schemars(with = "Option"))] - proxy_uri: Option, + http_proxy: Option, } impl Default for Options { fn default() -> Self { Self { - keep_alive_options: Some(Default::default()), - proxy_uri: None, + http_keep_alive_options: Some(Default::default()), + http_proxy: None, } } } -impl Options { - pub fn build(self) -> HttpClient { - let mut builder = hyper::Client::builder(); - builder.http2_only(true); - - if let Some(keep_alive_options) = self.keep_alive_options { - builder - .http2_keep_alive_timeout(keep_alive_options.timeout.into()) - .http2_keep_alive_interval(Some(keep_alive_options.interval.into())); - } - - HttpClient::new( - builder.build::<_, hyper::Body>(ProxyConnector::new( - self.proxy_uri, - hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http2() - .build(), - )), - ) - } -} - /// # HTTP/2 Keep alive options /// /// Configuration for the HTTP/2 keep-alive mechanism, using PING frames. @@ -145,6 +121,28 @@ impl HttpClient { Self { client } } + pub fn from_options(options: Options) -> HttpClient { + let mut builder = hyper::Client::builder(); + builder.http2_only(true); + + if let Some(keep_alive_options) = options.http_keep_alive_options { + builder + .http2_keep_alive_timeout(keep_alive_options.timeout.into()) + .http2_keep_alive_interval(Some(keep_alive_options.interval.into())); + } + + HttpClient::new( + builder.build::<_, hyper::Body>(ProxyConnector::new( + options.http_proxy, + hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http2() + .build(), + )), + ) + } + fn build_request( uri: Uri, version: Version, diff --git a/crates/service-client/src/lambda.rs b/crates/service-client/src/lambda.rs index e0482c6a2..537488751 100644 --- a/crates/service-client/src/lambda.rs +++ b/crates/service-client/src/lambda.rs @@ -65,17 +65,7 @@ pub struct Options { /// An external ID to apply to any AssumeRole operations taken by this client. /// https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html /// Can be overridden by the `AWS_EXTERNAL_ID` environment variable. - assume_role_external_id: Option, -} - -impl Options { - pub fn build(self, assume_role_cache_mode: AssumeRoleCacheMode) -> LambdaClient { - LambdaClient::new( - self.aws_profile, - self.assume_role_external_id, - assume_role_cache_mode, - ) - } + aws_assume_role_external_id: Option, } /// # AssumeRole Cache Mode @@ -193,6 +183,17 @@ impl LambdaClient { Self { inner } } + pub fn from_options( + options: Options, + assume_role_cache_mode: AssumeRoleCacheMode, + ) -> LambdaClient { + LambdaClient::new( + options.aws_profile, + options.aws_assume_role_external_id, + assume_role_cache_mode, + ) + } + pub fn invoke( &self, arn: LambdaARN, diff --git a/crates/service-client/src/lib.rs b/crates/service-client/src/lib.rs index f11f86ccb..80baa1079 100644 --- a/crates/service-client/src/lib.rs +++ b/crates/service-client/src/lib.rs @@ -46,6 +46,14 @@ impl ServiceClient { pub(crate) fn new(http: HttpClient, lambda: LambdaClient) -> Self { Self { http, lambda } } + + pub fn from_options(options: Options, assume_role_cache_mode: AssumeRoleCacheMode) -> Self { + let (http, lambda) = options.dissolve(); + Self::new( + HttpClient::from_options(http), + LambdaClient::from_options(lambda, assume_role_cache_mode), + ) + } } impl ServiceClient { diff --git a/crates/service-client/src/options.rs b/crates/service-client/src/options.rs index 38d932b74..0c0dba539 100644 --- a/crates/service-client/src/options.rs +++ b/crates/service-client/src/options.rs @@ -8,13 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::lambda::AssumeRoleCacheMode; pub use super::lambda::{ Options as LambdaClientOptions, OptionsBuilder as LambdaClientOptionsBuilder, OptionsBuilderError as LambdaClientOptionsBuilderError, }; -use super::ServiceClient; +use derive_getters::{Dissolve, Getters}; use serde_with::serde_as; pub use super::http::{ @@ -24,7 +23,9 @@ pub use super::http::{ /// # Client options #[serde_as] -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder)] +#[derive( + Debug, Getters, Dissolve, Clone, serde::Serialize, serde::Deserialize, derive_builder::Builder, +)] #[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))] #[cfg_attr( feature = "options_schema", @@ -33,12 +34,8 @@ pub use super::http::{ #[builder(default)] #[derive(Default)] pub struct Options { + #[serde(flatten)] http: HttpClientOptions, + #[serde(flatten)] lambda: LambdaClientOptions, } - -impl Options { - pub fn build(self, assume_role_cache_mode: AssumeRoleCacheMode) -> ServiceClient { - ServiceClient::new(self.http.build(), self.lambda.build(assume_role_cache_mode)) - } -} diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 280cf91fe..c84beaed8 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -96,7 +96,6 @@ pub struct Options { storage_query_postgres: StorageQueryPostgresOptions, storage_rocksdb: RocksdbOptions, ingress: IngressOptions, - pub kafka: KafkaIngressOptions, invoker: InvokerOptions, /// # Partitions @@ -119,7 +118,6 @@ impl Default for Options { storage_query_postgres: Default::default(), storage_rocksdb: Default::default(), ingress: Default::default(), - kafka: Default::default(), invoker: Default::default(), partitions: 64, } @@ -146,17 +144,6 @@ impl Options { pub fn storage_path(&self) -> &Path { self.storage_rocksdb.path.as_path() } - - pub fn build( - self, - networking: Networking, - bifrost: Bifrost, - router_builder: &mut MessageRouterBuilder, - schemas: Schemas, - ) -> Result { - metric_definitions::describe_metrics(); - Worker::new(self, networking, bifrost, router_builder, schemas) - } } #[derive(Debug, thiserror::Error, CodedError)] @@ -198,8 +185,28 @@ pub struct Worker { } impl Worker { + pub fn from_options( + options: Options, + kafka_options: KafkaIngressOptions, + networking: Networking, + bifrost: Bifrost, + router_builder: &mut MessageRouterBuilder, + schemas: Schemas, + ) -> Result { + metric_definitions::describe_metrics(); + Worker::new( + options, + kafka_options, + networking, + bifrost, + router_builder, + schemas, + ) + } + pub fn new( opts: Options, + kafka_options: KafkaIngressOptions, networking: Networking, bifrost: Bifrost, router_builder: &mut MessageRouterBuilder, @@ -209,7 +216,6 @@ impl Worker { let Options { ingress, - kafka, storage_query_datafusion, storage_query_postgres, storage_rocksdb, @@ -223,8 +229,8 @@ impl Worker { let ingress_http = ingress.build(ingress_dispatcher.clone(), schemas.clone()); // ingress_kafka - let kafka_config_clone = kafka.clone(); - let ingress_kafka = kafka.build(ingress_dispatcher.clone()); + let kafka_config_clone = kafka_options.clone(); + let ingress_kafka = kafka_options.build(ingress_dispatcher.clone()); let subscription_controller_handle = subscription_integration::SubscriptionControllerHandle::new( kafka_config_clone, @@ -234,7 +240,8 @@ impl Worker { let (rocksdb_storage, rocksdb_writer) = storage_rocksdb.build()?; let invoker_storage_reader = InvokerStorageReader::new(rocksdb_storage.clone()); - let invoker = opts.invoker.build( + let invoker = InvokerService::from_options( + opts.invoker, invoker_storage_reader.clone(), invoker_storage_reader, EntryEnricher::new(schemas.clone()), diff --git a/server/src/main.rs b/server/src/main.rs index 6caced9b1..e3aab7339 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -160,7 +160,7 @@ fn main() { WipeMode::wipe( cli_args.wipe.as_ref(), - config.node.meta.storage_path().into(), + config.node.admin.meta.storage_path().into(), config.node.worker.storage_path().into(), config.node.bifrost.local.path.as_path(), config.node.metadata_store.storage_path(), diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index b731f46ab..d483f895f 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -10,9 +10,11 @@ use anyhow::bail; use reqwest::header::ACCEPT; +use restate_admin::service::AdminService; use restate_bifrost::Bifrost; use restate_core::TaskKind; use restate_core::TestCoreEnv; +use restate_meta::MetaService; use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; @@ -86,15 +88,18 @@ impl restate_schema_api::subscription::SubscriptionValidator for Mock { async fn generate_rest_api_doc() -> anyhow::Result<()> { let admin_options = restate_admin::Options::default(); let meta_options = restate_meta::Options::default(); - let mut meta = meta_options - .build(Mock) - .expect("expect to build meta service"); + let mut meta = + MetaService::from_options(meta_options, Mock).expect("expect to build meta service"); let openapi_address = format!( "http://localhost:{}/openapi", admin_options.bind_address.port() ); - let admin_service = - admin_options.build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); + let admin_service = AdminService::from_options( + admin_options, + meta.schemas(), + meta.meta_handle(), + meta.schema_reader(), + ); meta.init().await.unwrap(); // We start the Meta component, then download the openapi schema generated