Skip to content

Commit

Permalink
Update SqlToSlackApiFileOperator with new param to check empty output (
Browse files Browse the repository at this point in the history
…apache#38079)

* fix: Update SqlToSlackApiFileOperator with new param to check empty output

* fix: skip sending slack instead of raising exception

* fix: update param to allow different ways to handle an empty df

* Apply suggestions from code review

fmt: make formatting changes

Co-authored-by: Andrey Anshin <[email protected]>

---------

Co-authored-by: Andrey Anshin <[email protected]>
  • Loading branch information
2 people authored and utkarsharma2 committed Apr 22, 2024
1 parent f494e78 commit 5a6cbdc
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
18 changes: 17 additions & 1 deletion airflow/providers/slack/transfers/sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from deprecated import deprecated
from typing_extensions import Literal

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.transfers.base_sql_to_slack import BaseSqlToSlackOperator
from airflow.providers.slack.transfers.sql_to_slack_webhook import SqlToSlackWebhookOperator
Expand Down Expand Up @@ -58,6 +58,11 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
:param slack_base_url: A string representing the Slack API base URL. Optional
:param slack_method_version: The version of the Slack SDK Client method to be used, either "v1" or "v2".
:param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method.
:param action_on_empty_df: Specifying how to handle an empty sql output df. Possible values:
- ``send``: (default) send the slack with an empty file.
- ``skip``: skip sending the slack message. Task state set to "skipped".
- ``error``: raise an error to fail the task. Task state set to "failed".
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -87,6 +92,7 @@ def __init__(
slack_base_url: str | None = None,
slack_method_version: Literal["v1", "v2"] = "v1",
df_kwargs: dict | None = None,
action_on_empty_df: Literal["send", "skip", "error"] = "send",
**kwargs,
):
super().__init__(
Expand All @@ -100,6 +106,9 @@ def __init__(
self.slack_base_url = slack_base_url
self.slack_method_version = slack_method_version
self.df_kwargs = df_kwargs or {}
if not action_on_empty_df or action_on_empty_df not in ("send", "skip", "error"):
raise ValueError(f"Invalid `action_on_empty_df` value {action_on_empty_df!r}")
self.action_on_empty_df = action_on_empty_df

@cached_property
def slack_hook(self):
Expand Down Expand Up @@ -134,6 +143,13 @@ def execute(self, context: Context) -> None:
output_file_name = fp.name
output_file_format = output_file_format.upper()
df_result = self._get_query_results()
if df_result.empty:
if self.action_on_empty_df == "skip":
raise AirflowSkipException("SQL output df is empty. Skipping.")
elif self.action_on_empty_df == "error":
raise ValueError("SQL output df must be non-empty. Failing.")
elif self.action_on_empty_df != "send":
raise ValueError(f"Invalid `action_on_empty_df` value {self.action_on_empty_df!r}")
if output_file_format == "CSV":
df_result.to_csv(output_file_name, **self.df_kwargs)
elif output_file_format == "JSON":
Expand Down
69 changes: 68 additions & 1 deletion tests/providers/slack/transfers/test_sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import pytest

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackApiFileOperator, SqlToSlackOperator
from airflow.utils import timezone

Expand Down Expand Up @@ -157,6 +157,73 @@ def test_unsupported_format(self, filename):
with pytest.raises(ValueError):
op.execute(mock.MagicMock())

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_sending_empty_file_by_default(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

op.execute(mock.MagicMock)
mock_slack_hook_cls.assert_called_once()

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_skip_sending_file(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
"action_on_empty_df": "skip",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

with pytest.raises(AirflowSkipException):
op.execute(mock.MagicMock())
mock_slack_hook_cls.assert_not_called()

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_raise_error(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
"action_on_empty_df": "error",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

with pytest.raises(ValueError, match="output df must be non-empty\. Failing"):
op.execute(mock.MagicMock())
mock_slack_hook_cls.assert_not_called()


def test_deprecated_sql_to_slack_operator():
warning_pattern = "SqlToSlackOperator` has been renamed and moved"
Expand Down

0 comments on commit 5a6cbdc

Please sign in to comment.