Skip to content

Commit

Permalink
Push schema information to workers from admin service on schema changes
Browse files Browse the repository at this point in the history
This commit enables the admin service to push schema updates to the worker
in case of schema changes.
  • Loading branch information
tillrohrmann committed Feb 15, 2024
1 parent 7936da5 commit 429b74a
Show file tree
Hide file tree
Showing 20 changed files with 203 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions crates/admin/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_schema_impl::Schemas;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -44,7 +44,12 @@ impl Default for Options {
}

impl Options {
pub fn build(self, schemas: Schemas, meta_handle: MetaHandle) -> AdminService {
AdminService::new(self, schemas, meta_handle)
pub fn build(
self,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> AdminService {
AdminService::new(self, schemas, meta_handle, schema_reader)
}
}
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use restate_service_client::Endpoint;
use restate_service_protocol::discovery::DiscoverEndpoint;
use restate_types::identifiers::InvalidLambdaARN;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::http::{header, HeaderValue, StatusCode};
Expand Down Expand Up @@ -97,6 +98,8 @@ pub async fn create_deployment<W>(
.register_deployment(discover_endpoint, force, apply_changes)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

let response_body = RegisterDeploymentResponse {
id: registration_result.deployment,
services: registration_result.services,
Expand Down Expand Up @@ -241,6 +244,8 @@ pub async fn delete_deployment<W>(
) -> Result<StatusCode, MetaApiError> {
if let Some(true) = force {
state.meta_handle().remove_deployment(deployment_id).await?;
notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client())
.await?;
Ok(StatusCode::ACCEPTED)
} else {
Ok(StatusCode::NOT_IMPLEMENTED)
Expand Down
2 changes: 2 additions & 0 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum MetaApiError {
Meta(#[from] MetaError),
#[error(transparent)]
Worker(#[from] restate_worker_api::Error),
#[error(transparent)]
Generic(Error),
}

/// # Error description response
Expand Down
33 changes: 33 additions & 0 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ mod methods;
mod services;
mod subscriptions;

use crate::rest_api::error::MetaApiError;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_meta::{FileMetaReader, MetaReader};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_node_services::worker::UpdateSchemaRequest;
use tonic::transport::Channel;
use tracing::debug;

use crate::state::AdminServiceState;

Expand Down Expand Up @@ -101,3 +107,30 @@ pub fn create_router<W: restate_worker_api::Handle + Clone + Send + Sync + 'stat
.expect("Error when building the OpenAPI specification")
.with_state(state)
}

/// Notifies the worker about schema changes. This method is best-effort and will not fail if the worker
/// could not be reached.
async fn notify_worker_about_schema_changes(
schema_reader: &FileMetaReader,
mut worker_svc_client: WorkerSvcClient<Channel>,
) -> Result<(), MetaApiError> {
let schema_updates = schema_reader
.read()
.await
.map_err(|err| MetaApiError::Meta(err.into()))?;

// don't fail if the worker is not reachable
let result = worker_svc_client
.update_schemas(UpdateSchemaRequest {
schema_bin: bincode::serde::encode_to_vec(schema_updates, bincode::config::standard())
.map_err(|err| MetaApiError::Generic(err.into()))?
.into(),
})
.await;

if let Err(err) = result {
debug!("Failed notifying worker about schema changes: {err}");
}

Ok(())
}
3 changes: 3 additions & 0 deletions crates/admin/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use restate_meta_rest_model::services::*;
use restate_pb::grpc::reflection::v1::FileDescriptorResponse;
use restate_schema_api::service::ServiceMetadataResolver;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::extract::{Path, State};
use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
Expand Down Expand Up @@ -91,6 +92,8 @@ pub async fn modify_service<W>(
.modify_service(service_name.clone(), public)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

state
.schemas()
.resolve_latest_service_metadata(&service_name)
Expand Down
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::state::AdminServiceState;
use restate_meta_rest_model::subscriptions::*;
use restate_schema_api::subscription::SubscriptionResolver;

use crate::rest_api::notify_worker_about_schema_changes;
use axum::extract::Query;
use axum::extract::{Path, State};
use axum::http::StatusCode;
Expand Down Expand Up @@ -52,6 +53,8 @@ pub async fn create_subscription<W>(
)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

Ok((
StatusCode::CREATED,
[(
Expand Down Expand Up @@ -168,5 +171,7 @@ pub async fn delete_subscription<W>(
.delete_subscription(subscription_id)
.await?;

notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?;

Ok(StatusCode::ACCEPTED)
}
34 changes: 19 additions & 15 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use http::StatusCode;
use tonic::transport::Channel;
use tower::ServiceBuilder;

use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_schema_impl::Schemas;
use tracing::info;
Expand All @@ -29,36 +29,40 @@ pub struct AdminService {
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
}

impl AdminService {
pub fn new(opts: Options, schemas: Schemas, meta_handle: MetaHandle) -> Self {
pub fn new(
opts: Options,
schemas: Schemas,
meta_handle: MetaHandle,
schema_reader: FileMetaReader,
) -> Self {
Self {
opts,
schemas,
meta_handle,
schema_reader,
}
}

pub async fn run(
self,
drain: drain::Watch,
worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static,
worker_svc_client: Option<WorkerSvcClient<Channel>>,
worker_svc_client: WorkerSvcClient<Channel>,
) -> Result<(), Error> {
let rest_state =
state::AdminServiceState::new(self.meta_handle, self.schemas, worker_handle);

let router = axum::Router::new();
let rest_state = state::AdminServiceState::new(
self.meta_handle,
self.schemas,
worker_handle,
worker_svc_client.clone(),
self.schema_reader,
);

// Stitch query http endpoint if enabled
let router = if let Some(worker_svc_client) = worker_svc_client {
let query_state = Arc::new(state::QueryServiceState { worker_svc_client });
// Merge storage query router
router.merge(storage_query::create_router(query_state))
} else {
router
};
let query_state = Arc::new(state::QueryServiceState { worker_svc_client });
let router = axum::Router::new().merge(storage_query::create_router(query_state));

let router = router
// Merge meta API router
Expand Down
22 changes: 20 additions & 2 deletions crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.
//

use restate_meta::MetaHandle;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::worker::worker_svc_client::WorkerSvcClient;
use restate_schema_impl::Schemas;
use tonic::transport::Channel;
Expand All @@ -19,6 +19,8 @@ pub struct AdminServiceState<W> {
meta_handle: MetaHandle,
schemas: Schemas,
worker_handle: W,
worker_svc_client: WorkerSvcClient<Channel>,
schema_reader: FileMetaReader,
}

#[derive(Clone)]
Expand All @@ -27,11 +29,19 @@ pub struct QueryServiceState {
}

impl<W> AdminServiceState<W> {
pub fn new(meta_handle: MetaHandle, schemas: Schemas, worker_handle: W) -> Self {
pub fn new(
meta_handle: MetaHandle,
schemas: Schemas,
worker_handle: W,
worker_svc_client: WorkerSvcClient<Channel>,
schema_reader: FileMetaReader,
) -> Self {
Self {
meta_handle,
schemas,
worker_handle,
worker_svc_client,
schema_reader,
}
}

Expand All @@ -42,6 +52,14 @@ impl<W> AdminServiceState<W> {
pub fn schemas(&self) -> &Schemas {
&self.schemas
}

pub fn worker_svc_client(&self) -> WorkerSvcClient<Channel> {
self.worker_svc_client.clone()
}

pub fn schema_reader(&self) -> &FileMetaReader {
&self.schema_reader
}
}

impl<W: Clone> AdminServiceState<W> {
Expand Down
5 changes: 4 additions & 1 deletion crates/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use restate_schema_impl::SchemasUpdateError;
use restate_service_protocol::discovery::ServiceDiscoveryError;

use crate::storage::MetaStorageError;
use crate::storage::{MetaReaderError, MetaStorageError};

#[derive(Debug, thiserror::Error, codederror::CodedError)]
pub enum Error {
Expand All @@ -26,6 +26,9 @@ pub enum Error {
Storage(#[from] MetaStorageError),
#[error(transparent)]
#[code(unknown)]
Reader(#[from] MetaReaderError),
#[error(transparent)]
#[code(unknown)]
SchemaRegistry(#[from] SchemasUpdateError),
#[error("meta closed")]
#[code(unknown)]
Expand Down
7 changes: 7 additions & 0 deletions crates/node-services/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ service WorkerSvc {

// Queries the storage of the worker and returns the result as a stream of responses
rpc QueryStorage(StorageQueryRequest) returns (stream StorageQueryResponse);

// Updates the schema information on the worker node
rpc UpdateSchemas(UpdateSchemaRequest) returns (google.protobuf.Empty);
}

message BifrostVersion {
Expand All @@ -48,4 +51,8 @@ message StorageQueryRequest {
message StorageQueryResponse {
bytes header = 1;
bytes data = 2;
}

message UpdateSchemaRequest {
bytes schema_bin = 1;
}
2 changes: 2 additions & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ impl Node {
worker.bifrost_handle(),
worker.worker_command_tx(),
worker.storage_query_context().clone(),
worker.schemas(),
worker.subscription_controller(),
)
}),
cluster_controller_role.as_ref().map(|cluster_controller| {
Expand Down
6 changes: 4 additions & 2 deletions crates/node/src/roles/cluster_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl ClusterControllerRole {

component_set.spawn(
self.admin
.run(inner_shutdown_watch, worker_handle, Some(worker_svc_client))
.run(inner_shutdown_watch, worker_handle, worker_svc_client)
.map_ok(|_| "admin")
.map_err(ClusterControllerRoleError::Admin),
);
Expand Down Expand Up @@ -143,7 +143,9 @@ impl TryFrom<Options> for ClusterControllerRole {

fn try_from(options: Options) -> Result<Self, Self::Error> {
let meta = options.meta.build(options.worker.kafka.clone())?;
let admin = options.admin.build(meta.schemas(), meta.meta_handle());
let admin = options
.admin
.build(meta.schemas(), meta.meta_handle(), meta.schema_reader());

Ok(ClusterControllerRole {
controller: restate_cluster_controller::Service::new(options.cluster_controller),
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mod worker;
pub use cluster_controller::{
ClusterControllerRole, ClusterControllerRoleBuildError, ClusterControllerRoleError,
};
pub use worker::{WorkerRole, WorkerRoleBuildError, WorkerRoleError};
pub use worker::{update_schemas, WorkerRole, WorkerRoleBuildError, WorkerRoleError};
Loading

0 comments on commit 429b74a

Please sign in to comment.