From bb7dea99c5743b010ae67e6294a381b3ab39af52 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Fri, 13 Nov 2020 09:22:37 +0000 Subject: [PATCH 01/16] feat(reports): scheduler and delivery system --- superset/dao/base.py | 8 +- superset/models/reports.py | 1 + superset/reports/commands/execute.py | 129 ++++++++++++++++++ superset/reports/dao.py | 145 +++++++++++++++++++++ superset/reports/notifications/__init__.py | 34 +++++ superset/reports/notifications/base.py | 51 ++++++++ superset/reports/notifications/email.py | 89 +++++++++++++ superset/reports/notifications/slack.py | 78 +++++++++++ superset/tasks/celery_app.py | 2 +- superset/tasks/scheduler.py | 56 ++++++++ 10 files changed, 589 insertions(+), 4 deletions(-) create mode 100644 superset/reports/commands/execute.py create mode 100644 superset/reports/dao.py create mode 100644 superset/reports/notifications/__init__.py create mode 100644 superset/reports/notifications/base.py create mode 100644 superset/reports/notifications/email.py create mode 100644 superset/reports/notifications/slack.py create mode 100644 superset/tasks/scheduler.py diff --git a/superset/dao/base.py b/superset/dao/base.py index c5db30167b1bb..a4e43d90d3139 100644 --- a/superset/dao/base.py +++ b/superset/dao/base.py @@ -20,6 +20,7 @@ from flask_appbuilder.models.sqla import Model from flask_appbuilder.models.sqla.interface import SQLAInterface from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session from superset.dao.exceptions import ( DAOConfigError, @@ -46,13 +47,14 @@ class BaseDAO: """ @classmethod - def find_by_id(cls, model_id: int) -> Model: + def find_by_id(cls, model_id: int, session: Session = None) -> Model: """ Find a model by id, if defined applies `base_filter` """ - query = db.session.query(cls.model_cls) + session = session or db.session + query = session.query(cls.model_cls) if cls.base_filter: - data_model = SQLAInterface(cls.model_cls, db.session) + data_model = SQLAInterface(cls.model_cls, session) query = cls.base_filter( # pylint: disable=not-callable "id", data_model ).apply(query, None) diff --git a/superset/models/reports.py b/superset/models/reports.py index c510cc6069fb3..6bf3c0aba4239 100644 --- a/superset/models/reports.py +++ b/superset/models/reports.py @@ -61,6 +61,7 @@ class ReportRecipientType(str, enum.Enum): class ReportLogState(str, enum.Enum): SUCCESS = "Success" + WORKING = "Working" ERROR = "Error" diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py new file mode 100644 index 0000000000000..0ee07478f7992 --- /dev/null +++ b/superset/reports/commands/execute.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +import urllib +from dataclasses import dataclass +from typing import Any, Iterator, Optional + +from contextlib2 import contextmanager +from flask import url_for +from sqlalchemy.orm import Session + +from superset import app, thumbnail_cache +from superset.commands.base import BaseCommand +from superset.extensions import db, security_manager +from superset.models.reports import ReportExecutionLog, ReportLogState, ReportSchedule +from superset.reports.dao import ReportScheduleDAO +from superset.reports.notifications import create_notification +from superset.reports.notifications.base import NotificationContent, ScreenshotData +from superset.utils.celery import session_scope +from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot +from superset.utils.urls import get_url_path + +logger = logging.getLogger(__name__) + + +@contextmanager +def normal_session_scope() -> Iterator[Session]: + session = db.session + try: + yield session + session.commit() + except Exception as ex: + session.rollback() + logger.exception(ex) + raise + finally: + session.close() + + +def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: + with app.test_request_context(): + base_url = ( + app.config["WEBDRIVER_BASEURL_USER_FRIENDLY"] + if user_friendly + else app.config["WEBDRIVER_BASEURL"] + ) + return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs)) + + +class ExecuteReportScheduleCommand(BaseCommand): + def __init__(self, model_id: int, worker_context: bool = True): + self._worker_context = worker_context + self._model_id = model_id + self._model: Optional[ReportSchedule] = None + + def set_state(self, session: Session, state: ReportLogState): + if self._model: + self._model.last_state = state + session.commit() + + def get_url(self, user_friendly: bool = False) -> str: + if self._model.chart: + return get_url_path( + "Superset.slice", + user_friendly=user_friendly, + slice_id=self._model.chart_id, + standalone="true", + ) + return get_url_path( + "Superset.dashboard", + user_friendly=user_friendly, + dashboard_id_or_slug=self._model.dashboard_id, + ) + + def get_screenshot(self) -> ScreenshotData: + url = self.get_url() + if self._model.chart: + screenshot = ChartScreenshot(url, self._model.chart.digest) + else: + screenshot = DashboardScreenshot(url, self._model.dashboard.digest) + image_url = self.get_url(user_friendly=True) + + user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) + image_data = screenshot.compute_and_cache( + user=user, cache=thumbnail_cache, force=True, + ) + return ScreenshotData(url=image_url, image=image_data) + + def get_notification_content(self) -> NotificationContent: + screenshot_data = self.get_screenshot() + if self._model.chart: + name = self._model.chart.slice_name + else: + name = self._model.dashboard.dashboard_title + return NotificationContent(name=name, screenshot=screenshot_data) + + def run(self) -> None: + if self._worker_context: + session_context = session_scope(nullpool=True) + else: + session_context = normal_session_scope + with session_context as session: + self.validate(session=session) + self.set_state(session, ReportLogState.WORKING) + notification_content = self.get_notification_content() + for recipient in self._model.recipients: + notification = create_notification(recipient, notification_content) + notification.send() + self.set_state(session, ReportLogState.SUCCESS) + + def validate(self, session: Session = None) -> None: + # Validate/populate model exists + self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session) + if not self._model: + raise Exception("NOT FOUND") diff --git a/superset/reports/dao.py b/superset/reports/dao.py new file mode 100644 index 0000000000000..6f037576cb62c --- /dev/null +++ b/superset/reports/dao.py @@ -0,0 +1,145 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from typing import Any, Dict, List, Optional + +from flask_appbuilder import Model +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session + +from superset.dao.base import BaseDAO +from superset.dao.exceptions import DAOCreateFailedError, DAODeleteFailedError +from superset.extensions import db +from superset.models.reports import ReportRecipients, ReportSchedule + +logger = logging.getLogger(__name__) + + +class ReportScheduleDAO(BaseDAO): + model_cls = ReportSchedule + + @staticmethod + def bulk_delete( + models: Optional[List[ReportSchedule]], commit: bool = True + ) -> None: + item_ids = [model.id for model in models] if models else [] + try: + # Clean owners secondary table + report_schedules = ( + db.session.query(ReportSchedule) + .filter(ReportSchedule.id.in_(item_ids)) + .all() + ) + for report_schedule in report_schedules: + report_schedule.owners = [] + for report_schedule in report_schedules: + db.session.delete(report_schedule) + if commit: + db.session.commit() + except SQLAlchemyError: + if commit: + db.session.rollback() + raise DAODeleteFailedError() + + @staticmethod + def validate_update_uniqueness( + name: str, report_schedule_id: Optional[int] = None + ) -> bool: + """ + Validate if this name is unique. + + :param name: The report schedule name + :param report_schedule_id: The report schedule current id + (only for validating on updates) + :return: bool + """ + query = db.session.query(ReportSchedule).filter(ReportSchedule.name == name) + if report_schedule_id: + query = query.filter(ReportSchedule.id != report_schedule_id) + return not db.session.query(query.exists()).scalar() + + @classmethod + def create(cls, properties: Dict[str, Any], commit: bool = True) -> Model: + """ + create a report schedule and nested recipients + :raises: DAOCreateFailedError + """ + import json + + try: + model = ReportSchedule() + for key, value in properties.items(): + if key != "recipients": + setattr(model, key, value) + recipients = properties.get("recipients", []) + for recipient in recipients: + model.recipients.append( # pylint: disable=no-member + ReportRecipients( + type=recipient["type"], + recipient_config_json=json.dumps( + recipient["recipient_config_json"] + ), + ) + ) + db.session.add(model) + if commit: + db.session.commit() + return model + except SQLAlchemyError: + db.session.rollback() + raise DAOCreateFailedError + + @classmethod + def update( + cls, model: Model, properties: Dict[str, Any], commit: bool = True + ) -> Model: + """ + create a report schedule and nested recipients + :raises: DAOCreateFailedError + """ + import json + + try: + for key, value in properties.items(): + if key != "recipients": + setattr(model, key, value) + if "recipients" in properties: + recipients = properties["recipients"] + model.recipients = [ + ReportRecipients( + type=recipient["type"], + recipient_config_json=json.dumps( + recipient["recipient_config_json"] + ), + report_schedule=model, + ) + for recipient in recipients + ] + db.session.merge(model) + if commit: + db.session.commit() + return model + except SQLAlchemyError: + db.session.rollback() + raise DAOCreateFailedError + + @staticmethod + def find_active(session: Optional[Session] = None) -> List[ReportSchedule]: + session = session or db.session + return ( + session.query(ReportSchedule).filter(ReportSchedule.active.is_(True)).all() + ) diff --git a/superset/reports/notifications/__init__.py b/superset/reports/notifications/__init__.py new file mode 100644 index 0000000000000..2553053131690 --- /dev/null +++ b/superset/reports/notifications/__init__.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from superset.models.reports import ReportRecipients +from superset.reports.notifications.base import BaseNotification, NotificationContent +from superset.reports.notifications.email import EmailNotification +from superset.reports.notifications.slack import SlackNotification + + +def create_notification( + recipient: ReportRecipients, screenshot_data: NotificationContent +) -> BaseNotification: + """ + Notification polymorphic factory + Returns the Notification class for the recipient type + """ + for plugin in BaseNotification.plugins: + if plugin.type == recipient.type: + return plugin(recipient, screenshot_data) + raise Exception("Recipient type not supported") diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py new file mode 100644 index 0000000000000..df67735bf3c6e --- /dev/null +++ b/superset/reports/notifications/base.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass +from typing import List, Optional, Type + +from superset.models.reports import ReportRecipients, ReportRecipientType + + +@dataclass +class ScreenshotData: + url: str # url to chart/dashboard for this screenshot + image: Optional[bytes] # bytes for the screenshot + + +@dataclass +class NotificationContent: + name: str + screenshot: ScreenshotData + + +class BaseNotification: + plugins: List[Type["BaseNotification"]] = [] + type: Optional[ReportRecipientType] = None + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + cls.plugins.append(cls) + + def __init__( + self, recipient: ReportRecipients, content: NotificationContent + ) -> None: + self._recipient = recipient + self._content = content + + def send(self) -> None: + raise NotImplementedError() diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py new file mode 100644 index 0000000000000..dc97da16ad379 --- /dev/null +++ b/superset/reports/notifications/email.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import logging +from dataclasses import dataclass +from email.utils import make_msgid, parseaddr +from typing import Dict + +from flask_babel import gettext as __ + +from superset import app +from superset.models.reports import ReportRecipientType +from superset.reports.notifications.base import BaseNotification +from superset.utils.core import send_email_smtp + +logger = logging.getLogger(__name__) + + +@dataclass +class EmailContent: + body: str + images: Dict[str, bytes] + + +class EmailNotification(BaseNotification): + type = ReportRecipientType.EMAIL + + def _get_smtp_from_domain(self): + return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1] + + def _get_content(self) -> EmailContent: + # Get the domain from the 'From' address .. + # and make a message id without the < > in the ends + domain = self._get_smtp_from_domain() + msgid = make_msgid(domain)[1:-1] + + images = {msgid: self._content.screenshot.image} + body = __( + """ + Explore in Superset

+ + """, + url=self._content.screenshot.url, + msgid=msgid, + ) + return EmailContent(body=body, images=images) + + def _get_subject(self) -> str: + return __( + "%(prefix)s %(title)s", + prefix=app.config["EMAIL_REPORTS_SUBJECT_PREFIX"], + title=self._content.name, + ) + + def _get_to(self) -> str: + return json.loads(self._recipient.recipient_config_json)["target"] + + def send(self) -> None: + subject = self._get_subject() + content = self._get_content() + to = self._get_to() + send_email_smtp( + to, + subject, + content.body, + app.config, + files=[], + data=None, + images=content.images, + bcc="", + mime_subtype="related", + dryrun=False, + ) + logger.debug(f"EMAIL SENT {to} {subject}") diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py new file mode 100644 index 0000000000000..a83fc54e04517 --- /dev/null +++ b/superset/reports/notifications/slack.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import logging +from io import IOBase +from typing import cast, Optional, Union + +from flask_babel import gettext as __ +from retry.api import retry +from slack import WebClient +from slack.errors import SlackApiError +from slack.web.slack_response import SlackResponse + +from superset import app +from superset.models.reports import ReportRecipientType +from superset.reports.notifications.base import BaseNotification + +logger = logging.getLogger(__name__) + + +class SlackNotification(BaseNotification): + type = ReportRecipientType.SLACK + + def _get_channel(self) -> str: + return json.loads(self._recipient.recipient_config_json)["target"] + + def _get_body(self) -> str: + return __( + """ + *%(name)s*\n + <%(url)s|Explore in Superset> + """, + name=self._content.name, + url=self._content.screenshot.url, + ) + + def _get_inline_screenshot(self) -> Optional[Union[str, IOBase, bytes]]: + return self._content.screenshot.image + + @retry(SlackApiError, delay=10, backoff=2, tries=5) + def send(self) -> None: + file = self._get_inline_screenshot() + channel = self._get_channel() + body = self._get_body() + + client = WebClient( + token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"] + ) + # files_upload returns SlackResponse as we run it in sync mode. + if file: + response = cast( + SlackResponse, + client.files_upload( + channels=channel, file=file, initial_comment=body, title="subject" + ), + ) + assert response["file"], str(response) # the uploaded file + else: + response = cast( + SlackResponse, client.chat_postMessage(channel=channel, text=body), + ) + assert response["message"]["text"], str(response) + logger.info("Sent the report to the slack %s", channel) diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py index 0f3cd0ef558b2..d84273f4ee710 100644 --- a/superset/tasks/celery_app.py +++ b/superset/tasks/celery_app.py @@ -29,7 +29,7 @@ # Need to import late, as the celery_app will have been setup by "create_app()" # pylint: disable=wrong-import-position, unused-import -from . import cache, schedules # isort:skip +from . import cache, schedules, scheduler # isort:skip # Export the celery app globally for Celery (as run on the cmd line) to find app = celery_app diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py new file mode 100644 index 0000000000000..2d06cb1baf9a2 --- /dev/null +++ b/superset/tasks/scheduler.py @@ -0,0 +1,56 @@ +import logging +from datetime import datetime, timedelta +from typing import Iterator, Optional + +import croniter + +from superset.extensions import celery_app +from superset.reports.commands.execute import ExecuteReportScheduleCommand +from superset.reports.dao import ReportScheduleDAO +from superset.utils.celery import session_scope + +logger = logging.getLogger(__name__) + + +class ScheduleWindow: + def __init__(self, window_size: int = 10) -> None: + self._window_size = window_size + utc_now = datetime.utcnow() + self._start_at = utc_now - timedelta(seconds=1) + self._stop_at = utc_now + timedelta(seconds=self._window_size) + + def next(self, cron: str) -> Iterator[datetime]: + crons = croniter.croniter(cron, self._start_at) + for schedule in crons.all_next(datetime): + if schedule >= self._stop_at: + break + yield schedule + + @property + def start_at(self) -> datetime: + return self._start_at + + @property + def stop_at(self) -> datetime: + return self._stop_at + + +@celery_app.task(name="reports.scheduler") +def scheduler() -> None: + """ Celery beat main scheduler """ + schedule_window = ScheduleWindow() + with session_scope(nullpool=True) as session: + active_schedules = ReportScheduleDAO.find_active(session) + for active_schedule in active_schedules: + logger.debug(f"Processing active schedule {active_schedule}") + for schedule in schedule_window.next(active_schedule.crontab): + logger.debug(f"Execution time {schedule}") + execute.apply_async((active_schedule.id,), eta=schedule) + + +@celery_app.task(name="reports.execute") +def execute(report_schedule_id: int) -> None: + try: + ExecuteReportScheduleCommand(report_schedule_id, worker_context=True).run() + except Exception as ex: + pass From b35228015dd9b463e472ffd73d25cbe3eb087df3 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 16 Nov 2020 12:07:22 +0000 Subject: [PATCH 02/16] working version --- superset/models/reports.py | 1 + superset/reports/commands/alert.py | 83 +++++++++++ superset/reports/commands/base.py | 17 ++- superset/reports/commands/exceptions.py | 32 +++++ superset/reports/commands/execute.py | 179 ++++++++++++++++++++---- superset/reports/commands/log_prune.py | 62 ++++++++ superset/reports/dao.py | 27 +++- superset/reports/notifications/base.py | 8 +- superset/reports/notifications/email.py | 6 +- superset/tasks/scheduler.py | 15 +- 10 files changed, 388 insertions(+), 42 deletions(-) create mode 100644 superset/reports/commands/alert.py create mode 100644 superset/reports/commands/log_prune.py diff --git a/superset/models/reports.py b/superset/models/reports.py index 8aceea5a93b9a..aeeebeb8929c5 100644 --- a/superset/models/reports.py +++ b/superset/models/reports.py @@ -62,6 +62,7 @@ class ReportLogState(str, enum.Enum): SUCCESS = "Success" WORKING = "Working" ERROR = "Error" + NOOP = "Not triggered" class ReportEmailFormat(str, enum.Enum): diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py new file mode 100644 index 0000000000000..1d8d0dd84df2b --- /dev/null +++ b/superset/reports/commands/alert.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import logging +from operator import eq, ge, gt, le, lt, ne +from typing import Optional + +import numpy as np +import pandas as pd + +from superset import jinja_context +from superset.commands.base import BaseCommand +from superset.models.reports import ReportSchedule, ReportScheduleValidatorType +from superset.reports.commands.exceptions import ( + AlertQueryInvalidTypeError, + AlertQueryMultipleColumnsError, + AlertQueryMultipleRowsError, +) + +logger = logging.getLogger(__name__) + + +OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} + + +class AlertCommand(BaseCommand): + def __init__(self, report_schedule: ReportSchedule): + self._report_schedule = report_schedule + self._result: Optional[float] = None + + def run(self) -> bool: + self.validate() + self._report_schedule.last_value = self._result + + if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: + return self._result not in (0, None, np.nan) + else: + operator = json.loads(self._report_schedule.validator_config_json)["op"] + threshold = json.loads(self._report_schedule.validator_config_json)[ + "threshold" + ] + return OPERATOR_FUNCTIONS[operator](self._result, threshold) + + def validate(self) -> None: + """ + Validate the query result as a Pandas DataFrame + """ + sql_template = jinja_context.get_template_processor( + database=self._report_schedule.database + ) + rendered_sql = sql_template.process_template(self._report_schedule.sql) + df = self._report_schedule.database.get_df(rendered_sql) + + if df.empty: + return + rows = df.to_records() + # check if query return more then one row + if len(rows) > 1: + raise AlertQueryMultipleRowsError() + if len(rows[0]) > 2: + raise AlertQueryMultipleColumnsError() + if rows[0][1] is None: + return + try: + # Check if it's float or if we can convert it + self._result = float(rows[0][1]) + return + except (AssertionError, TypeError, ValueError): + raise AlertQueryInvalidTypeError() diff --git a/superset/reports/commands/base.py b/superset/reports/commands/base.py index bb4064d22cde7..012bc256ee70f 100644 --- a/superset/reports/commands/base.py +++ b/superset/reports/commands/base.py @@ -15,13 +15,16 @@ # specific language governing permissions and limitations # under the License. import logging -from typing import Any, Dict, List +from typing import Any, Dict, Iterator, List +from contextlib2 import contextmanager from marshmallow import ValidationError +from sqlalchemy.orm import Session from superset.charts.dao import ChartDAO from superset.commands.base import BaseCommand from superset.dashboards.dao import DashboardDAO +from superset.extensions import db from superset.reports.commands.exceptions import ( ChartNotFoundValidationError, DashboardNotFoundValidationError, @@ -31,6 +34,18 @@ logger = logging.getLogger(__name__) +@contextmanager +def normal_session_scope() -> Iterator[Session]: + session = db.session + try: + yield session + session.commit() + except Exception as ex: + session.rollback() + logger.exception(ex) + raise ex + + class BaseReportScheduleCommand(BaseCommand): _properties: Dict[str, Any] diff --git a/superset/reports/commands/exceptions.py b/superset/reports/commands/exceptions.py index 23a21425bd92b..0bc80d2975524 100644 --- a/superset/reports/commands/exceptions.py +++ b/superset/reports/commands/exceptions.py @@ -103,6 +103,22 @@ class ReportScheduleDeleteFailedError(CommandException): message = _("Report Schedule delete failed.") +class PruneReportScheduleLogFailedError(CommandException): + message = _("Report Schedule log prune failed.") + + +class ReportScheduleScreenshotFailedError(CommandException): + message = _("Report Schedule execute screenshot failed.") + + +class ReportScheduleExecuteUnexpectedError(CommandException): + message = _("Report Schedule execute got an unexpected error.") + + +class ReportSchedulePreviousWorkingError(CommandException): + message = _("Report Schedule is still working error, refused to re compute.") + + class ReportScheduleNameUniquenessValidationError(ValidationError): """ Marshmallow validation error for Report Schedule name already exists @@ -110,3 +126,19 @@ class ReportScheduleNameUniquenessValidationError(ValidationError): def __init__(self) -> None: super().__init__([_("Name must be unique")], field_name="name") + + +class AlertQueryMultipleRowsError(CommandException): + message = _("Alert query returned more then one row.") + + +class AlertQueryMultipleColumnsError(CommandException): + message = _("Alert query returned more then one column.") + + +class AlertQueryInvalidTypeError(CommandException): + message = _("Alert query returned a non-number value") + + +class ReportScheduleAlertGracePeriodError(CommandException): + message = _("Alert on grace period") diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 0ee07478f7992..35c85db777ba4 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -16,41 +16,45 @@ # under the License. import logging import urllib -from dataclasses import dataclass -from typing import Any, Iterator, Optional +from datetime import datetime, timedelta +from typing import Any, Optional -from contextlib2 import contextmanager from flask import url_for from sqlalchemy.orm import Session from superset import app, thumbnail_cache from superset.commands.base import BaseCommand -from superset.extensions import db, security_manager -from superset.models.reports import ReportExecutionLog, ReportLogState, ReportSchedule +from superset.commands.exceptions import CommandException +from superset.extensions import security_manager +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportSchedule, + ReportScheduleType, +) +from superset.reports.commands.alert import AlertCommand +from superset.reports.commands.base import normal_session_scope +from superset.reports.commands.exceptions import ( + ReportScheduleAlertGracePeriodError, + ReportScheduleExecuteUnexpectedError, + ReportScheduleNotFoundError, + ReportSchedulePreviousWorkingError, + ReportScheduleScreenshotFailedError, +) from superset.reports.dao import ReportScheduleDAO from superset.reports.notifications import create_notification from superset.reports.notifications.base import NotificationContent, ScreenshotData from superset.utils.celery import session_scope -from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot +from superset.utils.screenshots import ( + BaseScreenshot, + ChartScreenshot, + DashboardScreenshot, +) from superset.utils.urls import get_url_path logger = logging.getLogger(__name__) -@contextmanager -def normal_session_scope() -> Iterator[Session]: - session = db.session - try: - yield session - session.commit() - except Exception as ex: - session.rollback() - logger.exception(ex) - raise - finally: - session.close() - - def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: with app.test_request_context(): base_url = ( @@ -62,17 +66,90 @@ def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: class ExecuteReportScheduleCommand(BaseCommand): + """ + Execute all types of report schedules. + On reports takes chart or dashboard screenshots and send configured notifications + On Alerts uses related Command AlertCommand + """ + def __init__(self, model_id: int, worker_context: bool = True): self._worker_context = worker_context self._model_id = model_id self._model: Optional[ReportSchedule] = None - def set_state(self, session: Session, state: ReportLogState): + def set_state_and_log( + self, + session: Session, + start_dttm: datetime, + state: ReportLogState, + scheduled_dttm: Optional[datetime] = None, + value: Optional[float] = None, + value_row_json: Optional[str] = None, + error_message: Optional[str] = None, + ) -> None: + """ + Updates current ReportSchedule state and TS. If on final state writes the log + for this execution + """ + now_dttm = datetime.utcnow() + if state == ReportLogState.WORKING: + return self.set_state(session, state, now_dttm) + self.set_state(session, state, now_dttm) + self.create_log( + session, + start_dttm, + now_dttm, + state, + scheduled_dttm=scheduled_dttm, + value=value, + value_row_json=value_row_json, + error_message=error_message, + ) + + def set_state( + self, session: Session, state: ReportLogState, dttm: datetime + ) -> None: + """ + Set the current report schedule state, on this case we want to + commit immediately + """ if self._model: self._model.last_state = state + self._model.last_eval_dttm = dttm session.commit() + def create_log( + self, + session: Session, + start_dttm: datetime, + end_dttm: datetime, + state: ReportLogState, + scheduled_dttm: Optional[datetime] = None, + value: Optional[float] = None, + value_row_json: Optional[str] = None, + error_message: Optional[str] = None, + ) -> None: + # TODO Remove this hack + scheduled_dttm = scheduled_dttm or datetime.utcnow() + if self._model: + log = ReportExecutionLog( + scheduled_dttm=scheduled_dttm, + start_dttm=start_dttm, + end_dttm=end_dttm, + value=value, + value_row_json=value_row_json, + state=state, + error_message=error_message, + report_schedule=self._model, + ) + session.add(log) + def get_url(self, user_friendly: bool = False) -> str: + """ + Get the url for this report schedule chart or dashboard + """ + if not self._model: + raise ReportScheduleExecuteUnexpectedError() if self._model.chart: return get_url_path( "Superset.slice", @@ -87,20 +164,30 @@ def get_url(self, user_friendly: bool = False) -> str: ) def get_screenshot(self) -> ScreenshotData: + """ + Get a chart or dashboard screenshot + :raises: ReportScheduleScreenshotFailedError + """ + if not self._model: + raise ReportScheduleExecuteUnexpectedError() url = self.get_url() + screenshot: Optional[BaseScreenshot] = None if self._model.chart: screenshot = ChartScreenshot(url, self._model.chart.digest) else: screenshot = DashboardScreenshot(url, self._model.dashboard.digest) image_url = self.get_url(user_friendly=True) - user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) image_data = screenshot.compute_and_cache( user=user, cache=thumbnail_cache, force=True, ) + if not image_data: + raise ReportScheduleScreenshotFailedError() return ScreenshotData(url=image_url, image=image_data) def get_notification_content(self) -> NotificationContent: + if not self._model: + raise ReportScheduleExecuteUnexpectedError() screenshot_data = self.get_screenshot() if self._model.chart: name = self._model.chart.slice_name @@ -114,16 +201,48 @@ def run(self) -> None: else: session_context = normal_session_scope with session_context as session: - self.validate(session=session) - self.set_state(session, ReportLogState.WORKING) - notification_content = self.get_notification_content() - for recipient in self._model.recipients: - notification = create_notification(recipient, notification_content) - notification.send() - self.set_state(session, ReportLogState.SUCCESS) + try: + start_dttm = datetime.utcnow() + self.validate(session=session) + if not self._model: + raise ReportScheduleExecuteUnexpectedError() + self.set_state_and_log(session, start_dttm, ReportLogState.WORKING) + # If it's an alert check if the alert is triggered + if self._model.type == ReportScheduleType.ALERT: + if not AlertCommand(self._model).run(): + self.set_state_and_log(session, start_dttm, ReportLogState.NOOP) + return + notification_content = self.get_notification_content() + for recipient in self._model.recipients: + notification = create_notification(recipient, notification_content) + notification.send() + + # Log, state and TS + self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS) + except ReportScheduleAlertGracePeriodError as ex: + self.set_state_and_log( + session, start_dttm, ReportLogState.NOOP, error_message=str(ex) + ) + except CommandException as ex: + self.set_state_and_log( + session, start_dttm, ReportLogState.ERROR, error_message=str(ex) + ) + logger.error("Failed to execute report schedule: %s", ex) def validate(self, session: Session = None) -> None: # Validate/populate model exists self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session) if not self._model: - raise Exception("NOT FOUND") + raise ReportScheduleNotFoundError() + # Avoid overlap processing + if self._model.last_state == ReportLogState.WORKING: + raise ReportSchedulePreviousWorkingError() + # Check grace period + if ( + self._model.type == ReportScheduleType.ALERT + and self._model.last_state == ReportLogState.SUCCESS + and self._model.grace_period + and datetime.utcnow() - timedelta(seconds=self._model.grace_period) + < self._model.last_eval_dttm + ): + raise ReportScheduleAlertGracePeriodError() diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py new file mode 100644 index 0000000000000..b09cf3a596511 --- /dev/null +++ b/superset/reports/commands/log_prune.py @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from datetime import datetime, timedelta +from typing import Optional + +from flask_appbuilder.security.sqla.models import User +from sqlalchemy.orm import Session + +from superset.commands.base import BaseCommand +from superset.dao.exceptions import DAODeleteFailedError +from superset.models.reports import ReportSchedule +from superset.reports.commands.base import normal_session_scope +from superset.reports.commands.exceptions import ( + ReportScheduleDeleteFailedError, + ReportScheduleNotFoundError, +) +from superset.reports.dao import ReportScheduleDAO +from superset.utils.celery import session_scope + +logger = logging.getLogger(__name__) + + +class PruneReportScheduleLogCommand(BaseCommand): + """ + Prunes logs from all report schedules + """ + + def __init__(self, worker_context: bool = True): + self._worker_context = worker_context + + def run(self) -> None: + if self._worker_context: + session_context = session_scope(nullpool=True) + else: + session_context = normal_session_scope + with session_context as session: + self.validate() + for report_schedule in session.query(ReportSchedule).all(): + from_date = datetime.utcnow() - timedelta( + days=report_schedule.log_retention + ) + ReportScheduleDAO.bulk_delete_logs( + report_schedule, from_date, session=session, commit=False + ) + + def validate(self) -> None: + pass diff --git a/superset/reports/dao.py b/superset/reports/dao.py index 6f037576cb62c..14c8beb210f87 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging +from datetime import datetime from typing import Any, Dict, List, Optional from flask_appbuilder import Model @@ -24,7 +25,7 @@ from superset.dao.base import BaseDAO from superset.dao.exceptions import DAOCreateFailedError, DAODeleteFailedError from superset.extensions import db -from superset.models.reports import ReportRecipients, ReportSchedule +from superset.models.reports import ReportExecutionLog, ReportRecipients, ReportSchedule logger = logging.getLogger(__name__) @@ -139,7 +140,31 @@ def update( @staticmethod def find_active(session: Optional[Session] = None) -> List[ReportSchedule]: + """ + Find all active reports. If session is passed it will be used instead of the + default `db.session`, this is useful when on a celery worker session context + """ session = session or db.session return ( session.query(ReportSchedule).filter(ReportSchedule.active.is_(True)).all() ) + + @staticmethod + def bulk_delete_logs( + model: ReportSchedule, + from_date: datetime, + session: Optional[Session] = None, + commit: bool = True, + ) -> None: + session = session or db.session + try: + session.query(ReportExecutionLog).filter( + ReportExecutionLog.report_schedule == model, + ReportExecutionLog.end_dttm < from_date, + ).delete(synchronize_session="fetch") + if commit: + session.commit() + except SQLAlchemyError as ex: + if commit: + session.rollback() + raise ex diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py index df67735bf3c6e..9369e1dba4f30 100644 --- a/superset/reports/notifications/base.py +++ b/superset/reports/notifications/base.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. from dataclasses import dataclass -from typing import List, Optional, Type +from typing import Any, List, Optional, Type from superset.models.reports import ReportRecipients, ReportRecipientType @@ -24,7 +24,7 @@ @dataclass class ScreenshotData: url: str # url to chart/dashboard for this screenshot - image: Optional[bytes] # bytes for the screenshot + image: bytes # bytes for the screenshot @dataclass @@ -37,8 +37,8 @@ class BaseNotification: plugins: List[Type["BaseNotification"]] = [] type: Optional[ReportRecipientType] = None - def __init_subclass__(cls, **kwargs): - super().__init_subclass__(**kwargs) + def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None: + super().__init_subclass__(*args, **kwargs) # type: ignore cls.plugins.append(cls) def __init__( diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index dc97da16ad379..594d887f0514b 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -40,7 +40,7 @@ class EmailContent: class EmailNotification(BaseNotification): type = ReportRecipientType.EMAIL - def _get_smtp_from_domain(self): + def _get_smtp_from_domain(self) -> str: return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1] def _get_content(self) -> EmailContent: @@ -49,7 +49,7 @@ def _get_content(self) -> EmailContent: domain = self._get_smtp_from_domain() msgid = make_msgid(domain)[1:-1] - images = {msgid: self._content.screenshot.image} + image = {msgid: self._content.screenshot.image} body = __( """ Explore in Superset

@@ -58,7 +58,7 @@ def _get_content(self) -> EmailContent: url=self._content.screenshot.url, msgid=msgid, ) - return EmailContent(body=body, images=images) + return EmailContent(body=body, images=image) def _get_subject(self) -> str: return __( diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 2d06cb1baf9a2..783e4d20fb800 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -4,8 +4,10 @@ import croniter +from superset.commands.exceptions import CommandException from superset.extensions import celery_app from superset.reports.commands.execute import ExecuteReportScheduleCommand +from superset.reports.commands.log_prune import PruneReportScheduleLogCommand from superset.reports.dao import ReportScheduleDAO from superset.utils.celery import session_scope @@ -37,7 +39,9 @@ def stop_at(self) -> datetime: @celery_app.task(name="reports.scheduler") def scheduler() -> None: - """ Celery beat main scheduler """ + """ + Celery beat main scheduler for reports + """ schedule_window = ScheduleWindow() with session_scope(nullpool=True) as session: active_schedules = ReportScheduleDAO.find_active(session) @@ -52,5 +56,10 @@ def scheduler() -> None: def execute(report_schedule_id: int) -> None: try: ExecuteReportScheduleCommand(report_schedule_id, worker_context=True).run() - except Exception as ex: - pass + except CommandException as ex: + logger.error("An exception occurred while executing the report %s", ex) + + +@celery_app.task(name="reports.prune_log") +def prune_log() -> None: + PruneReportScheduleLogCommand(worker_context=True).run() From d2f0d05659e9739d457c2714c766035eb78a1740 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 16 Nov 2020 12:12:12 +0000 Subject: [PATCH 03/16] add missing license --- superset/tasks/scheduler.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 783e4d20fb800..7318191357923 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -1,6 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. import logging from datetime import datetime, timedelta -from typing import Iterator, Optional +from typing import Iterator import croniter From cfa92eb80390ed852758da8a757f65032826e788 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 16 Nov 2020 14:45:15 +0000 Subject: [PATCH 04/16] lint --- superset/reports/commands/alert.py | 12 +++---- superset/reports/commands/execute.py | 22 +++++------- superset/reports/commands/log_prune.py | 11 +----- superset/reports/notifications/base.py | 2 +- superset/reports/notifications/email.py | 9 ++--- superset/reports/notifications/slack.py | 4 +-- superset/tasks/scheduler.py | 46 +++++++++---------------- 7 files changed, 40 insertions(+), 66 deletions(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 1d8d0dd84df2b..7601a07082424 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -20,7 +20,6 @@ from typing import Optional import numpy as np -import pandas as pd from superset import jinja_context from superset.commands.base import BaseCommand @@ -48,12 +47,11 @@ def run(self) -> bool: if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: return self._result not in (0, None, np.nan) - else: - operator = json.loads(self._report_schedule.validator_config_json)["op"] - threshold = json.loads(self._report_schedule.validator_config_json)[ - "threshold" - ] - return OPERATOR_FUNCTIONS[operator](self._result, threshold) + operator = json.loads(self._report_schedule.validator_config_json)["op"] + threshold = json.loads(self._report_schedule.validator_config_json)[ + "threshold" + ] + return OPERATOR_FUNCTIONS[operator](self._result, threshold) def validate(self) -> None: """ diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 35c85db777ba4..84b02424f1fc6 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -33,7 +33,6 @@ ReportScheduleType, ) from superset.reports.commands.alert import AlertCommand -from superset.reports.commands.base import normal_session_scope from superset.reports.commands.exceptions import ( ReportScheduleAlertGracePeriodError, ReportScheduleExecuteUnexpectedError, @@ -65,19 +64,18 @@ def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs)) -class ExecuteReportScheduleCommand(BaseCommand): +class AsyncExecuteReportScheduleCommand(BaseCommand): """ Execute all types of report schedules. On reports takes chart or dashboard screenshots and send configured notifications On Alerts uses related Command AlertCommand """ - def __init__(self, model_id: int, worker_context: bool = True): - self._worker_context = worker_context + def __init__(self, model_id: int): self._model_id = model_id self._model: Optional[ReportSchedule] = None - def set_state_and_log( + def set_state_and_log( # pylint: disable=too-many-arguments self, session: Session, start_dttm: datetime, @@ -93,7 +91,7 @@ def set_state_and_log( """ now_dttm = datetime.utcnow() if state == ReportLogState.WORKING: - return self.set_state(session, state, now_dttm) + self.set_state(session, state, now_dttm) self.set_state(session, state, now_dttm) self.create_log( session, @@ -118,7 +116,7 @@ def set_state( self._model.last_eval_dttm = dttm session.commit() - def create_log( + def create_log( # pylint: disable=too-many-arguments self, session: Session, start_dttm: datetime, @@ -196,11 +194,7 @@ def get_notification_content(self) -> NotificationContent: return NotificationContent(name=name, screenshot=screenshot_data) def run(self) -> None: - if self._worker_context: - session_context = session_scope(nullpool=True) - else: - session_context = normal_session_scope - with session_context as session: + with session_scope(nullpool=True) as session: try: start_dttm = datetime.utcnow() self.validate(session=session) @@ -229,7 +223,9 @@ def run(self) -> None: ) logger.error("Failed to execute report schedule: %s", ex) - def validate(self, session: Session = None) -> None: + def validate( # pylint: disable=arguments-differ + self, session: Session = None + ) -> None: # Validate/populate model exists self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session) if not self._model: diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py index b09cf3a596511..5052b88db56ee 100644 --- a/superset/reports/commands/log_prune.py +++ b/superset/reports/commands/log_prune.py @@ -16,26 +16,17 @@ # under the License. import logging from datetime import datetime, timedelta -from typing import Optional - -from flask_appbuilder.security.sqla.models import User -from sqlalchemy.orm import Session from superset.commands.base import BaseCommand -from superset.dao.exceptions import DAODeleteFailedError from superset.models.reports import ReportSchedule from superset.reports.commands.base import normal_session_scope -from superset.reports.commands.exceptions import ( - ReportScheduleDeleteFailedError, - ReportScheduleNotFoundError, -) from superset.reports.dao import ReportScheduleDAO from superset.utils.celery import session_scope logger = logging.getLogger(__name__) -class PruneReportScheduleLogCommand(BaseCommand): +class AsyncPruneReportScheduleLogCommand(BaseCommand): """ Prunes logs from all report schedules """ diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py index 9369e1dba4f30..bd732546e634f 100644 --- a/superset/reports/notifications/base.py +++ b/superset/reports/notifications/base.py @@ -33,7 +33,7 @@ class NotificationContent: screenshot: ScreenshotData -class BaseNotification: +class BaseNotification: # pylint: disable=too-few-public-methods plugins: List[Type["BaseNotification"]] = [] type: Optional[ReportRecipientType] = None diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 594d887f0514b..8e0d2e554426d 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -37,16 +37,17 @@ class EmailContent: images: Dict[str, bytes] -class EmailNotification(BaseNotification): +class EmailNotification(BaseNotification): # pylint: disable=too-few-public-methods type = ReportRecipientType.EMAIL - def _get_smtp_from_domain(self) -> str: + @staticmethod + def _get_smtp_domain() -> str: return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1] def _get_content(self) -> EmailContent: # Get the domain from the 'From' address .. # and make a message id without the < > in the ends - domain = self._get_smtp_from_domain() + domain = self._get_smtp_domain() msgid = make_msgid(domain)[1:-1] image = {msgid: self._content.screenshot.image} @@ -86,4 +87,4 @@ def send(self) -> None: mime_subtype="related", dryrun=False, ) - logger.debug(f"EMAIL SENT {to} {subject}") + logger.info("Report sent to email") diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index a83fc54e04517..1cf3ce96feabd 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -33,7 +33,7 @@ logger = logging.getLogger(__name__) -class SlackNotification(BaseNotification): +class SlackNotification(BaseNotification): # pylint: disable=too-few-public-methods type = ReportRecipientType.SLACK def _get_channel(self) -> str: @@ -75,4 +75,4 @@ def send(self) -> None: SlackResponse, client.chat_postMessage(channel=channel, text=body), ) assert response["message"]["text"], str(response) - logger.info("Sent the report to the slack %s", channel) + logger.info("Report sent to slack") diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 7318191357923..c1321f6f82f5c 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -22,35 +22,23 @@ from superset.commands.exceptions import CommandException from superset.extensions import celery_app -from superset.reports.commands.execute import ExecuteReportScheduleCommand -from superset.reports.commands.log_prune import PruneReportScheduleLogCommand +from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand +from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogCommand from superset.reports.dao import ReportScheduleDAO from superset.utils.celery import session_scope logger = logging.getLogger(__name__) -class ScheduleWindow: - def __init__(self, window_size: int = 10) -> None: - self._window_size = window_size - utc_now = datetime.utcnow() - self._start_at = utc_now - timedelta(seconds=1) - self._stop_at = utc_now + timedelta(seconds=self._window_size) - - def next(self, cron: str) -> Iterator[datetime]: - crons = croniter.croniter(cron, self._start_at) - for schedule in crons.all_next(datetime): - if schedule >= self._stop_at: - break - yield schedule - - @property - def start_at(self) -> datetime: - return self._start_at - - @property - def stop_at(self) -> datetime: - return self._stop_at +def cron_schedule_window(cron: str, window_size: int = 10) -> Iterator[datetime]: + utc_now = datetime.utcnow() + start_at = utc_now - timedelta(seconds=1) + stop_at = utc_now + timedelta(seconds=window_size) + crons = croniter.croniter(cron, start_at) + for schedule in crons.all_next(datetime): + if schedule >= stop_at: + break + yield schedule @celery_app.task(name="reports.scheduler") @@ -58,24 +46,24 @@ def scheduler() -> None: """ Celery beat main scheduler for reports """ - schedule_window = ScheduleWindow() with session_scope(nullpool=True) as session: active_schedules = ReportScheduleDAO.find_active(session) for active_schedule in active_schedules: - logger.debug(f"Processing active schedule {active_schedule}") - for schedule in schedule_window.next(active_schedule.crontab): - logger.debug(f"Execution time {schedule}") + for schedule in cron_schedule_window(active_schedule.crontab): execute.apply_async((active_schedule.id,), eta=schedule) @celery_app.task(name="reports.execute") def execute(report_schedule_id: int) -> None: try: - ExecuteReportScheduleCommand(report_schedule_id, worker_context=True).run() + AsyncExecuteReportScheduleCommand(report_schedule_id).run() except CommandException as ex: logger.error("An exception occurred while executing the report %s", ex) @celery_app.task(name="reports.prune_log") def prune_log() -> None: - PruneReportScheduleLogCommand(worker_context=True).run() + try: + AsyncPruneReportScheduleLogCommand().run() + except CommandException as ex: + logger.error("An exception occurred while pruning report schedule logs %s", ex) From 2ff6de2726a2e74dfcf3e09065626d066c637c5f Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 16 Nov 2020 17:07:25 +0000 Subject: [PATCH 05/16] improvements and fix grace_period --- superset/reports/commands/base.py | 17 +----- superset/reports/commands/execute.py | 75 ++++++++++--------------- superset/reports/commands/log_prune.py | 7 +-- superset/reports/dao.py | 22 +++++++- superset/reports/notifications/base.py | 11 ++++ superset/reports/notifications/email.py | 4 ++ superset/reports/notifications/slack.py | 4 ++ superset/tasks/scheduler.py | 6 +- superset/utils/urls.py | 12 ++-- 9 files changed, 83 insertions(+), 75 deletions(-) diff --git a/superset/reports/commands/base.py b/superset/reports/commands/base.py index 012bc256ee70f..bb4064d22cde7 100644 --- a/superset/reports/commands/base.py +++ b/superset/reports/commands/base.py @@ -15,16 +15,13 @@ # specific language governing permissions and limitations # under the License. import logging -from typing import Any, Dict, Iterator, List +from typing import Any, Dict, List -from contextlib2 import contextmanager from marshmallow import ValidationError -from sqlalchemy.orm import Session from superset.charts.dao import ChartDAO from superset.commands.base import BaseCommand from superset.dashboards.dao import DashboardDAO -from superset.extensions import db from superset.reports.commands.exceptions import ( ChartNotFoundValidationError, DashboardNotFoundValidationError, @@ -34,18 +31,6 @@ logger = logging.getLogger(__name__) -@contextmanager -def normal_session_scope() -> Iterator[Session]: - session = db.session - try: - yield session - session.commit() - except Exception as ex: - session.rollback() - logger.exception(ex) - raise ex - - class BaseReportScheduleCommand(BaseCommand): _properties: Dict[str, Any] diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 84b02424f1fc6..ee8a4c06fa863 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -15,11 +15,9 @@ # specific language governing permissions and limitations # under the License. import logging -import urllib from datetime import datetime, timedelta -from typing import Any, Optional +from typing import Optional -from flask import url_for from sqlalchemy.orm import Session from superset import app, thumbnail_cache @@ -54,35 +52,23 @@ logger = logging.getLogger(__name__) -def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: - with app.test_request_context(): - base_url = ( - app.config["WEBDRIVER_BASEURL_USER_FRIENDLY"] - if user_friendly - else app.config["WEBDRIVER_BASEURL"] - ) - return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs)) - - class AsyncExecuteReportScheduleCommand(BaseCommand): """ Execute all types of report schedules. - On reports takes chart or dashboard screenshots and send configured notifications - On Alerts uses related Command AlertCommand + - On reports takes chart or dashboard screenshots and sends configured notifications + - On Alerts uses related Command AlertCommand and sends configured notifications """ - def __init__(self, model_id: int): + def __init__(self, model_id: int, scheduled_dttm: datetime): self._model_id = model_id self._model: Optional[ReportSchedule] = None + self._scheduled_dttm = scheduled_dttm - def set_state_and_log( # pylint: disable=too-many-arguments + def set_state_and_log( self, session: Session, start_dttm: datetime, state: ReportLogState, - scheduled_dttm: Optional[datetime] = None, - value: Optional[float] = None, - value_row_json: Optional[str] = None, error_message: Optional[str] = None, ) -> None: """ @@ -92,16 +78,10 @@ def set_state_and_log( # pylint: disable=too-many-arguments now_dttm = datetime.utcnow() if state == ReportLogState.WORKING: self.set_state(session, state, now_dttm) + return self.set_state(session, state, now_dttm) self.create_log( - session, - start_dttm, - now_dttm, - state, - scheduled_dttm=scheduled_dttm, - value=value, - value_row_json=value_row_json, - error_message=error_message, + session, start_dttm, now_dttm, state, error_message=error_message, ) def set_state( @@ -122,20 +102,18 @@ def create_log( # pylint: disable=too-many-arguments start_dttm: datetime, end_dttm: datetime, state: ReportLogState, - scheduled_dttm: Optional[datetime] = None, - value: Optional[float] = None, - value_row_json: Optional[str] = None, error_message: Optional[str] = None, ) -> None: - # TODO Remove this hack - scheduled_dttm = scheduled_dttm or datetime.utcnow() + """ + Creates a Report execution log, uses the current computed last_value for Alerts + """ if self._model: log = ReportExecutionLog( - scheduled_dttm=scheduled_dttm, + scheduled_dttm=self._scheduled_dttm, start_dttm=start_dttm, end_dttm=end_dttm, - value=value, - value_row_json=value_row_json, + value=self._model.last_value, + value_row_json=self._model.last_value_row_json, state=state, error_message=error_message, report_schedule=self._model, @@ -144,7 +122,7 @@ def create_log( # pylint: disable=too-many-arguments def get_url(self, user_friendly: bool = False) -> str: """ - Get the url for this report schedule chart or dashboard + Get the url for this report schedule: chart or dashboard """ if not self._model: raise ReportScheduleExecuteUnexpectedError() @@ -184,6 +162,10 @@ def get_screenshot(self) -> ScreenshotData: return ScreenshotData(url=image_url, image=image_data) def get_notification_content(self) -> NotificationContent: + """ + Gets a notification content, this is composed by a title and a screenshot + :raises: ReportScheduleScreenshotFailedError + """ if not self._model: raise ReportScheduleExecuteUnexpectedError() screenshot_data = self.get_screenshot() @@ -234,11 +216,14 @@ def validate( # pylint: disable=arguments-differ if self._model.last_state == ReportLogState.WORKING: raise ReportSchedulePreviousWorkingError() # Check grace period - if ( - self._model.type == ReportScheduleType.ALERT - and self._model.last_state == ReportLogState.SUCCESS - and self._model.grace_period - and datetime.utcnow() - timedelta(seconds=self._model.grace_period) - < self._model.last_eval_dttm - ): - raise ReportScheduleAlertGracePeriodError() + if self._model.type == ReportScheduleType.ALERT: + last_success = ReportScheduleDAO.find_last_success_log(session) + if ( + last_success + and self._model.last_state + in (ReportLogState.SUCCESS, ReportLogState.NOOP) + and self._model.grace_period + and datetime.utcnow() - timedelta(seconds=self._model.grace_period) + < last_success.end_dttm + ): + raise ReportScheduleAlertGracePeriodError() diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py index 5052b88db56ee..9825a35eef2a8 100644 --- a/superset/reports/commands/log_prune.py +++ b/superset/reports/commands/log_prune.py @@ -19,7 +19,6 @@ from superset.commands.base import BaseCommand from superset.models.reports import ReportSchedule -from superset.reports.commands.base import normal_session_scope from superset.reports.dao import ReportScheduleDAO from superset.utils.celery import session_scope @@ -35,11 +34,7 @@ def __init__(self, worker_context: bool = True): self._worker_context = worker_context def run(self) -> None: - if self._worker_context: - session_context = session_scope(nullpool=True) - else: - session_context = normal_session_scope - with session_context as session: + with session_scope(nullpool=True) as session: self.validate() for report_schedule in session.query(ReportSchedule).all(): from_date = datetime.utcnow() - timedelta( diff --git a/superset/reports/dao.py b/superset/reports/dao.py index 14c8beb210f87..6081fc8efa67e 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -25,7 +25,12 @@ from superset.dao.base import BaseDAO from superset.dao.exceptions import DAOCreateFailedError, DAODeleteFailedError from superset.extensions import db -from superset.models.reports import ReportExecutionLog, ReportRecipients, ReportSchedule +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportRecipients, + ReportSchedule, +) logger = logging.getLogger(__name__) @@ -149,6 +154,21 @@ def find_active(session: Optional[Session] = None) -> List[ReportSchedule]: session.query(ReportSchedule).filter(ReportSchedule.active.is_(True)).all() ) + @staticmethod + def find_last_success_log( + session: Optional[Session] = None, + ) -> Optional[ReportExecutionLog]: + """ + Finds last success execution log + """ + session = session or db.session + return ( + session.query(ReportExecutionLog) + .filter(ReportExecutionLog.state == ReportLogState.SUCCESS) + .order_by(ReportExecutionLog.end_dttm.desc()) + .first() + ) + @staticmethod def bulk_delete_logs( model: ReportSchedule, diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py index bd732546e634f..f55154c1e7430 100644 --- a/superset/reports/notifications/base.py +++ b/superset/reports/notifications/base.py @@ -34,8 +34,19 @@ class NotificationContent: class BaseNotification: # pylint: disable=too-few-public-methods + """ + Serves has base for all notifications and creates a simple plugin system + for extending future implementations. + Child implementations get automatically registered and should identify the + notification type + """ + plugins: List[Type["BaseNotification"]] = [] type: Optional[ReportRecipientType] = None + """ + Child classes set their notification type ex: `type = "email"` this string will be + used by ReportRecipients.type to map to the correct implementation + """ def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None: super().__init_subclass__(*args, **kwargs) # type: ignore diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 8e0d2e554426d..96cf2b048b3cc 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -38,6 +38,10 @@ class EmailContent: class EmailNotification(BaseNotification): # pylint: disable=too-few-public-methods + """ + Sends an email notification for a report recipient + """ + type = ReportRecipientType.EMAIL @staticmethod diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index 1cf3ce96feabd..1bd52184b8c51 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -34,6 +34,10 @@ class SlackNotification(BaseNotification): # pylint: disable=too-few-public-methods + """ + Sends a slack notification for a report recipient + """ + type = ReportRecipientType.SLACK def _get_channel(self) -> str: diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index c1321f6f82f5c..956074c33a2fc 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -50,13 +50,13 @@ def scheduler() -> None: active_schedules = ReportScheduleDAO.find_active(session) for active_schedule in active_schedules: for schedule in cron_schedule_window(active_schedule.crontab): - execute.apply_async((active_schedule.id,), eta=schedule) + execute.apply_async((active_schedule.id, schedule,), eta=schedule) @celery_app.task(name="reports.execute") -def execute(report_schedule_id: int) -> None: +def execute(report_schedule_id: int, scheduled_dttm: datetime) -> None: try: - AsyncExecuteReportScheduleCommand(report_schedule_id).run() + AsyncExecuteReportScheduleCommand(report_schedule_id, scheduled_dttm).run() except CommandException as ex: logger.error("An exception occurred while executing the report %s", ex) diff --git a/superset/utils/urls.py b/superset/utils/urls.py index 905376991dc48..fe9455d27e3c3 100644 --- a/superset/utils/urls.py +++ b/superset/utils/urls.py @@ -20,11 +20,15 @@ from flask import current_app, url_for -def headless_url(path: str) -> str: - base_url = current_app.config.get("WEBDRIVER_BASEURL", "") +def headless_url(path: str, user_friendly: bool = False) -> str: + base_url = ( + current_app.config["WEBDRIVER_BASEURL_USER_FRIENDLY"] + if user_friendly + else current_app.config["WEBDRIVER_BASEURL"] + ) return urllib.parse.urljoin(base_url, path) -def get_url_path(view: str, **kwargs: Any) -> str: +def get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: with current_app.test_request_context(): - return headless_url(url_for(view, **kwargs)) + return headless_url(url_for(view, **kwargs), user_friendly=user_friendly) From 0e1f0812bd575e50e10bf9fc4b49f6f64a8d751c Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 16 Nov 2020 17:30:17 +0000 Subject: [PATCH 06/16] lint --- superset/reports/commands/alert.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 7601a07082424..9b6ad215ca5ab 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -48,9 +48,7 @@ def run(self) -> bool: if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: return self._result not in (0, None, np.nan) operator = json.loads(self._report_schedule.validator_config_json)["op"] - threshold = json.loads(self._report_schedule.validator_config_json)[ - "threshold" - ] + threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] return OPERATOR_FUNCTIONS[operator](self._result, threshold) def validate(self) -> None: From 198cd75c62271b7133e55f97784bcf2012ff9536 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 00:18:32 +0000 Subject: [PATCH 07/16] add tests and fix bugs --- superset/reports/commands/alert.py | 14 +- superset/reports/commands/exceptions.py | 12 +- superset/reports/commands/execute.py | 79 ++-- superset/reports/notifications/email.py | 30 +- superset/reports/notifications/exceptions.py | 20 + superset/reports/notifications/slack.py | 43 +- superset/tasks/scheduler.py | 4 +- tests/reports/api_tests.py | 45 +- tests/reports/commands_tests.py | 446 +++++++++++++++++++ tests/reports/utils.py | 68 +++ 10 files changed, 652 insertions(+), 109 deletions(-) create mode 100644 superset/reports/notifications/exceptions.py create mode 100644 tests/reports/commands_tests.py create mode 100644 tests/reports/utils.py diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 9b6ad215ca5ab..6c8d042a76990 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -43,10 +43,11 @@ def __init__(self, report_schedule: ReportSchedule): def run(self) -> bool: self.validate() - self._report_schedule.last_value = self._result if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: + self._report_schedule.last_value_row_json = self._result return self._result not in (0, None, np.nan) + self._report_schedule.last_value = self._result operator = json.loads(self._report_schedule.validator_config_json)["op"] threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] return OPERATOR_FUNCTIONS[operator](self._result, threshold) @@ -72,8 +73,15 @@ def validate(self) -> None: if rows[0][1] is None: return try: - # Check if it's float or if we can convert it - self._result = float(rows[0][1]) + + if ( + self._report_schedule.validator_type + == ReportScheduleValidatorType.NOT_NULL + ): + self._result = rows[0][1] + else: + # Check if it's float or if we can convert it + self._result = float(rows[0][1]) return except (AssertionError, TypeError, ValueError): raise AlertQueryInvalidTypeError() diff --git a/superset/reports/commands/exceptions.py b/superset/reports/commands/exceptions.py index 0bc80d2975524..5615599cc8115 100644 --- a/superset/reports/commands/exceptions.py +++ b/superset/reports/commands/exceptions.py @@ -108,15 +108,15 @@ class PruneReportScheduleLogFailedError(CommandException): class ReportScheduleScreenshotFailedError(CommandException): - message = _("Report Schedule execute screenshot failed.") + message = _("Report Schedule execution failed when generating a screenshot.") class ReportScheduleExecuteUnexpectedError(CommandException): - message = _("Report Schedule execute got an unexpected error.") + message = _("Report Schedule execution got an unexpected error.") class ReportSchedulePreviousWorkingError(CommandException): - message = _("Report Schedule is still working error, refused to re compute.") + message = _("Report Schedule is still working, refusing to re-compute.") class ReportScheduleNameUniquenessValidationError(ValidationError): @@ -137,8 +137,12 @@ class AlertQueryMultipleColumnsError(CommandException): class AlertQueryInvalidTypeError(CommandException): - message = _("Alert query returned a non-number value") + message = _("Alert query returned a non-number value.") class ReportScheduleAlertGracePeriodError(CommandException): + message = _("Alert fired during grace period.") + + +class ReportScheduleNotificationError(CommandException): message = _("Alert on grace period") diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index ee8a4c06fa863..bb3384702d1f3 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -35,12 +35,14 @@ ReportScheduleAlertGracePeriodError, ReportScheduleExecuteUnexpectedError, ReportScheduleNotFoundError, + ReportScheduleNotificationError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, ) from superset.reports.dao import ReportScheduleDAO from superset.reports.notifications import create_notification from superset.reports.notifications.base import NotificationContent, ScreenshotData +from superset.reports.notifications.exceptions import NotificationError from superset.utils.celery import session_scope from superset.utils.screenshots import ( BaseScreenshot, @@ -120,39 +122,36 @@ def create_log( # pylint: disable=too-many-arguments ) session.add(log) - def get_url(self, user_friendly: bool = False) -> str: + @staticmethod + def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str: """ Get the url for this report schedule: chart or dashboard """ - if not self._model: - raise ReportScheduleExecuteUnexpectedError() - if self._model.chart: + if report_schedule.chart: return get_url_path( "Superset.slice", user_friendly=user_friendly, - slice_id=self._model.chart_id, + slice_id=report_schedule.chart_id, standalone="true", ) return get_url_path( "Superset.dashboard", user_friendly=user_friendly, - dashboard_id_or_slug=self._model.dashboard_id, + dashboard_id_or_slug=report_schedule.dashboard_id, ) - def get_screenshot(self) -> ScreenshotData: + def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData: """ Get a chart or dashboard screenshot :raises: ReportScheduleScreenshotFailedError """ - if not self._model: - raise ReportScheduleExecuteUnexpectedError() - url = self.get_url() + url = self._get_url(report_schedule) screenshot: Optional[BaseScreenshot] = None - if self._model.chart: - screenshot = ChartScreenshot(url, self._model.chart.digest) + if report_schedule.chart: + screenshot = ChartScreenshot(url, report_schedule.chart.digest) else: - screenshot = DashboardScreenshot(url, self._model.dashboard.digest) - image_url = self.get_url(user_friendly=True) + screenshot = DashboardScreenshot(url, report_schedule.dashboard.digest) + image_url = self._get_url(report_schedule, user_friendly=True) user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) image_data = screenshot.compute_and_cache( user=user, cache=thumbnail_cache, force=True, @@ -161,20 +160,38 @@ def get_screenshot(self) -> ScreenshotData: raise ReportScheduleScreenshotFailedError() return ScreenshotData(url=image_url, image=image_data) - def get_notification_content(self) -> NotificationContent: + def _get_notification_content( + self, report_schedule: ReportSchedule + ) -> NotificationContent: """ Gets a notification content, this is composed by a title and a screenshot :raises: ReportScheduleScreenshotFailedError """ - if not self._model: - raise ReportScheduleExecuteUnexpectedError() - screenshot_data = self.get_screenshot() - if self._model.chart: - name = self._model.chart.slice_name + screenshot_data = self._get_screenshot(report_schedule) + if report_schedule.chart: + name = report_schedule.chart.slice_name else: - name = self._model.dashboard.dashboard_title + name = report_schedule.dashboard.dashboard_title return NotificationContent(name=name, screenshot=screenshot_data) + def _send(self, report_schedule: ReportSchedule) -> None: + """ + Creates the notification content and sends them to all recipients + + :raises: ReportScheduleNotificationError + """ + notification_errors = [] + notification_content = self._get_notification_content(report_schedule) + for recipient in report_schedule.recipients: + notification = create_notification(recipient, notification_content) + try: + notification.send() + except NotificationError as ex: + # collect notification errors but keep processing them + notification_errors.append(str(ex)) + if notification_errors: + raise ReportScheduleNotificationError(";".join(notification_errors)) + def run(self) -> None: with session_scope(nullpool=True) as session: try: @@ -188,10 +205,8 @@ def run(self) -> None: if not AlertCommand(self._model).run(): self.set_state_and_log(session, start_dttm, ReportLogState.NOOP) return - notification_content = self.get_notification_content() - for recipient in self._model.recipients: - notification = create_notification(recipient, notification_content) - notification.send() + + self._send(self._model) # Log, state and TS self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS) @@ -199,11 +214,23 @@ def run(self) -> None: self.set_state_and_log( session, start_dttm, ReportLogState.NOOP, error_message=str(ex) ) + except ReportSchedulePreviousWorkingError as ex: + self.create_log( + session, + start_dttm, + datetime.utcnow(), + state=ReportLogState.ERROR, + error_message=str(ex), + ) + session.commit() + raise except CommandException as ex: self.set_state_and_log( session, start_dttm, ReportLogState.ERROR, error_message=str(ex) ) - logger.error("Failed to execute report schedule: %s", ex) + # We want to actually commit the state and log inside the scope + session.commit() + raise def validate( # pylint: disable=arguments-differ self, session: Session = None diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 96cf2b048b3cc..e99a7f43e37da 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -26,6 +26,7 @@ from superset import app from superset.models.reports import ReportRecipientType from superset.reports.notifications.base import BaseNotification +from superset.reports.notifications.exceptions import NotificationError from superset.utils.core import send_email_smtp logger = logging.getLogger(__name__) @@ -79,16 +80,19 @@ def send(self) -> None: subject = self._get_subject() content = self._get_content() to = self._get_to() - send_email_smtp( - to, - subject, - content.body, - app.config, - files=[], - data=None, - images=content.images, - bcc="", - mime_subtype="related", - dryrun=False, - ) - logger.info("Report sent to email") + try: + send_email_smtp( + to, + subject, + content.body, + app.config, + files=[], + data=None, + images=content.images, + bcc="", + mime_subtype="related", + dryrun=False, + ) + logger.info("Report sent to email") + except Exception as ex: + raise NotificationError(ex) diff --git a/superset/reports/notifications/exceptions.py b/superset/reports/notifications/exceptions.py new file mode 100644 index 0000000000000..749a91fd955b0 --- /dev/null +++ b/superset/reports/notifications/exceptions.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +class NotificationError(Exception): + pass diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index 1bd52184b8c51..8e859ffc894fc 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -23,12 +23,13 @@ from flask_babel import gettext as __ from retry.api import retry from slack import WebClient -from slack.errors import SlackApiError +from slack.errors import SlackApiError, SlackClientError from slack.web.slack_response import SlackResponse from superset import app from superset.models.reports import ReportRecipientType from superset.reports.notifications.base import BaseNotification +from superset.reports.notifications.exceptions import NotificationError logger = logging.getLogger(__name__) @@ -62,21 +63,27 @@ def send(self) -> None: channel = self._get_channel() body = self._get_body() - client = WebClient( - token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"] - ) - # files_upload returns SlackResponse as we run it in sync mode. - if file: - response = cast( - SlackResponse, - client.files_upload( - channels=channel, file=file, initial_comment=body, title="subject" - ), - ) - assert response["file"], str(response) # the uploaded file - else: - response = cast( - SlackResponse, client.chat_postMessage(channel=channel, text=body), + try: + client = WebClient( + token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"] ) - assert response["message"]["text"], str(response) - logger.info("Report sent to slack") + # files_upload returns SlackResponse as we run it in sync mode. + if file: + response = cast( + SlackResponse, + client.files_upload( + channels=channel, + file=file, + initial_comment=body, + title="subject", + ), + ) + assert response["file"], str(response) # the uploaded file + else: + response = cast( + SlackResponse, client.chat_postMessage(channel=channel, text=body), + ) + assert response["message"]["text"], str(response) + logger.info("Report sent to slack") + except SlackClientError as ex: + raise NotificationError(ex) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 956074c33a2fc..62398f08df9ff 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -58,7 +58,7 @@ def execute(report_schedule_id: int, scheduled_dttm: datetime) -> None: try: AsyncExecuteReportScheduleCommand(report_schedule_id, scheduled_dttm).run() except CommandException as ex: - logger.error("An exception occurred while executing the report %s", ex) + logger.error("An exception occurred while executing the report: %s", ex) @celery_app.task(name="reports.prune_log") @@ -66,4 +66,4 @@ def prune_log() -> None: try: AsyncPruneReportScheduleLogCommand().run() except CommandException as ex: - logger.error("An exception occurred while pruning report schedule logs %s", ex) + logger.error("An exception occurred while pruning report schedule logs: %s", ex) diff --git a/tests/reports/api_tests.py b/tests/reports/api_tests.py index eb5425dade989..ae415700fe2e3 100644 --- a/tests/reports/api_tests.py +++ b/tests/reports/api_tests.py @@ -40,6 +40,7 @@ ) from tests.base_tests import SupersetTestCase +from tests.reports.utils import insert_report_schedule from superset.utils.core import get_example_database @@ -47,48 +48,6 @@ class TestReportSchedulesApi(SupersetTestCase): - def insert_report_schedule( - self, - type: str, - name: str, - crontab: str, - sql: Optional[str] = None, - description: Optional[str] = None, - chart: Optional[Slice] = None, - dashboard: Optional[Dashboard] = None, - database: Optional[Database] = None, - owners: Optional[List[User]] = None, - validator_type: Optional[str] = None, - validator_config_json: Optional[str] = None, - log_retention: Optional[int] = None, - grace_period: Optional[int] = None, - recipients: Optional[List[ReportRecipients]] = None, - logs: Optional[List[ReportExecutionLog]] = None, - ) -> ReportSchedule: - owners = owners or [] - recipients = recipients or [] - logs = logs or [] - report_schedule = ReportSchedule( - type=type, - name=name, - crontab=crontab, - sql=sql, - description=description, - chart=chart, - dashboard=dashboard, - database=database, - owners=owners, - validator_type=validator_type, - validator_config_json=validator_config_json, - log_retention=log_retention, - grace_period=grace_period, - recipients=recipients, - logs=logs, - ) - db.session.add(report_schedule) - db.session.commit() - return report_schedule - @pytest.fixture() def create_report_schedules(self): with self.create_app().app_context(): @@ -116,7 +75,7 @@ def create_report_schedules(self): ) ) report_schedules.append( - self.insert_report_schedule( + insert_report_schedule( type=ReportScheduleType.ALERT, name=f"name{cx}", crontab=f"*/{cx} * * * *", diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py new file mode 100644 index 0000000000000..f81ec2ed71c7a --- /dev/null +++ b/tests/reports/commands_tests.py @@ -0,0 +1,446 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +from datetime import datetime +from typing import List, Optional +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from sqlalchemy.sql import func + +from superset import db +from superset.models.core import Database +from superset.models.dashboard import Dashboard +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportRecipients, + ReportRecipientType, + ReportSchedule, + ReportScheduleType, + ReportScheduleValidatorType, +) +from superset.models.slice import Slice +from superset.reports.commands.exceptions import ( + ReportScheduleNotFoundError, + ReportScheduleNotificationError, + ReportSchedulePreviousWorkingError, +) +from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand +from superset.utils.core import get_example_database +from tests.reports.utils import insert_report_schedule +from tests.test_app import app +from tests.utils import read_fixture + + +def get_target_from_report_schedule(report_schedule) -> List[str]: + return [ + json.loads(recipient.recipient_config_json)["target"] + for recipient in report_schedule.recipients + ] + + +def assert_success_log(): + logs = db.session.query(ReportExecutionLog).all() + assert len(logs) == 1 + assert logs[0].scheduled_dttm == datetime.utcnow() + assert logs[0].error_message == None + assert logs[0].state == ReportLogState.SUCCESS + + +def create_report_notification( + email_target: Optional[str] = None, + slack_channel: Optional[str] = None, + chart: Optional[Slice] = None, + dashboard: Optional[Dashboard] = None, + database: Optional[Database] = None, + sql: Optional[str] = None, + report_type: Optional[str] = None, + validator_type: Optional[str] = None, + validator_config_json: Optional[str] = None, +) -> ReportSchedule: + + report_type = report_type or ReportScheduleType.REPORT + target = email_target or slack_channel + config_json = {"target": target} + + if slack_channel: + recipient = ReportRecipients( + type=ReportRecipientType.SLACK, + recipient_config_json=json.dumps(config_json), + ) + else: + recipient = ReportRecipients( + type=ReportRecipientType.EMAIL, + recipient_config_json=json.dumps(config_json), + ) + + report_schedule = insert_report_schedule( + type=report_type, + name=f"report", + crontab=f"0 9 * * *", + description=f"Daily report", + sql=sql, + chart=chart, + dashboard=dashboard, + database=database, + recipients=[recipient], + validator_type=validator_type, + validator_config_json=validator_config_json, + ) + return report_schedule + + +@pytest.yield_fixture() +def create_report_email_chart(): + with app.app_context(): + chart = db.session.query(Slice).first() + report_schedule = create_report_notification( + email_target="target@email.com", chart=chart + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture() +def create_report_email_dashboard(): + with app.app_context(): + dashboard = db.session.query(Dashboard).first() + report_schedule = create_report_notification( + email_target="target@email.com", dashboard=dashboard + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture() +def create_report_slack_chart(): + with app.app_context(): + chart = db.session.query(Slice).first() + report_schedule = create_report_notification( + slack_channel="slack_channel", chart=chart + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture() +def create_report_slack_chart_working(): + with app.app_context(): + chart = db.session.query(Slice).first() + report_schedule = create_report_notification( + slack_channel="slack_channel", chart=chart + ) + report_schedule.last_state = ReportLogState.WORKING + db.session.commit() + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture( + params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6"] +) +def create_alert_email_chart(request): + param_config = { + "alert1": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": ">", "threshold": 9}', + }, + "alert2": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": ">=", "threshold": 10}', + }, + "alert3": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<", "threshold": 11}', + }, + "alert4": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<=", "threshold": 10}', + }, + "alert5": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "!=", "threshold": 11}', + }, + "alert6": { + "sql": "SELECT 'something' as metric", + "validator_type": ReportScheduleValidatorType.NOT_NULL, + "validator_config_json": "{}", + }, + } + with app.app_context(): + chart = db.session.query(Slice).first() + example_database = get_example_database() + + report_schedule = create_report_notification( + email_target="target@email.com", + chart=chart, + report_type=ReportScheduleType.ALERT, + database=example_database, + sql=param_config[request.param]["sql"], + validator_type=param_config[request.param]["validator_type"], + validator_config_json=param_config[request.param]["validator_config_json"], + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture(params=["alert1", "alert2", "alert3", "alert4", "alert5"]) +def create_no_alert_email_chart(request): + param_config = { + "alert1": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<", "threshold": 10}', + }, + "alert2": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": ">=", "threshold": 11}', + }, + "alert3": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<", "threshold": 10}', + }, + "alert4": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<=", "threshold": 9}', + }, + "alert5": { + "sql": "SELECT 10 as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "!=", "threshold": 10}', + }, + } + with app.app_context(): + chart = db.session.query(Slice).first() + example_database = get_example_database() + + report_schedule = create_report_notification( + email_target="target@email.com", + chart=chart, + report_type=ReportScheduleType.ALERT, + database=example_database, + sql=param_config[request.param]["sql"], + validator_type=param_config[request.param]["validator_type"], + validator_config_json=param_config[request.param]["validator_config_json"], + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.mark.usefixtures("create_report_email_chart") +@patch("superset.reports.notifications.email.send_email_smtp") +@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache") +def test_email_chart_report_schedule( + screenshot_mock, email_mock, create_report_email_chart +): + """ + ExecuteReport Command: Test chart email report schedule + """ + # setup screenshot mock + screenshot = read_fixture("sample.png") + screenshot_mock.return_value = screenshot + + with freeze_time("2020-01-01T00:00:00Z"): + AsyncExecuteReportScheduleCommand( + create_report_email_chart.id, datetime.utcnow() + ).run() + + notification_targets = get_target_from_report_schedule( + create_report_email_chart + ) + # Assert the email smtp address + assert email_mock.call_args[0][0] == notification_targets[0] + # Assert the email inline screenshot + smtp_images = email_mock.call_args[1]["images"] + assert smtp_images[list(smtp_images.keys())[0]] == screenshot + # Assert logs are correct + assert_success_log() + + +@pytest.mark.usefixtures("create_report_email_dashboard") +@patch("superset.reports.notifications.email.send_email_smtp") +@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache") +def test_email_dashboard_report_schedule( + screenshot_mock, email_mock, create_report_email_dashboard +): + """ + ExecuteReport Command: Test dashboard email report schedule + """ + # setup screenshot mock + screenshot = read_fixture("sample.png") + screenshot_mock.return_value = screenshot + + with freeze_time("2020-01-01T00:00:00Z"): + AsyncExecuteReportScheduleCommand( + create_report_email_dashboard.id, datetime.utcnow() + ).run() + + notification_targets = get_target_from_report_schedule( + create_report_email_dashboard + ) + # Assert the email smtp address + assert email_mock.call_args[0][0] == notification_targets[0] + # Assert the email inline screenshot + smtp_images = email_mock.call_args[1]["images"] + assert smtp_images[list(smtp_images.keys())[0]] == screenshot + # Assert logs are correct + assert_success_log() + + +@pytest.mark.usefixtures("create_report_slack_chart") +@patch("superset.reports.notifications.slack.WebClient.files_upload") +@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache") +def test_slack_chart_report_schedule( + screenshot_mock, file_upload_mock, create_report_slack_chart +): + """ + ExecuteReport Command: Test chart slack report schedule + """ + # setup screenshot mock + screenshot = read_fixture("sample.png") + screenshot_mock.return_value = screenshot + + with freeze_time("2020-01-01T00:00:00Z"): + AsyncExecuteReportScheduleCommand( + create_report_slack_chart.id, datetime.utcnow() + ).run() + + notification_targets = get_target_from_report_schedule( + create_report_slack_chart + ) + assert file_upload_mock.call_args[1]["channels"] == notification_targets[0] + assert file_upload_mock.call_args[1]["file"] == screenshot + + # Assert logs are correct + assert_success_log() + + +@pytest.mark.usefixtures("create_report_slack_chart") +def test_report_schedule_not_found(create_report_slack_chart): + """ + ExecuteReport Command: Test report schedule not found + """ + max_id = db.session.query(func.max(ReportSchedule.id)).scalar() + with pytest.raises(ReportScheduleNotFoundError): + AsyncExecuteReportScheduleCommand(max_id + 1, datetime.utcnow()).run() + + +@pytest.mark.usefixtures("create_report_slack_chart_working") +def test_report_schedule_working(create_report_slack_chart_working): + """ + ExecuteReport Command: Test report schedule still working + """ + # setup screenshot mock + with pytest.raises(ReportSchedulePreviousWorkingError): + AsyncExecuteReportScheduleCommand( + create_report_slack_chart_working.id, datetime.utcnow() + ).run() + + logs = db.session.query(ReportExecutionLog).all() + assert len(logs) == 1 + assert logs[0].error_message == ReportSchedulePreviousWorkingError.message + assert logs[0].state == ReportLogState.ERROR + + assert create_report_slack_chart_working.last_state == ReportLogState.WORKING + + +@pytest.mark.usefixtures("create_report_email_dashboard") +@patch("superset.reports.notifications.email.send_email_smtp") +@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache") +def test_email_dashboard_report_fails( + screenshot_mock, email_mock, create_report_email_dashboard +): + """ + ExecuteReport Command: Test dashboard email report schedule notification fails + """ + # setup screenshot mock + from smtplib import SMTPException + + screenshot = read_fixture("sample.png") + screenshot_mock.return_value = screenshot + email_mock.side_effect = SMTPException("Could not connect to SMTP XPTO") + + with pytest.raises(ReportScheduleNotificationError): + AsyncExecuteReportScheduleCommand( + create_report_email_dashboard.id, datetime.utcnow() + ).run() + + logs = db.session.query(ReportExecutionLog).all() + assert len(logs) == 1 + assert logs[0].error_message == "Could not connect to SMTP XPTO" + assert logs[0].state == ReportLogState.ERROR + + +@pytest.mark.usefixtures("create_alert_email_chart") +@patch("superset.reports.notifications.email.send_email_smtp") +@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache") +def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart): + """ + ExecuteReport Command: Test chart slack alert + """ + # setup screenshot mock + screenshot = read_fixture("sample.png") + screenshot_mock.return_value = screenshot + + with freeze_time("2020-01-01T00:00:00Z"): + AsyncExecuteReportScheduleCommand( + create_alert_email_chart.id, datetime.utcnow() + ).run() + + notification_targets = get_target_from_report_schedule(create_alert_email_chart) + # Assert the email smtp address + assert email_mock.call_args[0][0] == notification_targets[0] + # Assert the email inline screenshot + smtp_images = email_mock.call_args[1]["images"] + assert smtp_images[list(smtp_images.keys())[0]] == screenshot + # Assert logs are correct + assert_success_log() + + +@pytest.mark.usefixtures("create_no_alert_email_chart") +def test_slack_chart_no_alert(create_no_alert_email_chart): + """ + ExecuteReport Command: Test chart slack no alert + """ + with freeze_time("2020-01-01T00:00:00Z"): + AsyncExecuteReportScheduleCommand( + create_no_alert_email_chart.id, datetime.utcnow() + ).run() + db.session.commit() + assert create_no_alert_email_chart.last_state == ReportLogState.NOOP diff --git a/tests/reports/utils.py b/tests/reports/utils.py new file mode 100644 index 0000000000000..841ae4d975207 --- /dev/null +++ b/tests/reports/utils.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import List, Optional + +from flask_appbuilder.security.sqla.models import User + +from superset import db +from superset.models.core import Database +from superset.models.dashboard import Dashboard +from superset.models.reports import ReportExecutionLog, ReportRecipients, ReportSchedule +from superset.models.slice import Slice + + +def insert_report_schedule( + type: str, + name: str, + crontab: str, + sql: Optional[str] = None, + description: Optional[str] = None, + chart: Optional[Slice] = None, + dashboard: Optional[Dashboard] = None, + database: Optional[Database] = None, + owners: Optional[List[User]] = None, + validator_type: Optional[str] = None, + validator_config_json: Optional[str] = None, + log_retention: Optional[int] = None, + grace_period: Optional[int] = None, + recipients: Optional[List[ReportRecipients]] = None, + logs: Optional[List[ReportExecutionLog]] = None, +) -> ReportSchedule: + owners = owners or [] + recipients = recipients or [] + logs = logs or [] + report_schedule = ReportSchedule( + type=type, + name=name, + crontab=crontab, + sql=sql, + description=description, + chart=chart, + dashboard=dashboard, + database=database, + owners=owners, + validator_type=validator_type, + validator_config_json=validator_config_json, + log_retention=log_retention, + grace_period=grace_period, + recipients=recipients, + logs=logs, + ) + db.session.add(report_schedule) + db.session.commit() + return report_schedule From 91751aab8de5e92d925cba048507455c14f084ee Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 09:51:15 +0000 Subject: [PATCH 08/16] test --- tests/reports/commands_tests.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index f81ec2ed71c7a..9abb08cfc5c16 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -106,7 +106,7 @@ def create_report_notification( return report_schedule -@pytest.yield_fixture() +@pytest.yield_fixture(scope="module") def create_report_email_chart(): with app.app_context(): chart = db.session.query(Slice).first() @@ -280,6 +280,7 @@ def test_email_chart_report_schedule( AsyncExecuteReportScheduleCommand( create_report_email_chart.id, datetime.utcnow() ).run() + db.session.commit() notification_targets = get_target_from_report_schedule( create_report_email_chart @@ -442,5 +443,5 @@ def test_slack_chart_no_alert(create_no_alert_email_chart): AsyncExecuteReportScheduleCommand( create_no_alert_email_chart.id, datetime.utcnow() ).run() - db.session.commit() - assert create_no_alert_email_chart.last_state == ReportLogState.NOOP + db.session.commit() + assert create_no_alert_email_chart.last_state == ReportLogState.NOOP From 260838b8c454934527a46e7c2f935bc27f1ddb22 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 10:25:17 +0000 Subject: [PATCH 09/16] test --- tests/reports/commands_tests.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index 9abb08cfc5c16..9cf479d6ee814 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -56,6 +56,7 @@ def get_target_from_report_schedule(report_schedule) -> List[str]: def assert_success_log(): + db.session.commit() logs = db.session.query(ReportExecutionLog).all() assert len(logs) == 1 assert logs[0].scheduled_dttm == datetime.utcnow() @@ -74,11 +75,9 @@ def create_report_notification( validator_type: Optional[str] = None, validator_config_json: Optional[str] = None, ) -> ReportSchedule: - report_type = report_type or ReportScheduleType.REPORT target = email_target or slack_channel config_json = {"target": target} - if slack_channel: recipient = ReportRecipients( type=ReportRecipientType.SLACK, @@ -106,7 +105,7 @@ def create_report_notification( return report_schedule -@pytest.yield_fixture(scope="module") +@pytest.yield_fixture() def create_report_email_chart(): with app.app_context(): chart = db.session.query(Slice).first() @@ -215,7 +214,9 @@ def create_alert_email_chart(request): db.session.commit() -@pytest.yield_fixture(params=["alert1", "alert2", "alert3", "alert4", "alert5"]) +@pytest.yield_fixture( + params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6"] +) def create_no_alert_email_chart(request): param_config = { "alert1": { @@ -243,10 +244,21 @@ def create_no_alert_email_chart(request): "validator_type": ReportScheduleValidatorType.OPERATOR, "validator_config_json": '{"op": "!=", "threshold": 10}', }, + "alert6": { + "sql": "SELECT first from test_table where first=0", + "validator_type": ReportScheduleValidatorType.NOT_NULL, + "validator_config_json": "{}", + }, } with app.app_context(): chart = db.session.query(Slice).first() example_database = get_example_database() + example_database.get_sqla_engine().execute( + "CREATE TABLE test_table AS SELECT 1 as first, 2 as second" + ) + example_database.get_sqla_engine().execute( + "INSERT INTO test_table (first, second) VALUES (3, 4)" + ) report_schedule = create_report_notification( email_target="target@email.com", @@ -261,6 +273,7 @@ def create_no_alert_email_chart(request): db.session.delete(report_schedule) db.session.commit() + example_database.get_sqla_engine().execute("DROP TABLE test_table") @pytest.mark.usefixtures("create_report_email_chart") @@ -280,7 +293,6 @@ def test_email_chart_report_schedule( AsyncExecuteReportScheduleCommand( create_report_email_chart.id, datetime.utcnow() ).run() - db.session.commit() notification_targets = get_target_from_report_schedule( create_report_email_chart From 32e40e3a7414d2299cc53a550c7cfb19e39d8360 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 11:03:28 +0000 Subject: [PATCH 10/16] more tests --- tests/reports/commands_tests.py | 145 +++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 41 deletions(-) diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index 9cf479d6ee814..6f3a6307acbd8 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -20,6 +20,7 @@ from unittest.mock import patch import pytest +from contextlib2 import contextmanager from freezegun import freeze_time from sqlalchemy.sql import func @@ -37,6 +38,8 @@ ) from superset.models.slice import Slice from superset.reports.commands.exceptions import ( + AlertQueryMultipleColumnsError, + AlertQueryMultipleRowsError, ReportScheduleNotFoundError, ReportScheduleNotificationError, ReportSchedulePreviousWorkingError, @@ -55,13 +58,12 @@ def get_target_from_report_schedule(report_schedule) -> List[str]: ] -def assert_success_log(): +def assert_log(state: str, error_message: Optional[str] = None): db.session.commit() logs = db.session.query(ReportExecutionLog).all() assert len(logs) == 1 - assert logs[0].scheduled_dttm == datetime.utcnow() - assert logs[0].error_message == None - assert logs[0].state == ReportLogState.SUCCESS + assert logs[0].error_message == error_message + assert logs[0].state == state def create_report_notification( @@ -160,7 +162,7 @@ def create_report_slack_chart_working(): @pytest.yield_fixture( - params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6"] + params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6", "alert7"] ) def create_alert_email_chart(request): param_config = { @@ -194,6 +196,11 @@ def create_alert_email_chart(request): "validator_type": ReportScheduleValidatorType.NOT_NULL, "validator_config_json": "{}", }, + "alert7": { + "sql": "SELECT {{ 5 + 5 }} as metric", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "!=", "threshold": 11}', + }, } with app.app_context(): chart = db.session.query(Slice).first() @@ -214,6 +221,22 @@ def create_alert_email_chart(request): db.session.commit() +@contextmanager +def create_test_table_context(database: Database): + database.get_sqla_engine().execute( + "CREATE TABLE test_table AS SELECT 1 as first, 2 as second" + ) + database.get_sqla_engine().execute( + "INSERT INTO test_table (first, second) VALUES (1, 2)" + ) + database.get_sqla_engine().execute( + "INSERT INTO test_table (first, second) VALUES (3, 4)" + ) + + yield db.session + database.get_sqla_engine().execute("DROP TABLE test_table") + + @pytest.yield_fixture( params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6"] ) @@ -253,27 +276,59 @@ def create_no_alert_email_chart(request): with app.app_context(): chart = db.session.query(Slice).first() example_database = get_example_database() - example_database.get_sqla_engine().execute( - "CREATE TABLE test_table AS SELECT 1 as first, 2 as second" - ) - example_database.get_sqla_engine().execute( - "INSERT INTO test_table (first, second) VALUES (3, 4)" - ) + with create_test_table_context(example_database): + + report_schedule = create_report_notification( + email_target="target@email.com", + chart=chart, + report_type=ReportScheduleType.ALERT, + database=example_database, + sql=param_config[request.param]["sql"], + validator_type=param_config[request.param]["validator_type"], + validator_config_json=param_config[request.param][ + "validator_config_json" + ], + ) + yield report_schedule + + db.session.delete(report_schedule) + db.session.commit() + + +@pytest.yield_fixture(params=["alert1", "alert2"]) +def create_mul_alert_email_chart(request): + param_config = { + "alert1": { + "sql": "SELECT first from test_table", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<", "threshold": 10}', + }, + "alert2": { + "sql": "SELECT first, second from test_table", + "validator_type": ReportScheduleValidatorType.OPERATOR, + "validator_config_json": '{"op": "<", "threshold": 10}', + }, + } + with app.app_context(): + chart = db.session.query(Slice).first() + example_database = get_example_database() + with create_test_table_context(example_database): - report_schedule = create_report_notification( - email_target="target@email.com", - chart=chart, - report_type=ReportScheduleType.ALERT, - database=example_database, - sql=param_config[request.param]["sql"], - validator_type=param_config[request.param]["validator_type"], - validator_config_json=param_config[request.param]["validator_config_json"], - ) - yield report_schedule + report_schedule = create_report_notification( + email_target="target@email.com", + chart=chart, + report_type=ReportScheduleType.ALERT, + database=example_database, + sql=param_config[request.param]["sql"], + validator_type=param_config[request.param]["validator_type"], + validator_config_json=param_config[request.param][ + "validator_config_json" + ], + ) + yield report_schedule - db.session.delete(report_schedule) - db.session.commit() - example_database.get_sqla_engine().execute("DROP TABLE test_table") + db.session.delete(report_schedule) + db.session.commit() @pytest.mark.usefixtures("create_report_email_chart") @@ -303,7 +358,7 @@ def test_email_chart_report_schedule( smtp_images = email_mock.call_args[1]["images"] assert smtp_images[list(smtp_images.keys())[0]] == screenshot # Assert logs are correct - assert_success_log() + assert_log(ReportLogState.SUCCESS) @pytest.mark.usefixtures("create_report_email_dashboard") @@ -333,7 +388,7 @@ def test_email_dashboard_report_schedule( smtp_images = email_mock.call_args[1]["images"] assert smtp_images[list(smtp_images.keys())[0]] == screenshot # Assert logs are correct - assert_success_log() + assert_log(ReportLogState.SUCCESS) @pytest.mark.usefixtures("create_report_slack_chart") @@ -361,7 +416,7 @@ def test_slack_chart_report_schedule( assert file_upload_mock.call_args[1]["file"] == screenshot # Assert logs are correct - assert_success_log() + assert_log(ReportLogState.SUCCESS) @pytest.mark.usefixtures("create_report_slack_chart") @@ -385,11 +440,9 @@ def test_report_schedule_working(create_report_slack_chart_working): create_report_slack_chart_working.id, datetime.utcnow() ).run() - logs = db.session.query(ReportExecutionLog).all() - assert len(logs) == 1 - assert logs[0].error_message == ReportSchedulePreviousWorkingError.message - assert logs[0].state == ReportLogState.ERROR - + assert_log( + ReportLogState.ERROR, error_message=ReportSchedulePreviousWorkingError.message + ) assert create_report_slack_chart_working.last_state == ReportLogState.WORKING @@ -414,10 +467,7 @@ def test_email_dashboard_report_fails( create_report_email_dashboard.id, datetime.utcnow() ).run() - logs = db.session.query(ReportExecutionLog).all() - assert len(logs) == 1 - assert logs[0].error_message == "Could not connect to SMTP XPTO" - assert logs[0].state == ReportLogState.ERROR + assert_log(ReportLogState.ERROR, error_message="Could not connect to SMTP XPTO") @pytest.mark.usefixtures("create_alert_email_chart") @@ -443,17 +493,30 @@ def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart smtp_images = email_mock.call_args[1]["images"] assert smtp_images[list(smtp_images.keys())[0]] == screenshot # Assert logs are correct - assert_success_log() + assert_log(ReportLogState.SUCCESS) @pytest.mark.usefixtures("create_no_alert_email_chart") -def test_slack_chart_no_alert(create_no_alert_email_chart): +def test_email_chart_no_alert(create_no_alert_email_chart): """ - ExecuteReport Command: Test chart slack no alert + ExecuteReport Command: Test chart email no alert """ with freeze_time("2020-01-01T00:00:00Z"): AsyncExecuteReportScheduleCommand( create_no_alert_email_chart.id, datetime.utcnow() ).run() - db.session.commit() - assert create_no_alert_email_chart.last_state == ReportLogState.NOOP + assert_log(ReportLogState.NOOP) + + +@pytest.mark.usefixtures("create_mul_alert_email_chart") +def test_email_mul_alert(create_mul_alert_email_chart): + """ + ExecuteReport Command: Test chart email multiple rows + """ + with freeze_time("2020-01-01T00:00:00Z"): + with pytest.raises( + (AlertQueryMultipleRowsError, AlertQueryMultipleColumnsError) + ): + AsyncExecuteReportScheduleCommand( + create_mul_alert_email_chart.id, datetime.utcnow() + ).run() From f78dd9ea9a5892fbb51492876bbfbf101437c6b5 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 11:43:29 +0000 Subject: [PATCH 11/16] fix report API test --- tests/reports/api_tests.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/reports/api_tests.py b/tests/reports/api_tests.py index ae415700fe2e3..26dbe165768a8 100644 --- a/tests/reports/api_tests.py +++ b/tests/reports/api_tests.py @@ -128,10 +128,6 @@ def test_get_report_schedule(self): "last_value_row_json": report_schedule.last_value_row_json, "log_retention": report_schedule.log_retention, "name": report_schedule.name, - "owners": [ - {"first_name": "admin", "id": 1, "last_name": "user"}, - {"first_name": "alpha", "id": 5, "last_name": "user"}, - ], "recipients": [ { "id": report_schedule.recipients[0].id, @@ -143,7 +139,16 @@ def test_get_report_schedule(self): "validator_config_json": report_schedule.validator_config_json, "validator_type": report_schedule.validator_type, } - assert data["result"] == expected_result + for key in expected_result: + assert data["result"][key] == expected_result[key] + # needed because order may vary + assert {"first_name": "admin", "id": 1, "last_name": "user"} in data["result"][ + "owners" + ] + assert {"first_name": "alpha", "id": 5, "last_name": "user"} in data["result"][ + "owners" + ] + assert len(data["result"]["owners"]) == 2 def test_info_report_schedule(self): """ From 44923b47d18d966a4641a5f5fec16f2103b7bf66 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 13:32:57 +0000 Subject: [PATCH 12/16] test MySQL test fail --- superset/reports/commands/alert.py | 17 ++++++++--------- tests/reports/commands_tests.py | 2 ++ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 6c8d042a76990..81e1d6acf9b41 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -65,6 +65,12 @@ def validate(self) -> None: if df.empty: return rows = df.to_records() + if ( + self._report_schedule.validator_type + == ReportScheduleValidatorType.NOT_NULL + ): + self._result = rows[0][1] + return # check if query return more then one row if len(rows) > 1: raise AlertQueryMultipleRowsError() @@ -73,15 +79,8 @@ def validate(self) -> None: if rows[0][1] is None: return try: - - if ( - self._report_schedule.validator_type - == ReportScheduleValidatorType.NOT_NULL - ): - self._result = rows[0][1] - else: - # Check if it's float or if we can convert it - self._result = float(rows[0][1]) + # Check if it's float or if we can convert it + self._result = float(rows[0][1]) return except (AssertionError, TypeError, ValueError): raise AlertQueryInvalidTypeError() diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index 6f3a6307acbd8..ecda5e4e553ca 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -327,6 +327,8 @@ def create_mul_alert_email_chart(request): ) yield report_schedule + # needed for MySQL + report_schedule.logs = [] db.session.delete(report_schedule) db.session.commit() From 89abae3e45d58b20758f410f4a01c6b790d1ae46 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 13:53:57 +0000 Subject: [PATCH 13/16] delete-orphan --- superset/models/reports.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/models/reports.py b/superset/models/reports.py index aeeebeb8929c5..7b1f1831399d9 100644 --- a/superset/models/reports.py +++ b/superset/models/reports.py @@ -177,6 +177,6 @@ class ReportExecutionLog(Model): # pylint: disable=too-few-public-methods ) report_schedule = relationship( ReportSchedule, - backref=backref("logs", cascade="all,delete"), + backref=backref("logs", cascade="all,delete,delete-orphan"), foreign_keys=[report_schedule_id], ) From 094a3c75128d9af330611408a54937c433d6d624 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 19 Nov 2020 14:40:45 +0000 Subject: [PATCH 14/16] fix MySQL tests --- superset/reports/commands/alert.py | 5 +---- tests/reports/commands_tests.py | 9 ++++++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 81e1d6acf9b41..ea15d10f5d269 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -65,10 +65,7 @@ def validate(self) -> None: if df.empty: return rows = df.to_records() - if ( - self._report_schedule.validator_type - == ReportScheduleValidatorType.NOT_NULL - ): + if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: self._result = rows[0][1] return # check if query return more then one row diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index ecda5e4e553ca..d5566944e537f 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -328,7 +328,14 @@ def create_mul_alert_email_chart(request): yield report_schedule # needed for MySQL - report_schedule.logs = [] + logs = ( + db.session.query(ReportExecutionLog) + .filter(ReportExecutionLog.report_schedule == report_schedule) + .all() + ) + for log in logs: + db.session.delete(log) + db.session.commit() db.session.delete(report_schedule) db.session.commit() From c5ccd12ef513f77c5362757629621759f5742722 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 23 Nov 2020 16:48:01 +0000 Subject: [PATCH 15/16] address comments --- superset/exceptions.py | 4 ++- superset/reports/commands/alert.py | 48 +++++++++++++++++-------- superset/reports/commands/exceptions.py | 1 + 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/superset/exceptions.py b/superset/exceptions.py index c0d55f8924426..fd95a59e2cfa7 100644 --- a/superset/exceptions.py +++ b/superset/exceptions.py @@ -25,7 +25,9 @@ class SupersetException(Exception): status = 500 message = "" - def __init__(self, message: str = "", exception: Optional[Exception] = None): + def __init__( + self, message: str = "", exception: Optional[Exception] = None, + ) -> None: if message: self.message = message self._exception = exception diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index ea15d10f5d269..18d9b483365f4 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -20,6 +20,7 @@ from typing import Optional import numpy as np +from flask_babel import lazy_gettext as _ from superset import jinja_context from superset.commands.base import BaseCommand @@ -52,6 +53,36 @@ def run(self) -> bool: threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] return OPERATOR_FUNCTIONS[operator](self._result, threshold) + def _validate_not_null(self, rows: np.recarray) -> None: + self._result = rows[0][1] + return + + def _validate_operator(self, rows: np.recarray) -> None: + # check if query return more then one row + if len(rows) > 1: + raise AlertQueryMultipleRowsError( + message=_( + "Alert query returned more then one row. %s rows returned" + % len(rows), + ) + ) + # check if query returned more then one column + if len(rows[0]) > 2: + raise AlertQueryMultipleColumnsError( + _( + "Alert query returned more then one column. %s columns returned" + % len(rows[0]) + ) + ) + if rows[0][1] is None: + return + try: + # Check if it's float or if we can convert it + self._result = float(rows[0][1]) + return + except (AssertionError, TypeError, ValueError): + raise AlertQueryInvalidTypeError() + def validate(self) -> None: """ Validate the query result as a Pandas DataFrame @@ -66,18 +97,5 @@ def validate(self) -> None: return rows = df.to_records() if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: - self._result = rows[0][1] - return - # check if query return more then one row - if len(rows) > 1: - raise AlertQueryMultipleRowsError() - if len(rows[0]) > 2: - raise AlertQueryMultipleColumnsError() - if rows[0][1] is None: - return - try: - # Check if it's float or if we can convert it - self._result = float(rows[0][1]) - return - except (AssertionError, TypeError, ValueError): - raise AlertQueryInvalidTypeError() + return self._validate_not_null(rows) + return self._validate_operator(rows) diff --git a/superset/reports/commands/exceptions.py b/superset/reports/commands/exceptions.py index 5615599cc8115..3a56a49b9d7f2 100644 --- a/superset/reports/commands/exceptions.py +++ b/superset/reports/commands/exceptions.py @@ -129,6 +129,7 @@ def __init__(self) -> None: class AlertQueryMultipleRowsError(CommandException): + message = _("Alert query returned more then one row.") From d0e37708a128f1a53d9fc5a1977794b9147fa57a Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 23 Nov 2020 17:32:42 +0000 Subject: [PATCH 16/16] lint --- superset/reports/commands/alert.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 18d9b483365f4..cab294ce5c29b 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -55,7 +55,6 @@ def run(self) -> bool: def _validate_not_null(self, rows: np.recarray) -> None: self._result = rows[0][1] - return def _validate_operator(self, rows: np.recarray) -> None: # check if query return more then one row @@ -97,5 +96,6 @@ def validate(self) -> None: return rows = df.to_records() if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: - return self._validate_not_null(rows) - return self._validate_operator(rows) + self._validate_not_null(rows) + return + self._validate_operator(rows)