Skip to content

Commit

Permalink
Fix issue with Flask instrumentation when a request spawn children th…
Browse files Browse the repository at this point in the history
…reads and copies the request context (open-telemetry#1654)
  • Loading branch information
hangonlyra authored and pridhi-arora committed Feb 25, 2023
1 parent 6c655d3 commit c4dbbfb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 3 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- Add connection attributes to sqlalchemy connect span
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))

### Fixed

- Fix Flask instrumentation to only close the span if it was created by the same thread.
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))

## Version 1.16.0/0.37b0 (2023-02-17)

### Added

- Support `aio_pika` 9.x (([#1670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1670])
- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598))
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ def response_hook(span: Span, status: str, response_headers: List):
API
---
"""

from logging import getLogger
from threading import get_ident
from time import time_ns
from timeit import default_timer
from typing import Collection
Expand All @@ -265,6 +265,7 @@ def response_hook(span: Span, status: str, response_headers: List):
_ENVIRON_STARTTIME_KEY = "opentelemetry-flask.starttime_key"
_ENVIRON_SPAN_KEY = "opentelemetry-flask.span_key"
_ENVIRON_ACTIVATION_KEY = "opentelemetry-flask.activation_key"
_ENVIRON_THREAD_ID_KEY = "opentelemetry-flask.thread_id_key"
_ENVIRON_TOKEN = "opentelemetry-flask.token"

_excluded_urls_from_env = get_excluded_urls("FLASK")
Expand Down Expand Up @@ -398,6 +399,7 @@ def _before_request():
activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=E1101
flask_request_environ[_ENVIRON_ACTIVATION_KEY] = activation
flask_request_environ[_ENVIRON_THREAD_ID_KEY] = get_ident()
flask_request_environ[_ENVIRON_SPAN_KEY] = span
flask_request_environ[_ENVIRON_TOKEN] = token

Expand Down Expand Up @@ -437,10 +439,17 @@ def _teardown_request(exc):
return

activation = flask.request.environ.get(_ENVIRON_ACTIVATION_KEY)
if not activation:
thread_id = flask.request.environ.get(_ENVIRON_THREAD_ID_KEY)
if not activation or thread_id != get_ident():
# This request didn't start a span, maybe because it was created in
# a way that doesn't run `before_request`, like when it is created
# with `app.test_request_context`.
#
# Similarly, check the thread_id against the current thread to ensure
# tear down only happens on the original thread. This situation can
# arise if the original thread handling the request spawn children
# threads and then uses something like copy_current_request_context
# to copy the request context.
return
if exc is None:
activation.__exit__(None, None, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint

import flask
from werkzeug.test import Client
from werkzeug.wrappers import Response
Expand All @@ -34,6 +37,25 @@ def _sqlcommenter_endpoint():
)
return sqlcommenter_flask_values

@staticmethod
def _multithreaded_endpoint(count):
def do_random_stuff():
@flask.copy_current_request_context
def inner():
return randint(0, 100)

return inner

executor = ThreadPoolExecutor(count)
futures = []
for _ in range(count):
futures.append(executor.submit(do_random_stuff()))
numbers = []
for future in as_completed(futures):
numbers.append(future.result())

return " ".join([str(i) for i in numbers])

@staticmethod
def _custom_response_headers():
resp = flask.Response("test response")
Expand Down Expand Up @@ -61,6 +83,7 @@ def excluded2_endpoint():
# pylint: disable=no-member
self.app.route("/hello/<int:helloid>")(self._hello_endpoint)
self.app.route("/sqlcommenter")(self._sqlcommenter_endpoint)
self.app.route("/multithreaded")(self._multithreaded_endpoint)
self.app.route("/excluded/<int:helloid>")(self._hello_endpoint)
self.app.route("/excluded")(excluded_endpoint)
self.app.route("/excluded2")(excluded2_endpoint)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import flask
from werkzeug.test import Client
from werkzeug.wrappers import Response

from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.test.wsgitestutil import WsgiTestBase

# pylint: disable=import-error
from .base_test import InstrumentationTest


class TestMultiThreading(InstrumentationTest, WsgiTestBase):
def setUp(self):
super().setUp()
FlaskInstrumentor().instrument()
self.app = flask.Flask(__name__)
self._common_initialization()

def tearDown(self):
super().tearDown()
with self.disable_logging():
FlaskInstrumentor().uninstrument()

def test_multithreaded(self):
"""Test that instrumentation tear down does not blow up
when the request thread spawn children threads and the request
context is copied to the children threads
"""
self.app = flask.Flask(__name__)
self.app.route("/multithreaded/<int:count>")(
self._multithreaded_endpoint
)
client = Client(self.app, Response)
count = 5
resp = client.get(f"/multithreaded/{count}")
self.assertEqual(200, resp.status_code)
# Should return the specified number of random integers
self.assertEqual(count, len(resp.text.split(" ")))

0 comments on commit c4dbbfb

Please sign in to comment.