Skip to content

Commit

Permalink
[AIRFLOW-3964][AIP-17] Consolidate and de-dup sensor tasks using Smar…
Browse files Browse the repository at this point in the history
…t Sensor (#5499)

Co-authored-by: Yingbo Wang <[email protected]>
  • Loading branch information
YingboWang and Yingbo Wang authored Sep 8, 2020
1 parent ff41361 commit ac943c9
Show file tree
Hide file tree
Showing 44 changed files with 2,062 additions and 132 deletions.
33 changes: 33 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2337,3 +2337,36 @@
to identify the task.
Should be supplied in the format: ``key = value``
options: []
- name: smart_sensor
description: ~
options:
- name: use_smart_sensor
description: |
When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
smart sensor task.
version_added: ~
type: boolean
example: ~
default: "False"
- name: shard_code_upper_limit
description: |
`shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
by `hashcode % shard_code_upper_limit`.
version_added: ~
type: int
example: ~
default: "10000"
- name: shards
description: |
The number of running smart sensor processes for each service.
version_added: ~
type: int
example: ~
default: "5"
- name: sensors_enabled
description: |
comma separated sensor classes support in smart_sensor.
version_added: ~
type: string
example: ~
default: "NamedHivePartitionSensor"
15 changes: 15 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1179,3 +1179,18 @@ worker_resources =
# The worker pods will be given these static labels, as well as some additional dynamic labels
# to identify the task.
# Should be supplied in the format: ``key = value``

[smart_sensor]
# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
# smart sensor task.
use_smart_sensor = False

# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
# by `hashcode % shard_code_upper_limit`.
shard_code_upper_limit = 10000

# The number of running smart sensor processes for each service.
shards = 5

# comma separated sensor classes support in smart_sensor.
sensors_enabled = NamedHivePartitionSensor
7 changes: 7 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def __init__(self, reschedule_date):
self.reschedule_date = reschedule_date


class AirflowSmartSensorException(AirflowException):
"""
Raise after the task register itself in the smart sensor service
It should exit without failing a task
"""


class InvalidStatsNameException(AirflowException):
"""Raise when name of the stats is invalid"""

Expand Down
7 changes: 5 additions & 2 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ def process_file(
self.log.info("Processing file %s for tasks to queue", file_path)

try:
dagbag = DagBag(file_path, include_examples=False)
dagbag = DagBag(file_path, include_examples=False, include_smart_sensor=False)
except Exception: # pylint: disable=broad-except
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr('dag_file_refresh_error', 1, 1)
Expand Down Expand Up @@ -1743,7 +1743,10 @@ def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(
simple_dag_bag=simple_dag_bag,
old_states=[State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE],
old_states=[State.QUEUED,
State.SCHEDULED,
State.UP_FOR_RESCHEDULE,
State.SENSING],
new_state=State.NONE
)
self._execute_task_instances(simple_dag_bag)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add sensor_instance table
Revision ID: e38be357a868
Revises: 939bb1e647c8
Create Date: 2019-06-07 04:03:17.003939
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy import func
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = 'e38be357a868'
down_revision = 'da3f683c3a5a'
branch_labels = None
depends_on = None


def mssql_timestamp(): # noqa: D103
return sa.DateTime()


def mysql_timestamp(): # noqa: D103
return mysql.TIMESTAMP(fsp=6)


def sa_timestamp(): # noqa: D103
return sa.TIMESTAMP(timezone=True)


def upgrade(): # noqa: D103

conn = op.get_bind()
if conn.dialect.name == 'mysql':
timestamp = mysql_timestamp
elif conn.dialect.name == 'mssql':
timestamp = mssql_timestamp
else:
timestamp = sa_timestamp

op.create_table(
'sensor_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('execution_date', timestamp(), nullable=False),
sa.Column('state', sa.String(length=20), nullable=True),
sa.Column('try_number', sa.Integer(), nullable=True),
sa.Column('start_date', timestamp(), nullable=True),
sa.Column('operator', sa.String(length=1000), nullable=False),
sa.Column('op_classpath', sa.String(length=1000), nullable=False),
sa.Column('hashcode', sa.BigInteger(), nullable=False),
sa.Column('shardcode', sa.Integer(), nullable=False),
sa.Column('poke_context', sa.Text(), nullable=False),
sa.Column('execution_context', sa.Text(), nullable=True),
sa.Column('created_at', timestamp(), default=func.now(), nullable=False),
sa.Column('updated_at', timestamp(), default=func.now(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(
'ti_primary_key',
'sensor_instance',
['dag_id', 'task_id', 'execution_date'],
unique=True
)
op.create_index('si_hashcode', 'sensor_instance', ['hashcode'], unique=False)
op.create_index('si_shardcode', 'sensor_instance', ['shardcode'], unique=False)
op.create_index('si_state_shard', 'sensor_instance', ['state', 'shardcode'], unique=False)
op.create_index('si_updated_at', 'sensor_instance', ['updated_at'], unique=False)


def downgrade(): # noqa: D103
op.drop_table('sensor_instance')
1 change: 1 addition & 0 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airflow.models.log import Log
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.sensorinstance import SensorInstance # noqa: F401
from airflow.models.skipmixin import SkipMixin
from airflow.models.slamiss import SlaMiss
from airflow.models.taskfail import TaskFail
Expand Down
7 changes: 7 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,13 @@ def get_serialized_fields(cls):

return cls.__serialized_fields

def is_smart_sensor_compatible(self):
"""
Return if this operator can use smart service. Default False.
"""
return False


def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
r"""
Expand Down
12 changes: 10 additions & 2 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class DagBag(BaseDagBag, LoggingMixin):
:param include_examples: whether to include the examples that ship
with airflow or not
:type include_examples: bool
:param include_smart_sensor: whether to include the smart sensor native
DAGs that create the smart sensor operators for whole cluster
:type include_smart_sensor: bool
:param read_dags_from_db: Read DAGs from DB if store_serialized_dags is ``True``.
If ``False`` DAGs are read from python files. This property is not used when
determining whether or not to write Serialized DAGs, that is done by checking
Expand All @@ -84,6 +87,7 @@ def __init__(
self,
dag_folder: Optional[str] = None,
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
read_dags_from_db: bool = False,
store_serialized_dags: Optional[bool] = None,
Expand Down Expand Up @@ -113,6 +117,7 @@ def __init__(
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
include_smart_sensor=include_smart_sensor,
safe_mode=safe_mode)

def size(self) -> int:
Expand Down Expand Up @@ -391,6 +396,7 @@ def collect_dags(
dag_folder=None,
only_if_updated=True,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
include_smart_sensor=conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
"""
Given a file path or a folder, this method looks for python modules,
Expand All @@ -414,8 +420,10 @@ def collect_dags(
stats = []

dag_folder = correct_maybe_zipped(dag_folder)
for filepath in list_py_file_paths(dag_folder, safe_mode=safe_mode,
include_examples=include_examples):
for filepath in list_py_file_paths(dag_folder,
safe_mode=safe_mode,
include_examples=include_examples,
include_smart_sensor=include_smart_sensor):
try:
file_parse_start_dttm = timezone.utcnow()
found_dags = self.process_file(
Expand Down
Loading

0 comments on commit ac943c9

Please sign in to comment.