From 590e45fa69082f5db1789dffae8ef676455a348a Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Fri, 7 Jun 2019 00:07:12 +1200 Subject: [PATCH] Thread dir creation as well When directories complete, start writing the files/dirs within that directory that have been decompressed already. Avoids a stat() + create_dir_all() in the main thread permitting more concurrent IO dispatch in exchange for memory pressure. --- src/diskio/immediate.rs | 8 +- src/diskio/mod.rs | 4 +- src/diskio/threaded.rs | 16 ++-- src/dist/component/package.rs | 154 +++++++++++++++++++++++++--------- 4 files changed, 127 insertions(+), 55 deletions(-) diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs index b76d2e4a68c..f19f74cfe73 100644 --- a/src/diskio/immediate.rs +++ b/src/diskio/immediate.rs @@ -42,11 +42,11 @@ impl Executor for ImmediateUnpacker { Box::new(ImmediateIterator(Cell::new(IterateOne::Item(item)))) } - fn join(&mut self) -> Option>> { - None + fn join(&mut self) -> Box> { + Box::new(ImmediateIterator(Cell::new(IterateOne::None))) } - fn completed(&mut self) -> Option>> { - None + fn completed(&mut self) -> Box> { + Box::new(ImmediateIterator(Cell::new(IterateOne::None))) } } diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index f3debc13c16..20b3e0b3ed5 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -136,10 +136,10 @@ pub trait Executor { /// All operations submitted before the join will have been /// returned either through ready/complete or join once join /// returns. - fn join(&mut self) -> Option>>; + fn join(&mut self) -> Box>; /// Iterate over completed items. - fn completed(&mut self) -> Option>>; + fn completed(&mut self) -> Box>; } /// Trivial single threaded IO to be used from executors. diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index ec02e4cf234..842fdb48955 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -98,7 +98,7 @@ impl<'a> Executor for Threaded<'a> { }) } - fn join(&mut self) -> Option>> { + fn join(&mut self) -> Box> { // Some explanation is in order. Even though the tar we are reading from (if // any) will have had its FileWithProgress download tracking // completed before we hit drop, that is not true if we are unwinding due to a @@ -157,26 +157,24 @@ impl<'a> Executor for Threaded<'a> { self.tx .send(Task::Sentinel) .expect("must still be listening"); - Some(Box::new(JoinIterator { + Box::new(JoinIterator { iter: self.rx.iter(), consume_sentinel: false, - })) + }) } - fn completed(&mut self) -> Option>> { - Some(Box::new(JoinIterator { + fn completed(&mut self) -> Box> { + Box::new(JoinIterator { iter: self.rx.try_iter(), consume_sentinel: true, - })) + }) } } impl<'a> Drop for Threaded<'a> { fn drop(&mut self) { // We are not permitted to fail - consume but do not handle the items. - if let Some(iter) = self.join() { - for _ in iter {} - } + self.join().for_each(drop); } } diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 52c543e8655..29e94c87093 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -10,9 +10,11 @@ use crate::errors::*; use crate::utils::notifications::Notification; use crate::utils::utils; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::io::{self, ErrorKind as IOErrorKind, Read}; +use std::iter::FromIterator; +use std::mem; use std::path::{Path, PathBuf}; use tar::EntryType; @@ -155,14 +157,17 @@ impl<'a> TarPackage<'a> { } } -// Handle the async result of io operations -fn filter_result(op: Item) -> io::Result<()> { - match op.result { +/// Handle the async result of io operations +/// Replaces op.result with Ok(()) +fn filter_result(op: &mut Item) -> io::Result<()> { + let result = mem::replace(&mut op.result, Ok(())); + match result { Ok(_) => Ok(()), Err(e) => match e.kind() { - // TODO: the IO execution logic should pass this back rather than - // being the code to ignore it. IOErrorKind::AlreadyExists => { + // mkdir of e.g. ~/.rustup already existing is just fine; + // for others it would be better to know whether it is + // expected to exist or not -so put a flag in the state. if let Kind::Directory = op.kind { Ok(()) } else { @@ -174,6 +179,47 @@ fn filter_result(op: Item) -> io::Result<()> { } } +/// Dequeue the children of directories queued up waiting for the directory to +/// be created. +/// +/// Currently the volume of queued items does not count as backpressure against +/// the main tar extraction process. +fn trigger_children( + io_executor: &mut dyn Executor, + directories: &mut HashMap, + item: Item, +) -> Result { + let mut result = 0; + if let Kind::Directory = item.kind { + let mut pending = Vec::new(); + directories + .entry(item.full_path) + .and_modify(|status| match status { + DirStatus::Exists => unreachable!(), + DirStatus::Pending(pending_inner) => { + pending.append(pending_inner); + *status = DirStatus::Exists; + } + }) + .or_insert_with(|| unreachable!()); + result += pending.len(); + for pending_item in pending.into_iter() { + for mut item in Vec::from_iter(io_executor.execute(pending_item)) { + // TODO capture metrics + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + result += trigger_children(io_executor, directories, item)?; + } + } + }; + Ok(result) +} + +/// What is the status of this directory ? +enum DirStatus { + Exists, + Pending(Vec), +} + fn unpack_without_first_dir<'a, R: Read>( archive: &mut tar::Archive, path: &Path, @@ -183,9 +229,11 @@ fn unpack_without_first_dir<'a, R: Read>( let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; - let mut checked_parents: HashSet = HashSet::new(); + let mut directories: HashMap = HashMap::new(); + // Path is presumed to exist. Call it a precondition. + directories.insert(path.to_owned(), DirStatus::Exists); - for entry in entries { + 'entries: for entry in entries { let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; let relpath = { let path = entry.path(); @@ -200,9 +248,13 @@ fn unpack_without_first_dir<'a, R: Read>( } } let mut components = relpath.components(); - // Throw away the first path component: we make our own root + // Throw away the first path component: our root was supplied. components.next(); let full_path = path.join(&components.as_path()); + if full_path == path { + // The tmp dir code makes the root dir for us. + continue; + } let size = entry.header().size()?; if size > 100_000_000 { @@ -236,8 +288,11 @@ fn unpack_without_first_dir<'a, R: Read>( let o_mode = g_mode >> 3; let mode = u_mode | g_mode | o_mode; - let item = match kind { - EntryType::Directory => Item::make_dir(full_path, mode), + let mut item = match kind { + EntryType::Directory => { + directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new())); + Item::make_dir(full_path, mode) + } EntryType::Regular => { let mut v = Vec::with_capacity(size as usize); entry.read_to_end(&mut v)?; @@ -246,45 +301,64 @@ fn unpack_without_first_dir<'a, R: Read>( _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), }; - // FUTURE: parallelise or delete (surely all distribution tars are well formed in this regard). - // Create the full path to the entry if it does not exist already - if let Some(parent) = item.full_path.parent() { - if !checked_parents.contains(parent) { - checked_parents.insert(parent.to_owned()); - // It would be nice to optimise this stat out, but the tar could be like so: - // a/deep/file.txt - // a/file.txt - // which would require tracking the segments rather than a simple hash. - // Until profile shows that one stat per dir is a problem (vs one stat per file) - // leave till later. - - if !parent.exists() { - let path_display = format!("{}", parent.display()); - trace_scoped!("create_dir_all", "name": path_display); - std::fs::create_dir_all(&parent).chain_err(|| ErrorKind::ExtractingPackage)? + let item = loop { + // Create the full path to the entry if it does not exist already + if let Some(parent) = item.full_path.to_owned().parent() { + match directories.get_mut(parent) { + None => { + // Tar has item before containing directory + // Complain about this so we can see if these exist. + eprintln!( + "Unexpected: missing parent '{}' for '{}'", + parent.display(), + entry.path()?.display() + ); + directories.insert(parent.to_owned(), DirStatus::Pending(vec![item])); + item = Item::make_dir(parent.to_owned(), 0o755); + // Check the parent's parent + continue; + } + Some(DirStatus::Exists) => { + break item; + } + Some(DirStatus::Pending(pending)) => { + // Parent dir is being made, take next item from tar + pending.push(item); + continue 'entries; + } } + } else { + unreachable!(); } - } + }; - for item in io_executor.execute(item) { - // TODO capture metrics, add directories to created cache - filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?; + for mut item in Vec::from_iter(io_executor.execute(item)) { + // TODO capture metrics + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, item)?; } // drain completed results to keep memory pressure low - if let Some(iter) = io_executor.completed() { - for prev_item in iter { - // TODO capture metrics, add directories to created cache - filter_result(prev_item).chain_err(|| ErrorKind::ExtractingPackage)?; - } + + for mut item in Vec::from_iter(io_executor.completed()) { + // TODO capture metrics + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, item)?; } } - if let Some(iter) = io_executor.join() { - for item in iter { + loop { + let mut triggered = 0; + for mut item in Vec::from_iter(io_executor.join()) { // handle final IOs - // TODO capture metrics, add directories to created cache - filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?; + // TODO capture metrics + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + triggered += trigger_children(&mut *io_executor, &mut directories, item)?; + } + if triggered == 0 { + // None of the IO submitted before the prior join triggered any new + // submissions + break; } }