Skip to content

Commit

Permalink
[AIRFLOW-4768] Add timeout parameter to Cloud Video Intelligence oper…
Browse files Browse the repository at this point in the history
…ators (#5862)

cherry-picked from 2d46d5f
  • Loading branch information
turbaszek authored and kaxil committed Aug 30, 2019
1 parent f0f3da4 commit a4eb7f8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
27 changes: 23 additions & 4 deletions airflow/contrib/operators/gcp_video_intelligence_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google Cloud Vision operators.
"""

from google.protobuf.json_format import MessageToDict
from google.cloud.videointelligence_v1 import enums

from airflow.contrib.hooks.gcp_video_intelligence_hook import CloudVideoIntelligenceHook
from airflow.models import BaseOperator
from google.cloud.videointelligence_v1 import enums


class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator):
Expand Down Expand Up @@ -51,11 +55,13 @@ class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator):
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: Optional, The amount of time, in seconds, to wait for the request to complete.
Note that if retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""

# [START gcp_video_intelligence_detect_labels_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_labels_template_fields]
Expand All @@ -68,6 +74,7 @@ def __init__(
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
Expand All @@ -80,6 +87,7 @@ def __init__(
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
self.timeout = timeout

def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
Expand All @@ -90,6 +98,7 @@ def execute(self, context):
location=self.location,
retry=self.retry,
features=[enums.Feature.LABEL_DETECTION],
timeout=self.timeout
)
self.log.info("Processing video for label annotations")
result = MessageToDict(operation.result())
Expand Down Expand Up @@ -125,11 +134,13 @@ class CloudVideoIntelligenceDetectVideoExplicitContentOperator(BaseOperator):
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: Optional, The amount of time, in seconds, to wait for the request to complete.
Note that if retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""

# [START gcp_video_intelligence_detect_explicit_content_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_explicit_content_template_fields]
Expand All @@ -142,6 +153,7 @@ def __init__(
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
Expand All @@ -154,6 +166,7 @@ def __init__(
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
self.timeout = timeout

def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
Expand All @@ -164,6 +177,7 @@ def execute(self, context):
location=self.location,
retry=self.retry,
features=[enums.Feature.EXPLICIT_CONTENT_DETECTION],
timeout=self.timeout
)
self.log.info("Processing video for explicit content annotations")
result = MessageToDict(operation.result())
Expand Down Expand Up @@ -199,11 +213,13 @@ class CloudVideoIntelligenceDetectVideoShotsOperator(BaseOperator):
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: Optional, The amount of time, in seconds, to wait for the request to complete.
Note that if retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""

# [START gcp_video_intelligence_detect_video_shots_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_video_shots_template_fields]
Expand All @@ -216,6 +232,7 @@ def __init__(
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
Expand All @@ -228,6 +245,7 @@ def __init__(
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
self.timeout = timeout

def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
Expand All @@ -238,6 +256,7 @@ def execute(self, context):
location=self.location,
retry=self.retry,
features=[enums.Feature.SHOT_CHANGE_DETECTION],
timeout=self.timeout
)
self.log.info("Processing video for video shots annotations")
result = MessageToDict(operation.result())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_detect_video_labels_green_path(self, mock_hook):
video_context=None,
location=None,
retry=None,
timeout=None
)

@mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook")
Expand All @@ -76,6 +77,7 @@ def test_detect_video_explicit_content_green_path(self, mock_hook):
video_context=None,
location=None,
retry=None,
timeout=None
)

@mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook")
Expand All @@ -96,4 +98,5 @@ def test_detect_video_shots_green_path(self, mock_hook):
video_context=None,
location=None,
retry=None,
timeout=None
)

0 comments on commit a4eb7f8

Please sign in to comment.