Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Migrate /superset/queries/<last_updated_ms> to API v1 #22611

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
976 changes: 934 additions & 42 deletions docs/static/resources/openapi.json

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions superset-frontend/src/SqlLab/components/QueryAutoRefresh/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
import { useState } from 'react';
import { isObject } from 'lodash';
import rison from 'rison';
import {
SupersetClient,
Query,
runningQueryStateList,
QueryResponse,
} from '@superset-ui/core';
import { QueryDictionary } from 'src/SqlLab/types';
import useInterval from 'src/SqlLab/utils/useInterval';
Expand Down Expand Up @@ -62,22 +64,30 @@ function QueryAutoRefresh({
refreshQueries,
queriesLastUpdate,
}: QueryAutoRefreshProps) {
// We do not want to spam requests in the case of slow connections and potentially recieve responses out of order
// We do not want to spam requests in the case of slow connections and potentially receive responses out of order
// pendingRequest check ensures we only have one active http call to check for query statuses
const [pendingRequest, setPendingRequest] = useState(false);

const checkForRefresh = () => {
if (!pendingRequest && shouldCheckForQueries(queries)) {
const params = rison.encode({
last_updated_ms: queriesLastUpdate - QUERY_UPDATE_BUFFER_MS,
});

setPendingRequest(true);
SupersetClient.get({
endpoint: `/superset/queries/${
queriesLastUpdate - QUERY_UPDATE_BUFFER_MS
}`,
endpoint: `/api/v1/query/updated_since?q=${params}`,
timeout: QUERY_TIMEOUT_LIMIT,
})
.then(({ json }) => {
if (json) {
refreshQueries?.(json);
const jsonPayload = json as { result?: QueryResponse[] };
const queries =
jsonPayload?.result?.reduce((acc, current) => {
acc[current.id] = current;
return acc;
}, {}) ?? {};
refreshQueries?.(queries);
}
})
.catch(() => {})
Expand Down
1 change: 1 addition & 0 deletions superset/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class RouteMethod: # pylint: disable=too-few-public-methods
"get_data": "read",
"samples": "read",
"delete_ssh_tunnel": "write",
"get_updated_since": "read",
"stop_query": "read",
}

Expand Down
62 changes: 61 additions & 1 deletion superset/queries/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Any

import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.api import expose, protect, request, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface

from superset import db, event_logger
Expand All @@ -29,6 +30,7 @@
from superset.queries.filters import QueryFilter
from superset.queries.schemas import (
openapi_spec_methods_override,
queries_get_updated_since_schema,
QuerySchema,
StopQuerySchema,
)
Expand Down Expand Up @@ -59,6 +61,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"stop_query",
"get_updated_since",
}

apispec_parameter_schemas = {
"queries_get_updated_since_schema": queries_get_updated_since_schema,
}

list_columns = [
Expand Down Expand Up @@ -142,6 +149,59 @@ class QueryRestApi(BaseSupersetModelRestApi):
allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"}

@expose("/updated_since")
@protect()
@safe
@rison(queries_get_updated_since_schema)
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.get_updated_since"
    ),
    log_to_statsd=False,
)
def get_updated_since(self, **kwargs: Any) -> FlaskResponse:
    """Get a list of queries that changed after last_updated_ms
    ---
    get:
      summary: Get a list of queries that changed after last_updated_ms
      parameters:
      - in: query
        name: q
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/queries_get_updated_since_schema'
      responses:
        200:
          description: Queries list
          content:
            application/json:
              schema:
                type: object
                properties:
                  result:
                    description: >-
                      A List of queries that changed after last_updated_ms
                    type: array
                    items:
                      $ref: '#/components/schemas/{{self.__class__.__name__}}.get'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        # The @rison decorator has already validated the ``q`` query argument
        # against queries_get_updated_since_schema and decoded it into
        # kwargs["rison"].
        rison_payload = kwargs["rison"]
        last_updated_ms = rison_payload.get("last_updated_ms", 0)
        changed_queries = QueryDAO.get_queries_changed_after(last_updated_ms)
        # NOTE(review): ``to_dict`` is used instead of a marshmallow schema
        # dump on purpose -- the frontend (SQL Lab) still expects the legacy
        # camelCase payload shape shared with /superset/sql_json and
        # /superset/results; migrating that shape is deferred.
        result = [query.to_dict() for query in changed_queries]
        return self.response(200, result=result)
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)

@expose("/stop", methods=["POST"])
@protect()
@safe
Expand Down
14 changes: 13 additions & 1 deletion superset/queries/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Any, Dict
from typing import Any, Dict, List, Union

from superset import sql_lab
from superset.common.db_query_status import QueryStatus
Expand All @@ -25,6 +25,7 @@
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
from superset.utils.core import get_user_id
from superset.utils.dates import now_as_float

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,6 +62,17 @@ def save_metadata(query: Query, payload: Dict[str, Any]) -> None:
db.session.add(query)
query.set_extra_json_key("columns", columns)

@staticmethod
def get_queries_changed_after(last_updated_ms: Union[float, int]) -> List[Query]:
    """Return the current user's queries changed at or after the given
    epoch timestamp (milliseconds).

    The cutoff is converted to a naive UTC datetime, the same
    representation ``changed_on`` is stored with in the DB.
    """
    # Milliseconds-since-epoch -> naive UTC datetime, matching the DB.
    cutoff = datetime.utcfromtimestamp(last_updated_ms / 1000)

    changed = db.session.query(Query).filter(
        Query.user_id == get_user_id(),
        Query.changed_on >= cutoff,
    )
    return changed.all()

@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one_or_none()
Expand Down
8 changes: 8 additions & 0 deletions superset/queries/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
},
}

# JSON Schema for the rison-encoded ``q`` parameter of the
# ``GET /api/v1/query/updated_since`` endpoint: a required epoch
# timestamp, in milliseconds, to filter queries by.
queries_get_updated_since_schema = {
    "type": "object",
    "required": ["last_updated_ms"],
    "properties": {"last_updated_ms": {"type": "number"}},
}


class DatabaseSchema(Schema):
database_name = fields.String()
Expand Down
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,7 @@ def fetch_datasource_metadata(self) -> FlaskResponse: # pylint: disable=no-self
@event_logger.log_this
@expose("/queries/<float:last_updated_ms>")
@expose("/queries/<int:last_updated_ms>")
@deprecated()
def queries(self, last_updated_ms: Union[float, int]) -> FlaskResponse:
"""
Get the updated queries.
Expand Down
57 changes: 56 additions & 1 deletion tests/integration_tests/queries/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def insert_query(
rows: int = 100,
tab_name: str = "",
status: str = "success",
changed_on: datetime = datetime(2020, 1, 1),
) -> Query:
database = db.session.query(Database).get(database_id)
user = db.session.query(security_manager.user_model).get(user_id)
Expand All @@ -67,7 +68,7 @@ def insert_query(
rows=rows,
tab_name=tab_name,
status=status,
changed_on=datetime(2020, 1, 1),
changed_on=changed_on,
)
db.session.add(query)
db.session.commit()
Expand Down Expand Up @@ -394,6 +395,60 @@ def test_get_list_query_no_data_access(self):
db.session.delete(query)
db.session.commit()

def test_get_updated_since(self):
    """
    Query API: Test get queries updated since timestamp

    Inserts one query changed three days ago and one changed one day
    ago, then asserts that filtering from two days ago returns only the
    newer one.
    """
    now = datetime.utcnow()
    client_id = self.get_random_string()

    admin = self.get_user("admin")
    example_db = get_example_database()

    old_query = self.insert_query(
        example_db.id,
        admin.id,
        self.get_random_string(),
        sql="SELECT col1, col2 from table1",
        select_sql="SELECT col1, col2 from table1",
        executed_sql="SELECT col1, col2 from table1 LIMIT 100",
        changed_on=now - timedelta(days=3),
    )
    updated_query = self.insert_query(
        example_db.id,
        admin.id,
        client_id,
        sql="SELECT col1, col2 from table1",
        select_sql="SELECT col1, col2 from table1",
        executed_sql="SELECT col1, col2 from table1 LIMIT 100",
        changed_on=now - timedelta(days=1),
    )

    self.login(username="admin")
    # ``now`` is a *naive UTC* datetime and the API decodes the cutoff with
    # ``datetime.utcfromtimestamp``, so compute the epoch offset against the
    # UTC epoch explicitly. ``datetime.timestamp`` would interpret the naive
    # value as local time and make this test flaky in non-UTC environments.
    timestamp = (
        now - timedelta(days=2) - datetime(1970, 1, 1)
    ).total_seconds() * 1000
    uri = f"api/v1/query/updated_since?q={prison.dumps({'last_updated_ms': timestamp})}"
    rv = self.client.get(uri)
    self.assertEqual(rv.status_code, 200)

    expected_result = updated_query.to_dict()
    data = json.loads(rv.data.decode("utf-8"))
    self.assertEqual(len(data["result"]), 1)
    for key, value in data["result"][0].items():
        # We can't assert timestamp
        if key not in (
            "changedOn",
            "changed_on",
            "end_time",
            "start_running_time",
            "start_time",
            "id",
        ):
            self.assertEqual(value, expected_result[key])
    # rollback changes
    db.session.delete(old_query)
    db.session.delete(updated_query)
    db.session.commit()

@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query_not_found(
Expand Down
56 changes: 56 additions & 0 deletions tests/unit_tests/dao/queries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import json
from datetime import datetime, timedelta
from typing import Any, Iterator

import pytest
Expand Down Expand Up @@ -58,6 +59,61 @@ def test_query_dao_save_metadata(session: Session) -> None:
assert query.extra.get("columns", None) == []


def test_query_dao_get_queries_changed_after(session: Session) -> None:
    """DAO returns only queries with ``changed_on`` at or after the cutoff."""
    from superset.models.core import Database
    from superset.models.sql_lab import Query

    engine = session.get_bind()
    Query.metadata.create_all(engine)  # pylint: disable=no-member

    db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

    now = datetime.utcnow()

    # Changed 3 days ago -- must NOT be returned for a 2-day cutoff.
    old_query_obj = Query(
        client_id="foo",
        database=db,
        tab_name="test_tab",
        sql_editor_id="test_editor_id",
        sql="select * from bar",
        select_sql="select * from bar",
        executed_sql="select * from bar",
        limit=100,
        select_as_cta=False,
        rows=100,
        error_message="none",
        results_key="abc",
        changed_on=now - timedelta(days=3),
    )

    # Changed 1 day ago -- must be returned for a 2-day cutoff.
    updated_query_obj = Query(
        client_id="updated_foo",
        database=db,
        tab_name="test_tab",
        sql_editor_id="test_editor_id",
        sql="select * from foo",
        select_sql="select * from foo",
        executed_sql="select * from foo",
        limit=100,
        select_as_cta=False,
        rows=100,
        error_message="none",
        results_key="abc",
        changed_on=now - timedelta(days=1),
    )

    session.add(db)
    session.add(old_query_obj)
    session.add(updated_query_obj)

    from superset.queries.dao import QueryDAO

    # ``now`` is naive UTC and the DAO decodes the cutoff with
    # ``datetime.utcfromtimestamp``, so compute the epoch offset against the
    # UTC epoch explicitly. ``datetime.timestamp`` would interpret the naive
    # value as local time and make this test flaky in non-UTC environments.
    timestamp = (
        now - timedelta(days=2) - datetime(1970, 1, 1)
    ).total_seconds() * 1000
    result = QueryDAO.get_queries_changed_after(timestamp)
    assert len(result) == 1
    assert result[0].client_id == "updated_foo"


def test_query_dao_stop_query_not_found(
mocker: MockFixture, app: Any, session: Session
) -> None:
Expand Down