diff --git a/superset-frontend/src/dashboard/index.jsx b/superset-frontend/src/dashboard/index.jsx index 9fe82346c3247..78ba1d2aa9698 100644 --- a/superset-frontend/src/dashboard/index.jsx +++ b/superset-frontend/src/dashboard/index.jsx @@ -38,7 +38,9 @@ const initState = getInitialState(bootstrapData); const asyncEventMiddleware = initAsyncEvents({ config: bootstrapData.common.conf, getPendingComponents: ({ charts }) => - Object.values(charts).filter(c => c.chartStatus === 'loading'), + Object.values(charts).filter( + c => c.chartStatus === 'loading' && c.asyncJobId !== undefined, + ), successAction: (componentId, componentData) => actions.chartUpdateSucceeded(componentData, componentId), errorAction: (componentId, response) => diff --git a/superset-frontend/src/explore/index.jsx b/superset-frontend/src/explore/index.jsx index 83e4bc63dc9ca..7698dee01ad22 100644 --- a/superset-frontend/src/explore/index.jsx +++ b/superset-frontend/src/explore/index.jsx @@ -40,7 +40,9 @@ const initState = getInitialState(bootstrapData); const asyncEventMiddleware = initAsyncEvents({ config: bootstrapData.common.conf, getPendingComponents: ({ charts }) => - Object.values(charts).filter(c => c.chartStatus === 'loading'), + Object.values(charts).filter( + c => c.chartStatus === 'loading' && c.asyncJobId !== undefined, + ), successAction: (componentId, componentData) => actions.chartUpdateSucceeded(componentData, componentId), errorAction: (componentId, response) => diff --git a/superset-frontend/src/middleware/asyncEvent.ts b/superset-frontend/src/middleware/asyncEvent.ts index 20caad91406ae..1beb997118552 100644 --- a/superset-frontend/src/middleware/asyncEvent.ts +++ b/superset-frontend/src/middleware/asyncEvent.ts @@ -119,8 +119,7 @@ const initAsyncEvents = (options: AsyncEventOptions) => { }; const processEvents = async () => { - const state = store.getState(); - const queuedComponents = getPendingComponents(state); + let queuedComponents = getPendingComponents(store.getState()); const eventArgs = lastReceivedEventId ? { last_id: lastReceivedEventId } : {}; @@ -128,6 +127,9 @@ const initAsyncEvents = (options: AsyncEventOptions) => { if (queuedComponents && queuedComponents.length) { try { const { result: events } = await fetchEvents(eventArgs); + // refetch queuedComponents due to race condition where results are available + // before component state is updated with asyncJobId + queuedComponents = getPendingComponents(store.getState()); if (events && events.length) { const componentsByJobId = queuedComponents.reduce((acc, item) => { acc[item.asyncJobId] = item; diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py index d84273f4ee710..f8b9bef0d7328 100644 --- a/superset/tasks/celery_app.py +++ b/superset/tasks/celery_app.py @@ -19,13 +19,16 @@ This is the main entrypoint used by Celery workers. As such, it needs to call create_app() in order to initialize things properly """ +from typing import Any + +from celery.signals import worker_process_init # Superset framework imports from superset import create_app -from superset.extensions import celery_app +from superset.extensions import celery_app, db # Init the Flask app / configure everything -create_app() +flask_app = create_app() # Need to import late, as the celery_app will have been setup by "create_app()" # pylint: disable=wrong-import-position, unused-import @@ -33,3 +36,10 @@ # Export the celery app globally for Celery (as run on the cmd line) to find app = celery_app + + +@worker_process_init.connect +def reset_db_connection_pool(**kwargs: Any) -> None: # pylint: disable=unused-argument + with flask_app.app_context(): + # https://docs.sqlalchemy.org/en/14/core/connections.html#engine-disposal + db.engine.dispose()