[performance] Port Process operations to use of DigestTrie (#14723)
This change ports the most significant remaining methods for #13112 to `DigestTrie`, in particular:
* `Process` inputs and outputs
* `materialize_directory`

A few other notable use cases remain unported for follow-up work (marked by #13112 TODOs and `todo_as_digest`/`todo_from_digest`), but this change yields a 56% speedup on a microbenchmark of `pants.core.util_rules.source_files.determine_source_files`, and drops `top`-reported memory usage by 10% in common cases.
stuhood authored Mar 10, 2022
1 parent 42db2fc commit 4e7c57d
Showing 28 changed files with 522 additions and 431 deletions.
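To orient reviewers, here is a minimal standalone sketch of the relationship this change leans on. The toy `Digest` and `DigestTrie` below are stand-ins, not the engine's real definitions; only the shape of `DirectoryDigest` and its two constructors mirror the diff:

```rust
/// Toy stand-in for hashing::Digest (really a Fingerprint plus a size).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Digest(u64);

/// Toy stand-in for the interned, in-memory directory trie.
#[derive(Clone, Debug)]
struct DigestTrie;

impl DigestTrie {
    fn compute_root_digest(&self) -> Digest {
        // The real implementation hashes the trie's entries.
        Digest(42)
    }
}

/// A digest that may carry the in-memory trie it identifies.
#[derive(Clone, Debug)]
struct DirectoryDigest {
    digest: Digest,
    // `None` means the trie lives only in the Store: it must be loaded before
    // structure-aware operations, and conversely an in-memory trie must be
    // persisted before anything reads it back via the raw `Digest`.
    tree: Option<DigestTrie>,
}

impl DirectoryDigest {
    /// Used at boundaries where only a previously persisted digest is known.
    fn from_persisted_digest(digest: Digest) -> Self {
        Self { digest, tree: None }
    }
}

impl From<DigestTrie> for DirectoryDigest {
    fn from(tree: DigestTrie) -> Self {
        Self {
            digest: tree.compute_root_digest(),
            tree: Some(tree),
        }
    }
}

fn main() {
    let in_memory: DirectoryDigest = DigestTrie.into();
    let persisted = DirectoryDigest::from_persisted_digest(Digest(42));
    // Ported code paths prefer the cached trie and fall back to the Store.
    assert!(in_memory.tree.is_some());
    assert!(persisted.tree.is_none());
}
```

Keeping the `digest` field private, plus the `todo_as_digest`/`todo_from_digest` escape hatches, is what lets the port proceed incrementally while #13112 tracks the remaining callsites.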
6 changes: 5 additions & 1 deletion src/python/pants/engine/fs_test.py
@@ -81,6 +81,10 @@ def prime_store_with_roland_digest(rule_runner: RuleRunner) -> None:
assert snapshot.files == ("roland",)
assert snapshot.digest == ROLAND_DIGEST

+# NB: Capturing a Snapshot avoids persisting directory entries to disk, so we have to ensure
+# that independently.
+rule_runner.scheduler.ensure_directory_digest_persisted(snapshot.digest)


def setup_fs_test_tar(rule_runner: RuleRunner) -> None:
"""Extract fs_test.tar into the rule_runner's build root.
@@ -1126,7 +1130,7 @@ def test_digest_is_not_file_digest() -> None:


def test_snapshot_properties() -> None:
digest = Digest("a" * 64, 1000)
digest = Digest("691638f4d58abaa8cfdc9af2e00682f13f07f96ad1d177f146216a7341ca4982", 154)
snapshot = Snapshot._unsafe_create(digest, ["f.ext", "dir/f.ext"], ["dir"])
assert snapshot.digest == digest
assert snapshot.files == ("dir/f.ext", "f.ext")
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/native_engine.pyi
@@ -195,6 +195,7 @@ def capture_snapshots(
def ensure_remote_has_recursive(
scheduler: PyScheduler, digests: list[Digest | FileDigest]
) -> None: ...
+def ensure_directory_digest_persisted(scheduler: PyScheduler, digest: Digest) -> None: ...
def single_file_digests_to_bytes(
scheduler: PyScheduler, digests: list[FileDigest]
) -> list[bytes]: ...
3 changes: 3 additions & 0 deletions src/python/pants/engine/internals/scheduler.py
@@ -607,6 +607,9 @@ def snapshots_to_file_contents(
def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
native_engine.ensure_remote_has_recursive(self.py_scheduler, list(digests))

+def ensure_directory_digest_persisted(self, digest: Digest) -> None:
+native_engine.ensure_directory_digest_persisted(self.py_scheduler, digest)

def write_digest(self, digest: Digest, *, path_prefix: str | None = None) -> None:
"""Write a digest to disk, relative to the build root."""
if path_prefix and PurePath(path_prefix).is_absolute():
35 changes: 19 additions & 16 deletions src/rust/engine/fs/fs_util/src/main.rs
@@ -523,8 +523,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.load_tree_from_remote(digest)
.await
.expect("protocol error");
-let output_digest =
-output_digest_opt.ok_or_else(|| ExitError("not found".into(), ExitCode::NotFound))?;
+let output_digest = DirectoryDigest::from_persisted_digest(
+output_digest_opt.ok_or_else(|| ExitError("not found".into(), ExitCode::NotFound))?,
+);
store
.materialize_directory(destination, output_digest, Permissions::Writable)
.await
@@ -547,7 +548,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
-let digest = Digest::new(fingerprint, size_bytes);
+let digest = DirectoryDigest::from_persisted_digest(Digest::new(fingerprint, size_bytes));
store
.materialize_directory(destination, digest, Permissions::Writable)
.await
@@ -583,7 +584,6 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.map_err(|e| format!("Error expanding globs: {:?}", e))?;

let snapshot = Snapshot::from_path_stats(
-store_copy.clone(),
store::OneOffStoreFileByDigest::new(store_copy, posix_fs, false),
paths,
)
@@ -601,7 +601,8 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
-let mut digest = Digest::new(fingerprint, size_bytes);
+let mut digest =
+DirectoryDigest::from_persisted_digest(Digest::new(fingerprint, size_bytes));

if let Some(prefix_to_strip) = args.value_of("child-dir") {
let mut result = store
@@ -619,31 +620,33 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
// It's a shame we can't just .and_then here, because we can't use async closures.
if let Ok(subset_digest) = result {
result = store
-.strip_prefix(
-DirectoryDigest::todo_from_digest(subset_digest),
-&RelativePath::new(prefix_to_strip)?,
-)
-.await
-.map(|dd| dd.as_digest());
+.strip_prefix(subset_digest, &RelativePath::new(prefix_to_strip)?)
+.await;
}
digest = result.map_err(|err| match err {
SnapshotOpsError::String(string)
| SnapshotOpsError::DigestMergeFailure(string)
| SnapshotOpsError::GlobMatchError(string) => string,
-})?
+})?;
+
+// TODO: The below operations don't strictly need persistence: we could render the
+// relevant `DigestTrie` directly. See #13112.
+store
+.ensure_directory_digest_persisted(digest.clone())
+.await?;
}

let proto_bytes: Option<Vec<u8>> = match args.value_of("output-format").unwrap() {
"binary" => {
-let maybe_directory = store.load_directory(digest).await?;
+let maybe_directory = store.load_directory(digest.as_digest()).await?;
maybe_directory.map(|d| d.to_bytes().to_vec())
}
"text" => {
-let maybe_p = store.load_directory(digest).await?;
+let maybe_p = store.load_directory(digest.as_digest()).await?;
maybe_p.map(|p| format!("{:?}\n", p).as_bytes().to_vec())
}
"recursive-file-list" => {
-let maybe_v = expand_files(store, digest).await?;
+let maybe_v = expand_files(store, digest.as_digest()).await?;
maybe_v
.map(|v| {
v.into_iter()
@@ -654,7 +657,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.map(String::into_bytes)
}
"recursive-file-list-with-digests" => {
-let maybe_v = expand_files(store, digest).await?;
+let maybe_v = expand_files(store, digest.as_digest()).await?;
maybe_v
.map(|v| {
v.into_iter()
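The `ensure_directory_digest_persisted` calls above follow a pattern worth spelling out. Below is a hedged, synchronous sketch (the real `Store` methods are async, fallible, and take different arguments; these toy types are assumptions for illustration) of why persistence must happen before `load_directory`/`expand_files`-style reads that key off the raw `Digest`:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Digest(u64);

/// Toy stand-in for the protobuf Directory message.
#[derive(Clone, Debug)]
struct Directory;

#[derive(Default)]
struct Store {
    directories: HashMap<Digest, Directory>,
}

impl Store {
    /// The real method walks a `DigestTrie` and writes every entry it holds;
    /// here we just record one directory against its digest.
    fn ensure_directory_digest_persisted(&mut self, digest: Digest, dir: Directory) {
        self.directories.insert(digest, dir);
    }

    /// Lookups by raw digest can only see what was persisted.
    fn load_directory(&self, digest: Digest) -> Option<Directory> {
        self.directories.get(&digest).cloned()
    }
}

fn main() {
    let mut store = Store::default();
    let digest = Digest(42);

    // A DirectoryDigest backed only by an in-memory trie is invisible here...
    assert!(store.load_directory(digest).is_none());

    // ...until it is explicitly persisted, as fs_util now does before rendering.
    store.ensure_directory_digest_persisted(digest, Directory);
    assert!(store.load_directory(digest).is_some());
}
```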
68 changes: 47 additions & 21 deletions src/rust/engine/fs/src/directory.rs
@@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
-use std::fmt::{self, Debug};
+use std::fmt::{self, Debug, Display};
use std::hash::{self, Hash};
use std::ops::Deref;
use std::path::{Path, PathBuf};
@@ -12,6 +12,7 @@ use deepsize::{known_deep_size, DeepSizeOf};
use internment::Intern;
use itertools::Itertools;
use lazy_static::lazy_static;
+use serde::Serialize;

// TODO: Extract protobuf-specific pieces to a new crate.
use grpc_util::prost::MessageExt;
@@ -35,10 +36,11 @@ lazy_static! {
/// persisted to the Store (either locally or remotely). The field thus acts like a cache in some
/// cases, but in other cases is an indication that the tree must first be persisted (or loaded)
/// before the Digest may be operated on.
-#[derive(Clone, DeepSizeOf)]
+#[derive(Clone, DeepSizeOf, Serialize)]
pub struct DirectoryDigest {
// NB: Private in order to force a choice between `todo_as_digest` and `as_digest`.
digest: Digest,
+#[serde(skip_serializing)]
pub tree: Option<DigestTrie>,
}

@@ -71,7 +73,10 @@ impl Debug for DirectoryDigest {
} else {
"None"
};
write!(f, "DirectoryDigest({:?}, tree: {})", self.digest, tree)
f.debug_struct("DirectoryDigest")
.field("digest", &self.digest)
.field("tree", &tree)
.finish()
}
}

@@ -80,7 +85,8 @@ impl DirectoryDigest {
/// identifies the DigestTrie).
pub fn new(digest: Digest, tree: DigestTrie) -> Self {
if cfg!(debug_assertions) {
-assert!(digest == tree.compute_root_digest());
+let actual = tree.compute_root_digest();
+assert!(digest == actual, "Expected {digest:?} but got {actual:?}");
}
Self {
digest,
@@ -109,13 +115,6 @@ impl DirectoryDigest {
Self { digest, tree: None }
}

-pub fn from_tree(tree: DigestTrie) -> Self {
-Self {
-digest: tree.compute_root_digest(),
-tree: Some(tree),
-}
-}

/// Returns the `Digest` for this `DirectoryDigest`.
///
/// TODO: If a callsite needs to convert to `Digest` as a convenience (i.e. in a location where
@@ -128,7 +127,7 @@ impl DirectoryDigest {
/// Marks a callsite that is discarding the `DigestTrie` held by this `DirectoryDigest` as a
/// temporary convenience, rather than updating its signature to return a `DirectoryDigest`. All
/// usages of this method should be removed before closing #13112.
-pub fn todo_as_digest(self) -> Digest {
+pub fn todo_as_digest(&self) -> Digest {
self.digest
}

@@ -150,8 +149,8 @@ impl DirectoryDigest {
/// A single component of a filesystem path.
///
/// For example: the path `foo/bar` will be broken up into `Name("foo")` and `Name("bar")`.
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
-struct Name(Intern<String>);
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub struct Name(Intern<String>);
// NB: Calculating the actual deep size of an `Intern` is very challenging, because it does not
// keep any record of the number of held references, and instead effectively makes its held value
// static. Switching to `ArcIntern` would get accurate counts at the cost of performance and size.
@@ -165,14 +164,20 @@ impl Deref for Name {
}
}

-#[derive(Clone, DeepSizeOf)]
+impl Display for Name {
+fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+write!(f, "{}", self.0.as_ref())
+}
+}
+
+#[derive(Clone, Debug, DeepSizeOf)]
pub enum Entry {
Directory(Directory),
File(File),
}

impl Entry {
-fn name(&self) -> Name {
+pub fn name(&self) -> Name {
match self {
Entry::Directory(d) => d.name,
Entry::File(f) => f.name,
@@ -200,8 +205,8 @@ impl Directory {
}
}

-pub fn name(&self) -> &str {
-self.name.as_ref()
+pub fn name(&self) -> Name {
+self.name
}

pub fn digest(&self) -> Digest {
@@ -224,16 +229,28 @@
}
}

-#[derive(Clone, DeepSizeOf)]
+impl Debug for Directory {
+fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+// NB: To avoid over-large output, we don't render the Trie. It would likely be best rendered
+// as e.g. JSON.
+f.debug_struct("Directory")
+.field("name", &self.name)
+.field("digest", &self.digest)
+.field("tree", &"..")
+.finish()
+}
+}
+
+#[derive(Clone, Debug, DeepSizeOf)]
pub struct File {
name: Name,
digest: Digest,
is_executable: bool,
}

impl File {
-pub fn name(&self) -> &str {
-self.name.as_ref()
+pub fn name(&self) -> Name {
+self.name
}

pub fn digest(&self) -> Digest {
Expand Down Expand Up @@ -290,6 +307,15 @@ pub struct DigestTrie(Arc<[Entry]>);
// TODO: This avoids a `rustc` crasher (repro on 7f319ee84ad41bc0aea3cb01fb2f32dcd51be704).
unsafe impl Sync for DigestTrie {}

+impl From<DigestTrie> for DirectoryDigest {
+fn from(tree: DigestTrie) -> Self {
+Self {
+digest: tree.compute_root_digest(),
+tree: Some(tree),
+}
+}
+}
+
impl DigestTrie {
/// Create a DigestTrie from unique PathStats. Fails for duplicate items.
pub fn from_path_stats(
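The `Name` changes above are small but load-bearing: components are interned so that a large trie stores each distinct path segment only once. Below is a short sketch using the `internment` crate (the same dependency the diff imports; the surrounding trie types and the `Deref` impl are omitted, so this is an illustration rather than the module's full definition) of what the widened visibility and new `Display` impl enable:

```rust
use std::fmt::{self, Display};

use internment::Intern;

/// A single interned path component, mirroring the diff's `Name`.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct Name(Intern<String>);

impl Display for Name {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        write!(f, "{}", self.0.as_ref())
    }
}

fn main() {
    let a = Name(Intern::new("src".to_string()));
    let b = Name(Intern::new("src".to_string()));
    // Interning: equal components share one allocation and compare cheaply.
    assert_eq!(a, b);
    // The new Display impl lets callers format components directly.
    println!("{}/{}", a, b);
}
```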
25 changes: 13 additions & 12 deletions src/rust/engine/fs/store/benches/store.rs
@@ -35,10 +35,10 @@ use std::sync::Arc;
use std::time::Duration;

use fs::{
-File, GitignoreStyleExcludes, GlobExpansionConjunction, PathStat, Permissions, PosixFS,
-PreparedPathGlobs, StrictGlobMatching,
+DirectoryDigest, File, GitignoreStyleExcludes, GlobExpansionConjunction, PathStat, Permissions,
+PosixFS, PreparedPathGlobs, StrictGlobMatching,
};
-use hashing::{Digest, EMPTY_DIGEST};
+use hashing::EMPTY_DIGEST;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use task_executor::Executor;
use tempfile::TempDir;
@@ -70,7 +70,7 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) {
let dest = new_temp.path().to_path_buf();
std::mem::forget(new_temp);
let _ = executor
-.block_on(store.materialize_directory(dest, digest, perms))
+.block_on(store.materialize_directory(dest, digest.clone(), perms))
.unwrap();
})
},
@@ -116,7 +116,6 @@ pub fn criterion_benchmark_snapshot_capture(c: &mut Criterion) {
for _ in 0..captures {
let _ = executor
.block_on(Snapshot::from_path_stats(
-store.clone(),
OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone(), immutable),
path_stats.clone(),
))
Expand All @@ -140,7 +139,7 @@ pub fn criterion_benchmark_subset_wildcard(c: &mut Criterion) {
.bench_function("wildcard", |b| {
b.iter(|| {
let get_subset = store.subset(
-digest,
+digest.clone(),
SubsetParams {
globs: PreparedPathGlobs::create(
vec!["**/*".to_string()],
@@ -160,12 +159,15 @@ pub fn criterion_benchmark_merge(c: &mut Criterion) {
let num_files: usize = 4000;
let (store, _tempdir, digest) = snapshot(&executor, num_files, 100);

-// Modify half of the files in the top-level directory by setting them to have the empty
-// fingerprint (zero content).
+executor
+.block_on(store.ensure_directory_digest_persisted(digest.clone()))
+.unwrap();
let directory = executor
-.block_on(store.load_directory(digest))
+.block_on(store.load_directory(digest.as_digest()))
.unwrap()
.unwrap();
+// Modify half of the files in the top-level directory by setting them to have the empty
+// fingerprint (zero content).
let mut all_file_nodes = directory.files.to_vec();
let mut file_nodes_to_modify = all_file_nodes.split_off(all_file_nodes.len() / 2);
for file_node in file_nodes_to_modify.iter_mut() {
@@ -330,7 +332,7 @@ fn snapshot(
executor: &Executor,
max_files: usize,
file_target_size: usize,
-) -> (Store, TempDir, Digest) {
+) -> (Store, TempDir, DirectoryDigest) {
// NB: We create the files in a tempdir rather than in memory in order to allow for more
// realistic benchmarking involving large files. The tempdir is dropped at the end of this method
// (after everything has been captured out of it).
@@ -348,14 +350,13 @@
)
.unwrap();
Snapshot::from_path_stats(
-store2.clone(),
OneOffStoreFileByDigest::new(store2, Arc::new(posix_fs), true),
path_stats,
)
.await
})
.unwrap()
-.digest;
+.into();

(store, storedir, digest)
}