From 35834c3809ce6f5f1dcff130d0e68cabed7f72de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Thu, 26 Mar 2020 16:09:27 +0100 Subject: [PATCH] Remove Presto check operators (#7884) --- airflow/operators/presto_check_operator.py | 57 ++++- .../providers/presto/operators/__init__.py | 17 -- .../presto/operators/presto_check.py | 133 ----------- docs/autoapi_templates/index.rst | 2 - tests/providers/presto/operators/__init__.py | 18 -- .../presto/operators/test_presto_check.py | 221 ------------------ tests/test_core_to_contrib.py | 6 +- 7 files changed, 55 insertions(+), 399 deletions(-) delete mode 100644 airflow/providers/presto/operators/__init__.py delete mode 100644 airflow/providers/presto/operators/presto_check.py delete mode 100644 tests/providers/presto/operators/__init__.py delete mode 100644 tests/providers/presto/operators/test_presto_check.py diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py index 486db20a41205..e7558794e1741 100644 --- a/airflow/operators/presto_check_operator.py +++ b/airflow/operators/presto_check_operator.py @@ -15,16 +15,63 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module is deprecated. Please use `airflow.providers.presto.operators.presto_check`.""" +"""This module is deprecated. Please use `airflow.operators.check_operator`.""" import warnings # pylint: disable=unused-import -from airflow.providers.presto.operators.presto_check import ( # noqa - PrestoCheckOperator, PrestoIntervalCheckOperator, PrestoValueCheckOperator, -) +from airflow.operators.check_operator import CheckOperator, IntervalCheckOperator, ValueCheckOperator # noqa warnings.warn( - "This module is deprecated. Please use `airflow.providers.presto.operators.presto_check`.", + "This module is deprecated. Please use `airflow.operators.check_operator`.", DeprecationWarning, stacklevel=2 ) + + +class PrestoCheckOperator(CheckOperator): + """ + This class is deprecated. + Please use `airflow.operators.check_operator.CheckOperator`. + """ + + def __init__(self, *args, **kwargs): + warnings.warn( + """This class is deprecated. + Please use `airflow.operators.check_operator.CheckOperator`.""", + DeprecationWarning, stacklevel=2 + ) + super().__init__(*args, **kwargs) + + +class PrestoIntervalCheckOperator(IntervalCheckOperator): + """ + This class is deprecated. + Please use `airflow.operators.check_operator.IntervalCheckOperator`. + """ + + def __init__(self, *args, **kwargs): + warnings.warn( + """ + This class is deprecated.l + Please use `airflow.operators.check_operator.IntervalCheckOperator`. + """, + DeprecationWarning, stacklevel=2 + ) + super().__init__(*args, **kwargs) + + +class PrestoValueCheckOperator(ValueCheckOperator): + """ + This class is deprecated. + Please use `airflow.operators.check_operator.ValueCheckOperator`. + """ + + def __init__(self, *args, **kwargs): + warnings.warn( + """ + This class is deprecated.l + Please use `airflow.operators.check_operator.ValueCheckOperator`. + """, + DeprecationWarning, stacklevel=2 + ) + super().__init__(*args, **kwargs) diff --git a/airflow/providers/presto/operators/__init__.py b/airflow/providers/presto/operators/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/airflow/providers/presto/operators/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/providers/presto/operators/presto_check.py b/airflow/providers/presto/operators/presto_check.py deleted file mode 100644 index 8e4a62fd2dadf..0000000000000 --- a/airflow/providers/presto/operators/presto_check.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from typing import Any, Dict - -from airflow.operators.check_operator import CheckOperator, IntervalCheckOperator, ValueCheckOperator -from airflow.providers.presto.hooks.presto import PrestoHook -from airflow.utils.decorators import apply_defaults - - -class PrestoCheckOperator(CheckOperator): - """ - Performs checks against Presto. The ``PrestoCheckOperator`` expects - a sql query that will return a single row. Each value on that - first row is evaluated using python ``bool`` casting. If any of the - values return ``False`` the check is failed and errors out. - - Note that Python bool casting evals the following as ``False``: - - * ``False`` - * ``0`` - * Empty string (``""``) - * Empty list (``[]``) - * Empty dictionary or set (``{}``) - - Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if - the count ``== 0``. You can craft much more complex query that could, - for instance, check that the table has the same number of rows as - the source table upstream, or that the count of today's partition is - greater than yesterday's partition, or that a set of metrics are less - than 3 standard deviation for the 7 day average. - - This operator can be used as a data quality check in your pipeline, and - depending on where you put it in your DAG, you have the choice to - stop the critical path, preventing from - publishing dubious data, or on the side and receive email alerts - without stopping the progress of the DAG. - - :param sql: the sql to be executed - :type sql: str - :param presto_conn_id: reference to the Presto database - :type presto_conn_id: str - """ - - @apply_defaults - def __init__( - self, - sql: str, - presto_conn_id: str = 'presto_default', - *args, **kwargs) -> None: - super().__init__(sql=sql, *args, **kwargs) - - self.presto_conn_id = presto_conn_id - self.sql = sql - - def get_db_hook(self): - return PrestoHook(presto_conn_id=self.presto_conn_id) - - -class PrestoValueCheckOperator(ValueCheckOperator): - """ - Performs a simple value check using sql code. - - :param sql: the sql to be executed - :type sql: str - :param presto_conn_id: reference to the Presto database - :type presto_conn_id: str - """ - - @apply_defaults - def __init__( - self, - sql: str, - pass_value: Any, - tolerance: Any = None, - presto_conn_id: str = 'presto_default', - *args, **kwargs): - super().__init__( - sql=sql, pass_value=pass_value, tolerance=tolerance, - *args, **kwargs) - self.presto_conn_id = presto_conn_id - - def get_db_hook(self): - return PrestoHook(presto_conn_id=self.presto_conn_id) - - -class PrestoIntervalCheckOperator(IntervalCheckOperator): - """ - Checks that the values of metrics given as SQL expressions are within - a certain tolerance of the ones from days_back before. - - :param table: the table name - :type table: str - :param days_back: number of days between ds and the ds we want to check - against. Defaults to 7 days - :type days_back: int - :param metrics_threshold: a dictionary of ratios indexed by metrics - :type metrics_threshold: dict - :param presto_conn_id: reference to the Presto database - :type presto_conn_id: str - """ - - @apply_defaults - def __init__( - self, - table: str, - metrics_thresholds: Dict, - date_filter_column: str = 'ds', - days_back: int = -7, - presto_conn_id: str = 'presto_default', - *args, **kwargs): - super().__init__( - table=table, metrics_thresholds=metrics_thresholds, - date_filter_column=date_filter_column, days_back=days_back, - *args, **kwargs) - self.presto_conn_id = presto_conn_id - - def get_db_hook(self): - return PrestoHook(presto_conn_id=self.presto_conn_id) diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst index c0fac5986a6ac..5f2d149f1e9bb 100644 --- a/docs/autoapi_templates/index.rst +++ b/docs/autoapi_templates/index.rst @@ -160,8 +160,6 @@ All operators are in the following packages: airflow/providers/postgres/operators/index - airflow/providers/presto/operators/index - airflow/providers/qubole/operators/index airflow/providers/qubole/sensors/index diff --git a/tests/providers/presto/operators/__init__.py b/tests/providers/presto/operators/__init__.py deleted file mode 100644 index fe95886d5c104..0000000000000 --- a/tests/providers/presto/operators/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# diff --git a/tests/providers/presto/operators/test_presto_check.py b/tests/providers/presto/operators/test_presto_check.py deleted file mode 100644 index 1c037982b5069..0000000000000 --- a/tests/providers/presto/operators/test_presto_check.py +++ /dev/null @@ -1,221 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -import os -import unittest -from datetime import datetime -from unittest import mock - -from airflow.exceptions import AirflowException -from airflow.models import DAG -from airflow.providers.presto.operators.presto_check import ( - PrestoCheckOperator, PrestoIntervalCheckOperator, PrestoValueCheckOperator, -) - -DEFAULT_DATE = datetime(2015, 1, 1) - - -class TestPrestoCheckOperator(unittest.TestCase): - @mock.patch.object(PrestoCheckOperator, "get_db_hook") - def test_execute_no_records(self, mock_get_db_hook): - mock_get_db_hook.return_value.get_first.return_value = [] - - with self.assertRaises(AirflowException): - PrestoCheckOperator(sql="sql").execute() - - @mock.patch.object(PrestoCheckOperator, "get_db_hook") - def test_execute_not_all_records_are_true(self, mock_get_db_hook): - mock_get_db_hook.return_value.get_first.return_value = ["data", ""] - - with self.assertRaises(AirflowException): - PrestoCheckOperator(sql="sql").execute() - - @unittest.skipIf( - 'AIRFLOW_RUNALL_TESTS' not in os.environ, - "Skipped because AIRFLOW_RUNALL_TESTS is not set") - def test_presto(self): - sql = """ - SELECT count(1) FROM airflow.static_babynames_partitioned; - """ - dag = DAG("test_dag", start_date=datetime(2017, 1, 1)) - op = PrestoCheckOperator( - task_id='presto_check', sql=sql, dag=dag) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, - ignore_ti_state=True) - - -class TestPrestoValueCheckOperator(unittest.TestCase): - def setUp(self): - self.task_id = "test_task" - self.conn_id = "default_conn" - - def _construct_operator(self, sql, pass_value, tolerance=None): - dag = DAG("test_dag", start_date=datetime(2017, 1, 1)) - - return PrestoValueCheckOperator( - dag=dag, - task_id=self.task_id, - conn_id=self.conn_id, - sql=sql, - pass_value=pass_value, - tolerance=tolerance, - ) - - def test_pass_value_template_string(self): - pass_value_str = "2018-03-22" - operator = self._construct_operator("select date from tab1;", "{{ ds }}") - - operator.render_template_fields({"ds": pass_value_str}) - - self.assertEqual(operator.task_id, self.task_id) - self.assertEqual(operator.pass_value, pass_value_str) - - def test_pass_value_template_string_float(self): - pass_value_float = 4.0 - operator = self._construct_operator("select date from tab1;", pass_value_float) - - operator.render_template_fields({}) - - self.assertEqual(operator.task_id, self.task_id) - self.assertEqual(operator.pass_value, str(pass_value_float)) - - @mock.patch.object(PrestoValueCheckOperator, "get_db_hook") - def test_execute_pass(self, mock_get_db_hook): - mock_hook = mock.Mock() - mock_hook.get_first.return_value = [10] - mock_get_db_hook.return_value = mock_hook - sql = "select value from tab1 limit 1;" - operator = self._construct_operator(sql, 5, 1) - - operator.execute(None) - - mock_hook.get_first.assert_called_once_with(sql) - - @mock.patch.object(PrestoValueCheckOperator, "get_db_hook") - def test_execute_fail(self, mock_get_db_hook): - mock_hook = mock.Mock() - mock_hook.get_first.return_value = [11] - mock_get_db_hook.return_value = mock_hook - - operator = self._construct_operator("select value from tab1 limit 1;", 5, 1) - - with self.assertRaisesRegex(AirflowException, "Tolerance:100.0%"): - operator.execute() - - -class TestPrestoIntervalCheckOperator(unittest.TestCase): - def _construct_operator(self, table, metric_thresholds, ratio_formula, ignore_zero): - return PrestoIntervalCheckOperator( - task_id="test_task", - table=table, - metrics_thresholds=metric_thresholds, - ratio_formula=ratio_formula, - ignore_zero=ignore_zero, - ) - - def test_invalid_ratio_formula(self): - with self.assertRaisesRegex(AirflowException, "Invalid diff_method"): - self._construct_operator( - table="test_table", - metric_thresholds={"f1": 1}, - ratio_formula="abs", - ignore_zero=False, - ) - - @mock.patch.object(PrestoIntervalCheckOperator, "get_db_hook") - def test_execute_not_ignore_zero(self, mock_get_db_hook): - mock_hook = mock.Mock() - mock_hook.get_first.return_value = [0] - mock_get_db_hook.return_value = mock_hook - - operator = self._construct_operator( - table="test_table", - metric_thresholds={"f1": 1}, - ratio_formula="max_over_min", - ignore_zero=False, - ) - - with self.assertRaises(AirflowException): - operator.execute() - - @mock.patch.object(PrestoIntervalCheckOperator, "get_db_hook") - def test_execute_ignore_zero(self, mock_get_db_hook): - mock_hook = mock.Mock() - mock_hook.get_first.return_value = [0] - mock_get_db_hook.return_value = mock_hook - - operator = self._construct_operator( - table="test_table", - metric_thresholds={"f1": 1}, - ratio_formula="max_over_min", - ignore_zero=True, - ) - - operator.execute() - - @mock.patch.object(PrestoIntervalCheckOperator, "get_db_hook") - def test_execute_min_max(self, mock_get_db_hook): - mock_hook = mock.Mock() - - def returned_row(): - rows = [ - [2, 2, 2, 2], # reference - [1, 1, 1, 1], # current - ] - - yield from rows - - mock_hook.get_first.side_effect = returned_row() - mock_get_db_hook.return_value = mock_hook - - operator = self._construct_operator( - table="test_table", - metric_thresholds={"f0": 1.0, "f1": 1.5, "f2": 2.0, "f3": 2.5}, - ratio_formula="max_over_min", - ignore_zero=True, - ) - - with self.assertRaisesRegex(AirflowException, "f0, f1, f2"): - operator.execute() - - @mock.patch.object(PrestoIntervalCheckOperator, "get_db_hook") - def test_execute_diff(self, mock_get_db_hook): - mock_hook = mock.Mock() - - def returned_row(): - rows = [ - [3, 3, 3, 3], # reference - [1, 1, 1, 1], # current - ] - - yield from rows - - mock_hook.get_first.side_effect = returned_row() - mock_get_db_hook.return_value = mock_hook - - operator = self._construct_operator( - table="test_table", - metric_thresholds={"f0": 0.5, "f1": 0.6, "f2": 0.7, "f3": 0.8}, - ratio_formula="relative_diff", - ignore_zero=True, - ) - - with self.assertRaisesRegex(AirflowException, "f0, f1"): - operator.execute() diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py index dde51e032467f..9d3b5396b5b78 100644 --- a/tests/test_core_to_contrib.py +++ b/tests/test_core_to_contrib.py @@ -1180,15 +1180,15 @@ 'airflow.operators.papermill_operator.PapermillOperator', ), ( - 'airflow.providers.presto.operators.presto_check.PrestoCheckOperator', + 'airflow.operators.check_operator.CheckOperator', 'airflow.operators.presto_check_operator.PrestoCheckOperator', ), ( - 'airflow.providers.presto.operators.presto_check.PrestoIntervalCheckOperator', + 'airflow.operators.check_operator.IntervalCheckOperator', 'airflow.operators.presto_check_operator.PrestoIntervalCheckOperator', ), ( - 'airflow.providers.presto.operators.presto_check.PrestoValueCheckOperator', + 'airflow.operators.check_operator.ValueCheckOperator', 'airflow.operators.presto_check_operator.PrestoValueCheckOperator', ), (