Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Decouples loglet dependencies from bifrost #1704

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
publish = false

[features]
default = ["replicated-loglet"]
default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []
Expand Down
11 changes: 6 additions & 5 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletWrapper};
use crate::loglet::{LogletBase, LogletProvider};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState,
Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState,
SMALL_BATCH_THRESHOLD_COUNT,
};

Expand All @@ -48,7 +49,7 @@ impl Bifrost {

#[cfg(any(test, feature = "test-util"))]
pub async fn init_in_memory(metadata: Metadata) -> Self {
use crate::loglets::memory_loglet;
use crate::providers::memory_loglet;

Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
}
Expand All @@ -75,7 +76,7 @@ impl Bifrost {
#[cfg(any(test, feature = "test-util"))]
pub async fn init_with_factory(
metadata: Metadata,
factory: impl crate::LogletProviderFactory,
factory: impl crate::loglet::LogletProviderFactory,
) -> Self {
use crate::BifrostService;

Expand Down Expand Up @@ -436,7 +437,7 @@ mod tests {

use super::*;

use crate::loglets::memory_loglet::{self};
use crate::providers::memory_loglet::{self};
use googletest::prelude::*;

use crate::{Record, TrimGap};
Expand Down
9 changes: 1 addition & 8 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::providers::local_loglet::LogStoreError;
use crate::types::SealReason;

/// Result type for bifrost operations.
Expand All @@ -38,10 +38,3 @@ pub enum Error {
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
11 changes: 4 additions & 7 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

mod bifrost;
mod error;
mod loglet;
#[cfg(test)]
mod loglet_tests;
pub mod loglets;
mod provider;
pub mod loglet;
mod loglet_wrapper;
pub mod providers;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError, Result};
pub use provider::*;
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
18 changes: 18 additions & 0 deletions crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_core::ShutdownError;

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_test_util::let_assert;
use restate_types::logs::SequenceNumber;
use tracing::info;

use crate::loglet::{Loglet, LogletOffset};
use super::{Loglet, LogletOffset};
use crate::{LogRecord, Record, TrimGap};

fn setup() {
Expand Down
149 changes: 12 additions & 137 deletions crates/bifrost/src/loglet.rs → crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,27 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod error;
#[cfg(test)]
pub mod loglet_tests;
mod provider;
pub(crate) mod util;

// exports
pub use error::*;
pub use provider::{LogletProvider, LogletProviderFactory};

use std::ops::Add;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;

use restate_types::logs::{Lsn, SequenceNumber};

use crate::{LogRecord, LsnExt};
use crate::LogRecord;
use crate::{Result, TailState};

// Inner loglet offset
Expand Down Expand Up @@ -79,6 +88,7 @@ impl SequenceNumber for LogletOffset {
/// ^ Last Committed
/// ^ -- Last released - can be delivered to readers
///
///
/// An empty loglet. A log is empty when trim_point.next() == tail.prev()
///
/// Semantics of offsets
Expand All @@ -94,42 +104,6 @@ impl SequenceNumber for LogletOffset {
pub trait Loglet: LogletBase<Offset = LogletOffset> {}
impl<T> Loglet for T where T: LogletBase<Offset = LogletOffset> {}

/// Wraps loglets with the base LSN of the segment
#[derive(Clone, Debug)]
pub struct LogletWrapper {
/// The offset of the first record in the segment (if exists).
/// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this
/// record exists. It only means that we want to offset the loglet offsets by base_lsn -
/// Loglet::Offset::OLDEST.
pub(crate) base_lsn: Lsn,
loglet: Arc<dyn Loglet>,
}

impl LogletWrapper {
pub fn new(base_lsn: Lsn, loglet: Arc<dyn Loglet>) -> Self {
Self { base_lsn, loglet }
}

pub async fn create_wrapped_read_stream(
self,
start_lsn: Lsn,
) -> Result<LogletReadStreamWrapper> {
// Translates LSN to loglet offset
Ok(LogletReadStreamWrapper::new(
self.loglet
.create_read_stream(start_lsn.into_offset(self.base_lsn))
.await?,
self.base_lsn,
))
}
}

impl PartialEq for LogletWrapper {
fn eq(&self, other: &Self) -> bool {
self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet)
}
}

#[async_trait]
pub trait LogletBase: Send + Sync + std::fmt::Debug {
type Offset: SequenceNumber;
Expand Down Expand Up @@ -194,102 +168,3 @@ pub trait LogletReadStream<S: SequenceNumber>: Stream<Item = Result<LogRecord<S,
}

pub type SendableLogletReadStream<S = Lsn> = Pin<Box<dyn LogletReadStream<S> + Send>>;

#[async_trait]
impl LogletBase for LogletWrapper {
type Offset = Lsn;

/// This should never be used directly. Instead, use `create_wrapped_read_stream()` instead.
async fn create_read_stream(
self: Arc<Self>,
_after: Self::Offset,
) -> Result<SendableLogletReadStream<Self::Offset>> {
unreachable!("create_read_stream on LogletWrapper should never be used directly")
}

async fn append(&self, data: Bytes) -> Result<Lsn> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn> {
let offset = self.loglet.append_batch(payloads).await?;
Ok(self.base_lsn.offset_by(offset))
}

async fn find_tail(&self) -> Result<TailState<Lsn>> {
Ok(self
.loglet
.find_tail()
.await?
.map(|o| self.base_lsn.offset_by(o)))
}

async fn get_trim_point(&self) -> Result<Option<Lsn>> {
let offset = self.loglet.get_trim_point().await?;
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

// trim_point is inclusive.
async fn trim(&self, trim_point: Self::Offset) -> Result<()> {
// trimming to INVALID is no-op
if trim_point == Self::Offset::INVALID {
return Ok(());
}
let trim_point = trim_point.into_offset(self.base_lsn);
self.loglet.trim(trim_point).await
}

async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>> {
// convert LSN to loglet offset
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single(offset)
.await
.map(|record| record.with_base_lsn(self.base_lsn))
}

async fn read_next_single_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>> {
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
.await
.map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn)))
}
}

/// Wraps loglet read streams with the base LSN of the segment
pub struct LogletReadStreamWrapper {
pub(crate) base_lsn: Lsn,
inner: SendableLogletReadStream<LogletOffset>,
}

impl LogletReadStreamWrapper {
pub fn new(inner: SendableLogletReadStream<LogletOffset>, base_lsn: Lsn) -> Self {
Self { inner, base_lsn }
}
}

impl Stream for LogletReadStreamWrapper {
type Item = Result<LogRecord<Lsn, Bytes>>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.inner.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(record))) => {
Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone);
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use async_trait::async_trait;

use restate_types::logs::metadata::{LogletParams, ProviderKind};

use crate::loglet::Loglet;
use crate::ProviderError;
use super::{Loglet, ProviderError};
use crate::Result;

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio_stream::wrappers::WatchStream;

use restate_core::ShutdownError;

use crate::loglet::LogletOffset;
use super::LogletOffset;

#[derive(Debug)]
pub struct OffsetWatch {
Expand Down
Loading
Loading