Skip to content

Commit

Permalink
Fix using XCom with KubernetesPodOperator
Browse files Browse the repository at this point in the history
This commit effectively revers apache#15942 because of the issues it is causing. Until we find better solution we should revert this change

Error:

```
[2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00
[2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task
[2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l']
[2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom
[2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710
[2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates
    self.task.render_template_fields(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp>
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template
    template = self.loader.load(self, name, globals)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
[2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354
[2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1
```

Dag:

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.utils.dates import days_ago
from airflow.configuration import conf

namespace = conf.get("kubernetes", "NAMESPACE")

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == "default":
    config_file = "/usr/local/airflow/include/.kube/config"
    in_cluster = False
else:
    in_cluster = True
    config_file = None

default_args = {
    "owner": "airflow",
}

with DAG(
    dag_id="k8_pod_operator_xcom",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["k8"],
) as dag:

    write_xcom = KubernetesPodOperator(
        namespace=namespace,
        in_cluster=in_cluster,
        config_file=config_file,
        image="ubuntu",
        cmds=[
            "sh",
            "-c",
            "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
        ],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

```

closes apache#17186
  • Loading branch information
kaxil committed Aug 21, 2021
1 parent c4d043d commit 46d909a
Showing 1 changed file with 0 additions and 2 deletions.
2 changes: 0 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ class KubernetesPodOperator(BaseOperator):
'pod_template_file',
)

template_ext = ('.yaml', '.yml', '.json')

# fmt: off
def __init__(
# fmt: on
Expand Down

0 comments on commit 46d909a

Please sign in to comment.