Skip to content

Commit

Permalink
Review feedback.
Browse files Browse the repository at this point in the history
# Rust tests and lints will be skipped. Delete if not intended.
[ci skip-rust]

# Building wheels and fs_util will be skipped. Delete if not intended.
[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jan 19, 2022
1 parent e28c35b commit b83c36c
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 38 deletions.
23 changes: 7 additions & 16 deletions src/python/pants/core/goals/fmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass
from typing import TypeVar, cast

from pants.core.goals.style_request import StyleRequest
from pants.core.goals.style_request import StyleRequest, style_batch_size_help
from pants.core.util_rules.source_files import SourceFiles, SourceFilesRequest
from pants.engine.console import Console
from pants.engine.engine_aware import EngineAwareReturnType
Expand Down Expand Up @@ -139,7 +139,11 @@ def register_options(cls, register) -> None:
removal_version="2.11.0.dev0",
removal_hint=(
"Formatters are now broken into multiple batches by default using the "
"`--batch-size` argument."
"`--batch-size` argument.\n"
"\n"
"To keep (roughly) this option's behavior, set [fmt].batch_size = 1. However, "
"you'll likely get better performance by using a larger batch size because of "
"reduced overhead launching processes."
),
help=(
"Rather than formatting all files in a single batch, format each file as a "
Expand All @@ -156,20 +160,7 @@ def register_options(cls, register) -> None:
advanced=True,
type=int,
default=128,
help=(
"The target minimum number of files that will be included in each formatter batch.\n"
"\n"
"Formatter processes are batched for a few reasons:\n"
"\n"
"1. to avoid OS argument length limits (in processes which don't support argument "
"files)\n"
"2. to support more stable cache keys than would be possible if all files were "
"operated on in a single batch.\n"
"3. to allow for parallelism in formatter processes which don't have internal "
"parallelism, or -- if they do support internal parallelism -- to improve scheduling "
"behavior when multiple processes are competing for cores and so internal "
"parallelism cannot be used perfectly.\n"
),
help=style_batch_size_help(uppercase="Formatter", lowercase="formatter"),
)

@property
Expand Down
29 changes: 10 additions & 19 deletions src/python/pants/core/goals/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass
from typing import Any, Iterable, cast

from pants.core.goals.style_request import StyleRequest, write_reports
from pants.core.goals.style_request import StyleRequest, style_batch_size_help, write_reports
from pants.core.util_rules.distdir import DistDir
from pants.engine.console import Console
from pants.engine.engine_aware import EngineAwareReturnType
Expand Down Expand Up @@ -157,7 +157,11 @@ def register_options(cls, register) -> None:
removal_version="2.11.0.dev0",
removal_hint=(
"Linters are now broken into multiple batches by default using the "
"`--batch-size` argument."
"`--batch-size` argument.\n"
"\n"
"To keep (roughly) this option's behavior, set [lint].batch_size = 1. However, "
"you'll likely get better performance by using a larger batch size because of "
"reduced overhead launching processes."
),
help=(
"Rather than linting all files in a single batch, lint each file as a "
Expand All @@ -174,20 +178,7 @@ def register_options(cls, register) -> None:
advanced=True,
type=int,
default=128,
help=(
"The target minimum number of files that will be included in each linter batch.\n"
"\n"
"Linter processes are batched for a few reasons:\n"
"\n"
"1. to avoid OS argument length limits (in processes which don't support argument "
"files)\n"
"2. to support more stable cache keys than would be possible if all files were "
"operated on in a single batch.\n"
"3. to allow for parallelism in linter processes which don't have internal "
"parallelism, or -- if they do support internal parallelism -- to improve scheduling "
"behavior when multiple processes are competing for cores and so internal "
"parallelism cannot be used perfectly.\n"
),
help=style_batch_size_help(uppercase="Linter", lowercase="linter"),
)

@property
Expand Down Expand Up @@ -235,10 +226,10 @@ def address_str(fs: FieldSet) -> str:
return fs.address.spec

all_batch_results = await MultiGet(
Get(LintResults, LintRequest, request.__class__(field_sets))
Get(LintResults, LintRequest, request.__class__(field_set_batch))
for request in requests
if request.field_sets
for field_sets in partition_sequentially(
for field_set_batch in partition_sequentially(
request.field_sets, key=address_str, size_min=lint_subsystem.batch_size
)
)
Expand All @@ -262,7 +253,7 @@ def key_fn(results: LintResults):
sorted_all_batch_results, key=key_fn
)
),
key=lambda results: results.linter_name,
key=key_fn,
)
)

Expand Down
17 changes: 17 additions & 0 deletions src/python/pants/core/goals/style_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@
_FS = TypeVar("_FS", bound=FieldSet)


def style_batch_size_help(uppercase: str, lowercase: str) -> str:
    """Render the shared `--batch-size` help text for a style goal.

    `uppercase`/`lowercase` are the tool noun used to specialize the wording for the
    registering goal (e.g. "Formatter"/"formatter" for `fmt`, "Linter"/"linter" for `lint`).
    """
    # Assembled line-by-line so each list item reads as one unit; the trailing
    # newline matches the convention used by other multi-paragraph help strings.
    lines = [
        f"The target minimum number of files that will be included in each {lowercase} batch.",
        "",
        f"{uppercase} processes are batched for a few reasons:",
        "",
        "1. to avoid OS argument length limits (in processes which don't support argument "
        "files)",
        "2. to support more stable cache keys than would be possible if all files were "
        "operated on in a single batch.",
        f"3. to allow for parallelism in {lowercase} processes which don't have internal "
        "parallelism, or -- if they do support internal parallelism -- to improve scheduling "
        "behavior when multiple processes are competing for cores and so internal "
        "parallelism cannot be used perfectly.",
    ]
    return "\n".join(lines) + "\n"


@frozen_after_init
@dataclass(unsafe_hash=True)
class StyleRequest(Generic[_FS], EngineAwareParameter, metaclass=ABCMeta):
Expand Down
11 changes: 8 additions & 3 deletions src/python/pants/util/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import collections
import collections.abc
import math
from typing import Any, Callable, Iterable, Iterator, MutableMapping, Sequence, TypeVar
from typing import Any, Callable, Iterable, Iterator, MutableMapping, TypeVar

from pants.engine.internals import native_engine

Expand Down Expand Up @@ -77,7 +77,7 @@ def ensure_str_list(val: str | Iterable[str], *, allow_single_str: bool = False)


def partition_sequentially(
items: Sequence[_T],
items: Iterable[_T],
*,
key: Callable[[_T], str],
size_min: int,
Expand All @@ -95,7 +95,12 @@ def partition_sequentially(
# To stably partition the arguments into ranges of at least `size_min`, we sort them, and
# create a new batch sequentially once we have the minimum number of entries, _and_ we encounter
# an item hash prefixed with a threshold of zeros.
zero_prefix_threshold = math.log(size_min // 8, 2)
#
# The hashes act like a (deterministic) series of rolls of an evenly distributed die. The
# probability of a hash prefixed with Z zero bits is 1/2^Z, and so to break after N items on
# average, we look for `Z == log2(N)` zero bits.
#
zero_prefix_threshold = math.log(max(4, size_min) // 4, 2)
size_max = size_min * 2 if size_max is None else size_max

batch: list[_T] = []
Expand Down
21 changes: 21 additions & 0 deletions src/python/pants/util/collections_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
assert_single_element,
ensure_list,
ensure_str_list,
partition_sequentially,
recursively_update,
)

Expand Down Expand Up @@ -85,3 +86,23 @@ def test_ensure_str_list() -> None:
ensure_str_list(0) # type: ignore[arg-type]
with pytest.raises(ValueError):
ensure_str_list([0, 1]) # type: ignore[list-item]


@pytest.mark.parametrize("size_min", [0, 1, 16, 32, 64, 128])
def test_partition_sequentially(size_min: int) -> None:
    """Partitioning should be stable under insertion: adding one item anywhere in the
    input must perturb at most two output buckets (one if it lands inside an existing
    bucket, two if it becomes a new bucket boundary)."""

    def bucket_set(xs: list[str]) -> set[tuple[str, ...]]:
        # Buckets as hashable tuples so symmetric difference can compare partitionings.
        return {tuple(batch) for batch in partition_sequentially(xs, key=str, size_min=size_min)}

    # Baseline input: every other element of a lexicographically sorted sequence, so the
    # held-out elements intersperse throughout the baseline when added back.
    universe = sorted(f"item{i}" for i in range(0, 64))
    evens = universe[0::2]
    odds = universe[1::2]
    baseline = bucket_set(evens)

    # Re-partition with each held-out element added; only 1 or 2 buckets may change.
    for extra in odds:
        changed = baseline ^ bucket_set([extra, *evens])
        assert 1 <= len(changed) <= 2

0 comments on commit b83c36c

Please sign in to comment.