Skip to content

Commit

Permalink
Merge pull request #4588 from eval-exec/exec/sequence-ci
Browse files Browse the repository at this point in the history
IntegrationTest: Let RandomKill and SyncChurn run in sequence
  • Loading branch information
eval-exec authored Aug 14, 2024
2 parents f4a3f7a + b3553e7 commit bfcab0c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 10 deletions.
8 changes: 7 additions & 1 deletion test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ fn main() {
let mut test_results = Vec::new();
let mut worker_running = worker_count;
let mut done_specs = 0;
let mut started_sequential = false;
while worker_running > 0 {
if max_time > 0 && start_time.elapsed().as_secs() > max_time {
// shutdown, specs running to long
workers.shutdown();
break;
}

if worker_running == 1 && !started_sequential {
started_sequential = true;
workers.start_sequencial()
}

let msg = match notify_rx.recv_timeout(Duration::from_secs(5)) {
Ok(msg) => msg,
Err(err) => {
Expand Down Expand Up @@ -590,9 +596,9 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmVersion2),
Box::new(CheckVmBExtension),
Box::new(RandomlyKill),
Box::new(SyncChurn),
];
specs.shuffle(&mut thread_rng());
specs.insert(0, Box::new(SyncChurn) as Box<dyn Spec>);
specs
}

Expand Down
85 changes: 76 additions & 9 deletions test/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::time::Instant;
#[derive(PartialEq, Eq)]
pub enum Command {
Shutdown,
StartSequencial,
}

/// Notify from worker
Expand Down Expand Up @@ -45,6 +46,9 @@ pub struct Worker {
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,

sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_worker: bool,
}

impl Clone for Worker {
Expand All @@ -54,13 +58,18 @@ impl Clone for Worker {
inbox: self.inbox.clone(),
outbox: self.outbox.clone(),
start_port: Arc::clone(&self.start_port),
sequencial_tasks: Arc::clone(&self.sequencial_tasks),
sequencial_worker: self.sequencial_worker,
}
}
}

const SEQUENCIAL_TASKS: &[&str] = &["RandomlyKill", "SyncChurn"];

impl Worker {
pub fn new(
tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,
Expand All @@ -70,12 +79,16 @@ impl Worker {
inbox,
outbox,
start_port,
sequencial_tasks,
sequencial_worker: false,
}
}

/// start handle tasks
pub fn start(self) -> JoinHandle<()> {
thread::spawn(move || {
let mut start_sequencial_task = false;

loop {
let msg = match self.inbox.try_recv() {
Ok(msg) => Some(msg),
Expand All @@ -88,20 +101,54 @@ impl Worker {
}
};
// check command
if Some(Command::Shutdown) == msg {
self.outbox.send(Notify::Stop).unwrap();
return;
match msg {
Some(Command::StartSequencial) => {
start_sequencial_task = true;
}
Some(Command::Shutdown) => {
self.outbox.send(Notify::Stop).unwrap();
return;
}
_ => {}
}

// pick a spec to run
let spec = match self.tasks.lock().pop() {
Some(spec) => spec,

let task = self.tasks.lock().pop();
match task {
Some(spec) => {
// if spec.name() is RandomlyKill or SyncChurn, then push it to sequencial_tasks
if SEQUENCIAL_TASKS.contains(&spec.name()) {
info!("push {} to sequencial_tasks", spec.name());
self.sequencial_tasks.lock().push(spec);
} else {
self.run_spec(spec.as_ref(), 0);
}
}
None => {
self.outbox.send(Notify::Stop).unwrap();
return;
if self.sequencial_worker {
info!("sequencial worker is waiting for command");
if start_sequencial_task {
match self.sequencial_tasks.lock().pop() {
Some(spec) => {
self.run_spec(spec.as_ref(), 0);
}
None => {
info!("sequencial worker has no task to run");
self.outbox.send(Notify::Stop).unwrap();
return;
}
};
} else {
info!("sequencial worker is waiting for parallel workers finish");
std::thread::sleep(std::time::Duration::from_secs(1));
}
} else {
self.outbox.send(Notify::Stop).unwrap();
return;
}
}
};

self.run_spec(spec.as_ref(), 0);
}
})
}
Expand Down Expand Up @@ -176,13 +223,17 @@ impl Workers {
start_port: u16,
) -> Self {
let start_port = Arc::new(AtomicU16::new(start_port));

let sequencial_tasks = Arc::new(Mutex::new(Vec::new()));
let workers: Vec<_> = (0..count)
.map({
let tasks = Arc::clone(&tasks);
let sequencial_tasks = Arc::clone(&sequencial_tasks);
move |_| {
let (command_tx, command_rx) = unbounded();
let worker = Worker::new(
Arc::clone(&tasks),
Arc::clone(&sequencial_tasks),
command_rx,
outbox.clone(),
Arc::clone(&start_port),
Expand All @@ -200,6 +251,8 @@ impl Workers {

/// start all workers
pub fn start(&mut self) {
self.workers.first_mut().unwrap().1.sequencial_worker = true;

let mut join_handles = Vec::new();
for w in self.workers.iter_mut() {
let h = w.1.clone().start();
Expand All @@ -208,6 +261,20 @@ impl Workers {
self.join_handles.replace(join_handles);
}

pub fn start_sequencial(&mut self) {
if let Err(err) = self
.workers
.first()
.unwrap()
.0
.send(Command::StartSequencial)
{
error!("start sequencial worker failed, error: {}", err);
} else {
info!("start sequencial worker success")
}
}

/// shutdown all workers, must call join_all after this.
pub fn shutdown(&mut self) {
if self.is_shutdown {
Expand Down

0 comments on commit bfcab0c

Please sign in to comment.