From a587a71b960753cbf5b06990f139701b3e2aacea Mon Sep 17 00:00:00 2001 From: Tianyou Gu Date: Mon, 11 Mar 2024 21:43:25 -0700 Subject: [PATCH] fix: Update SqlToSlackApiFileOperator with new param to check empty output --- .../providers/slack/transfers/sql_to_slack.py | 9 +++++++ .../slack/transfers/test_sql_to_slack.py | 27 ++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 0ecfc4e8ca917c..8b4ff2c91872d3 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -33,6 +33,10 @@ from airflow.utils.context import Context +class SqlToSlackNullOutputException(AirflowException): + """An exception that indicates the sql output is empty and hence should not be sent with slack message.""" + + class SqlToSlackApiFileOperator(BaseSqlToSlackOperator): """ Executes an SQL statement in a given SQL connection and sends the results to Slack API as file. @@ -58,6 +62,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator): :param slack_base_url: A string representing the Slack API base URL. Optional :param slack_method_version: The version of the Slack SDK Client method to be used, either "v1" or "v2". :param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method. + :param allow_null: Keyword argument to allow null sql output which implies allowing sending an empty file. """ template_fields: Sequence[str] = ( @@ -87,6 +92,7 @@ def __init__( slack_base_url: str | None = None, slack_method_version: Literal["v1", "v2"] = "v1", df_kwargs: dict | None = None, + allow_null: bool = True, **kwargs, ): super().__init__( @@ -100,6 +106,7 @@ def __init__( self.slack_base_url = slack_base_url self.slack_method_version = slack_method_version self.df_kwargs = df_kwargs or {} + self.allow_null = allow_null @cached_property def slack_hook(self): @@ -134,6 +141,8 @@ def execute(self, context: Context) -> None: output_file_name = fp.name output_file_format = output_file_format.upper() df_result = self._get_query_results() + if not self.allow_null and df_result.empty: + raise SqlToSlackNullOutputException("Sql output is empty.") if output_file_format == "CSV": df_result.to_csv(output_file_name, **self.df_kwargs) elif output_file_format == "JSON": diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index fd3c06189d5109..cedf5fc3102d5b 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -21,7 +21,11 @@ import pytest from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackApiFileOperator, SqlToSlackOperator +from airflow.providers.slack.transfers.sql_to_slack import ( + SqlToSlackApiFileOperator, + SqlToSlackNullOutputException, + SqlToSlackOperator, +) from airflow.utils import timezone TEST_DAG_ID = "sql_to_slack_unit_test" @@ -157,6 +161,27 @@ def test_unsupported_format(self, filename): with pytest.raises(ValueError): op.execute(mock.MagicMock()) + @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results") + def test_null_output(self, mock_get_query_results): + op_kwargs = { + **self.default_op_kwargs, + "slack_conn_id": "expected-test-slack-conn-id", + "slack_filename": "test_filename.csv", + "slack_channels": ["#random"], + "slack_initial_comment": "test_comment", + "slack_title": "test_title", + "allow_null": False, + } + op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs) + + # Mock empty query results + mock_df = mock.MagicMock() + mock_df.configure_mock(**{"empty.return_value": True}) + mock_get_query_results.return_value = mock_df + + with pytest.raises(SqlToSlackNullOutputException): + op.execute(mock.MagicMock) + def test_deprecated_sql_to_slack_operator(): warning_pattern = "SqlToSlackOperator` has been renamed and moved"