diff --git a/CHANGELOG.md b/CHANGELOG.md index d93f7bab1a..242dd4935c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,16 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + - Add connection attributes to sqlalchemy connect span ([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608)) +### Fixed + +- Fix Flask instrumentation to only close the span if it was created by the same thread. + ([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654)) + ## Version 1.16.0/0.37b0 (2023-02-17) ### Added - Support `aio_pika` 9.x (([#1670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1670]) - `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572)) -- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization. +- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization. ([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598)) - `opentelemetry-instrumentation-celery` Record exceptions as events on the span. ([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573)) diff --git a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py index 922c5e0b41..fd3c40aab3 100644 --- a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py @@ -238,8 +238,8 @@ def response_hook(span: Span, status: str, response_headers: List): API --- """ - from logging import getLogger +from threading import get_ident from time import time_ns from timeit import default_timer from typing import Collection @@ -265,6 +265,7 @@ def response_hook(span: Span, status: str, response_headers: List): _ENVIRON_STARTTIME_KEY = "opentelemetry-flask.starttime_key" _ENVIRON_SPAN_KEY = "opentelemetry-flask.span_key" _ENVIRON_ACTIVATION_KEY = "opentelemetry-flask.activation_key" +_ENVIRON_THREAD_ID_KEY = "opentelemetry-flask.thread_id_key" _ENVIRON_TOKEN = "opentelemetry-flask.token" _excluded_urls_from_env = get_excluded_urls("FLASK") @@ -398,6 +399,7 @@ def _before_request(): activation = trace.use_span(span, end_on_exit=True) activation.__enter__() # pylint: disable=E1101 flask_request_environ[_ENVIRON_ACTIVATION_KEY] = activation + flask_request_environ[_ENVIRON_THREAD_ID_KEY] = get_ident() flask_request_environ[_ENVIRON_SPAN_KEY] = span flask_request_environ[_ENVIRON_TOKEN] = token @@ -437,10 +439,17 @@ def _teardown_request(exc): return activation = flask.request.environ.get(_ENVIRON_ACTIVATION_KEY) - if not activation: + thread_id = flask.request.environ.get(_ENVIRON_THREAD_ID_KEY) + if not activation or thread_id != get_ident(): # This request didn't start a span, maybe because it was created in # a way that doesn't run `before_request`, like when it is created # with `app.test_request_context`. + # + # Similarly, check the thread_id against the current thread to ensure + # tear down only happens on the original thread. This situation can + # arise if the original thread handling the request spawn children + # threads and then uses something like copy_current_request_context + # to copy the request context. return if exc is None: activation.__exit__(None, None, None) diff --git a/instrumentation/opentelemetry-instrumentation-flask/tests/base_test.py b/instrumentation/opentelemetry-instrumentation-flask/tests/base_test.py index 1da8faa7fd..a9cc4e55f7 100644 --- a/instrumentation/opentelemetry-instrumentation-flask/tests/base_test.py +++ b/instrumentation/opentelemetry-instrumentation-flask/tests/base_test.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from concurrent.futures import ThreadPoolExecutor, as_completed +from random import randint + import flask from werkzeug.test import Client from werkzeug.wrappers import Response @@ -34,6 +37,25 @@ def _sqlcommenter_endpoint(): ) return sqlcommenter_flask_values + @staticmethod + def _multithreaded_endpoint(count): + def do_random_stuff(): + @flask.copy_current_request_context + def inner(): + return randint(0, 100) + + return inner + + executor = ThreadPoolExecutor(count) + futures = [] + for _ in range(count): + futures.append(executor.submit(do_random_stuff())) + numbers = [] + for future in as_completed(futures): + numbers.append(future.result()) + + return " ".join([str(i) for i in numbers]) + @staticmethod def _custom_response_headers(): resp = flask.Response("test response") @@ -61,6 +83,7 @@ def excluded2_endpoint(): # pylint: disable=no-member self.app.route("/hello/")(self._hello_endpoint) self.app.route("/sqlcommenter")(self._sqlcommenter_endpoint) + self.app.route("/multithreaded")(self._multithreaded_endpoint) self.app.route("/excluded/")(self._hello_endpoint) self.app.route("/excluded")(excluded_endpoint) self.app.route("/excluded2")(excluded2_endpoint) diff --git a/instrumentation/opentelemetry-instrumentation-flask/tests/test_multithreading.py b/instrumentation/opentelemetry-instrumentation-flask/tests/test_multithreading.py new file mode 100644 index 0000000000..a793b229e3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-flask/tests/test_multithreading.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import flask +from werkzeug.test import Client +from werkzeug.wrappers import Response + +from opentelemetry.instrumentation.flask import FlaskInstrumentor +from opentelemetry.test.wsgitestutil import WsgiTestBase + +# pylint: disable=import-error +from .base_test import InstrumentationTest + + +class TestMultiThreading(InstrumentationTest, WsgiTestBase): + def setUp(self): + super().setUp() + FlaskInstrumentor().instrument() + self.app = flask.Flask(__name__) + self._common_initialization() + + def tearDown(self): + super().tearDown() + with self.disable_logging(): + FlaskInstrumentor().uninstrument() + + def test_multithreaded(self): + """Test that instrumentation tear down does not blow up + when the request thread spawn children threads and the request + context is copied to the children threads + """ + self.app = flask.Flask(__name__) + self.app.route("/multithreaded/")( + self._multithreaded_endpoint + ) + client = Client(self.app, Response) + count = 5 + resp = client.get(f"/multithreaded/{count}") + self.assertEqual(200, resp.status_code) + # Should return the specified number of random integers + self.assertEqual(count, len(resp.text.split(" ")))