Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Source::Subscription to identify the invocation comes from a subscription #2305

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ impl KafkaDeduplicationId {
pub struct MessageSender {
subscription: Subscription,
dispatcher: KafkaIngressDispatcher,
experimental_feature_kafka_ingress_next: bool,

subscription_id: String,
ingress_request_counter: metrics::Counter,
}

impl MessageSender {
pub fn new(subscription: Subscription, dispatcher: KafkaIngressDispatcher) -> Self {
pub fn new(
subscription: Subscription,
dispatcher: KafkaIngressDispatcher,
experimental_feature_kafka_ingress_next: bool,
) -> Self {
Self {
subscription_id: subscription.id().to_string(),
ingress_request_counter: counter!(
Expand All @@ -104,6 +109,7 @@ impl MessageSender {
),
subscription,
dispatcher,
experimental_feature_kafka_ingress_next,
}
}

Expand Down Expand Up @@ -144,6 +150,7 @@ impl MessageSender {
deduplication_id,
deduplication_index,
headers,
self.experimental_feature_kafka_ingress_next,
)
.map_err(|cause| Error::Event {
topic: msg.topic().to_string(),
Expand Down
8 changes: 7 additions & 1 deletion crates/ingress-kafka/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct KafkaIngressEvent {
}

impl KafkaIngressEvent {
#[allow(clippy::too_many_arguments)]
pub fn new(
subscription: &Subscription,
key: Bytes,
Expand All @@ -47,6 +48,7 @@ impl KafkaIngressEvent {
deduplication_id: KafkaDeduplicationId,
deduplication_index: MessageIndex,
headers: Vec<restate_types::invocation::Header>,
experimental_feature_kafka_ingress_next: bool,
) -> Result<Self, anyhow::Error> {
// Check if we need to proxy or not
let proxying_partition_key = if KafkaDeduplicationId::requires_proxying(subscription) {
Expand Down Expand Up @@ -94,7 +96,11 @@ impl KafkaIngressEvent {
let mut service_invocation = ServiceInvocation::initialize(
invocation_id,
invocation_target,
restate_types::invocation::Source::Ingress(PartitionProcessorRpcRequestId::new()),
if experimental_feature_kafka_ingress_next {
restate_types::invocation::Source::Subscription(subscription.id())
} else {
restate_types::invocation::Source::Ingress(PartitionProcessorRpcRequestId::new())
},
);
service_invocation.with_related_span(related_span);
service_invocation.argument = argument;
Expand Down
6 changes: 5 additions & 1 deletion crates/ingress-kafka/src/subscription_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ impl Service {
task_center(),
client_config,
vec![topic.to_string()],
MessageSender::new(subscription, self.dispatcher.clone()),
MessageSender::new(
subscription,
self.dispatcher.clone(),
options.experimental_feature_kafka_ingress_next(),
),
);

task_orchestrator.start(subscription_id, consumer_task);
Expand Down
5 changes: 5 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ message Source {
InvocationTarget invocation_target = 2;
}

message Subscription {
bytes subscription_id = 1;
}

oneof source {
Ingress ingress = 9;
Service service = 10;
google.protobuf.Empty internal = 11;
Subscription subscription = 12;
}
}

Expand Down
13 changes: 13 additions & 0 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,14 @@ pub mod v1 {
// TODO this should become an hard error in Restate 1.3
.unwrap_or_default(),
),
source::Source::Subscription(subscription) => {
restate_types::invocation::Source::Subscription(
restate_types::identifiers::SubscriptionId::from_slice(
&subscription.subscription_id,
)
.map_err(|e| ConversionError::invalid_data(e))?,
)
}
source::Source::Service(service) => restate_types::invocation::Source::Service(
restate_types::identifiers::InvocationId::try_from(
service
Expand Down Expand Up @@ -1370,6 +1378,11 @@ pub mod v1 {
rpc_id: rpc_id.to_bytes().to_vec().into(),
})
}
restate_types::invocation::Source::Subscription(sub_id) => {
source::Source::Subscription(source::Subscription {
subscription_id: sub_id.to_bytes().to_vec().into(),
})
}
restate_types::invocation::Source::Service(
invocation_id,
invocation_target,
Expand Down
4 changes: 4 additions & 0 deletions crates/storage-query-datafusion/src/invocation_status/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ fn fill_invoked_by(row: &mut SysInvocationStatusRowBuilder, output: &mut String,
Source::Internal => {
row.invoked_by("restate");
}
Source::Subscription(sub_id) => {
row.invoked_by("subscription");
row.invoked_by_subscription_id(format_using(output, &sub_id))
}
}
}

Expand Down
21 changes: 12 additions & 9 deletions crates/storage-query-datafusion/src/invocation_status/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,22 @@ define_table!(sys_invocation_status(
/// Idempotency key, if any.
idempotency_key: DataType::LargeUtf8,

/// Either `ingress` if the service was invoked externally or `service` if the service was
/// invoked by another Restate service.
/// Either:
/// * `ingress` if the invocation was created externally.
/// * `service` if the invocation was created by another Restate service.
/// * `subscription` if the invocation was created by a subscription (e.g. Kafka).
invoked_by: DataType::LargeUtf8,

/// The name of the invoking service. Or `null` if invoked externally.
invoked_by_service_name: DataType::LargeUtf8,

/// The caller [Invocation ID](/operate/invocation#invocation-identifier) if the service was
/// invoked by another Restate service. Or `null` if invoked externally.
/// The caller [Invocation ID](/operate/invocation#invocation-identifier) if `invoked_by = 'service'`.
invoked_by_id: DataType::LargeUtf8,

/// The caller invocation target if the service was invoked by another Restate service. Or
/// `null` if invoked externally.
/// The subscription id if `invoked_by = 'subscription'`.
invoked_by_subscription_id: DataType::LargeUtf8,

/// The name of caller service if `invoked_by = 'service'`.
invoked_by_service_name: DataType::LargeUtf8,

/// The caller invocation target if `invoked_by = 'service'`.
invoked_by_target: DataType::LargeUtf8,

/// The ID of the service deployment that started processing this invocation, and will continue
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/src/table_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub fn sys_invocation_table_docs() -> OwnedTableDocs {
sys_invocation_status.remove("invoked_by").expect("invoked_by should exist"),
sys_invocation_status.remove("invoked_by_service_name").expect("invoked_by_service_name should exist"),
sys_invocation_status.remove("invoked_by_id").expect("invoked_by_id should exist"),
sys_invocation_status.remove("invoked_by_subscription_id").expect("invoked_by_subscription_id should exist"),
sys_invocation_status.remove("invoked_by_target").expect("invoked_by_target should exist"),
sys_invocation_status.remove("pinned_deployment_id").expect("pinned_deployment_id should exist"),
sys_invocation_status.remove("pinned_service_protocol_version").expect("pinned_service_protocol_version should exist"),
Expand Down
1 change: 1 addition & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ notify-debouncer-mini = { version = "0.4.1" }
num-traits = { version = "0.2.17" }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
prost-dto = { workspace = true }
prost-types = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions crates/types/src/config/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub struct IngressOptions {
/// back to a previous version.
#[cfg_attr(feature = "schemars", schemars(skip))]
pub experimental_feature_enable_separate_ingress_role: bool,

/// Cluster of new features for the kafka ingress.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you gonna expand on what this cluster of new features are as you are tackling the other issues?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put it in the commit. I won't put them in the code until i get them done 😄 Also check https://github.com/orgs/restatedev/projects/15/views/13?sliceBy%5Bvalue%5D=ingress_kafka

#[cfg_attr(feature = "schemars", schemars(skip))]
experimental_feature_kafka_ingress_next: bool,
}

impl IngressOptions {
Expand All @@ -65,6 +69,10 @@ impl IngressOptions {
Semaphore::MAX_PERMITS - 1,
)
}

pub fn experimental_feature_kafka_ingress_next(&self) -> bool {
self.experimental_feature_kafka_ingress_next
}
}

impl Default for IngressOptions {
Expand All @@ -75,6 +83,7 @@ impl Default for IngressOptions {
concurrent_api_requests_limit: None,
kafka_clusters: Default::default(),
experimental_feature_enable_separate_ingress_role: false,
experimental_feature_kafka_ingress_next: false,
}
}
}
73 changes: 4 additions & 69 deletions crates/types/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,76 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt;
use std::mem::size_of;
use std::str::FromStr;

use ulid::Ulid;

use crate::base62_util::base62_max_length_for_type;
use crate::errors::IdDecodeError;
use crate::id_util::{IdDecoder, IdEncoder, IdResourceType};
use crate::identifiers::{DeploymentId, ResourceId, TimestampAwareId};
use crate::identifiers::DeploymentId;
use crate::service_protocol::ServiceProtocolVersion;
use crate::time::MillisSinceEpoch;

impl ResourceId for DeploymentId {
const SIZE_IN_BYTES: usize = size_of::<u128>();
const RESOURCE_TYPE: IdResourceType = IdResourceType::Deployment;
const STRING_CAPACITY_HINT: usize = base62_max_length_for_type::<u128>();
fn push_contents_to_encoder(&self, encoder: &mut IdEncoder<Self>) {
let ulid_raw: u128 = self.0.into();
encoder.encode_fixed_width(ulid_raw);
}
}

impl TimestampAwareId for DeploymentId {
fn timestamp(&self) -> MillisSinceEpoch {
self.0.timestamp_ms().into()
}
}

impl FromStr for DeploymentId {
type Err = IdDecodeError;

fn from_str(input: &str) -> Result<Self, Self::Err> {
let mut decoder = IdDecoder::new(input)?;
// Ensure we are decoding the correct resource type
if decoder.resource_type != Self::RESOURCE_TYPE {
return Err(IdDecodeError::TypeMismatch);
}

// ulid (u128)
let raw_ulid: u128 = decoder.cursor.decode_next()?;
Ok(Self::from(raw_ulid))
}
}

impl fmt::Display for DeploymentId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut encoder = IdEncoder::<Self>::new();
self.push_contents_to_encoder(&mut encoder);
fmt::Display::fmt(&encoder.finalize(), f)
}
}

impl From<u128> for DeploymentId {
fn from(value: u128) -> Self {
Self(Ulid::from(value))
}
}

// Passthrough json schema to the string
#[cfg(feature = "schemars")]
impl schemars::JsonSchema for DeploymentId {
fn schema_name() -> String {
<String as schemars::JsonSchema>::schema_name()
}

fn json_schema(g: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
<String as schemars::JsonSchema>::json_schema(g)
}
}

/// Deployment which was chosen to run an invocation on.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand All @@ -102,6 +34,9 @@ impl PinnedDeployment {
mod tests {
use super::*;

use crate::identifiers::{ResourceId, TimestampAwareId};
use crate::IdEncoder;

#[test]
fn test_deployment_id_format() {
let a = DeploymentId::new();
Expand Down
Loading
Loading