Skip to content

Commit

Permalink
Support partial parsing (#800)
Browse files Browse the repository at this point in the history
## Description

dbt uses `partial_parse.msgpack` to make rendering things a lot faster.
This PR adds support for `partial_parse.msgpack` in the following
places:

- `ExecutionMode.LOCAL`
- `ExecutionMode.VIRTUALENV`
- `LoadMode.DBT_LS`

This PR also allows users to explicitly _turn off_ partial parsing. If
this is done, then `--no-partial-parse` will be passed as an explicit
flag in all dbt command invocations (i.e. all `ExecutionMode`s and
`LoadMode.DBT_LS`, albeit not the `dbt deps` invocation.)

This should address some performance complaints that users have, e.g.
this message from Slack:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1704483361206829
Albeit, this user will also need to provide a `partial_parse.msgpack`.

My experimentation and looking at dbt-core source code confirms that dbt
does not use `manifest.json` when partial parsing. It appears that these
are just output artifacts, but not input artifacts. Only
`partial_parse.msgpack` is used. (There are a couple ways to confirm
this other than just checking source code

Also, I added a minor refactor of a feature I added a year ago: I added
`send_sigint()` to the custom subprocess hook, since this custom
subprocess hook did not exist back when I added it (if you want me to
split this refactor into a different PR then let me know).

### API change

I decided the best way to go about this would be to just add a
`partial_parse: bool` to both the execution config and render config.
For example:

```python
dbt_group = DbtTaskGroup(
    ...,
    execution_config=ExecutionConfig(
        ...,
        partial_parse=True
    ),
    render_config=RenderConfig(
        ...,
        partial_parse=False
    )
)
```

That said, in all honesty users will not need to set this at all, except
unless they want to suppress the little warning message about not being
able to find the `partial_parse.msgpack`. This is because by default
this addition searches for a msgpack if it exists, which is already the
existing behavior in a sense, except right now the msgpack file never
exists (dbt does look for it though).

When inserting into the `AbstractDbtBaseOperator`, I did not use
`global_boolean_flags`. See the subsection below for why.

### Other execution performance improvements

The main reason I am adding this feature is that it should dramatically
improve performance for users. However, it is not the only way to
improve

It's possible that we should add a way to add the flag `--no-write-json`
as an explicit kwarg to the dbt base operator. Right now users can
support this via `dbt_cmd_global_flags=["--no-write-json"]`. Some users
(e.g. those using Elementary, or those using the dbt local operator
`callback` kwarg) will want to write the JSON, but I suspect the
majority of users will not. Similarly, `--log-level-file` is not used at
all, and at minimum dbt should work best the vast majority of time with
`--no-cache-selected-only` set.

It's also possible there should be some sort of "performant" mode that
automatically sets all these defaults for optimal performance:

- `--no-write-json`
- `--log-level-file=none`
- `--no-cache-selected-only`

Perhaps a "performant" config would be too cumbersome to implement (I
would agree with that). In which case the docs could also have a section
on performance tips.

### A note on `global_boolean_flags`

I did not add the partial parse support to `global_boolean_flags`
because it doesn't quite fit into the existing paradigm for this. Right
now the default for each of these `global_boolean_flags` is False,
whereas the default for partial parse is actually True. This makes
fitting it in awkward.

I think it's possible that just having a `tuple[str]` is insufficient
here, as some flags you may want to add (not just `--no-partial-parse`
but also `--no-write-json` are by default _True_ and must be explicitly
turned off. Meaning that the parsing Cosmos does with flags of
`'--{flag.replace("_", "-")}'` is ineffective for flags like this.

Right now, we have an example of putting _no_ in front with
`no_version_check`. Meaning that the default behavior of version
checking is True, but the flag itself starts as negated so the default
is actually `False`.

My proposal is, instead of `global_boolean_flags: tuple[str]`, this
should instead be `global_boolean_flags: tuple[str | tuple[str, str |
None, str | None]]`. In the case that a global flag is a `tuple[str, str
| None, str | None]`, then the first arg should be the flag, the second
should be "if true," and the third should be "if false." `None`
indicates, when true/false (respectively), then do nothing.

For example:

```python
class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
    ...
    global_boolean_flags = (
        ("no_version_check", "--no-version-check", None),
        ("cache_selected_only", "-cache-selected-only", None),
        ("partial_parse", None, "--no-partial-parse"),
    )

```

And Cosmos want to support `str` parsing for backwards compatibility.
It's pretty straightforward to convert the data type:

```python
if isinstance(flag, str):
    flag = (flag, '--{flag.replace("_", "-")}', None)
```

## Related Issue(s)

- Resolves #791
- Partially resolves #785
- #785 should probably be split up into two different stages: (1)
support for partial parsing (2) (a) dbt project dir / manifest /
`partial_parse.msgpack` is allowed to come from cloud storage. (b) `dbt
compile` is able to dump into cloud storage.

## Breaking Change?

Should not break anything. This doesn't do anything when
`partial_parse.msgpack` is missing, and the default behavior
(`partial_parse=True`) does not alter the dbt cmd flags.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

---------

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Julian LaNeve <[email protected]>
Co-authored-by: Justin Bandoro <[email protected]>
  • Loading branch information
5 people authored Feb 19, 2024
1 parent 9af7067 commit 39acd4c
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 20 deletions.
7 changes: 6 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class ProjectConfig:
:param dbt_vars: Dictionary of dbt variables for the project. This argument overrides variables defined in your dbt_project.yml
file. The dictionary is dumped to a yaml string and passed to dbt commands as the --vars argument. Variables are only
supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and ``RenderConfig.LoadMode.CUSTOM`` load mode.
:param partial_parse: If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used
for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``
execution modes.
"""

dbt_project_path: Path | None = None
Expand All @@ -141,6 +144,7 @@ def __init__(
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
partial_parse: bool = True,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
Expand All @@ -166,6 +170,7 @@ def __init__(

self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse

def validate_project(self) -> None:
"""
Expand Down Expand Up @@ -292,7 +297,7 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
1 change: 1 addition & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def __init__(
task_args = {
**operator_args,
"project_dir": execution_config.project_path,
"partial_parse": project_config.partial_parse,
"profile_config": profile_config,
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
Expand Down
8 changes: 7 additions & 1 deletion cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -204,6 +204,9 @@ def run_dbt_ls(
if self.render_config.selector:
ls_command.extend(["--selector", self.render_config.selector])

if not self.project.partial_parse:
ls_command.append("--no-partial-parse")

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)
Expand Down Expand Up @@ -248,6 +251,9 @@ def load_via_dbt_ls(self) -> None:
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse:
copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
Expand Down
16 changes: 12 additions & 4 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

from pathlib import Path
import shutil
import os
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
)
from cosmos.constants import DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME
from contextlib import contextmanager
from typing import Generator

Expand All @@ -21,6 +19,16 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(exist_ok=True)

shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME))


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
"""Temporarily set environment variables inside the context manager and restore
Expand Down
6 changes: 6 additions & 0 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@ def send_sigterm(self) -> None:
logger.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

def send_sigint(self) -> None:
"""Sends SIGINT signal to ``self.sub_process`` if one exists."""
logger.info("Sending SIGINT signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT)
16 changes: 9 additions & 7 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
:param skip_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param partial_parse: If True (default), then the operator will use the
``partial_parse.msgpack`` during execution if it exists. If False, then
a flag will be explicitly set to turn off partial parsing.
:param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed.
Otherwise, the query will keep running when the task is killed.
:param dbt_executable_path: Path to dbt executable can be used with venv
Expand All @@ -68,13 +71,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
"vars",
"models",
)
global_boolean_flags = (
"no_version_check",
"cache_selected_only",
"fail_fast",
"quiet",
"warn_error",
)
global_boolean_flags = ("no_version_check", "cache_selected_only", "fail_fast", "quiet", "warn_error")

intercept_flag = True

Expand Down Expand Up @@ -105,6 +102,7 @@ def __init__(
append_env: bool = False,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
partial_parse: bool = True,
cancel_query_on_kill: bool = True,
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
Expand All @@ -131,6 +129,7 @@ def __init__(
self.append_env = append_env
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.partial_parse = partial_parse
self.cancel_query_on_kill = cancel_query_on_kill
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
Expand Down Expand Up @@ -219,6 +218,9 @@ def build_cmd(

dbt_cmd.extend(self.dbt_cmd_global_flags)

if not self.partial_parse:
dbt_cmd.append("--no-partial-parse")

dbt_cmd.extend(self.base_cmd)

if self.indirect_selection:
Expand Down
16 changes: 11 additions & 5 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@

from sqlalchemy.orm import Session

from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
DBT_TARGET_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
)
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
Expand All @@ -52,7 +57,7 @@
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output
from cosmos.dbt.project import create_symlinks
from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse

DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"
Expand Down Expand Up @@ -208,6 +213,9 @@ def run_command(

create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps)

if self.partial_parse:
copy_msgpack_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir))

with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
Expand Down Expand Up @@ -374,9 +382,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None

def on_kill(self) -> None:
if self.cancel_query_on_kill:
self.subprocess_hook.log.info("Sending SIGINT signal to process group")
if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"):
os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT)
self.subprocess_hook.send_sigint()
else:
self.subprocess_hook.send_sigterm()

Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/project-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ variables that should be used for rendering and execution. It takes the followin
will only be rendered at execution time, not at render time.
- ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with
env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode.
- ``partial_parse``: (new in v1.4) If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used
for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``
execution modes.

Project Config Example
----------------------
Expand Down
4 changes: 4 additions & 0 deletions docs/getting_started/execution-modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ The ``local`` execution mode assumes a ``dbt`` binary is reachable within the Ai
If ``dbt`` was not installed as part of the Cosmos packages,
users can define a custom path to ``dbt`` by declaring the argument ``dbt_executable_path``.

By default, if Cosmos sees a ``partial_parse.msgpack`` in the target directory of the dbt project directory when using ``local`` execution, it will use this for partial parsing to speed up task execution.
This can be turned off by setting ``partial_parse=False`` in the ``ProjectConfig``.

When using the ``local`` execution mode, Cosmos converts Airflow Connections into a native ``dbt`` profiles file (``profiles.yml``).

Example of how to use, for instance, when ``dbt`` was installed together with Cosmos:
Expand All @@ -76,6 +79,7 @@ The ``virtualenv`` mode isolates the Airflow worker dependencies from ``dbt`` by
In this case, users are responsible for declaring which version of ``dbt`` they want to use by giving the argument ``py_requirements``. This argument can be set directly in operator instances or when instantiating ``DbtDag`` and ``DbtTaskGroup`` as part of ``operator_args``.

Similar to the ``local`` execution mode, Cosmos converts Airflow Connections into a way ``dbt`` understands them by creating a ``dbt`` profile file (``profiles.yml``).
Also similar to the ``local`` execution mode, Cosmos will by default attempt to use a ``partial_parse.msgpack`` if one exists to speed up parsing.

Some drawbacks of this approach:

Expand Down
28 changes: 28 additions & 0 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,34 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used(
assert ls_command[ls_command.index("--selector") + 1] == selector


@patch("cosmos.dbt.graph.Popen")
@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency")
@patch("cosmos.config.RenderConfig.validate_dbt_command")
def test_load_via_dbt_ls_render_config_no_partial_parse(
mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir
):
"""Tests that --no-partial-parse appears when partial_parse=False."""
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 0
project_config = ProjectConfig(partial_parse=False)
render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
execution_config = MagicMock()
dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
)
dbt_graph.load_via_dbt_ls()
ls_command = mock_popen.call_args.args[0]
assert "--no-partial-parse" in ls_command


@pytest.mark.parametrize("load_method", [LoadMode.DBT_MANIFEST, LoadMode.CUSTOM])
def test_load_method_with_unsupported_render_config_selector_arg(load_method):
"""Tests that error is raised when RenderConfig.selector is used with LoadMode.DBT_MANIFEST or LoadMode.CUSTOM."""
Expand Down
31 changes: 29 additions & 2 deletions tests/dbt/test_project.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path
from cosmos.dbt.project import create_symlinks, environ
import os
from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


Expand All @@ -17,6 +20,30 @@ def test_create_symlinks(tmp_path):
assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages")


@pytest.mark.parametrize("exists", [True, False])
def test_copy_manifest_for_partial_parse(tmp_path, exists):
project_path = tmp_path / "project"
target_path = project_path / "target"
partial_parse_file = target_path / "partial_parse.msgpack"

target_path.mkdir(parents=True)

if exists:
partial_parse_file.write_bytes(b"")

tmp_dir = tmp_path / "tmp_dir"
tmp_dir.mkdir()

copy_msgpack_for_partial_parse(project_path, tmp_dir)

tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack"

if exists:
assert tmp_partial_parse_file.exists()
else:
assert not tmp_partial_parse_file.exists()


@patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"})
def test_environ_context_manager():
# Define the expected environment variables
Expand Down
Empty file added tests/hooks/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions tests/hooks/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import MagicMock, patch
import signal

import pytest

from cosmos.hooks.subprocess import FullOutputSubprocessHook

OS_ENV_KEY = "SUBPROCESS_ENV_TEST"
OS_ENV_VAL = "this-is-from-os-environ"


@pytest.mark.parametrize(
"env,expected",
[
({"ABC": "123", "AAA": "456"}, {"ABC": "123", "AAA": "456", OS_ENV_KEY: ""}),
({}, {OS_ENV_KEY: ""}),
(None, {OS_ENV_KEY: OS_ENV_VAL}),
],
ids=["with env", "empty env", "no env"],
)
def test_env(env, expected):
"""
Test that env variables are exported correctly to the command environment.
When ``env`` is ``None``, ``os.environ`` should be passed to ``Popen``.
Otherwise, the variables in ``env`` should be available, and ``os.environ`` should not.
"""
hook = FullOutputSubprocessHook()

def build_cmd(keys, filename):
"""
Produce bash command to echo env vars into filename.
Will always echo the special test var named ``OS_ENV_KEY`` into the file to test whether
``os.environ`` is passed or not.
"""
return "\n".join(f"echo {k}=${k}>> {filename}" for k in [*keys, OS_ENV_KEY])

with TemporaryDirectory() as tmp_dir, patch.dict("os.environ", {OS_ENV_KEY: OS_ENV_VAL}):
tmp_file = Path(tmp_dir, "test.txt")
command = build_cmd(env and env.keys() or [], tmp_file.as_posix())
hook.run_command(command=["bash", "-c", command], env=env)
actual = dict([x.split("=") for x in tmp_file.read_text().splitlines()])
assert actual == expected


def test_subprocess_hook():
hook = FullOutputSubprocessHook()
result = hook.run_command(command=["bash", "-c", f'echo "foo"'])
assert result.exit_code == 0
assert result.output == "foo"
assert result.full_output == ["foo"]


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigint(mock_killpg, mock_getpgid):
hook = FullOutputSubprocessHook()
hook.sub_process = MagicMock()
hook.send_sigint()
mock_killpg.assert_called_with(123, signal.SIGINT)


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigterm(mock_killpg, mock_getpgid):
hook = FullOutputSubprocessHook()
hook.sub_process = MagicMock()
hook.send_sigterm()
mock_killpg.assert_called_with(123, signal.SIGTERM)
Loading

0 comments on commit 39acd4c

Please sign in to comment.