Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize CreateDigest implementation. #16617

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,23 @@ impl Store {
.await
}

///
/// A convenience method for storing batches of small files.
///
/// NB: This method should not be used for large blobs: prefer to stream them from their source
/// using `store_file`.
///
pub async fn store_file_bytes_batch(
&self,
items: Vec<(Option<Digest>, Bytes)>,
initial_lease: bool,
) -> Result<Vec<Digest>, String> {
self
.local
.store_bytes_batch(EntryType::File, items, initial_lease)
.await
}

///
/// Store a file locally by streaming its contents.
///
Expand Down
80 changes: 43 additions & 37 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str::FromStr;
Expand All @@ -18,14 +19,15 @@ use crate::tasks::Intrinsic;
use crate::types::Types;
use crate::Failure;

use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::try_join;
use indexmap::IndexMap;
use pyo3::{PyAny, PyRef, Python, ToPyObject};
use tokio::process;

use fs::{DirectoryDigest, RelativePath};
use hashing::Digest;
use fs::{DigestTrie, DirectoryDigest, PathStat, RelativePath};
use hashing::{Digest, EMPTY_DIGEST};
use process_execution::local::{apply_chroot, create_sandbox, prepare_workdir, KeepSandboxes};
use process_execution::ManagedChild;
use stdio::TryCloneAsFile;
Expand Down Expand Up @@ -406,6 +408,8 @@ fn create_digest_to_digest(
context: Context,
args: Vec<Value>,
) -> BoxFuture<'static, NodeResult<Value>> {
let mut new_file_count = 0;

let items: Vec<CreateDigestItem> = {
let gil = Python::acquire_gil();
let py = gil.python();
Expand All @@ -419,6 +423,7 @@ fn create_digest_to_digest(
if obj.hasattr("content").unwrap() {
let bytes = bytes::Bytes::from(externs::getattr::<Vec<u8>>(obj, "content").unwrap());
let is_executable: bool = externs::getattr(obj, "is_executable").unwrap();
new_file_count += 1;
CreateDigestItem::FileContent(path, bytes, is_executable)
} else if obj.hasattr("file_digest").unwrap() {
let py_file_digest: PyFileDigest = externs::getattr(obj, "file_digest").unwrap();
Expand All @@ -431,45 +436,46 @@ fn create_digest_to_digest(
.collect()
};

// TODO: Rather than creating independent Digests and then merging them, this should use
// `DigestTrie::from_path_stats`.
// see https://github.com/pantsbuild/pants/pull/14569#issuecomment-1057286943
let digest_futures: Vec<_> = items
.into_iter()
.map(|item| {
let store = context.core.store();
async move {
match item {
CreateDigestItem::FileContent(path, bytes, is_executable) => {
let digest = store.store_file_bytes(bytes, true).await?;
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<DirectoryDigest, String> = Ok(snapshot.into());
res
}
CreateDigestItem::FileEntry(path, digest, is_executable) => {
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<_, String> = Ok(snapshot.into());
res
}
CreateDigestItem::Dir(path) => store
.create_empty_dir(&path)
.await
.map_err(|e| e.to_string()),
}
let mut path_stats: Vec<PathStat> = Vec::with_capacity(items.len());
let mut file_digests: HashMap<PathBuf, Digest> = HashMap::with_capacity(items.len());
let mut bytes_to_store: Vec<(Option<Digest>, Bytes)> = Vec::with_capacity(new_file_count);

for item in items {
match item {
CreateDigestItem::FileContent(path, bytes, is_executable) => {
let digest = Digest::of_bytes(&bytes);
bytes_to_store.push((Some(digest), bytes));
let stat = fs::File {
path: path.to_path_buf(),
is_executable,
};
path_stats.push(PathStat::file(path.to_path_buf(), stat));
file_digests.insert(path.to_path_buf(), digest);
}
})
.collect();
CreateDigestItem::FileEntry(path, digest, is_executable) => {
let stat = fs::File {
path: path.to_path_buf(),
is_executable,
};
path_stats.push(PathStat::file(path.to_path_buf(), stat));
file_digests.insert(path.to_path_buf(), digest);
}
CreateDigestItem::Dir(path) => {
let stat = fs::Dir(path.to_path_buf());
path_stats.push(PathStat::dir(path.to_path_buf(), stat));
file_digests.insert(path.to_path_buf(), EMPTY_DIGEST);
}
}
}

let store = context.core.store();
async move {
let digests = future::try_join_all(digest_futures).await?;
let digest = store.merge(digests).await?;
// The digests returned here are already in the `file_digests` map.
let _ = store.store_file_bytes_batch(bytes_to_store, true).await?;
let trie = DigestTrie::from_path_stats(path_stats, &file_digests)?;

let gil = Python::acquire_gil();
let value = Snapshot::store_directory_digest(gil.python(), digest)?;
let value = Snapshot::store_directory_digest(gil.python(), trie.into())?;
Ok(value)
}
.boxed()
Expand Down