Skip to content

Commit

Permalink
[AIRFLOW-6044] Standardize the Code Structure in kube_pod_operator.py (
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored and eladkal committed Dec 2, 2019
1 parent 116e09e commit cfdd60e
Showing 1 changed file with 66 additions and 66 deletions.
132 changes: 66 additions & 66 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,72 +113,6 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

def execute(self, context):
try:
client = kube_client.get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)
gen = pod_generator.PodGenerator()

for port in self.ports:
gen.add_port(port)
for mount in self.volume_mounts:
gen.add_mount(mount)
for volume in self.volumes:
gen.add_volume(volume)

pod = gen.make_pod(
namespace=self.namespace,
image=self.image,
pod_id=self.name,
cmds=self.cmds,
arguments=self.arguments,
labels=self.labels,
)

pod.service_account_name = self.service_account_name
pod.secrets = self.secrets
pod.envs = self.env_vars
pod.image_pull_policy = self.image_pull_policy
pod.image_pull_secrets = self.image_pull_secrets
pod.annotations = self.annotations
pod.resources = self.resources
pod.affinity = self.affinity
pod.node_selectors = self.node_selectors
pod.hostnetwork = self.hostnetwork
pod.tolerations = self.tolerations
pod.configmaps = self.configmaps
pod.security_context = self.security_context
pod.pod_runtime_info_envs = self.pod_runtime_info_envs
pod.dnspolicy = self.dnspolicy

launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
try:
(final_state, result) = launcher.run_pod(
pod,
startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)

if final_state != State.SUCCESS:
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)
if self.do_xcom_push:
return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def _set_resources(self, resources):
return Resources(**resources) if resources else Resources()

def _set_name(self, name):
validate_key(name, max_length=63)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())

@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
Expand Down Expand Up @@ -251,3 +185,69 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.security_context = security_context or {}
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy

def execute(self, context):
try:
client = kube_client.get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)
gen = pod_generator.PodGenerator()

for port in self.ports:
gen.add_port(port)
for mount in self.volume_mounts:
gen.add_mount(mount)
for volume in self.volumes:
gen.add_volume(volume)

pod = gen.make_pod(
namespace=self.namespace,
image=self.image,
pod_id=self.name,
cmds=self.cmds,
arguments=self.arguments,
labels=self.labels,
)

pod.service_account_name = self.service_account_name
pod.secrets = self.secrets
pod.envs = self.env_vars
pod.image_pull_policy = self.image_pull_policy
pod.image_pull_secrets = self.image_pull_secrets
pod.annotations = self.annotations
pod.resources = self.resources
pod.affinity = self.affinity
pod.node_selectors = self.node_selectors
pod.hostnetwork = self.hostnetwork
pod.tolerations = self.tolerations
pod.configmaps = self.configmaps
pod.security_context = self.security_context
pod.pod_runtime_info_envs = self.pod_runtime_info_envs
pod.dnspolicy = self.dnspolicy

launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
try:
(final_state, result) = launcher.run_pod(
pod,
startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)

if final_state != State.SUCCESS:
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)
if self.do_xcom_push:
return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def _set_resources(self, resources):
return Resources(**resources) if resources else Resources()

def _set_name(self, name):
validate_key(name, max_length=63)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())

0 comments on commit cfdd60e

Please sign in to comment.