Skip to content

Commit

Permalink
Refresh bash_command once and earlier
Browse files Browse the repository at this point in the history
  • Loading branch information
josh-fell committed Dec 19, 2023
1 parent 96d14e4 commit 094791c
Showing 1 changed file with 29 additions and 36 deletions.
65 changes: 29 additions & 36 deletions airflow/operators/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,32 +166,31 @@ def __init__(
self.cwd = cwd
self.append_env = append_env

# When using the @task.bash decorator, the bash_command is not known until the underlying Python
# callable is executed and therefore set to NOTSET. This flag is useful during execution to
# determine whether the bash_command value needs to re-rendered for the Rendered Template view in the
# UI. Without re-rendering, the bash_command value in this UI view will display as an ArgNotSet type.
# When using the @task.bash decorator, the Bash command is not known until the underlying Python
# callable is executed and therefore set to NOTSET initially. This flag is useful during execution to
# determine whether the bash_command value needs to re-rendered.
self._init_bash_command_not_set = isinstance(self.bash_command, ArgNotSet)

@cached_property
def subprocess_hook(self):
"""Returns hook for running the bash command."""
return SubprocessHook()

def update_bash_command_rendered_ti_field(self, ti: TaskInstance) -> None:
@staticmethod
def refresh_bash_command(ti: TaskInstance) -> None:
"""Rewrite the underlying rendered bash_command value for a task instance in the metadatabase.
This is useful to ensure the bash_command in the Rendered Template view in the UI displays the
executed Bash command for the task. We cannot call TaskInstance.get_rendered_template_fields() here to
apply to the task because this will retrieve the RenderedTaskInstanceFields from the metadatabase
which doesn't have the runtime-evaluated bash_command value.
TaskInstance.get_rendered_template_fields() cannot be used because this will retrieve the
RenderedTaskInstanceFields from the metadatabase which doesn't have the runtime-evaluated bash_command
value.
:meta private:
"""
from airflow.models.renderedtifields import RenderedTaskInstanceFields

rtif = RenderedTaskInstanceFields(ti, render_templates=False)
rtif = RenderedTaskInstanceFields(ti) # Templates are rendered be default here.
RenderedTaskInstanceFields.write(rtif)
RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)
RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id)

def get_env(self, context):
"""Build the set of environment variables to be exposed for the bash command."""
Expand Down Expand Up @@ -220,34 +219,28 @@ def execute(self, context: Context):
if not os.path.isdir(self.cwd):
raise AirflowException(f"The cwd {self.cwd} must be a directory")
env = self.get_env(context)
ti = cast("TaskInstance", context["ti"])

# Because the bash_command value is reevaluated at runtime using the @tash.bash decorator, the
# returned string from the decorated callable could be a string that contains a Jinja expression.
# To account for this, the templated string need to be rendered again.
# Because the bash_command value is evaluated at runtime using the @tash.bash decorator, the
# RenderedTaskInstanceField data needs to be rewritten and the bash_command value re-rendered -- the
# latter because the returned command from the decorated callable could contain a Jinja expression.
# Both will ensure the correct Bash command is executed and that the Rendered Template view in the UI
# displays the executed command (otherwise it will display as an ArgNotSet type).
if self._init_bash_command_not_set:
ti.render_templates()

try:
result = self.subprocess_hook.run_command(
command=[bash_path, "-c", self.bash_command],
env=env,
output_encoding=self.output_encoding,
cwd=self.cwd,
ti = cast("TaskInstance", context["ti"])
self.refresh_bash_command(ti)

result = self.subprocess_hook.run_command(
command=[bash_path, "-c", self.bash_command],
env=env,
output_encoding=self.output_encoding,
cwd=self.cwd,
)
if self.skip_on_exit_code is not None and result.exit_code in self.skip_on_exit_code:
raise AirflowSkipException(f"Bash command returned exit code {self.skip_on_exit_code}. Skipping.")
elif result.exit_code != 0:
raise AirflowException(
f"Bash command failed. The command returned a non-zero exit code {result.exit_code}."
)
if self.skip_on_exit_code is not None and result.exit_code in self.skip_on_exit_code:
raise AirflowSkipException(
f"Bash command returned exit code {self.skip_on_exit_code}. Skipping."
)
elif result.exit_code != 0:
raise AirflowException(
f"Bash command failed. The command returned a non-zero exit code {result.exit_code}."
)
finally:
# Assuming use of the @task.bash decorator, regardless of the outcome of executing the Bash
# command, update the bash_command value for the Rendered Template view in the UI.
if self._init_bash_command_not_set:
self.update_bash_command_rendered_ti_field(ti)

return result.output

Expand Down

0 comments on commit 094791c

Please sign in to comment.