Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose an API to pass multiple Params to an engine request #6871

Merged
merged 4 commits into from
Dec 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/python/pants/engine/legacy/address_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from pants.build_graph.address_mapper import AddressMapper
from pants.engine.addressable import BuildFileAddresses
from pants.engine.mapper import ResolveError
from pants.engine.nodes import Throw
from pants.util.dirutil import fast_relpath


Expand Down Expand Up @@ -67,12 +66,10 @@ def _internal_scan_specs(self, specs, fail_fast=True, missing_is_fatal=True):
# TODO: This should really use `product_request`, but on the other hand, we need to
# deprecate the entire `AddressMapper` interface anyway. See #4769.
request = self._scheduler.execution_request([BuildFileAddresses], [Specs(tuple(specs))])
result = self._scheduler.execute(request)
if result.error:
raise self.BuildFileScanError(str(result.error))
(_, state), = result.root_products
returns, throws = self._scheduler.execute(request)

if isinstance(state, Throw):
if throws:
_, state = throws[0]
stuhood marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(state.exc, (AddressLookupError, ResolveError)):
if missing_is_fatal:
raise self.BuildFileScanError(
Expand All @@ -83,7 +80,9 @@ def _internal_scan_specs(self, specs, fail_fast=True, missing_is_fatal=True):
return set()
else:
raise self.BuildFileScanError(str(state.exc))
elif missing_is_fatal and not state.value.dependencies:

_, state = returns[0]
if missing_is_fatal and not state.value.dependencies:
raise self.BuildFileScanError(
'Spec `{}` does not match any targets.'.format(self._specs_string(specs)))

Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
PyResult graph_visualize(Scheduler*, Session*, char*);
void graph_trace(Scheduler*, ExecutionRequest*, char*);

PyResult execution_add_root_select(Scheduler*, ExecutionRequest*, Key, TypeConstraint);
PyResult execution_add_root_select(Scheduler*, ExecutionRequest*, HandleBuffer, TypeConstraint);

PyResult capture_snapshots(Scheduler*, Handle);

Expand Down
134 changes: 31 additions & 103 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@
import os
import time
from builtins import object, open, str, zip
from collections import defaultdict
from types import GeneratorType

from pants.base.exceptions import TaskError
from pants.base.project_tree import Dir, File, Link
from pants.build_graph.address import Address
from pants.engine.fs import (Digest, DirectoryToMaterialize, FileContent, FilesContent,
MergedDirectories, Path, PathGlobs, PathGlobsAndRoot, Snapshot,
UrlToFetch)
from pants.engine.isolated_process import ExecuteProcessRequest, FallibleExecuteProcessResult
from pants.engine.native import Function, TypeConstraint, TypeId
from pants.engine.nodes import Return, State, Throw
from pants.engine.nodes import Return, Throw
from pants.engine.rules import RuleIndex, SingletonRule, TaskRule
from pants.engine.selectors import Select, constraint_for
from pants.engine.selectors import Params, Select, constraint_for
from pants.rules.core.exceptions import GracefulTerminationException
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import check_no_overlapping_paths
Expand All @@ -43,34 +41,6 @@ class ExecutionRequest(datatype(['roots', 'native'])):
"""


class ExecutionResult(datatype(['error', 'root_products'])):
"""Represents the result of a single execution."""

@classmethod
def finished(cls, root_products):
"""Create a success or partial success result from a finished run.

Runs can either finish with no errors, satisfying all promises, or they can partially finish
if run in fail-slow mode producing as many products as possible.
:param root_products: List of ((subject, product), State) tuples.
:rtype: `ExecutionResult`
"""
return cls(error=None, root_products=root_products)

@classmethod
def failure(cls, error):
"""Create a failure result.

A failure result represent a run with a fatal error. It presents the error but no
products.

:param error: The execution error encountered.
:type error: :class:`pants.base.exceptions.TaskError`
:rtype: `ExecutionResult`
"""
return cls(error=error, root_products=None)


class ExecutionError(Exception):
def __init__(self, message, wrapped_exceptions=None):
super(ExecutionError, self).__init__(message)
Expand Down Expand Up @@ -170,6 +140,9 @@ def graph_trace(self, execution_request):
def _assert_ruleset_valid(self):
self._raise_or_return(self._native.lib.validator_run(self._scheduler))

def _to_vals_buf(self, objs):
return self._native.context.vals_buf(tuple(self._native.context.to_value(obj) for obj in objs))

def _to_value(self, obj):
return self._native.context.to_value(obj)

Expand Down Expand Up @@ -292,10 +265,14 @@ def invalidate_all_files(self):
def graph_len(self):
return self._native.lib.graph_len(self._scheduler)

def add_root_selection(self, execution_request, subject, product):
def add_root_selection(self, execution_request, subject_or_params, product):
if isinstance(subject_or_params, Params):
params = subject_or_params.params
else:
params = [subject_or_params]
res = self._native.lib.execution_add_root_select(self._scheduler,
execution_request,
self._to_key(subject),
self._to_vals_buf(params),
self._to_constraint(product))
self._raise_or_return(res)

Expand Down Expand Up @@ -475,12 +452,10 @@ def _maybe_visualize(self):
self._run_count += 1
self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir(), name))

def schedule(self, execution_request):
"""Yields batches of Steps until the roots specified by the request have been completed.
def execute(self, execution_request):
"""Invoke the engine for the given ExecutionRequest, returning Return and Throw states.

This method should be called by exactly one scheduling thread, but the Step objects returned
by this method are intended to be executed in multiple threads, and then satisfied by the
scheduling thread.
:return: A tuple of (root, Return) tuples and (root, Throw) tuples.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i havent seen this return format before, but makes it a lot cleaner!

"""
start_time = time.time()
roots = list(zip(execution_request.roots,
Expand All @@ -495,23 +470,9 @@ def schedule(self, execution_request):
self._scheduler.graph_len()
)

return roots

def execute(self, execution_request):
"""Executes the requested build and returns the resulting root entries.

TODO: Merge with `schedule`.
TODO2: Use of TaskError here is... odd.

:param execution_request: The description of the goals to achieve.
:type execution_request: :class:`ExecutionRequest`
:returns: The result of the run.
:rtype: :class:`Engine.Result`
"""
try:
return ExecutionResult.finished(self.schedule(execution_request))
except TaskError as e:
return ExecutionResult.failure(e)
returns = tuple((root, state) for root, state in roots if type(state) is Return)
throws = tuple((root, state) for root, state in roots if type(state) is Throw)
return returns, throws

def _trace_on_error(self, unique_exceptions, request):
exception_noun = pluralize(len(unique_exceptions), 'Exception')
Expand All @@ -531,71 +492,38 @@ def _trace_on_error(self, unique_exceptions, request):

def run_console_rule(self, product, subject, v2_ui):
"""

:param product: product type for the request.
:param subject: subject for the request.
:param v2_ui: whether to render the v2 engine UI
:return: A dict from product type to lists of products each with length matching len(subjects).
"""
request = self.execution_request([product], [subject], v2_ui)
result = self.execute(request)
if result.error:
raise result.error

self._state_validation(result)
assert len(result.root_products) == 1
root, state = result.root_products[0]
if type(state) is Throw:
returns, throws = self.execute(request)

if throws:
_, state = throws[0]
exc = state.exc
if isinstance(exc, GracefulTerminationException):
raise exc
self._trace_on_error([exc], request)
return {result.root_products[0]: [state.value]}

def _state_validation(self, result):
# State validation.
unknown_state_types = tuple(
type(state) for _, state in result.root_products if type(state) not in (Throw, Return)
)
if unknown_state_types:
State.raise_unrecognized(unknown_state_types)

def products_request(self, products, subjects):
"""Executes a request for multiple products for some subjects, and returns the products.
def product_request(self, product, subjects):
"""Executes a request for a single product for some subjects, and returns the products.

:param list products: A list of product type for the request.
:param list subjects: A list of subjects for the request.
:returns: A dict from product type to lists of products each with length matching len(subjects).
:param class product: A product type for the request.
:param list subjects: A list of subjects or Params instances for the request.
:returns: A list of the requested products, with length match len(subjects).
"""
request = self.execution_request(products, subjects)
result = self.execute(request)
if result.error:
raise result.error

self._state_validation(result)
request = self.execution_request([product], subjects)
returns, throws = self.execute(request)

# Throw handling.
# TODO: See https://github.com/pantsbuild/pants/issues/3912
throw_root_states = tuple(state for root, state in result.root_products if type(state) is Throw)
if throw_root_states:
unique_exceptions = tuple({t.exc for t in throw_root_states})
if throws:
unique_exceptions = tuple({t.exc for _, t in throws})
self._trace_on_error(unique_exceptions, request)

# Everything is a Return: we rely on the fact that roots are ordered to preserve subject
# order in output lists.
product_results = defaultdict(list)
for (_, product), state in result.root_products:
product_results[product].append(state.value)
return product_results

def product_request(self, product, subjects):
"""Executes a request for a single product for some subjects, and returns the products.

:param class product: A product type for the request.
:param list subjects: A list of subjects for the request.
:returns: A list of the requested products, with length match len(subjects).
"""
return self.products_request([product], subjects)[product]
return [ret.value for _, ret in returns]

def capture_snapshots(self, path_globs_and_roots):
"""Synchronously captures Snapshots for each matching PathGlobs rooted at a its root directory.
Expand Down
27 changes: 12 additions & 15 deletions src/python/pants/engine/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import ast
from abc import abstractproperty
from builtins import str

from pants.util.meta import AbstractClass
from pants.util.objects import Exactly, datatype


Expand Down Expand Up @@ -78,23 +76,17 @@ def __new__(cls, *args):
return super(Get, cls).__new__(cls, product, subject)


class Selector(AbstractClass):
class Params(datatype([('params', tuple)])):
"""A set of values with distinct types.

@property
def type_constraint(self):
"""The type constraint for the product type for this selector."""
return constraint_for(self.product)

@abstractproperty
def optional(self):
"""Return true if this Selector is optional. It may result in a `None` match."""
Distinct types are enforced at consumption time by the rust type of the same name.
"""

@abstractproperty
def product(self):
"""The product that this selector produces."""
def __new__(cls, *args):
return super(Params, cls).__new__(cls, tuple(args))


class Select(datatype(['product', 'optional']), Selector):
class Select(datatype(['product', 'optional'])):
"""Selects the given Product for the Subject provided to the constructor.

If optional=True and no matching product can be produced, will return None.
Expand All @@ -104,6 +96,11 @@ def __new__(cls, product, optional=False):
obj = super(Select, cls).__new__(cls, product, optional)
return obj

@property
def type_constraint(self):
"""The type constraint for the product type for this selector."""
return constraint_for(self.product)

def __repr__(self):
return '{}({}{})'.format(type(self).__name__,
type_or_constraint_repr(self.product),
Expand Down
4 changes: 1 addition & 3 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ def warm_product_graph(self, target_roots):
logger.debug('warming target_roots for: %r', target_roots)
subjects = self._determine_subjects(target_roots)
request = self.scheduler_session.execution_request([TransitiveHydratedTargets], subjects)
result = self.scheduler_session.execute(request)
if result.error:
raise result.error
self.scheduler_session.execute(request)

def validate_goals(self, goals):
"""Checks for @console_rules that satisfy requested goals.
Expand Down
26 changes: 26 additions & 0 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,28 @@ pub type FNV = hash::BuildHasherDefault<FnvHasher>;
pub struct Params(SmallVec<[Key; 4]>);

impl Params {
pub fn new<I: IntoIterator<Item = Key>>(param_inputs: I) -> Result<Params, String> {
let mut params = param_inputs.into_iter().collect::<SmallVec<[Key; 4]>>();
params.sort_by_key(|k| *k.type_id());

if params.len() > 1 {
let mut prev = &params[0];
for param in &params[1..] {
if param.type_id() == prev.type_id() {
return Err(format!(
"Values used as `Params` must have distinct types, but the following values had the same type (`{}`):\n {}\n {}",
externs::type_to_str(*prev.type_id()),
externs::key_to_str(prev),
externs::key_to_str(param)
));
}
prev = param;
}
}

Ok(Params(params))
}

pub fn new_single(param: Key) -> Params {
Params(smallvec![param])
}
Expand Down Expand Up @@ -60,6 +82,10 @@ impl Params {
.binary_search_by(|probe| probe.type_id().cmp(&type_id))
}

pub fn type_ids<'a>(&'a self) -> impl Iterator<Item = TypeId> + 'a {
self.0.iter().map(|k| *k.type_id())
}

///
/// Given a set of either param type or param value strings: sort, join, and render as one string.
///
Expand Down
16 changes: 8 additions & 8 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use context::Core;
use core::{Function, Key, TypeConstraint, TypeId, Value};
use core::{Function, Key, Params, TypeConstraint, TypeId, Value};
use externs::{
Buffer, BufferBuffer, CallExtern, CloneValExtern, CreateExceptionExtern, DropHandlesExtern,
EqualsExtern, EvalExtern, ExternContext, Externs, GeneratorSendExtern, IdentifyExtern, LogExtern,
ProjectIgnoringTypeExtern, ProjectMultiExtern, PyResult, SatisfiedByExtern,
SatisfiedByTypeExtern, StoreBytesExtern, StoreI64Extern, StoreTupleExtern, StoreUtf8Extern,
TypeIdBuffer, TypeToStrExtern, ValToStrExtern,
EqualsExtern, EvalExtern, ExternContext, Externs, GeneratorSendExtern, HandleBuffer,
IdentifyExtern, LogExtern, ProjectIgnoringTypeExtern, ProjectMultiExtern, PyResult,
SatisfiedByExtern, SatisfiedByTypeExtern, StoreBytesExtern, StoreI64Extern, StoreTupleExtern,
StoreUtf8Extern, TypeIdBuffer, TypeToStrExtern, ValToStrExtern,
};
use futures::Future;
use handles::Handle;
Expand Down Expand Up @@ -404,13 +404,13 @@ pub extern "C" fn scheduler_destroy(scheduler_ptr: *mut Scheduler) {
pub extern "C" fn execution_add_root_select(
scheduler_ptr: *mut Scheduler,
execution_request_ptr: *mut ExecutionRequest,
subject: Key,
param_vals: HandleBuffer,
product: TypeConstraint,
) -> PyResult {
with_scheduler(scheduler_ptr, |scheduler| {
with_execution_request(execution_request_ptr, |execution_request| {
scheduler
.add_root_select(execution_request, subject, product)
Params::new(param_vals.to_vec().into_iter().map(externs::key_for))
.and_then(|params| scheduler.add_root_select(execution_request, params, product))
.into()
})
})
Expand Down
Loading