Skip to content

Commit

Permalink
Download Directory recursively from remote CAS (pantsbuild#6336)
Browse files Browse the repository at this point in the history
### Problem

We dont currently have the ability to download a `Directory`, and its contents, recursively.

### Solution

Add `ensure_local_has_recursive_directory()` to be able to download  a `Directory`, and its contents, recursively.
  • Loading branch information
ity authored Aug 14, 2018
1 parent e391cd1 commit a96001e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 4 deletions.
106 changes: 102 additions & 4 deletions src/rust/engine/fs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ impl Store {
}
}

///
/// Store a file locally.
///
pub fn store_file_bytes(&self, bytes: Bytes, initial_lease: bool) -> BoxFuture<Digest, String> {
let len = bytes.len();
self
Expand All @@ -106,8 +109,9 @@ impl Store {
}

///
/// Loads the bytes of the file with the passed fingerprint, and returns the result of applying f
/// to that value.
/// Loads the bytes of the file with the passed fingerprint from the local store and back-fill
/// from remote when necessary and possible (i.e. when remote is configured), and returns the
/// result of applying f to that value.
///
pub fn load_file_bytes_with<T: Send + 'static, F: Fn(Bytes) -> T + Send + Sync + 'static>(
&self,
Expand All @@ -128,8 +132,9 @@ impl Store {
}

///
/// Save the bytes of the Directory proto, without regard for any of the contents of any FileNodes
/// or DirectoryNodes therein (i.e. does not require that its children are already stored).
/// Save the bytes of the Directory proto locally, without regard for any of the
/// contents of any FileNodes or DirectoryNodes therein (i.e. does not require that its
/// children are already stored).
///
pub fn record_directory(
&self,
Expand All @@ -150,6 +155,8 @@ impl Store {
.to_boxed()
}

///
/// Loads a directory proto from the local store, back-filling from remote if necessary.
///
/// Guarantees that if an Ok Some value is returned, it is valid, and canonical, and its
/// fingerprint exactly matches that which is requested. Will return an Err if it would return a
Expand Down Expand Up @@ -190,6 +197,11 @@ impl Store {
)
}

///
/// Loads bytes from remote cas if required and possible (i.e. if remote is configured). Takes
/// two functions f_local and f_remote. These functions are any validation or transformations you
/// want to perform on the bytes received from the local and remote cas (if remote is configured).
///
fn load_bytes_with<
T: Send + 'static,
FLocal: Fn(Bytes) -> Result<T, String> + Send + Sync + 'static,
Expand Down Expand Up @@ -315,6 +327,47 @@ impl Store {
.to_boxed()
}

///
/// Download a directory from Remote ByteStore recursively to the local one. Called only with the
/// Digest of a Directory.
///
pub fn ensure_local_has_recursive_directory(
&self,
dir_digest: Digest
) -> BoxFuture<(), String> {
let store = self.clone();
self
.load_directory(dir_digest)
.and_then(move |directory_opt| {
directory_opt.ok_or_else(|| format!("Could not read dir with digest {:?}", dir_digest))
})
.and_then(move |directory| {
// Traverse the files within directory
let file_futures = directory
.get_files()
.iter()
.map(|file_node| {
let file_digest = try_future!(file_node.get_digest().into());
store.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
})
.collect::<Vec<_>>();

// Recursively call with sub-directories
let directory_futures = directory
.get_directories()
.iter()
.map (move |child_dir| {
let child_digest = try_future!(child_dir.get_digest().into());
store.ensure_local_has_recursive_directory(child_digest)
})
.collect::<Vec<_>>();
future::join_all(file_futures)
.join(future::join_all(directory_futures))
.map(|_| ())
})
.to_boxed()
}

pub fn lease_all<'a, Ds: Iterator<Item = &'a Digest>>(&self, digests: Ds) -> Result<(), String> {
self.local.lease_all(digests)
}
Expand Down Expand Up @@ -405,6 +458,10 @@ impl Store {
.to_boxed()
}

///
/// Lays out the directory and all of its contents (files and directories) on disk so that a
/// process which uses the directory structure can run.
///
pub fn materialize_directory(
&self,
destination: PathBuf,
Expand Down Expand Up @@ -2197,6 +2254,47 @@ mod tests {
);
}

#[test]
fn load_recursive_directory() {
let dir = TempDir::new().unwrap();

let roland = TestData::roland();
let catnip = TestData::catnip();
let testdir = TestDirectory::containing_roland();
let testdir_digest = testdir.digest();
let testdir_directory = testdir.directory();
let recursive_testdir = TestDirectory::recursive();
let recursive_testdir_directory = recursive_testdir.directory();
let recursive_testdir_digest = recursive_testdir.digest();

let cas = StubCAS::with_content(
1024,
vec![roland.clone(), catnip.clone()], vec![testdir, recursive_testdir]
);
new_store(dir.path(), cas.address())
.ensure_local_has_recursive_directory(recursive_testdir_digest)
.wait()
.expect("Downloading recursive directory should have succeeded.");

assert_eq!(
load_file_bytes(&new_local_store(dir.path()), roland.digest()),
Ok(Some(roland.bytes()))
);
assert_eq!(
load_file_bytes(&new_local_store(dir.path()), catnip.digest()),
Ok(Some(catnip.bytes()))
);
assert_eq!(
new_local_store(dir.path()).load_directory(testdir_digest).wait(),
Ok(Some(testdir_directory))
);
assert_eq!(
new_local_store(dir.path()).load_directory(recursive_testdir_digest).wait(),
Ok(Some(recursive_testdir_directory))
);
}


#[test]
fn load_file_missing_is_none() {
let dir = TempDir::new().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/testutil/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hashing;
use protobuf::Message;
use sha2::{self, Digest};

#[derive(Clone)]
pub struct TestData {
string: String,
}
Expand Down

0 comments on commit a96001e

Please sign in to comment.