Skip to content

Commit

Permalink
Post rebase conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Feb 22, 2024
1 parent fe07270 commit 670213b
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 31 deletions.
4 changes: 3 additions & 1 deletion crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use prost::Message;
use restate_pb::restate::Event;
use restate_schema_api::subscription::{EventReceiverServiceInstanceType, Sink, Subscription};
use restate_types::errors::InvocationError;
use restate_types::identifiers::{FullInvocationId, InvocationUuid, PeerId, WithPartitionKey, ServiceId};
use restate_types::identifiers::{
FullInvocationId, InvocationUuid, PeerId, ServiceId, WithPartitionKey,
};
use restate_types::invocation::{ServiceInvocation, ServiceInvocationSpanContext, SpanRelation};
use restate_types::message::{AckKind, MessageIndex};
use restate_types::{GenerationalNodeId, Version};
Expand Down
10 changes: 4 additions & 6 deletions crates/worker/src/partition/leadership/action_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use futures::{Stream, StreamExt};
use prost::Message;
use restate_errors::NotRunningError;
use restate_invoker_api::ServiceHandle;
use restate_types::identifiers::{
FullInvocationId, InvocationUuid, PartitionLeaderEpoch, WithPartitionKey,
};
use restate_types::identifiers::{FullInvocationId, PartitionLeaderEpoch, WithPartitionKey};
use restate_types::invocation::{ServiceInvocation, Source, SpanRelation};
use restate_types::journal::CompletionResult;
use restate_types::NodeId;
Expand Down Expand Up @@ -193,7 +191,7 @@ where
// We need this to agree on the invocation uuid, which is randomly generated
// We could get rid of it if invocation uuids are deterministically generated.
let service_invocation = ServiceInvocation::new(
FullInvocationId::with_service_id(target_service, InvocationUuid::new()),
FullInvocationId::generate(target_service),
method_name,
journal_notification_request,
Source::Internal,
Expand All @@ -216,10 +214,10 @@ where
// We need this to agree on the invocation uuid, which is randomly generated
// We could get rid of it if invocation uuids are deterministically generated.
let service_invocation = ServiceInvocation::new(
FullInvocationId::with_service_id(target_service, InvocationUuid::new()),
FullInvocationId::generate(target_service),
method_name,
restate_pb::restate::internal::KillNotificationRequest {
invocation_uuid: invocation_uuid.into(),
invocation_uuid: invocation_id.invocation_uuid().into(),
}
.encode_to_vec(),
Source::Internal,
Expand Down
20 changes: 9 additions & 11 deletions crates/worker/src/partition/services/non_deterministic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl StateAndJournalTransitions {
for key in order {
let ((invocation_id, entry_index), journal_entry) =
self.journal_entries.remove_entry(&key).unwrap();
effects.push(BuiltInServiceEffect::StoreEntry {
effects.push(BuiltinServiceEffect::StoreEntry {
invocation_id,
entry_index,
journal_entry,
Expand Down Expand Up @@ -205,12 +205,13 @@ impl<S: StateReader> InvocationContext<'_, S> {
completion_notification_target: NotificationTarget,
kill_notification_target: NotificationTarget,
) {
self.effects_buffer.push(BuiltInServiceEffect::CreateJournal {
invocation_id,
span_context,
completion_notification_target,
kill_notification_target,
});
self.effects_buffer
.push(BuiltinServiceEffect::CreateJournal {
invocation_id,
span_context,
completion_notification_target,
kill_notification_target,
});
}

async fn read_journal_entry(
Expand Down Expand Up @@ -390,10 +391,7 @@ mod tests {
BuiltinServiceEffect::ClearState(key) => {
self.0.remove(&key.to_string());
}
BuiltinServiceEffect::CreateJournal {
span_context,
..
} => {
BuiltinServiceEffect::CreateJournal { span_context, .. } => {
self.1 = Some((
JournalMetadata::new(0, span_context.clone()),
Vec::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ where
) -> Result<(), Error> {
match nbis_effect {
BuiltinServiceEffect::CreateJournal {
invocation_id,
invocation_id,
span_context,
completion_notification_target,
Expand Down
12 changes: 4 additions & 8 deletions crates/worker/src/partition/state_machine/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::partition::types::TimerKeyDisplay;
use crate::partition::TimerValue;
use bytes::Bytes;
use bytestring::ByteString;
use opentelemetry_api::trace::SpanId;
Expand All @@ -18,12 +16,6 @@ use restate_storage_api::invocation_status_table::InvocationMetadata;
use restate_storage_api::invocation_status_table::{
InvocationStatus, JournalMetadata, NotificationTarget,
};
use restate_types::identifiers::WithPartitionKey;
use restate_types::journal::{Completion, CompletionResult};
use std::collections::HashSet;
use std::fmt;
use std::vec::Drain;
use tracing::{debug_span, event_enabled, span_enabled, trace, trace_span, Level};
use restate_storage_api::outbox_table::OutboxMessage;
use restate_storage_api::timer_table::{Timer, TimerKey};
use restate_types::errors::InvocationErrorCode;
Expand All @@ -41,6 +33,10 @@ use restate_types::state_mut::ExternalStateMutation;
use restate_types::time::MillisSinceEpoch;
use restate_wal_protocol::timer::TimerKeyDisplay;
use restate_wal_protocol::timer::TimerValue;
use std::collections::HashSet;
use std::fmt;
use std::vec::Drain;
use tracing::{debug_span, event_enabled, span_enabled, trace, trace_span, Level};

#[derive(Debug)]
pub(crate) enum Effect {
Expand Down
7 changes: 3 additions & 4 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
mod tests {
use super::*;

use crate::partition::services::non_deterministic::{Effect, Effects as NBISEffects};
use crate::partition::types::{InvokerEffect, InvokerEffectKind};
use bytes::Bytes;
use bytestring::ByteString;
Expand Down Expand Up @@ -591,7 +590,7 @@ mod tests {
.apply(Command::BuiltInInvokerEffect(BuiltinServiceEffects::new(
fid_virtual_invocation_creator,
vec![BuiltinServiceEffect::CreateJournal {
invocation_id: virtual_invocation_id.clone(),
invocation_id: virtual_invocation_id.clone(),
span_context: Default::default(),
completion_notification_target: notification_service_target.clone(),
kill_notification_target: notification_service_target.clone(),
Expand Down Expand Up @@ -666,7 +665,7 @@ mod tests {
fid_virtual_invocation_creator,
vec![
BuiltinServiceEffect::StoreEntry {
invocation_id: virtual_invocation_id.clone(),
invocation_id: virtual_invocation_id.clone(),
entry_index: 0,
journal_entry: ProtobufRawEntryCodec::serialize_enriched(
Entry::Awakeable(AwakeableEntry { result: None }),
Expand Down Expand Up @@ -708,7 +707,7 @@ mod tests {
// Now send completion
let actions = state_machine
.apply(Command::InvocationResponse(InvocationResponse {
id: MaybeFullInvocationId::Partial(virtual_invocation_id.clone()),
id: MaybeFullInvocationId::Partial(virtual_invocation_id.clone()),
entry_index: 0,
result: ResponseResult::Success(Bytes::from_static(b"123")),
}))
Expand Down

0 comments on commit 670213b

Please sign in to comment.