[Bifrost] Init single-node loglets with random loglet ids
In preparation for allowing multi-segment chains, local loglets in single-node bootstrap now get unique random ids.
AhmedSoliman committed Jul 19, 2024
1 parent 4ca3ad1 commit f977116
Showing 14 changed files with 134 additions and 92 deletions.
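Only five of the fourteen changed files are expanded below; the id-generation itself lands in `bootstrap_logs_metadata`, which is not shown. A minimal sketch of the idea, assuming the `rand` crate and the string-encoded loglet params that the tests below parse back into a `u64`:

```rust
// Sketch only (not the commit's code): at single-node bootstrap, each local
// loglet gets a random u64 id instead of reusing the log id, so a future
// multi-segment chain can point one log at several distinct loglets.
use rand::RngCore;

fn random_loglet_params() -> String {
    let loglet_id: u64 = rand::thread_rng().next_u64();
    loglet_id.to_string()
}

fn main() {
    let params = random_loglet_params();
    let loglet_id: u64 = params
        .parse()
        .expect("loglet params can be converted into u64");
    println!("bootstrap loglet id: {loglet_id}");
}
```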
2 changes: 1 addition & 1 deletion crates/bifrost/benches/util.rs
@@ -48,7 +48,7 @@ pub async fn spawn_environment(
         RocksDbManager::init(Constant::new(config.common))
     });

-    let logs = restate_types::logs::metadata::create_static_metadata(provider, num_logs);
+    let logs = restate_types::logs::metadata::bootstrap_logs_metadata(provider, num_logs);

     metadata_store_client
         .put(BIFROST_CONFIG_KEY.clone(), logs.clone(), Precondition::None)
32 changes: 16 additions & 16 deletions crates/bifrost/src/providers/local_loglet/keys.rs
@@ -20,26 +20,26 @@ pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::<u8>() + size_of::<u64

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct RecordKey {
-    pub log_id: u64,
+    pub loglet_id: u64,
     pub offset: LogletOffset,
 }

 impl RecordKey {
-    pub fn new(log_id: u64, offset: LogletOffset) -> Self {
-        Self { log_id, offset }
+    pub fn new(loglet_id: u64, offset: LogletOffset) -> Self {
+        Self { loglet_id, offset }
     }

-    pub fn upper_bound(log_id: u64) -> Self {
+    pub fn upper_bound(loglet_id: u64) -> Self {
         Self {
-            log_id,
+            loglet_id,
             offset: LogletOffset::MAX,
         }
     }

     pub fn to_bytes(self) -> Bytes {
         let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1);
         buf.put_u8(b'd');
-        buf.put_u64(self.log_id);
+        buf.put_u64(self.loglet_id);
         buf.put_u64(self.offset.into());
         buf.freeze()
     }
@@ -48,9 +48,9 @@ impl RecordKey {
         let mut data = data;
         let c = data.get_u8();
         debug_assert_eq!(c, b'd');
-        let log_id = data.get_u64();
+        let loglet_id = data.get_u64();
         let offset = LogletOffset::from(data.get_u64());
-        Self { log_id, offset }
+        Self { loglet_id, offset }
     }
 }

@@ -64,20 +64,20 @@ pub enum MetadataKind {

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct MetadataKey {
-    pub log_id: u64,
+    pub loglet_id: u64,
     pub kind: MetadataKind,
 }

 impl MetadataKey {
-    pub fn new(log_id: u64, kind: MetadataKind) -> Self {
-        Self { log_id, kind }
+    pub fn new(loglet_id: u64, kind: MetadataKind) -> Self {
+        Self { loglet_id, kind }
     }

     pub fn to_bytes(self) -> Bytes {
         let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1);
         // m for metadata
         buf.put_u8(b'm');
-        buf.put_u64(self.log_id);
+        buf.put_u64(self.loglet_id);
         buf.put_u8(self.kind as u8);
         buf.freeze()
     }
@@ -86,11 +86,11 @@ impl MetadataKey {
         let mut data = Bytes::copy_from_slice(data);
         let c = data.get_u8();
         debug_assert_eq!(c, b'm');
-        let log_id = data.get_u64();
+        let loglet_id = data.get_u64();
         let kind = MetadataKind::from_repr(data.get_u8());
         let kind = kind.unwrap_or_default();

-        Self { log_id, kind }
+        Self { loglet_id, kind }
     }
 }

@@ -111,12 +111,12 @@ mod tests {
     #[test]
     fn test_metadata_key() {
         let key = MetadataKey::new(1, MetadataKind::LogState);
-        assert_eq!(key.log_id, 1);
+        assert_eq!(key.loglet_id, 1);
         assert_eq!(key.kind, MetadataKind::LogState);
         let bytes = key.to_bytes();
         let key2 = MetadataKey::from_slice(&bytes);
         assert_eq!(key, key2);
-        assert_eq!(key2.log_id, 1);
+        assert_eq!(key2.loglet_id, 1);
         assert_eq!(key2.kind, MetadataKind::LogState);
     }
 }
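A note on the key layout above: `to_bytes` writes a one-byte tag followed by big-endian integers, so RocksDB's lexicographic key order groups records by `loglet_id` and then by offset, which is exactly what `upper_bound` and the iterator bound in `mod.rs` rely on. A standalone round-trip sketch of the same encoding (assumes only the `bytes` crate; function names are illustrative):

```rust
use bytes::{Buf, BufMut, Bytes, BytesMut};

// Standalone equivalent of RecordKey::to_bytes / from_slice above.
fn encode_record_key(loglet_id: u64, offset: u64) -> Bytes {
    let mut buf = BytesMut::with_capacity(17); // 1-byte tag + two u64s
    buf.put_u8(b'd'); // 'd' marks a data record
    buf.put_u64(loglet_id); // big-endian, so byte order matches numeric order
    buf.put_u64(offset);
    buf.freeze()
}

fn decode_record_key(mut data: &[u8]) -> (u64, u64) {
    let tag = data.get_u8();
    debug_assert_eq!(tag, b'd');
    (data.get_u64(), data.get_u64())
}

fn main() {
    let key = encode_record_key(42, 7);
    assert_eq!(decode_record_key(&key), (42, 7));
    // Same loglet sorts by offset; different loglets never interleave.
    assert!(encode_record_key(42, 7) < encode_record_key(42, 8));
    assert!(encode_record_key(42, u64::MAX) < encode_record_key(43, 0));
}
```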
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/log_store.rs
@@ -109,11 +109,11 @@ impl RocksDbLogStore {
         .expect("METADATA_CF exists")
     }

-    pub fn get_log_state(&self, log_id: u64) -> Result<Option<LogState>, LogStoreError> {
+    pub fn get_log_state(&self, loglet_id: u64) -> Result<Option<LogState>, LogStoreError> {
         let metadata_cf = self.metadata_cf();
         let value = self.rocksdb.inner().as_raw_db().get_pinned_cf(
             &metadata_cf,
-            MetadataKey::new(log_id, MetadataKind::LogState).to_bytes(),
+            MetadataKey::new(loglet_id, MetadataKind::LogState).to_bytes(),
         )?;

         if let Some(value) = value {
28 changes: 14 additions & 14 deletions crates/bifrost/src/providers/local_loglet/log_store_writer.rs
@@ -40,7 +40,7 @@ type Ack = oneshot::Sender<Result<(), OperationError>>;
 type AckRecv = oneshot::Receiver<Result<(), OperationError>>;

 pub struct LogStoreWriteCommand {
-    log_id: u64,
+    loglet_id: u64,
     data_updates: SmallVec<[DataUpdate; SMALL_BATCH_THRESHOLD_COUNT]>,
     log_state_updates: Option<LogStateUpdates>,
     ack: Option<Ack>,
@@ -155,7 +155,7 @@ impl LogStoreWriter {
                     DataUpdate::PutRecord { offset, data } => Self::put_record(
                         &data_cf,
                         &mut write_batch,
-                        command.log_id,
+                        command.loglet_id,
                         offset,
                         data,
                     ),
@@ -165,7 +165,7 @@
                     } => Self::trim_log(
                         &data_cf,
                         &mut write_batch,
-                        command.log_id,
+                        command.loglet_id,
                         old_trim_point,
                         new_trim_point,
                     ),
@@ -178,7 +178,7 @@
                 Self::update_log_state(
                     &metadata_cf,
                     &mut write_batch,
-                    command.log_id,
+                    command.loglet_id,
                     logstate_updates,
                     buffer,
                 )
@@ -198,14 +198,14 @@
     fn update_log_state(
         metadata_cf: &Arc<BoundColumnFamily>,
         write_batch: &mut WriteBatch,
-        log_id: u64,
+        loglet_id: u64,
         updates: LogStateUpdates,
         buffer: &mut BytesMut,
     ) {
         updates.encode(buffer).expect("encode");
         write_batch.merge_cf(
             metadata_cf,
-            MetadataKey::new(log_id, MetadataKind::LogState).to_bytes(),
+            MetadataKey::new(loglet_id, MetadataKind::LogState).to_bytes(),
             buffer,
         );
     }
@@ -288,18 +288,18 @@ pub struct RocksDbLogWriterHandle {
 impl RocksDbLogWriterHandle {
     pub async fn enqueue_put_record(
         &self,
-        log_id: u64,
+        loglet_id: u64,
         offset: LogletOffset,
         data: Bytes,
     ) -> Result<AckRecv, ShutdownError> {
-        self.enqueue_put_records(log_id, offset, &[data]).await
+        self.enqueue_put_records(loglet_id, offset, &[data]).await
     }

-    pub async fn enqueue_seal(&self, log_id: u64) -> Result<AckRecv, ShutdownError> {
+    pub async fn enqueue_seal(&self, loglet_id: u64) -> Result<AckRecv, ShutdownError> {
         let (ack, receiver) = oneshot::channel();
         let log_state_updates = Some(LogStateUpdates::default().seal());
         self.send_command(LogStoreWriteCommand {
-            log_id,
+            loglet_id,
             data_updates: Default::default(),
             log_state_updates,
             ack: Some(ack),
@@ -310,7 +310,7 @@ impl RocksDbLogWriterHandle {

     pub async fn enqueue_put_records(
         &self,
-        log_id: u64,
+        loglet_id: u64,
         mut start_offset: LogletOffset,
         records: &[Bytes],
     ) -> Result<AckRecv, ShutdownError> {
@@ -328,7 +328,7 @@
         let log_state_updates =
             Some(LogStateUpdates::default().update_release_pointer(start_offset.prev()));
         self.send_command(LogStoreWriteCommand {
-            log_id,
+            loglet_id,
             data_updates,
             log_state_updates,
             ack: Some(ack),
@@ -339,7 +339,7 @@

     pub async fn enqueue_trim(
         &self,
-        log_id: u64,
+        loglet_id: u64,
         old_trim_point: LogletOffset,
         new_trim_point: LogletOffset,
     ) -> Result<(), ShutdownError> {
@@ -351,7 +351,7 @@
         let log_state_updates = Some(LogStateUpdates::default().update_trim_point(new_trim_point));

         self.send_command(LogStoreWriteCommand {
-            log_id,
+            loglet_id,
             data_updates,
             log_state_updates,
             ack: None,
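Every handle method above follows the same shape: build a `LogStoreWriteCommand` keyed by `loglet_id`, send it to the writer task, and give the caller a oneshot receiver to await the ack. A simplified, self-contained sketch of that pattern (tokio channels as in the diff; the command struct and error type are stand-ins):

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for LogStoreWriteCommand: the real command also carries data
// updates and log-state updates; only the ack plumbing is sketched here.
struct WriteCommand {
    loglet_id: u64,
    ack: oneshot::Sender<Result<(), String>>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<WriteCommand>();

    // Writer task: apply the command's write batch, then ack the caller.
    tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            println!("committing batch for loglet {}", cmd.loglet_id);
            let _ = cmd.ack.send(Ok(()));
        }
    });

    // Caller side, mirroring enqueue_put_record: enqueue, then await the ack.
    let (ack, receiver) = oneshot::channel();
    tx.send(WriteCommand { loglet_id: 42, ack }).expect("writer is running");
    receiver.await.expect("ack channel open").expect("write succeeded");
}
```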
42 changes: 21 additions & 21 deletions crates/bifrost/src/providers/local_loglet/mod.rs
@@ -45,7 +45,7 @@ use self::read_stream::LocalLogletReadStream;
 use crate::loglet::util::OffsetWatch;

 struct LocalLoglet {
-    log_id: u64,
+    loglet_id: u64,
     log_store: RocksDbLogStore,
     log_writer: RocksDbLogWriterHandle,
     // internal offset _before_ the loglet head. Loglet head is trim_point_offset.next()
@@ -63,7 +63,7 @@
 impl std::fmt::Debug for LocalLoglet {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("LocalLoglet")
-            .field("log_id", &self.log_id)
+            .field("loglet_id", &self.loglet_id)
             .field("trim_point_offset", &self.trim_point_offset)
             .field("last_committed_offset", &self.last_committed_offset)
             .field("next_write_offset", &self.next_write_offset)
@@ -74,13 +74,13 @@ impl std::fmt::Debug for LocalLoglet {

 impl LocalLoglet {
     pub async fn create(
-        log_id: u64,
+        loglet_id: u64,
         log_store: RocksDbLogStore,
         log_writer: RocksDbLogWriterHandle,
     ) -> Result<Self, OperationError> {
         // Fetch the log metadata from the store
         let log_state = log_store
-            .get_log_state(log_id)
+            .get_log_state(loglet_id)
             .map_err(OperationError::other)?;
         let log_state = log_state.unwrap_or_default();

@@ -93,7 +93,7 @@ impl LocalLoglet {
         let sealed = AtomicBool::new(log_state.seal);
         let append_latency = histogram!(BIFROST_LOCAL_APPEND_DURATION);
         let loglet = Self {
-            log_id,
+            loglet_id,
             log_store,
             log_writer,
             trim_point_offset,
@@ -105,7 +105,7 @@
             append_latency,
         };
         debug!(
-            log_id = log_id,
+            loglet_id = loglet_id,
             release_pointer = %release_pointer,
             next_offset = next_write_offset_raw,
             "Local loglet started"
@@ -136,10 +136,10 @@ impl LocalLoglet {
         if from_offset > commit_offset {
             Ok(None)
         } else {
-            let key = RecordKey::new(self.log_id, from_offset);
+            let key = RecordKey::new(self.loglet_id, from_offset);
             let data_cf = self.log_store.data_cf();
             let mut read_opts = rocksdb::ReadOptions::default();
-            read_opts.set_iterate_upper_bound(RecordKey::upper_bound(self.log_id).to_bytes());
+            read_opts.set_iterate_upper_bound(RecordKey::upper_bound(self.loglet_id).to_bytes());

             let mut iter = self.log_store.db().iterator_cf_opt(
                 &data_cf,
@@ -163,12 +163,12 @@ impl LocalLoglet {
             let (key, data) = record;
             let key = RecordKey::from_slice(&key);
             // Defensive, the upper_bound set on the iterator should prevent this.
-            if key.log_id != self.log_id {
+            if key.loglet_id != self.loglet_id {
                 warn!(
-                    log_id = self.log_id,
-                    "read_from moved to the adjacent log {}, that should not happen.\
+                    loglet_id = self.loglet_id,
+                    "read_from moved to the adjacent loglet {}, that should not happen.\
                     This is harmless but needs to be investigated!",
-                    key.log_id,
+                    key.loglet_id,
                 );
                 return Ok(None);
             }
@@ -219,7 +219,7 @@ impl LogletBase for LocalLoglet {
             let offset = *next_offset_guard;
             let receiver = self
                 .log_writer
-                .enqueue_put_record(self.log_id, offset, payload)
+                .enqueue_put_record(self.loglet_id, offset, payload)
                 .await?;
             // next offset points to the next available slot.
             *next_offset_guard = offset.next();
@@ -272,7 +272,7 @@ impl LogletBase for LocalLoglet {
         // lock acquired
         let receiver = self
             .log_writer
-            .enqueue_put_records(self.log_id, *next_offset_guard, payloads)
+            .enqueue_put_records(self.loglet_id, *next_offset_guard, payloads)
             .await?;
         // next offset points to the next available slot.
         *next_offset_guard = offset + num_payloads;
@@ -352,7 +352,7 @@ impl LogletBase for LocalLoglet {
             .store(effective_trim_point.0, Ordering::Relaxed);

         self.log_writer
-            .enqueue_trim(self.log_id, current_trim_point, effective_trim_point)
+            .enqueue_trim(self.loglet_id, current_trim_point, effective_trim_point)
             .await?;

         histogram!(BIFROST_LOCAL_TRIM_LENGTH).record(
@@ -366,7 +366,7 @@ impl LogletBase for LocalLoglet {
         if self.sealed.load(Ordering::Acquire) {
             return Ok(());
         }
-        let receiver = self.log_writer.enqueue_seal(self.log_id).await?;
+        let receiver = self.log_writer.enqueue_seal(self.loglet_id).await?;
         let _ = receiver.await.unwrap_or_else(|_| {
             warn!("Unsure if the local loglet record was sealed, the ack channel was dropped");
             Err(ShutdownError.into())
@@ -436,7 +436,7 @@ mod tests {
         let loglet = Arc::new(
             LocalLoglet::create(
                 params
-                    .id()
+                    .as_str()
                     .parse()
                     .expect("loglet params can be converted into u64"),
                 log_store,
@@ -478,7 +478,7 @@ mod tests {
         let loglet = Arc::new(
             LocalLoglet::create(
                 params
-                    .id()
+                    .as_str()
                     .parse()
                     .expect("loglet params can be converted into u64"),
                 log_store,
@@ -520,7 +520,7 @@ mod tests {
         let loglet = Arc::new(
             LocalLoglet::create(
                 params
-                    .id()
+                    .as_str()
                     .parse()
                     .expect("loglet params can be converted into u64"),
                 log_store,
@@ -561,7 +561,7 @@ mod tests {
         let loglet = Arc::new(
             LocalLoglet::create(
                 params
-                    .id()
+                    .as_str()
                     .parse()
                     .expect("loglet params can be converted into u64"),
                 log_store,
@@ -603,7 +603,7 @@ mod tests {
         let loglet = Arc::new(
             LocalLoglet::create(
                 params
-                    .id()
+                    .as_str()
                     .parse()
                     .expect("loglet params can be converted into u64"),
                 log_store,
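The test fixtures above now derive the numeric id by parsing the params string (`as_str().parse()`) rather than through a dedicated `id()` accessor. A tiny illustration, with a hypothetical stand-in for `LogletParams`:

```rust
// Hypothetical stand-in for LogletParams: the local provider carries the
// loglet id as a string and parses it back into a u64 where needed.
struct LogletParams(String);

impl LogletParams {
    fn as_str(&self) -> &str {
        &self.0
    }
}

fn main() {
    let params = LogletParams("12345".to_owned());
    let loglet_id: u64 = params
        .as_str()
        .parse()
        .expect("loglet params can be converted into u64");
    assert_eq!(loglet_id, 12345);
}
```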
