Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create infrastructure for running interactive local process #8495

Merged
merged 11 commits into from
Oct 31, 2019
12 changes: 11 additions & 1 deletion src/python/pants/base/exception_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def signal_handler_mapping(self):
def __init__(self):
self._ignore_sigint_lock = threading.Lock()
self._threads_ignoring_sigint = 0
self._ignoring_sigint_v2_engine = False

def _check_sigint_gate_is_correct(self):
assert self._threads_ignoring_sigint >= 0, \
Expand All @@ -57,9 +58,14 @@ def _handle_sigint_if_enabled(self, signum, _frame):
with self._ignore_sigint_lock:
self._check_sigint_gate_is_correct()
threads_ignoring_sigint = self._threads_ignoring_sigint
if threads_ignoring_sigint == 0:
ignoring_sigint_v2_engine = self._ignoring_sigint_v2_engine
if threads_ignoring_sigint == 0 and not ignoring_sigint_v2_engine:
self.handle_sigint(signum, _frame)

def _toggle_ignoring_sigint_v2_engine(self, toggle: bool):
with self._ignore_sigint_lock:
self._ignoring_sigint_v2_engine = toggle

@contextmanager
def _ignoring_sigint(self):
with self._ignore_sigint_lock:
Expand Down Expand Up @@ -392,6 +398,10 @@ def ignoring_sigint(cls):
with cls._signal_handler._ignoring_sigint():
yield

@classmethod
def toggle_ignoring_sigint_v2_engine(cls, toggle: bool):
cls._signal_handler._toggle_ignoring_sigint_v2_engine(toggle)

@classmethod
def _iso_timestamp_for_now(cls):
return datetime.datetime.now().isoformat()
Expand Down
12 changes: 12 additions & 0 deletions src/python/pants/engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ python_library(
]
)

python_library(
name='interactive_runner',
sources=['interactive_runner.py'],
dependencies=[
'3rdparty/python:dataclasses',
'src/python/pants/base:exception_sink',
':rules',
':platform',
]
)

python_library(
name='isolated_process',
sources=['isolated_process.py'],
Expand Down Expand Up @@ -208,6 +219,7 @@ python_library(
dependencies=[
':native_engine_shared_library',
':fs',
':interactive_runner',
':isolated_process',
':selectors',
'3rdparty/python:cffi',
Expand Down
33 changes: 33 additions & 0 deletions src/python/pants/engine/interactive_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from dataclasses import dataclass
from typing import Any, Tuple

from pants.base.exception_sink import ExceptionSink
from pants.engine.rules import RootRule


@dataclass(frozen=True)
class InteractiveProcessResult:
process_exit_code: int


@dataclass(frozen=True)
class InteractiveProcessRequest:
argv: Tuple[str, ...]
env: Tuple[str, ...] = ()
run_in_workspace: bool = False


@dataclass(frozen=True)
class InteractiveRunner:
_scheduler: Any

def run_local_interactive_process(self, request: InteractiveProcessRequest) -> InteractiveProcessResult:
ExceptionSink.toggle_ignoring_sigint_v2_engine(True)
return self._scheduler.run_local_interactive_process(request)


def create_interactive_runner_rules():
return [RootRule(InteractiveRunner)]
26 changes: 20 additions & 6 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Snapshot,
UrlToFetch,
)
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveProcessResult
from pants.engine.isolated_process import (
FallibleExecuteProcessResult,
MultiPlatformExecuteProcessRequest,
Expand Down Expand Up @@ -359,7 +360,7 @@ def extern_type_to_str(self, context_handle, type_id):
c = self._ffi.from_handle(context_handle)
return c.utf8_buf(str(c.from_id(type_id.tup_0).__name__))

# If we try to pass a None to the CFFI layer, it will silently fail
# If we try to pass a None to the CFFI layer, it will silently fail
# in a weird way. So instead we use the empty string/bytestring as
# a de-facto null value, in both `extern_val_to_str` and
# `extern_val_to_bytes`.
Expand Down Expand Up @@ -570,6 +571,9 @@ class EngineTypes(NamedTuple):
url_to_fetch: TypeId
string: TypeId
bytes: TypeId
construct_interactive_process_result: Function
interactive_process_request: TypeId
interactive_process_result: TypeId


class PyResult(NamedTuple):
Expand Down Expand Up @@ -692,9 +696,11 @@ class CFFIExternMethodRuntimeErrorInfo(NamedTuple):
When an exception is raised in the body of a CFFI extern, the `onerror` handler is used to
capture it, storing the exception info as an instance of `CFFIExternMethodRuntimeErrorInfo` with
`.add_cffi_extern_method_runtime_exception()`. The scheduler will then check whether any
exceptions were stored by calling `.cffi_extern_method_runtime_exceptions()` after specific
calls to the native library which may raise. `.reset_cffi_extern_method_runtime_exceptions()`
should be called after the stored exception has been handled or before it is re-raised.
exceptions were stored by calling `.consume_cffi_extern_method_runtime_exceptions()` after
specific calls to the native library which may raise.
Note that `.consume_cffi_extern_method_runtime_exceptions()` will also clear out all stored
exceptions, so exceptions should be stored separately after consumption.
Some ways that exceptions in CFFI extern methods can be handled are described in
https://cffi.readthedocs.io/en/latest/using.html#extern-python-reference.
Expand All @@ -706,9 +712,14 @@ class CFFIExternMethodRuntimeErrorInfo(NamedTuple):
def reset_cffi_extern_method_runtime_exceptions(self):
self._errors_during_execution = []

def cffi_extern_method_runtime_exceptions(self):
def _peek_cffi_extern_method_runtime_exceptions(self):
return self._errors_during_execution

def consume_cffi_extern_method_runtime_exceptions(self):
res = self._peek_cffi_extern_method_runtime_exceptions()
self.reset_cffi_extern_method_runtime_exceptions()
return res

def add_cffi_extern_method_runtime_exception(self, error_info):
assert isinstance(error_info, self.CFFIExternMethodRuntimeErrorInfo)
self._errors_during_execution.append(error_info)
Expand Down Expand Up @@ -907,7 +918,10 @@ def ti(type_obj):
url_to_fetch=ti(UrlToFetch),
string=ti(str),
bytes=ti(bytes),
)
construct_interactive_process_result=func(InteractiveProcessResult),
interactive_process_request=ti(InteractiveProcessRequest),
interactive_process_result=ti(InteractiveProcessResult),
)

scheduler_result = self.lib.scheduler_create(
tasks,
Expand Down
37 changes: 31 additions & 6 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from textwrap import dedent
from typing import Any

from pants.base.exception_sink import ExceptionSink
from pants.base.exiter import PANTS_FAILED_EXIT_CODE
from pants.engine.fs import Digest, DirectoryToMaterialize, PathGlobsAndRoot
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveProcessResult
from pants.engine.native import Function, TypeId
from pants.engine.nodes import Return, Throw
from pants.engine.objects import Collection
Expand Down Expand Up @@ -252,16 +254,32 @@ def with_fork_context(self, func):

def _run_and_return_roots(self, session, execution_request):
raw_roots = self._native.lib.scheduler_execute(self._scheduler, session, execution_request)
remaining_runtime_exceptions_to_capture = list(self._native.consume_cffi_extern_method_runtime_exceptions())
try:
roots = []
for raw_root in self._native.unpack(raw_roots.nodes_ptr, raw_roots.nodes_len):
# Check if there were any uncaught exceptions within rules that were executed.
remaining_runtime_exceptions_to_capture.extend(self._native.consume_cffi_extern_method_runtime_exceptions())

if raw_root.is_throw:
state = Throw(self._from_value(raw_root.handle))
elif raw_root.handle == self._native.ffi.NULL:
# NB: We expect all NULL handles to correspond to uncaught exceptions which are collected
# in `self._native._peek_cffi_extern_method_runtime_exceptions()`!
if not remaining_runtime_exceptions_to_capture:
raise ExecutionError('Internal logic error in scheduler: expected more elements in '
'`self._native._peek_cffi_extern_method_runtime_exceptions()`.')
matching_runtime_exception = remaining_runtime_exceptions_to_capture.pop(0)
state = Throw(matching_runtime_exception)
else:
state = Return(self._from_value(raw_root.handle))
roots.append(state)
finally:
self._native.lib.nodes_destroy(raw_roots)

if remaining_runtime_exceptions_to_capture:
raise ExecutionError('Internal logic error in scheduler: expected elements in '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message here appears to be the negation of the if condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is coming from one of the commits @cosmicexplorer had in his currently-open-PR that I rebased off of becuase the bug he fixed was affecting me. I'm not sure if makes sense to try to get that merged and then merge this PR after that, or just merge this one with those commits.

'`self._native._peek_cffi_extern_method_runtime_exceptions()`.')
return roots

def lease_files_in_graph(self):
Expand Down Expand Up @@ -384,6 +402,8 @@ def execute(self, execution_request):
roots = list(zip(execution_request.roots,
self._scheduler._run_and_return_roots(self._session, execution_request.native)))

ExceptionSink.toggle_ignoring_sigint_v2_engine(False)

self._maybe_visualize()

logger.debug(
Expand Down Expand Up @@ -444,7 +464,7 @@ def product_request(self, product, subjects):
except: # noqa: T803
# If there are any exceptions during CFFI extern method calls, we want to return an error with
# them and whatever failure results from it. This typically results from unhashable types.
if self._scheduler._native.cffi_extern_method_runtime_exceptions():
if self._scheduler._native._peek_cffi_extern_method_runtime_exceptions():
raised_exception = sys.exc_info()[0:3]
else:
# Otherwise, this is likely an exception coming from somewhere else, and we don't want to
Expand All @@ -454,7 +474,7 @@ def product_request(self, product, subjects):
# We still want to raise whenever there are any exceptions in any CFFI extern methods, even if
# that didn't lead to an exception in generating the execution request for some reason, so we
# check the extern exceptions list again.
internal_errors = self._scheduler._native.cffi_extern_method_runtime_exceptions()
internal_errors = self._scheduler._native.consume_cffi_extern_method_runtime_exceptions()
if internal_errors:
error_tracebacks = [
''.join(
Expand All @@ -473,10 +493,6 @@ def product_request(self, product, subjects):
{}
""").format(''.join(traceback.format_exception(etype=exc_type, value=exc_value, tb=tb)))

# Zero out the errors raised in CFFI callbacks in case this one is caught and pants doesn't
# exit.
self._scheduler._native.reset_cffi_extern_method_runtime_exceptions()

raise ExecutionError(dedent("""\
{error_description} raised in CFFI extern methods:
{joined_tracebacks}{raised_exception_message}
Expand Down Expand Up @@ -527,6 +543,15 @@ def merge_directories(self, directory_digests):
)
return self._scheduler._raise_or_return(result)

def run_local_interactive_process(self, request: InteractiveProcessRequest) -> InteractiveProcessResult:
sched_pointer = self._scheduler._scheduler

result = self._scheduler._native.lib.run_local_interactive_process(
sched_pointer,
self._scheduler._to_value(request)
)
return self._scheduler._raise_or_return(result)

def materialize_directories(self, directories_paths_and_digests):
"""Creates the specified directories on the file system.
:param directories_paths_and_digests tuple<DirectoryToMaterialize>: Tuple of the path and
Expand Down
6 changes: 5 additions & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pants.engine.console import Console
from pants.engine.fs import Workspace, create_fs_rules
from pants.engine.goal import Goal
from pants.engine.interactive_runner import InteractiveRunner, create_interactive_runner_rules
from pants.engine.isolated_process import create_process_rules
from pants.engine.legacy.address_mapper import LegacyAddressMapper
from pants.engine.legacy.graph import LegacyBuildGraph, create_legacy_graph_tasks
Expand Down Expand Up @@ -203,10 +204,12 @@ def run_console_rules(self, options_bootstrapper, goals, target_roots):
use_colors=options_bootstrapper.bootstrap_options.for_global_scope().colors
)
workspace = Workspace(self.scheduler_session)
interactive_runner = InteractiveRunner(self.scheduler_session)


for goal in goals:
goal_product = self.goal_map[goal]
params = Params(subject, options_bootstrapper, console, workspace)
params = Params(subject, options_bootstrapper, console, workspace, interactive_runner)
logger.debug(f'requesting {goal_product} to satisfy execution of `{goal}` goal')
try:
exit_code = self.scheduler_session.run_console_rule(goal_product, params)
Expand Down Expand Up @@ -375,6 +378,7 @@ def union_membership_singleton() -> UnionMembership:
] +
create_legacy_graph_tasks() +
create_fs_rules() +
create_interactive_runner_rules() +
create_process_rules() +
create_platform_rules() +
create_graph_rules(address_mapper) +
Expand Down
1 change: 1 addition & 0 deletions src/python/pants/rules/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ python_library(
'src/python/pants/build_graph',
'src/python/pants/engine:goal',
'src/python/pants/engine:rules',
'src/python/pants/engine:interactive_runner',
'src/python/pants/engine:selectors',
'src/python/pants/engine:addressable',
'src/python/pants/engine:console',
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/rules/core/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
lint,
list_roots,
list_targets,
run,
strip_source_root,
test,
)
Expand All @@ -21,6 +22,7 @@ def rules():
*list_roots.rules(),
*list_targets.rules(),
*filedeps.rules(),
*run.rules(),
*strip_source_root.rules(),
*test.rules()
]
36 changes: 36 additions & 0 deletions src/python/pants/rules/core/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from pants.engine.addressable import BuildFileAddresses
from pants.engine.console import Console
from pants.engine.goal import Goal
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveRunner
from pants.engine.rules import console_rule


class Run(Goal):
"""Runs a runnable target."""
name = 'v2-run'


@console_rule
def run(console: Console, runner: InteractiveRunner, build_file_addresses: BuildFileAddresses) -> Run:
console.write_stdout("Running the `run` goal\n")

request = InteractiveProcessRequest(
argv=["/usr/bin/python"],
env=("TEST_ENV", "TEST"),
run_in_workspace=False,
)

try:
res = runner.run_local_interactive_process(request)
print(f"Subprocess exited with result: {res.process_exit_code}")
yield Run(res.process_exit_code)
except Exception as e:
print(f"Exception when running local interactive process: {e}")
yield Run(-1)


def rules():
return [run]
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/engine_cffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ logging = { path = "../logging" }
rule_graph = { path = "../rule_graph" }
store = { path = "../fs/store" }
tar_api = { path = "../tar_api" }
tempfile = "3"
workunit_store = { path = "../workunit_store" }

[build-dependencies]
Expand Down
Loading