Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SqlToSlackApiFileOperator with new param to check empty output #38079

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion airflow/providers/slack/transfers/sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from deprecated import deprecated
from typing_extensions import Literal

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.transfers.base_sql_to_slack import BaseSqlToSlackOperator
from airflow.providers.slack.transfers.sql_to_slack_webhook import SqlToSlackWebhookOperator
Expand Down Expand Up @@ -58,6 +58,11 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
:param slack_base_url: A string representing the Slack API base URL. Optional
:param slack_method_version: The version of the Slack SDK Client method to be used, either "v1" or "v2".
:param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method.
:param action_on_empty_df: Specifying how to handle an empty sql output df. Possible values:

- ``send``: (default) send the slack with an empty file.
- ``skip``: skip sending the slack message. Task state set to "skipped".
- ``error``: raise an error to fail the task. Task state set to "failed".
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -87,6 +92,7 @@ def __init__(
slack_base_url: str | None = None,
slack_method_version: Literal["v1", "v2"] = "v1",
df_kwargs: dict | None = None,
action_on_empty_df: Literal["send", "skip", "error"] = "send",
**kwargs,
):
super().__init__(
Expand All @@ -100,6 +106,9 @@ def __init__(
self.slack_base_url = slack_base_url
self.slack_method_version = slack_method_version
self.df_kwargs = df_kwargs or {}
if not action_on_empty_df or action_on_empty_df not in ("send", "skip", "error"):
raise ValueError(f"Invalid `action_on_empty_df` value {action_on_empty_df!r}")
self.action_on_empty_df = action_on_empty_df

@cached_property
def slack_hook(self):
Expand Down Expand Up @@ -134,6 +143,13 @@ def execute(self, context: Context) -> None:
output_file_name = fp.name
output_file_format = output_file_format.upper()
df_result = self._get_query_results()
if df_result.empty:
if self.action_on_empty_df == "skip":
raise AirflowSkipException("SQL output df is empty. Skipping.")
elif self.action_on_empty_df == "error":
raise ValueError("SQL output df must be non-empty. Failing.")
elif self.action_on_empty_df != "send":
raise ValueError(f"Invalid `action_on_empty_df` value {self.action_on_empty_df!r}")
if output_file_format == "CSV":
df_result.to_csv(output_file_name, **self.df_kwargs)
elif output_file_format == "JSON":
Expand Down
69 changes: 68 additions & 1 deletion tests/providers/slack/transfers/test_sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import pytest

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackApiFileOperator, SqlToSlackOperator
from airflow.utils import timezone

Expand Down Expand Up @@ -157,6 +157,73 @@ def test_unsupported_format(self, filename):
with pytest.raises(ValueError):
op.execute(mock.MagicMock())

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_sending_empty_file_by_default(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

op.execute(mock.MagicMock)
mock_slack_hook_cls.assert_called_once()

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_skip_sending_file(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
"action_on_empty_df": "skip",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

with pytest.raises(AirflowSkipException):
op.execute(mock.MagicMock())
mock_slack_hook_cls.assert_not_called()

@mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
def test_null_output_raise_error(self, mock_get_query_results, mock_slack_hook_cls):
op_kwargs = {
**self.default_op_kwargs,
"slack_conn_id": "expected-test-slack-conn-id",
"slack_filename": "test_filename.csv",
"slack_channels": ["#random"],
"slack_initial_comment": "test_comment",
"slack_title": "test_title",
"action_on_empty_df": "error",
}
op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)

# Mock empty query results
mock_df = mock.MagicMock()
mock_df.configure_mock(**{"empty.return_value": True})
mock_get_query_results.return_value = mock_df

with pytest.raises(ValueError, match="output df must be non-empty\. Failing"):
op.execute(mock.MagicMock())
mock_slack_hook_cls.assert_not_called()


def test_deprecated_sql_to_slack_operator():
warning_pattern = "SqlToSlackOperator` has been renamed and moved"
Expand Down
Loading