Adding Airflow example for circuit breakers
treff7es committed Jun 17, 2022
1 parent 2e57cf7 commit d861b4a
Showing 4 changed files with 166 additions and 0 deletions.
@@ -0,0 +1,38 @@
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator

from datahub.api.graphql.operation import Operation
from datahub_provider.entities import Dataset
from datahub_provider.hooks.datahub import DatahubRestHook

dag = DAG(
dag_id="snowflake_load",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)


# NEW - OPERATION PUSH
def report_operation(context):
    # Pull host/token/timeout from the Airflow connection and push an INSERT
    # operation to DataHub for every outlet dataset of the finished task.
    hook: DatahubRestHook = DatahubRestHook("datahub_longtail")
    host, password, timeout_sec = hook._get_config()
    reporter = Operation(datahub_host=host, datahub_token=password, timeout=timeout_sec)
    task = context["ti"].task
    for outlet in task._outlets:
        print(f"Reporting insert operation for {outlet.urn}")
        reporter.report_operation(
            urn=outlet.urn, operation_type="INSERT", num_affected_rows=123
        )


# NEW - OPERATION PUSH
pet_profiles_load = BashOperator(
task_id="load_s3_adoption_pet_profiles",
dag=dag,
inlets=[Dataset("s3", "longtail-core-data/mongo/adoption/pet_profiles")],
outlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
on_success_callback=report_operation,
)
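For reference, the Operation client used in the callback above can also push an operation report outside of Airflow. A minimal sketch using only the constructor and method shown in the file; the host and token here are placeholders, while the DAG resolves them from the "datahub_longtail" Airflow connection via DatahubRestHook._get_config():

from datahub.api.graphql.operation import Operation

# Placeholder connection details (hypothetical endpoint and token).
reporter = Operation(
    datahub_host="http://localhost:8080",
    datahub_token="<your-token>",
    timeout=10,
)
reporter.report_operation(
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)",
    operation_type="INSERT",
    num_affected_rows=123,
)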
@@ -0,0 +1,36 @@
import datetime

import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_operation_sensor import (
DatahubOperationCircuitBreakerSensor,
)

dag = DAG(
dag_id="marketing-send_emails",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)

# New Datahub Operation Circuit Breaker Sensor
pet_profiles_operation_sensor = DatahubOperationCircuitBreakerSensor(
task_id="pet_profiles_operation_sensor",
datahub_rest_conn_id="datahub_longtail",
urn=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
],
time_delta=datetime.timedelta(minutes=10),
)

send_email = BashOperator(
task_id="send_emails",
dag=dag,
inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
)

pet_profiles_operation_sensor.set_downstream(send_email)
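The sensor keeps send_emails from running until DataHub has recorded an operation on the pet_profiles dataset within the configured 10-minute window. The same dependency in Airflow's bitshift syntax:

pet_profiles_operation_sensor >> send_email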
@@ -0,0 +1,42 @@
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator

from datahub.api.graphql.operation import Operation
from datahub_provider.entities import Dataset
from datahub_provider.hooks.datahub import DatahubRestHook

dag = DAG(
dag_id="snowflake_load",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)


def report_operation(context):
    # Same push callback as in the operation example: report an INSERT
    # operation to DataHub for every outlet dataset of the task.
    hook: DatahubRestHook = DatahubRestHook("datahub_longtail")
    host, password, timeout_sec = hook._get_config()
    reporter = Operation(datahub_host=host, datahub_token=password, timeout=timeout_sec)
    task = context["ti"].task
    for outlet in task._outlets:
        reporter.report_operation(urn=outlet.urn, operation_type="INSERT")


pet_profiles_load = BashOperator(
task_id="load_s3_adoption_pet_profiles",
dag=dag,
inlets=[Dataset("s3", "longtail-core-data/mongo/adoption/pet_profiles")],
outlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
on_success_callback=report_operation,
)

# NEW - RUNNING GE ASSERTIONS
run_ge_tests = BashOperator(
    task_id="pet_profiles_ge_tests_run",
    dag=dag,
    inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
    bash_command="echo /usr/local/airflow/.local/bin/great_expectations checkpoint run pet_profiles",
)

pet_profiles_load.set_downstream(run_ge_tests)
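Note that the bash_command above only echoes the Great Expectations invocation, keeping the example side-effect free. A sketch of the task as it would look when the checkpoint actually runs, assuming the great_expectations CLI at that path and a pet_profiles checkpoint exist on the worker:

run_ge_tests = BashOperator(
    task_id="pet_profiles_ge_tests_run",
    dag=dag,
    inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
    bash_command="/usr/local/airflow/.local/bin/great_expectations checkpoint run pet_profiles",
)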
@@ -0,0 +1,50 @@
import datetime

import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_assertion_operator import (
DatahubAssertionOperator,
)
from datahub_provider.operators.datahub_operation_sensor import (
DatahubOperationCircuitBreakerSensor,
)

dag = DAG(
dag_id="marketing-send_emails",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)

items_operation_sensor = DatahubOperationCircuitBreakerSensor(
    dag=dag,
    task_id="pet_profiles_operation_sensor",
    datahub_rest_conn_id="datahub_longtail",
    urn=[
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
    ],
    time_delta=datetime.timedelta(days=1),
)

# NEW ASSERTION OPERATOR
assertion_circuit_breaker = DatahubAssertionOperator(
task_id="pet_profiles_assertion_circuit_breaker",
datahub_rest_conn_id="datahub_longtail",
urn=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
],
check_last_assertion_time=True,
)

send_email = BashOperator(
task_id="send_emails",
dag=dag,
inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
)

items_operation_sensor.set_downstream(assertion_circuit_breaker)
assertion_circuit_breaker.set_downstream(send_email)
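The two set_downstream calls chain the operation sensor into the assertion check and then the email task, so send_emails only runs after both circuit breakers pass. The equivalent bitshift form:

items_operation_sensor >> assertion_circuit_breaker >> send_email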
