From a9ba91579960d1a642a7e19b35ac91a84375fff6 Mon Sep 17 00:00:00 2001 From: Ryan Yuan Date: Tue, 3 Sep 2019 21:39:37 +1000 Subject: [PATCH] [AIRFLOW-5129] Add typehint to GCP DLP hook (#5980) --- airflow/gcp/hooks/dlp.py | 780 +++++++++++++++++---------------------- 1 file changed, 346 insertions(+), 434 deletions(-) diff --git a/airflow/gcp/hooks/dlp.py b/airflow/gcp/hooks/dlp.py index bffc594be62b6..2943c46a9906c 100644 --- a/airflow/gcp/hooks/dlp.py +++ b/airflow/gcp/hooks/dlp.py @@ -24,13 +24,26 @@ import re import time +from typing import List, Optional, Sequence, Tuple, Union + +from google.api_core.retry import Retry from google.cloud.dlp_v2 import DlpServiceClient -from google.cloud.dlp_v2.types import DlpJob +from google.cloud.dlp_v2.types import (ByteContentItem, ContentItem, + DeidentifyConfig, + DeidentifyContentResponse, + DeidentifyTemplate, DlpJob, FieldMask, + InspectConfig, InspectContentResponse, + InspectJobConfig, InspectTemplate, + JobTrigger, ListInfoTypesResponse, + RedactImageRequest, RedactImageResponse, + ReidentifyContentResponse, + RiskAnalysisJobConfig, StoredInfoType, + StoredInfoTypeConfig) from airflow import AirflowException from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook -DLP_JOB_PATH_PATTERN = '^projects/[^/]+/dlpJobs/(?P<job>.*?)$' +DLP_JOB_PATH_PATTERN = "^projects/[^/]+/dlpJobs/(?P<job>.*?)$" # Time to sleep between active checks of the operation results TIME_TO_SLEEP_IN_SECONDS = 1 @@ -53,13 +66,11 @@ class CloudDLPHook(GoogleCloudBaseHook): :type delegate_to: str """ - def __init__(self, - gcp_conn_id="google_cloud_default", - delegate_to=None): + def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None) -> None: super().__init__(gcp_conn_id, delegate_to) self._client = None - def get_conn(self): + def get_conn(self) -> DlpServiceClient: """ Provides a client for interacting with the Cloud DLP API. 
@@ -73,8 +84,13 @@ def get_conn(self): @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def cancel_dlp_job( - self, dlp_job_id, project_id=None, retry=None, timeout=None, metadata=None - ): + self, + dlp_job_id: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> None: """ Starts asynchronous cancellation on a long-running DLP job. @@ -92,32 +108,28 @@ def cancel_dlp_job( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not dlp_job_id: - raise AirflowException( - "Please provide the ID of the DLP job resource to be cancelled." - ) + raise AirflowException("Please provide the ID of the DLP job resource to be cancelled.") name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id) - client.cancel_dlp_job( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.cancel_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def create_deidentify_template( self, - organization_id=None, - project_id=None, - deidentify_template=None, - template_id=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + deidentify_template: Optional[Union[dict, DeidentifyTemplate]] = None, + template_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> DeidentifyTemplate: """ Creates a deidentify template for re-using frequently used configuration for de-identifying content, images, and storage. @@ -141,7 +153,7 @@ def create_deidentify_template( individual attempt. 
:type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.DeidentifyTemplate """ @@ -154,9 +166,7 @@ def create_deidentify_template( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") return client.create_deidentify_template( parent=parent, @@ -171,15 +181,15 @@ def create_deidentify_template( @GoogleCloudBaseHook.fallback_to_default_project_id def create_dlp_job( self, - project_id=None, - inspect_job=None, - risk_job=None, - job_id=None, - retry=None, - timeout=None, - metadata=None, - wait_until_finished=True - ): + project_id: Optional[str] = None, + inspect_job: Optional[Union[dict, InspectJobConfig]] = None, + risk_job: Optional[Union[dict, RiskAnalysisJobConfig]] = None, + job_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + wait_until_finished: bool = True, + ) -> DlpJob: """ Creates a new job to inspect storage or calculate risk metrics. @@ -201,7 +211,7 @@ def create_dlp_job( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :param wait_until_finished: (Optional) If true, it will keep polling the job state until it is set to DONE. 
:type wait_until_finished: bool @@ -224,32 +234,28 @@ def create_dlp_job( if wait_until_finished: pattern = re.compile(DLP_JOB_PATH_PATTERN, re.IGNORECASE) match = pattern.match(job.name) - job_name = match.groupdict()['job'] + if match is not None: + job_name = match.groupdict()["job"] + else: + raise AirflowException("Unable to retrieve DLP job's ID from {}.".format(job.name)) while wait_until_finished: - job = self.get_dlp_job( - dlp_job_id=job_name, - project_id=project_id) - - self.log.info( - 'DLP job {} state: {}.'.format( - job.name, - DlpJob.JobState.Name(job.state) - ) - ) + job = self.get_dlp_job(dlp_job_id=job_name, project_id=project_id) + + self.log.info("DLP job {} state: {}.".format(job.name, DlpJob.JobState.Name(job.state))) if job.state == DlpJob.JobState.DONE: return job - elif job.state in [DlpJob.JobState.PENDING, - DlpJob.JobState.RUNNING, - DlpJob.JobState.JOB_STATE_UNSPECIFIED]: + elif job.state in [ + DlpJob.JobState.PENDING, + DlpJob.JobState.RUNNING, + DlpJob.JobState.JOB_STATE_UNSPECIFIED, + ]: time.sleep(TIME_TO_SLEEP_IN_SECONDS) else: raise AirflowException( - 'Stopped polling DLP job state. DLP job {} state: {}.' - .format( - job.name, - DlpJob.JobState.Name(job.state) + "Stopped polling DLP job state. 
DLP job {} state: {}.".format( + job.name, DlpJob.JobState.Name(job.state) ) ) return job @@ -257,14 +263,14 @@ def create_dlp_job( @GoogleCloudBaseHook.catch_http_exception def create_inspect_template( self, - organization_id=None, - project_id=None, - inspect_template=None, - template_id=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + inspect_template: Optional[Union[dict, InspectTemplate]] = None, + template_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> InspectTemplate: """ Creates an inspect template for re-using frequently used configuration for inspecting content, images, and storage. @@ -288,7 +294,7 @@ def create_inspect_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.InspectTemplate """ @@ -302,9 +308,7 @@ def create_inspect_template( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") return client.create_inspect_template( parent=parent, @@ -319,13 +323,13 @@ def create_inspect_template( @GoogleCloudBaseHook.fallback_to_default_project_id def create_job_trigger( self, - project_id=None, - job_trigger=None, - trigger_id=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + job_trigger: Optional[Union[dict, JobTrigger]] = None, + trigger_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> JobTrigger: """ Creates a job trigger to run DLP actions such as scanning storage for sensitive information on a set schedule. @@ -346,7 +350,7 @@ def create_job_trigger( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.JobTrigger """ @@ -365,14 +369,14 @@ def create_job_trigger( @GoogleCloudBaseHook.catch_http_exception def create_stored_info_type( self, - organization_id=None, - project_id=None, - config=None, - stored_info_type_id=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + config: Optional[Union[dict, StoredInfoTypeConfig]] = None, + stored_info_type_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> StoredInfoType: """ Creates a pre-built stored info type to be used for inspection. @@ -395,7 +399,7 @@ def create_stored_info_type( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.StoredInfoType """ @@ -409,9 +413,7 @@ def create_stored_info_type( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") return client.create_stored_info_type( parent=parent, @@ -426,16 +428,16 @@ def create_stored_info_type( @GoogleCloudBaseHook.fallback_to_default_project_id def deidentify_content( self, - project_id=None, - deidentify_config=None, - inspect_config=None, - item=None, - inspect_template_name=None, - deidentify_template_name=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + deidentify_config: Optional[Union[dict, DeidentifyConfig]] = None, + inspect_config: Optional[Union[dict, InspectConfig]] = None, + item: Optional[Union[dict, ContentItem]] = None, + inspect_template_name: Optional[str] = None, + deidentify_template_name: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> DeidentifyContentResponse: """ De-identifies potentially sensitive info from a content item. This method has limits on input size and output size. @@ -468,7 +470,7 @@ def deidentify_content( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.DeidentifyContentResponse """ @@ -489,14 +491,8 @@ def deidentify_content( @GoogleCloudBaseHook.catch_http_exception def delete_deidentify_template( - self, - template_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + self, template_id, organization_id=None, project_id=None, retry=None, timeout=None, metadata=None + ) -> None: """ Deletes a deidentify template. @@ -517,15 +513,13 @@ def delete_deidentify_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of deidentify template to be deleted." - ) + raise AirflowException("Please provide the ID of deidentify template to be deleted.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -535,19 +529,20 @@ def delete_deidentify_template( elif project_id: name = DlpServiceClient.project_deidentify_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") - client.delete_deidentify_template( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.delete_deidentify_template(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def delete_dlp_job( - self, dlp_job_id, project_id=None, retry=None, timeout=None, metadata=None - ): + self, + dlp_job_id: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> None: """ Deletes a long-running DLP job. This method indicates that the client is no longer interested in the DLP job result. The job will be cancelled if possible. @@ -566,31 +561,27 @@ def delete_dlp_job( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not dlp_job_id: - raise AirflowException( - "Please provide the ID of the DLP job resource to be cancelled." 
- ) + raise AirflowException("Please provide the ID of the DLP job resource to be cancelled.") name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id) - client.delete_dlp_job( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.delete_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def delete_inspect_template( self, - template_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + template_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> None: """ Deletes an inspect template. @@ -611,15 +602,13 @@ def delete_inspect_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of the inspect template to be deleted." - ) + raise AirflowException("Please provide the ID of the inspect template to be deleted.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -629,24 +618,20 @@ def delete_inspect_template( elif project_id: name = DlpServiceClient.project_inspect_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") - client.delete_inspect_template( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.delete_inspect_template(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def delete_job_trigger( self, - job_trigger_id, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + job_trigger_id: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> None: """ Deletes a job trigger. @@ -664,31 +649,27 @@ def delete_job_trigger( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not job_trigger_id: - raise AirflowException( - "Please provide the ID of the DLP job trigger to be deleted." - ) + raise AirflowException("Please provide the ID of the DLP job trigger to be deleted.") name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id) - client.delete_job_trigger( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.delete_job_trigger(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def delete_stored_info_type( self, - stored_info_type_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + stored_info_type_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> None: """ Deletes a stored info type. @@ -709,47 +690,36 @@ def delete_stored_info_type( individual attempt. 
:type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] """ client = self.get_conn() if not stored_info_type_id: - raise AirflowException( - "Please provide the ID of the stored info type to be deleted." - ) + raise AirflowException("Please provide the ID of the stored info type to be deleted.") # Handle project_id from connection configuration project_id = project_id or self.project_id if organization_id: - name = DlpServiceClient.organization_stored_info_type_path( - organization_id, stored_info_type_id - ) + name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id) elif project_id: - name = DlpServiceClient.project_stored_info_type_path( - project_id, - stored_info_type_id - ) + name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") - client.delete_stored_info_type( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + client.delete_stored_info_type(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def get_deidentify_template( self, - template_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + template_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> DeidentifyTemplate: """ Gets a deidentify template. @@ -770,16 +740,14 @@ def get_deidentify_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.DeidentifyTemplate """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of the deidentify template to be read." - ) + raise AirflowException("Please provide the ID of the deidentify template to be read.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -789,24 +757,20 @@ def get_deidentify_template( elif project_id: name = DlpServiceClient.project_deidentify_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") - return client.get_deidentify_template( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + return client.get_deidentify_template(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def get_dlp_job( self, - dlp_job_id, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + dlp_job_id: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> DlpJob: """ Gets the latest state of a long-running Dlp Job. @@ -824,32 +788,28 @@ def get_dlp_job( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.DlpJob """ client = self.get_conn() if not dlp_job_id: - raise AirflowException( - "Please provide the ID of the DLP job resource to be read." 
- ) + raise AirflowException("Please provide the ID of the DLP job resource to be read.") name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id) - return client.get_dlp_job( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + return client.get_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def get_inspect_template( self, - template_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + template_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> InspectTemplate: """ Gets an inspect template. @@ -870,16 +830,14 @@ def get_inspect_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.InspectTemplate """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of the inspect template to be read." - ) + raise AirflowException("Please provide the ID of the inspect template to be read.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -889,24 +847,20 @@ def get_inspect_template( elif project_id: name = DlpServiceClient.project_inspect_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") - return client.get_inspect_template( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + return client.get_inspect_template(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def get_job_trigger( self, - job_trigger_id, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + job_trigger_id: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> JobTrigger: """ Gets a DLP job trigger. @@ -924,32 +878,28 @@ def get_job_trigger( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.JobTrigger """ client = self.get_conn() if not job_trigger_id: - raise AirflowException( - "Please provide the ID of the DLP job trigger to be read." - ) + raise AirflowException("Please provide the ID of the DLP job trigger to be read.") name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id) - return client.get_job_trigger( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + return client.get_job_trigger(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception def get_stored_info_type( self, - stored_info_type_id, - organization_id=None, - project_id=None, - retry=None, - timeout=None, - metadata=None - ): + stored_info_type_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> StoredInfoType: """ Gets a stored info type. 
@@ -970,50 +920,39 @@ def get_stored_info_type( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.StoredInfoType """ client = self.get_conn() if not stored_info_type_id: - raise AirflowException( - "Please provide the ID of the stored info type to be read." - ) + raise AirflowException("Please provide the ID of the stored info type to be read.") # Handle project_id from connection configuration project_id = project_id or self.project_id if organization_id: - name = DlpServiceClient.organization_stored_info_type_path( - organization_id, stored_info_type_id - ) + name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id) elif project_id: - name = DlpServiceClient.project_stored_info_type_path( - project_id, - stored_info_type_id - ) + name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") - return client.get_stored_info_type( - name=name, retry=retry, timeout=timeout, metadata=metadata - ) + return client.get_stored_info_type(name=name, retry=retry, timeout=timeout, metadata=metadata) @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def inspect_content( self, - project_id=None, - inspect_config=None, - item=None, - inspect_template_name=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + inspect_config: Optional[Union[dict, InspectConfig]] = None, + item: Optional[Union[dict, ContentItem]] = None, + inspect_template_name: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> InspectContentResponse: """ Finds potentially sensitive info in content. This method has limits on input size, processing time, and output size. @@ -1038,7 +977,7 @@ def inspect_content( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.InspectContentResponse """ @@ -1058,14 +997,14 @@ def inspect_content( @GoogleCloudBaseHook.catch_http_exception def list_deidentify_templates( self, - organization_id=None, - project_id=None, - page_size=None, - order_by=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + page_size: Optional[int] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> List[DeidentifyTemplate]: """ Lists deidentify templates. @@ -1090,8 +1029,8 @@ def list_deidentify_templates( individual attempt. 
:type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] - :rtype: list[google.cloud.dlp_v2.types.DeidentifyTemplate] + :type metadata: Sequence[Tuple[str, str]] + :rtype: List[google.cloud.dlp_v2.types.DeidentifyTemplate] """ client = self.get_conn() @@ -1104,9 +1043,7 @@ def list_deidentify_templates( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") results = client.list_deidentify_templates( parent=parent, @@ -1123,15 +1060,15 @@ def list_deidentify_templates( @GoogleCloudBaseHook.fallback_to_default_project_id def list_dlp_jobs( self, - project_id=None, - results_filter=None, - page_size=None, - job_type=None, - order_by=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + results_filter: Optional[str] = None, + page_size: Optional[int] = None, + job_type: Optional[str] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> List[DlpJob]: """ Lists DLP jobs that match the specified filter in the request. @@ -1157,8 +1094,8 @@ def list_dlp_jobs( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] - :rtype: list[google.cloud.dlp_v2.types.DlpJob] + :type metadata: Sequence[Tuple[str, str]] + :rtype: List[google.cloud.dlp_v2.types.DlpJob] """ client = self.get_conn() @@ -1179,12 +1116,12 @@ def list_dlp_jobs( @GoogleCloudBaseHook.catch_http_exception def list_info_types( self, - language_code=None, - results_filter=None, - retry=None, - timeout=None, - metadata=None, - ): + language_code: Optional[str] = None, + results_filter: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> ListInfoTypesResponse: """ Returns a list of the sensitive information types that the DLP API supports. @@ -1202,7 +1139,7 @@ def list_info_types( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.ListInfoTypesResponse """ @@ -1219,14 +1156,14 @@ def list_info_types( @GoogleCloudBaseHook.catch_http_exception def list_inspect_templates( self, - organization_id=None, - project_id=None, - page_size=None, - order_by=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + page_size: Optional[int] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> List[InspectTemplate]: """ Lists inspect templates. @@ -1251,8 +1188,8 @@ def list_inspect_templates( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] - :rtype: list[google.cloud.dlp_v2.types.InspectTemplate] + :type metadata: Sequence[Tuple[str, str]] + :rtype: List[google.cloud.dlp_v2.types.InspectTemplate] """ client = self.get_conn() @@ -1265,9 +1202,7 @@ def list_inspect_templates( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") results = client.list_inspect_templates( parent=parent, @@ -1283,14 +1218,14 @@ def list_inspect_templates( @GoogleCloudBaseHook.fallback_to_default_project_id def list_job_triggers( self, - project_id=None, - page_size=None, - order_by=None, - results_filter=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + page_size: Optional[int] = None, + order_by: Optional[str] = None, + results_filter: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> List[JobTrigger]: """ Lists job triggers. @@ -1314,8 +1249,8 @@ def list_job_triggers( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] - :rtype: list[google.cloud.dlp_v2.types.JobTrigger] + :type metadata: Sequence[Tuple[str, str]] + :rtype: List[google.cloud.dlp_v2.types.JobTrigger] """ client = self.get_conn() @@ -1335,14 +1270,14 @@ def list_job_triggers( @GoogleCloudBaseHook.catch_http_exception def list_stored_info_types( self, - organization_id=None, - project_id=None, - page_size=None, - order_by=None, - retry=None, - timeout=None, - metadata=None, - ): + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + page_size: Optional[int] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> List[StoredInfoType]: """ Lists stored info types. @@ -1367,8 +1302,8 @@ def list_stored_info_types( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] - :rtype: list[google.cloud.dlp_v2.types.StoredInfoType] + :type metadata: Sequence[Tuple[str, str]] + :rtype: List[google.cloud.dlp_v2.types.StoredInfoType] """ client = self.get_conn() @@ -1381,9 +1316,7 @@ def list_stored_info_types( elif project_id: parent = DlpServiceClient.project_path(project_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") results = client.list_stored_info_types( parent=parent, @@ -1399,15 +1332,17 @@ def list_stored_info_types( @GoogleCloudBaseHook.fallback_to_default_project_id def redact_image( self, - project_id=None, - inspect_config=None, - image_redaction_configs=None, - include_findings=None, - byte_item=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + inspect_config: Optional[Union[dict, InspectConfig]] = None, + image_redaction_configs: Optional[ + Union[List[dict], List[RedactImageRequest.ImageRedactionConfig]] + ] = None, + include_findings: Optional[bool] = None, + byte_item: Optional[Union[dict, ByteContentItem]] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> RedactImageResponse: """ Redacts potentially sensitive info from an image. This method has limits on input size, processing time, and output size. @@ -1421,7 +1356,8 @@ def redact_image( :type inspect_config: dict or google.cloud.dlp_v2.types.InspectConfig :param image_redaction_configs: (Optional) The configuration for specifying what content to redact from images. - :type image_redaction_configs: list[dict] or list[google.cloud.dlp_v2.types.ImageRedactionConfig] + :type image_redaction_configs: List[dict] or + List[google.cloud.dlp_v2.types.RedactImageRequest.ImageRedactionConfig] :param include_findings: (Optional) Whether the response should include findings along with the redacted image. :type include_findings: bool @@ -1435,7 +1371,7 @@ def redact_image( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.RedactImageResponse """ @@ -1457,16 +1393,16 @@ def redact_image( @GoogleCloudBaseHook.fallback_to_default_project_id def reidentify_content( self, - project_id=None, - reidentify_config=None, - inspect_config=None, - item=None, - inspect_template_name=None, - reidentify_template_name=None, - retry=None, - timeout=None, - metadata=None, - ): + project_id: Optional[str] = None, + reidentify_config: Optional[Union[dict, DeidentifyConfig]] = None, + inspect_config: Optional[Union[dict, InspectConfig]] = None, + item: Optional[Union[dict, ContentItem]] = None, + inspect_template_name: Optional[str] = None, + reidentify_template_name: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> ReidentifyContentResponse: """ Re-identifies content that has been de-identified. @@ -1496,7 +1432,7 @@ def reidentify_content( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. 
- :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.ReidentifyContentResponse """ @@ -1518,15 +1454,15 @@ def reidentify_content( @GoogleCloudBaseHook.catch_http_exception def update_deidentify_template( self, - template_id, - organization_id=None, - project_id=None, - deidentify_template=None, - update_mask=None, - retry=None, - timeout=None, - metadata=None, - ): + template_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + deidentify_template: Optional[Union[dict, DeidentifyTemplate]] = None, + update_mask: Optional[Union[dict, FieldMask]] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> DeidentifyTemplate: """ Updates the deidentify template. @@ -1551,16 +1487,14 @@ def update_deidentify_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.DeidentifyTemplate """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of deidentify template to be updated." - ) + raise AirflowException("Please provide the ID of deidentify template to be updated.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -1570,9 +1504,7 @@ def update_deidentify_template( elif project_id: name = DlpServiceClient.project_deidentify_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") return client.update_deidentify_template( name=name, @@ -1586,15 +1518,15 @@ def update_deidentify_template( @GoogleCloudBaseHook.catch_http_exception def update_inspect_template( self, - template_id, - organization_id=None, - project_id=None, - inspect_template=None, - update_mask=None, - retry=None, - timeout=None, - metadata=None, - ): + template_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + inspect_template: Optional[Union[dict, InspectTemplate]] = None, + update_mask: Optional[Union[dict, FieldMask]] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> InspectTemplate: """ Updates the inspect template. @@ -1619,16 +1551,14 @@ def update_inspect_template( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.InspectTemplate """ client = self.get_conn() if not template_id: - raise AirflowException( - "Please provide the ID of the inspect template to be updated." - ) + raise AirflowException("Please provide the ID of the inspect template to be updated.") # Handle project_id from connection configuration project_id = project_id or self.project_id @@ -1637,9 +1567,7 @@ def update_inspect_template( elif project_id: name = DlpServiceClient.project_inspect_template_path(project_id, template_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." 
- ) + raise AirflowException("Please provide either organization_id or project_id.") return client.update_inspect_template( name=name, @@ -1654,14 +1582,14 @@ def update_inspect_template( @GoogleCloudBaseHook.fallback_to_default_project_id def update_job_trigger( self, - job_trigger_id, - project_id=None, - job_trigger=None, - update_mask=None, - retry=None, - timeout=None, - metadata=None, - ): + job_trigger_id: str, + project_id: Optional[str] = None, + job_trigger: Optional[Union[dict, JobTrigger]] = None, + update_mask: Optional[Union[dict, FieldMask]] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> JobTrigger: """ Updates a job trigger. @@ -1683,16 +1611,14 @@ def update_job_trigger( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.JobTrigger """ client = self.get_conn() if not job_trigger_id: - raise AirflowException( - "Please provide the ID of the DLP job trigger to be updated." 
- ) + raise AirflowException("Please provide the ID of the DLP job trigger to be updated.") name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id) return client.update_job_trigger( @@ -1707,15 +1633,15 @@ def update_job_trigger( @GoogleCloudBaseHook.catch_http_exception def update_stored_info_type( self, - stored_info_type_id, - organization_id=None, - project_id=None, - config=None, - update_mask=None, - retry=None, - timeout=None, - metadata=None, - ): + stored_info_type_id: str, + organization_id: Optional[str] = None, + project_id: Optional[str] = None, + config: Optional[Union[dict, StoredInfoTypeConfig]] = None, + update_mask: Optional[Union[dict, FieldMask]] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> StoredInfoType: """ Updates the stored info type by creating a new version. @@ -1741,39 +1667,25 @@ def update_stored_info_type( individual attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: sequence[tuple[str, str]]] + :type metadata: Sequence[Tuple[str, str]] :rtype: google.cloud.dlp_v2.types.StoredInfoType """ client = self.get_conn() if not stored_info_type_id: - raise AirflowException( - "Please provide the ID of the stored info type to be updated." 
- ) + raise AirflowException("Please provide the ID of the stored info type to be updated.") # Handle project_id from connection configuration project_id = project_id or self.project_id if organization_id: - name = DlpServiceClient.organization_stored_info_type_path( - organization_id, stored_info_type_id - ) + name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id) elif project_id: - name = DlpServiceClient.project_stored_info_type_path( - project_id, - stored_info_type_id - ) + name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id) else: - raise AirflowException( - "Please provide either organization_id or project_id." - ) + raise AirflowException("Please provide either organization_id or project_id.") return client.update_stored_info_type( - name=name, - config=config, - update_mask=update_mask, - retry=retry, - timeout=timeout, - metadata=metadata, + name=name, config=config, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata )