Skip to content

Commit

Permalink
Fix invalid value error caused by long k8s pod name (#13299)
Browse files Browse the repository at this point in the history
K8S pod names follows DNS_SUBDOMAIN naming convention, which can be
broken down into one or more DNS_LABEL separated by `.`.

While the max length of pod name (DNS_SUBDOMAIN) is 253, each label
component (DNS_LABEL) of a the name cannot be longer than 63. Pod names
generated by k8s executor right now only contains one label, which means
the total effective name length cannot be greater than 63.

This patch concats uuid to pod_id using `.` to generate the pod anem,
thus extending the max name length to 63 + len(uuid).

Reference: https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
Relevant discussion: kubernetes/kubernetes#79351 (comment)

(cherry picked from commit 862443f)
  • Loading branch information
QP Hou authored and kaxil committed Jan 29, 2021
1 parent 0e92cd8 commit 7b02edd
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 36 deletions.
38 changes: 15 additions & 23 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kube_config import KubeConfig
from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.models import TaskInstance
from airflow.models.taskinstance import TaskInstanceKey
Expand Down Expand Up @@ -367,24 +367,6 @@ def _annotations_to_key(self, annotations: Dict[str, str]) -> Optional[TaskInsta

return TaskInstanceKey(dag_id, task_id, execution_date, try_number)

@staticmethod
def _make_safe_pod_id(safe_dag_id: str, safe_task_id: str, safe_uuid: str) -> str:
r"""
Kubernetes pod names must be <= 253 chars and must pass the following regex for
validation
``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
:param safe_dag_id: a dag_id with only alphanumeric characters
:param safe_task_id: a task_id with only alphanumeric characters
:param safe_uuid: a uuid
:return: ``str`` valid Pod name of appropriate length
"""
safe_key = safe_dag_id + safe_task_id

safe_pod_id = safe_key[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid

return safe_pod_id

def _flush_watcher_queue(self) -> None:
self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
while True:
Expand Down Expand Up @@ -468,7 +450,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
pod_generator.make_safe_label_value(task.dag_id),
pod_generator.make_safe_label_value(task.task_id),
pod_generator.datetime_to_label_safe_datestring(task.execution_date),
self.scheduler_job_id,
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
)
# pylint: enable=protected-access
kwargs = dict(label_selector=dict_string)
Expand Down Expand Up @@ -603,10 +585,16 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance
tis_to_flush = [ti for ti in tis if not ti.external_executor_id]
scheduler_job_ids = [ti.external_executor_id for ti in tis]
pod_ids = {
create_pod_id(dag_id=ti.dag_id, task_id=ti.task_id): ti for ti in tis if ti.external_executor_id
create_pod_id(
dag_id=pod_generator.make_safe_label_value(ti.dag_id),
task_id=pod_generator.make_safe_label_value(ti.task_id),
): ti
for ti in tis
if ti.external_executor_id
}
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
for pod in pod_list.items:
Expand All @@ -624,7 +612,9 @@ def adopt_launched_task(self, kube_client, pod, pod_ids: dict):
:param pod_ids: pod_ids we expect to patch.
"""
self.log.info("attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels['airflow-worker'] = str(self.scheduler_job_id)
pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(
str(self.scheduler_job_id)
)
dag_id = pod.metadata.labels['dag_id']
task_id = pod.metadata.labels['task_id']
pod_id = create_pod_id(dag_id=dag_id, task_id=task_id)
Expand Down Expand Up @@ -659,7 +649,9 @@ def _adopt_completed_pods(self, kube_client: kubernetes.client.CoreV1Api):
pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
for pod in pod_list.items:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels['airflow-worker'] = str(self.scheduler_job_id)
pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(
str(self.scheduler_job_id)
)
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
Expand Down
27 changes: 18 additions & 9 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
from airflow.kubernetes.pod_generator_deprecated import PodGenerator as PodGeneratorDeprecated
from airflow.version import version as airflow_version

MAX_POD_ID_LEN = 253

MAX_LABEL_LEN = 63


Expand Down Expand Up @@ -355,7 +353,7 @@ def construct_pod( # pylint: disable=too-many-arguments
pod_override_object: Optional[k8s.V1Pod],
base_worker_pod: k8s.V1Pod,
namespace: str,
scheduler_job_id: str,
scheduler_job_id: int,
) -> k8s.V1Pod:
"""
Construct a pod by gathering and consolidating the configuration from 3 places:
Expand All @@ -370,6 +368,10 @@ def construct_pod( # pylint: disable=too-many-arguments
except Exception: # pylint: disable=W0703
image = kube_image

task_id = make_safe_label_value(task_id)
dag_id = make_safe_label_value(dag_id)
scheduler_job_id = make_safe_label_value(str(scheduler_job_id))

dynamic_pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
namespace=namespace,
Expand All @@ -381,7 +383,7 @@ def construct_pod( # pylint: disable=too-many-arguments
},
name=PodGenerator.make_unique_pod_id(pod_id),
labels={
'airflow-worker': str(scheduler_job_id),
'airflow-worker': scheduler_job_id,
'dag_id': dag_id,
'task_id': task_id,
'execution_date': datetime_to_label_safe_datestring(date),
Expand Down Expand Up @@ -450,20 +452,27 @@ def deserialize_model_dict(pod_dict: dict) -> k8s.V1Pod:
return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod) # pylint: disable=W0212

@staticmethod
def make_unique_pod_id(pod_id):
def make_unique_pod_id(pod_id: str) -> str:
r"""
Kubernetes pod names must be <= 253 chars and must pass the following regex for
validation
Kubernetes pod names must consist of one or more lowercase
rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
characters. Each label has a maximum length of 63 characters.
Name must pass the following regex for validation
``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
For more details, see:
https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
:param pod_id: a dag_id with only alphanumeric characters
:return: ``str`` valid Pod name of appropriate length
"""
if not pod_id:
return None

safe_uuid = uuid.uuid4().hex
safe_pod_id = pod_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
safe_uuid = uuid.uuid4().hex # safe uuid will always be less than 63 chars
trimmed_pod_id = pod_id[:MAX_LABEL_LEN]
safe_pod_id = f"{trimmed_pod_id}.{safe_uuid}"

return safe_pod_id

Expand Down
2 changes: 1 addition & 1 deletion tests/kubernetes/models/test_secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_attach_to_pod(self, mock_uuid):
'kind': 'Pod',
'metadata': {
'labels': {'app': 'myapp'},
'name': 'myapp-pod-cf4a56d281014217b0272af6216feb48',
'name': 'myapp-pod.cf4a56d281014217b0272af6216feb48',
'namespace': 'default',
},
'spec': {
Expand Down
52 changes: 49 additions & 3 deletions tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys
import unittest
import uuid
Expand All @@ -22,6 +23,7 @@
import pytest
from dateutil import parser
from kubernetes.client import ApiClient, models as k8s
from parameterized import parameterized

from airflow import __version__
from airflow.exceptions import AirflowConfigException
Expand Down Expand Up @@ -107,7 +109,7 @@ def setUp(self):
kind="Pod",
metadata=k8s.V1ObjectMeta(
namespace="default",
name='myapp-pod-' + self.static_uuid.hex,
name='myapp-pod.' + self.static_uuid.hex,
labels={'app': 'myapp'},
),
spec=k8s.V1PodSpec(
Expand Down Expand Up @@ -424,7 +426,7 @@ def test_construct_pod(self, mock_uuid):
expected.metadata.labels = self.labels
expected.metadata.labels['app'] = 'myapp'
expected.metadata.annotations = self.annotations
expected.metadata.name = 'pod_id-' + self.static_uuid.hex
expected.metadata.name = 'pod_id.' + self.static_uuid.hex
expected.metadata.namespace = 'test_namespace'
expected.spec.containers[0].args = ['command']
expected.spec.containers[0].image = 'airflow_image'
Expand Down Expand Up @@ -466,14 +468,38 @@ def test_construct_pod_empty_executor_config(self, mock_uuid):
worker_config.metadata.annotations = self.annotations
worker_config.metadata.labels = self.labels
worker_config.metadata.labels['app'] = 'myapp'
worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
worker_config.metadata.name = 'pod_id.' + self.static_uuid.hex
worker_config.metadata.namespace = 'namespace'
worker_config.spec.containers[0].env.append(
k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
)
worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
assert worker_config_result == sanitized_result

@mock.patch('uuid.uuid4')
def test_ensure_max_label_length(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
path = os.path.join(os.path.dirname(__file__), 'pod_generator_base_with_secrets.yaml')
worker_config = PodGenerator.deserialize_model_file(path)

result = PodGenerator.construct_pod(
dag_id='a' * 512,
task_id='a' * 512,
pod_id='a' * 512,
kube_image='a' * 512,
try_number=3,
date=self.execution_date,
args=['command'],
namespace='namespace',
scheduler_job_id='a' * 512,
pod_override_object=None,
base_worker_pod=worker_config,
)

assert result.metadata.name == 'a' * 63 + '.' + self.static_uuid.hex
for _, v in result.metadata.labels.items():
assert len(v) <= 63

def test_merge_objects_empty(self):
annotations = {'foo1': 'bar1'}
base_obj = k8s.V1ObjectMeta(annotations=annotations)
Expand Down Expand Up @@ -607,6 +633,26 @@ def test_deserialize_model_file(self):
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
assert sanitized_res == self.deserialize_result

@parameterized.expand(
(
("max_label_length", "a" * 63),
("max_subdomain_length", "a" * 253),
(
"tiny",
"aaa",
),
)
)
def test_pod_name_confirm_to_max_length(self, _, pod_id):
name = PodGenerator.make_unique_pod_id(pod_id)
assert len(name) <= 253
parts = name.split(".")
if len(pod_id) <= 63:
assert len(parts[0]) == len(pod_id)
else:
assert len(parts[0]) <= 63
assert len(parts[1]) <= 63

def test_deserialize_model_string(self):
fixture = """
apiVersion: v1
Expand Down

0 comments on commit 7b02edd

Please sign in to comment.