diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index b9c88780f..c577b39e3 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -16,7 +16,7 @@ use enum_map::EnumMap; use restate_core::{Metadata, MetadataKind, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; -use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState}; use restate_types::storage::StorageEncode; use restate_types::Version; @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result, TailState}; +use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result}; /// Bifrost is Restate's durable interconnect system /// diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index a1d76a429..0b384a44c 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -17,12 +17,12 @@ use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; use restate_types::logs::builder::BuilderError; use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex}; -use restate_types::logs::{LogId, Lsn}; +use restate_types::logs::{LogId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Version; use crate::error::AdminError; -use crate::{Bifrost, Error, Result, TailState}; +use crate::{Bifrost, Error, Result}; /// Bifrost's Admin API #[derive(Clone, Copy)] diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index b829cca07..135eeec63 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -14,19 +14,19 @@ use std::sync::Arc; use std::time::Duration; use googletest::prelude::*; -use restate_types::logs::metadata::SegmentIndex; use tokio::sync::Barrier; use tokio::task::{JoinHandle, JoinSet}; - -use restate_test_util::let_assert; -use restate_types::logs::{KeyFilter, Lsn, SequenceNumber}; use tokio_stream::StreamExt; use tracing::info; +use restate_test_util::let_assert; +use restate_types::logs::metadata::SegmentIndex; +use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState}; + use super::{Loglet, LogletOffset}; use crate::loglet::AppendError; use crate::loglet_wrapper::LogletWrapper; -use crate::{setup_panic_handler, TailState}; +use crate::setup_panic_handler; async fn wait_for_trim(loglet: &LogletWrapper, required_trim_point: Lsn) -> anyhow::Result<()> { for _ in 0..3 { diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 77df9ece3..7ffc259b4 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -18,7 +18,6 @@ pub mod util; pub use error::*; use futures::stream::BoxStream; pub use provider::{LogletProvider, LogletProviderFactory}; -use restate_core::ShutdownError; use tokio::sync::oneshot; use std::pin::Pin; @@ -28,10 +27,11 @@ use std::task::{ready, Poll}; use async_trait::async_trait; use futures::{FutureExt, Stream}; -use restate_types::logs::{KeyFilter, LogletOffset, Record}; +use restate_core::ShutdownError; +use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState}; use crate::LogEntry; -use crate::{Result, TailState}; +use crate::Result; /// A loglet represents a logical log stream provided by a provider implementation. /// diff --git a/crates/bifrost/src/loglet/util.rs b/crates/bifrost/src/loglet/util.rs index d16039b5e..37f7f8807 100644 --- a/crates/bifrost/src/loglet/util.rs +++ b/crates/bifrost/src/loglet/util.rs @@ -11,8 +11,9 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; +use restate_types::logs::TailState; + use super::LogletOffset; -use crate::TailState; #[derive(Debug, Clone)] pub struct TailOffsetWatch { @@ -43,6 +44,10 @@ impl TailOffsetWatch { self.sender.borrow().offset() } + pub fn get(&self) -> watch::Ref<'_, TailState> { + self.sender.borrow() + } + pub fn is_sealed(&self) -> bool { self.sender.borrow().is_sealed() } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index 39413b1e7..38028b56b 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -18,12 +18,12 @@ use tracing::instrument; use restate_core::ShutdownError; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::Record; use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber}; +use restate_types::logs::{Record, TailState}; use crate::loglet::{AppendError, Loglet, OperationError, SendableLogletReadStream}; +use crate::Result; use crate::{Commit, LogEntry, LsnExt}; -use crate::{Result, TailState}; #[cfg(any(test, feature = "test-util"))] #[derive(Debug, Clone, thiserror::Error)] diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index d7314c806..2ce78cdd3 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -29,7 +29,7 @@ use tokio::sync::Mutex; use tracing::{debug, warn}; use restate_core::ShutdownError; -use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState}; use self::log_store::LogStoreError; use self::log_store::RocksDbLogStore; @@ -41,7 +41,7 @@ use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStre use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; -use crate::{Result, TailState}; +use crate::Result; #[derive(derive_more::Debug)] struct LocalLoglet { diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs index cb9a079ce..518ff1c46 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -20,12 +20,12 @@ use tracing::{debug, error, warn}; use restate_core::ShutdownError; use restate_rocksdb::RocksDbPerfGuard; -use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber, TailState}; use crate::loglet::{Loglet, LogletReadStream, OperationError}; use crate::providers::local_loglet::record_format::decode_and_filter_record; use crate::providers::local_loglet::LogStoreError; -use crate::{LogEntry, Result, TailState}; +use crate::{LogEntry, Result}; use super::keys::RecordKey; use super::LocalLoglet; diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 848a2d038..8358d9cd6 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -23,15 +23,17 @@ use tracing::{debug, info}; use restate_core::ShutdownError; use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::{KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber}; +use restate_types::logs::{ + KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, +}; use crate::loglet::util::TailOffsetWatch; use crate::loglet::{ Loglet, LogletCommit, LogletProvider, LogletProviderFactory, LogletReadStream, OperationError, SendableLogletReadStream, }; +use crate::LogEntry; use crate::Result; -use crate::{LogEntry, TailState}; #[derive(Default)] pub struct Factory { diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 27519716b..2d212acdf 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -27,6 +27,7 @@ use restate_types::logs::metadata::MaybeSegment; use restate_types::logs::KeyFilter; use restate_types::logs::MatchKeyQuery; use restate_types::logs::SequenceNumber; +use restate_types::logs::TailState; use restate_types::logs::{LogId, Lsn}; use restate_types::Version; use restate_types::Versioned; @@ -38,7 +39,6 @@ use crate::loglet_wrapper::LogletReadStreamWrapper; use crate::Error; use crate::LogEntry; use crate::Result; -use crate::TailState; /// A read stream reads from the virtual log. The stream provides a unified view over /// the virtual log addressing space in the face of seals, reconfiguration, and trims. diff --git a/crates/bifrost/src/types.rs b/crates/bifrost/src/types.rs index 3fea9ec97..616b38e1f 100644 --- a/crates/bifrost/src/types.rs +++ b/crates/bifrost/src/types.rs @@ -78,81 +78,6 @@ pub struct FindTailAttributes { // TODO: consistent_read: bool, } -/// Represents the state of the tail of the loglet. -#[derive(Clone, Debug)] -pub enum TailState { - /// Loglet is open for appends - Open(Offset), - /// Loglet is sealed. This offset if the durable tail. - Sealed(Offset), -} - -impl TailState { - pub fn new(sealed: bool, offset: Offset) -> Self { - if sealed { - TailState::Sealed(offset) - } else { - TailState::Open(offset) - } - } - - /// Combines two TailStates together - /// - /// Only applies updates to the value according to the following rules: - /// - Offsets can only move forward. - /// - Tail cannot be unsealed once sealed. - /// - /// Returns true if the state was updated - pub fn combine(&mut self, sealed: bool, offset: Offset) -> bool { - let old_offset = self.offset(); - let is_already_sealed = self.is_sealed(); - - let new_offset = std::cmp::max(self.offset(), offset); - let new_sealed = self.is_sealed() || sealed; - if new_sealed != is_already_sealed || new_offset > old_offset { - *self = TailState::new(new_sealed, new_offset); - true - } else { - false - } - } - - /// Applies a seal on the tail state without changing the tail offset - /// Returns true if the state was updated - pub fn seal(&mut self) -> bool { - if self.is_sealed() { - false - } else { - *self = TailState::new(true, self.offset()); - true - } - } -} - -impl TailState { - pub fn map(self, f: F) -> TailState - where - F: FnOnce(Offset) -> T, - { - match self { - TailState::Open(offset) => TailState::Open(f(offset)), - TailState::Sealed(offset) => TailState::Sealed(f(offset)), - } - } - - #[inline(always)] - pub fn is_sealed(&self) -> bool { - matches!(self, TailState::Sealed(_)) - } - - #[inline(always)] - pub fn offset(&self) -> Offset { - match self { - TailState::Open(offset) | TailState::Sealed(offset) => *offset, - } - } -} - /// A future that resolves to the Lsn of the last Lsn in a committed batch. /// /// Note: dropping this future doesn't cancel or stop the underlying enqueued append. diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs index 2e945a1f2..082cdd4c6 100644 --- a/crates/node/src/network_server/handler/cluster_ctrl.rs +++ b/crates/node/src/network_server/handler/cluster_ctrl.rs @@ -114,8 +114,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { Ok(Response::new(DescribeLogResponse { chain: serialize_value(chain), tail_state: match tail_state { - restate_bifrost::TailState::Open(_) => 1, - restate_bifrost::TailState::Sealed(_) => 2, + restate_types::logs::TailState::Open(_) => 1, + restate_types::logs::TailState::Sealed(_) => 2, }, tail_offset: tail_state.offset().as_u64(), })) diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index 8fa285307..f88b2b83d 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -19,7 +19,9 @@ use crate::storage::StorageEncode; pub mod builder; pub mod metadata; mod record; +mod tail; pub use record::Record; +pub use tail::*; #[derive( Debug, diff --git a/crates/types/src/logs/tail.rs b/crates/types/src/logs/tail.rs new file mode 100644 index 000000000..6449d3abc --- /dev/null +++ b/crates/types/src/logs/tail.rs @@ -0,0 +1,85 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +/// Represents the state of the tail of the loglet. +use super::{Lsn, SequenceNumber}; + +#[derive(Clone, Debug)] +pub enum TailState { + /// Loglet is open for appends + Open(Offset), + /// Loglet is sealed. This offset if the durable tail. + Sealed(Offset), +} + +impl TailState { + pub fn new(sealed: bool, offset: Offset) -> Self { + if sealed { + TailState::Sealed(offset) + } else { + TailState::Open(offset) + } + } + + /// Combines two TailStates together + /// + /// Only applies updates to the value according to the following rules: + /// - Offsets can only move forward. + /// - Tail cannot be unsealed once sealed. + /// + /// Returns true if the state was updated + pub fn combine(&mut self, sealed: bool, offset: Offset) -> bool { + let old_offset = self.offset(); + let is_already_sealed = self.is_sealed(); + + let new_offset = std::cmp::max(self.offset(), offset); + let new_sealed = self.is_sealed() || sealed; + if new_sealed != is_already_sealed || new_offset > old_offset { + *self = TailState::new(new_sealed, new_offset); + true + } else { + false + } + } + + /// Applies a seal on the tail state without changing the tail offset + /// Returns true if the state was updated + pub fn seal(&mut self) -> bool { + if self.is_sealed() { + false + } else { + *self = TailState::new(true, self.offset()); + true + } + } +} + +impl TailState { + pub fn map(self, f: F) -> TailState + where + F: FnOnce(Offset) -> T, + { + match self { + TailState::Open(offset) => TailState::Open(f(offset)), + TailState::Sealed(offset) => TailState::Sealed(f(offset)), + } + } + + #[inline(always)] + pub fn is_sealed(&self) -> bool { + matches!(self, TailState::Sealed(_)) + } + + #[inline(always)] + pub fn offset(&self) -> Offset { + match self { + TailState::Open(offset) | TailState::Sealed(offset) => *offset, + } + } +}