[Compat] Fixes for a variety of backward compatibility issues #1920

Merged · 1 commit · Sep 3, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

15 changes: 13 additions & 2 deletions crates/bifrost/src/providers/local_loglet/log_state.rs
@@ -129,11 +129,22 @@ impl LogStateUpdates {

flexbuffers_storage_encode_decode!(LogStateUpdates);

+/// DEPRECATED in v1.0
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
+enum SealReason {
+    Resharding,
+    Other(String),
+}
+
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogState {
    pub release_pointer: u32,
    pub trim_point: u32,
-    pub seal: bool,
+    // deprecated and unused. Kept for v1 compatibility.
+    #[serde(default)]
+    seal: Option<SealReason>,
+    #[serde(default)]
+    pub sealed: bool,
Review comment from a Contributor on lines +144 to +147: “Simple as that! Thank you for fixing this.”
}

impl LogState {
@@ -209,7 +220,7 @@ pub fn log_state_full_merge(
                log_state.trim_point = log_state.trim_point.max(offset);
            }
            LogStateUpdate::Seal => {
-                log_state.seal = true;
+                log_state.sealed = true;
            }
        }
    }
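A note on why this shape is rollback-safe: the deprecated `seal` field is kept in the struct (not merely renamed) because a v1.0 binary reading v1.1-written state still expects to find it, while the new `sealed` flag defaults to `false` when reading v1.0 data that never wrote it. A minimal sketch of both directions, with `Option<String>` standing in for `Option<SealReason>` and made-up values (not the PR's test code):

```rust
use serde::{Deserialize, Serialize};

// v1.0 on-disk shape (reconstruction for illustration).
#[derive(Serialize, Deserialize)]
struct LogStateV10 {
    release_pointer: u32,
    trim_point: u32,
    seal: Option<String>,
}

// v1.1 shape from this PR.
#[derive(Serialize, Deserialize)]
struct LogStateV11 {
    release_pointer: u32,
    trim_point: u32,
    // Still written (always None) so v1.0 finds the field after a rollback.
    #[serde(default)]
    seal: Option<String>,
    // Absent in v1.0 data; #[serde(default)] fills in `false`.
    #[serde(default)]
    sealed: bool,
}

fn main() {
    // Forward: v1.0 bytes decode under the v1.1 shape.
    let v10 = LogStateV10 { release_pointer: 7, trim_point: 3, seal: None };
    let old_bytes = flexbuffers::to_vec(&v10).unwrap();
    let upgraded: LogStateV11 = flexbuffers::from_slice(&old_bytes).unwrap();
    assert!(!upgraded.sealed);

    // Backward: v1.1 bytes decode under the v1.0 shape (rollback), since
    // `seal` is still serialized and the unknown `sealed` key is ignored.
    let v11 = LogStateV11 { release_pointer: 7, trim_point: 3, seal: None, sealed: true };
    let new_bytes = flexbuffers::to_vec(&v11).unwrap();
    let rolled_back: LogStateV10 = flexbuffers::from_slice(&new_bytes).unwrap();
    assert!(rolled_back.seal.is_none());
}
```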
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/mod.rs
@@ -86,7 +86,7 @@ impl LocalLoglet {
        let next_write_offset_raw = log_state.release_pointer + 1;
        let next_write_offset = Mutex::new(LogletOffset::from(next_write_offset_raw));
        let release_pointer = LogletOffset::from(log_state.release_pointer);
-        let sealed = AtomicBool::new(log_state.seal);
+        let sealed = AtomicBool::new(log_state.sealed);
        let append_latency = histogram!(BIFROST_LOCAL_APPEND_DURATION);
        let loglet = Self {
            loglet_id,
@@ -98,7 +98,7 @@
            last_committed_offset,
            sealed,
            tail_watch: TailOffsetWatch::new(TailState::new(
-                log_state.seal,
+                log_state.sealed,
                release_pointer.next(),
            )),
            append_latency,
1 change: 1 addition & 0 deletions crates/bifrost/src/providers/local_loglet/record_format.rs
@@ -65,6 +65,7 @@ pub(super) struct LegacyPayload {
    // default is Keys::None which means that records written prior to this field will not be
    // filtered out. Partition processors will continue to filter correctly using the extracted
    // keys from Envelope, but will not take advantage of push-down filtering.
+    #[serde(default)]
    pub keys: Keys,
}

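The `#[serde(default)]` addition is what lets pre-upgrade records, which never wrote `keys`, keep decoding. A small illustration under stand-in types (`Keys` is simplified and the `header` field is hypothetical, purely to make the example self-contained):

```rust
use serde::{Deserialize, Serialize};

// Hypothetical stand-in for restate's `Keys`; only the Default matters here.
#[derive(Deserialize, Debug, Default, PartialEq)]
enum Keys {
    #[default]
    None,
    Single(u64),
}

#[derive(Deserialize)]
struct LegacyPayload {
    header: Vec<u8>,
    // Missing in records written before this field existed.
    #[serde(default)]
    keys: Keys,
}

fn main() {
    // A pre-upgrade record that has no `keys` entry at all.
    #[derive(Serialize)]
    struct OldPayload {
        header: Vec<u8>,
    }
    let bytes = flexbuffers::to_vec(&OldPayload { header: vec![1, 2] }).unwrap();

    // Decodes to Keys::None, which no push-down filter excludes.
    let payload: LegacyPayload = flexbuffers::from_slice(&bytes).unwrap();
    assert_eq!(payload.keys, Keys::None);
}
```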
1 change: 1 addition & 0 deletions crates/types/Cargo.toml
@@ -49,6 +49,7 @@
regress = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
+serde_path_to_error = { version = "0.1" }
serde_with = { workspace = true }
sha2 = { workspace = true }
static_assertions = { workspace = true }
18 changes: 15 additions & 3 deletions crates/types/src/storage.rs
@@ -16,6 +16,7 @@
use downcast_rs::{impl_downcast, DowncastSync};
use serde::de::{DeserializeOwned, Error as DeserializationError};
use serde::ser::Error as SerializationError;
use serde::Serialize;
+use tracing::error;

use crate::errors::GenericError;

@@ -165,6 +166,7 @@ macro_rules! flexbuffers_storage_encode_decode {
        match kind {
            $crate::storage::StorageCodecKind::FlexbuffersSerde => {
                $crate::storage::decode_from_flexbuffers(buf).map_err(|err| {
+                    ::tracing::error!(%err, "Flexbuffers decode failure (decoding {})", stringify!($name));
                    $crate::storage::StorageDecodeError::DecodeValue(err.into())
                })
            }
@@ -325,14 +327,24 @@ pub fn decode_from_flexbuffers<T: DeserializeOwned, B: Buf>(
    }

    if buf.chunk().len() >= length {
-        let result = flexbuffers::from_slice(buf.chunk())?;
+        let deserializer = flexbuffers::Reader::get_root(buf.chunk())?;
+        // todo: inject the path into the error message and propagate upwards
+        let result = serde_path_to_error::deserialize(deserializer).map_err(|err| {
+            error!(%err, "Flexbuffers error at field {}", err.path());
+            err.into_inner()
+        })?;
        buf.advance(length);

        Ok(result)
    } else {
        // need to allocate contiguous buffer of length for flexbuffers
        let bytes = buf.copy_to_bytes(length);
-        flexbuffers::from_slice(&bytes)
+        let deserializer = flexbuffers::Reader::get_root(bytes.chunk())?;
+        // todo: inject the path into the error message and propagate upwards
+        let result = serde_path_to_error::deserialize(deserializer).map_err(|err| {
+            error!(%err, "Flexbuffers error at field {}", err.path());
+            err.into_inner()
+        })?;
+        Ok(result)
    }
}

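The storage.rs change swaps a bare `flexbuffers::from_slice` for a deserializer routed through `serde_path_to_error`, so a failed decode logs which field broke rather than an opaque type mismatch. The pattern in isolation (a sketch; `decode_with_path` is a made-up name, not this crate's API):

```rust
use serde::de::DeserializeOwned;
use tracing::error;

// Route deserialization through serde_path_to_error so failures name the
// offending field before the inner flexbuffers error is propagated.
fn decode_with_path<T: DeserializeOwned>(
    bytes: &[u8],
) -> Result<T, flexbuffers::DeserializationError> {
    let deserializer = flexbuffers::Reader::get_root(bytes)?;
    serde_path_to_error::deserialize(deserializer).map_err(|err| {
        // err.path() prints e.g. `source.generational_node_id`, which is the
        // difference between an actionable log line and a guess when two
        // versions disagree about a schema.
        error!(%err, "Flexbuffers error at field {}", err.path());
        err.into_inner()
    })
}
```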
20 changes: 18 additions & 2 deletions crates/wal-protocol/src/lib.rs
@@ -21,7 +21,7 @@
use restate_types::invocation::{
};
use restate_types::message::MessageIndex;
use restate_types::state_mut::ExternalStateMutation;
-use restate_types::{flexbuffers_storage_encode_decode, logs, Version};
+use restate_types::{flexbuffers_storage_encode_decode, logs, PlainNodeId, Version};

use crate::control::AnnounceLeader;
use crate::timer::TimerKeyValue;
@@ -82,7 +82,11 @@ pub enum Source {
        /// epochs lower than the max observed for a given partition id.
        leader_epoch: LeaderEpoch,
        /// Which node is this message from?
-        node_id: GenerationalNodeId,
+        /// deprecated(v1.1): use generational_node_id instead.
+        node_id: PlainNodeId,
+        /// From v1.1 this is always set, but maintained to support rollback to v1.0.
+        #[serde(default)]
+        generational_node_id: Option<GenerationalNodeId>,
    },
    /// Message is sent from an ingress node
    Ingress {
@@ -100,6 +104,18 @@
},
}

+impl Source {
+    pub fn is_processor_generational(&self) -> bool {
+        match self {
+            Source::Processor {
+                generational_node_id,
+                ..
+            } => generational_node_id.is_some(),
+            _ => false,
+        }
+    }
+}

/// Identifies the intended destination of the message
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
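Every producer site in the diffs below (action_effect_handler.rs, cleaner.rs, leadership.rs, shuffle.rs) follows the same dual-write rule introduced by this `Source::Processor` change: write `node_id` as the plain id that v1.0 readers expect, and additionally set `generational_node_id` for v1.1 readers. A self-contained sketch with stand-in types (the real `PlainNodeId`/`GenerationalNodeId` live in restate_types; the field set is trimmed for brevity):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct PlainNodeId(u32);

#[derive(Debug, Clone, Copy, PartialEq)]
struct GenerationalNodeId {
    id: u32,
    generation: u32,
}

impl GenerationalNodeId {
    // Drop the generation, keeping only the plain node id (the v1.0 shape).
    fn as_plain(self) -> PlainNodeId {
        PlainNodeId(self.id)
    }
}

// Dual-write pattern: a v1.0 reader still finds the `node_id` it expects,
// while a v1.1 reader prefers the richer generational id.
struct ProcessorSource {
    node_id: PlainNodeId,                             // v1.0 wire field
    generational_node_id: Option<GenerationalNodeId>, // None if written by v1.0
}

fn make_source(me: GenerationalNodeId) -> ProcessorSource {
    ProcessorSource {
        node_id: me.as_plain(),
        generational_node_id: Some(me),
    }
}

fn main() {
    let me = GenerationalNodeId { id: 4, generation: 2 };
    let src = make_source(me);
    assert_eq!(src.node_id, PlainNodeId(4));
    assert_eq!(src.generational_node_id, Some(me));
}
```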
4 changes: 3 additions & 1 deletion crates/worker/src/partition/action_effect_handler.rs
@@ -129,6 +129,7 @@ impl ActionEffectHandler {
        let esn = self.epoch_sequence_number.next();
        self.epoch_sequence_number = esn;

+        let my_node_id = self.metadata.my_node_id();
        Header {
            dest: Destination::Processor {
                partition_key,
@@ -138,7 +139,8 @@
                partition_id: self.partition_id,
                partition_key: Some(partition_key),
                leader_epoch: self.epoch_sequence_number.leader_epoch,
-                node_id: self.metadata.my_node_id(),
+                node_id: my_node_id.as_plain(),
+                generational_node_id: Some(my_node_id),
            },
        }
    }
3 changes: 2 additions & 1 deletion crates/worker/src/partition/cleaner.rs
@@ -78,7 +78,8 @@ where
        partition_id,
        partition_key: None,
        leader_epoch,
-        node_id,
+        node_id: node_id.as_plain(),
+        generational_node_id: Some(node_id),
    };

    let mut interval = tokio::time::interval(cleanup_interval);
4 changes: 3 additions & 1 deletion crates/worker/src/partition/leadership.rs
@@ -218,7 +218,9 @@ where
                    .start(),
            ),
            leader_epoch,
-            node_id: self.partition_processor_metadata.node_id,
+            // Kept for backward compatibility.
+            node_id: self.partition_processor_metadata.node_id.as_plain(),
+            generational_node_id: Some(self.partition_processor_metadata.node_id),
        },
    };

7 changes: 5 additions & 2 deletions crates/worker/src/partition/mod.rs
@@ -380,9 +380,12 @@ where
        action_collector.clear();

        self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
-        if let Source::Processor { node_id, .. } = header.source {
+        if header.source.is_processor_generational() {
+            let Source::Processor { generational_node_id, .. } = header.source else {
+                unreachable!("processor source must have generational_node_id");
+            };
            // all new AnnounceLeader messages should come from a PartitionProcessor
-            self.status.last_observed_leader_node = Some(node_id);
+            self.status.last_observed_leader_node = generational_node_id;
        } else if announce_leader.node_id.is_some() {
            // older AnnounceLeader messages have the announce_leader.node_id set
            self.status.last_observed_leader_node = announce_leader.node_id;
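The branch above encodes a two-step fallback for the observed leader: prefer the v1.1 header field, otherwise use the id that v1.0 writers embed in the AnnounceLeader body. Distilled into a standalone function with stand-in types (a sketch of the logic, not the PR's code):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct GenerationalNodeId(u32, u32);

enum Source {
    Processor { generational_node_id: Option<GenerationalNodeId> },
    Ingress,
}

// Resolution order: v1.1 writers put the leader in the header's
// generational_node_id; v1.0 writers only set it inside AnnounceLeader.
fn observed_leader(
    source: &Source,
    announce_leader_node_id: Option<GenerationalNodeId>,
) -> Option<GenerationalNodeId> {
    match source {
        Source::Processor { generational_node_id: Some(id) } => Some(*id),
        _ => announce_leader_node_id,
    }
}

fn main() {
    let id = GenerationalNodeId(1, 3);
    // v1.0 writer: header lacks the generational id, body carries it.
    assert_eq!(observed_leader(&Source::Ingress, Some(id)), Some(id));
    // v1.1 writer: the header field wins.
    assert_eq!(
        observed_leader(&Source::Processor { generational_node_id: Some(id) }, None),
        Some(id)
    );
}
```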
3 changes: 2 additions & 1 deletion crates/worker/src/partition/shuffle.rs
@@ -75,7 +75,8 @@ fn create_header(
            partition_id: shuffle_metadata.partition_id,
            partition_key: None,
            leader_epoch: shuffle_metadata.leader_epoch,
-            node_id: shuffle_metadata.node_id,
+            node_id: shuffle_metadata.node_id.as_plain(),
+            generational_node_id: Some(shuffle_metadata.node_id),
        },
        dest: Destination::Processor {
            partition_key: dest_partition_key,