Skip to content

Commit

Permalink
Migration to the new configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 5, 2024
1 parent 7ea707b commit 34edae0
Show file tree
Hide file tree
Showing 68 changed files with 731 additions and 1,959 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
// by the Apache License, Version 2.0.

mod error;
mod options;
mod rest_api;
pub mod service;
mod state;
mod storage_query;

pub use crate::options::{Options, OptionsBuilder, OptionsBuilderError};
pub use error::Error;
43 changes: 0 additions & 43 deletions crates/admin/src/options.rs

This file was deleted.

30 changes: 9 additions & 21 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::sync::Arc;
use axum::error_handling::HandleErrorLayer;
use http::StatusCode;
use restate_bifrost::Bifrost;
use restate_types::arc_util::Updateable;
use restate_types::config::AdminOptions;
use tonic::transport::Channel;
use tower::ServiceBuilder;
use tracing::info;
Expand All @@ -23,46 +25,32 @@ use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_schema_impl::Schemas;

use crate::Error;
use crate::{rest_api, state, storage_query};
use crate::{Error, Options};

#[derive(Debug)]
pub struct AdminService {
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
}

impl AdminService {
pub fn new(
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> Self {
pub fn new(schemas: Schemas, meta_handle: MetaHandle, schema_reader: FileMetaReader) -> Self {
Self {
opts,
schemas,
meta_handle,
schema_reader,
}
}

pub fn from_options(
options: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> Self {
Self::new(options, schemas, meta_handle, schema_reader)
}

pub async fn run(
self,
mut updateable_config: impl Updateable<AdminOptions> + Send + 'static,
node_svc_client: NodeSvcClient<Channel>,
bifrost: Bifrost,
) -> anyhow::Result<()> {
let opts = updateable_config.load();
let rest_state = state::AdminServiceState::new(
self.meta_handle,
self.schemas,
Expand All @@ -85,14 +73,14 @@ impl AdminService {
}))
.layer(tower::load_shed::LoadShedLayer::new())
.layer(tower::limit::GlobalConcurrencyLimitLayer::new(
self.opts.concurrent_api_requests_limit,
opts.concurrent_api_requests_limit,
)),
);

// Bind and serve
let server = hyper::Server::try_bind(&self.opts.bind_address)
let server = hyper::Server::try_bind(&opts.bind_address)
.map_err(|err| Error::Binding {
address: self.opts.bind_address,
address: opts.bind_address,
source: err,
})?
.serve(router.into_make_service());
Expand Down
1 change: 1 addition & 0 deletions crates/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ restate-server = { workspace = true }
restate-types = { workspace = true, features = ["clap"] }

anyhow = { workspace = true }
arc-swap = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
Expand Down
4 changes: 1 addition & 3 deletions crates/benchmarks/benches/throughput_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ use rand::distributions::{Alphanumeric, DistString};
use restate_benchmarks::counter::counter_client::CounterClient;
use restate_benchmarks::counter::CounterAddRequest;
use restate_benchmarks::{parse_benchmark_settings, BenchmarkSettings};
use restate_types::arc_util::Constant;
use tokio::runtime::Builder;
use tonic::transport::Channel;

fn throughput_benchmark(criterion: &mut Criterion) {
let old_config = restate_benchmarks::restate_old_configuration();
let config = restate_benchmarks::restate_configuration();
let tc = restate_benchmarks::spawn_restate(Constant::new(config), old_config);
let tc = restate_benchmarks::spawn_restate(config);

let BenchmarkSettings {
num_requests,
Expand Down
4 changes: 1 addition & 3 deletions crates/benchmarks/benches/throughput_sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ use hyper::Uri;
use pprof::criterion::{Output, PProfProfiler};
use restate_benchmarks::counter::counter_client::CounterClient;
use restate_benchmarks::counter::CounterAddRequest;
use restate_types::arc_util::Constant;
use tokio::runtime::Builder;

fn throughput_benchmark(criterion: &mut Criterion) {
let old_config = restate_benchmarks::restate_old_configuration();
let config = restate_benchmarks::restate_configuration();
let tc = restate_benchmarks::spawn_restate(Constant::new(config), old_config);
let tc = restate_benchmarks::spawn_restate(config);

let current_thread_rt = Builder::new_current_thread()
.enable_all()
Expand Down
61 changes: 7 additions & 54 deletions crates/benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
//! Utilities for benchmarking the Restate runtime
use std::time::Duration;

use arc_swap::ArcSwap;
use futures_util::{future, TryFutureExt};
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Uri};
use pprof::flamegraph::Options;
use restate_server::config_loader::ConfigLoaderBuilder;
use restate_types::arc_util::Updateable;
use restate_types::config::{
CommonOptionCliOverride, CommonOptionsBuilder, Configuration, ConfigurationBuilder,
CommonOptionsBuilder, Configuration, ConfigurationBuilder, UpdateableConfiguration,
WorkerOptionsBuilder,
};
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -62,22 +62,16 @@ pub fn discover_deployment(current_thread_rt: &Runtime, address: Uri) {
.is_success(),);
}

pub fn spawn_restate(
mut updateable_config: impl Updateable<Configuration> + Send + 'static,
old_config: restate_node::config::Configuration,
) -> TaskCenter {
let common = &updateable_config.load().common;
pub fn spawn_restate(config: Configuration) -> TaskCenter {
let tc = TaskCenterBuilder::default()
.options(common.clone())
.options(config.common.clone())
.build()
.expect("task_center builds");
let cloned_tc = tc.clone();
let updateable_config = UpdateableConfiguration::new(ArcSwap::from_pointee(config));
tc.spawn(TaskKind::TestRunner, "benchmark", None, async move {
let config = updateable_config.load();
let node = Node::new(&old_config, config).expect("Restate node must build");
cloned_tc
.run_in_scope("startup", None, node.start(updateable_config))
.await
let node = Node::new(updateable_config).expect("Restate node must build");
cloned_tc.run_in_scope("startup", None, node.start()).await
})
.unwrap();

Expand All @@ -94,47 +88,6 @@ pub fn flamegraph_options<'a>() -> Options<'a> {
options
}

pub fn restate_old_configuration() -> restate_node::config::Configuration {
let meta_options = restate_node::MetaOptionsBuilder::default()
.schema_storage_path(tempfile::tempdir().expect("tempdir failed").into_path())
.build()
.expect("building meta options should work");

let rocksdb_options = restate_node::RocksdbOptionsBuilder::default()
.path(tempfile::tempdir().expect("tempdir failed").into_path())
.build()
.expect("building rocksdb options should work");

let worker_options = restate_node::WorkerOptionsBuilder::default()
.partitions(10)
.storage_rocksdb(rocksdb_options)
.build()
.expect("building worker options should work");

let admin_options = restate_node::AdminOptionsBuilder::default()
.meta(meta_options)
.build()
.expect("building admin options should work");

let node_options = restate_node::NodeOptionsBuilder::default()
.worker(worker_options)
.admin(admin_options)
.build()
.expect("building the configuration should work");

let config = restate_node::config::ConfigurationBuilder::default()
.node(node_options)
.build()
.expect("building the configuration should work");

restate_node::config::Configuration::load_with_default(
config,
None,
CommonOptionCliOverride::default(),
)
.expect("configuration loading should not fail")
}

pub fn restate_configuration() -> Configuration {
let common_options = CommonOptionsBuilder::default()
.base_dir(tempfile::tempdir().expect("tempdir failed").into_path())
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod metadata;
pub mod metadata_store;
mod metric_definitions;
pub mod network;
pub mod options;
mod task_center;
mod task_center_types;

Expand Down
Loading

0 comments on commit 34edae0

Please sign in to comment.