diff --git a/Cargo.lock b/Cargo.lock index 0e50f6c59..0aa3d0565 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4811,7 +4811,6 @@ dependencies = [ "restate-test-util", "restate-types", "restate-wal-protocol", - "restate-worker-api", "schemars", "serde", "serde_json", @@ -5387,7 +5386,6 @@ dependencies = [ "restate-test-util", "restate-types", "restate-worker", - "restate-worker-api", "rocksdb", "schemars", "serde", @@ -5946,7 +5944,6 @@ dependencies = [ "restate-timer", "restate-types", "restate-wal-protocol", - "restate-worker-api", "schemars", "serde", "serde_json", @@ -5962,15 +5959,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "restate-worker-api" -version = "0.9.0" -dependencies = [ - "restate-schema-api", - "restate-types", - "thiserror", -] - [[package]] name = "rgb" version = "0.8.37" @@ -7922,7 +7910,7 @@ dependencies = [ "restate-schema-api", "restate-server", "restate-types", - "restate-worker-api", + "restate-worker", "schemars", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index e518e5d95..1a22c5d1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,6 @@ restate-tracing-instrumentation = { path = "crates/tracing-instrumentation" } restate-types = { path = "crates/types" } restate-wal-protocol = { path = "crates/wal-protocol" } restate-worker = { path = "crates/worker" } -restate-worker-api = { path = "crates/worker-api" } # External crates ahash = "0.8.5" diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index a1c4a42c6..ad20f7bef 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -27,7 +27,6 @@ restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } restate-types = { workspace = true, features = ["serde", "serde_schema"] } restate-wal-protocol = { workspace = true } -restate-worker-api = { workspace = true } anyhow = { workspace = true } arrow-flight = { workspace = true } diff --git a/crates/admin/src/rest_api/error.rs b/crates/admin/src/rest_api/error.rs index d8fb5787e..0f47ae714 100644 --- a/crates/admin/src/rest_api/error.rs +++ b/crates/admin/src/rest_api/error.rs @@ -43,8 +43,6 @@ pub enum MetaApiError { SubscriptionNotFound(SubscriptionId), #[error(transparent)] Meta(#[from] MetaError), - #[error(transparent)] - Worker(#[from] restate_worker_api::Error), #[error("Internal server error: {0}")] Internal(String), } @@ -69,7 +67,6 @@ impl IntoResponse for MetaApiError { | MetaApiError::DeploymentNotFound(_) | MetaApiError::SubscriptionNotFound(_) => StatusCode::NOT_FOUND, MetaApiError::InvalidField(_, _) => StatusCode::BAD_REQUEST, - MetaApiError::Worker(_) => StatusCode::SERVICE_UNAVAILABLE, MetaApiError::Meta(MetaError::SchemaRegistry(schema_registry_error)) => { match schema_registry_error.kind() { ErrorKind::NotFound => StatusCode::NOT_FOUND, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 47eea5a9b..7fda454f9 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -35,7 +35,6 @@ restate-storage-query-datafusion = { workspace = true } restate-storage-rocksdb = { workspace = true } restate-types = { workspace = true } restate-worker = { workspace = true } -restate-worker-api = { workspace = true } anyhow = { workspace = true } arc-swap = { workspace = true } diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index ad7684e98..6467f532a 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -30,8 +30,8 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_types::net::AdvertisedAddress; use restate_types::retries::RetryPolicy; +use restate_worker::SubscriptionController; use restate_worker::{KafkaIngressOptions, SubscriptionControllerHandle, Worker}; -use restate_worker_api::SubscriptionController; use tracing::info; use restate_worker::Options as WorkerOptions; @@ -74,7 +74,7 @@ pub enum SchemaError { ), #[error("failed updating subscriptions: {0}")] #[code(unknown)] - Subscription(#[from] restate_worker_api::Error), + Subscription(#[from] restate_worker::WorkerHandleError), } #[derive(Debug, thiserror::Error, CodedError)] diff --git a/crates/worker-api/Cargo.toml b/crates/worker-api/Cargo.toml deleted file mode 100644 index 124e072cd..000000000 --- a/crates/worker-api/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "restate-worker-api" -version.workspace = true -authors.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -publish = false - -[dependencies] -restate-schema-api = { workspace = true, features = ["subscription"]} -restate-types = { workspace = true } - -thiserror = { workspace = true } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index d246e2ce2..6b1b0613a 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -32,7 +32,7 @@ restate-invoker-impl = { workspace = true } restate-network = { workspace = true } restate-node-protocol = { workspace = true } restate-pb = { workspace = true, features = ["builtin-service"] } -restate-schema-api = { workspace = true, features = [ "component"] } +restate-schema-api = { workspace = true, features = [ "component", "subscription"] } restate-schema-impl = { workspace = true } restate-serde-util = { workspace = true, features = ["proto"] } restate-service-client = { workspace = true } @@ -44,7 +44,6 @@ restate-storage-rocksdb = { workspace = true } restate-timer = { workspace = true } restate-types = { workspace = true } restate-wal-protocol = { workspace = true } -restate-worker-api = { workspace = true } anyhow = { workspace = true } assert2 = { workspace = true } diff --git a/crates/worker/src/error.rs b/crates/worker/src/error.rs new file mode 100644 index 000000000..d6da320fb --- /dev/null +++ b/crates/worker/src/error.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#[derive(Debug, thiserror::Error)] +pub enum WorkerHandleError { + #[error("worker is unreachable")] + Unreachable, +} diff --git a/crates/worker/src/handle.rs b/crates/worker/src/handle.rs new file mode 100644 index 000000000..7fe265a1b --- /dev/null +++ b/crates/worker/src/handle.rs @@ -0,0 +1,29 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +use std::future::Future; + +use restate_types::invocation::InvocationTermination; +use restate_types::state_mut::ExternalStateMutation; + +use crate::WorkerHandleError; + +pub trait WorkerHandle { + /// Send a command to terminate an invocation. This command is best-effort. + fn terminate_invocation( + &self, + invocation_termination: InvocationTermination, + ) -> impl Future> + Send; + + /// Send a command to mutate a state. This command is best-effort. + fn external_state_mutation( + &self, + mutation: ExternalStateMutation, + ) -> impl Future> + Send; +} diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index c84beaed8..1e21bc5dc 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,11 +34,17 @@ use std::ops::RangeInclusive; use std::path::Path; use tracing::debug; +mod error; +mod handle; mod invoker_integration; mod metric_definitions; mod partition; +mod subscription_controller; mod subscription_integration; +pub use error::*; +pub use handle::*; + pub use restate_ingress_http::{ Options as IngressOptions, OptionsBuilder as IngressOptionsBuilder, OptionsBuilderError as IngressOptionsBuilderError, @@ -51,6 +57,7 @@ pub use restate_invoker_impl::{ Options as InvokerOptions, OptionsBuilder as InvokerOptionsBuilder, OptionsBuilderError as InvokerOptionsBuilderError, }; +pub use subscription_controller::SubscriptionController; pub use restate_storage_rocksdb::{ Options as RocksdbOptions, OptionsBuilder as RocksdbOptionsBuilder, diff --git a/crates/worker-api/src/lib.rs b/crates/worker/src/subscription_controller.rs similarity index 54% rename from crates/worker-api/src/lib.rs rename to crates/worker/src/subscription_controller.rs index 4f28e8d5e..f1f813380 100644 --- a/crates/worker-api/src/lib.rs +++ b/crates/worker/src/subscription_controller.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. // All rights reserved. // // Use of this software is governed by the Business Source License @@ -8,17 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::future::Future; + use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; -use restate_types::invocation::InvocationTermination; -use restate_types::state_mut::ExternalStateMutation; -use std::future::Future; -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("worker is unreachable")] - Unreachable, -} +use crate::WorkerHandleError; // This is just an interface to isolate the interaction between meta and subscription controller. // Depending on how we evolve the Kafka ingress deployment, this might end up living in a separate process. @@ -26,30 +21,16 @@ pub trait SubscriptionController { fn start_subscription( &self, subscription: Subscription, - ) -> impl Future> + Send; + ) -> impl Future> + Send; fn stop_subscription( &self, id: SubscriptionId, - ) -> impl Future> + Send; + ) -> impl Future> + Send; /// Updates the subscription controller with the provided set of subscriptions. The subscription controller /// is supposed to only run the set of provided subscriptions after this call succeeds. fn update_subscriptions( &self, subscriptions: Vec, - ) -> impl Future> + Send; -} - -pub trait Handle { - /// Send a command to terminate an invocation. This command is best-effort. - fn terminate_invocation( - &self, - invocation_termination: InvocationTermination, - ) -> impl Future> + Send; - - /// Send a command to mutate a state. This command is best-effort. - fn external_state_mutation( - &self, - mutation: ExternalStateMutation, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } diff --git a/crates/worker/src/subscription_integration.rs b/crates/worker/src/subscription_integration.rs index b50ca1f8b..1f49abe2b 100644 --- a/crates/worker/src/subscription_integration.rs +++ b/crates/worker/src/subscription_integration.rs @@ -8,10 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::{SubscriptionController, WorkerHandleError}; use restate_ingress_kafka::SubscriptionCommandSender; use restate_schema_api::subscription::{Subscription, SubscriptionValidator}; use restate_types::identifiers::SubscriptionId; -use restate_worker_api::{Error, SubscriptionController}; use std::ops::Deref; use std::sync::Arc; @@ -39,28 +39,34 @@ impl SubscriptionValidator for SubscriptionControllerHandle { } impl SubscriptionController for SubscriptionControllerHandle { - async fn start_subscription(&self, subscription: Subscription) -> Result<(), Error> { + async fn start_subscription( + &self, + subscription: Subscription, + ) -> Result<(), WorkerHandleError> { self.1 .send(restate_ingress_kafka::Command::StartSubscription( subscription, )) .await - .map_err(|_| Error::Unreachable) + .map_err(|_| WorkerHandleError::Unreachable) } - async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), Error> { + async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), WorkerHandleError> { self.1 .send(restate_ingress_kafka::Command::StopSubscription(id)) .await - .map_err(|_| Error::Unreachable) + .map_err(|_| WorkerHandleError::Unreachable) } - async fn update_subscriptions(&self, subscriptions: Vec) -> Result<(), Error> { + async fn update_subscriptions( + &self, + subscriptions: Vec, + ) -> Result<(), WorkerHandleError> { self.1 .send(restate_ingress_kafka::Command::UpdateSubscriptions( subscriptions, )) .await - .map_err(|_| Error::Unreachable) + .map_err(|_| WorkerHandleError::Unreachable) } } diff --git a/tools/xtask/Cargo.toml b/tools/xtask/Cargo.toml index 1bb66b4f6..07389cd9e 100644 --- a/tools/xtask/Cargo.toml +++ b/tools/xtask/Cargo.toml @@ -11,12 +11,12 @@ publish = false restate-admin = { workspace = true, features = ["options_schema"] } restate-bifrost = { workspace = true, features = ["options_schema"] } restate-core = { workspace = true, features = ["test-util"] } -restate-meta = { workspace = true } +restate-meta = { workspace = true, features = ["options_schema"] } restate-node-services = { workspace = true, features = ["clients"] } restate-schema-api = { workspace = true, features = ["subscription"] } restate-server = { workspace = true, features = ["options_schema"] } -restate-types = { workspace = true } -restate-worker-api = { workspace = true } +restate-types = { workspace = true, features = ["serde_schema"] } +restate-worker = { workspace = true, features = ["options_schema"] } anyhow = { workspace = true } drain = { workspace = true } diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index d483f895f..582f2f54c 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -21,7 +21,9 @@ use restate_types::identifiers::SubscriptionId; use restate_types::invocation::InvocationTermination; use restate_types::retries::RetryPolicy; use restate_types::state_mut::ExternalStateMutation; -use restate_worker_api::Error; +use restate_worker::SubscriptionController; +use restate_worker::WorkerHandle; +use restate_worker::WorkerHandleError; use schemars::gen::SchemaSettings; use std::env; use std::time::Duration; @@ -47,38 +49,38 @@ fn generate_default_config() -> anyhow::Result<()> { #[derive(Clone)] struct Mock; -impl restate_worker_api::Handle for Mock { +impl WorkerHandle for Mock { async fn terminate_invocation( &self, _: InvocationTermination, - ) -> Result<(), restate_worker_api::Error> { + ) -> Result<(), WorkerHandleError> { Ok(()) } - async fn external_state_mutation(&self, _mutation: ExternalStateMutation) -> Result<(), Error> { + async fn external_state_mutation( + &self, + _mutation: ExternalStateMutation, + ) -> Result<(), WorkerHandleError> { Ok(()) } } -impl restate_worker_api::SubscriptionController for Mock { - async fn start_subscription(&self, _: Subscription) -> Result<(), restate_worker_api::Error> { +impl SubscriptionController for Mock { + async fn start_subscription(&self, _: Subscription) -> Result<(), WorkerHandleError> { Ok(()) } - async fn stop_subscription(&self, _: SubscriptionId) -> Result<(), restate_worker_api::Error> { + async fn stop_subscription(&self, _: SubscriptionId) -> Result<(), WorkerHandleError> { Ok(()) } - async fn update_subscriptions( - &self, - _: Vec, - ) -> Result<(), restate_worker_api::Error> { + async fn update_subscriptions(&self, _: Vec) -> Result<(), WorkerHandleError> { Ok(()) } } impl restate_schema_api::subscription::SubscriptionValidator for Mock { - type Error = restate_worker_api::Error; + type Error = WorkerHandleError; fn validate(&self, _: Subscription) -> Result { unimplemented!()