diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 76ff8976627fc7..577634284af2a0 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -113,72 +113,6 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') - def execute(self, context): - try: - client = kube_client.get_kube_client(in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file) - gen = pod_generator.PodGenerator() - - for port in self.ports: - gen.add_port(port) - for mount in self.volume_mounts: - gen.add_mount(mount) - for volume in self.volumes: - gen.add_volume(volume) - - pod = gen.make_pod( - namespace=self.namespace, - image=self.image, - pod_id=self.name, - cmds=self.cmds, - arguments=self.arguments, - labels=self.labels, - ) - - pod.service_account_name = self.service_account_name - pod.secrets = self.secrets - pod.envs = self.env_vars - pod.image_pull_policy = self.image_pull_policy - pod.image_pull_secrets = self.image_pull_secrets - pod.annotations = self.annotations - pod.resources = self.resources - pod.affinity = self.affinity - pod.node_selectors = self.node_selectors - pod.hostnetwork = self.hostnetwork - pod.tolerations = self.tolerations - pod.configmaps = self.configmaps - pod.security_context = self.security_context - pod.pod_runtime_info_envs = self.pod_runtime_info_envs - pod.dnspolicy = self.dnspolicy - - launcher = pod_launcher.PodLauncher(kube_client=client, - extract_xcom=self.do_xcom_push) - try: - (final_state, result) = launcher.run_pod( - pod, - startup_timeout=self.startup_timeout_seconds, - get_logs=self.get_logs) - finally: - if self.is_delete_operator_pod: - launcher.delete_pod(pod) - - if final_state != State.SUCCESS: - raise AirflowException( - 'Pod returned a failure: {state}'.format(state=final_state) - ) - if self.do_xcom_push: - return result - except AirflowException as ex: - raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) - - def _set_resources(self, resources): - return Resources(**resources) if resources else Resources() - - def _set_name(self, name): - validate_key(name, max_length=63) - return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) - @apply_defaults def __init__(self, # pylint: disable=too-many-arguments,too-many-locals namespace, @@ -251,3 +185,69 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals self.security_context = security_context or {} self.pod_runtime_info_envs = pod_runtime_info_envs or [] self.dnspolicy = dnspolicy + + def execute(self, context): + try: + client = kube_client.get_kube_client(in_cluster=self.in_cluster, + cluster_context=self.cluster_context, + config_file=self.config_file) + gen = pod_generator.PodGenerator() + + for port in self.ports: + gen.add_port(port) + for mount in self.volume_mounts: + gen.add_mount(mount) + for volume in self.volumes: + gen.add_volume(volume) + + pod = gen.make_pod( + namespace=self.namespace, + image=self.image, + pod_id=self.name, + cmds=self.cmds, + arguments=self.arguments, + labels=self.labels, + ) + + pod.service_account_name = self.service_account_name + pod.secrets = self.secrets + pod.envs = self.env_vars + pod.image_pull_policy = self.image_pull_policy + pod.image_pull_secrets = self.image_pull_secrets + pod.annotations = self.annotations + pod.resources = self.resources + pod.affinity = self.affinity + pod.node_selectors = self.node_selectors + pod.hostnetwork = self.hostnetwork + pod.tolerations = self.tolerations + pod.configmaps = self.configmaps + pod.security_context = self.security_context + pod.pod_runtime_info_envs = self.pod_runtime_info_envs + pod.dnspolicy = self.dnspolicy + + launcher = pod_launcher.PodLauncher(kube_client=client, + extract_xcom=self.do_xcom_push) + try: + (final_state, result) = launcher.run_pod( + pod, + startup_timeout=self.startup_timeout_seconds, + get_logs=self.get_logs) + finally: + if self.is_delete_operator_pod: + launcher.delete_pod(pod) + + if final_state != State.SUCCESS: + raise AirflowException( + 'Pod returned a failure: {state}'.format(state=final_state) + ) + if self.do_xcom_push: + return result + except AirflowException as ex: + raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + + def _set_resources(self, resources): + return Resources(**resources) if resources else Resources() + + def _set_name(self, name): + validate_key(name, max_length=63) + return re.sub(r'[^a-z0-9.-]+', '-', name.lower())