Skip to content

Commit

Permalink
Auto merge of #736 - Mark-Simulacrum:refactor-worker, r=Mark-Simulacrum
Browse files Browse the repository at this point in the history
Refactor worker

This moves all pushes of logs to the central server out of the execution
of tasks and into the worker function, which ensures that we don't
overwrite previous results for the same crate multiple times (when
retrying its run). That should hopefully preserve the logs as well
throughout the retries.
  • Loading branch information
bors committed Aug 31, 2024
2 parents 9068125 + d9bc6ab commit 1cbbe32
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 349 deletions.
19 changes: 10 additions & 9 deletions src/agent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
mod api;
mod results;

use crate::agent::api::AgentApi;
use crate::agent::results::ResultsUploader;
pub use crate::agent::api::AgentApi;
use crate::config::Config;
use crate::crates::Crate;
use crate::db::{Database, QueryUtils};
Expand Down Expand Up @@ -149,7 +147,6 @@ fn run_heartbeat(url: &str, token: &str) {
fn run_experiment(
agent: &Agent,
workspace: &Workspace,
db: &ResultsUploader,
threads_count: usize,
past_experiment: &mut Option<String>,
) -> Result<(), (Option<Box<Experiment>>, Error)> {
Expand All @@ -173,9 +170,14 @@ fn run_experiment(
}
}

crate::runner::run_ex(&ex, workspace, db, threads_count, &agent.config, &|| {
agent.next_crate(&ex.name)
})
crate::runner::run_ex(
&ex,
workspace,
&agent.api,
threads_count,
&agent.config,
&|| agent.next_crate(&ex.name),
)
.map_err(|err| (Some(Box::new(ex)), err))?;
Ok(())
}
Expand All @@ -188,15 +190,14 @@ pub fn run(
workspace: &Workspace,
) -> Fallible<()> {
let agent = Agent::new(url, token, caps)?;
let db = results::ResultsUploader::new(&agent.api);

run_heartbeat(url, token);
health_thread();

let mut past_experiment = None;
loop {
if let Err((ex, err)) =
run_experiment(&agent, workspace, &db, threads_count, &mut past_experiment)
run_experiment(&agent, workspace, threads_count, &mut past_experiment)
{
utils::report_failure(&err);
if let Some(ex) = ex {
Expand Down
92 changes: 0 additions & 92 deletions src/agent/results.rs

This file was deleted.

18 changes: 18 additions & 0 deletions src/results/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,24 @@ impl<'a> WriteResults for DatabaseDB<'a> {
}
}

impl crate::runner::RecordProgress for DatabaseDB<'_> {
fn record_progress(
&self,
ex: &Experiment,
krate: &Crate,
toolchain: &Toolchain,
log: &[u8],
result: &TestResult,
version: Option<(&Crate, &Crate)>,
) -> Fallible<()> {
self.store_result(ex, krate, toolchain, result, log, EncodingType::Plain)?;
if let Some((old, new)) = version {
self.update_crate_version(ex, old, new)?;
}
Ok(())
}
}

impl<'a> DeleteResults for DatabaseDB<'a> {
fn delete_all_results(&self, ex: &Experiment) -> Fallible<()> {
self.db
Expand Down
18 changes: 14 additions & 4 deletions src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use crate::config::Config;
use crate::crates::Crate;
use crate::experiments::{Experiment, Mode};
use crate::prelude::*;
use crate::results::{TestResult, WriteResults};
use crate::results::TestResult;
use crate::runner::worker::{DiskSpaceWatcher, Worker};
use rustwide::Workspace;
use std::thread::scope;
use std::time::Duration;
pub use worker::RecordProgress;

const DISK_SPACE_WATCHER_INTERVAL: Duration = Duration::from_secs(30);
const DISK_SPACE_WATCHER_THRESHOLD: f32 = 0.80;
Expand All @@ -20,10 +21,10 @@ const DISK_SPACE_WATCHER_THRESHOLD: f32 = 0.80;
#[error("overridden task result to {0}")]
pub struct OverrideResult(TestResult);

pub fn run_ex<DB: WriteResults + Sync>(
pub fn run_ex(
ex: &Experiment,
workspace: &Workspace,
db: &DB,
api: &dyn RecordProgress,
threads_count: usize,
config: &Config,
next_crate: &(dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
Expand Down Expand Up @@ -82,7 +83,16 @@ pub fn run_ex<DB: WriteResults + Sync>(
info!("running tasks in {} threads...", threads_count);

let workers = (0..threads_count)
.map(|i| Worker::new(format!("worker-{i}"), workspace, ex, config, db, next_crate))
.map(|i| {
Worker::new(
format!("worker-{i}"),
workspace,
ex,
config,
api,
next_crate,
)
})
.collect::<Vec<_>>();

let disk_watcher = DiskSpaceWatcher::new(
Expand Down
Loading

0 comments on commit 1cbbe32

Please sign in to comment.