-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes PodMutationHook for backwards compatibility #9903
Conversation
airflow/config_templates/config.yml
Outdated
@@ -2173,6 +2173,13 @@ | |||
type: string | |||
example: ~ | |||
default: "" | |||
- name: pod_mutation_using_k8s_pod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we will need to support both for 1.10.12 since some of the users have already updated their pod_mutation_hook
to use k8s.V1Pod. So instead of a config we can infer the type of the Pod object and use one or the other accordingly
We could just check the type of the Pod that is passed, example:
if isinstance(k8s.V1Pod):
# Use Pod from k8s api
....
else:
# Use Pod from Airflow POD
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kaxil I tried to figure that out but... here's where it gets tricky.
We now need to determine what type to send INTO the pod_mutation_hook function. If a user is using an existing pod_mutation_hook Pod object, then they need to RECIEVE the Pod instead of a Kubernetes V1Pod otherwise the function will break. Does that make sense? If this were a typed language we'd need BOTH of these functions
def pod_mutation_hook(pod: k8s.V1Pod) -> k8s.V1Pod
AND
def pod_mutation_hook(pod: airflow.kubernetes.pod.Pod) -> airflow.kubernetes.pod.Pod
I'm not sure how to handle that other than setting a flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But based on the following can we not send the type that we receive?
if isinstance(k8s.V1Pod):
# Use Pod from k8s API
pod_mutation_hook(pod)
....
else:
# Use Pod from Airflow POD
pod_mutation_hook(pod)
If it is just for TypeHints, we can avoid typehints as Python2.7 does not work with it or use Union[k8s.V1Pod, airflow.kubernetes.pod.Pod]
def pod_mutation_hook(pod: Union[k8s.V1Pod, airflow.kubernetes.pod.Pod]) -> Union[k8s.V1Pod, airflow.kubernetes.pod.Pod]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://docs.python.org/3/library/typing.html#typing.Union
Union type; Union[X, Y] means either X or Y.
To define a union, use e.g. Union[int, str].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand what you mean now, apologies, checking to see if we can avoid the situation at all
airflow/kubernetes/pod_launcher.py
Outdated
import airflow.configuration as conf | ||
if conf.get("kubernetes", "pod_mutation_using_k8s_pod"): | ||
pod_mutation_hook(pod) | ||
else: | ||
from airflow.kubernetes.pod import Pod | ||
from airflow.kubernetes.pod_generator import PodGenerator | ||
dummy_pod = Pod(image="", envs={}, cmds=[]) | ||
pod_mutation_hook(dummy_pod) | ||
dummy_pod = dummy_pod.to_v1_kubernetes_pod() | ||
PodGenerator.reconcile_pods(pod, dummy_pod) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above:
I think we will need to support both for 1.10.12 since some of the users have already updated their pod_mutation_hook
to use k8s.V1Pod. So instead of a config we can infer the type of the Pod object and use one or the other accordingly
We could just check the type of the Pod that is passed, example:
if isinstance(k8s.V1Pod):
# Use Pod from k8s api
....
else:
# Use Pod from Airflow POD
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change in PR won't work if we have the following in our airflow_local_settings.py
(We have this for Astronomer- Airflow chart) since the pod labels would show up as {}
:
def pod_mutation_hook(pod: Pod):
extra_labels = {
"kubernetes_executor": "False",
"kubernetes_pod_operator": "False"
}
if 'airflow-worker' in pod.labels.keys() or \
conf.get('core', 'EXECUTOR') == "KubernetesExecutor":
extra_labels["kubernetes_executor"] = "True"
else:
extra_labels["kubernetes_pod_operator"] = "True"
ba5f317
to
7457e15
Compare
7457e15
to
84fa04a
Compare
airflow/kubernetes/pod_launcher.py
Outdated
dummy_pod = dummy_pod.to_v1_kubernetes_pod() | ||
PodGenerator.reconcile_pods(pod, dummy_pod) | ||
except AttributeError: | ||
pod_mutation_hook(pod) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is for 1.10 but maybe we should consider adding a PendingDeprecationWarning
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup creating a separate PR for that and adding all the old paths back with deprecation warning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created : #10067
@dimberman -- I have refactored and fixed incorrct logic quite a bit, please take a look and let me know wdyt (although I have merged it based on my tests) -- especially if you see something is completely out of place |
Co-authored-by: Daniel Imberman <[email protected]> Co-authored-by: Kaxil Naik <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]> Co-authored-by: Kaxil Naik <[email protected]>
Users were unable to use the traditional pod_mutation_hook due to breaking changes in 1.10.11. This fix allows both new and old forms
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.