Skip to content

Commit

Permalink
Fix error when running tasks with Sentry integration enabled. (#13929)
Browse files Browse the repository at this point in the history
Co-authored-by: Ash Berlin-Taylor <[email protected]>
(cherry picked from commit 0e8698d)
  • Loading branch information
junnplus authored and ashb committed Apr 15, 2021
1 parent ab8c558 commit 337edee
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 11 deletions.
13 changes: 10 additions & 3 deletions airflow/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import wraps

from airflow.configuration import conf
from airflow.utils.session import provide_session
from airflow.utils.session import find_session_idx, provide_session
from airflow.utils.state import State

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,14 +149,21 @@ def add_breadcrumbs(self, task_instance, session=None):

def enrich_errors(self, func):
"""Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs."""
session_args_idx = find_session_idx(func)

@wraps(func)
def wrapper(task_instance, *args, session=None, **kwargs):
def wrapper(task_instance, *args, **kwargs):
# Wrapping the _run_raw_task function with push_scope to contain
# tags and breadcrumbs to a specific Task Instance

try:
session = kwargs.get('session', args[session_args_idx])
except IndexError:
session = None

with sentry_sdk.push_scope():
try:
return func(task_instance, *args, session=session, **kwargs)
return func(task_instance, *args, **kwargs)
except Exception as e:
self.add_tagging(task_instance)
self.add_breadcrumbs(task_instance, session=session)
Expand Down
21 changes: 13 additions & 8 deletions airflow/utils/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,26 @@ def create_session():
RT = TypeVar("RT") # pylint: disable=invalid-name


def find_session_idx(func: Callable[..., RT]) -> int:
"""Find session index in function call parameter."""
func_params = signature(func).parameters
try:
# func_params is an ordered dict -- this is the "recommended" way of getting the position
session_args_idx = tuple(func_params).index("session")
except ValueError:
raise ValueError(f"Function {func.__qualname__} has no `session` argument") from None

return session_args_idx


def provide_session(func: Callable[..., RT]) -> Callable[..., RT]:
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
func_params = signature(func).parameters
try:
# func_params is an ordered dict -- this is the "recommended" way of getting the position
session_args_idx = tuple(func_params).index("session")
except ValueError:
raise ValueError(f"Function {func.__qualname__} has no `session` argument") from None
# We don't need this anymore -- ensure we don't keep a reference to it by mistake
del func_params
session_args_idx = find_session_idx(func)

@wraps(func)
def wrapper(*args, **kwargs) -> RT:
Expand Down
52 changes: 52 additions & 0 deletions tests/utils/test_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import pytest

from airflow.utils.session import provide_session


class TestSession:
def dummy_session(self, session=None):
return session

def test_raised_provide_session(self):
with pytest.raises(ValueError, match="Function .*dummy has no `session` argument"):

@provide_session
def dummy():
pass

def test_provide_session_without_args_and_kwargs(self):
assert self.dummy_session() is None

wrapper = provide_session(self.dummy_session)

assert wrapper() is not None

def test_provide_session_with_args(self):
wrapper = provide_session(self.dummy_session)

session = object()
assert wrapper(session) is session

def test_provide_session_with_kwargs(self):
wrapper = provide_session(self.dummy_session)

session = object()
assert wrapper(session=session) is session

0 comments on commit 337edee

Please sign in to comment.