From 429b74aeda5057055ddea8e461cf02222180b67c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 14 Feb 2024 22:46:10 +0100 Subject: [PATCH] Push schema information to workers from admin service on schema changes This commit enables the admin service to push schema updates to the worker in case of schema changes. --- Cargo.lock | 2 + crates/admin/src/options.rs | 11 ++-- crates/admin/src/rest_api/deployments.rs | 5 ++ crates/admin/src/rest_api/error.rs | 2 + crates/admin/src/rest_api/mod.rs | 33 ++++++++++++ crates/admin/src/rest_api/services.rs | 3 ++ crates/admin/src/rest_api/subscriptions.rs | 5 ++ crates/admin/src/service.rs | 34 +++++++------ crates/admin/src/state.rs | 22 +++++++- crates/meta/src/error.rs | 5 +- crates/node-services/proto/worker.proto | 7 +++ crates/node/src/lib.rs | 2 + crates/node/src/roles/cluster_controller.rs | 6 ++- crates/node/src/roles/mod.rs | 2 +- crates/node/src/roles/worker.rs | 56 ++++++++++++--------- crates/node/src/server/handler/worker.rs | 35 ++++++++++++- crates/node/src/server/service.rs | 13 ++++- crates/worker/src/lib.rs | 2 +- tools/xtask/Cargo.toml | 2 + tools/xtask/src/main.rs | 11 +++- 20 files changed, 203 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1451dd917..4bf1b90c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7514,6 +7514,7 @@ dependencies = [ "restate-admin", "restate-bifrost", "restate-meta", + "restate-node-services", "restate-schema-api", "restate-server", "restate-types", @@ -7522,6 +7523,7 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", + "tonic 0.10.2", ] [[package]] diff --git a/crates/admin/src/options.rs b/crates/admin/src/options.rs index b76fcb29e..f139ca496 100644 --- a/crates/admin/src/options.rs +++ b/crates/admin/src/options.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; use restate_schema_impl::Schemas; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -44,7 +44,12 @@ impl Default for Options { } impl Options { - pub fn build(self, schemas: Schemas, meta_handle: MetaHandle) -> AdminService { - AdminService::new(self, schemas, meta_handle) + pub fn build( + self, + schemas: Schemas, + meta_handle: MetaHandle, + schema_reader: FileMetaReader, + ) -> AdminService { + AdminService::new(self, schemas, meta_handle, schema_reader) } } diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index 3bb4630e6..f17fae1ea 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -19,6 +19,7 @@ use restate_service_client::Endpoint; use restate_service_protocol::discovery::DiscoverEndpoint; use restate_types::identifiers::InvalidLambdaARN; +use crate::rest_api::notify_worker_about_schema_changes; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{header, HeaderValue, StatusCode}; @@ -97,6 +98,8 @@ pub async fn create_deployment( .register_deployment(discover_endpoint, force, apply_changes) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + let response_body = RegisterDeploymentResponse { id: registration_result.deployment, services: registration_result.services, @@ -241,6 +244,8 @@ pub async fn delete_deployment( ) -> Result { if let Some(true) = force { state.meta_handle().remove_deployment(deployment_id).await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()) + .await?; Ok(StatusCode::ACCEPTED) } else { Ok(StatusCode::NOT_IMPLEMENTED) diff --git a/crates/admin/src/rest_api/error.rs b/crates/admin/src/rest_api/error.rs index 4af492cf2..deb57888b 100644 --- a/crates/admin/src/rest_api/error.rs +++ b/crates/admin/src/rest_api/error.rs @@ -43,6 +43,8 @@ pub enum MetaApiError { Meta(#[from] MetaError), #[error(transparent)] Worker(#[from] restate_worker_api::Error), + #[error(transparent)] + Generic(Error), } /// # Error description response diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 56df892c7..c5de51675 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -18,8 +18,14 @@ mod methods; mod services; mod subscriptions; +use crate::rest_api::error::MetaApiError; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; +use restate_meta::{FileMetaReader, MetaReader}; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; +use restate_node_services::worker::UpdateSchemaRequest; +use tonic::transport::Channel; +use tracing::debug; use crate::state::AdminServiceState; @@ -101,3 +107,30 @@ pub fn create_router, +) -> Result<(), MetaApiError> { + let schema_updates = schema_reader + .read() + .await + .map_err(|err| MetaApiError::Meta(err.into()))?; + + // don't fail if the worker is not reachable + let result = worker_svc_client + .update_schemas(UpdateSchemaRequest { + schema_bin: bincode::serde::encode_to_vec(schema_updates, bincode::config::standard()) + .map_err(|err| MetaApiError::Generic(err.into()))? + .into(), + }) + .await; + + if let Err(err) = result { + debug!("Failed notifying worker about schema changes: {err}"); + } + + Ok(()) +} diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index 399a78b1b..414b9f175 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -15,6 +15,7 @@ use restate_meta_rest_model::services::*; use restate_pb::grpc::reflection::v1::FileDescriptorResponse; use restate_schema_api::service::ServiceMetadataResolver; +use crate::rest_api::notify_worker_about_schema_changes; use axum::extract::{Path, State}; use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; @@ -91,6 +92,8 @@ pub async fn modify_service( .modify_service(service_name.clone(), public) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + state .schemas() .resolve_latest_service_metadata(&service_name) diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index ca0eea0bb..64051c8c6 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -14,6 +14,7 @@ use crate::state::AdminServiceState; use restate_meta_rest_model::subscriptions::*; use restate_schema_api::subscription::SubscriptionResolver; +use crate::rest_api::notify_worker_about_schema_changes; use axum::extract::Query; use axum::extract::{Path, State}; use axum::http::StatusCode; @@ -52,6 +53,8 @@ pub async fn create_subscription( ) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + Ok(( StatusCode::CREATED, [( @@ -168,5 +171,7 @@ pub async fn delete_subscription( .delete_subscription(subscription_id) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + Ok(StatusCode::ACCEPTED) } diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 0e0ce7cf6..4937c59f4 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -16,7 +16,7 @@ use http::StatusCode; use tonic::transport::Channel; use tower::ServiceBuilder; -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_impl::Schemas; use tracing::info; @@ -29,14 +29,21 @@ pub struct AdminService { opts: Options, schemas: Schemas, meta_handle: MetaHandle, + schema_reader: FileMetaReader, } impl AdminService { - pub fn new(opts: Options, schemas: Schemas, meta_handle: MetaHandle) -> Self { + pub fn new( + opts: Options, + schemas: Schemas, + meta_handle: MetaHandle, + schema_reader: FileMetaReader, + ) -> Self { Self { opts, schemas, meta_handle, + schema_reader, } } @@ -44,21 +51,18 @@ impl AdminService { self, drain: drain::Watch, worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static, - worker_svc_client: Option>, + worker_svc_client: WorkerSvcClient, ) -> Result<(), Error> { - let rest_state = - state::AdminServiceState::new(self.meta_handle, self.schemas, worker_handle); - - let router = axum::Router::new(); + let rest_state = state::AdminServiceState::new( + self.meta_handle, + self.schemas, + worker_handle, + worker_svc_client.clone(), + self.schema_reader, + ); - // Stitch query http endpoint if enabled - let router = if let Some(worker_svc_client) = worker_svc_client { - let query_state = Arc::new(state::QueryServiceState { worker_svc_client }); - // Merge storage query router - router.merge(storage_query::create_router(query_state)) - } else { - router - }; + let query_state = Arc::new(state::QueryServiceState { worker_svc_client }); + let router = axum::Router::new().merge(storage_query::create_router(query_state)); let router = router // Merge meta API router diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 75057c28a..7d695885f 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. // -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_impl::Schemas; use tonic::transport::Channel; @@ -19,6 +19,8 @@ pub struct AdminServiceState { meta_handle: MetaHandle, schemas: Schemas, worker_handle: W, + worker_svc_client: WorkerSvcClient, + schema_reader: FileMetaReader, } #[derive(Clone)] @@ -27,11 +29,19 @@ pub struct QueryServiceState { } impl AdminServiceState { - pub fn new(meta_handle: MetaHandle, schemas: Schemas, worker_handle: W) -> Self { + pub fn new( + meta_handle: MetaHandle, + schemas: Schemas, + worker_handle: W, + worker_svc_client: WorkerSvcClient, + schema_reader: FileMetaReader, + ) -> Self { Self { meta_handle, schemas, worker_handle, + worker_svc_client, + schema_reader, } } @@ -42,6 +52,14 @@ impl AdminServiceState { pub fn schemas(&self) -> &Schemas { &self.schemas } + + pub fn worker_svc_client(&self) -> WorkerSvcClient { + self.worker_svc_client.clone() + } + + pub fn schema_reader(&self) -> &FileMetaReader { + &self.schema_reader + } } impl AdminServiceState { diff --git a/crates/meta/src/error.rs b/crates/meta/src/error.rs index ddd2ba4b8..42a155af9 100644 --- a/crates/meta/src/error.rs +++ b/crates/meta/src/error.rs @@ -11,7 +11,7 @@ use restate_schema_impl::SchemasUpdateError; use restate_service_protocol::discovery::ServiceDiscoveryError; -use crate::storage::MetaStorageError; +use crate::storage::{MetaReaderError, MetaStorageError}; #[derive(Debug, thiserror::Error, codederror::CodedError)] pub enum Error { @@ -26,6 +26,9 @@ pub enum Error { Storage(#[from] MetaStorageError), #[error(transparent)] #[code(unknown)] + Reader(#[from] MetaReaderError), + #[error(transparent)] + #[code(unknown)] SchemaRegistry(#[from] SchemasUpdateError), #[error("meta closed")] #[code(unknown)] diff --git a/crates/node-services/proto/worker.proto b/crates/node-services/proto/worker.proto index 38fdd5651..bb9617337 100644 --- a/crates/node-services/proto/worker.proto +++ b/crates/node-services/proto/worker.proto @@ -25,6 +25,9 @@ service WorkerSvc { // Queries the storage of the worker and returns the result as a stream of responses rpc QueryStorage(StorageQueryRequest) returns (stream StorageQueryResponse); + + // Updates the schema information on the worker node + rpc UpdateSchemas(UpdateSchemaRequest) returns (google.protobuf.Empty); } message BifrostVersion { @@ -48,4 +51,8 @@ message StorageQueryRequest { message StorageQueryResponse { bytes header = 1; bytes data = 2; +} + +message UpdateSchemaRequest { + bytes schema_bin = 1; } \ No newline at end of file diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index d921b7716..a1664f7a7 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -117,6 +117,8 @@ impl Node { worker.bifrost_handle(), worker.worker_command_tx(), worker.storage_query_context().clone(), + worker.schemas(), + worker.subscription_controller(), ) }), cluster_controller_role.as_ref().map(|cluster_controller| { diff --git a/crates/node/src/roles/cluster_controller.rs b/crates/node/src/roles/cluster_controller.rs index d29845ffd..dc4668e91 100644 --- a/crates/node/src/roles/cluster_controller.rs +++ b/crates/node/src/roles/cluster_controller.rs @@ -114,7 +114,7 @@ impl ClusterControllerRole { component_set.spawn( self.admin - .run(inner_shutdown_watch, worker_handle, Some(worker_svc_client)) + .run(inner_shutdown_watch, worker_handle, worker_svc_client) .map_ok(|_| "admin") .map_err(ClusterControllerRoleError::Admin), ); @@ -143,7 +143,9 @@ impl TryFrom for ClusterControllerRole { fn try_from(options: Options) -> Result { let meta = options.meta.build(options.worker.kafka.clone())?; - let admin = options.admin.build(meta.schemas(), meta.meta_handle()); + let admin = options + .admin + .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); Ok(ClusterControllerRole { controller: restate_cluster_controller::Service::new(options.cluster_controller), diff --git a/crates/node/src/roles/mod.rs b/crates/node/src/roles/mod.rs index a41d01756..3c4b36ed2 100644 --- a/crates/node/src/roles/mod.rs +++ b/crates/node/src/roles/mod.rs @@ -14,4 +14,4 @@ mod worker; pub use cluster_controller::{ ClusterControllerRole, ClusterControllerRoleBuildError, ClusterControllerRoleError, }; -pub use worker::{WorkerRole, WorkerRoleBuildError, WorkerRoleError}; +pub use worker::{update_schemas, WorkerRole, WorkerRoleBuildError, WorkerRoleError}; diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 88c1f8c7e..153663c9e 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -19,7 +19,7 @@ use restate_schema_impl::{Schemas, SchemasUpdateCommand}; use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_types::NodeId; -use restate_worker::{Worker, WorkerCommandSender}; +use restate_worker::{SubscriptionControllerHandle, Worker, WorkerCommandSender}; use restate_worker_api::SubscriptionController; use std::time::Duration; use tokio::task::JoinSet; @@ -107,6 +107,14 @@ impl WorkerRole { self.worker.storage_query_context() } + pub fn schemas(&self) -> Schemas { + self.schemas.clone() + } + + pub fn subscription_controller(&self) -> Option { + Some(self.worker.subscription_controller_handle()) + } + pub async fn run( self, _node_id: NodeId, @@ -219,7 +227,7 @@ impl WorkerRole { SC: SubscriptionController + Send + Sync, { let schema_updates = Self::fetch_schemas(metadata_svc_client).await?; - Self::update_schemas(schemas, subscription_controller, schema_updates).await?; + update_schemas(schemas, subscription_controller, schema_updates).await?; Ok(()) } @@ -239,29 +247,6 @@ impl WorkerRole { )?; Ok(schema_updates) } - - async fn update_schemas( - schemas: &Schemas, - subscription_controller: Option<&SC>, - schema_updates: Vec, - ) -> Result<(), SchemaError> - where - SC: SubscriptionController + Send + Sync, - { - // hack to suppress repeated logging of schema registrations - // todo: Fix it - tracing::subscriber::with_default(NoSubscriber::new(), || { - schemas.overwrite(schema_updates) - })?; - - if let Some(subscription_controller) = subscription_controller { - let subscriptions = schemas.list_subscriptions(&[]); - subscription_controller - .update_subscriptions(subscriptions) - .await?; - } - Ok(()) - } } impl TryFrom for WorkerRole { @@ -279,3 +264,24 @@ impl TryFrom for WorkerRole { }) } } + +pub async fn update_schemas( + schemas: &Schemas, + subscription_controller: Option<&SC>, + schema_updates: Vec, +) -> Result<(), SchemaError> +where + SC: SubscriptionController + Send + Sync, +{ + // hack to suppress repeated logging of schema registrations + // todo: Fix it + tracing::subscriber::with_default(NoSubscriber::new(), || schemas.overwrite(schema_updates))?; + + if let Some(subscription_controller) = subscription_controller { + let subscriptions = schemas.list_subscriptions(&[]); + subscription_controller + .update_subscriptions(subscriptions) + .await?; + } + Ok(()) +} diff --git a/crates/node/src/server/handler/worker.rs b/crates/node/src/server/handler/worker.rs index e8b9cafdd..8975b5410 100644 --- a/crates/node/src/server/handler/worker.rs +++ b/crates/node/src/server/handler/worker.rs @@ -16,10 +16,11 @@ use restate_bifrost::Bifrost; use restate_node_services::worker::worker_svc_server::WorkerSvc; use restate_node_services::worker::{ BifrostVersion, StateMutationRequest, StorageQueryRequest, StorageQueryResponse, - TerminationRequest, + TerminationRequest, UpdateSchemaRequest, }; +use restate_schema_impl::{Schemas, SchemasUpdateCommand}; use restate_storage_query_datafusion::context::QueryContext; -use restate_worker::WorkerCommandSender; +use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; use restate_worker_api::Handle; use tonic::{Request, Response, Status}; @@ -27,6 +28,8 @@ pub struct WorkerHandler { bifrost: Bifrost, worker_cmd_tx: WorkerCommandSender, query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, } impl WorkerHandler { @@ -34,11 +37,15 @@ impl WorkerHandler { bifrost: Bifrost, worker_cmd_tx: WorkerCommandSender, query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, ) -> Self { Self { bifrost, worker_cmd_tx, query_context, + schemas, + subscription_controller, } } } @@ -120,4 +127,28 @@ impl WorkerSvc for WorkerHandler { Ok(Response::new(Box::pin(response_stream))) } + + async fn update_schemas( + &self, + request: Request, + ) -> Result, Status> { + let (schema_updates, _) = + bincode::serde::decode_from_slice::, _>( + &request.into_inner().schema_bin, + bincode::config::standard(), + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + crate::roles::update_schemas( + &self.schemas, + self.subscription_controller.as_ref(), + schema_updates, + ) + .await + .map_err(|err| { + Status::internal(format!("failed updating the schema information: {err}")) + })?; + + Ok(Response::new(())) + } } diff --git a/crates/node/src/server/service.rs b/crates/node/src/server/service.rs index 8b23cc94b..e6651dd90 100644 --- a/crates/node/src/server/service.rs +++ b/crates/node/src/server/service.rs @@ -31,8 +31,9 @@ use restate_node_services::metadata::metadata_svc_server::MetadataSvcServer; use restate_node_services::node_ctrl::node_ctrl_svc_server::NodeCtrlSvcServer; use restate_node_services::worker::worker_svc_server::WorkerSvcServer; use restate_node_services::{cluster_controller, metadata, node_ctrl, worker}; +use restate_schema_impl::Schemas; use restate_storage_query_datafusion::context::QueryContext; -use restate_worker::WorkerCommandSender; +use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; use crate::server::multiplex::MultiplexService; use crate::server::options::Options; @@ -133,6 +134,8 @@ impl NodeServer { bifrost, worker_cmd_tx, query_context, + schemas, + subscription_controller, .. }) = self.worker { @@ -140,6 +143,8 @@ impl NodeServer { bifrost, worker_cmd_tx, query_context, + schemas, + subscription_controller, ))); } @@ -185,6 +190,8 @@ pub struct WorkerDependencies { bifrost: Bifrost, worker_cmd_tx: WorkerCommandSender, query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, } impl WorkerDependencies { @@ -193,12 +200,16 @@ impl WorkerDependencies { bifrost: Bifrost, worker_cmd_tx: WorkerCommandSender, query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, ) -> Self { WorkerDependencies { rocksdb, bifrost, worker_cmd_tx, query_context, + schemas, + subscription_controller, } } } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 0738c4a1f..d2c895479 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -80,7 +80,7 @@ pub use restate_storage_query_datafusion::{ OptionsBuilderError as StorageQueryDatafusionOptionsBuilderError, }; -use crate::subscription_integration::SubscriptionControllerHandle; +pub use crate::subscription_integration::SubscriptionControllerHandle; pub use restate_storage_query_postgres::{ Options as StorageQueryPostgresOptions, OptionsBuilder as StorageQueryPostgresOptionsBuilder, OptionsBuilderError as StorageQueryPostgresOptionsBuilderError, diff --git a/tools/xtask/Cargo.toml b/tools/xtask/Cargo.toml index a12bd5341..03b6caff6 100644 --- a/tools/xtask/Cargo.toml +++ b/tools/xtask/Cargo.toml @@ -11,6 +11,7 @@ publish = false restate-admin = { workspace = true, features = ["options_schema"] } restate-bifrost = { workspace = true, features = ["options_schema"] } restate-meta = { workspace = true } +restate-node-services = { workspace = true } restate-schema-api = { workspace = true, features = ["subscription"] } restate-server = { workspace = true, features = ["options_schema"] } restate-types = { workspace = true } @@ -23,3 +24,4 @@ schemars = { workspace = true } serde_json = { workspace = true } serde_yaml = "0.9" tokio = { workspace = true } +tonic = { workspace = true } diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index c1508083b..b3d44fd17 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -10,6 +10,7 @@ use anyhow::bail; use reqwest::header::ACCEPT; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; use restate_types::invocation::InvocationTermination; @@ -19,6 +20,7 @@ use restate_worker_api::Error; use schemars::gen::SchemaSettings; use std::env; use std::time::Duration; +use tonic::transport::{Channel, Uri}; fn generate_config_schema() -> anyhow::Result<()> { let schema = SchemaSettings::draft2019_09() @@ -88,12 +90,17 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { "http://localhost:{}/openapi", admin_options.bind_address.port() ); - let admin_service = admin_options.build(meta.schemas(), meta.meta_handle()); + let admin_service = + admin_options.build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); meta.init().await.unwrap(); // We start the Meta component, then download the openapi schema generated let (shutdown_signal, shutdown_watch) = drain::channel(); - let join_handle = tokio::spawn(admin_service.run(shutdown_watch, Mock, None)); + let join_handle = tokio::spawn(admin_service.run( + shutdown_watch, + Mock, + WorkerSvcClient::new(Channel::builder(Uri::default()).connect_lazy()), + )); let res = RetryPolicy::fixed_delay(Duration::from_millis(100), 20) .retry_operation(|| async {