From 93a3e81349a9a6c4e40a4ff8c4a76df3f34f0c51 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 22 Nov 2019 16:51:02 +0000 Subject: [PATCH] [AIRFLOW-6044] Standardize the Code Structure in kube_pod_operator.py (#6639) https://issues.apache.org/jira/browse/AIRFLOW-6044 --- .../operators/kubernetes_pod_operator.py | 146 +++++++++--------- 1 file changed, 73 insertions(+), 73 deletions(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 70b4a108a481a4..93f369a6d0f9f4 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -122,6 +122,79 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') + @apply_defaults + def __init__(self, # pylint: disable=too-many-arguments,too-many-locals + namespace, + image, + name, + cmds=None, + arguments=None, + ports=None, + volume_mounts=None, + volumes=None, + env_vars=None, + secrets=None, + in_cluster=True, + cluster_context=None, + labels=None, + startup_timeout_seconds=120, + get_logs=True, + image_pull_policy='IfNotPresent', + annotations=None, + resources=None, + affinity=None, + config_file=None, + node_selectors=None, + image_pull_secrets=None, + service_account_name='default', + is_delete_operator_pod=False, + hostnetwork=False, + tolerations=None, + configmaps=None, + security_context=None, + pod_runtime_info_envs=None, + dnspolicy=None, + full_pod_spec=None, + *args, + **kwargs): + if kwargs.get('xcom_push') is not None: + raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") + super().__init__(*args, resources=None, **kwargs) + + self.pod = None + + self.image = image + self.namespace = namespace + self.cmds = cmds or [] + self.arguments = arguments or [] + self.labels = labels or {} + self.startup_timeout_seconds = startup_timeout_seconds + self.name = self._set_name(name) + self.env_vars = env_vars or {} + self.ports = ports or [] + self.volume_mounts = volume_mounts or [] + self.volumes = volumes or [] + self.secrets = secrets or [] + self.in_cluster = in_cluster + self.cluster_context = cluster_context + self.get_logs = get_logs + self.image_pull_policy = image_pull_policy + self.node_selectors = node_selectors or {} + self.annotations = annotations or {} + self.affinity = affinity or {} + self.resources = self._set_resources(resources) + self.config_file = config_file + self.image_pull_secrets = image_pull_secrets + self.service_account_name = service_account_name + self.is_delete_operator_pod = is_delete_operator_pod + self.hostnetwork = hostnetwork + self.tolerations = tolerations or [] + self.configmaps = configmaps or [] + self.security_context = security_context or {} + self.pod_runtime_info_envs = pod_runtime_info_envs or [] + self.dnspolicy = dnspolicy + self.full_pod_spec = full_pod_spec + def execute(self, context): try: client = kube_client.get_kube_client(in_cluster=self.in_cluster, @@ -199,76 +272,3 @@ def _set_resources(self, resources): def _set_name(self, name): validate_key(name, max_length=63) return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) - - @apply_defaults - def __init__(self, # pylint: disable=too-many-arguments,too-many-locals - namespace, - image, - name, - cmds=None, - arguments=None, - ports=None, - volume_mounts=None, - volumes=None, - env_vars=None, - secrets=None, - in_cluster=True, - cluster_context=None, - labels=None, - startup_timeout_seconds=120, - get_logs=True, - image_pull_policy='IfNotPresent', - annotations=None, - resources=None, - affinity=None, - config_file=None, - node_selectors=None, - image_pull_secrets=None, - service_account_name='default', - is_delete_operator_pod=False, - hostnetwork=False, - tolerations=None, - configmaps=None, - security_context=None, - pod_runtime_info_envs=None, - dnspolicy=None, - full_pod_spec=None, - *args, - **kwargs): - if kwargs.get('xcom_push') is not None: - raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") - super().__init__(*args, resources=None, **kwargs) - - self.pod = None - - self.image = image - self.namespace = namespace - self.cmds = cmds or [] - self.arguments = arguments or [] - self.labels = labels or {} - self.startup_timeout_seconds = startup_timeout_seconds - self.name = self._set_name(name) - self.env_vars = env_vars or {} - self.ports = ports or [] - self.volume_mounts = volume_mounts or [] - self.volumes = volumes or [] - self.secrets = secrets or [] - self.in_cluster = in_cluster - self.cluster_context = cluster_context - self.get_logs = get_logs - self.image_pull_policy = image_pull_policy - self.node_selectors = node_selectors or {} - self.annotations = annotations or {} - self.affinity = affinity or {} - self.resources = self._set_resources(resources) - self.config_file = config_file - self.image_pull_secrets = image_pull_secrets - self.service_account_name = service_account_name - self.is_delete_operator_pod = is_delete_operator_pod - self.hostnetwork = hostnetwork - self.tolerations = tolerations or [] - self.configmaps = configmaps or [] - self.security_context = security_context or {} - self.pod_runtime_info_envs = pod_runtime_info_envs or [] - self.dnspolicy = dnspolicy - self.full_pod_spec = full_pod_spec