[AIRFLOW-6006] Remove dag out of airflow package import. Depends on [AIRFLOW-6004] [AIRFLOW-6005] #6598

Closed
43 changes: 43 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,47 @@ repos:
files: ^BREEZE.rst$|^breeze$|^breeze-complete$
pass_filenames: false
require_serial: true
- id: base-operator
language: pygrep
name: Make sure BaseOperator is imported from airflow.models.baseoperator in core
entry: "from airflow.models import.* BaseOperator"
files: \.py$
pass_filenames: true
exclude: >
(?x)
^airflow/gcp/.*$|
^airflow/hooks/.*$|
^airflow/operators/.*$|
^airflow/sensors/.*$|
^airflow/providers/.*$|
^airflow/contrib/.*$
- id: base-operator
language: pygrep
name: Make sure BaseOperator is imported from airflow.models outside of core
entry: "from airflow.models.baseoperator import.* BaseOperator"
pass_filenames: true
files: >
(?x)
^airflow/gcp/.*$|
^airflow/hooks/.*$|
^airflow/operators/.*$|
^airflow/sensors/.*$|
^airflow/providers/.*\.py$|
^airflow/contrib/.*\.py$
- id: airflow-exception
language: pygrep
name: Make sure AirflowException is imported using 'from airflow import AirflowException'
entry: "from airflow.exceptions import.* AirflowException"
pass_filenames: true
exclude: ^airflow/__init__\.py$
files: \.py$
- id: airflow-dag
language: pygrep
name: Make sure DAG is imported using 'from airflow import DAG'
entry: "from airflow.models import.* DAG|from airflow.models.dag import.* DAG"
pass_filenames: true
exclude: ^airflow/models/__init__\.py$|^airflow/__init__\.py$
files: \.py$
- id: build
name: Check if image build is needed
entry: ./scripts/ci/pre_commit_ci_build.sh
Expand Down Expand Up @@ -217,12 +258,14 @@ repos:
files: \.py$
exclude: ^tests/.*\.py$|^airflow/_vendor/.*$
pass_filenames: true
require_serial: true
- id: pylint
name: Run pylint for tests
language: system
entry: "./scripts/ci/pre_commit_pylint_tests.sh"
files: ^tests/.*\.py$
pass_filenames: true
require_serial: true
- id: flake8
name: Run flake8
language: system
Expand Down
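The pygrep hooks added above fail whenever a line in a checked file matches their ``entry`` pattern. Their behaviour can be sketched with Python's ``re`` module (an illustrative stand-in with the dots escaped; pre-commit applies the patterns itself):

```python
import re

# Patterns adapted from the pygrep entries above; a match means the hook fails.
AIRFLOW_DAG = re.compile(
    r"from airflow\.models import.* DAG|from airflow\.models\.dag import.* DAG"
)
AIRFLOW_EXCEPTION = re.compile(r"from airflow\.exceptions import.* AirflowException")

# Import lines the hooks reject in DAG files:
assert AIRFLOW_DAG.search("from airflow.models import DAG")
assert AIRFLOW_DAG.search("from airflow.models.dag import DAG")
assert AIRFLOW_EXCEPTION.search("from airflow.exceptions import AirflowException")

# The recommended stable imports pass:
assert not AIRFLOW_DAG.search("from airflow import DAG")
assert not AIRFLOW_EXCEPTION.search("from airflow import AirflowException")
```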
51 changes: 50 additions & 1 deletion CONTRIBUTING.rst
Expand Up @@ -414,7 +414,11 @@ image built locally):
=================================== ================================================================ ============
**Hooks** **Description** **Breeze**
=================================== ================================================================ ============
``airflow-settings``                Check if airflow import settings are used well.
----------------------------------- ---------------------------------------------------------------- ------------
``airflow-dag``                     Make sure Airflow DAG is imported from the correct package
----------------------------------- ---------------------------------------------------------------- ------------
``airflow-exception``               Make sure Airflow exception is imported from the correct package
----------------------------------- ---------------------------------------------------------------- ------------
``base-operator``                   Checks that BaseOperator is imported properly
----------------------------------- ---------------------------------------------------------------- ------------
``build`` Builds image for check-apache-licence, mypy, pylint, flake8. *
----------------------------------- ---------------------------------------------------------------- ------------
Expand Down Expand Up @@ -509,6 +513,51 @@ You can always skip running the tests by providing ``--no-verify`` flag to the

To check other usage types of the pre-commit framework, see `Pre-commit website <https://pre-commit.com/>`__.

Importing Airflow core objects
==============================

When you implement core features or DAGs, you might need to import some of the core objects or modules.
Since Apache Airflow can be used both as an application (by its internal classes) and as a library (by DAGs),
those core objects and packages can be imported in different ways.

Airflow imports some of the core objects directly into the 'airflow' package so that they can be used from there.

The following criteria determine which import path to use:

* If you work on a core feature inside Apache Airflow, import the objects directly from the
  package where the object is defined - this minimises the risk of cyclic imports.
* If you import the objects from any of the 'providers' classes, import them from
  'airflow' or 'airflow.models'. This is very important for back-porting operators/hooks/sensors
  to Airflow 1.10.* (AIP-21).
* If you import objects from within a DAG you write, import them from the 'airflow' or
  'airflow.models' package, because a stable import location is important there.

These rules are enforced for the most important and frequently used objects via pre-commit hooks,
as described below.

BaseOperator
------------

The BaseOperator should be imported:

* as ``from airflow.models import BaseOperator`` in external DAGs, provider operators, and custom operators
* as ``from airflow.models.baseoperator import BaseOperator`` in the Airflow core, to avoid cyclic imports
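This split can be checked mechanically; a small stdlib ``ast`` helper (illustrative only, not one of the pre-commit hooks above) shows how the two import forms differ:

```python
import ast
from typing import Optional


def baseoperator_import_module(source: str) -> Optional[str]:
    """Return the module BaseOperator is imported from, or None if absent."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom):
            if any(alias.name == "BaseOperator" for alias in node.names):
                return node.module
    return None


# Core code imports from the defining module to avoid cyclic imports:
assert baseoperator_import_module(
    "from airflow.models.baseoperator import BaseOperator"
) == "airflow.models.baseoperator"

# DAGs, providers, and custom operators use the stable location:
assert baseoperator_import_module(
    "from airflow.models import BaseOperator"
) == "airflow.models"
```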

DAG
---

The DAG should be imported:

* as ``from airflow import DAG`` in external DAGs, provider operators, and custom operators
* as ``from airflow.models.dag import DAG`` in the Airflow core, to avoid cyclic imports


AirflowException
----------------

The AirflowException should be imported directly from the ``airflow`` package:

.. code-block:: python

from airflow import AirflowException

Travis CI Testing Framework
===========================

Expand Down
10 changes: 9 additions & 1 deletion UPDATING.md
Expand Up @@ -41,6 +41,14 @@ assists users migrating to a new version.

## Airflow Master

### Changes to settings

`CONTEXT_MANAGER_DAG` was removed from settings. Its role has been taken over by `DagContext` in
`airflow.models.dag`. One reason is that settings should stay static rather than store
dynamic context from the DAG, but the main one is that moving the context out of settings untangles
the cyclic imports between DAG, BaseOperator, SerializedDAG, and SerializedBaseOperator, which was
part of AIRFLOW-6010.
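The pattern behind `DagContext` can be sketched in plain Python (an illustrative stand-in, not the actual Airflow implementation): the currently active DAG lives on a class-level stack instead of a mutable global in `settings`:

```python
class DagContext:
    """Illustrative stand-in for airflow.models.dag.DagContext: tracks
    the DAGs entered via `with DAG(...)` on a class-level stack."""

    _context_managed_dags = []

    @classmethod
    def push_context_managed_dag(cls, dag):
        cls._context_managed_dags.append(dag)

    @classmethod
    def pop_context_managed_dag(cls):
        return cls._context_managed_dags.pop()

    @classmethod
    def get_current_dag(cls):
        return cls._context_managed_dags[-1] if cls._context_managed_dags else None


class DAG:
    """Minimal DAG sketch showing how `with DAG(...)` uses the context."""

    def __init__(self, dag_id):
        self.dag_id = dag_id

    def __enter__(self):
        DagContext.push_context_managed_dag(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        DagContext.pop_context_managed_dag()


with DAG("example") as dag:
    # Inside the `with` block, the DAG is discoverable without a global.
    assert DagContext.get_current_dag() is dag
assert DagContext.get_current_dag() is None
```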

### Removal of redirect_stdout, redirect_stderr

The functions `redirect_stderr` and `redirect_stdout` from the `airflow.utils.log.logging_mixin` module have
Expand Down Expand Up @@ -1443,7 +1451,7 @@ Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.settings import *
>>>
>>> from datetime import datetime
>>> from airflow import DAG
>>> from airflow.models.dag import DAG
>>> from airflow.operators.dummy_operator import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
Expand Down
8 changes: 5 additions & 3 deletions airflow/__init__.py
Expand Up @@ -31,20 +31,22 @@
# pylint:disable=wrong-import-position
from typing import Callable, Optional

# noinspection PyUnresolvedReferences
from airflow import utils
from airflow import settings
from airflow import version
from airflow.executors.all_executors import AllExecutors
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.models.dag import DAG

__version__ = version.version

settings.initialize()

login = None # type: Optional[Callable]

from airflow import executors
from airflow import hooks
from airflow import macros
from airflow import operators
Expand All @@ -60,5 +62,5 @@ def __init__(self, namespace):
operators._integrate_plugins() # pylint:disable=protected-access
sensors._integrate_plugins() # pylint:disable=protected-access
hooks._integrate_plugins() # pylint:disable=protected-access
executors._integrate_plugins() # pylint:disable=protected-access
AllExecutors._integrate_plugins() # pylint:disable=protected-access
macros._integrate_plugins() # pylint:disable=protected-access
3 changes: 2 additions & 1 deletion airflow/api/__init__.py
Expand Up @@ -20,8 +20,9 @@

from importlib import import_module

from airflow import AirflowException
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin


Expand Down
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/delete_dag.py
Expand Up @@ -22,7 +22,8 @@

from airflow import models
from airflow.exceptions import DagNotFound
from airflow.models import DagModel, SerializedDagModel, TaskFail
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/get_code.py
Expand Up @@ -17,8 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Get code APIs."""
from airflow import AirflowException
from airflow.api.common.experimental import check_and_get_dag
from airflow.exceptions import AirflowException
from airflow.www import utils as wwwutils


Expand Down
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/mark_tasks.py
Expand Up @@ -24,7 +24,8 @@
from sqlalchemy import or_

from airflow.jobs import BackfillJob
from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.models import DagRun, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/task_command.py
Expand Up @@ -25,7 +25,7 @@
from contextlib import redirect_stderr, redirect_stdout

from airflow import DAG, AirflowException, conf, jobs, settings
from airflow.executors import get_default_executor
from airflow.executors.all_executors import AllExecutors
from airflow.models import DagPickle, TaskInstance
from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import cli as cli_utils, db
Expand Down Expand Up @@ -69,7 +69,7 @@ def _run(args, dag, ti):
print(e)
raise e

executor = get_default_executor()
executor = AllExecutors.get_default_executor()
executor.start()
print("Sending to executor.")
executor.queue_task_instance(
Expand Down
3 changes: 2 additions & 1 deletion airflow/config_templates/default_celery.py
Expand Up @@ -19,8 +19,9 @@
"""Default celery configuration."""
import ssl

from airflow import AirflowException
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin


Expand Down
2 changes: 1 addition & 1 deletion airflow/configuration.py
Expand Up @@ -574,7 +574,7 @@ def get_airflow_test_config(airflow_home):
has_option = conf.has_option
remove_option = conf.remove_option
as_dict = conf.as_dict
set = conf.set # noqa
set = conf.set # noqa

for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
remove_option, as_dict, set]:
Expand Down
Expand Up @@ -19,7 +19,7 @@



from airflow import DAG
from airflow.models.dag import DAG
from airflow.contrib.operators.jenkins_job_trigger_operator import JenkinsJobTriggerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.jenkins_hook import JenkinsHook
Expand Down
Expand Up @@ -22,7 +22,7 @@
import os

import airflow
from airflow.models import DAG
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {
Expand Down
Expand Up @@ -22,8 +22,8 @@
import os

import airflow
from airflow import DAG
from airflow.contrib.example_dags.libs.helper import print_stuff
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
Expand Down
Expand Up @@ -19,7 +19,7 @@
"""
This is an example dag for using the KubernetesPodOperator.
"""
from airflow.models import DAG
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.utils.log.logging_mixin import LoggingMixin

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_papermill_operator.py
Expand Up @@ -25,7 +25,7 @@
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow import DAG
from airflow.operators.papermill_operator import PapermillOperator

default_args = {
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_winrm_operator.py
Expand Up @@ -32,9 +32,9 @@
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.hooks.winrm_hook import WinRMHook
from airflow.contrib.operators.winrm_operator import WinRMOperator
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/aws_dynamodb_hook.py
Expand Up @@ -21,8 +21,8 @@
"""
This module contains the AWS DynamoDB hook
"""
from airflow import AirflowException
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.exceptions import AirflowException


class AwsDynamoDBHook(AwsHook):
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/aws_hook.py
Expand Up @@ -26,7 +26,7 @@

import boto3

from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook


Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/azure_container_instance_hook.py
Expand Up @@ -25,7 +25,7 @@
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from zope.deprecation import deprecation

from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook


Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/cloudant_hook.py
Expand Up @@ -19,7 +19,7 @@
"""Hook for Cloudant"""
from cloudant import cloudant

from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook


Expand Down
3 changes: 1 addition & 2 deletions airflow/contrib/hooks/databricks_hook.py
Expand Up @@ -30,8 +30,7 @@
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase

from airflow import __version__
from airflow.exceptions import AirflowException
from airflow import AirflowException, __version__
from airflow.hooks.base_hook import BaseHook

RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart")
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/datadog_hook.py
Expand Up @@ -21,7 +21,7 @@

from datadog import api, initialize

from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/discord_webhook_hook.py
Expand Up @@ -20,7 +20,7 @@
import json
import re

from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.http_hook import HttpHook


Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/emr_hook.py
Expand Up @@ -17,8 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from airflow import AirflowException
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.exceptions import AirflowException


class EmrHook(AwsHook):
Expand Down