Skip to content

Commit

Permalink
Addressing pr review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es committed Jul 13, 2022
1 parent 9d3a3de commit 063895a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_operation_sensor import (
DatahubOperationCircuitBreakerSensor,
DataHubOperationCircuitBreakerSensor,
)

dag = DAG(
Expand All @@ -17,7 +17,7 @@
)

# New DataHub Operation Circuit Breaker Sensor
pet_profiles_operation_sensor = DatahubOperationCircuitBreakerSensor(
pet_profiles_operation_sensor = DataHubOperationCircuitBreakerSensor(
task_id="pet_profiles_operation_sensor",
datahub_rest_conn_id="datahub_longtail",
urn=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_assertion_operator import (
DatahubAssertionOperator,
DataHubAssertionOperator,
)
from datahub_provider.operators.datahub_operation_sensor import (
DatahubOperationCircuitBreakerSensor,
DataHubOperationCircuitBreakerSensor,
)

dag = DAG(
Expand All @@ -19,7 +19,7 @@
catchup=False,
)

items_operation_sensor = DatahubOperationCircuitBreakerSensor(
items_operation_sensor = DataHubOperationCircuitBreakerSensor(
dag=dag,
task_id="pet_profiles_operation_sensor",
datahub_rest_conn_id="datahub_longtail",
Expand All @@ -30,9 +30,9 @@
)

# Assertion circuit breaker to check if there are assertions for the urns specified.
# check_last_assertion_time is enabled which means it will get from the latest operation the timeframe
# verify_after_last_update is enabled, which means the time of the dataset's latest operation is used
# as the minimum assertion date: only assertions that ran after it are accepted.
assertion_circuit_breaker = DatahubAssertionOperator(
assertion_circuit_breaker = DataHubAssertionOperator(
task_id="pet_profiles_assertion_circuit_breaker",
datahub_rest_conn_id="datahub_longtail",
urn=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class AssertionCircuitBreakerConfig(CircuitBreakerConfig):
check_last_assertion_time: bool = Field(
verify_after_last_update: bool = Field(
default=True,
description="Whether to check if assertion happened after the dataset was last updated.",
)
Expand Down Expand Up @@ -89,7 +89,7 @@ class AssertionResult:
)
return True
elif last_assertion.state == "SUCCESS":
print(f"Found successful assertion: {assertion_urn}")
logger.info(f"Found successful assertion: {assertion_urn}")
result = False
if last_updated is not None:
last_run = datetime.fromtimestamp(last_assertion.time / 1000)
Expand All @@ -109,7 +109,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:

last_updated: Optional[datetime] = None

if self.config.check_last_assertion_time:
if self.config.verify_after_last_update:
last_updated = self.get_last_updated(urn)
logger.info(
f"The dataset {urn} was last updated at {last_updated}, using this as min assertion date."
Expand All @@ -129,7 +129,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:

if self._check_if_assertion_failed(
assertions,
last_updated if self.config.check_last_assertion_time is True else None,
last_updated if self.config.verify_after_last_update is True else None,
):
logger.info(f"Dataset {urn} has failed or missing assertion(s).")
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
from datahub_provider.hooks.datahub import DatahubRestHook


class DatahubAssertionOperator(BaseOperator):
class DataHubAssertionOperator(BaseOperator):
r"""
Datahub Assertion Circuit Breaker Operator.
DataHub Assertion Circuit Breaker Operator.
:param urn: The DataHub dataset unique identifier. (templated)
:param datahub_rest_conn_id: The REST datahub connection id to communicate with DataHub
which is set as Airflow connection.
:param check_last_assertion_time: If set it checks assertions after the last operation was set on the dataset.
By default it is True.
:param time_delta: If check_last_assertion_time is False it checks for assertion within the time delta.
:param time_delta: If verify_after_last_update is False it checks for assertion within the time delta.
"""

template_fields: Sequence[str] = ("urn",)
Expand Down Expand Up @@ -48,15 +48,15 @@ def __init__( # type: ignore[no-untyped-def]
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
check_last_assertion_time=check_last_assertion_time,
verify_after_last_update=check_last_assertion_time,
time_delta=time_delta if time_delta else datetime.timedelta(days=1),
)

self.circuit_breaker = AssertionCircuitBreaker(config=config)

def execute(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
print(
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
Expand All @@ -73,7 +73,6 @@ def execute(self, context: Any) -> bool:
self.log.info(f"Checking if dataset {self.urn} is ready to be consumed")
ret = self.circuit_breaker.is_circuit_breaker_active(urn=urn)
if ret:
print(f"Dataset {self.urn} is not in consumable state")
raise Exception(f"Dataset {self.urn} is not in consumable state")

return True
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
from datahub_provider.hooks.datahub import DatahubRestHook


class DatahubAssertionSensor(BaseSensorOperator):
class DataHubAssertionSensor(BaseSensorOperator):
r"""
Datahub Assertion Circuit Breaker Sensor.
DataHub Assertion Circuit Breaker Sensor.
:param urn: The DataHub dataset unique identifier. (templated)
:param datahub_rest_conn_id: The REST datahub connection id to communicate with DataHub
which is set as Airflow connection.
:param check_last_assertion_time: If set it checks assertions after the last operation was set on the dataset.
By default it is True.
:param time_delta: If check_last_assertion_time is False it checks for assertion within the time delta.
:param time_delta: If verify_after_last_update is False it checks for assertion within the time delta.
"""

template_fields: Sequence[str] = ("urn",)
Expand Down Expand Up @@ -48,14 +48,14 @@ def __init__( # type: ignore[no-untyped-def]
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
check_last_assertion_time=check_last_assertion_time,
verify_after_last_update=check_last_assertion_time,
time_delta=time_delta,
)
self.circuit_breaker = AssertionCircuitBreaker(config=config)

def poke(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
print(
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
Expand All @@ -72,7 +72,7 @@ def poke(self, context: Any) -> bool:
self.log.info(f"Checking if dataset {self.urn} is ready to be consumed")
ret = self.circuit_breaker.is_circuit_breaker_active(urn=urn)
if ret:
print(f"Dataset {self.urn} is not in consumable state")
self.log.info(f"Dataset {self.urn} is not in consumable state")
return False

return True
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from datahub_provider.hooks.datahub import DatahubRestHook


class DatahubOperationCircuitBreakerOperator(BaseSensorOperator):
class DataHubOperationCircuitBreakerOperator(BaseSensorOperator):
r"""
Datahub Operation Circuit Breaker Operator.
DataHub Operation Circuit Breaker Operator.
:param urn: The DataHub dataset unique identifier. (templated)
:param datahub_rest_conn_id: The REST datahub connection id to communicate with DataHub
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__( # type: ignore[no-untyped-def]

def execute(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
print(
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
Expand All @@ -93,7 +93,6 @@ def execute(self, context: Any) -> bool:
source_type=self.source_type,
)
if ret:
print(f"Dataset {self.urn} is not in consumable state")
raise Exception(f"Dataset {self.urn} is not in consumable state")

return True
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from datahub_provider.hooks.datahub import DatahubRestHook


class DatahubOperationCircuitBreakerSensor(BaseSensorOperator):
class DataHubOperationCircuitBreakerSensor(BaseSensorOperator):
r"""
Datahub Operation Circuit Breaker Sensor.
DataHub Operation Circuit Breaker Sensor.
:param urn: The DataHub dataset unique identifier. (templated)
:param datahub_rest_conn_id: The REST datahub connection id to communicate with DataHub
Expand Down Expand Up @@ -72,7 +72,7 @@ def __init__( # type: ignore[no-untyped-def]

def poke(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
print(
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
Expand All @@ -94,7 +94,7 @@ def poke(self, context: Any) -> bool:
source_type=self.source_type,
)
if ret:
print(f"Dataset {self.urn} is not in consumable state")
self.log.info(f"Dataset {self.urn} is not in consumable state")
return False

return True

0 comments on commit 063895a

Please sign in to comment.