Skip to content

Commit

Permalink
Merge worker-api with worker crate
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 2, 2024
1 parent 0ae7c6b commit a348db0
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 85 deletions.
14 changes: 1 addition & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ restate-tracing-instrumentation = { path = "crates/tracing-instrumentation" }
restate-types = { path = "crates/types" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-worker-api = { path = "crates/worker-api" }

# External crates
ahash = "0.8.5"
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-types = { workspace = true, features = ["serde", "serde_schema"] }
restate-wal-protocol = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
arrow-flight = { workspace = true }
Expand Down
3 changes: 0 additions & 3 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub enum MetaApiError {
SubscriptionNotFound(SubscriptionId),
#[error(transparent)]
Meta(#[from] MetaError),
#[error(transparent)]
Worker(#[from] restate_worker_api::Error),
#[error("Internal server error: {0}")]
Internal(String),
}
Expand All @@ -69,7 +67,6 @@ impl IntoResponse for MetaApiError {
| MetaApiError::DeploymentNotFound(_)
| MetaApiError::SubscriptionNotFound(_) => StatusCode::NOT_FOUND,
MetaApiError::InvalidField(_, _) => StatusCode::BAD_REQUEST,
MetaApiError::Worker(_) => StatusCode::SERVICE_UNAVAILABLE,
MetaApiError::Meta(MetaError::SchemaRegistry(schema_registry_error)) => {
match schema_registry_error.kind() {
ErrorKind::NotFound => StatusCode::NOT_FOUND,
Expand Down
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ restate-storage-query-datafusion = { workspace = true }
restate-storage-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-worker = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use restate_storage_query_datafusion::context::QueryContext;
use restate_storage_rocksdb::RocksDBStorage;
use restate_types::net::AdvertisedAddress;
use restate_types::retries::RetryPolicy;
use restate_worker::SubscriptionController;
use restate_worker::{KafkaIngressOptions, SubscriptionControllerHandle, Worker};
use restate_worker_api::SubscriptionController;
use tracing::info;

use restate_worker::Options as WorkerOptions;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub enum SchemaError {
),
#[error("failed updating subscriptions: {0}")]
#[code(unknown)]
Subscription(#[from] restate_worker_api::Error),
Subscription(#[from] restate_worker::WorkerHandleError),
}

#[derive(Debug, thiserror::Error, CodedError)]
Expand Down
14 changes: 0 additions & 14 deletions crates/worker-api/Cargo.toml

This file was deleted.

3 changes: 1 addition & 2 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ restate-invoker-impl = { workspace = true }
restate-network = { workspace = true }
restate-node-protocol = { workspace = true }
restate-pb = { workspace = true, features = ["builtin-service"] }
restate-schema-api = { workspace = true, features = [ "component"] }
restate-schema-api = { workspace = true, features = [ "component", "subscription"] }
restate-schema-impl = { workspace = true }
restate-serde-util = { workspace = true, features = ["proto"] }
restate-service-client = { workspace = true }
Expand All @@ -44,7 +44,6 @@ restate-storage-rocksdb = { workspace = true }
restate-timer = { workspace = true }
restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
assert2 = { workspace = true }
Expand Down
15 changes: 15 additions & 0 deletions crates/worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[derive(Debug, thiserror::Error)]
pub enum WorkerHandleError {
#[error("worker is unreachable")]
Unreachable,
}
29 changes: 29 additions & 0 deletions crates/worker/src/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::future::Future;

use restate_types::invocation::InvocationTermination;
use restate_types::state_mut::ExternalStateMutation;

use crate::WorkerHandleError;

pub trait WorkerHandle {
/// Send a command to terminate an invocation. This command is best-effort.
fn terminate_invocation(
&self,
invocation_termination: InvocationTermination,
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;

/// Send a command to mutate a state. This command is best-effort.
fn external_state_mutation(
&self,
mutation: ExternalStateMutation,
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
}
7 changes: 7 additions & 0 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ use std::ops::RangeInclusive;
use std::path::Path;
use tracing::debug;

mod error;
mod handle;
mod invoker_integration;
mod metric_definitions;
mod partition;
mod subscription_controller;
mod subscription_integration;

pub use error::*;
pub use handle::*;

pub use restate_ingress_http::{
Options as IngressOptions, OptionsBuilder as IngressOptionsBuilder,
OptionsBuilderError as IngressOptionsBuilderError,
Expand All @@ -51,6 +57,7 @@ pub use restate_invoker_impl::{
Options as InvokerOptions, OptionsBuilder as InvokerOptionsBuilder,
OptionsBuilderError as InvokerOptionsBuilderError,
};
pub use subscription_controller::SubscriptionController;

pub use restate_storage_rocksdb::{
Options as RocksdbOptions, OptionsBuilder as RocksdbOptionsBuilder,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,48 +8,29 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::future::Future;

use restate_schema_api::subscription::Subscription;
use restate_types::identifiers::SubscriptionId;
use restate_types::invocation::InvocationTermination;
use restate_types::state_mut::ExternalStateMutation;
use std::future::Future;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("worker is unreachable")]
Unreachable,
}
use crate::WorkerHandleError;

// This is just an interface to isolate the interaction between meta and subscription controller.
// Depending on how we evolve the Kafka ingress deployment, this might end up living in a separate process.
pub trait SubscriptionController {
fn start_subscription(
&self,
subscription: Subscription,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
fn stop_subscription(
&self,
id: SubscriptionId,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;

/// Updates the subscription controller with the provided set of subscriptions. The subscription controller
/// is supposed to only run the set of provided subscriptions after this call succeeds.
fn update_subscriptions(
&self,
subscriptions: Vec<Subscription>,
) -> impl Future<Output = Result<(), Error>> + Send;
}

pub trait Handle {
/// Send a command to terminate an invocation. This command is best-effort.
fn terminate_invocation(
&self,
invocation_termination: InvocationTermination,
) -> impl Future<Output = Result<(), Error>> + Send;

/// Send a command to mutate a state. This command is best-effort.
fn external_state_mutation(
&self,
mutation: ExternalStateMutation,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
}
20 changes: 13 additions & 7 deletions crates/worker/src/subscription_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::{SubscriptionController, WorkerHandleError};
use restate_ingress_kafka::SubscriptionCommandSender;
use restate_schema_api::subscription::{Subscription, SubscriptionValidator};
use restate_types::identifiers::SubscriptionId;
use restate_worker_api::{Error, SubscriptionController};
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -39,28 +39,34 @@ impl SubscriptionValidator for SubscriptionControllerHandle {
}

impl SubscriptionController for SubscriptionControllerHandle {
async fn start_subscription(&self, subscription: Subscription) -> Result<(), Error> {
async fn start_subscription(
&self,
subscription: Subscription,
) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::StartSubscription(
subscription,
))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}

async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), Error> {
async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::StopSubscription(id))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}

async fn update_subscriptions(&self, subscriptions: Vec<Subscription>) -> Result<(), Error> {
async fn update_subscriptions(
&self,
subscriptions: Vec<Subscription>,
) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::UpdateSubscriptions(
subscriptions,
))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}
}
6 changes: 3 additions & 3 deletions tools/xtask/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ publish = false
restate-admin = { workspace = true, features = ["options_schema"] }
restate-bifrost = { workspace = true, features = ["options_schema"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-meta = { workspace = true }
restate-meta = { workspace = true, features = ["options_schema"] }
restate-node-services = { workspace = true, features = ["clients"] }
restate-schema-api = { workspace = true, features = ["subscription"] }
restate-server = { workspace = true, features = ["options_schema"] }
restate-types = { workspace = true }
restate-worker-api = { workspace = true }
restate-types = { workspace = true, features = ["serde_schema"] }
restate-worker = { workspace = true, features = ["options_schema"] }

anyhow = { workspace = true }
drain = { workspace = true }
Expand Down
Loading

0 comments on commit a348db0

Please sign in to comment.