Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] rename bifrost read functions #1722

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod tests {

svc_handle.trim_log(log_id, Lsn::from(3)).await??;

let record = bifrost.read_next_single(log_id, Lsn::OLDEST).await?;
let record = bifrost.read(log_id, Lsn::OLDEST).await?;
assert_that!(
record.record,
pat!(Record::TrimGap(pat!(TrimGap {
Expand Down Expand Up @@ -675,7 +675,7 @@ mod tests {
// everything before the persisted_lsn.
assert_eq!(bifrost.get_trim_point(log_id).await?, Lsn::from(3));
// we should be able to after the last persisted lsn
let v = bifrost.read_next_single(log_id, Lsn::from(4)).await?;
let v = bifrost.read(log_id, Lsn::from(4)).await?;
assert_eq!(Lsn::from(4), v.offset);
assert!(v.record.is_data());
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
publish = false

[features]
default = ["replicated-loglet"]
default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn spawn_environment(
RocksDbManager::init(Constant::new(config.common))
});

let logs = restate_types::logs::metadata::create_static_metadata(provider, num_logs);
let logs = restate_types::logs::metadata::bootstrap_logs_metadata(provider, num_logs);

metadata_store_client
.put(BIFROST_CONFIG_KEY.clone(), logs.clone(), Precondition::None)
Expand Down
66 changes: 34 additions & 32 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletWrapper};
use crate::loglet::{AppendError, LogletBase, LogletProvider};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState,
Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState,
SMALL_BATCH_THRESHOLD_COUNT,
};

Expand All @@ -48,7 +49,7 @@ impl Bifrost {

#[cfg(any(test, feature = "test-util"))]
pub async fn init_in_memory(metadata: Metadata) -> Self {
use crate::loglets::memory_loglet;
use crate::providers::memory_loglet;

Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
}
Expand All @@ -75,7 +76,7 @@ impl Bifrost {
#[cfg(any(test, feature = "test-util"))]
pub async fn init_with_factory(
metadata: Metadata,
factory: impl crate::LogletProviderFactory,
factory: impl crate::loglet::LogletProviderFactory,
) -> Self {
use crate::BifrostService;

Expand Down Expand Up @@ -110,20 +111,16 @@ impl Bifrost {
/// start reading from. This means that the record returned will have a LSN that is equal or greater than
/// `from`. If no records are committed yet at this LSN, this read operation will "wait"
/// for such records to appear.
pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.inner.read_next_single(log_id, from).await
pub async fn read(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.inner.read(log_id, from).await
}

/// Read the next record from the LSN provided. The `from` indicates the LSN where we will
/// start reading from. This means that the record returned will have a LSN that is equal or greater than
/// `from`. If no records are committed yet at this LSN, this read operation will return
/// `None`.
pub async fn read_next_single_opt(
&self,
log_id: LogId,
from: Lsn,
) -> Result<Option<LogRecord>> {
self.inner.read_next_single_opt(log_id, from).await
pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result<Option<LogRecord>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to remove this method as stated in the commit message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next commit answers this question. Yes this method will get removed.

self.inner.read_opt(log_id, from).await
}

/// Create a read stream. `end_lsn` is inclusive. Pass [[`Lsn::Max`]] for a tailing stream. Use
Expand Down Expand Up @@ -256,7 +253,14 @@ impl BifrostInner {
let loglet = self.writeable_loglet(log_id).await?;
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await

let res = loglet.append(buf.freeze()).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
Expand All @@ -270,31 +274,33 @@ impl BifrostInner {
buf.freeze()
})
.collect();
loglet.append_batch(&raw_payloads).await
let res = loglet.append_batch(&raw_payloads).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
pub async fn read(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.fail_if_shutting_down()?;
// Accidental reads from Lsn::INVALID are reset to Lsn::OLDEST
let from = std::cmp::max(Lsn::OLDEST, from);

let loglet = self.find_loglet_for_lsn(log_id, from).await?;
Ok(loglet
.read_next_single(from)
.read(from)
.await?
.decode()
.expect("decoding a bifrost envelope succeeds"))
}

pub async fn read_next_single_opt(
&self,
log_id: LogId,
from: Lsn,
) -> Result<Option<LogRecord>> {
pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result<Option<LogRecord>> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, from).await?;
Ok(loglet.read_next_single_opt(from).await?.map(|record| {
Ok(loglet.read_opt(from).await?.map(|record| {
record
.decode()
.expect("decoding a bifrost envelope succeeds")
Expand Down Expand Up @@ -378,8 +384,7 @@ impl BifrostInner {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await
.map_err(Arc::new)?;
.await?;
Ok(())
}

Expand Down Expand Up @@ -436,7 +441,7 @@ mod tests {

use super::*;

use crate::loglets::memory_loglet::{self};
use crate::providers::memory_loglet::{self};
use googletest::prelude::*;

use crate::{Record, TrimGap};
Expand Down Expand Up @@ -586,7 +591,7 @@ mod tests {

// 5 itself is trimmed
for lsn in 1..=5 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand All @@ -599,7 +604,7 @@ mod tests {
}

for lsn in 6..=10 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand All @@ -622,10 +627,7 @@ mod tests {
let new_trim_point = bifrost.get_trim_point(log_id).await?;
assert_eq!(Lsn::from(10), new_trim_point);

let record = bifrost
.read_next_single_opt(log_id, Lsn::from(10))
.await?
.unwrap();
let record = bifrost.read_opt(log_id, Lsn::from(10)).await?.unwrap();
assert!(record.record.is_trim_gap());
assert_eq!(Lsn::from(10), record.record.try_as_trim_gap().unwrap().to);

Expand All @@ -635,7 +637,7 @@ mod tests {
}

for lsn in 11..20 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand Down
24 changes: 12 additions & 12 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,35 @@ use std::sync::Arc;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::types::SealReason;
use crate::loglet::{LogletError, OperationError};

/// Result type for bifrost operations.
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug, Clone)]
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("log '{0}' is sealed")]
LogSealed(LogId, SealReason),
LogSealed(LogId),
#[error("unknown log '{0}'")]
UnknownLogId(LogId),
#[error("invalid log sequence number '{0}'")]
InvalidLsn(Lsn),
#[error("operation failed due to an ongoing shutdown")]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
LogStoreError(#[from] LogStoreError),
LogletError(#[from] Arc<dyn LogletError + Send + Sync>),
#[error("failed syncing logs metadata: {0}")]
// unfortunately, we have to use Arc here, because the SyncError is not Clone.
MetadataSync(#[from] Arc<SyncError>),
MetadataSync(#[from] SyncError),
/// Provider is unknown or disabled
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
impl From<OperationError> for Error {
fn from(value: OperationError) -> Self {
match value {
OperationError::Shutdown(e) => Error::Shutdown(e),
OperationError::Other(e) => Error::LogletError(e),
}
}
}
11 changes: 4 additions & 7 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

mod bifrost;
mod error;
mod loglet;
#[cfg(test)]
mod loglet_tests;
pub mod loglets;
mod provider;
pub mod loglet;
mod loglet_wrapper;
pub mod providers;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError, Result};
pub use provider::*;
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
Loading
Loading