Skip to content

Commit

Permalink
feat: saved queries with execution info (#11391)
Browse files Browse the repository at this point in the history
* feat: add rows and last_run info to saved queries

* feat: add rows to saved query

* refactor and tests

* lint

* fix tests
  • Loading branch information
dpgaspar authored Oct 26, 2020
1 parent 604a519 commit 144b279
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""add exec info to saved queries
Revision ID: 585b0b1a7b18
Revises: af30ca79208f
Create Date: 2020-10-20 17:28:22.857694
"""

# revision identifiers, used by Alembic.
revision = "585b0b1a7b18"
down_revision = "af30ca79208f"

import sqlalchemy as sa
from alembic import op


def upgrade() -> None:
    """Add execution-info columns (``last_run``, ``rows``) to ``saved_query``."""
    new_columns = (
        sa.Column("last_run", sa.DateTime(), nullable=True),
        sa.Column("rows", sa.Integer(), nullable=True),
    )
    for column in new_columns:
        op.add_column("saved_query", column)


def downgrade() -> None:
    """Drop the execution-info columns added by :func:`upgrade` (reverse order)."""
    for column_name in ("rows", "last_run"):
        op.drop_column("saved_query", column_name)
16 changes: 16 additions & 0 deletions superset/models/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import sqlalchemy as sqla
from flask import Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from humanize import naturaltime
from sqlalchemy import (
Boolean,
Column,
Expand Down Expand Up @@ -181,6 +183,8 @@ class SavedQuery(Model, AuditMixinNullable, ExtraJSONMixin):
foreign_keys=[db_id],
backref=backref("saved_queries", cascade="all, delete-orphan"),
)
rows = Column(Integer, nullable=True)
last_run = Column(DateTime, nullable=True)

def __repr__(self) -> str:
    """Represent the saved query by its human-readable label."""
    label = self.label
    return "{0}".format(label)
Expand Down Expand Up @@ -210,6 +214,18 @@ def url(self) -> str:
def sql_tables(self) -> List[Table]:
    """Return the tables referenced by this saved query's SQL text."""
    # Delegates table extraction to the SQL parser; duplicates are removed
    # by ParsedQuery (tables is set-like), order is not guaranteed.
    return list(ParsedQuery(self.sql).tables)

@property
def last_run_humanized(self) -> str:
    # Humanized "time since" string, e.g. "3 minutes ago".
    # NOTE(review): despite the name, this measures the delta from
    # ``changed_on`` (last modification), not from ``last_run`` — confirm
    # whether ``self.last_run`` was intended here. Using ``changed_on`` may
    # be a deliberate fallback since ``last_run`` is nullable, but then a
    # never-run query reports its edit time as a run time.
    return naturaltime(datetime.now() - self.changed_on)

@property
def _last_run_delta_humanized(self) -> str:
    # Internal helper backing ``last_run_delta_humanized`` (the ``@renders``
    # wrapper cannot itself be a property).
    # NOTE(review): byte-for-byte duplicate of ``last_run_humanized`` and,
    # like it, computed from ``changed_on`` rather than ``last_run`` —
    # consider delegating to one implementation and confirming the intended
    # source column.
    return naturaltime(datetime.now() - self.changed_on)

@renders("changed_on")
def last_run_delta_humanized(self) -> str:
    # Exposed to Flask-AppBuilder list views; ``@renders("changed_on")``
    # presumably makes the API sort/order this display field by the
    # ``changed_on`` column — confirm against FAB's renders semantics.
    return self._last_run_delta_humanized


class TabState(Model, AuditMixinNullable, ExtraJSONMixin):

Expand Down
24 changes: 23 additions & 1 deletion superset/queries/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime

from superset.dao.base import BaseDAO
from superset.models.sql_lab import Query
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter

logger = logging.getLogger(__name__)
Expand All @@ -26,3 +28,23 @@
class QueryDAO(BaseDAO):
    """Data access object for SQL Lab ``Query`` models."""

    model_cls = Query
    base_filter = QueryFilter

    @staticmethod
    def update_saved_query_exec_info(query_id: int) -> None:
        """
        Propagates query execution info (row count and last run time) back to
        any saved queries that match the executed query's database and SQL.

        :param query_id: The id of the executed ``Query``
        :return: None; commits only when at least one saved query matched
        """
        query = db.session.query(Query).get(query_id)
        if query is None:
            # Unknown query id; previously this raised AttributeError below.
            return
        related_saved_queries = (
            db.session.query(SavedQuery)
            .filter(
                SavedQuery.database == query.database,
                SavedQuery.sql == query.sql,
            )
            .all()
        )
        if related_saved_queries:
            for saved_query in related_saved_queries:
                saved_query.rows = query.rows
                # NOTE(review): naive local time, matching the nullable
                # DateTime column on the model — consider UTC-aware times.
                saved_query.last_run = datetime.now()
            db.session.commit()
4 changes: 4 additions & 0 deletions superset/queries/saved_queries/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class SavedQueryRestApi(BaseSupersetModelRestApi):
"schema",
"sql",
"sql_tables",
"rows",
"last_run_delta_humanized",
]
add_columns = ["db_id", "description", "label", "schema", "sql"]
edit_columns = add_columns
Expand All @@ -96,10 +98,12 @@ class SavedQueryRestApi(BaseSupersetModelRestApi):
"label",
"description",
"sql",
"rows",
"created_by.first_name",
"database.database_name",
"created_on",
"changed_on_delta_humanized",
"last_run_delta_humanized",
]

search_columns = ["id", "database", "label", "schema"]
Expand Down
10 changes: 10 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from superset.models.slice import Slice
from superset.models.sql_lab import Query, TabState
from superset.models.user_attributes import UserAttribute
from superset.queries.dao import QueryDAO
from superset.security.analytics_db_safety import (
check_sqlalchemy_uri,
DBSecurityException,
Expand Down Expand Up @@ -2144,6 +2145,7 @@ def _sql_json_async( # pylint: disable=too-many-arguments
"""
logger.info("Query %i: Running query on a Celery worker", query.id)
# Ignore the celery future object and the request may time out.
query_id = query.id
try:
task = sql_lab.get_sql_results.delay(
query.id,
Expand All @@ -2170,6 +2172,10 @@ def _sql_json_async( # pylint: disable=too-many-arguments
query.error_message = msg
session.commit()
return json_error_response("{}".format(msg))

# Update saved query with execution info from the query execution
QueryDAO.update_saved_query_exec_info(query_id)

resp = json_success(
json.dumps(
{"query": query.to_dict()},
Expand Down Expand Up @@ -2204,6 +2210,7 @@ def _sql_json_sync(
is_feature_enabled("SQLLAB_BACKEND_PERSISTENCE")
and not query.select_as_cta
)
query_id = query.id
with utils.timeout(seconds=timeout, error_message=timeout_msg):
# pylint: disable=no-value-for-parameter
data = sql_lab.get_sql_results(
Expand All @@ -2216,6 +2223,9 @@ def _sql_json_sync(
log_params=log_params,
)

# Update saved query if needed
QueryDAO.update_saved_query_exec_info(query_id)

payload = json.dumps(
apply_display_max_row_limit(data),
default=utils.pessimistic_json_iso_dttm_ser,
Expand Down
6 changes: 4 additions & 2 deletions tests/queries/saved_queries/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ def test_get_list_sort_saved_query(self):
"""
admin = self.get_user("admin")
saved_queries = (
db.session.query(SavedQuery).filter(SavedQuery.created_by == admin).all()
)
db.session.query(SavedQuery)
.filter(SavedQuery.created_by == admin)
.order_by(SavedQuery.schema.asc())
).all()
self.login(username="admin")
query_string = {"order_column": "schema", "order_direction": "asc"}
uri = f"api/v1/saved_query/?q={prison.dumps(query_string)}"
Expand Down
32 changes: 31 additions & 1 deletion tests/sqllab_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from superset import db, security_manager
from superset.connectors.sqla.models import SqlaTable
from superset.db_engine_specs import BaseEngineSpec
from superset.models.sql_lab import Query
from superset.models.sql_lab import Query, SavedQuery
from superset.result_set import SupersetResultSet
from superset.sql_parse import CtasMethod
from superset.utils.core import (
Expand Down Expand Up @@ -71,6 +71,36 @@ def test_sql_json(self):
data = self.run_sql("SELECT * FROM unexistant_table", "2")
self.assertLess(0, len(data["error"]))

def test_sql_json_to_saved_query_info(self):
    """
    SQLLab: Test SQLLab query execution info propagation to saved queries
    """
    # Imported locally so freezegun is only required when this test runs.
    from freezegun import freeze_time

    self.login("admin")

    # Create a saved query whose database and SQL match the query we are
    # about to execute, so the DAO propagation should pick it up.
    sql_statement = "SELECT * FROM birth_names LIMIT 10"
    examples_db_id = get_example_database().id
    saved_query = SavedQuery(db_id=examples_db_id, sql=sql_statement)
    db.session.add(saved_query)
    db.session.commit()

    # Freeze the clock so ``last_run`` (set via datetime.now() in the DAO)
    # is deterministic; the asserts must stay inside this block.
    with freeze_time("2020-01-01T00:00:00Z"):
        self.run_sql(sql_statement, "1")
        saved_query_ = (
            db.session.query(SavedQuery)
            .filter(
                SavedQuery.db_id == examples_db_id, SavedQuery.sql == sql_statement
            )
            .one_or_none()
        )
        # Execution info should have been copied onto the saved query.
        assert saved_query_.rows is not None
        assert saved_query_.last_run == datetime.now()

    # Rollback changes
    db.session.delete(saved_query_)
    db.session.commit()

@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_sql_json_cta_dynamic_db(self, ctas_method):
examples_db = get_example_database()
Expand Down

0 comments on commit 144b279

Please sign in to comment.