From 58106ce67568fc7f844ae4e965fea05e44f14d0b Mon Sep 17 00:00:00 2001 From: Tomek Urbaszel Date: Mon, 19 Aug 2019 15:23:50 +0200 Subject: [PATCH] [AIRFLOW-4768] Timeout parameter in example_gcp_video_intelligence This commit adds timeout parameter to GCP Video Intelligence operators --- airflow/gcp/operators/video_intelligence.py | 18 ++++++++++++++++++ tests/gcp/operators/test_video_intelligence.py | 3 +++ 2 files changed, 21 insertions(+) diff --git a/airflow/gcp/operators/video_intelligence.py b/airflow/gcp/operators/video_intelligence.py index 1ce1b487acbd2d..5f8cd69f94e7c5 100644 --- a/airflow/gcp/operators/video_intelligence.py +++ b/airflow/gcp/operators/video_intelligence.py @@ -55,6 +55,9 @@ class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator): :param retry: Retry object used to determine when/if to retry requests. If None is specified, requests will not be retried. :type retry: google.api_core.retry.Retry + :param timeout: Optional, The amount of time, in seconds, to wait for the request to complete. + Note that if retry is specified, the timeout applies to each individual attempt. + :type timeout: float :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ``google_cloud_default``. :type gcp_conn_id: str @@ -71,6 +74,7 @@ def __init__( video_context=None, location=None, retry=None, + timeout=None, gcp_conn_id="google_cloud_default", *args, **kwargs @@ -83,6 +87,7 @@ def __init__( self.location = location self.retry = retry self.gcp_conn_id = gcp_conn_id + self.timeout = timeout def execute(self, context): hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) @@ -93,6 +98,7 @@ def execute(self, context): location=self.location, retry=self.retry, features=[enums.Feature.LABEL_DETECTION], + timeout=self.timeout ) self.log.info("Processing video for label annotations") result = MessageToDict(operation.result()) @@ -128,6 +134,9 @@ class CloudVideoIntelligenceDetectVideoExplicitContentOperator(BaseOperator): :param retry: Retry object used to determine when/if to retry requests. If None is specified, requests will not be retried. :type retry: google.api_core.retry.Retry + :param timeout: Optional, The amount of time, in seconds, to wait for the request to complete. + Note that if retry is specified, the timeout applies to each individual attempt. + :type timeout: float :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ``google_cloud_default``. :type gcp_conn_id: str @@ -144,6 +153,7 @@ def __init__( video_context=None, location=None, retry=None, + timeout=None, gcp_conn_id="google_cloud_default", *args, **kwargs @@ -156,6 +166,7 @@ def __init__( self.location = location self.retry = retry self.gcp_conn_id = gcp_conn_id + self.timeout = timeout def execute(self, context): hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) @@ -166,6 +177,7 @@ def execute(self, context): location=self.location, retry=self.retry, features=[enums.Feature.EXPLICIT_CONTENT_DETECTION], + timeout=self.timeout ) self.log.info("Processing video for explicit content annotations") result = MessageToDict(operation.result()) @@ -201,6 +213,9 @@ class CloudVideoIntelligenceDetectVideoShotsOperator(BaseOperator): :param retry: Retry object used to determine when/if to retry requests. If None is specified, requests will not be retried. :type retry: google.api_core.retry.Retry + :param timeout: Optional, The amount of time, in seconds, to wait for the request to complete. + Note that if retry is specified, the timeout applies to each individual attempt. + :type timeout: float :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ``google_cloud_default``. :type gcp_conn_id: str @@ -217,6 +232,7 @@ def __init__( video_context=None, location=None, retry=None, + timeout=None, gcp_conn_id="google_cloud_default", *args, **kwargs @@ -229,6 +245,7 @@ def __init__( self.location = location self.retry = retry self.gcp_conn_id = gcp_conn_id + self.timeout = timeout def execute(self, context): hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) @@ -239,6 +256,7 @@ def execute(self, context): location=self.location, retry=self.retry, features=[enums.Feature.SHOT_CHANGE_DETECTION], + timeout=self.timeout ) self.log.info("Processing video for video shots annotations") result = MessageToDict(operation.result()) diff --git a/tests/gcp/operators/test_video_intelligence.py b/tests/gcp/operators/test_video_intelligence.py index ab6994cc27b242..ac71abb674029b 100644 --- a/tests/gcp/operators/test_video_intelligence.py +++ b/tests/gcp/operators/test_video_intelligence.py @@ -56,6 +56,7 @@ def test_detect_video_labels_green_path(self, mock_hook): video_context=None, location=None, retry=None, + timeout=None ) @mock.patch("airflow.gcp.operators.video_intelligence.CloudVideoIntelligenceHook") @@ -76,6 +77,7 @@ def test_detect_video_explicit_content_green_path(self, mock_hook): video_context=None, location=None, retry=None, + timeout=None ) @mock.patch("airflow.gcp.operators.video_intelligence.CloudVideoIntelligenceHook") @@ -96,4 +98,5 @@ def test_detect_video_shots_green_path(self, mock_hook): video_context=None, location=None, retry=None, + timeout=None )