Skip to content

Commit

Permalink
feat: new reports scheduler (#11711)
Browse files Browse the repository at this point in the history
* feat(reports): scheduler and delivery system

* working version

* improvements and fix grace_period

* add tests and fix bugs

* fix report API test

* test MySQL test fail

* delete-orphans

* fix MySQL tests

* address comments

* lint
  • Loading branch information
dpgaspar committed Nov 25, 2020
1 parent 501b9d4 commit f27ebc4
Show file tree
Hide file tree
Showing 19 changed files with 1,499 additions and 59 deletions.
8 changes: 5 additions & 3 deletions superset/dao/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from flask_appbuilder.models.sqla import Model
from flask_appbuilder.models.sqla.interface import SQLAInterface
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from superset.dao.exceptions import (
DAOConfigError,
Expand All @@ -46,13 +47,14 @@ class BaseDAO:
"""

@classmethod
def find_by_id(cls, model_id: int) -> Model:
def find_by_id(cls, model_id: int, session: Session = None) -> Model:
"""
Find a model by id, if defined applies `base_filter`
"""
query = db.session.query(cls.model_cls)
session = session or db.session
query = session.query(cls.model_cls)
if cls.base_filter:
data_model = SQLAInterface(cls.model_cls, db.session)
data_model = SQLAInterface(cls.model_cls, session)
query = cls.base_filter( # pylint: disable=not-callable
"id", data_model
).apply(query, None)
Expand Down
4 changes: 3 additions & 1 deletion superset/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class SupersetException(Exception):
status = 500
message = ""

def __init__(self, message: str = "", exception: Optional[Exception] = None):
def __init__(
self, message: str = "", exception: Optional[Exception] = None,
) -> None:
if message:
self.message = message
self._exception = exception
Expand Down
4 changes: 3 additions & 1 deletion superset/models/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ class ReportRecipientType(str, enum.Enum):

class ReportLogState(str, enum.Enum):
SUCCESS = "Success"
WORKING = "Working"
ERROR = "Error"
NOOP = "Not triggered"


class ReportEmailFormat(str, enum.Enum):
Expand Down Expand Up @@ -175,6 +177,6 @@ class ReportExecutionLog(Model): # pylint: disable=too-few-public-methods
)
report_schedule = relationship(
ReportSchedule,
backref=backref("logs", cascade="all,delete"),
backref=backref("logs", cascade="all,delete,delete-orphan"),
foreign_keys=[report_schedule_id],
)
101 changes: 101 additions & 0 deletions superset/reports/commands/alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
from operator import eq, ge, gt, le, lt, ne
from typing import Optional

import numpy as np
from flask_babel import lazy_gettext as _

from superset import jinja_context
from superset.commands.base import BaseCommand
from superset.models.reports import ReportSchedule, ReportScheduleValidatorType
from superset.reports.commands.exceptions import (
AlertQueryInvalidTypeError,
AlertQueryMultipleColumnsError,
AlertQueryMultipleRowsError,
)

logger = logging.getLogger(__name__)


OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}


class AlertCommand(BaseCommand):
def __init__(self, report_schedule: ReportSchedule):
self._report_schedule = report_schedule
self._result: Optional[float] = None

def run(self) -> bool:
self.validate()

if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL:
self._report_schedule.last_value_row_json = self._result
return self._result not in (0, None, np.nan)
self._report_schedule.last_value = self._result
operator = json.loads(self._report_schedule.validator_config_json)["op"]
threshold = json.loads(self._report_schedule.validator_config_json)["threshold"]
return OPERATOR_FUNCTIONS[operator](self._result, threshold)

def _validate_not_null(self, rows: np.recarray) -> None:
self._result = rows[0][1]

def _validate_operator(self, rows: np.recarray) -> None:
# check if query return more then one row
if len(rows) > 1:
raise AlertQueryMultipleRowsError(
message=_(
"Alert query returned more then one row. %s rows returned"
% len(rows),
)
)
# check if query returned more then one column
if len(rows[0]) > 2:
raise AlertQueryMultipleColumnsError(
_(
"Alert query returned more then one column. %s columns returned"
% len(rows[0])
)
)
if rows[0][1] is None:
return
try:
# Check if it's float or if we can convert it
self._result = float(rows[0][1])
return
except (AssertionError, TypeError, ValueError):
raise AlertQueryInvalidTypeError()

def validate(self) -> None:
"""
Validate the query result as a Pandas DataFrame
"""
sql_template = jinja_context.get_template_processor(
database=self._report_schedule.database
)
rendered_sql = sql_template.process_template(self._report_schedule.sql)
df = self._report_schedule.database.get_df(rendered_sql)

if df.empty:
return
rows = df.to_records()
if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL:
self._validate_not_null(rows)
return
self._validate_operator(rows)
37 changes: 37 additions & 0 deletions superset/reports/commands/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,47 @@ class ReportScheduleDeleteFailedError(CommandException):
message = _("Report Schedule delete failed.")


class PruneReportScheduleLogFailedError(CommandException):
message = _("Report Schedule log prune failed.")


class ReportScheduleScreenshotFailedError(CommandException):
message = _("Report Schedule execution failed when generating a screenshot.")


class ReportScheduleExecuteUnexpectedError(CommandException):
message = _("Report Schedule execution got an unexpected error.")


class ReportSchedulePreviousWorkingError(CommandException):
message = _("Report Schedule is still working, refusing to re-compute.")


class ReportScheduleNameUniquenessValidationError(ValidationError):
"""
Marshmallow validation error for Report Schedule name already exists
"""

def __init__(self) -> None:
super().__init__([_("Name must be unique")], field_name="name")


class AlertQueryMultipleRowsError(CommandException):

message = _("Alert query returned more then one row.")


class AlertQueryMultipleColumnsError(CommandException):
message = _("Alert query returned more then one column.")


class AlertQueryInvalidTypeError(CommandException):
message = _("Alert query returned a non-number value.")


class ReportScheduleAlertGracePeriodError(CommandException):
message = _("Alert fired during grace period.")


class ReportScheduleNotificationError(CommandException):
message = _("Alert on grace period")
Loading

0 comments on commit f27ebc4

Please sign in to comment.