diff --git a/src/factotum/executor/execution_strategy/mod.rs b/src/factotum/executor/execution_strategy/mod.rs new file mode 100644 index 0000000..cacc790 --- /dev/null +++ b/src/factotum/executor/execution_strategy/mod.rs @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +#[cfg(test)] +mod tests; +use std::process::Command; +use chrono::UTC; +use std::time::{Instant, Duration}; +use chrono::DateTime; + +pub struct RunResult { + pub run_started: DateTime, + pub duration: Duration, + pub task_execution_error: Option, + pub stdout: Option, + pub stderr: Option, + pub return_code: i32 +} + +pub fn simulation_text(name:&str, command: &Command) -> String { + + use std::cmp; + let command_text = format!("{:?}", command); + + let col_task_title = "TASK"; + let col_command_title = "COMMAND"; + let col_padding = 2; + let task_col_width = cmp::max(name.len()+col_padding, col_task_title.len()+col_padding); + let command_col_width = cmp::max(command_text.len()+col_padding, col_command_title.len()+col_padding); + + let lines = vec![ + format!("/{fill:->taskwidth$}|{fill:->cmdwidth$}\\", fill="-", taskwidth=task_col_width, cmdwidth=command_col_width), + format!("| {:taskwidth$} | {:cmdwidth$} |", "TASK", "COMMAND", taskwidth=task_col_width-col_padding, cmdwidth=command_col_width-col_padding), + format!("|{fill:- RunResult { + info!("Simulating execution for {} with command {:?}", name, command); + RunResult { + run_started: UTC::now(), + duration: Duration::from_secs(0), + task_execution_error: None, + stdout: Some(simulation_text(name, &command)), + stderr: None, + return_code: 0 + } +} + +pub fn execute_os(name:&str, command:&mut Command) -> RunResult { + let run_start = Instant::now(); + let start_time_utc = UTC::now(); + info!("Executing sh {:?}", command); + match command.output() { + Ok(r) => { + let run_duration = run_start.elapsed(); + let return_code = r.status.code().unwrap_or(1); // 1 will be returned if the process was killed by a signal + + let task_stdout: String = String::from_utf8_lossy(&r.stdout).trim_right().into(); + let task_stderr: String = String::from_utf8_lossy(&r.stderr).trim_right().into(); + + info!("task '{}' stdout:\n'{}'", name, task_stdout); + info!("task '{}' stderr:\n'{}'", name, task_stderr); + + let task_stdout_opt = if task_stdout.is_empty() { None } else { Some(task_stdout) }; + let task_stderr_opt = if task_stderr.is_empty() { None } else { Some(task_stderr) }; + + RunResult { + run_started: start_time_utc, + duration: run_duration, + task_execution_error: None, + stdout: task_stdout_opt, + stderr: task_stderr_opt, + return_code: return_code + } + }, + Err(message) => RunResult { + run_started: start_time_utc, + duration: Duration::from_secs(0), + task_execution_error: Some(format!("Error executing process - {}", message)), + stdout: None, + stderr: None, + return_code: -1 + } + } +} diff --git a/src/factotum/executor/execution_strategy/tests.rs b/src/factotum/executor/execution_strategy/tests.rs new file mode 100644 index 0000000..43af0de --- /dev/null +++ b/src/factotum/executor/execution_strategy/tests.rs @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +use factotum::executor::execution_strategy::*; +use std::process::Command; +use chrono::UTC; +use chrono::duration::Duration; +use std::cmp; +use std::iter; + +fn fill(fillstr:&str, times:usize) -> String { + iter::repeat(fillstr).take(times).collect::() +} + +#[test] +fn simulation_text_good() { + + let mut command = Command::new("sh"); + command.arg("-c"); + command.arg("does_something.sh"); + let task_name = "Simulation Task!"; + let command_text = format!("{:?}", command); + + let text = simulation_text(task_name, &command); + + // FACTOTUM SIMULATION ONLY. THE TASK HAS NOT BEEN EXECUTED. + + // /--------|------------------------------------------------------------------------------\ + // | TASK | COMMAND | + // |--------|------------------------------------------------------------------------------| + // | ABC | sh -c 'potato' | + // \--------|------------------------------------------------------------------------------/ + + let task_name_width = cmp::max(task_name.len()+2, "TASK".len()+2); + let command_width = cmp::max("COMMAND".len()+2, command_text.len()+2); + + println!("task width:{} command width: {}", task_name_width, command_width); + + let lines = vec![ format!("/{}|{}\\", fill("-", task_name_width), fill("-", command_width)), + format!("| TASK {}| COMMAND {}|", fill(" ", task_name_width-" TASK ".len()), fill(" ", command_width-" COMMAND ".len())), + format!("|{}|{}|", fill("-", task_name_width), fill("-", command_width)), + format!("| {:taskwidth$} | {:commandwidth$} |", task_name, command_text, taskwidth=task_name_width-2, commandwidth=command_width-2), + format!("\\{}|{}/\n", fill("-", task_name_width), fill("-", command_width)) ]; + + let expected = lines.join("\n"); + + println!("*** EXPECTED ***"); + println!("{}", expected); + println!("*** ACTUAL ***"); + println!("{}", text); + + assert_eq!(expected, text); +} + + #[test] + fn simulation_returns_good() { + let mut command:Command = Command::new("banana"); + command.arg("hello_world"); + let result = execute_simulation("hello-world", &mut command); + + assert_eq!(result.return_code, 0); + assert!(result.run_started > UTC::now().checked_sub(Duration::seconds(60)).unwrap()); + assert_eq!(result.duration, Duration::seconds(0).to_std().ok().unwrap()); + assert_eq!(result.stdout.unwrap(), simulation_text("hello-world", &command)); + assert!(result.stderr.is_some()==false) + } + + #[test] + fn os_execution_notfound() { + let mut command:Command = Command::new("sh"); + command.arg("-c"); + command.arg("banana"); + let result = execute_os("hello-world", &mut command); + + assert_eq!(result.return_code, 127); + assert!(result.run_started > UTC::now().checked_sub(Duration::seconds(60)).unwrap()); + assert_eq!(result.duration.as_secs(), 0); + let stderr = result.stderr.unwrap(); + println!("{}", stderr); + assert!(stderr.contains("banana")); + assert!(stderr.contains("not found")); + assert_eq!(result.stdout, None); + assert_eq!(result.task_execution_error, None) + } + + #[test] + fn os_execution_task_exec_failed() { + let mut command:Command = Command::new("this-doesn't-exist"); + let result = execute_os("hello-world", &mut command); + + assert_eq!(result.return_code, -1); + assert!(result.run_started > UTC::now().checked_sub(Duration::seconds(60)).unwrap()); + assert_eq!(result.duration.as_secs(), 0); + assert_eq!(result.stderr, None); + assert_eq!(result.stdout, None); + let expected_msg = "Error executing process - No such file or directory".to_string(); + assert_eq!(result.task_execution_error.unwrap()[..expected_msg.len()], expected_msg); + } + + #[test] + fn os_execution_good() { + let mut command:Command = Command::new("sh"); + command.arg("-c"); + command.arg("type echo"); + let result = execute_os("hello-world", &mut command); + + assert_eq!(result.return_code, 0); + assert!(result.run_started > UTC::now().checked_sub(Duration::seconds(60)).unwrap()); + assert_eq!(result.duration.as_secs(), 0); + assert_eq!(result.stderr, None); + assert_eq!(result.stdout.unwrap(), "echo is a shell builtin"); + assert_eq!(result.task_execution_error, None); + } \ No newline at end of file diff --git a/src/factotum/executor/mod.rs b/src/factotum/executor/mod.rs index 964777d..a3cfe05 100644 --- a/src/factotum/executor/mod.rs +++ b/src/factotum/executor/mod.rs @@ -13,59 +13,23 @@ * governing permissions and limitations there under. */ -use factotum::factfile::*; +use factotum::factfile::Task as FactfileTask; +use factotum::factfile::Factfile; use std::process::Command; -use std::time::{Duration, Instant}; use std::thread; use std::sync::mpsc; -use std::collections::HashMap; -use chrono::DateTime; -use chrono::UTC; -enum TaskResult { - Ok(i32, Duration), - TerminateJobPlease(i32, Duration), - Error(Option, String) -} - -pub struct RunResult { - pub run_started: DateTime, - pub duration: Duration, - pub requests_job_termination: bool, - pub task_execution_error: Option, - pub stdout: Option, - pub stderr: Option, - pub return_code: i32 -} +pub mod execution_strategy; +pub mod task_list; +#[cfg(test)] +mod tests; -pub struct TaskExecutionResult { - pub name: String, - pub attempted: bool, - pub run_details: Option -} +use factotum::executor::task_list::*; +use factotum::executor::execution_strategy::*; -pub enum ExecutionResult { - AllTasksComplete(Vec), - EarlyFinishOk(Vec), - AbnormalTermination(Vec) -} +pub fn get_task_execution_list(factfile:&Factfile, start_from:Option) -> TaskList<&FactfileTask> { + let mut task_list = TaskList::<&FactfileTask>::new(); -#[inline] -fn drain_values(mut map:HashMap, tasks_in_order:&Vec>) -> Vec { - let mut task_seq:Vec = vec![]; - for task_level in tasks_in_order.iter() { - for task in task_level.iter() { - match map.remove(&task.name) { - Some(task_result) => task_seq.push(task_result), - _ => warn!("A task ({}) does not have an execution result? Skipping", task.name) - } - } - } - task_seq -} - -pub fn execute_factfile(factfile:&Factfile, start_from:Option) -> ExecutionResult { - let tasks = if let Some(start_task) = start_from { info!("Reduced run! starting from {}", &start_task); factfile.get_tasks_in_order_from(&start_task) @@ -73,39 +37,56 @@ pub fn execute_factfile(factfile:&Factfile, start_from:Option) -> Execut factfile.get_tasks_in_order() }; - for (idx, task_level) in tasks.iter().enumerate() { - info!("Run level: {}", idx); - for task in task_level.iter() { - info!("Task name: {}", task.name); + for task_level in tasks.iter() { + let task_group: TaskGroup<&FactfileTask> = task_level + .iter() + .map(|t| task_list::Task::<&FactfileTask>::new(t.name.clone(), t)) + .collect(); + match task_list.add_group(task_group) { + Ok(_) => (), + Err(msg) => panic!(format!("Couldn't add task to group: {}", msg)) } } - - let mut task_results:HashMap = HashMap::new(); - for task_level in tasks.iter() { // TODO replace me with helper iterator + + for task_level in tasks.iter() { for task in task_level.iter() { - let new_task_result = TaskExecutionResult { name: task.name.clone(), attempted: false, run_details:None }; - task_results.insert(new_task_result.name.clone(), new_task_result ); + for dep in task.depends_on.iter() { + if task_list.is_task_name_present(&dep) && task_list.is_task_name_present(&task.name) { + match task_list.set_child(&dep, &task.name) { + Ok(_) => (), + Err(msg) => panic!(format!("Executor: couldn't add '{}' to child '{}': {}", dep, task.name, msg)) + } + } + } } - } + } + + task_list +} + + +pub fn execute_factfile<'a, F>(factfile:&'a Factfile, start_from:Option, strategy:F) -> TaskList<&'a FactfileTask> + where F : Fn(&str, &mut Command) -> RunResult + Send + Sync + 'static + Copy { + + let mut tasklist = get_task_execution_list(factfile, start_from); - for task_level in tasks.iter() { - // everything in a task "level" gets run together - let (tx, rx) = mpsc::channel::<(usize, TaskResult, Option, Option, DateTime)>(); + for mut task_group in tasklist.tasks.iter_mut() { + // everything in a task "group" gets run together + let (tx, rx) = mpsc::channel::<(usize, RunResult)>(); - for (idx,task) in task_level.iter().enumerate() { + for (idx,task) in task_group.iter().enumerate() { info!("Running task '{}'!", task.name); { let tx = tx.clone(); - let args = format_args(&task.command, &task.arguments); - let executor = task.executor.to_string(); - let continue_job_codes = task.on_result.continue_job.clone(); - let terminate_job_codes = task.on_result.terminate_job.clone(); + let args = format_args(&task.task_spec.command, &task.task_spec.arguments); let task_name = task.name.to_string(); thread::spawn(move || { - let start_time = UTC::now(); - let (task_result, stdout, stderr) = execute_task(task_name, executor, args, terminate_job_codes, continue_job_codes); - tx.send((idx, task_result, stdout, stderr, start_time)).unwrap(); + let mut command = Command::new("sh"); + command.arg("-c"); + command.arg(args); + let task_result = strategy(&task_name, &mut command); + tx.send((idx, task_result)).unwrap(); }); } } @@ -113,107 +94,44 @@ pub fn execute_factfile(factfile:&Factfile, start_from:Option) -> Execut let mut terminate_job_please = false; let mut task_failed = false; - for _ in 0..task_level.len() { - match rx.recv().unwrap() { - (idx, TaskResult::Ok(code, duration), stdout, stderr, start_time) => { - info!("'{}' returned {} in {:?}", task_level[idx].name, code, duration); - let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap(); - task_result.attempted = true; - task_result.run_details = Some(RunResult { run_started: start_time, - duration: duration, - requests_job_termination: false, - task_execution_error: None, - stdout: stdout, - stderr: stderr, - return_code: code }); - }, - (idx, TaskResult::Error(code, msg), stdout, stderr, start_time) => { - warn!("task '{}' failed to execute!\n{}", task_level[idx].name, msg); - let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap(); - task_result.attempted = true; - - if let Some(return_code) = code { - task_result.run_details = Some(RunResult { - run_started: start_time, - duration: Duration::from_secs(0), - requests_job_termination: false, - task_execution_error: Some(msg), - stdout: stdout, - stderr: stderr, - return_code: return_code }); - } - task_failed = true; - }, - (idx, TaskResult::TerminateJobPlease(code, duration), stdout, stderr, start_time) => { - warn!("job will stop as task '{}' called for termination (no-op) with code {}", task_level[idx].name, code); - - let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap(); - task_result.attempted = true; - task_result.run_details = Some(RunResult { - run_started: start_time, - duration: duration, - requests_job_termination: true, - task_execution_error: None, - stdout: stdout, - stderr: stderr, - return_code: code }); - - terminate_job_please = true; - } - } + for _ in 0..task_group.len() { + let (idx, task_result) = rx.recv().unwrap(); + + info!("'{}' returned {} in {:?}", task_group[idx].name, task_result.return_code, task_result.duration); + + if task_group[idx].task_spec.on_result.terminate_job.contains(&task_result.return_code) { + // if the return code is in the terminate early list, prune the sub-tree (set to skipped) return early term + task_group[idx].state = State::SUCCESS_NOOP; + terminate_job_please = true; + } else if task_group[idx].task_spec.on_result.continue_job.contains(&task_result.return_code) { + // if the return code is in the continue list, return success + task_group[idx].state = State::SUCCESS; + } else { + // if the return code is not in either list, prune the sub-tree (set to skipped) and return error + let expected_codes = task_group[idx].task_spec.on_result.continue_job.iter() + .map(|code| code.to_string()) + .collect::>() + .join(","); + let err_msg = format!("the task exited with a value not specified in continue_job - {} (task expects one of the following return codes to continue [{}])", + task_result.return_code, + expected_codes); + task_group[idx].state = State::FAILED(err_msg); + task_failed = true; + } + + task_group[idx].run_result = Some(task_result); } - match (terminate_job_please, task_failed) { - (_, true) => { return ExecutionResult::AbnormalTermination(drain_values(task_results, &tasks)); }, - (true, false) => { return ExecutionResult::EarlyFinishOk(drain_values(task_results, &tasks)); }, - _ => {} + if terminate_job_please || task_failed { + break; } - } - - ExecutionResult::AllTasksComplete(drain_values(task_results, &tasks)) -} -fn execute_task(task_name:String, executor:String, args:String, terminate_job_codes:Vec, continue_job_codes:Vec) -> (TaskResult, Option, Option) { - if executor!="shell" { - return (TaskResult::Error(None, "Only shell executions are supported currently!".to_string()), None, None) - } else { - let run_start = Instant::now(); - info!("Executing sh -c {:?}", args); - match Command::new("sh").arg("-c").arg(args).output() { - Ok(r) => { - let run_duration = run_start.elapsed(); - let return_code = r.status.code().unwrap_or(1); // 1 will be returned if the process was killed by a signal - - let task_stdout: String = String::from_utf8_lossy(&r.stdout).trim_right().into(); - let task_stderr: String = String::from_utf8_lossy(&r.stderr).trim_right().into(); - - info!("task '{}' stdout:\n'{}'", task_name, task_stdout); - info!("task '{}' stderr:\n'{}'", task_name, task_stderr); - - let task_stdout_opt = if task_stdout.is_empty() { None } else { Some(task_stdout) }; - let task_stderr_opt = if task_stderr.is_empty() { None } else { Some(task_stderr) }; - - if terminate_job_codes.contains(&return_code) { - (TaskResult::TerminateJobPlease(return_code, run_duration), task_stdout_opt, task_stderr_opt) - } else if continue_job_codes.contains(&return_code) { - (TaskResult::Ok(return_code, run_duration), task_stdout_opt, task_stderr_opt) - } else { - let expected_codes = continue_job_codes.iter() - .map(|code| code.to_string()) - .collect::>() - .join(","); - (TaskResult::Error(Some(return_code), format!("the task exited with a value not specified in continue_job - {} (task expects one of the following return codes to continue [{}])", return_code, expected_codes)), - task_stdout_opt, - task_stderr_opt) - } - - }, - Err(message) => (TaskResult::Error(None, format!("Error executing process - {}", message)), None, None) - } } + + tasklist } -fn format_args(command:&str, args:&Vec) -> String { +pub fn format_args(command:&str, args:&Vec) -> String { let arg_str = args.iter() .map(|s| format!("\"{}\"", s)) .collect::>() diff --git a/src/factotum/executor/task_list/mod.rs b/src/factotum/executor/task_list/mod.rs new file mode 100644 index 0000000..0dd1b37 --- /dev/null +++ b/src/factotum/executor/task_list/mod.rs @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +#[cfg(test)] +mod tests; +use std::collections::HashMap; +use factotum::executor::execution_strategy::RunResult; + +#[derive(Clone, PartialEq, Debug)] +pub enum State { + WAITING, + SUCCESS, + SUCCESS_NOOP, + FAILED(String), + SKIPPED(String) +} + +pub struct Task { + pub name: String, + pub state: State, + pub children: Vec, + pub task_spec: T, + pub run_result: Option +} + +impl Task { + pub fn new>(name: S, task_spec: T) -> Self { + Task { + name: name.into(), + state: State::WAITING, + children: vec![], + task_spec: task_spec, + run_result: None + } + } +} + +pub type TaskGroup = Vec>; + +pub struct TaskList { + pub tasks: Vec>, + edges: HashMap> +} + +impl TaskList { + + pub fn new() -> Self { + TaskList { + tasks: vec![], + edges: HashMap::new() + } + } + + pub fn add_group(&mut self, tasks:TaskGroup) -> Result<(), String> { + { + let new_edges:Vec<&str> = tasks.iter() + .map(|t| t.name.as_ref()) + .collect(); + + for edge in new_edges { + if self.edges.contains_key(edge) { + return Err(format!("Task '{}' has been added already - task names must be unique", edge)) + } else { + self.edges.insert(edge.to_string(), vec![]); + } + } + } + + self.tasks.push(tasks); + return Ok(()); + } + + pub fn set_child(&mut self, parent:&str, child:&str) -> Result<(), String> { + if self.get_task_by_name(&child).is_some() { + if let Some(children) = self.edges.get_mut(parent) { + children.push(child.to_string()); + Ok(()) + } else { + Err(format!("Parent task '{}' doesn't exist!", parent)) + } + } else { + Err(format!("Child task '{}' doesn't exist!", &child)) + } + } + + pub fn is_task_name_present(&mut self, name:&str) -> bool { + self.get_task_by_name(name).is_some() + } + + pub fn get_task_by_name(&mut self, name:&str) -> Option<&mut Task> { + for task_group in self.tasks.iter_mut() { + for task in task_group.iter_mut() { + if task.name == name { + return Some(task) + } + } + } + None + } + + pub fn get_descendants(&self, task_name:&str) -> Vec { + let mut descendants = self.get_descendants_recursively(task_name); + descendants.sort(); + descendants.dedup(); + descendants + } + + fn get_descendants_recursively(&self, task_name:&str) -> Vec { + let default = &vec![]; + let deps:Vec = self.edges.get(task_name).unwrap_or(default).iter().map(|x| x.clone()).collect(); + + let mut seen = vec![]; + + for dep in deps { + seen.push(dep.clone()); + seen.extend(self.get_descendants_recursively(&dep)); + } + + return seen; + } + +} + diff --git a/src/factotum/executor/task_list/tests.rs b/src/factotum/executor/task_list/tests.rs new file mode 100644 index 0000000..d081637 --- /dev/null +++ b/src/factotum/executor/task_list/tests.rs @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +use super::*; + +#[test] +fn task_new_defaults_good() { + let task = Task::::new("hello", "world".to_string()); + assert_eq!(task.name, "hello"); + assert_eq!(task.state, State::WAITING); + assert_eq!(task.children.len(), 0); + assert_eq!(task.task_spec, "world".to_string()); + assert!(task.run_result.is_some()==false) +} + +#[test] +fn task_grp_dup_names_err() { + let mut task_group = TaskGroup::::new(); + task_group.push(Task::::new("hello", "hello".to_string())); + task_group.push(Task::::new("world", "world".to_string())); + let mut tl = TaskList::::new(); + let expected_good = tl.add_group(task_group); + assert!(expected_good.ok().is_some()); + + + let mut task_group_err = TaskGroup::::new(); + task_group_err.push(Task::::new("hello", "hello".to_string())); + let expected_bad = tl.add_group(task_group_err); + match expected_bad { + Ok(()) => panic!("Duplicate values added to tasklist"), + Err(msg) => assert_eq!("Task 'hello' has been added already - task names must be unique", msg) + } +} + +#[test] +fn test_get_by_name() { + let mut tl = TaskList::::new(); + assert!(tl.get_task_by_name("banana").is_some()==false); + + tl.add_group(vec![ + Task::::new("hello", "world".to_string()), + Task::::new("yes", "world".to_string()) + ]).ok().unwrap(); + + tl.add_group(vec![ + Task::::new("thing", "world".to_string()), + Task::::new("yah", "world".to_string()) + ]).ok().unwrap(); + + assert!(tl.get_task_by_name("hello").is_some()); + assert!(tl.get_task_by_name("yes").is_some()); + assert!(tl.get_task_by_name("thing").is_some()); + assert!(tl.get_task_by_name("yah").is_some()); +} + +#[test] +fn set_child_no_parent_err() { + let mut tl = TaskList::<&str>::new(); + tl.add_group(vec![ + Task::<&str>::new("child", "world"), + ]).ok().unwrap(); + let r = tl.set_child("parent", "child"); + assert!(r.ok().is_some()==false); +} + +#[test] +fn set_child_no_child_err() { + let mut tl = TaskList::<&str>::new(); + tl.add_group(vec![ + Task::<&str>::new("parent", "world"), + ]).ok().unwrap(); + let r = tl.set_child("parent", "child"); + assert!(r.ok().is_some()==false); +} + +#[test] +fn set_child_good() { + let mut tl = TaskList::<&str>::new(); + let tg = vec![ + Task::<&str>::new("parent", "world"), + Task::<&str>::new("child", "world") + ]; + tl.add_group(tg).ok().unwrap(); + let r = tl.set_child("parent", "child"); + assert!(r.ok().is_some()==true); +} + +#[test] +fn get_children() { + let mut tl = TaskList::<&str>::new(); + let tg = vec![ + Task::<&str>::new("parent", "world"), + Task::<&str>::new("child", "world"), + Task::<&str>::new("grandchild", "world"), + Task::<&str>::new("grandchild2", "world") + ]; + tl.add_group(tg).ok().unwrap(); + tl.set_child("parent", "child").ok(); + tl.set_child("child", "grandchild").ok(); + tl.set_child("child", "grandchild2").ok(); + + assert_eq!(vec!["grandchild", "grandchild2"], tl.get_descendants("child")); + assert_eq!(vec!["child", "grandchild", "grandchild2"], tl.get_descendants("parent")); + assert_eq!(Vec::::new(), tl.get_descendants("")) +} + +#[test] +fn get_children_dups_removed() { + let mut tl = TaskList::<&str>::new(); + let tg = vec![ + Task::<&str>::new("parent", "world"), + Task::<&str>::new("child", "world"), + Task::<&str>::new("grandchild", "world"), + Task::<&str>::new("grandchild2", "world") + ]; + tl.add_group(tg).ok().unwrap(); + tl.set_child("parent", "child").ok(); + tl.set_child("child", "grandchild").ok(); + tl.set_child("child", "grandchild2").ok(); + tl.set_child("parent", "grandchild2").ok(); + + assert_eq!(vec!["grandchild", "grandchild2"], tl.get_descendants("child")); + assert_eq!(vec!["child", "grandchild", "grandchild2"], tl.get_descendants("parent")); + assert_eq!(Vec::::new(), tl.get_descendants("")) +} + +#[test] +fn is_task_name_present_good() { + let mut tl = TaskList::<&str>::new(); + let tg = vec![ + Task::<&str>::new("parent", "world"), + Task::<&str>::new("child", "world"), + Task::<&str>::new("grandchild", "world"), + Task::<&str>::new("grandchild2", "world") + ]; + tl.add_group(tg).ok().unwrap(); + + assert!(tl.is_task_name_present("parent")); + assert!(tl.is_task_name_present("grandchild2")); + assert!(tl.is_task_name_present("banana")==false); +} \ No newline at end of file diff --git a/src/factotum/executor/tests.rs b/src/factotum/executor/tests.rs new file mode 100644 index 0000000..14b414c --- /dev/null +++ b/src/factotum/executor/tests.rs @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +use factotum::tests::make_task; +use factotum::factfile::*; +use factotum::executor::*; + +#[test] +fn get_task_execution_list_good() { + let mut ff = Factfile::new("test"); + ff.add_task_obj(&make_task("apple", &vec![])); + ff.add_task_obj(&make_task("turnip", &vec![])); + ff.add_task_obj(&make_task("orange", &vec!["apple"])); + ff.add_task_obj(&make_task("egg", &vec!["apple"])); + ff.add_task_obj(&make_task("potato", &vec!["apple", "egg"])); + ff.add_task_obj(&make_task("chicken", &vec!["potato","orange"])); + + // apple---------- turnip + // / \ \ + // orange egg----- \ + // \ \ \ + // \ potato + // \ \ + // --------------- chicken + + let tl = get_task_execution_list(&ff, None); + + let expected = vec![ vec!["turnip", "apple"], + vec!["egg", "orange"], + vec!["potato"], + vec!["chicken"] ]; + + let actual:Vec> = tl.tasks.iter() + .map(|task_group| task_group.iter().map(|task| task.name.clone()).collect()) + .collect(); + + assert_eq!(actual, expected); + + // check the children are correctly mapped + + assert_eq!(tl.get_descendants("turnip"), Vec::::new()); + assert_eq!(tl.get_descendants("apple"), vec!["chicken", "egg", "orange", "potato"]); + assert_eq!(tl.get_descendants("egg"), vec!["chicken", "potato"]); + assert_eq!(tl.get_descendants("orange"), vec!["chicken"]); +} + +#[test] +fn get_task_execution_list_good_reduced() { + let mut ff = Factfile::new("test"); + ff.add_task_obj(&make_task("apple", &vec![])); + ff.add_task_obj(&make_task("turnip", &vec![])); + ff.add_task_obj(&make_task("orange", &vec!["apple"])); + ff.add_task_obj(&make_task("egg", &vec!["apple"])); + ff.add_task_obj(&make_task("potato", &vec!["apple", "egg"])); + ff.add_task_obj(&make_task("chicken", &vec!["potato","orange"])); + + let tl = get_task_execution_list(&ff, Some("potato".to_string())); + assert!(tl.tasks[0].len()==1); + assert_eq!(tl.get_descendants("potato"), vec!["chicken"]); +} + +#[test] +fn get_formatted_args() { + let args_list = format_args("echo", &vec!["hello".to_string(), + "world".to_string(), + "abc abc".to_string()]); + assert_eq!(args_list, "echo \"hello\" \"world\" \"abc abc\""); +} + +// todo write test for rejecting non "shell" execution types diff --git a/src/factotum/factfile/mod.rs b/src/factotum/factfile/mod.rs index 9d7006b..9a420e8 100644 --- a/src/factotum/factfile/mod.rs +++ b/src/factotum/factfile/mod.rs @@ -25,6 +25,7 @@ pub struct Factfile { root: NodeIndex } +#[derive(Clone)] pub struct Task { pub name: String, pub depends_on: Vec, @@ -34,6 +35,7 @@ pub struct Task { pub on_result: OnResult } +#[derive(Clone)] pub struct OnResult { pub terminate_job: Vec, pub continue_job: Vec diff --git a/src/factotum/parser/mod.rs b/src/factotum/parser/mod.rs index fd552a2..32528bf 100644 --- a/src/factotum/parser/mod.rs +++ b/src/factotum/parser/mod.rs @@ -25,21 +25,31 @@ use super::factfile; use std::error::Error; -pub fn parse(factfile:&str, env:Option) -> Result { +pub struct TaskReturnCodeMapping { + pub continue_job: Vec, + pub terminate_early: Vec +} + +pub enum OverrideResultMappings { + All(TaskReturnCodeMapping), + None +} + +pub fn parse(factfile:&str, env:Option, overrides:OverrideResultMappings) -> Result { info!("reading {} into memory", factfile); let mut fh = try!(File::open(&factfile).map_err(|e| format!("Couldn't open '{}' for reading: {}", factfile, e))); let mut f = String::new(); try!(fh.read_to_string(&mut f).map_err(|e| format!("Couldn't read '{}': {}", factfile, e))); info!("file {} was read successfully!", factfile); - parse_str(&f, factfile, env) + parse_str(&f, factfile, env, overrides) } pub fn inflate_env(env:&str) -> Result { Json::from_str(env).map_err(|err| format!("Supplied environment/config '{}' is not valid JSON: {}", env, Error::description(&err))) } -fn parse_str(json:&str, from_filename:&str, env:Option) -> Result { +fn parse_str(json:&str, from_filename:&str, env:Option, overrides:OverrideResultMappings) -> Result { info!("parsing json:\n{}", json); let validation_result = schemavalidator::validate_against_factfile_schema(json); @@ -56,7 +66,7 @@ fn parse_str(json:&str, from_filename:&str, env:Option) -> Result { info!("'{}' failed to match factfile schema definition!", from_filename); @@ -90,17 +100,18 @@ struct FactfileTaskFormat { onResult: FactfileTaskResultFormat } -#[derive(RustcDecodable)] +#[derive(RustcDecodable, Clone)] #[allow(non_snake_case)] struct FactfileTaskResultFormat { terminateJobWithSuccess: Vec, continueJob: Vec } -fn parse_valid_json(file:&str, conf:Option) -> Result { +fn parse_valid_json(file:&str, conf:Option, overrides:OverrideResultMappings) -> Result { let schema: SelfDescribingJson = try!(json::decode(file).map_err(|e| e.to_string())); let decoded_json = schema.data; let mut ff = factfile::Factfile::new(decoded_json.name); + for file_task in decoded_json.tasks.iter() { // TODO errs in here - ? add task should Result not panic! info!("adding task '{}'", file_task.name); @@ -113,7 +124,7 @@ fn parse_valid_json(file:&str, conf:Option) -> Result) -> Result) -> Result = file_task.dependsOn.iter().map(AsRef::as_ref).collect(); let args:Vec<&str> = decorated_args.iter().map(AsRef::as_ref).collect(); + + let (terminate_mappings, continue_mappings) = match overrides { + OverrideResultMappings::All(ref with_value) => (&with_value.terminate_early, &with_value.continue_job), + OverrideResultMappings::None => (&file_task.onResult.terminateJobWithSuccess, &file_task.onResult.continueJob) + }; ff.add_task(&file_task.name, &deps, &file_task.executor, &file_task.command, &args, - &file_task.onResult.terminateJobWithSuccess, - &file_task.onResult.continueJob); + terminate_mappings, + continue_mappings); } Ok(ff) } diff --git a/src/factotum/parser/schemavalidator/tests.rs b/src/factotum/parser/schemavalidator/tests.rs index ec07daf..c85680f 100644 --- a/src/factotum/parser/schemavalidator/tests.rs +++ b/src/factotum/parser/schemavalidator/tests.rs @@ -12,4 +12,6 @@ * implied. See the Apache License Version 2.0 for the specific language * governing permissions and limitations there under. */ - \ No newline at end of file + + + // TODO write some tests for the schema validator \ No newline at end of file diff --git a/src/factotum/parser/tests.rs b/src/factotum/parser/tests.rs index a4faa93..f5a59a5 100644 --- a/src/factotum/parser/tests.rs +++ b/src/factotum/parser/tests.rs @@ -23,7 +23,7 @@ fn resource(name:&str) -> String { #[test] fn invalid_files_err() { - let res = parse("asdhf;asdjhfasdf", None); + let res = parse("asdhf;asdjhfasdf", None, OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg, "Couldn't open 'asdhf;asdjhfasdf' for reading: No such file or directory (os error 2)".to_string()) } else { @@ -33,7 +33,7 @@ fn invalid_files_err() { #[test] fn invalid_json_err() { - let res = parse(&resource("invalid_json.factfile"), None); + let res = parse(&resource("invalid_json.factfile"), None, OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg,format!("'{}' is not a valid factotum factfile: invalid JSON - invalid syntax at line 1, column 3", resource("invalid_json.factfile")).to_string()) } else { @@ -44,7 +44,7 @@ fn invalid_json_err() { #[test] fn invalid_against_schema_err() { let invalid = resource("example_invalid_no_name.factfile"); - let res = parse(&invalid, None); + let res = parse(&invalid, None, OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg,format!("'{}' is not a valid factotum factfile: '/data/name' - This property is required", invalid).to_string()) } else { @@ -55,7 +55,7 @@ fn invalid_against_schema_err() { #[test] fn invalid_against_schema_wrong_type() { let invalid = resource("example_wrong_type.factfile"); - let res = parse(&invalid, None); + let res = parse(&invalid, None,OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg,format!("'{}' is not a valid factotum factfile: '/data/tasks/0/onResult/terminateJobWithSuccess/0' - Type of the value is wrong (The value must be integer)", invalid).to_string()) } else { @@ -66,7 +66,7 @@ fn invalid_against_schema_wrong_type() { #[test] fn invalid_ambiguous_on_result() { let invalid = resource("example_invalid_terminate_continue_same.factfile"); - let res = parse(&invalid, None); + let res = parse(&invalid, None, OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg, format!("'{}' is not a valid factotum factfile: the task 'ambi' has conflicting actions.", invalid)) } else { @@ -77,7 +77,7 @@ fn invalid_ambiguous_on_result() { #[test] fn invalid_must_continue() { let invalid = resource("example_invalid_no_continue.factfile"); - let res = parse(&invalid, None); + let res = parse(&invalid, None, OverrideResultMappings::None); if let Err(msg) = res { assert_eq!(msg, format!("'{}' is not a valid factotum factfile: the task 'continue' has no way to continue successfully.", invalid)) } else { @@ -89,7 +89,7 @@ fn invalid_must_continue() { fn valid_generates_factfile() { let valid = resource("example_ok.factfile"); - if let Ok(factfile) = parse(&valid, None) { + if let Ok(factfile) = parse(&valid, None, OverrideResultMappings::None) { let tasks = factfile.get_tasks_in_order(); assert_eq!(factfile.name, "My First DAG"); @@ -107,6 +107,41 @@ fn valid_generates_factfile() { } else { panic!("valid factfile example_ok.factfile should have parsed but didn't"); } +} + +#[test] +fn overrides_set_noop_values() { + let valid = resource("example_ok.factfile"); + + let new_map = TaskReturnCodeMapping { + continue_job: vec![12], + terminate_early: vec![34] + }; + + if let Ok(factfile) = parse(&valid, None, OverrideResultMappings::All(new_map)) { + let tasks = factfile.get_tasks_in_order(); + assert_eq!(factfile.name, "My First DAG"); + + let task_one = tasks.get(0).unwrap().get(0).unwrap(); + assert_eq!(task_one.name, "EmrEtlRunner"); + assert_eq!(task_one.depends_on, Vec::<&str>::new()); + assert_eq!(task_one.on_result.terminate_job, vec![34]); + assert_eq!(task_one.on_result.continue_job, vec![12]); + + let task_two = tasks.get(1).unwrap().get(0).unwrap(); + assert_eq!(task_two.name, "StorageLoader"); + assert_eq!(task_two.depends_on, vec!["EmrEtlRunner"]); + assert_eq!(task_two.on_result.terminate_job, vec![34]); + assert_eq!(task_two.on_result.continue_job, vec![12]); + + let task_three = tasks.get(2).unwrap().get(0).unwrap(); + assert_eq!(task_three.name, "SQL Runner"); + assert_eq!(task_three.depends_on, vec!["StorageLoader"]); + assert_eq!(task_three.on_result.terminate_job, vec![34]); + assert_eq!(task_three.on_result.continue_job, vec![12]); + } else { + panic!("valid factfile example_ok.factfile should have parsed but didn't"); + } } diff --git a/src/factotum/tests.rs b/src/factotum/tests.rs index 32abd4c..c26c759 100644 --- a/src/factotum/tests.rs +++ b/src/factotum/tests.rs @@ -1,3 +1,18 @@ +/* + * Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + use factotum::factfile::Task; use factotum::factfile::OnResult; diff --git a/src/main.rs b/src/main.rs index c810830..1eb9c83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,11 +25,15 @@ extern crate chrono; use docopt::Docopt; use std::fs; -use factotum::executor::ExecutionResult; -use factotum::executor::TaskExecutionResult; +use factotum::executor::task_list::{Task, State}; use factotum::factfile::Factfile; +use factotum::factfile::{Task as FactfileTask}; +use factotum::parser::OverrideResultMappings; +use factotum::parser::TaskReturnCodeMapping; +use factotum::executor::execution_strategy::*; use colored::*; use std::time::Duration; +use std::process::Command; mod factotum; @@ -42,7 +46,7 @@ const USAGE: &'static str = " Factotum. Usage: - factotum run [--start=] [--env=] + factotum run [--start=] [--env=] [--dry-run] factotum validate factotum (-h | --help) @@ -50,12 +54,14 @@ Options: -h --help Show this screen. --start= Begin at specified task. --env= Supply JSON to define mustache variables in Factfile. + --dry-run Pretend to execute a Factfile, showing the commands that would be executed. Can be used with other options. "; #[derive(Debug, RustcDecodable)] struct Args { flag_start: Option, flag_env: Option, + flag_dry_run: bool, arg_factfile: String, cmd_run: bool, cmd_validate: bool @@ -103,33 +109,43 @@ fn get_duration_as_string(d:&Duration) -> String { } } -fn get_task_result_line_str(task_result:&TaskExecutionResult) -> (String, Option) { - - let (opening_line, stdout, stderr, summary_line) = if let Some(ref run_result) = task_result.run_details { - // we know tasks with run details were attempted - - let opener = format!("Task '{}' was started at {}\n", task_result.name.cyan(), run_result.run_started); +fn get_task_result_line_str(task_result:&Task<&FactfileTask>) -> (String, Option) { + + let state = task_result.state.clone(); + let (opening_line, stdout, stderr, summary_line) = if let Some(ref res) = task_result.run_result { + // we know tasks with run details were attempted + + let opener = format!("Task '{}' was started at {}\n", task_result.name.cyan(), res.run_started); - let output = match run_result.stdout { + let output = match res.stdout { Some(ref o) => Some(format!("Task '{}' stdout:\n{}\n", task_result.name.cyan(), o.trim_right().bold())), None => None }; - let errors = match run_result.stderr { + let errors = match res.stderr { Some(ref e) => Some(format!("Task '{}' stderr:\n{}\n", task_result.name.cyan(), e.trim_right().red())), None => None }; - - let summary = if let Some(ref run_error) = run_result.task_execution_error { - let mut failure_str = "Task '".red().to_string(); - failure_str.push_str(&format!("{}", task_result.name.cyan())); - failure_str.push_str(&format!("': failed after {}. Reason: {}", get_duration_as_string(&run_result.duration), run_error).red().to_string()); - failure_str - } else { - let mut success_str = "Task '".green().to_string(); - success_str.push_str(&format!("{}", task_result.name.cyan())); - success_str.push_str(&format!("': succeeded after {}", get_duration_as_string(&run_result.duration)).green().to_string()); - success_str + + let summary = match (&res.task_execution_error, state) { + (&Some(ref task_exec_error_msg), _) => { + let mut failure_str = "Task '".red().to_string(); + failure_str.push_str(&format!("{}", task_result.name.cyan())); + failure_str.push_str(&format!("': couldn't be started. Reason: {}", task_exec_error_msg).red().to_string()); + failure_str + }, + (_, State::FAILED(fail_reason)) => { + let mut failure_str = "Task '".red().to_string(); + failure_str.push_str(&format!("{}", task_result.name.cyan())); + failure_str.push_str(&format!("': failed after {}. Reason: {}", get_duration_as_string(&res.duration), fail_reason).red().to_string()); + failure_str + }, + (_, _) => { + let mut success_str = "Task '".green().to_string(); + success_str.push_str(&format!("{}", task_result.name.cyan())); + success_str.push_str(&format!("': succeeded after {}", get_duration_as_string(&res.duration)).green().to_string()); + success_str + } }; (opener, output, errors, summary) @@ -138,7 +154,7 @@ fn get_task_result_line_str(task_result:&TaskExecutionResult) -> (String, Option // tasks without run details may have been unable to start (some internal error) // or skipped because a prior task errored or NOOPed - let reason_for_not_running = if task_result.attempted { + let reason_for_not_running = if let State::FAILED(_) = task_result.state { "Factotum could not start the task".red().to_string() } else { "skipped".to_string() @@ -160,7 +176,7 @@ fn get_task_result_line_str(task_result:&TaskExecutionResult) -> (String, Option return (result, stderr); } -fn get_task_results_str(task_results:&Vec) -> (String, String) { +fn get_task_results_str(task_results:&Vec<&Task<&FactfileTask>>) -> (String, String) { let mut stderr = String::new(); let mut stdout = String::new(); @@ -175,7 +191,7 @@ fn get_task_results_str(task_results:&Vec) -> (String, Stri stderr.push_str(&task_stderr_str); } - if let Some(ref run_result) = task.run_details { + if let Some(ref run_result) = task.run_result { total_run_time = total_run_time + run_result.duration; executed += 1; } @@ -213,14 +229,32 @@ fn validate_start_task(job: &Factfile, start_task:&str) -> Result<(), &'static s } fn validate(factfile:&str, env:Option) -> Result { - match factotum::parser::parse(factfile, env) { + match factotum::parser::parse(factfile, env, OverrideResultMappings::None) { Ok(_) => Ok(format!("'{}' is a valid Factfile!", factfile).green().to_string()), Err(msg) => Err(msg.red().to_string()) } } +fn parse_file_and_simulate(factfile:&str, env:Option, start_from:Option) -> i32 { + parse_file_and_execute_with_strategy(factfile, + env, + start_from, + factotum::executor::execution_strategy::execute_simulation, + OverrideResultMappings::All(TaskReturnCodeMapping { continue_job: vec![0], terminate_early: vec![] } )) +} + fn parse_file_and_execute(factfile:&str, env:Option, start_from:Option) -> i32 { - match factotum::parser::parse(factfile, env) { + parse_file_and_execute_with_strategy(factfile, + env, + start_from, + factotum::executor::execution_strategy::execute_os, + OverrideResultMappings::None) +} + +fn parse_file_and_execute_with_strategy(factfile:&str, env:Option, start_from:Option, strategy: F, override_result_map:OverrideResultMappings) -> i32 + where F : Fn(&str, &mut Command) -> RunResult + Send + Sync + 'static + Copy { + + match factotum::parser::parse(factfile, env, override_result_map) { Ok(job) => { if let Some(ref start_task) = start_from { @@ -231,53 +265,75 @@ fn parse_file_and_execute(factfile:&str, env:Option, start_from:Option { + let job_res = factotum::executor::execute_factfile(&job, start_from, strategy); + + let mut has_errors = false; + let mut has_early_finish = false; + + let mut tasks = vec![]; + + for task_group in job_res.tasks.iter() { + for task in task_group { + if let State::FAILED(_) = task.state { + has_errors = true; + } else if let State::SUCCESS_NOOP = task.state { + has_early_finish = true; + } + tasks.push(task); + } + } + + let normal_completion = !has_errors && !has_early_finish; + + if normal_completion { let (stdout_summary, stderr_summary) = get_task_results_str(&tasks); print!("{}", stdout_summary); if !stderr_summary.trim_right().is_empty() { print_err!("{}", stderr_summary.trim_right()); } PROC_SUCCESS - }, - ExecutionResult::EarlyFinishOk(tasks) => { + } + else if has_early_finish && !has_errors { let (stdout_summary, stderr_summary) = get_task_results_str(&tasks); print!("{}", stdout_summary); if !stderr_summary.trim_right().is_empty() { print_err!("{}", stderr_summary.trim_right()); } let incomplete_tasks = tasks.iter() - .filter(|r| !r.attempted) + .filter(|r| r.run_result.is_some() ) .map(|r| format!("'{}'", r.name.cyan())) .collect::>() .join(", "); let stop_requesters = tasks.iter() - .filter(|r| r.run_details.is_some() && r.run_details.as_ref().unwrap().requests_job_termination) + .filter(|r| match r.state { State::SUCCESS_NOOP => true, _ => false } ) .map(|r| format!("'{}'", r.name.cyan())) .collect::>() .join(", "); println!("Factotum job finished early as a task ({}) requested an early finish. The following tasks were not run: {}.", stop_requesters, incomplete_tasks); PROC_SUCCESS - }, - ExecutionResult::AbnormalTermination(tasks) => { + } + else { let (stdout_summary, stderr_summary) = get_task_results_str(&tasks); print!("{}", stdout_summary); + if !stderr_summary.trim_right().is_empty() { print_err!("{}", stderr_summary.trim_right()); } + let incomplete_tasks = tasks.iter() - .filter(|r| !r.attempted) + .filter(|r| !r.run_result.is_some() ) .map(|r| format!("'{}'", r.name.cyan())) .collect::>() .join(", "); + let failed_tasks = tasks.iter() - .filter(|r| r.run_details.is_some() && r.run_details.as_ref().unwrap().task_execution_error.is_some()) + .filter(|r| match r.state { State::FAILED(_) => true, _ => false } ) .map(|r| format!("'{}'", r.name.cyan())) .collect::>() .join(", "); + println!("Factotum job executed abnormally as a task ({}) failed - the following tasks were not run: {}!", failed_tasks, incomplete_tasks); return PROC_EXEC_ERROR; - } } }, Err(msg) => { @@ -318,7 +374,11 @@ fn factotum() -> i32 { }; if args.cmd_run { - parse_file_and_execute(&args.arg_factfile, args.flag_env, args.flag_start) + if !args.flag_dry_run { + parse_file_and_execute(&args.arg_factfile, args.flag_env, args.flag_start) + } else { + parse_file_and_simulate(&args.arg_factfile, args.flag_env, args.flag_start) + } } else if args.cmd_validate { match validate(&args.arg_factfile, args.flag_env) { Ok(msg) => { @@ -385,17 +445,25 @@ fn get_duration_with_hours() { #[test] fn test_get_task_result_line_str() { use chrono::UTC; - use factotum::executor::RunResult; + use factotum::executor::execution_strategy::RunResult; + use factotum::factfile::{ Task as FactfileTask, OnResult }; + // successful after 20 secs let dt = UTC::now(); - let sample_task = TaskExecutionResult { + let sample_task = Task::<&FactfileTask> { name: String::from("hello world"), - attempted: true, - run_details: Some(RunResult { + children: vec![], + state: State::SUCCESS, + task_spec: &FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }, + run_result: Some(RunResult { run_started: dt, duration: Duration::from_secs(20), task_execution_error: None, - requests_job_termination: false, stdout: Some(String::from("hello world")), stderr: None, return_code: 0 @@ -407,92 +475,153 @@ fn test_get_task_result_line_str() { assert_eq!(result_stdout, expected); assert_eq!(result_stderr, None); - let sample_task_stdout = TaskExecutionResult { + // failed after 20 secs + // (was started ok) + let sample_task_stdout = Task::<&FactfileTask> { name: String::from("hello world"), - attempted: true, - run_details: Some(RunResult { + children: vec![], + state: State::FAILED("Something about not being in continue job".to_string()), + task_spec: &FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }, + run_result: Some(RunResult { run_started: dt, duration: Duration::from_secs(20), task_execution_error: None, - requests_job_termination: false, stdout: Some(String::from("hello world")), stderr: Some(String::from("There's errors")), - return_code: 0 + return_code: 0 }) }; assert_eq!(format!("Task '{}' stderr:\n{}\n", sample_task.name.cyan(), "There's errors".red()), get_task_result_line_str(&sample_task_stdout).1.unwrap()); - assert_eq!(get_task_result_line_str(&sample_task_stdout).0, expected); + assert_eq!(get_task_result_line_str(&sample_task_stdout).0, + format!("Task '{}' was started at {}\nTask '{}' stdout:\n{}\n{}{}{}\n", "hello world".cyan(), dt, "hello world".cyan(), "hello world".bold(), "Task '".red(), "hello world".cyan(), "': failed after 20.0s. Reason: Something about not being in continue job".red())); - let task_skipped = TaskExecutionResult { + // skipped task (previous failure/noop) + let task_skipped = Task::<&FactfileTask> { name: String::from("skip"), - attempted: false, - run_details: None + children: vec![], + task_spec: &FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }, + state: State::SKIPPED("for some reason".to_string()), + run_result: None }; assert_eq!(format!("Task '{}': skipped!\n", "skip".cyan()), get_task_result_line_str(&task_skipped).0); assert_eq!(None, get_task_result_line_str(&task_skipped).1); - let task_init_fail = TaskExecutionResult { + let task_init_fail = Task::<&FactfileTask> { name: String::from("init fail"), - attempted: true, - run_details: None + children: vec![], + state: State::FAILED("bla".to_string()), + task_spec: &FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }, + run_result: None }; - // todo: is there a better error here? - // I think this specific case is very unlikely as it'd hint at a problem with the rust stdlib - // it means we've tried to execute a process, but didn't get a return code etc + assert_eq!(format!("Task '{}': {}!\n", "init fail".cyan(), "Factotum could not start the task".red()), get_task_result_line_str(&task_init_fail).0); assert_eq!(None, get_task_result_line_str(&task_init_fail).1); - let task_failure = TaskExecutionResult { + let task_failure = Task::<&FactfileTask> { name: String::from("fails"), - attempted: true, - run_details: Some(RunResult { + children: vec![], + state: State::FAILED("bla".to_string()), + task_spec: &FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }, + run_result: Some(RunResult { run_started: dt, duration: Duration::from_secs(20), task_execution_error: Some(String::from("The task exited with something unexpected")), - requests_job_termination: false, stdout: Some(String::from("hello world")), stderr: Some(String::from("There's errors")), return_code: 0 }) }; - let expected_failed = format!("Task '{}' was started at {}\nTask '{}' stdout:\n{}\n{}{}{}\n", "fails".cyan(), dt, "fails".cyan(), "hello world".bold(), "Task '".red(), "fails".cyan(), "': failed after 20.0s. Reason: The task exited with something unexpected".red()); + let expected_failed = format!("Task '{}' was started at {}\nTask '{}' stdout:\n{}\n{}{}{}\n", "fails".cyan(), dt, "fails".cyan(), "hello world".bold(), "Task '".red(), "fails".cyan(), "': couldn't be started. Reason: The task exited with something unexpected".red()); let (stdout_failed, stderr_failed) = get_task_result_line_str(&task_failure); assert_eq!(expected_failed, stdout_failed); assert_eq!(format!("Task '{}' stderr:\n{}\n", "fails".cyan(), "There's errors".red()), stderr_failed.unwrap()); - // todo noop ? } #[test] fn test_get_task_results_str_summary() { use chrono::UTC; - use factotum::executor::RunResult; + use factotum::executor::execution_strategy::RunResult; + use factotum::factfile::{Task as FactfileTask, OnResult}; let dt = UTC::now(); - let mut tasks = vec![]; - let (stdout, stderr) = get_task_results_str(&tasks); - let expected:String = format!("{}", "0/0 tasks run in 0.0s\n".green()); - - assert_eq!(stdout, expected); - assert_eq!(stderr, ""); - - tasks.push(TaskExecutionResult { + let task_one_spec = FactfileTask { name: "hello world".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }; + + let task_one = Task::<&FactfileTask> { name: String::from("hello world"), - attempted: true, - run_details: Some(RunResult { + children: vec![], + state: State::SUCCESS, + task_spec: &task_one_spec, + run_result: Some(RunResult { run_started: dt, duration: Duration::from_secs(20), task_execution_error: None, - requests_job_termination: false, stdout: Some(String::from("hello world")), stderr: Some(String::from("Mistake")), return_code: 0 }) - }); + }; + + + let task_two_spec = FactfileTask { name: "hello world 2".to_string(), + depends_on: vec![], + executor: "".to_string(), + command: "".to_string(), + arguments: vec![], + on_result: OnResult { terminate_job: vec![], continue_job: vec![] } }; + + let task_two = Task::<&FactfileTask> { + name: String::from("hello world 2"), + children: vec![], + state: State::SUCCESS, + task_spec: &task_two_spec, + run_result: Some(RunResult { + run_started: dt, + duration: Duration::from_secs(80), + task_execution_error: None, + stdout: Some(String::from("hello world")), + stderr: Some(String::from("Mistake")), + return_code: 0 + }) + }; + + let mut tasks:Vec<&Task<&FactfileTask>> = vec![]; + let (stdout, stderr) = get_task_results_str(&tasks); + let expected:String = format!("{}", "0/0 tasks run in 0.0s\n".green()); + + assert_eq!(stdout, expected); + assert_eq!(stderr, ""); + + tasks.push(&task_one); let (one_task_stdout, one_task_stderr) = get_task_results_str(&tasks); let (first_task_stdout, first_task_stderr) = get_task_result_line_str(&tasks[0]); @@ -502,19 +631,7 @@ fn test_get_task_results_str_summary() { let first_task_stderr_str = first_task_stderr.unwrap(); assert_eq!(one_task_stderr, first_task_stderr_str); - tasks.push(TaskExecutionResult { - name: String::from("hello world 2"), - attempted: true, - run_details: Some(RunResult { - run_started: dt, - duration: Duration::from_secs(80), - task_execution_error: None, - requests_job_termination: false, - stdout: Some(String::from("hello world")), - stderr: Some(String::from("Mistake")), - return_code: 0 - }) - }); + tasks.push(&task_two); let (two_task_stdout, two_task_stderr) = get_task_results_str(&tasks); let (task_two_stdout, task_two_stderr) = get_task_result_line_str(&tasks[1]);