From dab0a87c8332f1b7f378297bd26711c4e68ca5ec Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 22 Jul 2024 16:03:46 +0100
Subject: [PATCH 1/6] [Bifrost] Decouples loglet dependencies from bifrost

A restructure that allows finer-grained control over which types loglet
providers have access to. This will be exploited more in upcoming PRs.
---
 crates/bifrost/Cargo.toml | 2 +-
 crates/bifrost/src/bifrost.rs | 11 +-
 crates/bifrost/src/error.rs | 9 +-
 crates/bifrost/src/lib.rs | 11 +-
 crates/bifrost/src/loglet/error.rs | 18 ++
 .../bifrost/src/{ => loglet}/loglet_tests.rs | 2 +-
 .../bifrost/src/{loglet.rs => loglet/mod.rs} | 149 ++--------------
 crates/bifrost/src/{ => loglet}/provider.rs | 3 +-
 .../bifrost/src/{loglets => loglet}/util.rs | 2 +-
 crates/bifrost/src/loglet_wrapper.rs | 159 ++++++++++++++++++
 .../local_loglet/keys.rs | 0
 .../local_loglet/log_state.rs | 2 +-
 .../local_loglet/log_store.rs | 0
 .../local_loglet/log_store_writer.rs | 5 +-
 .../local_loglet/metric_definitions.rs | 0
 .../local_loglet/mod.rs | 6 +-
 .../local_loglet/provider.rs | 7 +-
 .../local_loglet/read_stream.rs | 2 +-
 .../{loglets => providers}/memory_loglet.rs | 17 +-
 .../bifrost/src/{loglets => providers}/mod.rs | 2 +-
 .../replicated_loglet/metric_definitions.rs | 0
 .../replicated_loglet/mod.rs | 0
 .../replicated_loglet/provider.rs | 7 +-
 crates/bifrost/src/read_stream.rs | 4 +-
 crates/bifrost/src/service.rs | 4 +-
 crates/bifrost/src/watchdog.rs | 2 +-
 crates/node/src/lib.rs | 2 +-
 27 files changed, 234 insertions(+), 192 deletions(-)
 create mode 100644 crates/bifrost/src/loglet/error.rs
 rename crates/bifrost/src/{ => loglet}/loglet_tests.rs (99%)
 rename crates/bifrost/src/{loglet.rs => loglet/mod.rs} (57%)
 rename crates/bifrost/src/{ => loglet}/provider.rs (95%)
 rename crates/bifrost/src/{loglets => loglet}/util.rs (97%)
 create mode 100644 crates/bifrost/src/loglet_wrapper.rs
 rename crates/bifrost/src/{loglets => providers}/local_loglet/keys.rs (100%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/log_state.rs (98%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/log_store.rs (100%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/log_store_writer.rs (99%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/metric_definitions.rs (100%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/mod.rs (99%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/provider.rs (95%)
 rename crates/bifrost/src/{loglets => providers}/local_loglet/read_stream.rs (99%)
 rename crates/bifrost/src/{loglets => providers}/memory_loglet.rs (97%)
 rename crates/bifrost/src/{loglets => providers}/mod.rs (92%)
 rename crates/bifrost/src/{loglets => providers}/replicated_loglet/metric_definitions.rs (100%)
 rename crates/bifrost/src/{loglets => providers}/replicated_loglet/mod.rs (100%)
 rename crates/bifrost/src/{loglets => providers}/replicated_loglet/provider.rs (93%)
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index 09fcae0c4..e63350be9 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -8,7 +8,7 @@ license.workspace = true
publish = false

[features]
-default = ["replicated-loglet"]
+default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index d8923bc51..df0484f64 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -24,10 +24,11 @@ use
restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; use restate_types::storage::StorageCodec; use restate_types::Version; -use crate::loglet::{LogletBase, LogletWrapper}; +use crate::loglet::{LogletBase, LogletProvider}; +use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{ - Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState, + Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState, SMALL_BATCH_THRESHOLD_COUNT, }; @@ -48,7 +49,7 @@ impl Bifrost { #[cfg(any(test, feature = "test-util"))] pub async fn init_in_memory(metadata: Metadata) -> Self { - use crate::loglets::memory_loglet; + use crate::providers::memory_loglet; Self::init_with_factory(metadata, memory_loglet::Factory::default()).await } @@ -75,7 +76,7 @@ impl Bifrost { #[cfg(any(test, feature = "test-util"))] pub async fn init_with_factory( metadata: Metadata, - factory: impl crate::LogletProviderFactory, + factory: impl crate::loglet::LogletProviderFactory, ) -> Self { use crate::BifrostService; @@ -436,7 +437,7 @@ mod tests { use super::*; - use crate::loglets::memory_loglet::{self}; + use crate::providers::memory_loglet::{self}; use googletest::prelude::*; use crate::{Record, TrimGap}; diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 3789fc373..997ed338d 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use restate_types::logs::{LogId, Lsn}; -use crate::loglets::local_loglet::LogStoreError; +use crate::providers::local_loglet::LogStoreError; use crate::types::SealReason; /// Result type for bifrost operations. @@ -38,10 +38,3 @@ pub enum Error { #[error("bifrost provider '{0}' is disabled or unrecognized")] Disabled(String), } - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum ProviderError { - Shutdown(#[from] ShutdownError), - Other(#[from] anyhow::Error), -} diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index b57757a38..d4d024665 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -10,11 +10,9 @@ mod bifrost; mod error; -mod loglet; -#[cfg(test)] -mod loglet_tests; -pub mod loglets; -mod provider; +pub mod loglet; +mod loglet_wrapper; +pub mod providers; mod read_stream; mod record; mod service; @@ -22,8 +20,7 @@ mod types; mod watchdog; pub use bifrost::Bifrost; -pub use error::{Error, ProviderError, Result}; -pub use provider::*; +pub use error::{Error, Result}; pub use read_stream::LogReadStream; pub use record::*; pub use service::BifrostService; diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs new file mode 100644 index 000000000..1204586a8 --- /dev/null +++ b/crates/bifrost/src/loglet/error.rs @@ -0,0 +1,18 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
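For context: a sketch of how the two `ProviderError` variants defined just below are meant to be produced at a provider boundary. `open_store` and `start_provider` are hypothetical names invented for the illustration; both conversions lean on the `#[from]` attributes on the enum:

```rust
use anyhow::Context;
use restate_core::ShutdownError;

use crate::loglet::ProviderError;

// Hypothetical stand-in for any fallible provider setup step.
async fn open_store() -> anyhow::Result<()> {
    Ok(())
}

async fn start_provider(shutting_down: bool) -> Result<(), ProviderError> {
    if shutting_down {
        // ProviderError::Shutdown via #[from] ShutdownError
        return Err(ShutdownError.into());
    }
    // ProviderError::Other via #[from] anyhow::Error
    open_store().await.context("failed to open the log store")?;
    Ok(())
}
```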
+ +use restate_core::ShutdownError; + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum ProviderError { + Shutdown(#[from] ShutdownError), + Other(#[from] anyhow::Error), +} diff --git a/crates/bifrost/src/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs similarity index 99% rename from crates/bifrost/src/loglet_tests.rs rename to crates/bifrost/src/loglet/loglet_tests.rs index 31db38a95..8c1113d3e 100644 --- a/crates/bifrost/src/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -21,7 +21,7 @@ use restate_test_util::let_assert; use restate_types::logs::SequenceNumber; use tracing::info; -use crate::loglet::{Loglet, LogletOffset}; +use super::{Loglet, LogletOffset}; use crate::{LogRecord, Record, TrimGap}; fn setup() { diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet/mod.rs similarity index 57% rename from crates/bifrost/src/loglet.rs rename to crates/bifrost/src/loglet/mod.rs index b600e7b58..38afecddf 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -8,10 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod error; +#[cfg(test)] +pub mod loglet_tests; +mod provider; +pub(crate) mod util; + +// exports +pub use error::*; +pub use provider::{LogletProvider, LogletProviderFactory}; + use std::ops::Add; use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; @@ -19,7 +28,7 @@ use futures::Stream; use restate_types::logs::{Lsn, SequenceNumber}; -use crate::{LogRecord, LsnExt}; +use crate::LogRecord; use crate::{Result, TailState}; // Inner loglet offset @@ -79,6 +88,7 @@ impl SequenceNumber for LogletOffset { /// ^ Last Committed /// ^ -- Last released - can be delivered to readers /// +/// /// An empty loglet. A log is empty when trim_point.next() == tail.prev() /// /// Semantics of offsets @@ -94,42 +104,6 @@ impl SequenceNumber for LogletOffset { pub trait Loglet: LogletBase {} impl Loglet for T where T: LogletBase {} -/// Wraps loglets with the base LSN of the segment -#[derive(Clone, Debug)] -pub struct LogletWrapper { - /// The offset of the first record in the segment (if exists). - /// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this - /// record exists. It only means that we want to offset the loglet offsets by base_lsn - - /// Loglet::Offset::OLDEST. - pub(crate) base_lsn: Lsn, - loglet: Arc, -} - -impl LogletWrapper { - pub fn new(base_lsn: Lsn, loglet: Arc) -> Self { - Self { base_lsn, loglet } - } - - pub async fn create_wrapped_read_stream( - self, - start_lsn: Lsn, - ) -> Result { - // Translates LSN to loglet offset - Ok(LogletReadStreamWrapper::new( - self.loglet - .create_read_stream(start_lsn.into_offset(self.base_lsn)) - .await?, - self.base_lsn, - )) - } -} - -impl PartialEq for LogletWrapper { - fn eq(&self, other: &Self) -> bool { - self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet) - } -} - #[async_trait] pub trait LogletBase: Send + Sync + std::fmt::Debug { type Offset: SequenceNumber; @@ -194,102 +168,3 @@ pub trait LogletReadStream: Stream = Pin + Send>>; - -#[async_trait] -impl LogletBase for LogletWrapper { - type Offset = Lsn; - - /// This should never be used directly. Instead, use `create_wrapped_read_stream()` instead. 
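The `Loglet` blanket impl above means any `LogletBase` with `Offset = LogletOffset` coerces to the trait object that bifrost passes around. A sketch; `MemoryLoglet` stands in for any concrete loglet, and its visibility from this point is an assumption:

```rust
use std::sync::Arc;

use crate::loglet::Loglet;
use crate::providers::memory_loglet::MemoryLoglet;

// The blanket impl makes the unsized coercion below compile without any
// explicit `impl Loglet for MemoryLoglet`.
fn erase(loglet: Arc<MemoryLoglet>) -> Arc<dyn Loglet> {
    loglet
}
```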
- async fn create_read_stream( - self: Arc, - _after: Self::Offset, - ) -> Result> { - unreachable!("create_read_stream on LogletWrapper should never be used directly") - } - - async fn append(&self, data: Bytes) -> Result { - let offset = self.loglet.append(data).await?; - // Return the LSN given the loglet offset. - Ok(self.base_lsn.offset_by(offset)) - } - - async fn append_batch(&self, payloads: &[Bytes]) -> Result { - let offset = self.loglet.append_batch(payloads).await?; - Ok(self.base_lsn.offset_by(offset)) - } - - async fn find_tail(&self) -> Result> { - Ok(self - .loglet - .find_tail() - .await? - .map(|o| self.base_lsn.offset_by(o))) - } - - async fn get_trim_point(&self) -> Result> { - let offset = self.loglet.get_trim_point().await?; - Ok(offset.map(|o| self.base_lsn.offset_by(o))) - } - - // trim_point is inclusive. - async fn trim(&self, trim_point: Self::Offset) -> Result<()> { - // trimming to INVALID is no-op - if trim_point == Self::Offset::INVALID { - return Ok(()); - } - let trim_point = trim_point.into_offset(self.base_lsn); - self.loglet.trim(trim_point).await - } - - async fn read_next_single(&self, from: Lsn) -> Result> { - // convert LSN to loglet offset - let offset = from.into_offset(self.base_lsn); - self.loglet - .read_next_single(offset) - .await - .map(|record| record.with_base_lsn(self.base_lsn)) - } - - async fn read_next_single_opt( - &self, - from: Self::Offset, - ) -> Result>> { - let offset = from.into_offset(self.base_lsn); - self.loglet - .read_next_single_opt(offset) - .await - .map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn))) - } -} - -/// Wraps loglet read streams with the base LSN of the segment -pub struct LogletReadStreamWrapper { - pub(crate) base_lsn: Lsn, - inner: SendableLogletReadStream, -} - -impl LogletReadStreamWrapper { - pub fn new(inner: SendableLogletReadStream, base_lsn: Lsn) -> Self { - Self { inner, base_lsn } - } -} - -impl Stream for LogletReadStreamWrapper { - type Item = Result>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match self.inner.as_mut().poll_next(cx) { - Poll::Ready(Some(Ok(record))) => { - Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn)))) - } - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone); diff --git a/crates/bifrost/src/provider.rs b/crates/bifrost/src/loglet/provider.rs similarity index 95% rename from crates/bifrost/src/provider.rs rename to crates/bifrost/src/loglet/provider.rs index 9ef032622..65e75965a 100644 --- a/crates/bifrost/src/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -14,8 +14,7 @@ use async_trait::async_trait; use restate_types::logs::metadata::{LogletParams, ProviderKind}; -use crate::loglet::Loglet; -use crate::ProviderError; +use super::{Loglet, ProviderError}; use crate::Result; #[async_trait] diff --git a/crates/bifrost/src/loglets/util.rs b/crates/bifrost/src/loglet/util.rs similarity index 97% rename from crates/bifrost/src/loglets/util.rs rename to crates/bifrost/src/loglet/util.rs index ac2d595ce..e3d19e859 100644 --- a/crates/bifrost/src/loglets/util.rs +++ b/crates/bifrost/src/loglet/util.rs @@ -13,7 +13,7 @@ use tokio_stream::wrappers::WatchStream; use restate_core::ShutdownError; -use crate::loglet::LogletOffset; +use super::LogletOffset; #[derive(Debug)] pub struct OffsetWatch { diff --git 
a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs new file mode 100644 index 000000000..affb4c127 --- /dev/null +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::task::Poll; + +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::Stream; + +use restate_types::logs::{Lsn, SequenceNumber}; + +use crate::loglet::{Loglet, LogletBase, LogletOffset, SendableLogletReadStream}; +use crate::{LogRecord, LsnExt}; +use crate::{Result, TailState}; + +/// Wraps loglets with the base LSN of the segment +#[derive(Clone, Debug)] +pub struct LogletWrapper { + /// The offset of the first record in the segment (if exists). + /// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this + /// record exists. It only means that we want to offset the loglet offsets by base_lsn - + /// Loglet::Offset::OLDEST. + pub(crate) base_lsn: Lsn, + loglet: Arc, +} + +impl LogletWrapper { + pub fn new(base_lsn: Lsn, loglet: Arc) -> Self { + Self { base_lsn, loglet } + } + + pub async fn create_wrapped_read_stream( + self, + start_lsn: Lsn, + ) -> Result { + // Translates LSN to loglet offset + Ok(LogletReadStreamWrapper::new( + self.loglet + .create_read_stream(start_lsn.into_offset(self.base_lsn)) + .await?, + self.base_lsn, + )) + } +} + +impl PartialEq for LogletWrapper { + fn eq(&self, other: &Self) -> bool { + self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet) + } +} + +#[async_trait] +impl LogletBase for LogletWrapper { + type Offset = Lsn; + + /// This should never be used directly. Instead, use `create_wrapped_read_stream()` instead. + async fn create_read_stream( + self: Arc, + _after: Self::Offset, + ) -> Result> { + unreachable!("create_read_stream on LogletWrapper should never be used directly") + } + + async fn append(&self, data: Bytes) -> Result { + let offset = self.loglet.append(data).await?; + // Return the LSN given the loglet offset. + Ok(self.base_lsn.offset_by(offset)) + } + + async fn append_batch(&self, payloads: &[Bytes]) -> Result { + let offset = self.loglet.append_batch(payloads).await?; + Ok(self.base_lsn.offset_by(offset)) + } + + async fn find_tail(&self) -> Result> { + Ok(self + .loglet + .find_tail() + .await? + .map(|o| self.base_lsn.offset_by(o))) + } + + async fn get_trim_point(&self) -> Result> { + let offset = self.loglet.get_trim_point().await?; + Ok(offset.map(|o| self.base_lsn.offset_by(o))) + } + + // trim_point is inclusive. 
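The translation above is plain offset arithmetic. A sketch with concrete values, assuming `offset_by`/`into_offset` apply the `base_lsn - Loglet::Offset::OLDEST` shift described in the struct's doc comment:

```rust
use restate_types::logs::Lsn;

use crate::loglet::LogletOffset;
use crate::LsnExt;

#[test]
fn base_lsn_translation() {
    // A segment whose first slot maps to LSN 100.
    let base_lsn = Lsn::from(100);

    // Loglet offset 1 (OLDEST) is the first slot of the segment: LSN 100.
    assert_eq!(Lsn::from(100), base_lsn.offset_by(LogletOffset::from(1)));

    // Going the other way, LSN 105 lands on loglet offset 6.
    assert_eq!(LogletOffset::from(6), Lsn::from(105).into_offset(base_lsn));
}
```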
+ async fn trim(&self, trim_point: Self::Offset) -> Result<()> { + // trimming to INVALID is no-op + if trim_point == Self::Offset::INVALID { + return Ok(()); + } + let trim_point = trim_point.into_offset(self.base_lsn); + self.loglet.trim(trim_point).await + } + + async fn read_next_single(&self, from: Lsn) -> Result> { + // convert LSN to loglet offset + let offset = from.into_offset(self.base_lsn); + self.loglet + .read_next_single(offset) + .await + .map(|record| record.with_base_lsn(self.base_lsn)) + } + + async fn read_next_single_opt( + &self, + from: Self::Offset, + ) -> Result>> { + let offset = from.into_offset(self.base_lsn); + self.loglet + .read_next_single_opt(offset) + .await + .map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn))) + } +} + +/// Wraps loglet read streams with the base LSN of the segment +pub struct LogletReadStreamWrapper { + pub(crate) base_lsn: Lsn, + inner: SendableLogletReadStream, +} + +impl LogletReadStreamWrapper { + pub fn new(inner: SendableLogletReadStream, base_lsn: Lsn) -> Self { + Self { inner, base_lsn } + } +} + +impl Stream for LogletReadStreamWrapper { + type Item = Result>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(record))) => { + Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn)))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone); diff --git a/crates/bifrost/src/loglets/local_loglet/keys.rs b/crates/bifrost/src/providers/local_loglet/keys.rs similarity index 100% rename from crates/bifrost/src/loglets/local_loglet/keys.rs rename to crates/bifrost/src/providers/local_loglet/keys.rs diff --git a/crates/bifrost/src/loglets/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs similarity index 98% rename from crates/bifrost/src/loglets/local_loglet/log_state.rs rename to crates/bifrost/src/providers/local_loglet/log_state.rs index 31a9c21e0..83b4ade55 100644 --- a/crates/bifrost/src/loglets/local_loglet/log_state.rs +++ b/crates/bifrost/src/providers/local_loglet/log_state.rs @@ -18,8 +18,8 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tracing::{error, trace, warn}; +use super::keys::{MetadataKey, MetadataKind}; use crate::loglet::LogletOffset; -use crate::loglets::local_loglet::keys::{MetadataKey, MetadataKind}; use crate::SealReason; use super::LogStoreError; diff --git a/crates/bifrost/src/loglets/local_loglet/log_store.rs b/crates/bifrost/src/providers/local_loglet/log_store.rs similarity index 100% rename from crates/bifrost/src/loglets/local_loglet/log_store.rs rename to crates/bifrost/src/providers/local_loglet/log_store.rs diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs similarity index 99% rename from crates/bifrost/src/loglets/local_loglet/log_store_writer.rs rename to crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 0d88e5e7a..0857f308d 100644 --- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -27,15 +27,14 @@ use restate_types::config::LocalLogletOptions; use restate_types::live::BoxedLiveLoad; use restate_types::logs::SequenceNumber; -use crate::loglet::LogletOffset; 
-use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT}; - use super::keys::{MetadataKey, MetadataKind, RecordKey}; use super::log_state::LogStateUpdates; use super::log_store::{DATA_CF, METADATA_CF}; use super::metric_definitions::{ BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES, }; +use crate::loglet::LogletOffset; +use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT}; type Ack = oneshot::Sender>; type AckRecv = oneshot::Receiver>; diff --git a/crates/bifrost/src/loglets/local_loglet/metric_definitions.rs b/crates/bifrost/src/providers/local_loglet/metric_definitions.rs similarity index 100% rename from crates/bifrost/src/loglets/local_loglet/metric_definitions.rs rename to crates/bifrost/src/providers/local_loglet/metric_definitions.rs diff --git a/crates/bifrost/src/loglets/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs similarity index 99% rename from crates/bifrost/src/loglets/local_loglet/mod.rs rename to crates/bifrost/src/providers/local_loglet/mod.rs index 982b4dbf5..8f29d6a33 100644 --- a/crates/bifrost/src/loglets/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -30,7 +30,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use crate::loglet::{LogletBase, LogletOffset, SendableLogletReadStream}; -use crate::loglets::local_loglet::metric_definitions::{ +use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; use crate::{Error, LogRecord, Result, SealReason, TailState}; @@ -40,7 +40,7 @@ use self::log_store::RocksDbLogStore; use self::log_store_writer::RocksDbLogWriterHandle; use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION}; use self::read_stream::LocalLogletReadStream; -use super::util::OffsetWatch; +use crate::loglet::util::OffsetWatch; struct LocalLoglet { log_id: u64, @@ -345,7 +345,7 @@ mod tests { use restate_types::live::Live; use restate_types::logs::metadata::{LogletParams, ProviderKind}; - use crate::loglet_tests::*; + use crate::loglet::loglet_tests::*; use super::*; diff --git a/crates/bifrost/src/loglets/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs similarity index 95% rename from crates/bifrost/src/loglets/local_loglet/provider.rs rename to crates/bifrost/src/providers/local_loglet/provider.rs index 995b745ee..6dec3be78 100644 --- a/crates/bifrost/src/loglets/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -23,9 +23,8 @@ use restate_types::logs::metadata::{LogletParams, ProviderKind}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; use super::{metric_definitions, LocalLoglet}; -use crate::loglet::{Loglet, LogletOffset}; -use crate::ProviderError; -use crate::{Error, LogletProvider}; +use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError}; +use crate::Error; pub struct Factory { options: BoxedLiveLoad, @@ -45,7 +44,7 @@ impl Factory { } #[async_trait] -impl crate::LogletProviderFactory for Factory { +impl LogletProviderFactory for Factory { fn kind(&self) -> ProviderKind { ProviderKind::Local } diff --git a/crates/bifrost/src/loglets/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs similarity index 99% rename from crates/bifrost/src/loglets/local_loglet/read_stream.rs rename to crates/bifrost/src/providers/local_loglet/read_stream.rs index 8f20f3b13..c4e6e59a4 100644 --- 
a/crates/bifrost/src/loglets/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -25,7 +25,7 @@ use restate_rocksdb::RocksDbPerfGuard; use restate_types::logs::SequenceNumber; use crate::loglet::{LogletOffset, LogletReadStream}; -use crate::loglets::local_loglet::LogStoreError; +use crate::providers::local_loglet::LogStoreError; use crate::{Error, LogRecord, Result}; use super::keys::RecordKey; diff --git a/crates/bifrost/src/loglets/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs similarity index 97% rename from crates/bifrost/src/loglets/memory_loglet.rs rename to crates/bifrost/src/providers/memory_loglet.rs index 7249d648f..2457c64b6 100644 --- a/crates/bifrost/src/loglets/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -27,11 +27,13 @@ use tracing::{debug, info}; use restate_types::logs::metadata::{LogletParams, ProviderKind}; use restate_types::logs::SequenceNumber; -use crate::loglet::{Loglet, LogletBase, LogletOffset, LogletReadStream, SendableLogletReadStream}; -use crate::{Error, LogRecord, LogletProvider, TailState}; -use crate::{ProviderError, Result}; - -use super::util::OffsetWatch; +use crate::loglet::util::OffsetWatch; +use crate::loglet::{ + Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory, LogletReadStream, + ProviderError, SendableLogletReadStream, +}; +use crate::Result; +use crate::{Error, LogRecord, TailState}; #[derive(Default)] pub struct Factory { @@ -39,6 +41,7 @@ pub struct Factory { } impl Factory { + #[cfg(test)] pub fn with_init_delay(init_delay: Duration) -> Self { Self { init_delay: Some(init_delay), @@ -47,7 +50,7 @@ impl Factory { } #[async_trait] -impl crate::LogletProviderFactory for Factory { +impl LogletProviderFactory for Factory { fn kind(&self) -> ProviderKind { ProviderKind::InMemory } @@ -389,7 +392,7 @@ impl LogletBase for MemoryLoglet { #[cfg(test)] mod tests { - use crate::loglet_tests::*; + use crate::loglet::loglet_tests::*; use super::*; diff --git a/crates/bifrost/src/loglets/mod.rs b/crates/bifrost/src/providers/mod.rs similarity index 92% rename from crates/bifrost/src/loglets/mod.rs rename to crates/bifrost/src/providers/mod.rs index 0ba4f51f4..2f091b184 100644 --- a/crates/bifrost/src/loglets/mod.rs +++ b/crates/bifrost/src/providers/mod.rs @@ -10,5 +10,5 @@ pub mod local_loglet; pub mod memory_loglet; +#[cfg(feature = "replicated-loglet")] pub mod replicated_loglet; -pub(crate) mod util; diff --git a/crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs similarity index 100% rename from crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs rename to crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs diff --git a/crates/bifrost/src/loglets/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs similarity index 100% rename from crates/bifrost/src/loglets/replicated_loglet/mod.rs rename to crates/bifrost/src/providers/replicated_loglet/mod.rs diff --git a/crates/bifrost/src/loglets/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs similarity index 93% rename from crates/bifrost/src/loglets/replicated_loglet/provider.rs rename to crates/bifrost/src/providers/replicated_loglet/provider.rs index db1c9e33f..df5b69398 100644 --- a/crates/bifrost/src/loglets/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -24,9 
+24,8 @@ use restate_types::live::BoxedLiveLoad;
use restate_types::logs::metadata::{LogletParams, ProviderKind};

use super::metric_definitions;
-use crate::loglet::{Loglet, LogletOffset};
-use crate::ProviderError;
-use crate::{Error, LogletProvider};
+use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError};
+use crate::Error;

pub struct Factory {
    opts: BoxedLiveLoad<ReplicatedLogletOptions>,
}
@@ -53,7 +52,7 @@ impl Factory {
}

#[async_trait]
-impl crate::LogletProviderFactory for Factory {
+impl LogletProviderFactory for Factory {
    fn kind(&self) -> ProviderKind {
        ProviderKind::Replicated
    }
diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs
index 606735b5d..6bc2f19b1 100644
--- a/crates/bifrost/src/read_stream.rs
+++ b/crates/bifrost/src/read_stream.rs
@@ -21,8 +21,8 @@ use restate_types::logs::SequenceNumber;
use restate_types::logs::{LogId, Lsn};

use crate::bifrost::BifrostInner;
-use crate::loglet::LogletReadStreamWrapper;
-use crate::loglet::LogletWrapper;
+use crate::loglet_wrapper::LogletReadStreamWrapper;
+use crate::loglet_wrapper::LogletWrapper;
use crate::FindTailAttributes;
use crate::LogRecord;
use crate::Result;
diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs
index e8d788483..ac68d8058 100644
--- a/crates/bifrost/src/service.rs
+++ b/crates/bifrost/src/service.rs
@@ -21,9 +21,9 @@ use restate_core::{cancellation_watcher, Metadata, TaskCenter, TaskKind};
use restate_types::logs::metadata::ProviderKind;

use crate::bifrost::BifrostInner;
-use crate::loglets::{local_loglet, memory_loglet};
+use crate::providers::{local_loglet, memory_loglet};
use crate::watchdog::{Watchdog, WatchdogCommand};
-use crate::{Bifrost, LogletProviderFactory};
+use crate::{loglet::LogletProviderFactory, Bifrost};

pub struct BifrostService {
    task_center: TaskCenter,
diff --git a/crates/bifrost/src/watchdog.rs b/crates/bifrost/src/watchdog.rs
index c125debed..749bcf37f 100644
--- a/crates/bifrost/src/watchdog.rs
+++ b/crates/bifrost/src/watchdog.rs
@@ -19,7 +19,7 @@ use tokio::task::JoinSet;
use tracing::{debug, trace, warn};

use crate::bifrost::BifrostInner;
-use crate::provider::LogletProvider;
+use crate::loglet::LogletProvider;

pub type WatchdogSender = tokio::sync::mpsc::UnboundedSender<WatchdogCommand>;
type WatchdogReceiver = tokio::sync::mpsc::UnboundedReceiver<WatchdogCommand>;
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index cc5914469..55dcd0dee 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -160,7 +160,7 @@ impl Node {
        // Setup bifrost
        // replicated-loglet
        #[cfg(feature = "replicated-loglet")]
-        let replicated_loglet_factory = restate_bifrost::loglets::replicated_loglet::Factory::new(
+        let replicated_loglet_factory = restate_bifrost::providers::replicated_loglet::Factory::new(
            updateable_config
                .clone()
                .map(|c| &c.bifrost.replicated_loglet)

From 88a94ebbd7bfe72d573d4b548f4fa8a0100fa261 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 22 Jul 2024 16:03:46 +0100
Subject: [PATCH 2/6] [Bifrost] Decouple loglet errors from bifrost errors

- Major cleanup of loglet error handling; this removes the unnecessary
  leakage of internal loglet error types to bifrost.
- No more superfluous `Arc<>` of non-Clone errors.
- FindTail on loglets cannot return Sealed error (enforcement via type-system) --- crates/bifrost/src/bifrost.rs | 22 +++- crates/bifrost/src/error.rs | 18 +++- crates/bifrost/src/loglet/error.rs | 100 +++++++++++++++++- crates/bifrost/src/loglet/mod.rs | 23 ++-- crates/bifrost/src/loglet/provider.rs | 6 +- crates/bifrost/src/loglet_wrapper.rs | 20 ++-- .../src/providers/local_loglet/log_state.rs | 10 +- .../src/providers/local_loglet/log_store.rs | 21 +++- .../local_loglet/log_store_writer.rs | 12 +-- .../bifrost/src/providers/local_loglet/mod.rs | 40 ++++--- .../src/providers/local_loglet/provider.rs | 9 +- .../src/providers/local_loglet/read_stream.rs | 10 +- crates/bifrost/src/providers/memory_loglet.rs | 33 +++--- .../providers/replicated_loglet/provider.rs | 6 +- crates/bifrost/src/read_stream.rs | 2 +- 15 files changed, 236 insertions(+), 96 deletions(-) diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index df0484f64..6724d2c2c 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -24,7 +24,7 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; use restate_types::storage::StorageCodec; use restate_types::Version; -use crate::loglet::{LogletBase, LogletProvider}; +use crate::loglet::{AppendError, LogletBase, LogletProvider}; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{ @@ -257,7 +257,14 @@ impl BifrostInner { let loglet = self.writeable_loglet(log_id).await?; let mut buf = BytesMut::default(); StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible"); - loglet.append(buf.freeze()).await + + let res = loglet.append(buf.freeze()).await; + // todo: Handle retries, segment seals and other recoverable errors. + res.map_err(|e| match e { + AppendError::Sealed => todo!(), + AppendError::Shutdown(e) => Error::Shutdown(e), + AppendError::Other(e) => Error::LogletError(e), + }) } pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result { @@ -271,7 +278,13 @@ impl BifrostInner { buf.freeze() }) .collect(); - loglet.append_batch(&raw_payloads).await + let res = loglet.append_batch(&raw_payloads).await; + // todo: Handle retries, segment seals and other recoverable errors. + res.map_err(|e| match e { + AppendError::Sealed => todo!(), + AppendError::Shutdown(e) => Error::Shutdown(e), + AppendError::Other(e) => Error::LogletError(e), + }) } pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result { @@ -379,8 +392,7 @@ impl BifrostInner { self.fail_if_shutting_down()?; self.metadata .sync(MetadataKind::Logs, TargetVersion::Latest) - .await - .map_err(Arc::new)?; + .await?; Ok(()) } diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 997ed338d..3c3377e60 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -13,13 +13,13 @@ use std::sync::Arc; use restate_types::logs::{LogId, Lsn}; -use crate::providers::local_loglet::LogStoreError; +use crate::loglet::{LogletError, OperationError}; use crate::types::SealReason; /// Result type for bifrost operations. 
pub type Result<T, E = Error> = std::result::Result<T, E>;

-#[derive(thiserror::Error, Debug, Clone)]
+#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("log '{0}' is sealed")]
    LogSealed(LogId, SealReason),
@@ -30,11 +30,19 @@ pub enum Error {
    #[error("operation failed due to an ongoing shutdown")]
    Shutdown(#[from] ShutdownError),
    #[error(transparent)]
-    LogStoreError(#[from] LogStoreError),
+    LogletError(#[from] Arc<dyn LogletError>),
    #[error("failed syncing logs metadata: {0}")]
-    // unfortunately, we have to use Arc here, because the SyncError is not Clone.
-    MetadataSync(#[from] Arc<SyncError>),
+    MetadataSync(#[from] SyncError),
    /// Provider is unknown or disabled
    #[error("bifrost provider '{0}' is disabled or unrecognized")]
    Disabled(String),
}
+
+impl From<OperationError> for Error {
+    fn from(value: OperationError) -> Self {
+        match value {
+            OperationError::Shutdown(e) => Error::Shutdown(e),
+            OperationError::Other(e) => Error::LogletError(e),
+        }
+    }
+}
diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs
index 1204586a8..69abeed3b 100644
--- a/crates/bifrost/src/loglet/error.rs
+++ b/crates/bifrost/src/loglet/error.rs
@@ -8,11 +8,105 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

+use std::fmt::{Debug, Display};
+use std::sync::Arc;
+
use restate_core::ShutdownError;

+#[derive(Debug, Clone, thiserror::Error)]
+pub enum AppendError {
+    #[error("Loglet is sealed")]
+    Sealed,
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error(transparent)]
+    Other(Arc<dyn LogletError>),
+}
+
+impl AppendError {
+    pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(RetryableError(error)))
+    }
+
+    pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(TerminalError(error)))
+    }
+
+    pub fn other<E: LogletError>(error: E) -> Self {
+        Self::Other(Arc::new(error))
+    }
+}
+
#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum ProviderError {
+pub enum OperationError {
+    #[error(transparent)]
    Shutdown(#[from] ShutdownError),
-    Other(#[from] anyhow::Error),
+    #[error(transparent)]
+    Other(Arc<dyn LogletError>),
+}
+
+impl OperationError {
+    pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(RetryableError(error)))
+    }
+
+    pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(TerminalError(error)))
+    }
+
+    pub fn other<E: LogletError>(error: E) -> Self {
+        Self::Other(Arc::new(error))
+    }
+}
+
+// -- Helper Types --
+
+/// Represents a type-erased error from the loglet provider.
+pub trait LogletError: std::error::Error + Send + Sync + Debug + Display + 'static {
+    /// Signal upper layers whether this error should be retried or not.
+ fn retryable(&self) -> bool { + false + } +} + +#[derive(Debug, thiserror::Error)] +struct RetryableError(#[source] T); + +impl Display for RetryableError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[retryable] {}", self.0) + } +} + +impl LogletError for RetryableError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + true + } +} + +#[derive(Debug, thiserror::Error)] +struct TerminalError(#[source] T); + +impl LogletError for TerminalError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + false + } +} + +impl Display for TerminalError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[terminal] {}", self.0) + } } diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 38afecddf..477344763 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -116,11 +116,11 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { ) -> Result>; /// Append a record to the loglet. - async fn append(&self, data: Bytes) -> Result; + async fn append(&self, data: Bytes) -> Result; /// Append a batch of records to the loglet. The returned offset (on success) if the offset of /// the first record in the batch) - async fn append_batch(&self, payloads: &[Bytes]) -> Result; + async fn append_batch(&self, payloads: &[Bytes]) -> Result; /// The tail is *the first unwritten position* in the loglet. /// @@ -129,14 +129,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// after the next `append()` call. /// /// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST). - /// This should never return Err(Error::LogSealed). Sealed state is represented as - /// TailState::Sealed(..) - async fn find_tail(&self) -> Result>; + async fn find_tail(&self) -> Result, OperationError>; /// The offset of the slot **before** the first readable record (if it exists), or the offset /// before the next slot that will be written to. Must not return Self::INVALID. If the loglet /// is never trimmed, this must return `None`. - async fn get_trim_point(&self) -> Result>; + async fn get_trim_point(&self) -> Result, OperationError>; /// Trim the loglet prefix up to and including the `trim_point`. /// If trim_point equal or higher than the loglet tail, the loglet trims its data until the tail. @@ -146,21 +144,26 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// /// Passing `Offset::INVALID` is a no-op. (success) /// Passing `Offset::OLDEST` trims the first record in the loglet (if exists). - async fn trim(&self, trim_point: Self::Offset) -> Result<()>; + async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>; /// Read or wait for the record at `from` offset, or the next available record if `from` isn't /// defined for the loglet. - async fn read_next_single(&self, from: Self::Offset) -> Result>; + async fn read_next_single( + &self, + from: Self::Offset, + ) -> Result, OperationError>; /// Read the next record if it's been committed, otherwise, return None without waiting. async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>>; + ) -> Result>, OperationError>; } /// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams. 
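To make the retryable/terminal split concrete: a sketch of how a loglet implementation might classify a failure and how an upper layer can query it. The `Backpressure` error type is invented for the illustration:

```rust
use crate::loglet::{AppendError, LogletError};

#[derive(Debug, thiserror::Error)]
#[error("write queue is full")]
struct Backpressure;

fn classify(transient: bool) -> AppendError {
    if transient {
        // Wrapped in RetryableError, so LogletError::retryable() reports true.
        AppendError::retryable(Backpressure)
    } else {
        AppendError::terminal(Backpressure)
    }
}

// Upper layers only see the type-erased LogletError trait.
fn should_retry(err: &AppendError) -> bool {
    matches!(err, AppendError::Other(inner) if inner.retryable())
}
```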
-pub trait LogletReadStream: Stream>> { +pub trait LogletReadStream: + Stream, OperationError>> +{ /// Current read pointer. This points to the next offset to be read. fn read_pointer(&self) -> S; /// Returns true if the stream is terminated. diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index 65e75965a..acfac51eb 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use restate_types::logs::metadata::{LogletParams, ProviderKind}; -use super::{Loglet, ProviderError}; +use super::{Loglet, OperationError}; use crate::Result; #[async_trait] @@ -23,7 +23,7 @@ pub trait LogletProviderFactory: Send + 'static { /// Factory creates providers of `kind`. fn kind(&self) -> ProviderKind; /// Initialize provider. - async fn create(self: Box) -> Result, ProviderError>; + async fn create(self: Box) -> Result, OperationError>; } #[async_trait] @@ -35,7 +35,7 @@ pub trait LogletProvider: Send + Sync { async fn post_start(&self) {} /// Hook for handling graceful shutdown - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index affb4c127..b73685cdf 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -19,7 +19,9 @@ use futures::Stream; use restate_types::logs::{Lsn, SequenceNumber}; -use crate::loglet::{Loglet, LogletBase, LogletOffset, SendableLogletReadStream}; +use crate::loglet::{ + AppendError, Loglet, LogletBase, LogletOffset, OperationError, SendableLogletReadStream, +}; use crate::{LogRecord, LsnExt}; use crate::{Result, TailState}; @@ -71,18 +73,18 @@ impl LogletBase for LogletWrapper { unreachable!("create_read_stream on LogletWrapper should never be used directly") } - async fn append(&self, data: Bytes) -> Result { + async fn append(&self, data: Bytes) -> Result { let offset = self.loglet.append(data).await?; // Return the LSN given the loglet offset. Ok(self.base_lsn.offset_by(offset)) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let offset = self.loglet.append_batch(payloads).await?; Ok(self.base_lsn.offset_by(offset)) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { Ok(self .loglet .find_tail() @@ -90,13 +92,13 @@ impl LogletBase for LogletWrapper { .map(|o| self.base_lsn.offset_by(o))) } - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let offset = self.loglet.get_trim_point().await?; Ok(offset.map(|o| self.base_lsn.offset_by(o))) } // trim_point is inclusive. 
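Because `OperationError` converts into bifrost's `Error` (the `From` impl in `error.rs` above), code above the wrapper keeps using `?`. A sketch with a hypothetical call site:

```rust
use restate_types::logs::Lsn;

use crate::loglet::LogletBase;
use crate::loglet_wrapper::LogletWrapper;

// Hypothetical helper: the OperationError from trim() is folded into
// crate::Error by the `?` operator through From<OperationError>.
async fn trim_segment(segment: &LogletWrapper, point: Lsn) -> crate::Result<()> {
    segment.trim(point).await?;
    Ok(())
}
```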
- async fn trim(&self, trim_point: Self::Offset) -> Result<()> { + async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError> { // trimming to INVALID is no-op if trim_point == Self::Offset::INVALID { return Ok(()); @@ -105,7 +107,7 @@ impl LogletBase for LogletWrapper { self.loglet.trim(trim_point).await } - async fn read_next_single(&self, from: Lsn) -> Result> { + async fn read_next_single(&self, from: Lsn) -> Result, OperationError> { // convert LSN to loglet offset let offset = from.into_offset(self.base_lsn); self.loglet @@ -117,7 +119,7 @@ impl LogletBase for LogletWrapper { async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { let offset = from.into_offset(self.base_lsn); self.loglet .read_next_single_opt(offset) @@ -139,7 +141,7 @@ impl LogletReadStreamWrapper { } impl Stream for LogletReadStreamWrapper { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: Pin<&mut Self>, diff --git a/crates/bifrost/src/providers/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs index 83b4ade55..a07ef6de6 100644 --- a/crates/bifrost/src/providers/local_loglet/log_state.rs +++ b/crates/bifrost/src/providers/local_loglet/log_state.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use bytes::{BufMut, Bytes, BytesMut}; use restate_types::flexbuffers_storage_encode_decode; use restate_types::storage::StorageCodec; @@ -75,11 +73,11 @@ impl LogStateUpdates { } pub fn encode(&self, buf: &mut B) -> Result<(), LogStoreError> { - StorageCodec::encode(self, buf).map_err(|err| LogStoreError::Encode(Arc::new(err))) + Ok(StorageCodec::encode(self, buf)?) } pub fn from_slice(mut data: &[u8]) -> Result { - StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err))) + Ok(StorageCodec::decode(&mut data)?) } } @@ -95,12 +93,12 @@ pub struct LogState { impl LogState { pub fn to_bytes(&self) -> Result { let mut buf = BytesMut::default(); - StorageCodec::encode(self, &mut buf).map_err(Arc::new)?; + StorageCodec::encode(self, &mut buf)?; Ok(buf.freeze()) } pub fn from_slice(mut data: &[u8]) -> Result { - StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err))) + Ok(StorageCodec::decode(&mut data)?) } } diff --git a/crates/bifrost/src/providers/local_loglet/log_store.rs b/crates/bifrost/src/providers/local_loglet/log_store.rs index 7176f763c..765235065 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store.rs @@ -19,6 +19,8 @@ use restate_types::storage::{StorageDecodeError, StorageEncodeError}; use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB}; use static_assertions::const_assert; +use crate::loglet::LogletError; + use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH}; use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState}; use super::log_store_writer::LogStoreWriter; @@ -33,20 +35,29 @@ const DATA_CF_BUDGET_RATIO: f64 = 0.85; const_assert!(DATA_CF_BUDGET_RATIO < 1.0); -#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum LogStoreError { #[error(transparent)] - // unfortunately, we have to use Arc here, because the storage encode error is not Clone. 
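`LogStoreError` can drop `Clone` because fan-out now happens at the `AppendError` level, which clones cheaply since its cause sits behind an `Arc`. A sketch of that pattern, mirroring the writer's `send_acks()`; the channel setup is illustrative:

```rust
use std::io;

use tokio::sync::oneshot;

use crate::loglet::AppendError;

// One write-batch outcome is delivered to every waiter; only the Arc inside
// AppendError::Other is cloned, not the underlying error.
fn fan_out(waiters: Vec<oneshot::Sender<Result<(), AppendError>>>) {
    let failure = AppendError::terminal(io::Error::other("flush failed"));
    for tx in waiters {
        let _ = tx.send(Err(failure.clone()));
    }
}
```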
- Encode(#[from] Arc), + Encode(#[from] StorageEncodeError), #[error(transparent)] - // unfortunately, we have to use Arc here, because the storage decode error is not Clone. - Decode(#[from] Arc), + Decode(#[from] StorageDecodeError), #[error(transparent)] Rocksdb(#[from] rocksdb::Error), #[error(transparent)] RocksDbManager(#[from] RocksError), } +impl LogletError for LogStoreError { + fn retryable(&self) -> bool { + match self { + LogStoreError::Encode(_) => false, + LogStoreError::Decode(_) => false, + LogStoreError::Rocksdb(_) => true, + LogStoreError::RocksDbManager(_) => false, + } + } +} + #[derive(Debug, Clone)] pub struct RocksDbLogStore { rocksdb: Arc, diff --git a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 0857f308d..04c290d27 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -33,11 +33,11 @@ use super::log_store::{DATA_CF, METADATA_CF}; use super::metric_definitions::{ BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES, }; -use crate::loglet::LogletOffset; -use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT}; +use crate::loglet::{AppendError, LogletOffset}; +use crate::SMALL_BATCH_THRESHOLD_COUNT; -type Ack = oneshot::Sender>; -type AckRecv = oneshot::Receiver>; +type Ack = oneshot::Sender>; +type AckRecv = oneshot::Receiver>; pub struct LogStoreWriteCommand { log_id: u64, @@ -266,14 +266,14 @@ impl LogStoreWriter { if let Err(e) = result { error!("Failed to commit local loglet write batch: {}", e); - self.send_acks(Err(Error::LogStoreError(e.into()))); + self.send_acks(Err(AppendError::terminal(e))); return; } self.send_acks(Ok(())); } - fn send_acks(&mut self, result: Result<(), Error>) { + fn send_acks(&mut self, result: Result<(), AppendError>) { self.batch_acks_buf.drain(..).for_each(|a| { let _ = a.send(result.clone()); }); diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 8f29d6a33..42bd2504e 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -29,11 +29,13 @@ use tracing::{debug, warn}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use crate::loglet::{LogletBase, LogletOffset, SendableLogletReadStream}; +use crate::loglet::{ + AppendError, LogletBase, LogletOffset, OperationError, SendableLogletReadStream, +}; use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; -use crate::{Error, LogRecord, Result, SealReason, TailState}; +use crate::{LogRecord, Result, SealReason, TailState}; use self::keys::RecordKey; use self::log_store::RocksDbLogStore; @@ -76,9 +78,11 @@ impl LocalLoglet { log_id: u64, log_store: RocksDbLogStore, log_writer: RocksDbLogWriterHandle, - ) -> Result { + ) -> Result { // Fetch the log metadata from the store - let log_state = log_store.get_log_state(log_id)?; + let log_state = log_store + .get_log_state(log_id) + .map_err(OperationError::other)?; let log_state = log_state.unwrap_or_default(); let trim_point_offset = AtomicU64::new(log_state.trim_point); @@ -120,7 +124,7 @@ impl LocalLoglet { fn read_from( &self, from_offset: LogletOffset, - ) -> Result>> { + ) -> Result>, OperationError> { debug_assert_ne!(LogletOffset::INVALID, from_offset); let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); let head_offset = 
trim_point.next(); @@ -144,7 +148,10 @@ impl LocalLoglet { read_opts, rocksdb::IteratorMode::From(&key.to_bytes(), rocksdb::Direction::Forward), ); - let record = iter.next().transpose().map_err(LogStoreError::Rocksdb)?; + let record = iter + .next() + .transpose() + .map_err(|e| OperationError::other(LogStoreError::Rocksdb(e)))?; let Some(record) = record else { let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); // we might not have been able to read the next record because of a concurrent trim operation @@ -192,7 +199,7 @@ impl LogletBase for LocalLoglet { Ok(Box::pin(LocalLogletReadStream::create(self, from).await?)) } - async fn append(&self, payload: Bytes) -> Result { + async fn append(&self, payload: Bytes) -> Result { counter!(BIFROST_LOCAL_APPEND).increment(1); let start_time = std::time::Instant::now(); // We hold the lock to ensure that offsets are enqueued in the order of @@ -217,7 +224,7 @@ impl LogletBase for LocalLoglet { let _ = receiver.await.unwrap_or_else(|_| { warn!("Unsure if the local loglet record was written, the ack channel was dropped"); - Err(Error::Shutdown(ShutdownError)) + Err(ShutdownError.into()) })?; self.last_committed_offset @@ -227,7 +234,7 @@ impl LogletBase for LocalLoglet { Ok(offset) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let num_payloads = payloads.len(); counter!(BIFROST_LOCAL_APPEND).increment(num_payloads as u64); let start_time = std::time::Instant::now(); @@ -251,7 +258,7 @@ impl LogletBase for LocalLoglet { let _ = receiver.await.unwrap_or_else(|_| { warn!("Unsure if the local loglet record was written, the ack channel was dropped"); - Err(Error::Shutdown(ShutdownError)) + Err(ShutdownError.into()) })?; self.last_committed_offset @@ -261,7 +268,7 @@ impl LogletBase for LocalLoglet { Ok(offset) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { let last_committed = LogletOffset::from(self.last_committed_offset.load(Ordering::Relaxed)).next(); Ok(if self.seal.is_some() { @@ -271,7 +278,7 @@ impl LogletBase for LocalLoglet { }) } - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { @@ -283,7 +290,7 @@ impl LogletBase for LocalLoglet { /// Trim the log to the minimum of new_trim_point and last_committed_offset /// new_trim_point is inclusive (will be trimmed) - async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), Error> { + async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> { let effective_trim_point = new_trim_point.min(LogletOffset( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -318,7 +325,10 @@ impl LogletBase for LocalLoglet { Ok(()) } - async fn read_next_single(&self, from: Self::Offset) -> Result> { + async fn read_next_single( + &self, + from: Self::Offset, + ) -> Result, OperationError> { loop { let next_record = self.read_from(from)?; if let Some(next_record) = next_record { @@ -332,7 +342,7 @@ impl LogletBase for LocalLoglet { async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { self.read_from(from) } } diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 6dec3be78..ede27e6d5 100644 --- 
a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -11,7 +11,6 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use anyhow::Context; use async_trait::async_trait; use tokio::sync::Mutex as AsyncMutex; use tracing::debug; @@ -23,7 +22,7 @@ use restate_types::logs::metadata::{LogletParams, ProviderKind}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; use super::{metric_definitions, LocalLoglet}; -use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError}; +use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError}; use crate::Error; pub struct Factory { @@ -49,7 +48,7 @@ impl LogletProviderFactory for Factory { ProviderKind::Local } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); let Factory { mut options, @@ -59,7 +58,7 @@ impl LogletProviderFactory for Factory { let opts = options.live_load(); let log_store = RocksDbLogStore::create(opts, rocksdb_opts) .await - .context("RocksDb LogStore")?; + .map_err(OperationError::other)?; let log_writer = log_store.create_writer().start(options)?; debug!("Started a bifrost local loglet provider"); Ok(Arc::new(LocalLogletProvider { @@ -110,7 +109,7 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs index c4e6e59a4..a444899b6 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -24,9 +24,9 @@ use restate_core::ShutdownError; use restate_rocksdb::RocksDbPerfGuard; use restate_types::logs::SequenceNumber; -use crate::loglet::{LogletOffset, LogletReadStream}; +use crate::loglet::{LogletOffset, LogletReadStream, OperationError}; use crate::providers::local_loglet::LogStoreError; -use crate::{Error, LogRecord, Result}; +use crate::{LogRecord, Result}; use super::keys::RecordKey; use super::LocalLoglet; @@ -127,7 +127,7 @@ impl LogletReadStream for LocalLogletReadStream { } impl Stream for LocalLogletReadStream { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -162,7 +162,7 @@ impl Stream for LocalLogletReadStream { None => { // system shutdown. Or that the loglet has been unexpectedly shutdown. 
this.terminated.set(true); - return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError)))); + return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError)))); } } } @@ -199,7 +199,7 @@ impl Stream for LocalLogletReadStream { // todo: If status is not ok(), we should retry if let Err(e) = this.iterator.status() { this.terminated.set(true); - return Poll::Ready(Some(Err(Error::LogStoreError(LogStoreError::Rocksdb(e))))); + return Poll::Ready(Some(Err(OperationError::other(LogStoreError::Rocksdb(e))))); } if !this.iterator.valid() || this.iterator.key().is_none() { diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 2457c64b6..e1ecae473 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -29,11 +29,11 @@ use restate_types::logs::SequenceNumber; use crate::loglet::util::OffsetWatch; use crate::loglet::{ - Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory, LogletReadStream, - ProviderError, SendableLogletReadStream, + AppendError, Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory, + LogletReadStream, OperationError, SendableLogletReadStream, }; use crate::Result; -use crate::{Error, LogRecord, TailState}; +use crate::{LogRecord, TailState}; #[derive(Default)] pub struct Factory { @@ -55,7 +55,7 @@ impl LogletProviderFactory for Factory { ProviderKind::InMemory } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { Ok(Arc::new(MemoryLogletProvider { store: Default::default(), init_delay: self.init_delay.unwrap_or_default(), @@ -95,7 +95,7 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { info!("Shutting down in-memory loglet provider"); Ok(()) } @@ -150,7 +150,7 @@ impl MemoryLoglet { fn read_from( &self, from_offset: LogletOffset, - ) -> Result>> { + ) -> Result>, OperationError> { let guard = self.log.lock().unwrap(); let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire)); let head_offset = trim_point.next(); @@ -216,7 +216,7 @@ impl LogletReadStream for MemoryReadStream { } impl Stream for MemoryReadStream { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -248,7 +248,7 @@ impl Stream for MemoryReadStream { None => { // system shutdown. Or that the loglet has been unexpectedly shutdown. 
this.terminated.set(true); - return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError)))); + return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError)))); } } } @@ -298,7 +298,7 @@ impl LogletBase for MemoryLoglet { Ok(Box::pin(MemoryReadStream::create(self, from).await)) } - async fn append(&self, payload: Bytes) -> Result { + async fn append(&self, payload: Bytes) -> Result { let mut log = self.log.lock().unwrap(); let offset = self.index_to_offset(log.len()); debug!( @@ -312,7 +312,7 @@ impl LogletBase for MemoryLoglet { Ok(offset) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let mut log = self.log.lock().unwrap(); let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); let first_offset = offset; @@ -329,7 +329,7 @@ impl LogletBase for MemoryLoglet { Ok(first_offset) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { let committed = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); let sealed = self.sealed.load(Ordering::Acquire); Ok(if sealed { @@ -340,7 +340,7 @@ impl LogletBase for MemoryLoglet { } /// Find the head (oldest) record in the loglet. - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { @@ -350,7 +350,7 @@ impl LogletBase for MemoryLoglet { } } - async fn trim(&self, new_trim_point: Self::Offset) -> Result<()> { + async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> { let actual_trim_point = new_trim_point.min(LogletOffset( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -371,7 +371,10 @@ impl LogletBase for MemoryLoglet { Ok(()) } - async fn read_next_single(&self, from: LogletOffset) -> Result> { + async fn read_next_single( + &self, + from: LogletOffset, + ) -> Result, OperationError> { loop { let next_record = self.read_from(from)?; if let Some(next_record) = next_record { @@ -385,7 +388,7 @@ impl LogletBase for MemoryLoglet { async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { self.read_from(after) } } diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index df5b69398..2459de270 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -24,7 +24,7 @@ use restate_types::live::BoxedLiveLoad; use restate_types::logs::metadata::{LogletParams, ProviderKind}; use super::metric_definitions; -use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError}; +use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError}; use crate::Error; pub struct Factory { @@ -57,7 +57,7 @@ impl LogletProviderFactory for Factory { ProviderKind::Replicated } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); Ok(Arc::new(ReplicatedLogletProvider)) } @@ -75,7 +75,7 @@ impl LogletProvider for ReplicatedLogletProvider { todo!("Not implemented yet") } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git 
a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 6bc2f19b1..ef799f4ea 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -143,7 +143,7 @@ impl Stream for LogReadStream { self.read_pointer = new_pointer; Poll::Ready(Some(Ok(record))) } - Some(Err(e)) => Poll::Ready(Some(Err(e))), + Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), None => { // todo: check if we should switch the loglet. self.as_mut().terminated = true; From 56dfdd3ccfdcbac472550994c6942c2d64042161 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 22 Jul 2024 16:03:46 +0100 Subject: [PATCH 3/6] [Bifrost] Base seal tests and implements seal() on memory loglet This also adds an optional limit to loglet readstreams to enable creating a readstream with a pre-determined end offset. --- crates/bifrost/src/loglet/loglet_tests.rs | 163 +++++++++++++++++- crates/bifrost/src/loglet/mod.rs | 10 ++ crates/bifrost/src/loglet_wrapper.rs | 7 +- .../bifrost/src/providers/local_loglet/mod.rs | 9 +- .../src/providers/local_loglet/read_stream.rs | 9 + crates/bifrost/src/providers/memory_loglet.rs | 72 ++++++-- 6 files changed, 248 insertions(+), 22 deletions(-) diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 8c1113d3e..c8f635da5 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -8,21 +8,24 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeSet; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use futures::StreamExt; use googletest::prelude::*; -use tokio::task::JoinHandle; +use tokio::sync::Barrier; +use tokio::task::{JoinHandle, JoinSet}; use restate_test_util::let_assert; use restate_types::logs::SequenceNumber; +use tokio_stream::StreamExt; use tracing::info; use super::{Loglet, LogletOffset}; -use crate::{LogRecord, Record, TrimGap}; +use crate::loglet::AppendError; +use crate::{LogRecord, Record, TailState, TrimGap}; fn setup() { // Make sure that panics exits the process. @@ -216,7 +219,10 @@ pub async fn single_loglet_readstream_test(loglet: Arc) -> googletes setup(); let read_from_offset = LogletOffset::from(6); - let mut reader = loglet.clone().create_read_stream(read_from_offset).await?; + let mut reader = loglet + .clone() + .create_read_stream(read_from_offset, None) + .await?; { // no records have been written yet. @@ -307,7 +313,7 @@ pub async fn single_loglet_readstream_test_with_trims( let mut read_stream = loglet .clone() - .create_read_stream(LogletOffset::OLDEST) + .create_read_stream(LogletOffset::OLDEST, None) .await?; let record = read_stream.next().await.unwrap()?; @@ -393,3 +399,150 @@ pub async fn single_loglet_readstream_test_with_trims( Ok(()) } + +/// Validates that appends fail after find_tail() returned Sealed() +pub async fn loglet_test_append_after_seal(loglet: Arc) -> googletest::Result<()> { + setup(); + + assert_eq!(None, loglet.get_trim_point().await?); + { + let tail = loglet.find_tail().await?; + assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert!(!tail.is_sealed()); + } + + // append 5 records. Offsets [1..5] + for i in 1..=5 { + loglet.append(Bytes::from(format!("record{}", i))).await?; + } + + loglet.seal().await?; + + // attempt to append 5 records. Offsets [6..10]. Expected to fail since seal happened on the same client. 
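+    // (seal() on these loglets is synchronous, so the failures must be
+    // immediate rather than eventually consistent)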
+ for i in 6..=10 { + let res = loglet.append(Bytes::from(format!("record{}", i))).await; + assert_that!(res, err(pat!(AppendError::Sealed))); + } + + let tail = loglet.find_tail().await?; + // Seal must be applied after commit index 5 since it has been acknowledged (tail is 6 or higher) + assert_that!(tail, pat!(TailState::Sealed(gt(LogletOffset::from(5))))); + + Ok(()) +} + +/// Validates that appends fail after find_tail() returned Sealed() +pub async fn loglet_test_append_after_seal_concurrent( + loglet: Arc, +) -> googletest::Result<()> { + use futures::TryStreamExt as _; + + const WARMUP_APPENDS: usize = 1000; + const CONCURRENT_APPENDERS: usize = 20; + + setup(); + + assert_eq!(None, loglet.get_trim_point().await?); + { + let tail = loglet.find_tail().await?; + assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert!(!tail.is_sealed()); + } + // +1 for the main task waiting on all concurrent appenders + let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1)); + + let mut appenders: JoinSet> = JoinSet::new(); + for appender_id in 0..CONCURRENT_APPENDERS { + appenders.spawn({ + let loglet = loglet.clone(); + let append_barrier = append_barrier.clone(); + async move { + let mut i = 1; + let mut committed = Vec::new(); + let mut warmup = true; + loop { + let res = loglet + .append(Bytes::from(format!("appender-{}-record{}", appender_id, i))) + .await; + i += 1; + if i > WARMUP_APPENDS && warmup { + println!("appender({}) - warmup complete....", appender_id); + append_barrier.wait().await; + warmup = false; + } + match res { + Ok(offset) => { + committed.push(offset); + } + Err(AppendError::Sealed) => { + break; + } + Err(e) => fail!("unexpected error: {}", e)?, + } + // give a chance to other tasks to work + tokio::task::yield_now().await; + } + Ok(committed) + } + }); + } + + // Wait for some warmup appends + println!( + "Awaiting all appenders to reach at least {} appends", + WARMUP_APPENDS + ); + append_barrier.wait().await; + // Go places and do other things. + for _ in 0..5 { + tokio::task::yield_now().await; + } + + loglet.seal().await?; + // fails immediately + assert_that!( + loglet.append(Bytes::from_static(b"failed-record")).await, + err(pat!(AppendError::Sealed)) + ); + + let tail = loglet.find_tail().await?; + assert!(tail.is_sealed()); + println!("Sealed tail: {:?}", tail); + + let mut all_committed = BTreeSet::new(); + while let Some(handle) = appenders.join_next().await { + let mut committed = handle??; + assert!(!committed.is_empty()); + let committed_len = committed.len(); + assert!(committed_len >= WARMUP_APPENDS); + let tail_record = committed.pop().unwrap(); + // tail must be beyond seal point + assert!(tail.offset() > tail_record); + println!( + "Committed len: {}, last appended was {}", + committed_len, tail_record + ); + // ensure that all committed records are unique + assert!(all_committed.insert(tail_record)); + for offset in committed { + assert!(all_committed.insert(offset)); + } + } + + let reader = loglet + .clone() + .create_read_stream(LogletOffset::OLDEST, Some(tail.offset().prev())) + .await?; + + let records: BTreeSet = reader + .try_filter_map(|x| std::future::ready(Ok(Some(x.offset)))) + .try_collect() + .await?; + + // every record committed must be observed in readstream, and it's acceptable for the + // readstream to include more records. 
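+    // (an append can lose the race with seal(): it is then durable, and hence
+    // readable, but the appender never received an acknowledgment)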
+ assert!(all_committed.len() <= records.len()); + assert!(all_committed.is_subset(&records)); + + Ok(()) +} diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 477344763..7e9f3067d 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -110,9 +110,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// Create a read stream that streams record from a single loglet instance. /// + /// `to`: The offset of the last record to be read (inclusive). If `None`, the + /// stream is an open-ended tailing read stream. async fn create_read_stream( self: Arc, from: Self::Offset, + to: Option, ) -> Result>; /// Append a record to the loglet. @@ -146,6 +149,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// Passing `Offset::OLDEST` trims the first record in the loglet (if exists). async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>; + /// Seal the loglet. This operation is idempotent. + /// + /// Appends **SHOULD NOT** succeed after a `seal()` call is successful. And appends **MUST + /// NOT** succeed after the offset returned by the *first* TailState::Sealed() response. + async fn seal(&self) -> Result<(), OperationError>; + /// Read or wait for the record at `from` offset, or the next available record if `from` isn't /// defined for the loglet. async fn read_next_single( @@ -166,6 +175,7 @@ pub trait LogletReadStream: { /// Current read pointer. This points to the next offset to be read. fn read_pointer(&self) -> S; + /// Returns true if the stream is terminated. fn is_terminated(&self) -> bool; } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index b73685cdf..a2256441f 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -48,7 +48,7 @@ impl LogletWrapper { // Translates LSN to loglet offset Ok(LogletReadStreamWrapper::new( self.loglet - .create_read_stream(start_lsn.into_offset(self.base_lsn)) + .create_read_stream(start_lsn.into_offset(self.base_lsn), None) .await?, self.base_lsn, )) @@ -69,6 +69,7 @@ impl LogletBase for LogletWrapper { async fn create_read_stream( self: Arc, _after: Self::Offset, + _to: Option, ) -> Result> { unreachable!("create_read_stream on LogletWrapper should never be used directly") } @@ -107,6 +108,10 @@ impl LogletBase for LogletWrapper { self.loglet.trim(trim_point).await } + async fn seal(&self) -> Result<(), OperationError> { + self.loglet.seal().await + } + async fn read_next_single(&self, from: Lsn) -> Result, OperationError> { // convert LSN to loglet offset let offset = from.into_offset(self.base_lsn); diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 42bd2504e..5281a3127 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -195,8 +195,11 @@ impl LogletBase for LocalLoglet { async fn create_read_stream( self: Arc, from: Self::Offset, + to: Option, ) -> Result> { - Ok(Box::pin(LocalLogletReadStream::create(self, from).await?)) + Ok(Box::pin( + LocalLogletReadStream::create(self, from, to).await?, + )) } async fn append(&self, payload: Bytes) -> Result { @@ -325,6 +328,10 @@ impl LogletBase for LocalLoglet { Ok(()) } + async fn seal(&self) -> Result<(), OperationError> { + todo!() + } + async fn read_next_single( &self, from: Self::Offset, diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs 
b/crates/bifrost/src/providers/local_loglet/read_stream.rs index a444899b6..b5dde70d4 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -38,6 +38,8 @@ pub(crate) struct LocalLogletReadStream { // the next record this stream will attempt to read read_pointer: LogletOffset, release_pointer: LogletOffset, + /// Last offset to read before terminating the stream. None means "tailing" reader. + read_to: Option, #[pin] iterator: DBRawIteratorWithThreadMode<'static, DB>, #[pin] @@ -62,6 +64,7 @@ impl LocalLogletReadStream { pub(crate) async fn create( loglet: Arc, from_offset: LogletOffset, + to: Option, ) -> Result { // Reading from INVALID resets to OLDEST. let from_offset = from_offset.max(LogletOffset::OLDEST); @@ -111,6 +114,7 @@ impl LocalLogletReadStream { terminated: false, release_watch, release_pointer, + read_to: to, }) } } @@ -143,6 +147,11 @@ impl Stream for LocalLogletReadStream { loop { let mut this = self.as_mut().project(); + // We have reached the limit we are allowed to read + if this.read_to.is_some_and(|read_to| next_offset > read_to) { + this.terminated.set(true); + return Poll::Ready(None); + } // Are we reading after commit offset? // We are at tail. We need to wait until new records have been released. if next_offset > *this.release_pointer { diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index e1ecae473..27e62dec9 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -127,18 +127,18 @@ impl MemoryLoglet { } fn index_to_offset(&self, index: usize) -> LogletOffset { - let offset = self.trim_point_offset.load(Ordering::Acquire); + let offset = self.trim_point_offset.load(Ordering::Relaxed); LogletOffset::from(offset + 1 + index as u64) } fn saturating_offset_to_index(&self, offset: LogletOffset) -> usize { - let trim_point = self.trim_point_offset.load(Ordering::Acquire); + let trim_point = self.trim_point_offset.load(Ordering::Relaxed); (offset.0.saturating_sub(trim_point) - 1) as usize } - pub fn advance_commit_offset(&self, offset: LogletOffset) { + fn advance_commit_offset(&self, offset: LogletOffset) { self.last_committed_offset - .fetch_max(offset.into(), Ordering::Release); + .fetch_max(offset.into(), Ordering::Relaxed); self.notify_readers(); } @@ -152,7 +152,7 @@ impl MemoryLoglet { from_offset: LogletOffset, ) -> Result>, OperationError> { let guard = self.log.lock().unwrap(); - let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire)); + let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); let head_offset = trim_point.next(); // Are we reading behind the loglet head? if from_offset < head_offset { @@ -160,7 +160,7 @@ impl MemoryLoglet { } // are we reading after commit offset? - let commit_offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)); + let commit_offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)); if from_offset > commit_offset { Ok(None) } else { @@ -180,14 +180,20 @@ struct MemoryReadStream { read_pointer: LogletOffset, #[pin] release_watch: WatchStream, - // how far we are allowed to read in the loglet + /// how far we are allowed to read in the loglet release_pointer: LogletOffset, + /// Last offset to read before terminating the stream. None means "tailing" reader. 
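+    /// The bound is inclusive: the record at this offset is still yielded
+    /// before the stream returns `None`.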
+ read_to: Option, #[pin] terminated: bool, } impl MemoryReadStream { - async fn create(loglet: Arc, from_offset: LogletOffset) -> Self { + async fn create( + loglet: Arc, + from_offset: LogletOffset, + to: Option, + ) -> Self { let mut release_watch = loglet.release_watch.to_stream(); let release_pointer = release_watch .next() @@ -199,6 +205,7 @@ impl MemoryReadStream { read_pointer: from_offset, release_watch, release_pointer, + read_to: to, terminated: false, } } @@ -227,9 +234,16 @@ impl Stream for MemoryReadStream { } let next_offset = self.read_pointer; + loop { let mut this = self.as_mut().project(); + // We have reached the limit we are allowed to read + if this.read_to.is_some_and(|read_to| next_offset > read_to) { + this.terminated.set(true); + return Poll::Ready(None); + } + // Are we reading after commit offset? // We are at tail. We need to wait until new records have been released. if next_offset > *this.release_pointer { @@ -294,12 +308,16 @@ impl LogletBase for MemoryLoglet { async fn create_read_stream( self: Arc, from: Self::Offset, + to: Option, ) -> Result> { - Ok(Box::pin(MemoryReadStream::create(self, from).await)) + Ok(Box::pin(MemoryReadStream::create(self, from, to).await)) } async fn append(&self, payload: Bytes) -> Result { let mut log = self.log.lock().unwrap(); + if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } let offset = self.index_to_offset(log.len()); debug!( "Appending record to in-memory loglet {:?} at offset {}", @@ -307,14 +325,17 @@ impl LogletBase for MemoryLoglet { ); log.push(payload); // mark as committed immediately. - let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); + let offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next(); self.advance_commit_offset(offset); Ok(offset) } async fn append_batch(&self, payloads: &[Bytes]) -> Result { let mut log = self.log.lock().unwrap(); - let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); + if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } + let offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next(); let first_offset = offset; let num_payloads = payloads.len(); for payload in payloads { @@ -330,8 +351,9 @@ impl LogletBase for MemoryLoglet { } async fn find_tail(&self) -> Result, OperationError> { - let committed = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); - let sealed = self.sealed.load(Ordering::Acquire); + let _guard = self.log.lock().unwrap(); + let sealed = self.sealed.load(Ordering::Relaxed); + let committed = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next(); Ok(if sealed { TailState::Sealed(committed) } else { @@ -341,6 +363,7 @@ impl LogletBase for MemoryLoglet { /// Find the head (oldest) record in the loglet. 
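    /// Returns `None` if the loglet has never been trimmed.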
async fn get_trim_point(&self) -> Result, OperationError> { + let _guard = self.log.lock().unwrap(); let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { @@ -351,12 +374,11 @@ impl LogletBase for MemoryLoglet { } async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> { + let mut log = self.log.lock().unwrap(); let actual_trim_point = new_trim_point.min(LogletOffset( self.last_committed_offset.load(Ordering::Relaxed), )); - let mut log = self.log.lock().unwrap(); - let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point >= actual_trim_point { @@ -371,6 +393,13 @@ impl LogletBase for MemoryLoglet { Ok(()) } + async fn seal(&self) -> Result<(), OperationError> { + // Ensures no in-flight operations are taking place. + let _guard = self.log.lock().unwrap(); + self.sealed.store(true, Ordering::Relaxed); + Ok(()) + } + async fn read_next_single( &self, from: LogletOffset, @@ -416,4 +445,17 @@ mod tests { let loglet = MemoryLoglet::new(LogletParams::from("112".to_string())); single_loglet_readstream_test_with_trims(loglet).await } + + #[tokio::test(start_paused = true)] + async fn memory_loglet_test_append_after_seal() -> googletest::Result<()> { + let loglet = MemoryLoglet::new(LogletParams::from("112".to_string())); + loglet_test_append_after_seal(loglet).await + } + + // multi-threaded to check correctness under parallel conditions + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn memory_loglet_test_append_after_seal_concurrent() -> googletest::Result<()> { + let loglet = MemoryLoglet::new(LogletParams::from("112".to_string())); + loglet_test_append_after_seal_concurrent(loglet).await + } } From b549a07b137680eae13e5d575047f5648870209f Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 22 Jul 2024 16:03:46 +0100 Subject: [PATCH 4/6] [Bifrost] Implements seal() for LocalLoglet --- crates/bifrost/src/error.rs | 3 +- crates/bifrost/src/loglet/error.rs | 11 +- crates/bifrost/src/loglet/loglet_tests.rs | 28 +++- .../src/providers/local_loglet/log_state.rs | 19 +-- .../local_loglet/log_store_writer.rs | 23 ++- .../bifrost/src/providers/local_loglet/mod.rs | 154 ++++++++++++++++-- crates/bifrost/src/read_stream.rs | 1 - crates/bifrost/src/record.rs | 5 +- crates/bifrost/src/types.rs | 10 -- crates/worker/src/partition/mod.rs | 3 - 10 files changed, 198 insertions(+), 59 deletions(-) diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 3c3377e60..481f374ef 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -14,7 +14,6 @@ use std::sync::Arc; use restate_types::logs::{LogId, Lsn}; use crate::loglet::{LogletError, OperationError}; -use crate::types::SealReason; /// Result type for bifrost operations. 
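/// The error type defaults to [`Error`]; loglet-level APIs thread
/// [`OperationError`] through the second type parameter instead.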
pub type Result = std::result::Result; @@ -22,7 +21,7 @@ pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub enum Error { #[error("log '{0}' is sealed")] - LogSealed(LogId, SealReason), + LogSealed(LogId), #[error("unknown log '{0}'")] UnknownLogId(LogId), #[error("invalid log sequence number '{0}'")] diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs index 69abeed3b..8f2dca9b4 100644 --- a/crates/bifrost/src/loglet/error.rs +++ b/crates/bifrost/src/loglet/error.rs @@ -37,7 +37,7 @@ impl AppendError { } } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Clone, thiserror::Error)] pub enum OperationError { #[error(transparent)] Shutdown(#[from] ShutdownError), @@ -59,6 +59,15 @@ impl OperationError { } } +impl From for AppendError { + fn from(value: OperationError) -> Self { + match value { + OperationError::Shutdown(s) => AppendError::Shutdown(s), + OperationError::Other(o) => AppendError::Other(o), + } + } +} + // -- Helper Types -- /// Represents a type-erased error from the loglet provider. diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index c8f635da5..9c491173e 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -437,7 +437,7 @@ pub async fn loglet_test_append_after_seal_concurrent( ) -> googletest::Result<()> { use futures::TryStreamExt as _; - const WARMUP_APPENDS: usize = 1000; + const WARMUP_APPENDS: usize = 200; const CONCURRENT_APPENDERS: usize = 20; setup(); @@ -487,6 +487,20 @@ pub async fn loglet_test_append_after_seal_concurrent( }); } + let first_observed_seal = tokio::task::spawn({ + let loglet = loglet.clone(); + async move { + loop { + let res = loglet.find_tail().await.expect("find_tail succeeds"); + if res.is_sealed() { + return res.offset(); + } + // give a chance to other tasks to work + tokio::task::yield_now().await; + } + } + }); + // Wait for some warmup appends println!( "Awaiting all appenders to reach at least {} appends", @@ -507,7 +521,11 @@ pub async fn loglet_test_append_after_seal_concurrent( let tail = loglet.find_tail().await?; assert!(tail.is_sealed()); - println!("Sealed tail: {:?}", tail); + let first_observed_seal = first_observed_seal.await?; + println!( + "Sealed tail={:?}, first observed seal at={}", + tail, first_observed_seal + ); let mut all_committed = BTreeSet::new(); while let Some(handle) = appenders.join_next().await { @@ -519,7 +537,7 @@ pub async fn loglet_test_append_after_seal_concurrent( // tail must be beyond seal point assert!(tail.offset() > tail_record); println!( - "Committed len: {}, last appended was {}", + "Committed len={}, last appended={}", committed_len, tail_record ); // ensure that all committed records are unique @@ -529,6 +547,10 @@ pub async fn loglet_test_append_after_seal_concurrent( } } + // All (acknowledged) appends must have offsets less than the tail observed at the first + // Sealed() response of find_tail() + assert!(first_observed_seal > *all_committed.last().unwrap()); + let reader = loglet .clone() .create_read_stream(LogletOffset::OLDEST, Some(tail.offset().prev())) diff --git a/crates/bifrost/src/providers/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs index a07ef6de6..8cd55df83 100644 --- a/crates/bifrost/src/providers/local_loglet/log_state.rs +++ b/crates/bifrost/src/providers/local_loglet/log_state.rs @@ -18,7 +18,6 @@ use tracing::{error, trace, warn}; use super::keys::{MetadataKey, MetadataKind}; 
use crate::loglet::LogletOffset; -use crate::SealReason; use super::LogStoreError; @@ -36,7 +35,7 @@ pub struct LogStateUpdates { enum LogStateUpdate { ReleasePointer(u64), TrimPoint(u64), - Seal(SealReason), + Seal, } impl LogStateUpdates { @@ -58,9 +57,8 @@ impl LogStateUpdates { self } - #[allow(dead_code)] - pub fn seal(mut self, reason: SealReason) -> Self { - self.updates.push(LogStateUpdate::Seal(reason)); + pub fn seal(mut self) -> Self { + self.updates.push(LogStateUpdate::Seal); self } } @@ -87,7 +85,7 @@ flexbuffers_storage_encode_decode!(LogStateUpdates); pub struct LogState { pub release_pointer: u64, pub trim_point: u64, - pub seal: Option, + pub seal: bool, } impl LogState { @@ -148,12 +146,8 @@ pub fn log_state_full_merge( // trim point can only move forward log_state.trim_point = log_state.trim_point.max(offset); } - LogStateUpdate::Seal(reason) => { - // A log cannot be sealed twice. - if log_state.seal.is_none() { - // trim point can only move forward - log_state.seal = Some(reason); - } + LogStateUpdate::Seal => { + log_state.seal = true; } } } @@ -189,6 +183,7 @@ pub fn log_state_partial_merge( Ok(updates) => updates, }; + // todo (asoli): actually merge updates merged.updates.append(&mut updates.updates); } match merged.to_bytes() { diff --git a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 04c290d27..7f7f98f9a 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -33,11 +33,11 @@ use super::log_store::{DATA_CF, METADATA_CF}; use super::metric_definitions::{ BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES, }; -use crate::loglet::{AppendError, LogletOffset}; +use crate::loglet::{LogletOffset, OperationError}; use crate::SMALL_BATCH_THRESHOLD_COUNT; -type Ack = oneshot::Sender>; -type AckRecv = oneshot::Receiver>; +type Ack = oneshot::Sender>; +type AckRecv = oneshot::Receiver>; pub struct LogStoreWriteCommand { log_id: u64, @@ -266,14 +266,14 @@ impl LogStoreWriter { if let Err(e) = result { error!("Failed to commit local loglet write batch: {}", e); - self.send_acks(Err(AppendError::terminal(e))); + self.send_acks(Err(OperationError::terminal(e))); return; } self.send_acks(Ok(())); } - fn send_acks(&mut self, result: Result<(), AppendError>) { + fn send_acks(&mut self, result: Result<(), OperationError>) { self.batch_acks_buf.drain(..).for_each(|a| { let _ = a.send(result.clone()); }); @@ -295,6 +295,19 @@ impl RocksDbLogWriterHandle { self.enqueue_put_records(log_id, offset, &[data]).await } + pub async fn enqueue_seal(&self, log_id: u64) -> Result { + let (ack, receiver) = oneshot::channel(); + let log_state_updates = Some(LogStateUpdates::default().seal()); + self.send_command(LogStoreWriteCommand { + log_id, + data_updates: Default::default(), + log_state_updates, + ack: Some(ack), + }) + .await?; + Ok(receiver) + } + pub async fn enqueue_put_records( &self, log_id: u64, diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 5281a3127..8a67a1fa3 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -26,7 +26,7 @@ use restate_types::logs::SequenceNumber; use tokio::sync::Mutex; use tracing::{debug, warn}; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use 
crate::loglet::{ @@ -35,7 +35,7 @@ use crate::loglet::{ use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; -use crate::{LogRecord, Result, SealReason, TailState}; +use crate::{LogRecord, Result, TailState}; use self::keys::RecordKey; use self::log_store::RocksDbLogStore; @@ -55,8 +55,7 @@ struct LocalLoglet { // In local loglet, the release point == the last committed offset last_committed_offset: AtomicU64, next_write_offset: Mutex, - #[allow(dead_code)] - seal: Option, + sealed: AtomicBool, release_watch: OffsetWatch, append_latency: Histogram, } @@ -68,7 +67,7 @@ impl std::fmt::Debug for LocalLoglet { .field("trim_point_offset", &self.trim_point_offset) .field("last_committed_offset", &self.last_committed_offset) .field("next_write_offset", &self.next_write_offset) - .field("seal", &self.seal) + .field("sealed", &self.sealed) .finish() } } @@ -91,7 +90,7 @@ impl LocalLoglet { let next_write_offset_raw = log_state.release_pointer + 1; let next_write_offset = Mutex::new(LogletOffset::from(next_write_offset_raw)); let release_pointer = LogletOffset::from(log_state.release_pointer); - let seal = log_state.seal; + let sealed = AtomicBool::new(log_state.seal); let append_latency = histogram!(BIFROST_LOCAL_APPEND_DURATION); let loglet = Self { log_id, @@ -101,7 +100,7 @@ impl LocalLoglet { trim_point_lock: Mutex::new(()), next_write_offset, last_committed_offset, - seal, + sealed, release_watch: OffsetWatch::new(release_pointer), append_latency, }; @@ -116,8 +115,7 @@ impl LocalLoglet { } #[inline] - fn notify_readers(&self) { - let release_pointer = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)); + fn notify_readers(&self, release_pointer: LogletOffset) { self.release_watch.notify(release_pointer); } @@ -203,6 +201,13 @@ impl LogletBase for LocalLoglet { } async fn append(&self, payload: Bytes) -> Result { + // An initial check if we are sealed or not, we are not worried about accepting an + // append while sealing is taking place. We only care about *not* acknowledging + // it if we lost the race and the seal was completed while waiting on this append. + if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } + counter!(BIFROST_LOCAL_APPEND).increment(1); let start_time = std::time::Instant::now(); // We hold the lock to ensure that offsets are enqueued in the order of @@ -230,14 +235,29 @@ impl LogletBase for LocalLoglet { Err(ShutdownError.into()) })?; - self.last_committed_offset - .fetch_max(offset.into(), Ordering::Relaxed); - self.notify_readers(); + let release_pointer = LogletOffset::from( + self.last_committed_offset + .fetch_max(offset.into(), Ordering::AcqRel) + .max(offset.into()), + ); + self.notify_readers(release_pointer); + // Ensure that we don't acknowledge the append (even that it has happened) if the loglet + // has been sealed already. + if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } self.append_latency.record(start_time.elapsed()); Ok(offset) } async fn append_batch(&self, payloads: &[Bytes]) -> Result { + // An initial check if we are sealed or not, we are not worried about accepting an + // append while sealing is taking place. We only care about *not* acknowledging + // it if we lost the race and the seal was completed while waiting on this append. 
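+        // (the authoritative re-check happens after the write is durable, below)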
+ if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } + let num_payloads = payloads.len(); counter!(BIFROST_LOCAL_APPEND).increment(num_payloads as u64); let start_time = std::time::Instant::now(); @@ -264,17 +284,29 @@ impl LogletBase for LocalLoglet { Err(ShutdownError.into()) })?; - self.last_committed_offset - .fetch_max(offset.into(), Ordering::Relaxed); - self.notify_readers(); + // AcqRel to ensure that the offset is visible to other threads and to synchronize sealed + // with find_tail. + let release_pointer = LogletOffset::from( + self.last_committed_offset + .fetch_max(offset.into(), Ordering::AcqRel) + .max(offset.into()), + ); + self.notify_readers(release_pointer); + // Ensure that we don't acknowledge the append (albeit durable) if the loglet + // has been sealed already. + if self.sealed.load(Ordering::Relaxed) { + return Err(AppendError::Sealed); + } self.append_latency.record(start_time.elapsed()); Ok(offset) } async fn find_tail(&self) -> Result, OperationError> { + // `fetch_add(0)` with Release ordering to enforce using last_committed_offset as a memory + // barrier and synchronization point with other threads. let last_committed = - LogletOffset::from(self.last_committed_offset.load(Ordering::Relaxed)).next(); - Ok(if self.seal.is_some() { + LogletOffset::from(self.last_committed_offset.fetch_add(0, Ordering::Release)).next(); + Ok(if self.sealed.load(Ordering::Relaxed) { TailState::Sealed(last_committed) } else { TailState::Open(last_committed) @@ -329,7 +361,16 @@ impl LogletBase for LocalLoglet { } async fn seal(&self) -> Result<(), OperationError> { - todo!() + if self.sealed.load(Ordering::Relaxed) { + return Ok(()); + } + let receiver = self.log_writer.enqueue_seal(self.log_id).await?; + let _ = receiver.await.unwrap_or_else(|_| { + warn!("Unsure if the local loglet record was sealed, the ack channel was dropped"); + Err(ShutdownError.into()) + })?; + self.sealed.store(true, Ordering::Relaxed); + Ok(()) } async fn read_next_single( @@ -491,4 +532,81 @@ mod tests { }) .await } + #[tokio::test(start_paused = true)] + async fn local_loglet_test_append_after_seal() -> googletest::Result<()> { + let node_env = TestCoreEnvBuilder::new_with_mock_network() + .set_provider_kind(ProviderKind::Local) + .build() + .await; + + node_env + .tc + .run_in_scope("test", None, async { + let config = Live::from_value(Configuration::default()); + RocksDbManager::init(config.clone().map(|c| &c.common)); + let params = LogletParams::from("99".to_string()); + + let log_store = RocksDbLogStore::create( + &config.pinned().bifrost.local, + config.clone().map(|c| &c.bifrost.local.rocksdb).boxed(), + ) + .await?; + + let log_writer = log_store + .create_writer() + .start(config.clone().map(|c| &c.bifrost.local).boxed())?; + + let loglet = Arc::new( + LocalLoglet::create( + params + .id() + .parse() + .expect("loglet params can be converted into u64"), + log_store, + log_writer, + ) + .await?, + ); + + loglet_test_append_after_seal(loglet).await?; + Ok(()) + }) + .await + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn local_loglet_test_append_after_seal_concurrent() -> googletest::Result<()> { + let node_env = TestCoreEnvBuilder::new_with_mock_network() + .set_provider_kind(ProviderKind::Local) + .build() + .await; + + node_env + .tc + .run_in_scope("test", None, async { + let config = Live::from_value(Configuration::default()); + RocksDbManager::init(config.clone().map(|c| &c.common)); + + let log_store = RocksDbLogStore::create( + 
&config.pinned().bifrost.local, + config.clone().map(|c| &c.bifrost.local.rocksdb).boxed(), + ) + .await?; + + let log_writer = log_store + .create_writer() + .start(config.clone().map(|c| &c.bifrost.local).boxed())?; + + // Run the test 10 times + for i in 1..=10 { + let loglet = Arc::new( + LocalLoglet::create(i, log_store.clone(), log_writer.clone()).await?, + ); + loglet_test_append_after_seal_concurrent(loglet).await?; + } + + Ok(()) + }) + .await + } } diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index ef799f4ea..8ade70011 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -99,7 +99,6 @@ impl LogReadStream { // skips over the boundary of the gap. crate::Record::TrimGap(trim_gap) => trim_gap.to, crate::Record::Data(_) => record.offset, - crate::Record::Seal(_) => record.offset, } .next() } diff --git a/crates/bifrost/src/record.rs b/crates/bifrost/src/record.rs index 5db7d64c1..5c68437dc 100644 --- a/crates/bifrost/src/record.rs +++ b/crates/bifrost/src/record.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use restate_types::logs::{Lsn, Payload, SequenceNumber}; use restate_types::storage::{StorageCodec, StorageDecodeError}; -use crate::{LsnExt, SealReason}; +use crate::LsnExt; /// A single entry in the log. #[derive(Debug, Clone, PartialEq)] @@ -43,7 +43,6 @@ impl LogRecord { to: base_lsn.offset_by(to), }), Record::Data(payload) => Record::Data(payload), - Record::Seal(reason) => Record::Seal(reason), }; LogRecord { @@ -58,7 +57,6 @@ impl LogRecord { let record = match self.record { Record::Data(mut payload) => Record::Data(StorageCodec::decode(&mut payload)?), Record::TrimGap(t) => Record::TrimGap(t), - Record::Seal(reason) => Record::Seal(reason), }; Ok(LogRecord { offset: self.offset, @@ -71,7 +69,6 @@ impl LogRecord { pub enum Record { TrimGap(TrimGap), Data(D), - Seal(SealReason), } impl Record { diff --git a/crates/bifrost/src/types.rs b/crates/bifrost/src/types.rs index 0203fd138..cbb632512 100644 --- a/crates/bifrost/src/types.rs +++ b/crates/bifrost/src/types.rs @@ -13,7 +13,6 @@ use crate::loglet::LogletOffset; use restate_types::logs::{Lsn, SequenceNumber}; -use serde::{Deserialize, Serialize}; // Only implemented for LSNs pub(crate) trait LsnExt: SequenceNumber { @@ -65,15 +64,6 @@ pub(crate) trait LsnExt: SequenceNumber { impl LsnExt for Lsn {} -/// Details about why a log was sealed -#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] -pub enum SealReason { - /// Log was sealed to perform a repartitioning operation (split or unsplit). - /// The reader/writer need to figure out where to read/write next. - Resharding, - Other(String), -} - #[derive(Debug, Clone, Default)] pub struct FindTailAttributes { // Ensure that we are reading the most recent metadata. This should be used when diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 6729f01ed..06e86dd09 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -255,9 +255,6 @@ where Record::TrimGap(_) => { unimplemented!("Currently not supported") } - Record::Seal(_) => { - unimplemented!("Currently not supported") - } } }); From 1e62260f8f233338c35ac207e6cb24c459088b9c Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 22 Jul 2024 16:03:46 +0100 Subject: [PATCH 5/6] [Bifrost] Init single-node loglets with random loglet ids In preparation to allow multi-segment chains, local loglets in single-node bootstrap now get unique random ids. 
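Reviewer aid, not part of the diff: a hedged sketch of how the new helper is
meant to be consumed when bootstrapping a log. `bootstrap_logs_metadata` itself
changes further down in this patch, so the exact builder call here is
illustrative only.

    use restate_types::logs::metadata::{new_single_node_loglet_params, Chain, ProviderKind};
    use restate_types::logs::{Lsn, SequenceNumber};

    // Illustrative only: every partition's log starts as a one-segment chain
    // whose loglet params are seeded with a freshly drawn random id.
    fn fresh_chain(provider: ProviderKind) -> Chain {
        let params = new_single_node_loglet_params(provider);
        // `Lsn::OLDEST` anchors the segment at the beginning of the log.
        Chain::with_base_lsn(Lsn::OLDEST, provider, params)
    }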
--- crates/bifrost/benches/util.rs | 2 +- .../src/providers/local_loglet/keys.rs | 32 ++++----- .../src/providers/local_loglet/log_store.rs | 4 +- .../local_loglet/log_store_writer.rs | 28 ++++---- .../bifrost/src/providers/local_loglet/mod.rs | 40 +++++------ .../src/providers/local_loglet/provider.rs | 11 +-- .../src/providers/local_loglet/read_stream.rs | 10 +-- crates/core/src/test_env.rs | 4 +- crates/node/src/lib.rs | 4 +- crates/types/Cargo.toml | 2 +- crates/types/src/config/bifrost.rs | 10 +++ crates/types/src/logs/builder.rs | 3 +- crates/types/src/logs/metadata.rs | 72 ++++++++++++++----- tools/bifrost-benchpress/src/main.rs | 2 +- 14 files changed, 133 insertions(+), 91 deletions(-) diff --git a/crates/bifrost/benches/util.rs b/crates/bifrost/benches/util.rs index e765227da..fce2151ec 100644 --- a/crates/bifrost/benches/util.rs +++ b/crates/bifrost/benches/util.rs @@ -48,7 +48,7 @@ pub async fn spawn_environment( RocksDbManager::init(Constant::new(config.common)) }); - let logs = restate_types::logs::metadata::create_static_metadata(provider, num_logs); + let logs = restate_types::logs::metadata::bootstrap_logs_metadata(provider, num_logs); metadata_store_client .put(BIFROST_CONFIG_KEY.clone(), logs.clone(), Precondition::None) diff --git a/crates/bifrost/src/providers/local_loglet/keys.rs b/crates/bifrost/src/providers/local_loglet/keys.rs index d758b41ba..6b623b115 100644 --- a/crates/bifrost/src/providers/local_loglet/keys.rs +++ b/crates/bifrost/src/providers/local_loglet/keys.rs @@ -20,18 +20,18 @@ pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::() + size_of:: Self { - Self { log_id, offset } + pub fn new(loglet_id: u64, offset: LogletOffset) -> Self { + Self { loglet_id, offset } } - pub fn upper_bound(log_id: u64) -> Self { + pub fn upper_bound(loglet_id: u64) -> Self { Self { - log_id, + loglet_id, offset: LogletOffset::MAX, } } @@ -39,7 +39,7 @@ impl RecordKey { pub fn to_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(size_of::() + 1); buf.put_u8(b'd'); - buf.put_u64(self.log_id); + buf.put_u64(self.loglet_id); buf.put_u64(self.offset.into()); buf.freeze() } @@ -48,9 +48,9 @@ impl RecordKey { let mut data = data; let c = data.get_u8(); debug_assert_eq!(c, b'd'); - let log_id = data.get_u64(); + let loglet_id = data.get_u64(); let offset = LogletOffset::from(data.get_u64()); - Self { log_id, offset } + Self { loglet_id, offset } } } @@ -64,20 +64,20 @@ pub enum MetadataKind { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct MetadataKey { - pub log_id: u64, + pub loglet_id: u64, pub kind: MetadataKind, } impl MetadataKey { - pub fn new(log_id: u64, kind: MetadataKind) -> Self { - Self { log_id, kind } + pub fn new(loglet_id: u64, kind: MetadataKind) -> Self { + Self { loglet_id, kind } } pub fn to_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(size_of::() + 1); // m for metadata buf.put_u8(b'm'); - buf.put_u64(self.log_id); + buf.put_u64(self.loglet_id); buf.put_u8(self.kind as u8); buf.freeze() } @@ -86,11 +86,11 @@ impl MetadataKey { let mut data = Bytes::copy_from_slice(data); let c = data.get_u8(); debug_assert_eq!(c, b'm'); - let log_id = data.get_u64(); + let loglet_id = data.get_u64(); let kind = MetadataKind::from_repr(data.get_u8()); let kind = kind.unwrap_or_default(); - Self { log_id, kind } + Self { loglet_id, kind } } } @@ -111,12 +111,12 @@ mod tests { #[test] fn test_metadata_key() { let key = MetadataKey::new(1, MetadataKind::LogState); - assert_eq!(key.log_id, 1); + assert_eq!(key.loglet_id, 1); 
assert_eq!(key.kind, MetadataKind::LogState); let bytes = key.to_bytes(); let key2 = MetadataKey::from_slice(&bytes); assert_eq!(key, key2); - assert_eq!(key2.log_id, 1); + assert_eq!(key2.loglet_id, 1); assert_eq!(key2.kind, MetadataKind::LogState); } } diff --git a/crates/bifrost/src/providers/local_loglet/log_store.rs b/crates/bifrost/src/providers/local_loglet/log_store.rs index 765235065..ffaf23d9d 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store.rs @@ -109,11 +109,11 @@ impl RocksDbLogStore { .expect("METADATA_CF exists") } - pub fn get_log_state(&self, log_id: u64) -> Result, LogStoreError> { + pub fn get_log_state(&self, loglet_id: u64) -> Result, LogStoreError> { let metadata_cf = self.metadata_cf(); let value = self.rocksdb.inner().as_raw_db().get_pinned_cf( &metadata_cf, - MetadataKey::new(log_id, MetadataKind::LogState).to_bytes(), + MetadataKey::new(loglet_id, MetadataKind::LogState).to_bytes(), )?; if let Some(value) = value { diff --git a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 7f7f98f9a..f36f16b0a 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -40,7 +40,7 @@ type Ack = oneshot::Sender>; type AckRecv = oneshot::Receiver>; pub struct LogStoreWriteCommand { - log_id: u64, + loglet_id: u64, data_updates: SmallVec<[DataUpdate; SMALL_BATCH_THRESHOLD_COUNT]>, log_state_updates: Option, ack: Option, @@ -155,7 +155,7 @@ impl LogStoreWriter { DataUpdate::PutRecord { offset, data } => Self::put_record( &data_cf, &mut write_batch, - command.log_id, + command.loglet_id, offset, data, ), @@ -165,7 +165,7 @@ impl LogStoreWriter { } => Self::trim_log( &data_cf, &mut write_batch, - command.log_id, + command.loglet_id, old_trim_point, new_trim_point, ), @@ -178,7 +178,7 @@ impl LogStoreWriter { Self::update_log_state( &metadata_cf, &mut write_batch, - command.log_id, + command.loglet_id, logstate_updates, buffer, ) @@ -198,14 +198,14 @@ impl LogStoreWriter { fn update_log_state( metadata_cf: &Arc, write_batch: &mut WriteBatch, - log_id: u64, + loglet_id: u64, updates: LogStateUpdates, buffer: &mut BytesMut, ) { updates.encode(buffer).expect("encode"); write_batch.merge_cf( metadata_cf, - MetadataKey::new(log_id, MetadataKind::LogState).to_bytes(), + MetadataKey::new(loglet_id, MetadataKind::LogState).to_bytes(), buffer, ); } @@ -288,18 +288,18 @@ pub struct RocksDbLogWriterHandle { impl RocksDbLogWriterHandle { pub async fn enqueue_put_record( &self, - log_id: u64, + loglet_id: u64, offset: LogletOffset, data: Bytes, ) -> Result { - self.enqueue_put_records(log_id, offset, &[data]).await + self.enqueue_put_records(loglet_id, offset, &[data]).await } - pub async fn enqueue_seal(&self, log_id: u64) -> Result { + pub async fn enqueue_seal(&self, loglet_id: u64) -> Result { let (ack, receiver) = oneshot::channel(); let log_state_updates = Some(LogStateUpdates::default().seal()); self.send_command(LogStoreWriteCommand { - log_id, + loglet_id, data_updates: Default::default(), log_state_updates, ack: Some(ack), @@ -310,7 +310,7 @@ impl RocksDbLogWriterHandle { pub async fn enqueue_put_records( &self, - log_id: u64, + loglet_id: u64, mut start_offset: LogletOffset, records: &[Bytes], ) -> Result { @@ -328,7 +328,7 @@ impl RocksDbLogWriterHandle { let log_state_updates = 
Some(LogStateUpdates::default().update_release_pointer(start_offset.prev())); self.send_command(LogStoreWriteCommand { - log_id, + loglet_id, data_updates, log_state_updates, ack: Some(ack), @@ -339,7 +339,7 @@ impl RocksDbLogWriterHandle { pub async fn enqueue_trim( &self, - log_id: u64, + loglet_id: u64, old_trim_point: LogletOffset, new_trim_point: LogletOffset, ) -> Result<(), ShutdownError> { @@ -351,7 +351,7 @@ impl RocksDbLogWriterHandle { let log_state_updates = Some(LogStateUpdates::default().update_trim_point(new_trim_point)); self.send_command(LogStoreWriteCommand { - log_id, + loglet_id, data_updates, log_state_updates, ack: None, diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 8a67a1fa3..31d40523b 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -45,7 +45,7 @@ use self::read_stream::LocalLogletReadStream; use crate::loglet::util::OffsetWatch; struct LocalLoglet { - log_id: u64, + loglet_id: u64, log_store: RocksDbLogStore, log_writer: RocksDbLogWriterHandle, // internal offset _before_ the loglet head. Loglet head is trim_point_offset.next() @@ -63,7 +63,7 @@ struct LocalLoglet { impl std::fmt::Debug for LocalLoglet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LocalLoglet") - .field("log_id", &self.log_id) + .field("loglet_id", &self.loglet_id) .field("trim_point_offset", &self.trim_point_offset) .field("last_committed_offset", &self.last_committed_offset) .field("next_write_offset", &self.next_write_offset) @@ -74,13 +74,13 @@ impl std::fmt::Debug for LocalLoglet { impl LocalLoglet { pub async fn create( - log_id: u64, + loglet_id: u64, log_store: RocksDbLogStore, log_writer: RocksDbLogWriterHandle, ) -> Result { // Fetch the log metadata from the store let log_state = log_store - .get_log_state(log_id) + .get_log_state(loglet_id) .map_err(OperationError::other)?; let log_state = log_state.unwrap_or_default(); @@ -93,7 +93,7 @@ impl LocalLoglet { let sealed = AtomicBool::new(log_state.seal); let append_latency = histogram!(BIFROST_LOCAL_APPEND_DURATION); let loglet = Self { - log_id, + loglet_id, log_store, log_writer, trim_point_offset, @@ -105,7 +105,7 @@ impl LocalLoglet { append_latency, }; debug!( - log_id = log_id, + loglet_id = loglet_id, release_pointer = %release_pointer, next_offset = next_write_offset_raw, "Local loglet started" @@ -136,10 +136,10 @@ impl LocalLoglet { if from_offset > commit_offset { Ok(None) } else { - let key = RecordKey::new(self.log_id, from_offset); + let key = RecordKey::new(self.loglet_id, from_offset); let data_cf = self.log_store.data_cf(); let mut read_opts = rocksdb::ReadOptions::default(); - read_opts.set_iterate_upper_bound(RecordKey::upper_bound(self.log_id).to_bytes()); + read_opts.set_iterate_upper_bound(RecordKey::upper_bound(self.loglet_id).to_bytes()); let mut iter = self.log_store.db().iterator_cf_opt( &data_cf, @@ -163,12 +163,12 @@ impl LocalLoglet { let (key, data) = record; let key = RecordKey::from_slice(&key); // Defensive, the upper_bound set on the iterator should prevent this. 
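            // If it ever fires, we stop reading rather than risk serving another
            // loglet's records.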
- if key.log_id != self.log_id { + if key.loglet_id != self.loglet_id { warn!( - log_id = self.log_id, - "read_from moved to the adjacent log {}, that should not happen.\ + loglet_id = self.loglet_id, + "read_from moved to the adjacent loglet {}, that should not happen.\ This is harmless but needs to be investigated!", - key.log_id, + key.loglet_id, ); return Ok(None); } @@ -220,7 +220,7 @@ impl LogletBase for LocalLoglet { let offset = *next_offset_guard; let receiver = self .log_writer - .enqueue_put_record(self.log_id, offset, payload) + .enqueue_put_record(self.loglet_id, offset, payload) .await?; // next offset points to the next available slot. *next_offset_guard = offset.next(); @@ -271,7 +271,7 @@ impl LogletBase for LocalLoglet { // lock acquired let receiver = self .log_writer - .enqueue_put_records(self.log_id, *next_offset_guard, payloads) + .enqueue_put_records(self.loglet_id, *next_offset_guard, payloads) .await?; // next offset points to the next available slot. *next_offset_guard = offset + num_payloads; @@ -350,7 +350,7 @@ impl LogletBase for LocalLoglet { .store(effective_trim_point.0, Ordering::Relaxed); self.log_writer - .enqueue_trim(self.log_id, current_trim_point, effective_trim_point) + .enqueue_trim(self.loglet_id, current_trim_point, effective_trim_point) .await?; histogram!(BIFROST_LOCAL_TRIM_LENGTH).record( @@ -364,7 +364,7 @@ impl LogletBase for LocalLoglet { if self.sealed.load(Ordering::Relaxed) { return Ok(()); } - let receiver = self.log_writer.enqueue_seal(self.log_id).await?; + let receiver = self.log_writer.enqueue_seal(self.loglet_id).await?; let _ = receiver.await.unwrap_or_else(|_| { warn!("Unsure if the local loglet record was sealed, the ack channel was dropped"); Err(ShutdownError.into()) @@ -434,7 +434,7 @@ mod tests { let loglet = Arc::new( LocalLoglet::create( params - .id() + .as_str() .parse() .expect("loglet params can be converted into u64"), log_store, @@ -476,7 +476,7 @@ mod tests { let loglet = Arc::new( LocalLoglet::create( params - .id() + .as_str() .parse() .expect("loglet params can be converted into u64"), log_store, @@ -518,7 +518,7 @@ mod tests { let loglet = Arc::new( LocalLoglet::create( params - .id() + .as_str() .parse() .expect("loglet params can be converted into u64"), log_store, @@ -559,7 +559,7 @@ mod tests { let loglet = Arc::new( LocalLoglet::create( params - .id() + .as_str() .parse() .expect("loglet params can be converted into u64"), log_store, diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index ede27e6d5..7e336f2cb 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -82,18 +82,13 @@ impl LogletProvider for LocalLogletProvider { params: &LogletParams, ) -> Result>, Error> { let mut guard = self.active_loglets.lock().await; - let loglet = match guard.entry(params.id().to_owned()) { + let loglet = match guard.entry(params.as_str().to_owned()) { hash_map::Entry::Vacant(entry) => { // Create loglet - // - // todo: Fix by making params richer config object. - // - // NOTE: We blatently assume that id() is a u64 unique id under the hood. Obviously - // this is a shortcut and should be fixed by making LogletParams a richer config - // object that can be used to create the loglet. + // NOTE: local-loglet expects params to be a `u64` string-encoded unique identifier under the hood. 
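+                // (the `.expect()` below will panic on malformed params, which
+                // is tolerable while ids come only from our own bootstrap path)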
let loglet = LocalLoglet::create( params - .id() + .as_str() .parse() .expect("loglet params can be converted into u64"), self.log_store.clone(), diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs index b5dde70d4..db720c885 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -71,7 +71,7 @@ impl LocalLogletReadStream { // We seek to next key on every iteration, we need to setup the iterator to be // at the previous key within the same prefix if from_offset > 0 (saturating to // LogletOffset::INVALID) - let key = RecordKey::new(loglet.log_id, from_offset.prev()); + let key = RecordKey::new(loglet.loglet_id, from_offset.prev()); let mut read_opts = rocksdb::ReadOptions::default(); read_opts.set_tailing(true); // In some cases, the underlying ForwardIterator will fail if it hits a `RangeDelete` tombstone. @@ -84,7 +84,7 @@ impl LocalLogletReadStream { read_opts.set_prefix_same_as_start(true); read_opts.set_total_order_seek(false); read_opts.set_iterate_lower_bound(key.to_bytes()); - read_opts.set_iterate_upper_bound(RecordKey::upper_bound(loglet.log_id).to_bytes()); + read_opts.set_iterate_upper_bound(RecordKey::upper_bound(loglet.loglet_id).to_bytes()); let log_store = &loglet.log_store; let mut release_watch = loglet.release_watch.to_stream(); @@ -107,7 +107,7 @@ impl LocalLogletReadStream { }; Ok(Self { - log_id: loglet.log_id, + log_id: loglet.loglet_id, loglet, read_pointer: from_offset, iterator: iter, @@ -236,12 +236,12 @@ impl Stream for LocalLogletReadStream { debug_assert_eq!(loaded_key.offset, key.offset); // Defensive, the upper_bound set on the iterator should prevent this. - if loaded_key.log_id != *this.log_id { + if loaded_key.loglet_id != *this.log_id { warn!( log_id = *this.log_id, "read_after moved to the adjacent log {}, that should not happen.\ This is harmless but needs to be investigated!", - key.log_id, + key.loglet_id, ); this.terminated.set(true); return Poll::Ready(None); diff --git a/crates/core/src/test_env.rs b/crates/core/src/test_env.rs index 4d06da6c0..790b5ef07 100644 --- a/crates/core/src/test_env.rs +++ b/crates/core/src/test_env.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; -use restate_types::logs::metadata::{create_static_metadata, ProviderKind}; +use restate_types::logs::metadata::{bootstrap_logs_metadata, ProviderKind}; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, }; @@ -288,7 +288,7 @@ where self.metadata_writer.submit(self.nodes_config.clone()); let logs = - create_static_metadata(self.provider_kind, self.partition_table.num_partitions()); + bootstrap_logs_metadata(self.provider_kind, self.partition_table.num_partitions()); self.metadata_store_client .put(BIFROST_CONFIG_KEY.clone(), logs.clone(), Precondition::None) .await diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 55dcd0dee..53ac0c420 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -34,7 +34,7 @@ use restate_core::{task_center, TaskKind}; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; use restate_types::config::{CommonOptions, Configuration}; -use restate_types::logs::metadata::{create_static_metadata, Logs}; +use restate_types::logs::metadata::{bootstrap_logs_metadata, Logs}; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, 
NODES_CONFIG_KEY, PARTITION_TABLE_KEY, }; @@ -461,7 +461,7 @@ impl Node { ) -> Result { Self::retry_on_network_error(|| { metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - create_static_metadata(config.bifrost.default_provider, num_partitions) + bootstrap_logs_metadata(config.bifrost.default_provider, num_partitions) }) }) .await diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 1c1a9210d..17df37098 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -45,7 +45,7 @@ prost-types = { workspace = true } rand = { workspace = true } regress = { workspace = true } schemars = { workspace = true, optional = true } -serde = { workspace = true, features = ["rc"] } +serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } sha2 = { workspace = true } diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 3316bd62c..d611fb8fb 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -32,6 +32,9 @@ use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; pub struct BifrostOptions { /// # The default kind of loglet to be used pub default_provider: ProviderKind, + /// An opaque string that gets passed to the loglet provider to seed the creation of new + /// loglets. + default_provider_config: Option<String>, #[cfg_attr(feature = "schemars", schemars(with = "String"))] /// Configuration of local loglet provider pub local: LocalLogletOptions, @@ -41,10 +44,17 @@ pub struct BifrostOptions { pub replicated_loglet: ReplicatedLogletOptions, } +impl BifrostOptions { + pub fn default_provider_config(&self) -> Option<&str> { + self.default_provider_config.as_deref() + } +} + impl Default for BifrostOptions { fn default() -> Self { Self { default_provider: ProviderKind::Local, + default_provider_config: None, #[cfg(feature = "replicated-loglet")] replicated_loglet: ReplicatedLogletOptions::default(), local: LocalLogletOptions::default(), diff --git a/crates/types/src/logs/builder.rs b/crates/types/src/logs/builder.rs index ee7593ffb..ff460605a 100644 --- a/crates/types/src/logs/builder.rs +++ b/crates/types/src/logs/builder.rs @@ -9,7 +9,6 @@ // by the Apache License, Version 2.0. use std::ops::Deref; -use std::sync::Arc; use super::metadata::{Chain, LogletConfig, LogletParams, Logs, MaybeSegment, ProviderKind}; use super::{LogId, Lsn}; @@ -105,7 +104,7 @@ impl<'a> ChainBuilder<'a> { // validate that the base_lsn is higher than existing base_lsns. self.inner .chain - .insert(base_lsn, Arc::new(LogletConfig::new(provider, params))); + .insert(base_lsn, LogletConfig::new(provider, params)); Ok(()) } else { // can't add to the back. diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 49d6ad9d9..b3adf5692 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -8,15 +8,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0.
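The metadata.rs changes below swap the `String` inside `LogletParams` for a `ByteString`, which makes params cheap to clone when they are shared between segments and logs. A small sketch of that property; the import path and test name are assumptions, while the `From<&'static str>` impl and `as_str()` come from the hunks below:

    use restate_types::logs::metadata::LogletParams;

    #[test]
    fn loglet_params_clone_is_cheap() {
        // ByteString is a ref-counted immutable string: cloning bumps a
        // reference count instead of copying the backing buffer
        let a = LogletParams::from("1337");
        let b = a.clone();
        assert_eq!(a.as_str(), b.as_str());
    }
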
-use crate::logs::{LogId, Lsn, SequenceNumber}; -use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; +use std::collections::{BTreeMap, HashMap, HashSet}; + +use bytes::Bytes; +use bytestring::ByteString; use enum_map::Enum; +use rand::RngCore; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; use super::builder::LogsBuilder; +use crate::logs::{LogId, Lsn, SequenceNumber}; +use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; /// Log metadata is the map of logs known to the system with the corresponding chain. /// Metadata updates are versioned and atomic. @@ -43,7 +46,7 @@ impl Default for Logs { pub struct Chain { // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs #[serde_as(as = "serde_with::Seq<(_, _)>")] - pub(super) chain: BTreeMap<Lsn, Arc<LogletConfig>>, + pub(super) chain: BTreeMap<Lsn, LogletConfig>, } #[derive(Debug, Clone)] @@ -53,7 +56,7 @@ pub struct Segment<'a> { /// record exists. It only means that we want to offset the loglet offsets by base_lsn - /// Loglet::Offset::OLDEST. pub base_lsn: Lsn, - pub config: &'a Arc<LogletConfig>, + pub config: &'a LogletConfig, } /// A segment in the chain of loglet instances. @@ -68,11 +71,17 @@ pub struct LogletConfig { /// and start-lsn. It's provided by bifrost on loglet creation. This allows the /// parameters to be shared between segments and logs if needed. #[derive(Debug, Clone, Hash, Eq, PartialEq, derive_more::From, Serialize, Deserialize)] -pub struct LogletParams(String); +pub struct LogletParams(ByteString); + +impl From<String> for LogletParams { + fn from(value: String) -> Self { + Self(ByteString::from(value)) + } +} impl From<&'static str> for LogletParams { - fn from(value: &str) -> Self { - Self(value.to_owned()) + fn from(value: &'static str) -> Self { + Self(ByteString::from_static(value)) + } } @@ -112,9 +121,13 @@ impl LogletConfig { } impl LogletParams { - pub fn id(&self) -> &str { + pub fn as_str(&self) -> &str { &self.0 } + + pub fn as_bytes(&self) -> &Bytes { + self.0.as_bytes() + } } impl Logs { @@ -171,7 +184,7 @@ impl Chain { /// Create a chain with `base_lsn` as its oldest Lsn. pub fn with_base_lsn(base_lsn: Lsn, kind: ProviderKind, config: LogletParams) -> Self { let mut chain = BTreeMap::new(); - chain.insert(base_lsn, Arc::new(LogletConfig::new(kind, config))); + chain.insert(base_lsn, LogletConfig::new(kind, config)); Self { chain } } @@ -235,18 +248,43 @@ impl Chain { } } +/// Creates an appropriate [`LogletParams`] value that can be used to start a fresh +/// single-node loglet instance. +/// +/// This is used in single-node bootstrap scenarios and assumes a non-running system. +/// It must generate params that uniquely identify the new loglet instance on every call. +pub fn new_single_node_loglet_params(default_provider: ProviderKind) -> LogletParams { + let loglet_id = rand::thread_rng().next_u64().to_string(); + match default_provider { + ProviderKind::Local => LogletParams::from(loglet_id), + ProviderKind::InMemory => LogletParams::from(loglet_id), + #[cfg(feature = "replicated-loglet")] + ProviderKind::Replicated => panic!( + "replicated-loglet cannot be used as default-provider in a single-node setup. \ + To use replicated loglet, the node must be running in cluster-mode" + ), + } +} + /// Initializes the bifrost metadata with static log metadata, it creates a log for every partition /// with a chain of the default loglet provider kind.
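Before the bootstrap function itself, a sketch of the per-call uniqueness that `new_single_node_loglet_params` promises above; the test name is an assumption, and the collision check in `bootstrap_logs_metadata` below exists precisely because this uniqueness is only probabilistic:

    use restate_types::logs::metadata::{new_single_node_loglet_params, ProviderKind};

    #[test]
    fn bootstrap_params_are_unique_per_call() {
        // params wrap a random u64, so two calls should practically never collide
        let a = new_single_node_loglet_params(ProviderKind::InMemory);
        let b = new_single_node_loglet_params(ProviderKind::InMemory);
        assert_ne!(a, b);
    }
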
-pub fn create_static_metadata(default_provider: ProviderKind, num_partitions: u64) -> Logs { +pub fn bootstrap_logs_metadata(default_provider: ProviderKind, num_partitions: u64) -> Logs { // Get metadata from somewhere let mut builder = LogsBuilder::default(); - + #[allow(clippy::mutable_key_type)] + let mut generated_params: HashSet<_> = HashSet::new(); // pre-fill with all possible logs up to `num_partitions` (0..num_partitions).for_each(|i| { - // fixed config that uses the log-id as loglet identifier/config - let config = LogletParams::from(i.to_string()); + // a little paranoid about collisions + let params = loop { + let params = new_single_node_loglet_params(default_provider); + if !generated_params.contains(&params) { + generated_params.insert(params.clone()); + break params; + } + }; builder - .add_log(LogId::from(i), Chain::new(default_provider, config)) + .add_log(LogId::from(i), Chain::new(default_provider, params)) .unwrap(); }); @@ -264,6 +302,6 @@ mod tests { let Segment { base_lsn, config } = chain.tail(); assert_eq!(Lsn::OLDEST, base_lsn); assert_eq!(ProviderKind::Local, config.kind); - assert_eq!("test".to_string(), config.params.0); + assert_eq!("test".to_string(), config.params.0.to_string()); } } diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index d2dcc6bef..46e6659d4 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -151,7 +151,7 @@ fn spawn_environment(config: Live, num_logs: u64) -> (TaskCenter, RocksDbManager::init(config.clone().map(|c| &c.common)); - let logs = restate_types::logs::metadata::create_static_metadata( + let logs = restate_types::logs::metadata::bootstrap_logs_metadata( config.pinned().bifrost.default_provider, num_logs, ); From a4af55590dfb2241288b1384197d7689c25e7929 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 22 Jul 2024 16:03:46 +0100 Subject: [PATCH 6/6] [Bifrost] rename bifrost read functions In preparation for removing bifrost's `read_opt` and replacing it with `read()`. --- .../admin/src/cluster_controller/service.rs | 4 +-- crates/bifrost/src/bifrost.rs | 35 +++++++------------ crates/bifrost/src/loglet/loglet_tests.rs | 21 +++++------ crates/bifrost/src/loglet/mod.rs | 4 +-- crates/bifrost/src/loglet_wrapper.rs | 8 ++--- .../bifrost/src/providers/local_loglet/mod.rs | 4 +-- crates/bifrost/src/providers/memory_loglet.rs | 4 +-- crates/bifrost/src/read_stream.rs | 2 +- crates/ingress-dispatcher/src/dispatcher.rs | 2 +- crates/worker/src/partition/leadership.rs | 4 +-- 10 files changed, 35 insertions(+), 53 deletions(-) diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index bebb9509b..1668f27fd 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -471,7 +471,7 @@ mod tests { svc_handle.trim_log(log_id, Lsn::from(3)).await??; - let record = bifrost.read_next_single(log_id, Lsn::OLDEST).await?; + let record = bifrost.read(log_id, Lsn::OLDEST).await?; assert_that!( record.record, pat!(Record::TrimGap(pat!(TrimGap { @@ -675,7 +675,7 @@ mod tests { // everything before the persisted_lsn.
assert_eq!(bifrost.get_trim_point(log_id).await?, Lsn::from(3)); // we should be able to after the last persisted lsn - let v = bifrost.read_next_single(log_id, Lsn::from(4)).await?; + let v = bifrost.read(log_id, Lsn::from(4)).await?; assert_eq!(Lsn::from(4), v.offset); assert!(v.record.is_data()); assert_eq!( diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 6724d2c2c..e9c74febc 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -111,20 +111,16 @@ impl Bifrost { /// start reading from. This means that the record returned will have a LSN that is equal or greater than /// `from`. If no records are committed yet at this LSN, this read operation will "wait" /// for such records to appear. - pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result { - self.inner.read_next_single(log_id, from).await + pub async fn read(&self, log_id: LogId, from: Lsn) -> Result { + self.inner.read(log_id, from).await } /// Read the next record from the LSN provided. The `from` indicates the LSN where we will /// start reading from. This means that the record returned will have a LSN that is equal or greater than /// `from`. If no records are committed yet at this LSN, this read operation will return /// `None`. - pub async fn read_next_single_opt( - &self, - log_id: LogId, - from: Lsn, - ) -> Result> { - self.inner.read_next_single_opt(log_id, from).await + pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result> { + self.inner.read_opt(log_id, from).await } /// Create a read stream. `end_lsn` is inclusive. Pass [[`Lsn::Max`]] for a tailing stream. Use @@ -287,28 +283,24 @@ impl BifrostInner { }) } - pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result { + pub async fn read(&self, log_id: LogId, from: Lsn) -> Result { self.fail_if_shutting_down()?; // Accidental reads from Lsn::INVALID are reset to Lsn::OLDEST let from = std::cmp::max(Lsn::OLDEST, from); let loglet = self.find_loglet_for_lsn(log_id, from).await?; Ok(loglet - .read_next_single(from) + .read(from) .await? .decode() .expect("decoding a bifrost envelope succeeds")) } - pub async fn read_next_single_opt( - &self, - log_id: LogId, - from: Lsn, - ) -> Result> { + pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result> { self.fail_if_shutting_down()?; let loglet = self.find_loglet_for_lsn(log_id, from).await?; - Ok(loglet.read_next_single_opt(from).await?.map(|record| { + Ok(loglet.read_opt(from).await?.map(|record| { record .decode() .expect("decoding a bifrost envelope succeeds") @@ -599,7 +591,7 @@ mod tests { // 5 itself is trimmed for lsn in 1..=5 { - let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?; + let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?; assert_that!( record, pat!(Some(pat!(LogRecord { @@ -612,7 +604,7 @@ mod tests { } for lsn in 6..=10 { - let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?; + let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?; assert_that!( record, pat!(Some(pat!(LogRecord { @@ -635,10 +627,7 @@ mod tests { let new_trim_point = bifrost.get_trim_point(log_id).await?; assert_eq!(Lsn::from(10), new_trim_point); - let record = bifrost - .read_next_single_opt(log_id, Lsn::from(10)) - .await? 
- .unwrap(); + let record = bifrost.read_opt(log_id, Lsn::from(10)).await?.unwrap(); assert!(record.record.is_trim_gap()); assert_eq!(Lsn::from(10), record.record.try_as_trim_gap().unwrap().to); @@ -648,7 +637,7 @@ mod tests { } for lsn in 11..20 { - let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?; + let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?; assert_that!( record, pat!(Some(pat!(LogRecord { diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 9c491173e..bc93b141a 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -89,20 +89,20 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc) -> googletest::R } // read record 1 (reading from OLDEST) - let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::OLDEST).await?); + let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::OLDEST).await?); let LogRecord { offset, record } = log_record; assert_eq!(LogletOffset::OLDEST, offset,); assert!(record.is_data()); assert_eq!(Some(&Bytes::from_static(b"record1")), record.payload()); // read record 2 - let_assert!(Some(log_record) = loglet.read_next_single_opt(offset.next()).await?); + let_assert!(Some(log_record) = loglet.read_opt(offset.next()).await?); let LogRecord { offset, record } = log_record; assert_eq!(LogletOffset::from(2), offset); assert_eq!(Some(&Bytes::from_static(b"record2")), record.payload()); // read record 3 - let_assert!(Some(log_record) = loglet.read_next_single_opt(offset.next()).await?); + let_assert!(Some(log_record) = loglet.read_opt(offset.next()).await?); let LogRecord { offset, record } = log_record; assert_eq!(LogletOffset::from(3), offset); assert_eq!(Some(&Bytes::from_static(b"record3")), record.payload()); @@ -116,17 +116,13 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc) -> googletest::R } // read from the future returns None - assert!(loglet - .read_next_single_opt(LogletOffset::from(end)) - .await? 
- .is_none()); + assert!(loglet.read_opt(LogletOffset::from(end)).await?.is_none()); let handle1: JoinHandle> = tokio::spawn({ let loglet = loglet.clone(); async move { // read future record 4 - let LogRecord { offset, record } = - loglet.read_next_single(LogletOffset::from(4)).await?; + let LogRecord { offset, record } = loglet.read(LogletOffset::from(4)).await?; assert_eq!(LogletOffset(4), offset); assert_eq!(Some(&Bytes::from_static(b"record4")), record.payload()); Ok(()) @@ -138,8 +134,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc) -> googletest::R let loglet = loglet.clone(); async move { // read future record 10 - let LogRecord { offset, record } = - loglet.read_next_single(LogletOffset::from(10)).await?; + let LogRecord { offset, record } = loglet.read(LogletOffset::from(10)).await?; assert_eq!(LogletOffset(10), offset); assert_eq!(Some(&Bytes::from_static(b"record10")), record.payload()); Ok(()) @@ -192,7 +187,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc) -> googletest::R assert!(!tail.is_sealed()); } - let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::OLDEST).await?); + let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::OLDEST).await?); let LogRecord { offset, record } = log_record; assert_eq!(LogletOffset::OLDEST, offset); assert!(record.is_trim_gap()); @@ -201,7 +196,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc) -> googletest::R record.try_as_trim_gap_ref().unwrap().to ); - let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::from(4)).await?); + let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::from(4)).await?); let LogRecord { offset, record } = log_record; assert_eq!(LogletOffset::from(4), offset); assert!(record.is_data()); diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 7e9f3067d..8d8b4dc1d 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -157,13 +157,13 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// Read or wait for the record at `from` offset, or the next available record if `from` isn't /// defined for the loglet. - async fn read_next_single( + async fn read( &self, from: Self::Offset, ) -> Result, OperationError>; /// Read the next record if it's been committed, otherwise, return None without waiting. 
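The renamed pair keeps the old contract: `read` waits for a record to be committed at `from`, while `read_opt` (whose signature follows just below) returns immediately. A sketch under the assumption of a loglet holding exactly one committed record; the helper name, import paths, and the trait bound are illustrative, not part of the patch:

    use restate_types::logs::SequenceNumber; // brings LogletOffset::next() into scope

    use crate::loglet::{LogletBase, LogletOffset, OperationError};

    async fn read_contract_sketch(
        loglet: &impl LogletBase<Offset = LogletOffset>,
    ) -> Result<(), OperationError> {
        // `read` waits until a record is committed at OLDEST (one is assumed present)
        let first = loglet.read(LogletOffset::OLDEST).await?;
        // nothing is committed past it yet, so `read_opt` returns None without waiting
        assert!(loglet.read_opt(first.offset.next()).await?.is_none());
        Ok(())
    }
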
- async fn read_next_single_opt( + async fn read_opt( &self, from: Self::Offset, ) -> Result>, OperationError>; diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index a2256441f..1705094d5 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -112,22 +112,22 @@ impl LogletBase for LogletWrapper { self.loglet.seal().await } - async fn read_next_single(&self, from: Lsn) -> Result, OperationError> { + async fn read(&self, from: Lsn) -> Result, OperationError> { // convert LSN to loglet offset let offset = from.into_offset(self.base_lsn); self.loglet - .read_next_single(offset) + .read(offset) .await .map(|record| record.with_base_lsn(self.base_lsn)) } - async fn read_next_single_opt( + async fn read_opt( &self, from: Self::Offset, ) -> Result>, OperationError> { let offset = from.into_offset(self.base_lsn); self.loglet - .read_next_single_opt(offset) + .read_opt(offset) .await .map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn))) } diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 31d40523b..18d99a0a0 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -373,7 +373,7 @@ impl LogletBase for LocalLoglet { Ok(()) } - async fn read_next_single( + async fn read( &self, from: Self::Offset, ) -> Result, OperationError> { @@ -387,7 +387,7 @@ impl LogletBase for LocalLoglet { } } - async fn read_next_single_opt( + async fn read_opt( &self, from: Self::Offset, ) -> Result>, OperationError> { diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 27e62dec9..29f248cc5 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -400,7 +400,7 @@ impl LogletBase for MemoryLoglet { Ok(()) } - async fn read_next_single( + async fn read( &self, from: LogletOffset, ) -> Result, OperationError> { @@ -414,7 +414,7 @@ impl LogletBase for MemoryLoglet { } } - async fn read_next_single_opt( + async fn read_opt( &self, after: Self::Offset, ) -> Result>, OperationError> { diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8ade70011..1ceeecfa0 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -39,7 +39,7 @@ pub struct LogReadStream { end_lsn: Lsn, terminated: bool, /// Represents the next possible record to be read. - // This is akin to the lsn that can be passed to `read_next_single(from)` to read the + // This is akin to the lsn that can be passed to `read(from)` to read the // next record in the log. 
read_pointer: Lsn, } diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index 798298ea1..d30b05e94 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -325,7 +325,7 @@ mod tests { .unwrap() .find_partition_id(invocation_id.partition_key())?; let log_id = LogId::from(partition_id); - let log_record = bifrost.read_next_single(log_id, Lsn::OLDEST).await?; + let log_record = bifrost.read(log_id, Lsn::OLDEST).await?; let output_message = Envelope::from_bytes(log_record.record.into_payload_unchecked().into_body())?; diff --git a/crates/worker/src/partition/leadership.rs b/crates/worker/src/partition/leadership.rs index e8b331112..f40b115f7 100644 --- a/crates/worker/src/partition/leadership.rs +++ b/crates/worker/src/partition/leadership.rs @@ -733,9 +733,7 @@ mod tests { assert!(matches!(state.state, State::Candidate(_))); - let record = bifrost - .read_next_single(PARTITION_ID.into(), Lsn::OLDEST) - .await?; + let record = bifrost.read(PARTITION_ID.into(), Lsn::OLDEST).await?; let envelope = Envelope::from_bytes(record.record.into_payload_unchecked().body())?; let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command); assert_eq!(