Skip to content

Commit

Permalink
Create infrastructure for running interactive local process (#8495)
Browse files Browse the repository at this point in the history
## Problem

pants run and pants repl (and perhaps other goals) have as their "output" the actual execution of a program, locally and interactively, which the current ExecuteProcessRequest infrastructure isn't designed to handle well.

## Solution

This commit introduces a new type InteractiveRunner, which works like Console and Workspace, in that it can be requested only by a @console_rule and offers a blocking method to kick off an interactive subprocess, that takes control of the terminal.

This commit also happens to include the stub of a v2 run @console_rule (called, currently, v2-run), which is a stub that just runs /usr/bin/python and was only included to test the rule - eventually this will become a full-featured v2 run goal.
  • Loading branch information
gshuflin authored Oct 31, 2019
1 parent 0e15a34 commit 2bdb223
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 26 deletions.
12 changes: 11 additions & 1 deletion src/python/pants/base/exception_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def signal_handler_mapping(self):
def __init__(self):
self._ignore_sigint_lock = threading.Lock()
self._threads_ignoring_sigint = 0
self._ignoring_sigint_v2_engine = False

def _check_sigint_gate_is_correct(self):
assert self._threads_ignoring_sigint >= 0, \
Expand All @@ -57,9 +58,14 @@ def _handle_sigint_if_enabled(self, signum, _frame):
with self._ignore_sigint_lock:
self._check_sigint_gate_is_correct()
threads_ignoring_sigint = self._threads_ignoring_sigint
if threads_ignoring_sigint == 0:
ignoring_sigint_v2_engine = self._ignoring_sigint_v2_engine
if threads_ignoring_sigint == 0 and not ignoring_sigint_v2_engine:
self.handle_sigint(signum, _frame)

def _toggle_ignoring_sigint_v2_engine(self, toggle: bool):
with self._ignore_sigint_lock:
self._ignoring_sigint_v2_engine = toggle

@contextmanager
def _ignoring_sigint(self):
with self._ignore_sigint_lock:
Expand Down Expand Up @@ -392,6 +398,10 @@ def ignoring_sigint(cls):
with cls._signal_handler._ignoring_sigint():
yield

@classmethod
def toggle_ignoring_sigint_v2_engine(cls, toggle: bool):
cls._signal_handler._toggle_ignoring_sigint_v2_engine(toggle)

@classmethod
def _iso_timestamp_for_now(cls):
return datetime.datetime.now().isoformat()
Expand Down
12 changes: 12 additions & 0 deletions src/python/pants/engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ python_library(
]
)

python_library(
name='interactive_runner',
sources=['interactive_runner.py'],
dependencies=[
'3rdparty/python:dataclasses',
'src/python/pants/base:exception_sink',
':rules',
':platform',
]
)

python_library(
name='isolated_process',
sources=['isolated_process.py'],
Expand Down Expand Up @@ -208,6 +219,7 @@ python_library(
dependencies=[
':native_engine_shared_library',
':fs',
':interactive_runner',
':isolated_process',
':selectors',
'3rdparty/python:cffi',
Expand Down
33 changes: 33 additions & 0 deletions src/python/pants/engine/interactive_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from dataclasses import dataclass
from typing import Any, Tuple

from pants.base.exception_sink import ExceptionSink
from pants.engine.rules import RootRule


@dataclass(frozen=True)
class InteractiveProcessResult:
process_exit_code: int


@dataclass(frozen=True)
class InteractiveProcessRequest:
argv: Tuple[str, ...]
env: Tuple[str, ...] = ()
run_in_workspace: bool = False


@dataclass(frozen=True)
class InteractiveRunner:
_scheduler: Any

def run_local_interactive_process(self, request: InteractiveProcessRequest) -> InteractiveProcessResult:
ExceptionSink.toggle_ignoring_sigint_v2_engine(True)
return self._scheduler.run_local_interactive_process(request)


def create_interactive_runner_rules():
return [RootRule(InteractiveRunner)]
26 changes: 20 additions & 6 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Snapshot,
UrlToFetch,
)
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveProcessResult
from pants.engine.isolated_process import (
FallibleExecuteProcessResult,
MultiPlatformExecuteProcessRequest,
Expand Down Expand Up @@ -359,7 +360,7 @@ def extern_type_to_str(self, context_handle, type_id):
c = self._ffi.from_handle(context_handle)
return c.utf8_buf(str(c.from_id(type_id.tup_0).__name__))

# If we try to pass a None to the CFFI layer, it will silently fail
# If we try to pass a None to the CFFI layer, it will silently fail
# in a weird way. So instead we use the empty string/bytestring as
# a de-facto null value, in both `extern_val_to_str` and
# `extern_val_to_bytes`.
Expand Down Expand Up @@ -570,6 +571,9 @@ class EngineTypes(NamedTuple):
url_to_fetch: TypeId
string: TypeId
bytes: TypeId
construct_interactive_process_result: Function
interactive_process_request: TypeId
interactive_process_result: TypeId


class PyResult(NamedTuple):
Expand Down Expand Up @@ -692,9 +696,11 @@ class CFFIExternMethodRuntimeErrorInfo(NamedTuple):
When an exception is raised in the body of a CFFI extern, the `onerror` handler is used to
capture it, storing the exception info as an instance of `CFFIExternMethodRuntimeErrorInfo` with
`.add_cffi_extern_method_runtime_exception()`. The scheduler will then check whether any
exceptions were stored by calling `.cffi_extern_method_runtime_exceptions()` after specific
calls to the native library which may raise. `.reset_cffi_extern_method_runtime_exceptions()`
should be called after the stored exception has been handled or before it is re-raised.
exceptions were stored by calling `.consume_cffi_extern_method_runtime_exceptions()` after
specific calls to the native library which may raise.
Note that `.consume_cffi_extern_method_runtime_exceptions()` will also clear out all stored
exceptions, so exceptions should be stored separately after consumption.
Some ways that exceptions in CFFI extern methods can be handled are described in
https://cffi.readthedocs.io/en/latest/using.html#extern-python-reference.
Expand All @@ -706,9 +712,14 @@ class CFFIExternMethodRuntimeErrorInfo(NamedTuple):
def reset_cffi_extern_method_runtime_exceptions(self):
self._errors_during_execution = []

def cffi_extern_method_runtime_exceptions(self):
def _peek_cffi_extern_method_runtime_exceptions(self):
return self._errors_during_execution

def consume_cffi_extern_method_runtime_exceptions(self):
res = self._peek_cffi_extern_method_runtime_exceptions()
self.reset_cffi_extern_method_runtime_exceptions()
return res

def add_cffi_extern_method_runtime_exception(self, error_info):
assert isinstance(error_info, self.CFFIExternMethodRuntimeErrorInfo)
self._errors_during_execution.append(error_info)
Expand Down Expand Up @@ -907,7 +918,10 @@ def ti(type_obj):
url_to_fetch=ti(UrlToFetch),
string=ti(str),
bytes=ti(bytes),
)
construct_interactive_process_result=func(InteractiveProcessResult),
interactive_process_request=ti(InteractiveProcessRequest),
interactive_process_result=ti(InteractiveProcessResult),
)

scheduler_result = self.lib.scheduler_create(
tasks,
Expand Down
37 changes: 31 additions & 6 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from textwrap import dedent
from typing import Any

from pants.base.exception_sink import ExceptionSink
from pants.base.exiter import PANTS_FAILED_EXIT_CODE
from pants.engine.fs import Digest, DirectoryToMaterialize, PathGlobsAndRoot
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveProcessResult
from pants.engine.native import Function, TypeId
from pants.engine.nodes import Return, Throw
from pants.engine.objects import Collection
Expand Down Expand Up @@ -252,16 +254,32 @@ def with_fork_context(self, func):

def _run_and_return_roots(self, session, execution_request):
raw_roots = self._native.lib.scheduler_execute(self._scheduler, session, execution_request)
remaining_runtime_exceptions_to_capture = list(self._native.consume_cffi_extern_method_runtime_exceptions())
try:
roots = []
for raw_root in self._native.unpack(raw_roots.nodes_ptr, raw_roots.nodes_len):
# Check if there were any uncaught exceptions within rules that were executed.
remaining_runtime_exceptions_to_capture.extend(self._native.consume_cffi_extern_method_runtime_exceptions())

if raw_root.is_throw:
state = Throw(self._from_value(raw_root.handle))
elif raw_root.handle == self._native.ffi.NULL:
# NB: We expect all NULL handles to correspond to uncaught exceptions which are collected
# in `self._native._peek_cffi_extern_method_runtime_exceptions()`!
if not remaining_runtime_exceptions_to_capture:
raise ExecutionError('Internal logic error in scheduler: expected more elements in '
'`self._native._peek_cffi_extern_method_runtime_exceptions()`.')
matching_runtime_exception = remaining_runtime_exceptions_to_capture.pop(0)
state = Throw(matching_runtime_exception)
else:
state = Return(self._from_value(raw_root.handle))
roots.append(state)
finally:
self._native.lib.nodes_destroy(raw_roots)

if remaining_runtime_exceptions_to_capture:
raise ExecutionError('Internal logic error in scheduler: expected elements in '
'`self._native._peek_cffi_extern_method_runtime_exceptions()`.')
return roots

def lease_files_in_graph(self):
Expand Down Expand Up @@ -384,6 +402,8 @@ def execute(self, execution_request):
roots = list(zip(execution_request.roots,
self._scheduler._run_and_return_roots(self._session, execution_request.native)))

ExceptionSink.toggle_ignoring_sigint_v2_engine(False)

self._maybe_visualize()

logger.debug(
Expand Down Expand Up @@ -444,7 +464,7 @@ def product_request(self, product, subjects):
except: # noqa: T803
# If there are any exceptions during CFFI extern method calls, we want to return an error with
# them and whatever failure results from it. This typically results from unhashable types.
if self._scheduler._native.cffi_extern_method_runtime_exceptions():
if self._scheduler._native._peek_cffi_extern_method_runtime_exceptions():
raised_exception = sys.exc_info()[0:3]
else:
# Otherwise, this is likely an exception coming from somewhere else, and we don't want to
Expand All @@ -454,7 +474,7 @@ def product_request(self, product, subjects):
# We still want to raise whenever there are any exceptions in any CFFI extern methods, even if
# that didn't lead to an exception in generating the execution request for some reason, so we
# check the extern exceptions list again.
internal_errors = self._scheduler._native.cffi_extern_method_runtime_exceptions()
internal_errors = self._scheduler._native.consume_cffi_extern_method_runtime_exceptions()
if internal_errors:
error_tracebacks = [
''.join(
Expand All @@ -473,10 +493,6 @@ def product_request(self, product, subjects):
{}
""").format(''.join(traceback.format_exception(etype=exc_type, value=exc_value, tb=tb)))

# Zero out the errors raised in CFFI callbacks in case this one is caught and pants doesn't
# exit.
self._scheduler._native.reset_cffi_extern_method_runtime_exceptions()

raise ExecutionError(dedent("""\
{error_description} raised in CFFI extern methods:
{joined_tracebacks}{raised_exception_message}
Expand Down Expand Up @@ -527,6 +543,15 @@ def merge_directories(self, directory_digests):
)
return self._scheduler._raise_or_return(result)

def run_local_interactive_process(self, request: InteractiveProcessRequest) -> InteractiveProcessResult:
sched_pointer = self._scheduler._scheduler

result = self._scheduler._native.lib.run_local_interactive_process(
sched_pointer,
self._scheduler._to_value(request)
)
return self._scheduler._raise_or_return(result)

def materialize_directories(self, directories_paths_and_digests):
"""Creates the specified directories on the file system.
:param directories_paths_and_digests tuple<DirectoryToMaterialize>: Tuple of the path and
Expand Down
6 changes: 5 additions & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pants.engine.console import Console
from pants.engine.fs import Workspace, create_fs_rules
from pants.engine.goal import Goal
from pants.engine.interactive_runner import InteractiveRunner, create_interactive_runner_rules
from pants.engine.isolated_process import create_process_rules
from pants.engine.legacy.address_mapper import LegacyAddressMapper
from pants.engine.legacy.graph import LegacyBuildGraph, create_legacy_graph_tasks
Expand Down Expand Up @@ -203,10 +204,12 @@ def run_console_rules(self, options_bootstrapper, goals, target_roots):
use_colors=options_bootstrapper.bootstrap_options.for_global_scope().colors
)
workspace = Workspace(self.scheduler_session)
interactive_runner = InteractiveRunner(self.scheduler_session)


for goal in goals:
goal_product = self.goal_map[goal]
params = Params(subject, options_bootstrapper, console, workspace)
params = Params(subject, options_bootstrapper, console, workspace, interactive_runner)
logger.debug(f'requesting {goal_product} to satisfy execution of `{goal}` goal')
try:
exit_code = self.scheduler_session.run_console_rule(goal_product, params)
Expand Down Expand Up @@ -375,6 +378,7 @@ def union_membership_singleton() -> UnionMembership:
] +
create_legacy_graph_tasks() +
create_fs_rules() +
create_interactive_runner_rules() +
create_process_rules() +
create_platform_rules() +
create_graph_rules(address_mapper) +
Expand Down
1 change: 1 addition & 0 deletions src/python/pants/rules/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ python_library(
'src/python/pants/build_graph',
'src/python/pants/engine:goal',
'src/python/pants/engine:rules',
'src/python/pants/engine:interactive_runner',
'src/python/pants/engine:selectors',
'src/python/pants/engine:addressable',
'src/python/pants/engine:console',
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/rules/core/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
lint,
list_roots,
list_targets,
run,
strip_source_root,
test,
)
Expand All @@ -21,6 +22,7 @@ def rules():
*list_roots.rules(),
*list_targets.rules(),
*filedeps.rules(),
*run.rules(),
*strip_source_root.rules(),
*test.rules()
]
36 changes: 36 additions & 0 deletions src/python/pants/rules/core/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from pants.engine.addressable import BuildFileAddresses
from pants.engine.console import Console
from pants.engine.goal import Goal
from pants.engine.interactive_runner import InteractiveProcessRequest, InteractiveRunner
from pants.engine.rules import console_rule


class Run(Goal):
"""Runs a runnable target."""
name = 'v2-run'


@console_rule
def run(console: Console, runner: InteractiveRunner, build_file_addresses: BuildFileAddresses) -> Run:
console.write_stdout("Running the `run` goal\n")

request = InteractiveProcessRequest(
argv=["/usr/bin/python"],
env=("TEST_ENV", "TEST"),
run_in_workspace=False,
)

try:
res = runner.run_local_interactive_process(request)
print(f"Subprocess exited with result: {res.process_exit_code}")
yield Run(res.process_exit_code)
except Exception as e:
print(f"Exception when running local interactive process: {e}")
yield Run(-1)


def rules():
return [run]
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/engine_cffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ logging = { path = "../logging" }
rule_graph = { path = "../rule_graph" }
store = { path = "../fs/store" }
tar_api = { path = "../tar_api" }
tempfile = "3"
workunit_store = { path = "../workunit_store" }

[build-dependencies]
Expand Down
Loading

0 comments on commit 2bdb223

Please sign in to comment.