Skip to content

Commit

Permalink
fix: use nullpool in the celery workers (#10819)
Browse files Browse the repository at this point in the history
* Use nullpool in the celery workers

* Address comments

Co-authored-by: bogdan kyryliuk <[email protected]>
  • Loading branch information
bkyryliuk and bogdan-dbx authored Sep 10, 2020
1 parent dd7f3d5 commit ac2937a
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 193 deletions.
12 changes: 9 additions & 3 deletions superset/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from superset.app import create_app
from superset.extensions import celery_app, db
from superset.utils import core as utils
from superset.utils.celery import session_scope
from superset.utils.urls import get_url_path

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -619,6 +620,11 @@ def alert() -> None:
from superset.tasks.schedules import schedule_window

click.secho("Processing one alert loop", fg="green")
schedule_window(
ScheduleType.alert, datetime.now() - timedelta(1000), datetime.now(), 6000
)
with session_scope(nullpool=True) as session:
schedule_window(
report_type=ScheduleType.alert,
start_at=datetime.now() - timedelta(1000),
stop_at=datetime.now(),
resolution=6000,
session=session,
)
47 changes: 5 additions & 42 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,25 @@
from contextlib import closing
from datetime import datetime
from sys import getsizeof
from typing import Any, cast, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, cast, Dict, List, Optional, Tuple, Union

import backoff
import msgpack
import pyarrow as pa
import simplejson as json
import sqlalchemy
from celery.exceptions import SoftTimeLimitExceeded
from celery.task.base import Task
from contextlib2 import contextmanager
from flask_babel import lazy_gettext as _
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool

from superset import (
app,
db,
results_backend,
results_backend_use_msgpack,
security_manager,
)
from sqlalchemy.orm import Session

from superset import app, results_backend, results_backend_use_msgpack, security_manager
from superset.dataframe import df_to_records
from superset.db_engine_specs import BaseEngineSpec
from superset.extensions import celery_app
from superset.models.sql_lab import Query
from superset.result_set import SupersetResultSet
from superset.sql_parse import ParsedQuery
from superset.utils.celery import session_scope
from superset.utils.core import (
json_iso_dttm_ser,
QuerySource,
Expand Down Expand Up @@ -121,35 +113,6 @@ def get_query(query_id: int, session: Session) -> Query:
raise SqlLabException("Failed at getting query")


@contextmanager
def session_scope(nullpool: bool) -> Iterator[Session]:
    """Yield a SQLAlchemy session wrapped in a commit/rollback/close scope.

    The metadata database URI is read from ``SQLALCHEMY_DATABASE_URI`` in the
    app config; a warning is logged when that URI contains ``"sqlite"``.

    :param nullpool: when true, a new engine is created with ``NullPool`` and
        a fresh session is bound to it; otherwise ``db.session()`` is used and
        committed once before being handed out.
    :returns: an iterator yielding the session exactly once
    :raises Exception: whatever the managed body (or the final commit) raised,
        after the session was rolled back and the error logged
    """
    metadata_uri = app.config["SQLALCHEMY_DATABASE_URI"]
    if "sqlite" in metadata_uri:
        logger.warning(
            "SQLite Database support for metadata databases will be removed \
in a future version of Superset."
        )

    if not nullpool:
        session = db.session()
        session.commit()  # HACK
    else:
        # Dedicated, non-pooled engine for this scope only.
        nullpool_engine = sqlalchemy.create_engine(metadata_uri, poolclass=NullPool)
        session = sessionmaker(bind=nullpool_engine)()

    try:
        yield session
        # Kept inside the try so a failing commit is rolled back and logged too.
        session.commit()
    except Exception as ex:
        session.rollback()
        logger.exception(ex)
        raise
    finally:
        session.close()


@celery_app.task(
name="sql_lab.get_sql_results",
bind=True,
Expand Down
12 changes: 7 additions & 5 deletions superset/tasks/alerts/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@
from typing import Optional

import pandas as pd
from sqlalchemy.orm import Session

from superset import db
from superset.models.alerts import Alert, SQLObservation
from superset.sql_parse import ParsedQuery

logger = logging.getLogger("tasks.email_reports")


def observe(alert_id: int) -> Optional[str]:
# The session must be passed in explicitly: db.session cannot be used inside the celery workers.
# For more info see: https://github.com/apache/incubator-superset/issues/10530
def observe(alert_id: int, session: Session) -> Optional[str]:
"""
Runs the SQL query in an alert's SQLObserver and then
stores the result in a SQLObservation.
Returns an error message if the observer value was not valid
"""

alert = db.session.query(Alert).filter_by(id=alert_id).one()
alert = session.query(Alert).filter_by(id=alert_id).one()
sql_observer = alert.sql_observer[0]

value = None
Expand All @@ -57,8 +59,8 @@ def observe(alert_id: int) -> Optional[str]:
error_msg=error_msg,
)

db.session.add(observation)
db.session.commit()
session.add(observation)
session.commit()

return error_msg

Expand Down
Loading

0 comments on commit ac2937a

Please sign in to comment.