Skip to content

Commit

Permalink
[Bifrost] Run trim tests on replicated loglet in single node mode
Browse files Browse the repository at this point in the history
This enables the full suite of loglet spec tests to run against replicated loglet in single node setup. This also adds a couple of configuration keys to allow users to control readahead triggers. Such triggers are used in tests to unblock the test behaviour. One caveat is that we cannot (and should not) disable readahead completely in replicated loglet, so due to its nature, the loglet spec test has been updated slightly to support this case.

Test Plan:
Unit tests
  • Loading branch information
AhmedSoliman committed Nov 15, 2024
1 parent a5e1351 commit e25e700
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 78 deletions.
19 changes: 17 additions & 2 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,26 @@ pub async fn single_loglet_readstream_with_trims(
loglet.append(format!("record{}", i).into()).await?;
}

// When reading record 8, it's acceptable to observe the record, or the trim gap. Both are
// acceptable because the replicated loglet read stream's readahead cannot be completely disabled.
// Its minimum is to immediately read the next record after consuming the last one, so we'll
// see record8 because it's already cached.
//
// read stream should send a gap from 8->10
let record = read_stream.next().await.unwrap()?;
assert_that!(record.sequence_number(), eq(Lsn::new(8)));
assert!(record.is_trim_gap());
assert_that!(record.trim_gap_to_sequence_number(), eq(Some(Lsn::new(10))));
if record.is_trim_gap() {
assert!(record.is_trim_gap());
assert_that!(record.trim_gap_to_sequence_number(), eq(Some(Lsn::new(10))));
} else {
// data record.
assert_that!(record.decode_unchecked::<String>(), eq("record8"));
// next record should be the trim gap
let record = read_stream.next().await.unwrap()?;
assert_that!(record.sequence_number(), eq(Lsn::new(9)));
assert!(record.is_trim_gap());
assert_that!(record.trim_gap_to_sequence_number(), eq(Some(Lsn::new(10))));
}

for i in 11..=20 {
let record = read_stream.next().await.unwrap()?;
Expand Down
183 changes: 121 additions & 62 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,18 +340,18 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {

#[cfg(test)]
mod tests {
use std::num::NonZeroU8;

use super::*;

use std::num::{NonZeroU8, NonZeroUsize};

use googletest::prelude::*;
use test_log::test;

use restate_core::network::NetworkServerBuilder;
use restate_core::TestCoreEnvBuilder;
use restate_log_server::LogServerService;
use restate_rocksdb::RocksDbManager;
use restate_types::config::Configuration;
use restate_types::config::{set_current_config, Configuration};
use restate_types::health::HealthStatus;
use restate_types::live::Live;
use restate_types::logs::Keys;
Expand All @@ -366,22 +366,24 @@ mod tests {
}

async fn run_in_test_env<F, O>(
config: Configuration,
loglet_params: ReplicatedLogletParams,
record_cache: RecordCache,
mut future: F,
) -> googletest::Result<()>
where
F: FnMut(TestEnv) -> O,
O: std::future::Future<Output = googletest::Result<()>>,
{
let config = Live::from_value(Configuration::default());
set_current_config(config.clone());
let config = Live::from_value(config);

let mut node_env =
TestCoreEnvBuilder::with_incoming_only_connector().add_mock_nodes_config();
let mut server_builder = NetworkServerBuilder::default();

let logserver_rpc = LogServersRpc::new(&mut node_env.router_builder);
let sequencer_rpc = SequencersRpc::new(&mut node_env.router_builder);
let record_cache = RecordCache::new(1_000_000);

let log_server = LogServerService::create(
HealthStatus::default(),
Expand Down Expand Up @@ -440,30 +442,36 @@ mod tests {
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};
let record_cache = RecordCache::new(1_000_000);

run_in_test_env(params, |env| async move {
let batch: Arc<[Record]> = vec![
("record-1", Keys::Single(1)).into(),
("record-2", Keys::Single(2)).into(),
("record-3", Keys::Single(3)).into(),
]
.into();
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(3)));
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(6)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Open(LogletOffset::new(7))));

let cached_record = env.record_cache.get(loglet_id, 1.into());
assert!(cached_record.is_some());
assert_that!(
cached_record.unwrap().keys().clone(),
matches_pattern!(Keys::Single(eq(1)))
);
run_in_test_env(
Configuration::default(),
params,
record_cache,
|env| async move {
let batch: Arc<[Record]> = vec![
("record-1", Keys::Single(1)).into(),
("record-2", Keys::Single(2)).into(),
("record-3", Keys::Single(3)).into(),
]
.into();
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(3)));
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(6)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Open(LogletOffset::new(7))));

let cached_record = env.record_cache.get(loglet_id, 1.into());
assert!(cached_record.is_some());
assert_that!(
cached_record.unwrap().keys().clone(),
matches_pattern!(Keys::Single(eq(1)))
);

Ok(())
})
Ok(())
},
)
.await
}

Expand All @@ -478,92 +486,143 @@ mod tests {
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};

run_in_test_env(params, |env| async move {
let batch: Arc<[Record]> = vec![
("record-1", Keys::Single(1)).into(),
("record-2", Keys::Single(2)).into(),
("record-3", Keys::Single(3)).into(),
]
.into();
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(3)));
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(6)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Open(LogletOffset::new(7))));

env.loglet.seal().await?;
let batch: Arc<[Record]> = vec![
("record-4", Keys::Single(4)).into(),
("record-5", Keys::Single(5)).into(),
]
.into();
let not_appended = env.loglet.enqueue_batch(batch).await?.await;
assert_that!(not_appended, err(pat!(AppendError::Sealed)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Sealed(LogletOffset::new(7))));

Ok(())
let record_cache = RecordCache::new(1_000_000);
run_in_test_env(
Configuration::default(),
params,
record_cache,
|env| async move {
let batch: Arc<[Record]> = vec![
("record-1", Keys::Single(1)).into(),
("record-2", Keys::Single(2)).into(),
("record-3", Keys::Single(3)).into(),
]
.into();
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(3)));
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(6)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Open(LogletOffset::new(7))));

env.loglet.seal().await?;
let batch: Arc<[Record]> = vec![
("record-4", Keys::Single(4)).into(),
("record-5", Keys::Single(5)).into(),
]
.into();
let not_appended = env.loglet.enqueue_batch(batch).await?.await;
assert_that!(not_appended, err(pat!(AppendError::Sealed)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Sealed(LogletOffset::new(7))));

Ok(())
},
)
.await
}

// # Loglet Spec Tests On Single Node
// ** Single-node replicated-loglet **
#[test(tokio::test(start_paused = true))]
async fn single_node_gapless_loglet_smoke_test() -> Result<()> {
    // Run the generic gapless smoke test from the loglet spec suite against a
    // replicated loglet configured for a single node (replication factor 1,
    // node-set containing only node 1, with node 1 acting as sequencer).
    let loglet_id = ReplicatedLogletId::new_unchecked(122);
    let cache = RecordCache::new(1_000_000);
    let params = ReplicatedLogletParams {
        loglet_id,
        sequencer: GenerationalNodeId::new(1, 1),
        replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
        nodeset: NodeSet::from_single(PlainNodeId::new(1)),
    };
    run_in_test_env(Configuration::default(), params, cache, |env| {
        crate::loglet::loglet_tests::gapless_loglet_smoke_test(env.loglet)
    })
    .await
}

// ** Single-node replicated-loglet read-stream **
#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_loglet_readstream() -> Result<()> {
async fn single_node_single_loglet_readstream() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};
run_in_test_env(params, |env| {
let record_cache = RecordCache::new(1_000_000);
run_in_test_env(Configuration::default(), params, record_cache, |env| {
crate::loglet::loglet_tests::single_loglet_readstream(env.loglet)
})
.await
}

#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_append_after_seal() -> Result<()> {
async fn single_node_single_loglet_readstream_with_trims() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};
run_in_test_env(params, |env| {
// For this test to work, we need to disable the record cache to ensure we
// observe the moving trim point.
let mut config = Configuration::default();
// disable read-ahead to avoid reading records from log-servers before the trim taking
// place.
config.bifrost.replicated_loglet.readahead_records = NonZeroUsize::new(1).unwrap();
config.bifrost.replicated_loglet.readahead_trigger_ratio = 1.0;
let record_cache = RecordCache::new(0);
run_in_test_env(config, params, record_cache, |env| {
crate::loglet::loglet_tests::single_loglet_readstream_with_trims(env.loglet)
})
.await
}

#[test(tokio::test(start_paused = true))]
async fn single_node_append_after_seal() -> Result<()> {
    // Drive the spec suite's append-after-seal test against a single-node
    // replicated loglet: node 1 is both the sequencer and the sole member of
    // the node-set, with replication factor 1.
    let loglet_id = ReplicatedLogletId::new_unchecked(122);
    let cache = RecordCache::new(1_000_000);
    let params = ReplicatedLogletParams {
        loglet_id,
        sequencer: GenerationalNodeId::new(1, 1),
        replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
        nodeset: NodeSet::from_single(PlainNodeId::new(1)),
    };
    run_in_test_env(Configuration::default(), params, cache, |env| {
        crate::loglet::loglet_tests::append_after_seal(env.loglet)
    })
    .await
}

#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_append_after_seal_concurrent() -> Result<()> {
async fn single_node_append_after_seal_concurrent() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};
run_in_test_env(params, |env| {

let record_cache = RecordCache::new(1_000_000);
run_in_test_env(Configuration::default(), params, record_cache, |env| {
crate::loglet::loglet_tests::append_after_seal_concurrent(env.loglet)
})
.await
}

#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_seal_empty() -> Result<()> {
async fn single_node_seal_empty() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
};
run_in_test_env(params, |env| {
let record_cache = RecordCache::new(1_000_000);
run_in_test_env(Configuration::default(), params, record_cache, |env| {
crate::loglet::loglet_tests::seal_empty(env.loglet)
})
.await
Expand Down
Loading

0 comments on commit e25e700

Please sign in to comment.