
Jinja templating doesn't work with container_resources when using dynamic task mapping with Kubernetes Pod Operator #29432

Closed
pshrivastava27 opened this issue Feb 8, 2023 · 16 comments · Fixed by #29451
Assignees
Labels
area:core kind:bug

Comments

@pshrivastava27

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Google Cloud Composer Version - 2.1.5
Airflow Version - 2.4.3

We are trying to use dynamic task mapping with Kubernetes Pod Operator. Our use case is to return the pod's CPU and memory requirements from functions registered as user-defined macros in the DAG.

Without dynamic task mapping this works perfectly, but with dynamic task mapping the macros are not recognized.

container_resources is a templated field as per the docs; the feature was introduced in this PR.
We also tried toggling the boolean render_template_as_native_obj, but still no luck.

Providing below a trimmed version of our DAG to help reproduce the issue (the functions returning CPU and memory are trivial here, just for illustration).

What you think should happen instead

Templating should work the same with or without dynamic task mapping.

How to reproduce

Deployed the following DAG in Google Cloud Composer.

import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:

    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2

task_1 (without dynamic task mapping) completes successfully, while task_2 (with dynamic task mapping) fails.

Looking at the error logs, it failed while rendering the Pod spec, since the calls to pod_cpu() and pod_mem() were left unresolved.

Here is the traceback:

Exception when attempting to create Namespaced Pod: { "apiVersion": "v1", "kind": "Pod", "metadata": { "annotations": {}, "labels": { "dag_id": "sample_dag", "task_id": "task_2", "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee", "kubernetes_pod_operator": "True", "map_index": "0", "try_number": "2", "airflow_version": "2.4.3-composer", "airflow_kpo_in_cluster": "False" }, "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3", "namespace": "default" }, "spec": { "affinity": {}, "containers": [ { "args": [ "echo hello" ], "command": [ "bash", "-cx" ], "env": [], "envFrom": [], "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool", "imagePullPolicy": "Always", "name": "base", "ports": [], "resources": { "limits": { "memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}" } }, "volumeMounts": [] } ], "hostNetwork": false, "imagePullSecrets": [], "initContainers": [], "nodeSelector": {}, "restartPolicy": "Never", "securityContext": {}, "serviceAccountName": "sa-k8s", "tolerations": [], "volumes": [] } }
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
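
For clarity, the 400 is Kubernetes quantity validation rejecting the raw, unrendered template string. A minimal sketch of the check, using only the regex cited in the error message (the real Kubernetes parser is more elaborate):

import re

# Pattern cited in the error body; a resource quantity must match it.
QUANTITY_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")

print(bool(QUANTITY_RE.match("4000M")))            # True: a rendered value passes
print(bool(QUANTITY_RE.match("{{ pod_mem() }}")))  # False: the raw template fails validation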

Operating System

Google Composer Kubernetes Cluster

Versions of Apache Airflow Providers

No response

Deployment

Composer

Deployment details

No response

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

pshrivastava27 added the area:core and kind:bug labels on Feb 8, 2023
@hussein-awala
Member

Thank you for creating this issue; I can reproduce it on master.
Can someone assign it to me?

@potiuk
Member

potiuk commented Feb 8, 2023

Here you go :)

@hussein-awala
Member

@pshrivastava27 I created PR #29451 and it solved the problem for me; can you test it in your dev environment?

@pshrivastava27
Author

pshrivastava27 commented Feb 10, 2023

Thanks! @hussein-awala
We're using Google Cloud Composer (composer-2.1.5-airflow-2.4.3), which is the latest available version.
Is there any way we can get just this change incorporated into our current version? Could we use a custom operator as a temporary fix?

@potiuk
Member

potiuk commented Feb 10, 2023

> Thanks! @hussein-awala We're using Google Cloud Composer (composer-2.1.5-airflow-2.4.3), which is the latest available version. Is there any way we can get just this change incorporated into our current version? Could we use a custom operator as a temporary fix?

Looking at the change, it would work if you copy the whole new render_template_fields method into your custom operator.
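
A rough sketch of that suggestion (illustrative only; the actual body to paste would be the new render_template_fields implementation from #29451):

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)


class PatchedKubernetesPodOperator(KubernetesPodOperator):
    def render_template_fields(self, context, jinja_env=None):
        # Paste the render_template_fields body from PR #29451 here so the
        # patched rendering logic runs on an older Airflow version.
        ...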

@hussein-awala
Member

> Looking at the change, it would work if you copy the whole new render_template_fields method into your custom operator.

I don't think this will work, because the operator's render_template_fields method is not called at all; instead the Templater method is called, which we cannot override.

> Is there any way we can get just this change incorporated into our current version? Could we use a custom operator as a temporary fix?

Implementing a custom operator can't solve the problem; I'll check how you can work around it.

@potiuk
Member

potiuk commented Feb 11, 2023

> I don't think this will work, because the operator's render_template_fields method is not called at all; instead the Templater method is called, which we cannot override.

Curious. How would #29451 solve the problem, then? (I based my answer on the comment above saying that it does. :) ) I think #29451 only really changes the "do_render_template" method, which in essence can be overridden, assuming you only override it in a custom operator that you use as the "mapped" operator (i.e. in dynamically task-mapped tasks).

Or maybe I am missing something? :)

@hussein-awala
Member

As I understand it, MappedOperator is a wrapper around AbstractOperator and we don't extend it; it just holds a reference to an operator instance created from the custom operator, which extends AbstractOperator:
(diagram: relationship between MappedOperator and the operator classes)

Without #29451, MappedOperator calls its own _do_render_template_fields method, which the custom operator does not extend; that is why the nested fields of KubernetesPodOperator were not rendered.
Call stack:

  • MappedOperator.render_template_fields()
  • AbstractOperator._render_nested_template_fields()

With #29451, MappedOperator calls _do_render_template_fields on the unmapped_task, which is an instance created from our custom operator; in this case the logic we define in that method is applied to the instance fields (see the toy sketch after the call stack).
Call stack:

  • MappedOperator.render_template_fields()
  • CustomOperator._render_nested_template_fields()
  • Super._render_nested_template_fields() (if called)
  • ...
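
A toy sketch of why this delegation matters (illustrative only, not actual Airflow source):

class Wrapper:
    """Stand-in for MappedOperator: wraps a concrete operator instance."""

    def __init__(self, wrapped):
        self.wrapped = wrapped

    def render_before_fix(self):
        # The wrapper runs its own generic logic; overrides defined on the
        # wrapped class are bypassed, so nested fields stay unrendered.
        return "generic rendering"

    def render_after_fix(self):
        # Delegating to the wrapped instance honors any override.
        return self.wrapped.render()


class Base:
    def render(self):
        return "generic rendering"


class Custom(Base):
    def render(self):
        return "custom rendering (nested template fields handled)"


w = Wrapper(Custom())
print(w.render_before_fix())  # generic rendering
print(w.render_after_fix())   # custom rendering (nested template fields handled)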

@vasu2809

vasu2809 commented Feb 13, 2023

@hussein-awala Is the solution to use a custom pod operator in this case, to be able to use the Jinja templating variables?

We are using Cloud Composer and don't have the flexibility to upgrade to the latest Airflow version for the fix, hence we are wondering if we should be using a custom Pod Operator now.

@jose-lpa
Contributor

@hussein-awala Can I ask what Airflow version is going to include this fix? I'm in the same situation as @vasu2809: using Cloud Composer and not able to just upgrade to the latest Airflow version. Looking at the code, it doesn't seem that users can fix this issue by just creating a custom operator or something like that, right?

@jose-lpa
Contributor

jose-lpa commented Feb 20, 2023

I actually fixed my situation by simply using the Variable model instead of trying to go with templated stuff.

Example of my exact situation:

Failing:

from airflow import DAG, XComArg
from airflow.models.baseoperator import chain
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import V1EnvVar, V1ResourceRequirements

with DAG(...) as dag:

    # ...other tasks...

    calculate_statistics = KubernetesPodOperator.partial(
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        namespace="default",
        task_id="calculate_statistics",
        name="calculate-statistics",
        image=(
            "eu.gcr.io/hummingbird-technologies/tasks/imagery-stats:{{ var.value.ENVIRONMENT }}"
        ),
        image_pull_policy="Always",
        # (!) Here is the problem, in this `env_vars`.
        env_vars=[
            V1EnvVar(name="INTERNAL_API_URL", value="{{ var.value.INTERNAL_API_URL }}",
            V1EnvVar(name="COLLECTION_NAME", value="{{ var.value.COLLECTION_NAME }}"),
        ],
        container_resources=V1ResourceRequirements(requests={"cpu": 1, "memory": "10Gi"}),
        startup_timeout_seconds=5 * 60,
        retries=0,
    )

    statistics = calculate_statistics.expand(arguments=XComArg(argument_builder))

    chain(acquire_data, statistics)

Working:

from airflow import DAG, XComArg
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import V1EnvVar, V1ResourceRequirements


with DAG(...) as dag:

    # ...other tasks...

    calculate_statistics = KubernetesPodOperator.partial(
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        namespace="default",
        task_id="calculate_statistics",
        name="calculate-statistics",
        image=(
            "eu.gcr.io/hummingbird-technologies/tasks/imagery-stats:{{ var.value.ENVIRONMENT }}"
        ),
        image_pull_policy="Always",
        # (!) Solving it by not using templated values at all.
        env_vars=[
            V1EnvVar(name="INTERNAL_API_URL", value=Variable.get("INTERNAL_API_URL")),
            V1EnvVar(name="COLLECTION_NAME", value=Variable.get("COLLECTION_NAME")),
        ],
        container_resources=V1ResourceRequirements(requests={"cpu": 1, "memory": "10Gi"}),
        startup_timeout_seconds=5 * 60,
        retries=0,
    )

    statistics = calculate_statistics.expand(arguments=XComArg(argument_builder))

    chain(acquire_data, statistics)

@vasu2809 maybe this can help you too...

@hussein-awala
Member

@jose-lpa using Variable.get() in the DAG script is not recommended, because the DagFileProcessor parses the script every X minutes and loads the variable from the DB on each parse. Also, this workaround works for Variable but not for all the other Jinja templates.
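
To illustrate the parse-time concern (a general Airflow caveat; the variable name is from the example above):

from airflow.models import Variable

# Discouraged: executed by the DagFileProcessor on every parse of the DAG
# file, hitting the metadata DB each time.
api_url = Variable.get("INTERNAL_API_URL")

# Preferred: a templated field such as "{{ var.value.INTERNAL_API_URL }}"
# is only resolved when the task actually runs.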

@pshrivastava27 here is a workaround for your use case:

import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE", "dev")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


# Subclassing the k8s model and declaring `template_fields` lets Airflow's
# nested template rendering descend into this object, so the Jinja strings
# in `limits`/`requests` get rendered.
class PatchedResourceRequirements(k8s_models.V1ResourceRequirements):
    template_fields = ("limits", "requests")


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:

    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=PatchedResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=PatchedResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2

You can do the same for the other classes if needed.
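
For example, a hypothetical patch for environment variables (attribute names are illustrative and should be checked against the model you patch):

class PatchedEnvVar(k8s_models.V1EnvVar):
    # Same trick: declare which attributes Airflow should render.
    template_fields = ("name", "value")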

@raphaelauv
Contributor

@potiuk could we add this one to https://github.com/apache/airflow/milestone/68?

@potiuk
Member

potiuk commented Feb 20, 2023

Done

@pshrivastava27
Author

> @pshrivastava27 here is a workaround for your use case: […full workaround comment quoted above…]

Thanks for the help! @hussein-awala
