Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge worker-api with worker crate #1330

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ restate-tracing-instrumentation = { path = "crates/tracing-instrumentation" }
restate-types = { path = "crates/types" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-worker-api = { path = "crates/worker-api" }

# External crates
ahash = "0.8.5"
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-types = { workspace = true, features = ["serde", "serde_schema"] }
restate-wal-protocol = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
arrow-flight = { workspace = true }
Expand Down
3 changes: 0 additions & 3 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub enum MetaApiError {
SubscriptionNotFound(SubscriptionId),
#[error(transparent)]
Meta(#[from] MetaError),
#[error(transparent)]
Worker(#[from] restate_worker_api::Error),
#[error("Internal server error: {0}")]
Internal(String),
}
Expand All @@ -69,7 +67,6 @@ impl IntoResponse for MetaApiError {
| MetaApiError::DeploymentNotFound(_)
| MetaApiError::SubscriptionNotFound(_) => StatusCode::NOT_FOUND,
MetaApiError::InvalidField(_, _) => StatusCode::BAD_REQUEST,
MetaApiError::Worker(_) => StatusCode::SERVICE_UNAVAILABLE,
MetaApiError::Meta(MetaError::SchemaRegistry(schema_registry_error)) => {
match schema_registry_error.kind() {
ErrorKind::NotFound => StatusCode::NOT_FOUND,
Expand Down
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ restate-storage-query-datafusion = { workspace = true }
restate-storage-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-worker = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use restate_storage_query_datafusion::context::QueryContext;
use restate_storage_rocksdb::RocksDBStorage;
use restate_types::net::AdvertisedAddress;
use restate_types::retries::RetryPolicy;
use restate_worker::SubscriptionController;
use restate_worker::{KafkaIngressOptions, SubscriptionControllerHandle, Worker};
use restate_worker_api::SubscriptionController;
use tracing::info;

use restate_worker::Options as WorkerOptions;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub enum SchemaError {
),
#[error("failed updating subscriptions: {0}")]
#[code(unknown)]
Subscription(#[from] restate_worker_api::Error),
Subscription(#[from] restate_worker::WorkerHandleError),
}

#[derive(Debug, thiserror::Error, CodedError)]
Expand Down
14 changes: 0 additions & 14 deletions crates/worker-api/Cargo.toml

This file was deleted.

3 changes: 1 addition & 2 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ restate-invoker-impl = { workspace = true }
restate-network = { workspace = true }
restate-node-protocol = { workspace = true }
restate-pb = { workspace = true, features = ["builtin-service"] }
restate-schema-api = { workspace = true, features = [ "component"] }
restate-schema-api = { workspace = true, features = [ "component", "subscription"] }
restate-schema-impl = { workspace = true }
restate-serde-util = { workspace = true, features = ["proto"] }
restate-service-client = { workspace = true }
Expand All @@ -44,7 +44,6 @@ restate-storage-rocksdb = { workspace = true }
restate-timer = { workspace = true }
restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }
restate-worker-api = { workspace = true }

anyhow = { workspace = true }
assert2 = { workspace = true }
Expand Down
15 changes: 15 additions & 0 deletions crates/worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[derive(Debug, thiserror::Error)]
pub enum WorkerHandleError {
#[error("worker is unreachable")]
Unreachable,
}
29 changes: 29 additions & 0 deletions crates/worker/src/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::future::Future;

use restate_types::invocation::InvocationTermination;
use restate_types::state_mut::ExternalStateMutation;

use crate::WorkerHandleError;

pub trait WorkerHandle {
/// Send a command to terminate an invocation. This command is best-effort.
fn terminate_invocation(
&self,
invocation_termination: InvocationTermination,
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;

/// Send a command to mutate a state. This command is best-effort.
fn external_state_mutation(
&self,
mutation: ExternalStateMutation,
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
}
7 changes: 7 additions & 0 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ use std::ops::RangeInclusive;
use std::path::Path;
use tracing::debug;

mod error;
mod handle;
mod invoker_integration;
mod metric_definitions;
mod partition;
mod subscription_controller;
mod subscription_integration;

pub use error::*;
pub use handle::*;

pub use restate_ingress_http::{
Options as IngressOptions, OptionsBuilder as IngressOptionsBuilder,
OptionsBuilderError as IngressOptionsBuilderError,
Expand All @@ -51,6 +57,7 @@ pub use restate_invoker_impl::{
Options as InvokerOptions, OptionsBuilder as InvokerOptionsBuilder,
OptionsBuilderError as InvokerOptionsBuilderError,
};
pub use subscription_controller::SubscriptionController;

pub use restate_storage_rocksdb::{
Options as RocksdbOptions, OptionsBuilder as RocksdbOptionsBuilder,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,48 +8,29 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::future::Future;

use restate_schema_api::subscription::Subscription;
use restate_types::identifiers::SubscriptionId;
use restate_types::invocation::InvocationTermination;
use restate_types::state_mut::ExternalStateMutation;
use std::future::Future;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("worker is unreachable")]
Unreachable,
}
use crate::WorkerHandleError;

// This is just an interface to isolate the interaction between meta and subscription controller.
// Depending on how we evolve the Kafka ingress deployment, this might end up living in a separate process.
pub trait SubscriptionController {
fn start_subscription(
&self,
subscription: Subscription,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
fn stop_subscription(
&self,
id: SubscriptionId,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;

/// Updates the subscription controller with the provided set of subscriptions. The subscription controller
/// is supposed to only run the set of provided subscriptions after this call succeeds.
fn update_subscriptions(
&self,
subscriptions: Vec<Subscription>,
) -> impl Future<Output = Result<(), Error>> + Send;
}

pub trait Handle {
/// Send a command to terminate an invocation. This command is best-effort.
fn terminate_invocation(
&self,
invocation_termination: InvocationTermination,
) -> impl Future<Output = Result<(), Error>> + Send;

/// Send a command to mutate a state. This command is best-effort.
fn external_state_mutation(
&self,
mutation: ExternalStateMutation,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), WorkerHandleError>> + Send;
}
20 changes: 13 additions & 7 deletions crates/worker/src/subscription_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::{SubscriptionController, WorkerHandleError};
use restate_ingress_kafka::SubscriptionCommandSender;
use restate_schema_api::subscription::{Subscription, SubscriptionValidator};
use restate_types::identifiers::SubscriptionId;
use restate_worker_api::{Error, SubscriptionController};
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -39,28 +39,34 @@ impl SubscriptionValidator for SubscriptionControllerHandle {
}

impl SubscriptionController for SubscriptionControllerHandle {
async fn start_subscription(&self, subscription: Subscription) -> Result<(), Error> {
async fn start_subscription(
&self,
subscription: Subscription,
) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::StartSubscription(
subscription,
))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}

async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), Error> {
async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::StopSubscription(id))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}

async fn update_subscriptions(&self, subscriptions: Vec<Subscription>) -> Result<(), Error> {
async fn update_subscriptions(
&self,
subscriptions: Vec<Subscription>,
) -> Result<(), WorkerHandleError> {
self.1
.send(restate_ingress_kafka::Command::UpdateSubscriptions(
subscriptions,
))
.await
.map_err(|_| Error::Unreachable)
.map_err(|_| WorkerHandleError::Unreachable)
}
}
6 changes: 3 additions & 3 deletions tools/xtask/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ publish = false
restate-admin = { workspace = true, features = ["options_schema"] }
restate-bifrost = { workspace = true, features = ["options_schema"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-meta = { workspace = true }
restate-meta = { workspace = true, features = ["options_schema"] }
restate-node-services = { workspace = true, features = ["clients"] }
restate-schema-api = { workspace = true, features = ["subscription"] }
restate-server = { workspace = true, features = ["options_schema"] }
restate-types = { workspace = true }
restate-worker-api = { workspace = true }
restate-types = { workspace = true, features = ["serde_schema"] }
restate-worker = { workspace = true, features = ["options_schema"] }

anyhow = { workspace = true }
drain = { workspace = true }
Expand Down
Loading
Loading