Skip to content

Commit

Permalink
fix(celery): Reset DB connection pools for forked worker processes (a…
Browse files Browse the repository at this point in the history
…pache#13350)

* Reset sqlalchemy connection pool on celery process fork

* Fix race condition with async chart loading state

* pylint: ignore

* prettier
  • Loading branch information
robdiciuccio authored and betodealmeida committed Mar 12, 2021
1 parent 82f3f14 commit a589da0
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
4 changes: 3 additions & 1 deletion superset-frontend/src/dashboard/index.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ const initState = getInitialState(bootstrapData);
const asyncEventMiddleware = initAsyncEvents({
config: bootstrapData.common.conf,
getPendingComponents: ({ charts }) =>
Object.values(charts).filter(c => c.chartStatus === 'loading'),
Object.values(charts).filter(
c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
),
successAction: (componentId, componentData) =>
actions.chartUpdateSucceeded(componentData, componentId),
errorAction: (componentId, response) =>
Expand Down
4 changes: 3 additions & 1 deletion superset-frontend/src/explore/index.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ const initState = getInitialState(bootstrapData);
const asyncEventMiddleware = initAsyncEvents({
config: bootstrapData.common.conf,
getPendingComponents: ({ charts }) =>
Object.values(charts).filter(c => c.chartStatus === 'loading'),
Object.values(charts).filter(
c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
),
successAction: (componentId, componentData) =>
actions.chartUpdateSucceeded(componentData, componentId),
errorAction: (componentId, response) =>
Expand Down
6 changes: 4 additions & 2 deletions superset-frontend/src/middleware/asyncEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,17 @@ const initAsyncEvents = (options: AsyncEventOptions) => {
};

const processEvents = async () => {
const state = store.getState();
const queuedComponents = getPendingComponents(state);
let queuedComponents = getPendingComponents(store.getState());
const eventArgs = lastReceivedEventId
? { last_id: lastReceivedEventId }
: {};
const events: AsyncEvent[] = [];
if (queuedComponents && queuedComponents.length) {
try {
const { result: events } = await fetchEvents(eventArgs);
// refetch queuedComponents due to race condition where results are available
// before component state is updated with asyncJobId
queuedComponents = getPendingComponents(store.getState());
if (events && events.length) {
const componentsByJobId = queuedComponents.reduce((acc, item) => {
acc[item.asyncJobId] = item;
Expand Down
14 changes: 12 additions & 2 deletions superset/tasks/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@
This is the main entrypoint used by Celery workers. As such,
it needs to call create_app() in order to initialize things properly
"""
from typing import Any

from celery.signals import worker_process_init

# Superset framework imports
from superset import create_app
from superset.extensions import celery_app
from superset.extensions import celery_app, db

# Init the Flask app / configure everything
create_app()
flask_app = create_app()

# Need to import late, as the celery_app will have been setup by "create_app()"
# pylint: disable=wrong-import-position, unused-import
from . import cache, schedules, scheduler # isort:skip

# Export the celery app globally for Celery (as run on the cmd line) to find
app = celery_app


@worker_process_init.connect
def reset_db_connection_pool(**kwargs: Any) -> None: # pylint: disable=unused-argument
with flask_app.app_context():
# https://docs.sqlalchemy.org/en/14/core/connections.html#engine-disposal
db.engine.dispose()

0 comments on commit a589da0

Please sign in to comment.