Skip to content

Commit

Permalink
Create single virtualenv when DbtVirtualenvBaseOperator has `virtua…
Browse files Browse the repository at this point in the history
…lenv_dir=None` and `is_virtualenv_dir_temporary=True` (#1200)

## Description

This PR optimizes the `DbtVirtualenvBaseOperator` by implementing
virtualenv reuse within a single task execution. It reduces the overhead
of creating new virtualenvs for each dbt command.

The `DbtVirtualenvBaseOperator` in [email protected] creates a temporary
directory and prepares a virtualenv twice when `virtualenv_dir` is
`None` and `is_virtualenv_dir_temporary` is `True`. This PR modifies it
to create a directory and a virtualenv only once at the beginning of the
`run_command` method, avoiding this overhead.

Additionally, I have added tests to ensure the directory for virtualenv
will be deleted after the task execution. This is related to the issue
reported in #958.

The changes include:
- Reusing virtualenv in a single task execution
- Improving temporary directory management
- Adding tests to ensure proper handling of virtualenv directories
(deletion or persistence)

## Related Issue(s)

#958

## Breaking Change?

I believe this is not a breaking change.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works
  • Loading branch information
kesompochy authored Oct 1, 2024
1 parent 7eb9386 commit 225d6e9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
69 changes: 37 additions & 32 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
)

if TYPE_CHECKING:
from airflow.utils.context import Context

from airflow.utils.context import Context # pragma: no cover
from dbt.cli.main import dbtRunnerResult # pragma: no cover

PY_INTERPRETER = "python3"
LOCK_FILENAME = "cosmos_virtualenv.lock"
Expand Down Expand Up @@ -77,49 +77,54 @@ def __init__(
self.virtualenv_dir = virtualenv_dir
self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary
self.max_retries_lock = settings.virtualenv_max_retries_lock
self._py_bin: str | None = None
super().__init__(**kwargs)
if not self.py_requirements:
self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter")

def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult:
# No virtualenv_dir set, so create a temporary virtualenv
if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary:
self.log.info("Creating temporary virtualenv")
with TemporaryDirectory(prefix="cosmos-venv") as tempdir:
self.virtualenv_dir = Path(tempdir)
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
return subprocess_result

# Use a reusable virtualenv
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
self.max_retries_lock -= 1

self.log.info(f"Acquiring the virtualenv lock")
self._acquire_venv_lock()
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd)
if self._py_bin is not None:
self.log.info(f"Using Python binary from virtualenv: {self._py_bin}")
command[0] = str(Path(self._py_bin).parent / "dbt")
subprocess_result = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
self.log.info("Releasing virtualenv lock")
self._release_venv_lock()
self.log.info(subprocess_result.output)
return subprocess_result

def run_command(
self,
cmd: list[str],
env: dict[str, str | bytes | os.PathLike[Any]],
context: Context,
) -> FullOutputSubprocessResult | dbtRunnerResult:
# No virtualenv_dir set, so create a temporary virtualenv
if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary:
self.log.info("Creating temporary virtualenv")
with TemporaryDirectory(prefix="cosmos-venv") as tempdir:
self.virtualenv_dir = Path(tempdir)
self._py_bin = self._prepare_virtualenv()
return super().run_command(cmd, env, context)

try:
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
self.max_retries_lock -= 1

self.log.info("Acquiring the virtualenv lock")
self._acquire_venv_lock()
self._py_bin = self._prepare_virtualenv()
return super().run_command(cmd, env, context)
finally:
self.log.info("Releasing virtualenv lock")
self._release_venv_lock()

def clean_dir_if_temporary(self) -> None:
"""
Delete the virtualenv directory if it is temporary.
Expand Down
40 changes: 29 additions & 11 deletions tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,22 @@ def test_run_command_without_virtualenv_dir(
assert dbt_deps["command"][0] == dbt_cmd["command"][0]
assert dbt_deps["command"][1] == "deps"
assert dbt_cmd["command"][1] == "do-something"
assert mock_execute.call_count == 4


assert mock_execute.call_count == 2
virtualenv_call, pip_install_call = mock_execute.call_args_list
assert "python" in virtualenv_call[0][0][0]
assert virtualenv_call[0][0][1] == "-m"
assert virtualenv_call[0][0][2] == "virtualenv"
assert "pip" in pip_install_call[0][0][0]
assert pip_install_call[0][0][1] == "install"
cosmos_venv_dirs = [
f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv")
]
assert len(cosmos_venv_dirs) == 0


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available")
@patch("time.sleep")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("airflow.utils.python_virtualenv.execute_in_subprocess")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql")
Expand All @@ -93,7 +106,12 @@ def test_run_command_with_virtualenv_dir(
mock_store_compiled_sql,
mock_calculate_openlineage_events_completes,
mock_execute,
mock_release_venv_lock,
mock_sleep,
mock_is_lock_available,
caplog,
):
mock_is_lock_available.side_effect = [False, False, True]
mock_get_connection.return_value = Connection(
conn_id="fake_conn",
conn_type="postgres",
Expand Down Expand Up @@ -124,6 +142,12 @@ def test_run_command_with_virtualenv_dir(
dbt_cmd = run_command_args[1].kwargs
assert dbt_deps["command"][0] == "mock-venv/bin/dbt"
assert dbt_cmd["command"][0] == "mock-venv/bin/dbt"
assert caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert mock_sleep.call_count == 2
assert mock_is_lock_available.call_count == 3
assert mock_release_venv_lock.call_count == 1
cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"]
assert len(cosmos_venv_dirs) == 1


def test_virtualenv_operator_append_env_is_true_by_default():
Expand Down Expand Up @@ -184,13 +208,7 @@ def test_on_kill(mock_clean_dir_if_temporary):


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True])
def test_run_subprocess_waits_for_lock(
mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog
):
def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog):
venv_operator = ConcreteDbtVirtualenvBaseOperator(
profile_config=profile_config,
project_dir="./dev/dags/dbt/jaffle_shop",
Expand All @@ -199,7 +217,7 @@ def test_run_subprocess_waits_for_lock(
virtualenv_dir=tmpdir,
)
venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop")
assert caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert len(mock_subprocess_hook.run_command.call_args_list) == 1


@patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError)
Expand Down

0 comments on commit 225d6e9

Please sign in to comment.