Skip to content

Commit

Permalink
fix: Update SqlToSlackApiFileOperator with new param to check empty o…
Browse files Browse the repository at this point in the history
…utput
  • Loading branch information
andyguwc committed Mar 12, 2024
1 parent 68282c1 commit a587a71
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
9 changes: 9 additions & 0 deletions airflow/providers/slack/transfers/sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
from airflow.utils.context import Context


class SqlToSlackNullOutputException(AirflowException):
"""An exception that indicates the sql output is empty and hence should not be sent with slack message."""


class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
"""
Executes an SQL statement in a given SQL connection and sends the results to Slack API as file.
Expand All @@ -58,6 +62,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
:param slack_base_url: A string representing the Slack API base URL. Optional
:param slack_method_version: The version of the Slack SDK Client method to be used, either "v1" or "v2".
:param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method.
:param allow_null: Keyword argument to allow null sql output which implies allowing sending an empty file.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -87,6 +92,7 @@ def __init__(
slack_base_url: str | None = None,
slack_method_version: Literal["v1", "v2"] = "v1",
df_kwargs: dict | None = None,
allow_null: bool = True,
**kwargs,
):
super().__init__(
Expand All @@ -100,6 +106,7 @@ def __init__(
self.slack_base_url = slack_base_url
self.slack_method_version = slack_method_version
self.df_kwargs = df_kwargs or {}
self.allow_null = allow_null

@cached_property
def slack_hook(self):
Expand Down Expand Up @@ -134,6 +141,8 @@ def execute(self, context: Context) -> None:
output_file_name = fp.name
output_file_format = output_file_format.upper()
df_result = self._get_query_results()
if not self.allow_null and df_result.empty:
raise SqlToSlackNullOutputException("Sql output is empty.")
if output_file_format == "CSV":
df_result.to_csv(output_file_name, **self.df_kwargs)
elif output_file_format == "JSON":
Expand Down
27 changes: 26 additions & 1 deletion tests/providers/slack/transfers/test_sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import pytest

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackApiFileOperator, SqlToSlackOperator
from airflow.providers.slack.transfers.sql_to_slack import (
SqlToSlackApiFileOperator,
SqlToSlackNullOutputException,
SqlToSlackOperator,
)
from airflow.utils import timezone

TEST_DAG_ID = "sql_to_slack_unit_test"
Expand Down Expand Up @@ -157,6 +161,27 @@ def test_unsupported_format(self, filename):
with pytest.raises(ValueError):
op.execute(mock.MagicMock())

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output(self, mock_get_query_results):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
"allow_null": False,
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

with pytest.raises(SqlToSlackNullOutputException):
op.execute(mock.MagicMock)


def test_deprecated_sql_to_slack_operator():
warning_pattern = "SqlToSlackOperator` has been renamed and moved"
Expand Down

0 comments on commit a587a71

Please sign in to comment.