Skip to content

Commit

Permalink
[Bifrost] Decouple loglet errors from bifrost errors
Browse files Browse the repository at this point in the history
- Major cleanup of loglet error handling, this removes the unnecessary leakage of internal loglet error types to bifrost.
- No more superfluous `Arc<>` of non-Clone errors.
- FindTail on loglets cannot return Sealed error (enforcement via type-system)
  • Loading branch information
AhmedSoliman committed Jul 12, 2024
1 parent 3022450 commit 95a7a6a
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 95 deletions.
24 changes: 17 additions & 7 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletProvider};
use crate::loglet::{AppendError, LogletBase, LogletProvider};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{
Expand Down Expand Up @@ -257,7 +257,14 @@ impl BifrostInner {
let loglet = self.writeable_loglet(log_id).await?;
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await

let res = loglet.append(buf.freeze()).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
Expand All @@ -271,7 +278,13 @@ impl BifrostInner {
buf.freeze()
})
.collect();
loglet.append_batch(&raw_payloads).await
let res = loglet.append_batch(&raw_payloads).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
Expand Down Expand Up @@ -377,10 +390,7 @@ impl BifrostInner {
/// Immediately fetch new metadata from metadata store.
pub async fn sync_metadata(&self) -> Result<()> {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs)
.await
.map_err(Arc::new)?;
self.metadata.sync(MetadataKind::Logs).await?;
Ok(())
}

Expand Down
18 changes: 13 additions & 5 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::sync::Arc;

use restate_types::logs::{LogId, Lsn};

use crate::providers::local_loglet::LogStoreError;
use crate::loglet::{LogletError, OperationError};
use crate::types::SealReason;

/// Result type for bifrost operations.
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug, Clone)]
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("log '{0}' is sealed")]
LogSealed(LogId, SealReason),
Expand All @@ -30,11 +30,19 @@ pub enum Error {
#[error("operation failed due to an ongoing shutdown")]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
LogStoreError(#[from] LogStoreError),
LogletError(#[from] Arc<dyn LogletError + Send + Sync>),
#[error("failed syncing logs metadata: {0}")]
// unfortunately, we have to use Arc here, because the SyncError is not Clone.
MetadataSync(#[from] Arc<SyncError>),
MetadataSync(#[from] SyncError),
/// Provider is unknown or disabled
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

impl From<OperationError> for Error {
    /// Lifts a loglet-level [`OperationError`] into the bifrost-level [`Error`].
    ///
    /// Each variant is mapped one-to-one: shutdown stays a shutdown, and any other
    /// loglet failure is carried over as [`Error::LogletError`].
    fn from(value: OperationError) -> Self {
        match value {
            OperationError::Other(cause) => Self::LogletError(cause),
            OperationError::Shutdown(shutdown) => Self::Shutdown(shutdown),
        }
    }
}
100 changes: 97 additions & 3 deletions crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,105 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::{Debug, Display};
use std::sync::Arc;

use restate_core::ShutdownError;

/// Error type returned by the loglet `append` and `append_batch` operations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
/// The loglet is sealed.
#[error("Loglet is sealed")]
Sealed,
/// A shutdown error; its message is forwarded unchanged from the wrapped `ShutdownError`.
#[error(transparent)]
Shutdown(#[from] ShutdownError),
/// Any other failure, carried as a shared, type-erased `LogletError`.
#[error(transparent)]
Other(Arc<dyn LogletError>),
}

impl AppendError {
pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(RetryableError(error)))
}

pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(TerminalError(error)))
}

pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
Self::Other(Arc::new(error))
}
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
pub enum OperationError {
#[error(transparent)]
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
#[error(transparent)]
Other(Arc<dyn LogletError>),
}

impl OperationError {
pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(RetryableError(error)))
}

pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(TerminalError(error)))
}

pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
Self::Other(Arc::new(error))
}
}

// -- Helper Types --

/// Represents a type-erased error from the loglet provider.
///
/// Implementors may override [`LogletError::retryable`] to tell callers that the
/// failed operation can be attempted again.
pub trait LogletError: std::error::Error + Send + Sync + Debug + Display + 'static {
/// Signals to upper layers whether this error should be retried or not.
///
/// The default implementation returns `false`.
fn retryable(&self) -> bool {
false
}
}

/// Wrapper that tags the wrapped error as retryable.
///
/// The wrapped error is exposed as the `source`, and the rendered message is
/// prefixed with `[retryable]`.
#[derive(Debug, thiserror::Error)]
struct RetryableError<T>(#[source] T);

impl<T> LogletError for RetryableError<T>
where
    T: std::error::Error + Debug + Display + Send + Sync + 'static,
{
    /// Always `true`: this wrapper exists to mark an error as retryable.
    fn retryable(&self) -> bool {
        true
    }
}

impl<T> Display for RetryableError<T>
where
    T: std::error::Error + Debug + Display + Send + Sync + 'static,
{
    /// Renders the wrapped error's message behind a `[retryable]` prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(inner) = self;
        f.write_fmt(format_args!("[retryable] {}", inner))
    }
}

/// Wrapper that tags the wrapped error as terminal (not retryable).
///
/// The wrapped error is exposed as the `source`, and the rendered message is
/// prefixed with `[terminal]`.
#[derive(Debug, thiserror::Error)]
struct TerminalError<T>(#[source] T);

impl<T> Display for TerminalError<T>
where
    T: std::error::Error + Debug + Display + Send + Sync + 'static,
{
    /// Renders the wrapped error's message behind a `[terminal]` prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(inner) = self;
        f.write_fmt(format_args!("[terminal] {}", inner))
    }
}

impl<T> LogletError for TerminalError<T>
where
    T: std::error::Error + Debug + Display + Send + Sync + 'static,
{
    /// Always `false`: this wrapper exists to mark an error as not retryable.
    fn retryable(&self) -> bool {
        false
    }
}
23 changes: 13 additions & 10 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
) -> Result<SendableLogletReadStream<Self::Offset>>;

/// Append a record to the loglet.
async fn append(&self, data: Bytes) -> Result<Self::Offset>;
async fn append(&self, data: Bytes) -> Result<Self::Offset, AppendError>;

/// Append a batch of records to the loglet. The returned offset (on success) is the offset of
/// the first record in the batch.
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset>;
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset, AppendError>;

/// The tail is *the first unwritten position* in the loglet.
///
Expand All @@ -129,14 +129,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
/// after the next `append()` call.
///
/// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST).
/// This should never return Err(Error::LogSealed). Sealed state is represented as
/// TailState::Sealed(..)
async fn find_tail(&self) -> Result<TailState<Self::Offset>>;
async fn find_tail(&self) -> Result<TailState<Self::Offset>, OperationError>;

/// The offset of the slot **before** the first readable record (if it exists), or the offset
/// before the next slot that will be written to. Must not return Self::INVALID. If the loglet
/// is never trimmed, this must return `None`.
async fn get_trim_point(&self) -> Result<Option<Self::Offset>>;
async fn get_trim_point(&self) -> Result<Option<Self::Offset>, OperationError>;

/// Trim the loglet prefix up to and including the `trim_point`.
/// If trim_point is equal to or higher than the loglet tail, the loglet trims its data until the tail.
Expand All @@ -146,21 +144,26 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
///
/// Passing `Offset::INVALID` is a no-op. (success)
/// Passing `Offset::OLDEST` trims the first record in the loglet (if exists).
async fn trim(&self, trim_point: Self::Offset) -> Result<()>;
async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>;

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
async fn read_next_single(&self, from: Self::Offset) -> Result<LogRecord<Self::Offset, Bytes>>;
async fn read_next_single(
&self,
from: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, OperationError>;

/// Read the next record if it's been committed, otherwise, return None without waiting.
async fn read_next_single_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>>;
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError>;
}

/// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams.
pub trait LogletReadStream<S: SequenceNumber>: Stream<Item = Result<LogRecord<S, Bytes>>> {
pub trait LogletReadStream<S: SequenceNumber>:
Stream<Item = Result<LogRecord<S, Bytes>, OperationError>>
{
/// Current read pointer. This points to the next offset to be read.
fn read_pointer(&self) -> S;
/// Returns true if the stream is terminated.
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use async_trait::async_trait;

use restate_types::logs::metadata::{LogletParams, ProviderKind};

use super::{Loglet, ProviderError};
use super::{Loglet, OperationError};
use crate::Result;

#[async_trait]
Expand All @@ -23,7 +23,7 @@ pub trait LogletProviderFactory: Send + 'static {
/// Factory creates providers of `kind`.
fn kind(&self) -> ProviderKind;
/// Initialize provider.
async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider + 'static>, ProviderError>;
async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider + 'static>, OperationError>;
}

#[async_trait]
Expand All @@ -35,7 +35,7 @@ pub trait LogletProvider: Send + Sync {
async fn post_start(&self) {}

/// Hook for handling graceful shutdown
async fn shutdown(&self) -> Result<(), ProviderError> {
async fn shutdown(&self) -> Result<(), OperationError> {
Ok(())
}
}
20 changes: 11 additions & 9 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use futures::Stream;

use restate_types::logs::{Lsn, SequenceNumber};

use crate::loglet::{Loglet, LogletBase, LogletOffset, SendableLogletReadStream};
use crate::loglet::{
AppendError, Loglet, LogletBase, LogletOffset, OperationError, SendableLogletReadStream,
};
use crate::{LogRecord, LsnExt};
use crate::{Result, TailState};

Expand Down Expand Up @@ -71,32 +73,32 @@ impl LogletBase for LogletWrapper {
unreachable!("create_read_stream on LogletWrapper should never be used directly")
}

async fn append(&self, data: Bytes) -> Result<Lsn> {
async fn append(&self, data: Bytes) -> Result<Lsn, AppendError> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn> {
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn, AppendError> {
let offset = self.loglet.append_batch(payloads).await?;
Ok(self.base_lsn.offset_by(offset))
}

async fn find_tail(&self) -> Result<TailState<Lsn>> {
async fn find_tail(&self) -> Result<TailState<Lsn>, OperationError> {
Ok(self
.loglet
.find_tail()
.await?
.map(|o| self.base_lsn.offset_by(o)))
}

async fn get_trim_point(&self) -> Result<Option<Lsn>> {
async fn get_trim_point(&self) -> Result<Option<Lsn>, OperationError> {
let offset = self.loglet.get_trim_point().await?;
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

// trim_point is inclusive.
async fn trim(&self, trim_point: Self::Offset) -> Result<()> {
async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError> {
// trimming to INVALID is no-op
if trim_point == Self::Offset::INVALID {
return Ok(());
Expand All @@ -105,7 +107,7 @@ impl LogletBase for LogletWrapper {
self.loglet.trim(trim_point).await
}

async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>> {
async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>, OperationError> {
// convert LSN to loglet offset
let offset = from.into_offset(self.base_lsn);
self.loglet
Expand All @@ -117,7 +119,7 @@ impl LogletBase for LogletWrapper {
async fn read_next_single_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError> {
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
Expand All @@ -139,7 +141,7 @@ impl LogletReadStreamWrapper {
}

impl Stream for LogletReadStreamWrapper {
type Item = Result<LogRecord<Lsn, Bytes>>;
type Item = Result<LogRecord<Lsn, Bytes>, OperationError>;

fn poll_next(
mut self: Pin<&mut Self>,
Expand Down
Loading

0 comments on commit 95a7a6a

Please sign in to comment.