feat: add event and interval annotation support to chart data ep #11665

Merged · 3 commits · Dec 4, 2020
Changes from 2 commits
5 changes: 4 additions & 1 deletion superset-frontend/src/chart/chartAction.js
@@ -412,7 +412,10 @@ export function exploreJSON(
});
});

const annotationLayers = formData.annotation_layers || [];
// only retrieve annotations when calling the legacy API
const annotationLayers = shouldUseLegacyApi(formData)
? formData.annotation_layers || []
: [];
const isDashboardRequest = dashboardId > 0;

return Promise.all([
1 change: 1 addition & 0 deletions superset/charts/api.py
@@ -485,6 +485,7 @@ def data(self) -> Response:
"""
if request.is_json:
json_body = request.json
print(json_body)
elif request.form.get("form_data"):
# CSV export submits regular form data
json_body = json.loads(request.form["form_data"])
5 changes: 2 additions & 3 deletions superset/charts/schemas.py
@@ -23,6 +23,7 @@
from superset.common.query_context import QueryContext
from superset.utils import schema as utils
from superset.utils.core import (
AnnotationType,
FilterOperator,
PostProcessingBoxplotWhiskerType,
PostProcessingContributionOrientation,
@@ -783,9 +784,7 @@ class ChartDataExtrasSchema(Schema):
class AnnotationLayerSchema(Schema):
annotationType = fields.String(
description="Type of annotation layer",
validate=validate.OneOf(
choices=("EVENT", "FORMULA", "INTERVAL", "TIME_SERIES",)
),
validate=validate.OneOf(choices=[ann.value for ann in AnnotationType]),
)
color = fields.String(description="Layer color", allow_none=True,)
descriptionColumns = fields.List(
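The hard-coded tuple of annotation types is replaced with the values of the new `AnnotationType` enum, so the schema can no longer drift out of sync with the enum. A minimal self-contained sketch of the same pattern (the schema and field names here are illustrative, not Superset's actual classes):

```python
from enum import Enum

from marshmallow import Schema, fields, validate


class AnnotationType(str, Enum):
    FORMULA = "FORMULA"
    INTERVAL = "INTERVAL"
    EVENT = "EVENT"
    TIME_SERIES = "TIME_SERIES"


class LayerSchema(Schema):
    # OneOf rejects any value not present in the enum
    annotationType = fields.String(
        validate=validate.OneOf(choices=[ann.value for ann in AnnotationType])
    )


assert LayerSchema().validate({"annotationType": "EVENT"}) == {}
assert "annotationType" in LayerSchema().validate({"annotationType": "BOGUS"})
```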
101 changes: 91 additions & 10 deletions superset/common/query_context.py
@@ -25,14 +25,17 @@
from flask_babel import gettext as _

from superset import app, db, is_feature_enabled
from superset.annotation_layers.dao import AnnotationLayerDAO
from superset.common.query_object import QueryObject
from superset.connectors.base.models import BaseDatasource
from superset.connectors.connector_registry import ConnectorRegistry
from superset.exceptions import QueryObjectValidationError
from superset.exceptions import QueryObjectValidationError, SupersetException
from superset.extensions import cache_manager, security_manager
from superset.models.slice import Slice
from superset.stats_logger import BaseStatsLogger
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS
from superset.views.utils import get_viz
from superset.viz import set_and_log_cache

config = app.config
@@ -157,8 +160,7 @@ def get_single_payload(self, query_obj: QueryObject) -> Dict[str, Any]:
query_obj.row_offset = 0
query_obj.columns = [o.column_name for o in self.datasource.columns]
payload = self.get_df_payload(query_obj)
# TODO: implement
payload["annotation_data"] = []

df = payload["df"]
status = payload["status"]
if status != utils.QueryStatus.FAILED:
@@ -220,7 +222,80 @@ def cache_key(self, query_obj: QueryObject, **kwargs: Any) -> Optional[str]:
)
return cache_key

def get_df_payload( # pylint: disable=too-many-statements
@staticmethod
def get_native_annotation_data(query_obj: QueryObject) -> Dict[str, Any]:
annotation_data = {}
annotation_layers = [
layer
for layer in query_obj.annotation_layers
if layer["sourceType"] == "NATIVE"
]
layer_ids = [layer["value"] for layer in annotation_layers]
layer_objects = {
layer_object.id: layer_object
for layer_object in AnnotationLayerDAO.find_by_ids(layer_ids)
}

# annotations
for layer in annotation_layers:
layer_id = layer["value"]
layer_name = layer["name"]
columns = [
"start_dttm",
"end_dttm",
"short_descr",
"long_descr",
"json_metadata",
]
layer_object = layer_objects[layer_id]
records = [
{column: getattr(annotation, column) for column in columns}
for annotation in layer_object.annotation
]
result = {"columns": columns, "records": records}
annotation_data[layer_name] = result
return annotation_data

@staticmethod
def get_viz_annotation_data(
annotation_layer: Dict[str, Any], force: bool
) -> Dict[str, Any]:
slice_id = annotation_layer["value"]
slc = db.session.query(Slice).filter_by(id=slice_id).one_or_none()
if not slc:
    raise QueryObjectValidationError("The slice does not exist")
form_data = slc.form_data.copy()
try:
viz_obj = get_viz(
datasource_type=slc.datasource.type,
datasource_id=slc.datasource.id,
form_data=form_data,
force=force,
)
payload = viz_obj.get_payload()
return payload["data"]
except SupersetException as ex:
raise QueryObjectValidationError(utils.error_msg_from_exception(ex))

def get_annotation_data(self, query_obj: QueryObject) -> Dict[str, Any]:
"""

:param query_obj:
:return:
"""
annotation_data: Dict[str, Any] = self.get_native_annotation_data(query_obj)
for annotation_layer in [
layer
for layer in query_obj.annotation_layers
if layer["sourceType"] in ("line", "table")
]:
name = annotation_layer["name"]
annotation_data[name] = self.get_viz_annotation_data(
annotation_layer, self.force
)
return annotation_data

def get_df_payload( # pylint: disable=too-many-statements,too-many-locals
self, query_obj: QueryObject, **kwargs: Any
) -> Dict[str, Any]:
"""Handles caching around the df payload retrieval"""
@@ -233,6 +308,7 @@ def get_df_payload( # pylint: disable=too-many-statements
cache_value = None
status = None
query = ""
annotation_data = {}
error_message = None
if cache_key and cache_manager.data_cache and not self.force:
cache_value = cache_manager.data_cache.get(cache_key)
@@ -241,6 +317,7 @@ def get_df_payload( # pylint: disable=too-many-statements
try:
df = cache_value["df"]
query = cache_value["query"]
annotation_data = cache_value.get("annotation_data", {})
status = utils.QueryStatus.SUCCESS
is_loaded = True
stats_logger.incr("loaded_from_cache")
@@ -272,6 +349,8 @@ def get_df_payload( # pylint: disable=too-many-statements
query = query_result["query"]
error_message = query_result["error_message"]
df = query_result["df"]
annotation_data = self.get_annotation_data(query_obj)

if status != utils.QueryStatus.FAILED:
stats_logger.incr("loaded_from_source")
if not self.force:
@@ -289,18 +368,20 @@ def get_df_payload( # pylint: disable=too-many-statements

if is_loaded and cache_key and status != utils.QueryStatus.FAILED:
set_and_log_cache(
cache_key,
df,
query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
cache_key=cache_key,
df=df,
query=query,
annotation_data=annotation_data,
cached_dttm=cached_dttm,
cache_timeout=self.cache_timeout,
datasource_uid=self.datasource.uid,
)
return {
"cache_key": cache_key,
"cached_dttm": cache_value["dttm"] if cache_value is not None else None,
"cache_timeout": self.cache_timeout,
"df": df,
"annotation_data": annotation_data,
"error": error_message,
"is_cached": cache_value is not None,
"query": query,
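`get_df_payload` now threads `annotation_data` through both cache paths: it is stored next to `df` and `query` on write, and read back with a safe default so entries cached before this change do not raise. A condensed sketch of that round trip against a stand-in dict cache (the real code uses `cache_manager.data_cache`):

```python
from typing import Any, Dict

cache: Dict[str, Dict[str, Any]] = {}  # stand-in for cache_manager.data_cache


def write_payload(cache_key: str, df: Any, query: str,
                  annotation_data: Dict[str, Any]) -> None:
    # annotation data is cached together with the dataframe and query
    cache[cache_key] = {"df": df, "query": query, "annotation_data": annotation_data}


def read_payload(cache_key: str) -> Dict[str, Any]:
    cache_value = cache[cache_key]
    return {
        "df": cache_value["df"],
        "query": cache_value["query"],
        # .get() tolerates entries written before annotation support existed
        "annotation_data": cache_value.get("annotation_data", {}),
    }
```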
30 changes: 28 additions & 2 deletions superset/common/query_object.py
@@ -106,7 +106,12 @@ def __init__(
metrics = metrics or []
extras = extras or {}
is_sip_38 = is_feature_enabled("SIP_38_VIZ_REARCHITECTURE")
self.annotation_layers = annotation_layers
self.annotation_layers = [
layer
for layer in annotation_layers
# formula annotations don't affect the payload, hence can be dropped
if layer["annotationType"] != "FORMULA"
]
self.applied_time_extras = applied_time_extras or {}
self.granularity = granularity
self.from_dttm, self.to_dttm = utils.get_since_until(
@@ -236,10 +241,31 @@ def cache_key(self, **extra: Any) -> str:
cache_dict["time_range"] = self.time_range
if self.post_processing:
cache_dict["post_processing"] = self.post_processing

annotation_fields = [
"annotationType",
"descriptionColumns",
"intervalEndColumn",
"name",
"overrides",
"sourceType",
"timeColumn",
"titleColumn",
"value",
]
annotation_layers = [
{field: layer[field] for field in annotation_fields if field in layer}
for layer in self.annotation_layers
]
# only add to key if there are annotations present that affect the payload
if annotation_layers:
cache_dict["annotation_layers"] = annotation_layers

json_data = self.json_dumps(cache_dict, sort_keys=True)
return hashlib.md5(json_data.encode("utf-8")).hexdigest()

def json_dumps(self, obj: Any, sort_keys: bool = False) -> str:
@staticmethod
def json_dumps(obj: Any, sort_keys: bool = False) -> str:
return json.dumps(
obj, default=utils.json_int_dttm_ser, ignore_nan=True, sort_keys=sort_keys
)
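Because the whitelisted annotation fields are folded into `cache_dict` before hashing, two otherwise identical queries that differ only in annotation layers produce different cache keys, while charts with no payload-affecting layers (formula layers were already dropped in `__init__`) keep their existing keys. A simplified sketch of the hashing step:

```python
import hashlib
import json


def cache_key(cache_dict: dict, annotation_layers: list) -> str:
    # only add to key if there are annotations present that affect the payload
    if annotation_layers:
        cache_dict["annotation_layers"] = annotation_layers
    json_data = json.dumps(cache_dict, sort_keys=True)
    return hashlib.md5(json_data.encode("utf-8")).hexdigest()


base = {"granularity": "ds", "time_range": "Last week"}
key_plain = cache_key(dict(base), [])
key_event = cache_key(dict(base), [{"annotationType": "EVENT", "name": "Deploys"}])
assert key_plain != key_event  # adding a layer invalidates the cached entry
```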
7 changes: 7 additions & 0 deletions superset/utils/core.py
@@ -1591,6 +1591,13 @@ class ExtraFiltersTimeColumnType(str, Enum):
TIME_RANGE = "__time_range"


class AnnotationType(str, Enum):
FORMULA = "FORMULA"
INTERVAL = "INTERVAL"
EVENT = "EVENT"
TIME_SERIES = "TIME_SERIES"


def is_test() -> bool:
return strtobool(os.environ.get("SUPERSET_TESTENV", "false"))

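Since `AnnotationType` subclasses `str`, its members compare equal to plain strings, so string-based checks such as `layer["annotationType"] != "FORMULA"` in query_object.py keep working; a quick illustration:

```python
from superset.utils.core import AnnotationType

assert AnnotationType.EVENT == "EVENT"  # str subclass: plain comparison works
assert [ann.value for ann in AnnotationType] == [
    "FORMULA",
    "INTERVAL",
    "EVENT",
    "TIME_SERIES",
]  # the choices list used by AnnotationLayerSchema in schemas.py
```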
17 changes: 10 additions & 7 deletions superset/viz.py
@@ -104,9 +104,12 @@ def set_and_log_cache(
cached_dttm: str,
cache_timeout: int,
datasource_uid: Optional[str],
annotation_data: Optional[Dict[str, Any]] = None,
) -> None:
try:
cache_value = dict(dttm=cached_dttm, df=df, query=query)
cache_value = dict(
dttm=cached_dttm, df=df, query=query, annotation_data=annotation_data or {}
)
stats_logger.incr("set_cache_key")
cache_manager.data_cache.set(cache_key, cache_value, timeout=cache_timeout)

@@ -587,12 +590,12 @@ def get_df_payload(

if is_loaded and cache_key and self.status != utils.QueryStatus.FAILED:
set_and_log_cache(
cache_key,
df,
self.query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
cache_key=cache_key,
df=df,
query=self.query,
cached_dttm=cached_dttm,
cache_timeout=self.cache_timeout,
datasource_uid=self.datasource.uid,
)
return {
"cache_key": self._any_cache_key,
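`set_and_log_cache` gains an optional `annotation_data` parameter defaulting to `None`, so legacy viz.py call sites that do not pass it keep working unchanged. A trimmed sketch of the new signature and cache value, with the parameter order assumed from the visible hunk (stats logging and the actual cache write elided):

```python
from typing import Any, Dict, Optional


def set_and_log_cache(
    cache_key: str,
    df: Any,
    query: str,
    cached_dttm: str,
    cache_timeout: int,
    datasource_uid: Optional[str],
    annotation_data: Optional[Dict[str, Any]] = None,
) -> None:
    # None collapses to {} so cached payloads always carry the key
    cache_value = dict(
        dttm=cached_dttm, df=df, query=query, annotation_data=annotation_data or {}
    )
    # the real implementation logs stats and writes cache_value to
    # cache_manager.data_cache with the given timeout
```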