diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index f8766224280484..0a28809dd1597b 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -42,12 +42,8 @@ ) from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction from airflow.providers.google.cloud.hooks.kubernetes_engine import ( - GKECustomResourceHook, - GKEDeploymentHook, GKEHook, - GKEJobHook, GKEKubernetesHook, - GKEPodHook, ) from airflow.providers.google.cloud.links.kubernetes_engine import ( KubernetesEngineClusterLink, @@ -533,13 +529,13 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def deployment_hook(self) -> GKEDeploymentHook: + def deployment_hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( - "Cluster url and ssl_ca_cert should be defined before using self.hook method. " + "Cluster url and ssl_ca_cert should be defined before using self.deployment_hook method. " "Try to use self.get_kube_creds method", ) - return GKEDeploymentHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, cluster_url=self._cluster_url, @@ -547,13 +543,14 @@ def deployment_hook(self) -> GKEDeploymentHook: ) @cached_property - def pod_hook(self) -> GKEPodHook: + def pod_hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( - "Cluster url and ssl_ca_cert should be defined before using self.hook method. " + "Cluster url and ssl_ca_cert should be defined before using self.pod_hook method. " "Try to use self.get_kube_creds method", ) - return GKEPodHook( + + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, cluster_url=self._cluster_url, @@ -742,21 +739,20 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKEPodHook: + def hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( "Cluster url and ssl_ca_cert should be defined before using self.hook method. " "Try to use self.get_kube_creds method", ) - hook = GKEPodHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, impersonation_chain=self.impersonation_chain, enable_tcp_keepalive=True, ) - return hook def execute(self, context: Context): """Execute process of creating pod and executing provided command inside it.""" @@ -901,19 +897,18 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKEJobHook: + def hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( "Cluster url and ssl_ca_cert should be defined before using self.hook method. " "Try to use self.get_kube_creds method", ) - hook = GKEJobHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, ) - return hook def execute(self, context: Context): """Execute process of creating Job.""" @@ -1027,7 +1022,7 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKEJobHook: + def hook(self) -> GKEKubernetesHook: self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails( cluster_name=self.cluster_name, project_id=self.project_id, @@ -1035,7 +1030,7 @@ def hook(self) -> GKEJobHook: cluster_hook=self.cluster_hook, ).fetch_cluster_info() - return GKEJobHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, @@ -1128,7 +1123,7 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKEJobHook: + def hook(self) -> GKEKubernetesHook: self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails( cluster_name=self.cluster_name, project_id=self.project_id, @@ -1136,7 +1131,7 @@ def hook(self) -> GKEJobHook: cluster_hook=self.cluster_hook, ).fetch_cluster_info() - return GKEJobHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, @@ -1234,13 +1229,13 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKECustomResourceHook: + def hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( "Cluster url and ssl_ca_cert should be defined before using self.hook method. " "Try to use self.get_kube_creds method", ) - return GKECustomResourceHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, @@ -1336,13 +1331,13 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKECustomResourceHook: + def hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( "Cluster url and ssl_ca_cert should be defined before using self.hook method. " "Try to use self.get_kube_creds method", ) - return GKECustomResourceHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, @@ -1475,14 +1470,14 @@ def cluster_hook(self) -> GKEHook: ) @cached_property - def hook(self) -> GKEJobHook: + def hook(self) -> GKEKubernetesHook: if self._cluster_url is None or self._ssl_ca_cert is None: raise AttributeError( "Cluster url and ssl_ca_cert should be defined before using self.hook method. " "Try to use self.get_kube_creds method", ) - return GKEJobHook( + return GKEKubernetesHook( gcp_conn_id=self.gcp_conn_id, cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index c0d8fef97a0633..8557bea0824108 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -30,7 +30,6 @@ from airflow.providers.google.cloud.hooks.kubernetes_engine import ( GKEAsyncHook, GKEKubernetesAsyncHook, - GKEPodAsyncHook, ) from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -147,8 +146,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) @cached_property - def hook(self) -> GKEPodAsyncHook: # type: ignore[override] - return GKEPodAsyncHook( + def hook(self) -> GKEKubernetesAsyncHook: # type: ignore[override] + return GKEKubernetesAsyncHook( cluster_url=self._cluster_url, ssl_ca_cert=self._ssl_ca_cert, gcp_conn_id=self.gcp_conn_id, diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index f1fafcca73cff1..91c05b3bd212d7 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -517,25 +517,14 @@ - tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links - tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links - tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_default_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_default_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_gcp_conn_id - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_account -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_chain_one_element - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_default_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueInsideClusterOperator::test_execute -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_default_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_gcp_conn_id - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index 05c7bf24a7ec8a..c71cc99c7e64b7 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -81,9 +81,7 @@ KUB_OP_PATH = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}" GKE_HOOK_MODULE_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine" GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook" -GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook" -GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook" -GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook" +GKE_KUBERNETES_HOOK = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook" GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook" KUB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute" KUB_JOB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute" @@ -502,8 +500,8 @@ def setup_test(self): @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(f"{GKE_DEPLOYMENT_HOOK_PATH}.check_kueue_deployment_running") - @mock.patch(GKE_POD_HOOK_PATH) + @mock.patch(f"{GKE_KUBERNETES_HOOK}.check_kueue_deployment_running") + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_info_mock, file_mock): mock_pod_hook.return_value.apply_from_yaml_file.side_effect = mock.MagicMock() fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) @@ -515,9 +513,9 @@ def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_ @mock.patch.dict(os.environ, {}) @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") - @mock.patch(GKE_DEPLOYMENT_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_POD_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute_autoscaled_cluster( self, mock_pod_hook, mock_hook, mock_depl_hook, fetch_cluster_info_mock, file_mock, caplog ): @@ -534,7 +532,7 @@ def test_execute_autoscaled_cluster( @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_POD_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute_autoscaled_cluster_check_error( self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog ): @@ -550,7 +548,7 @@ def test_execute_autoscaled_cluster_check_error( @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_POD_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute_non_autoscaled_cluster_check_error( self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog ): @@ -916,7 +914,7 @@ def setup_method(self): @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_JOB_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock): mock_job_hook.return_value.get_job.return_value = mock.MagicMock() fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) @@ -931,7 +929,7 @@ def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_m @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_JOB_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute_with_impersonation_service_account( self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock ): @@ -949,7 +947,7 @@ def test_execute_with_impersonation_service_account( @mock.patch(TEMP_FILE) @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") @mock.patch(GKE_HOOK_PATH) - @mock.patch(GKE_JOB_HOOK_PATH) + @mock.patch(GKE_KUBERNETES_HOOK) def test_execute_with_impersonation_service_chain_one_element( self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock ):