Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect GKE operators from deprecated hooks #39434

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKECustomResourceHook,
GKEDeploymentHook,
GKEHook,
GKEJobHook,
GKEKubernetesHook,
GKEPodHook,
)
from airflow.providers.google.cloud.links.kubernetes_engine import (
KubernetesEngineClusterLink,
Expand Down Expand Up @@ -533,27 +529,28 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def deployment_hook(self) -> GKEDeploymentHook:
def deployment_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Cluster url and ssl_ca_cert should be defined before using self.deployment_hook method. "
"Try to use self.get_kube_creds method",
)
return GKEDeploymentHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)

@cached_property
def pod_hook(self) -> GKEPodHook:
def pod_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Cluster url and ssl_ca_cert should be defined before using self.pod_hook method. "
"Try to use self.get_kube_creds method",
)
return GKEPodHook(

return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
Expand Down Expand Up @@ -742,21 +739,20 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEPodHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEPodHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
impersonation_chain=self.impersonation_chain,
enable_tcp_keepalive=True,
)
return hook

def execute(self, context: Context):
"""Execute process of creating pod and executing provided command inside it."""
Expand Down Expand Up @@ -901,19 +897,18 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)
return hook

def execute(self, context: Context):
"""Execute process of creating Job."""
Expand Down Expand Up @@ -1027,15 +1022,15 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1128,15 +1123,15 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1234,13 +1229,13 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKECustomResourceHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
return GKECustomResourceHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1336,13 +1331,13 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKECustomResourceHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
return GKECustomResourceHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1475,14 +1470,14 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down
5 changes: 2 additions & 3 deletions airflow/providers/google/cloud/triggers/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKEAsyncHook,
GKEKubernetesAsyncHook,
GKEPodAsyncHook,
)
from airflow.triggers.base import BaseTrigger, TriggerEvent

Expand Down Expand Up @@ -147,8 +146,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

@cached_property
def hook(self) -> GKEPodAsyncHook: # type: ignore[override]
return GKEPodAsyncHook(
def hook(self) -> GKEKubernetesAsyncHook: # type: ignore[override]
return GKEKubernetesAsyncHook(
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
gcp_conn_id=self.gcp_conn_id,
Expand Down
11 changes: 0 additions & 11 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -517,25 +517,14 @@
- tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
- tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
- tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_account
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_chain_one_element
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueInsideClusterOperator::test_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
Expand Down
22 changes: 10 additions & 12 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@
KUB_OP_PATH = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
GKE_HOOK_MODULE_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine"
GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook"
GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook"
GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook"
GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook"
GKE_KUBERNETES_HOOK = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
KUB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute"
KUB_JOB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute"
Expand Down Expand Up @@ -502,8 +500,8 @@ def setup_test(self):
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(f"{GKE_DEPLOYMENT_HOOK_PATH}.check_kueue_deployment_running")
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(f"{GKE_KUBERNETES_HOOK}.check_kueue_deployment_running")
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_info_mock, file_mock):
mock_pod_hook.return_value.apply_from_yaml_file.side_effect = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
Expand All @@ -515,9 +513,9 @@ def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_
@mock.patch.dict(os.environ, {})
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_DEPLOYMENT_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster(
self, mock_pod_hook, mock_hook, mock_depl_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand All @@ -534,7 +532,7 @@ def test_execute_autoscaled_cluster(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand All @@ -550,7 +548,7 @@ def test_execute_autoscaled_cluster_check_error(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_non_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand Down Expand Up @@ -916,7 +914,7 @@ def setup_method(self):
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock):
mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
Expand All @@ -931,7 +929,7 @@ def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_m
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_account(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):
Expand All @@ -949,7 +947,7 @@ def test_execute_with_impersonation_service_account(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_chain_one_element(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):
Expand Down