Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move admin and meta component into cluster controller role #1180

Merged
merged 9 commits into from
Feb 15, 2024
571 changes: 218 additions & 353 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ restate-worker-api = { path = "crates/worker-api" }
ahash = "0.8.5"
anyhow = "1.0.68"
arc-swap = "1.6"
arrow = { version = "50.0.0", default-features = false }
arrow-flight = { version = "50.0.0" }
assert2 = "0.3.11"
async-channel = "2.1.1"
async-trait = "0.1.73"
Expand All @@ -84,8 +86,8 @@ bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
criterion = "0.5"
datafusion = { version = "31.0.0" }
datafusion-expr = { version = "31.0.0" }
datafusion = { version = "35.0.0" }
datafusion-expr = { version = "35.0.0" }
derive_builder = "0.12.0"
derive_more = { version = "0.99.17" }
drain = "0.1.1"
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ restate-service-protocol = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
arrow = { version = "49.0.0", default-features = false, features = ["ipc", "prettyprint"] }
arrow = { workspace = true, features = ["ipc", "prettyprint"] }
bytes = { workspace = true }
base64 = { workspace = true}
chrono = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-meta = { workspace = true }
restate-meta-rest-model = { workspace = true, features = ["schema"] }
restate-node-services = { workspace = true }
restate-pb = { workspace = true }
restate-schema-api = { workspace = true, features = ["service", "deployment", "serde", "serde_schema"] }
restate-schema-impl = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-storage-query-datafusion = { workspace = true }
restate-types = { workspace = true, features = ["serde", "serde_schema"] }
restate-worker-api = { workspace = true }

arrow-flight = { workspace = true }
axum = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }
Expand All @@ -47,6 +48,7 @@ serde_json = { workspace = true }
serde_with = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tonic = { workspace = true }
tower = { workspace = true, features = ["load-shed", "limit"] }
tracing = { workspace = true }

Expand Down
7 changes: 3 additions & 4 deletions crates/admin/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_bifrost::Bifrost;
use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_schema_impl::Schemas;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -49,8 +48,8 @@ impl Options {
self,
schemas: Schemas,
meta_handle: MetaHandle,
bifrost: Bifrost,
schema_reader: FileMetaReader,
) -> AdminService {
AdminService::new(self, schemas, meta_handle, bifrost)
AdminService::new(self, schemas, meta_handle, schema_reader)
}
}
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use restate_service_client::Endpoint;
use restate_service_protocol::discovery::DiscoverEndpoint;
use restate_types::identifiers::InvalidLambdaARN;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::http::{header, HeaderValue, StatusCode};
Expand Down Expand Up @@ -97,6 +98,8 @@ pub async fn create_deployment<W>(
.register_deployment(discover_endpoint, force, apply_changes)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

let response_body = RegisterDeploymentResponse {
id: registration_result.deployment,
services: registration_result.services,
Expand Down Expand Up @@ -241,6 +244,8 @@ pub async fn delete_deployment<W>(
) -> Result<StatusCode, MetaApiError> {
if let Some(true) = force {
state.meta_handle().remove_deployment(deployment_id).await?;
notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client())
.await?;
Ok(StatusCode::ACCEPTED)
} else {
Ok(StatusCode::NOT_IMPLEMENTED)
Expand Down
2 changes: 2 additions & 0 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum MetaApiError {
Meta(#[from] MetaError),
#[error(transparent)]
Worker(#[from] restate_worker_api::Error),
#[error(transparent)]
Generic(Error),
}

/// # Error description response
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/rest_api/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn delete_invocation<W>(
Query(DeleteInvocationParams { mode }): Query<DeleteInvocationParams>,
) -> Result<StatusCode, MetaApiError>
where
W: restate_worker_api::Handle + Send,
W: restate_worker_api::Handle + Clone + Send,
{
let invocation_id = invocation_id
.parse::<InvocationId>()
Expand Down
35 changes: 34 additions & 1 deletion crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ mod methods;
mod services;
mod subscriptions;

use crate::rest_api::error::MetaApiError;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_meta::{FileMetaReader, MetaReader};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_node_services::worker::UpdateSchemaRequest;
use tonic::transport::Channel;
use tracing::debug;

use crate::state::AdminServiceState;

pub fn create_router<W: restate_worker_api::Handle + Send + Sync + 'static>(
pub fn create_router<W: restate_worker_api::Handle + Clone + Send + Sync + 'static>(
state: AdminServiceState<W>,
) -> axum::Router<()> {
// Setup the router
Expand Down Expand Up @@ -101,3 +107,30 @@ pub fn create_router<W: restate_worker_api::Handle + Send + Sync + 'static>(
.expect("Error when building the OpenAPI specification")
.with_state(state)
}

/// Notifies the worker about schema changes. This method is best-effort and will not fail if the worker
/// could not be reached.
async fn notify_worker_about_schema_changes(
schema_reader: &FileMetaReader,
mut worker_svc_client: WorkerSvcClient<Channel>,
) -> Result<(), MetaApiError> {
let schema_updates = schema_reader
.read()
.await
.map_err(|err| MetaApiError::Meta(err.into()))?;

// don't fail if the worker is not reachable
let result = worker_svc_client
.update_schemas(UpdateSchemaRequest {
schema_bin: bincode::serde::encode_to_vec(schema_updates, bincode::config::standard())
.map_err(|err| MetaApiError::Generic(err.into()))?
.into(),
})
.await;

if let Err(err) = result {
debug!("Failed notifying worker about schema changes: {err}");
}

Ok(())
}
5 changes: 4 additions & 1 deletion crates/admin/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use restate_meta_rest_model::services::*;
use restate_pb::grpc::reflection::v1::FileDescriptorResponse;
use restate_schema_api::service::ServiceMetadataResolver;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::extract::{Path, State};
use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
Expand Down Expand Up @@ -91,6 +92,8 @@ pub async fn modify_service<W>(
.modify_service(service_name.clone(), public)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

state
.schemas()
.resolve_latest_service_metadata(&service_name)
Expand Down Expand Up @@ -129,7 +132,7 @@ pub async fn modify_service_state<W>(
}): Json<ModifyServiceStateRequest>,
) -> Result<StatusCode, MetaApiError>
where
W: restate_worker_api::Handle + Send,
W: restate_worker_api::Handle + Clone + Send,
{
let service_id = ServiceId::new(service_name, service_key);

Expand Down
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::state::AdminServiceState;
use restate_meta_rest_model::subscriptions::*;
use restate_schema_api::subscription::SubscriptionResolver;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::extract::Query;
use axum::extract::{Path, State};
use axum::http::StatusCode;
Expand Down Expand Up @@ -52,6 +53,8 @@ pub async fn create_subscription<W>(
)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

Ok((
StatusCode::CREATED,
[(
Expand Down Expand Up @@ -168,5 +171,7 @@ pub async fn delete_subscription<W>(
.delete_subscription(subscription_id)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

Ok(StatusCode::ACCEPTED)
}
41 changes: 22 additions & 19 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,56 @@ use std::sync::Arc;
use axum::error_handling::HandleErrorLayer;
use futures::FutureExt;
use http::StatusCode;
use restate_bifrost::Bifrost;
use tonic::transport::Channel;
use tower::ServiceBuilder;

use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_schema_impl::Schemas;
use restate_storage_query_datafusion::context::QueryContext;
use tracing::info;

use crate::{rest_api, state, storage_query};
use crate::{Error, Options};

#[derive(Debug)]
pub struct AdminService {
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
_bifrost: Bifrost,
schema_reader: FileMetaReader,
}

impl AdminService {
pub fn new(opts: Options, schemas: Schemas, meta_handle: MetaHandle, bifrost: Bifrost) -> Self {
pub fn new(
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> Self {
Self {
opts,
schemas,
meta_handle,
_bifrost: bifrost,
schema_reader,
}
}

pub async fn run(
self,
drain: drain::Watch,
worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static,
query_context: Option<QueryContext>,
worker_svc_client: WorkerSvcClient<Channel>,
) -> Result<(), Error> {
let rest_state =
state::AdminServiceState::new(self.meta_handle, self.schemas, worker_handle);

let router = axum::Router::new();
let rest_state = state::AdminServiceState::new(
self.meta_handle,
self.schemas,
worker_handle,
worker_svc_client.clone(),
self.schema_reader,
);

// Stitch query http endpoint if enabled
let router = if let Some(query_context) = query_context {
let query_state = Arc::new(state::QueryServiceState { query_context });
// Merge storage query router
router.merge(storage_query::create_router(query_state))
} else {
router
};
let query_state = Arc::new(state::QueryServiceState { worker_svc_client });
let router = axum::Router::new().merge(storage_query::create_router(query_state));

let router = router
// Merge meta API router
Expand Down
33 changes: 27 additions & 6 deletions crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,39 @@
// by the Apache License, Version 2.0.
//

use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_schema_impl::Schemas;
use restate_storage_query_datafusion::context::QueryContext;
use tonic::transport::Channel;

#[derive(Clone, derive_builder::Builder)]
pub struct AdminServiceState<W> {
meta_handle: MetaHandle,
schemas: Schemas,
worker_handle: W,
worker_svc_client: WorkerSvcClient<Channel>,
schema_reader: FileMetaReader,
}

#[derive(Clone)]
pub struct QueryServiceState {
pub query_context: QueryContext,
pub worker_svc_client: WorkerSvcClient<Channel>,
}

impl<W> AdminServiceState<W> {
pub fn new(meta_handle: MetaHandle, schemas: Schemas, worker_handle: W) -> Self {
pub fn new(
meta_handle: MetaHandle,
schemas: Schemas,
worker_handle: W,
worker_svc_client: WorkerSvcClient<Channel>,
schema_reader: FileMetaReader,
) -> Self {
Self {
meta_handle,
schemas,
worker_handle,
worker_svc_client,
schema_reader,
}
}

Expand All @@ -42,7 +53,17 @@ impl<W> AdminServiceState<W> {
&self.schemas
}

pub fn worker_handle(&self) -> &W {
&self.worker_handle
pub fn worker_svc_client(&self) -> WorkerSvcClient<Channel> {
self.worker_svc_client.clone()
}

pub fn schema_reader(&self) -> &FileMetaReader {
&self.schema_reader
}
}

impl<W: Clone> AdminServiceState<W> {
pub fn worker_handle(&self) -> W {
self.worker_handle.clone()
}
}
Loading
Loading