Skip to content

Commit

Permalink
Rolls back session changes in #3365
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgliss committed May 17, 2023
1 parent 54cb681 commit 699f121
Showing 1 changed file with 9 additions and 50 deletions.
59 changes: 9 additions & 50 deletions src/dispatch/decorators.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from contextlib import _GeneratorContextManager, contextmanager
from functools import wraps
from typing import Any, Callable, List
import inspect
import logging
import time

from sqlalchemy.engine import Engine
from sqlalchemy.orm import scoped_session, Session

from dispatch.metrics import provider as metrics_provider
from dispatch.organization import service as organization_service
Expand All @@ -23,67 +20,30 @@ def fullname(o):
return f"{module.__name__}.{o.__qualname__}"


@contextmanager
def _session(
engine: Engine,
is_scoped: bool = False,
) -> None:
"""Provide a transactional scope around a series of operations."""
session = (
sessionmaker(bind=engine)()
if not is_scoped
else scoped_session(sessionmaker(bind=engine))()
)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise Exception from None
finally:
session.close()


def _execute_task_in_project_context(
func: Callable,
is_scoped: bool = False,
*args,
**kwargs,
) -> None:
db_session = SessionLocal()
metrics_provider.counter("function.call.counter", tags={"function": fullname(func)})
start = time.perf_counter()

def __execute_task_within_session_context(
schema_session: _GeneratorContextManager[Session],
) -> None:
kwargs["db_session"] = schema_session
for project in project_service.get_all(db_session=schema_session):
project = schema_session.merge(project)
kwargs["project"] = project
try:
func(*args, **kwargs)
except Exception as e:
log.exception(e)

try:
# iterate for all schema
for organization in organization_service.get_all(db_session=db_session):
schema_engine = engine.execution_options(
schema_translate_map={None: f"dispatch_organization_{organization.slug}"}
)
if not is_scoped:
with _session(
engine=schema_engine,
is_scoped=False,
) as __session:
__execute_task_within_session_context(__session)
else:
with _session(
engine=schema_engine,
is_scoped=True,
) as __session:
__execute_task_within_session_context(__session)
schema_session = sessionmaker(bind=schema_engine)()
kwargs["db_session"] = schema_session
for project in project_service.get_all(db_session=schema_session):
kwargs["project"] = project
try:
func(*args, **kwargs)
except Exception as e:
log.exception(e)
schema_session.close()

elapsed_time = time.perf_counter() - start
metrics_provider.timer(
Expand All @@ -109,7 +69,6 @@ def wrapper(*args, **kwargs):
func,
*args,
**kwargs,
is_scoped=False,
)

return wrapper
Expand Down

0 comments on commit 699f121

Please sign in to comment.