Skip to content

Commit

Permalink
[Bifrost] rename bifrost read functions
Browse files Browse the repository at this point in the history
In preparation for the bifrost read_opt to be removed and replaced with `read()`
  • Loading branch information
AhmedSoliman committed Jul 19, 2024
1 parent f977116 commit 01d336a
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 53 deletions.
4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod tests {

svc_handle.trim_log(log_id, Lsn::from(3)).await??;

let record = bifrost.read_next_single(log_id, Lsn::OLDEST).await?;
let record = bifrost.read(log_id, Lsn::OLDEST).await?;
assert_that!(
record.record,
pat!(Record::TrimGap(pat!(TrimGap {
Expand Down Expand Up @@ -675,7 +675,7 @@ mod tests {
// everything before the persisted_lsn.
assert_eq!(bifrost.get_trim_point(log_id).await?, Lsn::from(3));
// we should be able to after the last persisted lsn
let v = bifrost.read_next_single(log_id, Lsn::from(4)).await?;
let v = bifrost.read(log_id, Lsn::from(4)).await?;
assert_eq!(Lsn::from(4), v.offset);
assert!(v.record.is_data());
assert_eq!(
Expand Down
35 changes: 12 additions & 23 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,16 @@ impl Bifrost {
/// start reading from. This means that the record returned will have a LSN that is equal or greater than
/// `from`. If no records are committed yet at this LSN, this read operation will "wait"
/// for such records to appear.
pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.inner.read_next_single(log_id, from).await
pub async fn read(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.inner.read(log_id, from).await
}

/// Read the next record from the LSN provided. The `from` indicates the LSN where we will
/// start reading from. This means that the record returned will have a LSN that is equal or greater than
/// `from`. If no records are committed yet at this LSN, this read operation will return
/// `None`.
pub async fn read_next_single_opt(
&self,
log_id: LogId,
from: Lsn,
) -> Result<Option<LogRecord>> {
self.inner.read_next_single_opt(log_id, from).await
pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result<Option<LogRecord>> {
self.inner.read_opt(log_id, from).await
}

/// Create a read stream. `end_lsn` is inclusive. Pass [[`Lsn::Max`]] for a tailing stream. Use
Expand Down Expand Up @@ -287,28 +283,24 @@ impl BifrostInner {
})
}

pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
pub async fn read(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
self.fail_if_shutting_down()?;
// Accidental reads from Lsn::INVALID are reset to Lsn::OLDEST
let from = std::cmp::max(Lsn::OLDEST, from);

let loglet = self.find_loglet_for_lsn(log_id, from).await?;
Ok(loglet
.read_next_single(from)
.read(from)
.await?
.decode()
.expect("decoding a bifrost envelope succeeds"))
}

pub async fn read_next_single_opt(
&self,
log_id: LogId,
from: Lsn,
) -> Result<Option<LogRecord>> {
pub async fn read_opt(&self, log_id: LogId, from: Lsn) -> Result<Option<LogRecord>> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, from).await?;
Ok(loglet.read_next_single_opt(from).await?.map(|record| {
Ok(loglet.read_opt(from).await?.map(|record| {
record
.decode()
.expect("decoding a bifrost envelope succeeds")
Expand Down Expand Up @@ -599,7 +591,7 @@ mod tests {

// 5 itself is trimmed
for lsn in 1..=5 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand All @@ -612,7 +604,7 @@ mod tests {
}

for lsn in 6..=10 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand All @@ -635,10 +627,7 @@ mod tests {
let new_trim_point = bifrost.get_trim_point(log_id).await?;
assert_eq!(Lsn::from(10), new_trim_point);

let record = bifrost
.read_next_single_opt(log_id, Lsn::from(10))
.await?
.unwrap();
let record = bifrost.read_opt(log_id, Lsn::from(10)).await?.unwrap();
assert!(record.record.is_trim_gap());
assert_eq!(Lsn::from(10), record.record.try_as_trim_gap().unwrap().to);

Expand All @@ -648,7 +637,7 @@ mod tests {
}

for lsn in 11..20 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
let record = bifrost.read_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
Expand Down
21 changes: 8 additions & 13 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
}

// read record 1 (reading from OLDEST)
let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::OLDEST).await?);
let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::OLDEST).await?);
let LogRecord { offset, record } = log_record;
assert_eq!(LogletOffset::OLDEST, offset,);
assert!(record.is_data());
assert_eq!(Some(&Bytes::from_static(b"record1")), record.payload());

// read record 2
let_assert!(Some(log_record) = loglet.read_next_single_opt(offset.next()).await?);
let_assert!(Some(log_record) = loglet.read_opt(offset.next()).await?);
let LogRecord { offset, record } = log_record;
assert_eq!(LogletOffset::from(2), offset);
assert_eq!(Some(&Bytes::from_static(b"record2")), record.payload());

// read record 3
let_assert!(Some(log_record) = loglet.read_next_single_opt(offset.next()).await?);
let_assert!(Some(log_record) = loglet.read_opt(offset.next()).await?);
let LogRecord { offset, record } = log_record;
assert_eq!(LogletOffset::from(3), offset);
assert_eq!(Some(&Bytes::from_static(b"record3")), record.payload());
Expand All @@ -116,17 +116,13 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
}

// read from the future returns None
assert!(loglet
.read_next_single_opt(LogletOffset::from(end))
.await?
.is_none());
assert!(loglet.read_opt(LogletOffset::from(end)).await?.is_none());

let handle1: JoinHandle<googletest::Result<()>> = tokio::spawn({
let loglet = loglet.clone();
async move {
// read future record 4
let LogRecord { offset, record } =
loglet.read_next_single(LogletOffset::from(4)).await?;
let LogRecord { offset, record } = loglet.read(LogletOffset::from(4)).await?;
assert_eq!(LogletOffset(4), offset);
assert_eq!(Some(&Bytes::from_static(b"record4")), record.payload());
Ok(())
Expand All @@ -138,8 +134,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
let loglet = loglet.clone();
async move {
// read future record 10
let LogRecord { offset, record } =
loglet.read_next_single(LogletOffset::from(10)).await?;
let LogRecord { offset, record } = loglet.read(LogletOffset::from(10)).await?;
assert_eq!(LogletOffset(10), offset);
assert_eq!(Some(&Bytes::from_static(b"record10")), record.payload());
Ok(())
Expand Down Expand Up @@ -192,7 +187,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
assert!(!tail.is_sealed());
}

let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::OLDEST).await?);
let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::OLDEST).await?);
let LogRecord { offset, record } = log_record;
assert_eq!(LogletOffset::OLDEST, offset);
assert!(record.is_trim_gap());
Expand All @@ -201,7 +196,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
record.try_as_trim_gap_ref().unwrap().to
);

let_assert!(Some(log_record) = loglet.read_next_single_opt(LogletOffset::from(4)).await?);
let_assert!(Some(log_record) = loglet.read_opt(LogletOffset::from(4)).await?);
let LogRecord { offset, record } = log_record;
assert_eq!(LogletOffset::from(4), offset);
assert!(record.is_data());
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
async fn read_next_single(
async fn read(
&self,
from: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, OperationError>;

/// Read the next record if it's been committed, otherwise, return None without waiting.
async fn read_next_single_opt(
async fn read_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError>;
Expand Down
8 changes: 4 additions & 4 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,22 @@ impl LogletBase for LogletWrapper {
self.loglet.seal().await
}

async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>, OperationError> {
async fn read(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>, OperationError> {
// convert LSN to loglet offset
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single(offset)
.read(offset)
.await
.map(|record| record.with_base_lsn(self.base_lsn))
}

async fn read_next_single_opt(
async fn read_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError> {
let offset = from.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
.read_opt(offset)
.await
.map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn)))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl LogletBase for LocalLoglet {
Ok(())
}

async fn read_next_single(
async fn read(
&self,
from: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, OperationError> {
Expand All @@ -389,7 +389,7 @@ impl LogletBase for LocalLoglet {
}
}

async fn read_next_single_opt(
async fn read_opt(
&self,
from: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError> {
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl LogletBase for MemoryLoglet {
Ok(())
}

async fn read_next_single(
async fn read(
&self,
from: LogletOffset,
) -> Result<LogRecord<Self::Offset, Bytes>, OperationError> {
Expand All @@ -414,7 +414,7 @@ impl LogletBase for MemoryLoglet {
}
}

async fn read_next_single_opt(
async fn read_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, OperationError> {
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct LogReadStream {
end_lsn: Lsn,
terminated: bool,
/// Represents the next possible record to be read.
// This is akin to the lsn that can be passed to `read_next_single(from)` to read the
// This is akin to the lsn that can be passed to `read(from)` to read the
// next record in the log.
read_pointer: Lsn,
}
Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ mod tests {
.unwrap()
.find_partition_id(invocation_id.partition_key())?;
let log_id = LogId::from(partition_id);
let log_record = bifrost.read_next_single(log_id, Lsn::OLDEST).await?;
let log_record = bifrost.read(log_id, Lsn::OLDEST).await?;

let output_message =
Envelope::from_bytes(log_record.record.into_payload_unchecked().into_body())?;
Expand Down
4 changes: 1 addition & 3 deletions crates/worker/src/partition/leadership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,7 @@ mod tests {

assert!(matches!(state.state, State::Candidate(_)));

let record = bifrost
.read_next_single(PARTITION_ID.into(), Lsn::OLDEST)
.await?;
let record = bifrost.read(PARTITION_ID.into(), Lsn::OLDEST).await?;
let envelope = Envelope::from_bytes(record.record.into_payload_unchecked().body())?;
let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command);
assert_eq!(
Expand Down

0 comments on commit 01d336a

Please sign in to comment.