Skip to content

Commit

Permalink
[Bifrost] Decouples loglet dependencies from bifrost
Browse files Browse the repository at this point in the history
A restructure the allows finer-grain control over what types loglet providers have access to. This will be exploited more in upcoming PRs.
  • Loading branch information
AhmedSoliman committed Jul 19, 2024
1 parent 12cdaa8 commit aa1f959
Show file tree
Hide file tree
Showing 27 changed files with 234 additions and 192 deletions.
2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
publish = false

[features]
default = ["replicated-loglet"]
default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []
Expand Down
11 changes: 6 additions & 5 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletWrapper};
use crate::loglet::{LogletBase, LogletProvider};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState,
Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState,
SMALL_BATCH_THRESHOLD_COUNT,
};

Expand All @@ -48,7 +49,7 @@ impl Bifrost {

#[cfg(any(test, feature = "test-util"))]
pub async fn init_in_memory(metadata: Metadata) -> Self {
use crate::loglets::memory_loglet;
use crate::providers::memory_loglet;

Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
}
Expand All @@ -75,7 +76,7 @@ impl Bifrost {
#[cfg(any(test, feature = "test-util"))]
pub async fn init_with_factory(
metadata: Metadata,
factory: impl crate::LogletProviderFactory,
factory: impl crate::loglet::LogletProviderFactory,
) -> Self {
use crate::BifrostService;

Expand Down Expand Up @@ -436,7 +437,7 @@ mod tests {

use super::*;

use crate::loglets::memory_loglet::{self};
use crate::providers::memory_loglet::{self};
use googletest::prelude::*;

use crate::{Record, TrimGap};
Expand Down
9 changes: 1 addition & 8 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::providers::local_loglet::LogStoreError;
use crate::types::SealReason;

/// Result type for bifrost operations.
Expand All @@ -38,10 +38,3 @@ pub enum Error {
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
11 changes: 4 additions & 7 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

mod bifrost;
mod error;
mod loglet;
#[cfg(test)]
mod loglet_tests;
pub mod loglets;
mod provider;
pub mod loglet;
mod loglet_wrapper;
pub mod providers;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError, Result};
pub use provider::*;
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
18 changes: 18 additions & 0 deletions crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_core::ShutdownError;

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_test_util::let_assert;
use restate_types::logs::SequenceNumber;
use tracing::info;

use crate::loglet::{Loglet, LogletOffset};
use super::{Loglet, LogletOffset};
use crate::{LogRecord, Record, TrimGap};

fn setup() {
Expand Down
149 changes: 12 additions & 137 deletions crates/bifrost/src/loglet.rs → crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,27 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod error;
#[cfg(test)]
pub mod loglet_tests;
mod provider;
pub(crate) mod util;

// exports
pub use error::*;
pub use provider::{LogletProvider, LogletProviderFactory};

use std::ops::Add;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;

use restate_types::logs::{Lsn, SequenceNumber};

use crate::{LogRecord, LsnExt};
use crate::LogRecord;
use crate::{Result, TailState};

// Inner loglet offset
Expand Down Expand Up @@ -79,6 +88,7 @@ impl SequenceNumber for LogletOffset {
/// ^ Last Committed
/// ^ -- Last released - can be delivered to readers
///
///
/// An empty loglet. A log is empty when trim_point.next() == tail.prev()
///
/// Semantics of offsets
Expand All @@ -94,42 +104,6 @@ impl SequenceNumber for LogletOffset {
pub trait Loglet: LogletBase<Offset = LogletOffset> {}
impl<T> Loglet for T where T: LogletBase<Offset = LogletOffset> {}

/// Wraps loglets with the base LSN of the segment
#[derive(Clone, Debug)]
pub struct LogletWrapper {
/// The offset of the first record in the segment (if exists).
/// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this
/// record exists. It only means that we want to offset the loglet offsets by base_lsn -
/// Loglet::Offset::OLDEST.
pub(crate) base_lsn: Lsn,
loglet: Arc<dyn Loglet>,
}

impl LogletWrapper {
pub fn new(base_lsn: Lsn, loglet: Arc<dyn Loglet>) -> Self {
Self { base_lsn, loglet }
}

pub async fn create_wrapped_read_stream(
self,
start_lsn: Lsn,
) -> Result<LogletReadStreamWrapper> {
// Translates LSN to loglet offset
Ok(LogletReadStreamWrapper::new(
self.loglet
.create_read_stream(start_lsn.into_offset(self.base_lsn))
.await?,
self.base_lsn,
))
}
}

impl PartialEq for LogletWrapper {
fn eq(&self, other: &Self) -> bool {
self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet)
}
}

#[async_trait]
pub trait LogletBase: Send + Sync + std::fmt::Debug {
type Offset: SequenceNumber;
Expand Down Expand Up @@ -194,102 +168,3 @@ pub trait LogletReadStream<S: SequenceNumber>: Stream<Item = Result<LogRecord<S,
}

pub type SendableLogletReadStream<S = Lsn> = Pin<Box<dyn LogletReadStream<S> + Send>>;

#[async_trait]
impl LogletBase for LogletWrapper {
type Offset = Lsn;

/// This should never be used directly. Instead, use `create_wrapped_read_stream()` instead.
async fn create_read_stream(
self: Arc<Self>,
_after: Self::Offset,
) -> Result<SendableLogletReadStream<Self::Offset>> {
unreachable!("create_read_stream on LogletWrapper should never be used directly")
}

async fn append(&self, data: Bytes) -> Result<Lsn> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn> {
let offset = self.loglet.append_batch(payloads).await?;
Ok(self.base_lsn.offset_by(offset))
}

async fn find_tail(&self) -> Result<TailState<Lsn>> {
Ok(self
.loglet
.find_tail()
.await?
.map(|o| self.base_lsn.offset_by(o)))
}

async fn get_trim_point(&self) -> Result<Option<Lsn>> {
let offset = self.loglet.get_trim_point().await?;
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

// trim_point is inclusive.
async fn trim(&self, trim_point: Self::Offset) -> Result<()> {
// trimming to INVALID is no-op
if trim_point == Self::Offset::INVALID {
return Ok(());
}
let trim_point = trim_point.into_offset(self.base_lsn);
self.loglet.trim(trim_point).await
}

async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>> {
// convert LSN to loglet offset
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single(offset)
.await
.map(|record| record.with_base_lsn(self.base_lsn))
}

async fn read_next_single_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>> {
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
.await
.map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn)))
}
}

/// Wraps loglet read streams with the base LSN of the segment
pub struct LogletReadStreamWrapper {
pub(crate) base_lsn: Lsn,
inner: SendableLogletReadStream<LogletOffset>,
}

impl LogletReadStreamWrapper {
pub fn new(inner: SendableLogletReadStream<LogletOffset>, base_lsn: Lsn) -> Self {
Self { inner, base_lsn }
}
}

impl Stream for LogletReadStreamWrapper {
type Item = Result<LogRecord<Lsn, Bytes>>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.inner.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(record))) => {
Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone);
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use async_trait::async_trait;

use restate_types::logs::metadata::{LogletParams, ProviderKind};

use crate::loglet::Loglet;
use crate::ProviderError;
use super::{Loglet, ProviderError};
use crate::Result;

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio_stream::wrappers::WatchStream;

use restate_core::ShutdownError;

use crate::loglet::LogletOffset;
use super::LogletOffset;

#[derive(Debug)]
pub struct OffsetWatch {
Expand Down
Loading

0 comments on commit aa1f959

Please sign in to comment.