From 86c74988e12f9ea86177084355750b072a6e96c0 Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Mon, 15 Jul 2024 18:57:28 +0200 Subject: [PATCH 01/20] fix: forward OTEL context to subthreads in parallel bulk calls --- elasticsearch/_otel.py | 21 +++++++++++++++ elasticsearch/_version.py | 2 +- elasticsearch/helpers/actions.py | 45 ++++++++++++++++---------------- 3 files changed, 45 insertions(+), 23 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 039f7798b..1696d0d0e 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -22,7 +22,11 @@ from typing import Generator, Literal, Mapping try: + from opentelemetry import context as otel_context from opentelemetry import trace + from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, + ) _tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api") except ModuleNotFoundError: @@ -41,6 +45,8 @@ class OpenTelemetry: + current_context = None + def __init__( self, enabled: bool | None = None, @@ -74,6 +80,8 @@ def span( span_name = endpoint_id or method with self.tracer.start_as_current_span(span_name) as otel_span: + self.current_context = {} + TraceContextTextMapPropagator().inject(self.current_context) otel_span.set_attribute("http.request.method", method) otel_span.set_attribute("db.system", "elasticsearch") if endpoint_id is not None: @@ -86,3 +94,16 @@ def span( endpoint_id=endpoint_id, body_strategy=self.body_strategy, ) + + @contextlib.contextmanager + def recover_parent_context(self): + token = None + if self.current_context: + otel_parent_ctx = TraceContextTextMapPropagator().extract( + carrier=self.current_context + ) + token = otel_context.attach(otel_parent_ctx) + yield + + if token: + otel_context.detach(token) diff --git a/elasticsearch/_version.py b/elasticsearch/_version.py index 9860a8968..d518f57a9 100644 --- a/elasticsearch/_version.py +++ b/elasticsearch/_version.py @@ -15,4 +15,4 @@ # specific language governing permissions and limitations # under the License. -__versionstr__ = "8.14.0" +__versionstr__ = "8.14.1-alpha" diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index e25ea8df6..32f7e1eb1 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -331,28 +331,29 @@ def _process_bulk_chunk( """ Send a bulk request to elasticsearch and process the output. 
""" - if isinstance(ignore_status, int): - ignore_status = (ignore_status,) - - try: - # send the actual request - resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type] - except ApiError as e: - gen = _process_bulk_chunk_error( - error=e, - bulk_data=bulk_data, - ignore_status=ignore_status, - raise_on_exception=raise_on_exception, - raise_on_error=raise_on_error, - ) - else: - gen = _process_bulk_chunk_success( - resp=resp.body, - bulk_data=bulk_data, - ignore_status=ignore_status, - raise_on_error=raise_on_error, - ) - yield from gen + with client._otel.recover_parent_context(): + if isinstance(ignore_status, int): + ignore_status = (ignore_status,) + + try: + # send the actual request + resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type] + except ApiError as e: + gen = _process_bulk_chunk_error( + error=e, + bulk_data=bulk_data, + ignore_status=ignore_status, + raise_on_exception=raise_on_exception, + raise_on_error=raise_on_error, + ) + else: + gen = _process_bulk_chunk_success( + resp=resp.body, + bulk_data=bulk_data, + ignore_status=ignore_status, + raise_on_error=raise_on_error, + ) + yield from gen def streaming_bulk( From c88e548735718838dca9197eaa7c1de39d1472f6 Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Tue, 23 Jul 2024 15:30:55 +0200 Subject: [PATCH 02/20] test: add scenario for OTEL context forwarding in parallel bulk --- noxfile.py | 2 +- test_elasticsearch/test_otel.py | 23 ++++++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index 69e53417f..4f773b3e2 100644 --- a/noxfile.py +++ b/noxfile.py @@ -52,7 +52,7 @@ def test(session): session.run(*pytest_argv()) -@nox.session(python=["3.8", "3.12"]) +@nox.session(python=["3.7", "3.8", "3.12"]) def test_otel(session): session.install( ".[dev]", diff --git a/test_elasticsearch/test_otel.py b/test_elasticsearch/test_otel.py index ffe7afff4..857519073 100644 --- a/test_elasticsearch/test_otel.py +++ b/test_elasticsearch/test_otel.py @@ -16,7 +16,9 @@ # under the License. 
import os - +from unittest import mock +from elasticsearch import Elasticsearch +from elasticsearch import helpers import pytest try: @@ -95,3 +97,22 @@ def test_detailed_span(): "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", "db.elasticsearch.node.name": "instance-0000000001", } + + +@mock.patch("elasticsearch._otel.OpenTelemetry.recover_parent_context") +@mock.patch("elasticsearch.helpers.actions._process_bulk_chunk_success") +@mock.patch("elasticsearch.Elasticsearch.bulk") +def test_forward_otel_context_to_subthreads( + _call_bulk_mock, _process_bulk_success_mock, _mock_otel_recv_context +): + tracer, memory_exporter = setup_tracing() + es_client = Elasticsearch("http://localhost:9200") + es_client._otel = OpenTelemetry(enabled=True, tracer=tracer) + + _call_bulk_mock.return_value = mock.Mock() + actions = ({"x": i} for i in range(100)) + list( + helpers.parallel_bulk(es_client, actions, chunk_size=4) + ) + # Ensures that the OTEL context has been forwarded to all chunks + assert es_client._otel.recover_parent_context.call_count == 25 From 041d841fb79508ff63b0df97d351dc3787e5376f Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Thu, 25 Jul 2024 14:51:32 -0300 Subject: [PATCH 03/20] lint: isort import order and black formatting --- test_elasticsearch/test_otel.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test_elasticsearch/test_otel.py b/test_elasticsearch/test_otel.py index 857519073..d18864944 100644 --- a/test_elasticsearch/test_otel.py +++ b/test_elasticsearch/test_otel.py @@ -17,10 +17,11 @@ import os from unittest import mock -from elasticsearch import Elasticsearch -from elasticsearch import helpers + import pytest +from elasticsearch import Elasticsearch, helpers + try: from opentelemetry.sdk.trace import TracerProvider, export from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( @@ -111,8 +112,6 @@ def test_forward_otel_context_to_subthreads( _call_bulk_mock.return_value = mock.Mock() actions = ({"x": i} for i in range(100)) - list( - helpers.parallel_bulk(es_client, actions, chunk_size=4) - ) + list(helpers.parallel_bulk(es_client, actions, chunk_size=4)) # Ensures that the OTEL context has been forwarded to all chunks assert es_client._otel.recover_parent_context.call_count == 25 From 31c0b8d2dde4255ebbbe1aa7a2749be8a0c266bd Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Fri, 26 Jul 2024 15:39:07 -0300 Subject: [PATCH 04/20] fix: add missing type hints --- elasticsearch/_otel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 1696d0d0e..46895fb64 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -45,7 +45,7 @@ class OpenTelemetry: - current_context = None + current_context: dict[str, str] = {} def __init__( self, @@ -96,7 +96,7 @@ def span( ) @contextlib.contextmanager - def recover_parent_context(self): + def recover_parent_context(self) -> Generator[None, None, None]: token = None if self.current_context: otel_parent_ctx = TraceContextTextMapPropagator().extract( From 1f2dd49efcb6f7d90d7cfdafebdc8ef0c729a2b2 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 31 Jul 2024 12:55:39 +0200 Subject: [PATCH 05/20] Add _otel attribute to FailingBulkClient --- test_elasticsearch/test_server/test_helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index 33c31c364..011803bc9 100644 --- 
a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -41,6 +41,7 @@ def __init__( ), ): self.client = client + self._otel = client._otel self._called = 0 self._fail_at = fail_at self.transport = client.transport From 3f7359287246136287e1e4b4c01603260cb6e88f Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 31 Jul 2024 15:06:19 +0200 Subject: [PATCH 06/20] Restore version --- elasticsearch/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch/_version.py b/elasticsearch/_version.py index d518f57a9..9860a8968 100644 --- a/elasticsearch/_version.py +++ b/elasticsearch/_version.py @@ -15,4 +15,4 @@ # specific language governing permissions and limitations # under the License. -__versionstr__ = "8.14.1-alpha" +__versionstr__ = "8.14.0" From ee1468e1162f6dd11167cc45874a33dabc5b56c5 Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Tue, 6 Aug 2024 15:38:08 -0300 Subject: [PATCH 07/20] fix: review suggestions, renaming and except guard --- elasticsearch/_otel.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 46895fb64..2c973ead0 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -45,7 +45,7 @@ class OpenTelemetry: - current_context: dict[str, str] = {} + context_carrier: dict[str, str] = {} def __init__( self, @@ -80,8 +80,7 @@ def span( span_name = endpoint_id or method with self.tracer.start_as_current_span(span_name) as otel_span: - self.current_context = {} - TraceContextTextMapPropagator().inject(self.current_context) + TraceContextTextMapPropagator().inject(self.context_carrier) otel_span.set_attribute("http.request.method", method) otel_span.set_attribute("db.system", "elasticsearch") if endpoint_id is not None: @@ -98,12 +97,13 @@ def span( @contextlib.contextmanager def recover_parent_context(self) -> Generator[None, None, None]: token = None - if self.current_context: + if self.context_carrier: otel_parent_ctx = TraceContextTextMapPropagator().extract( - carrier=self.current_context + carrier=self.context_carrier ) token = otel_context.attach(otel_parent_ctx) - yield - - if token: - otel_context.detach(token) + try: + yield + finally: + if token: + otel_context.detach(token) From a7632724280df7eb908746dd8c6bf1ad8f73d9b6 Mon Sep 17 00:00:00 2001 From: Alisson Claudino Date: Tue, 6 Aug 2024 17:59:39 -0300 Subject: [PATCH 08/20] fix: optionally inject OTEL context --- elasticsearch/_otel.py | 4 ++- elasticsearch/helpers/actions.py | 45 ++++++++++++++++++-------------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 2c973ead0..64b1e75e1 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -73,6 +73,7 @@ def span( *, endpoint_id: str | None, path_parts: Mapping[str, str], + inject_context: bool = False, ) -> Generator[OpenTelemetrySpan, None, None]: if not self.enabled or self.tracer is None: yield OpenTelemetrySpan(None) @@ -80,7 +81,8 @@ def span( span_name = endpoint_id or method with self.tracer.start_as_current_span(span_name) as otel_span: - TraceContextTextMapPropagator().inject(self.context_carrier) + if inject_context: + TraceContextTextMapPropagator().inject(self.context_carrier) otel_span.set_attribute("http.request.method", method) otel_span.set_attribute("db.system", "elasticsearch") if endpoint_id is not None: diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py 
index 32f7e1eb1..dfe30bf7f 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -590,27 +590,32 @@ def _setup_queues(self) -> None: ] = Queue(max(queue_size, thread_count)) self._quick_put = self._inqueue.put - pool = BlockingPool(thread_count) - - try: - for result in pool.imap( - lambda bulk_chunk: list( - _process_bulk_chunk( - client, - bulk_chunk[1], - bulk_chunk[0], - ignore_status=ignore_status, # type: ignore[misc] - *args, - **kwargs, - ) - ), - _chunk_actions(expanded_actions, chunk_size, max_chunk_bytes, serializer), - ): - yield from result + with client._otel.span( + "parallel_bulk", endpoint_id="", path_parts={}, inject_context=True + ): + pool = BlockingPool(thread_count) - finally: - pool.close() - pool.join() + try: + for result in pool.imap( + lambda bulk_chunk: list( + _process_bulk_chunk( + client, + bulk_chunk[1], + bulk_chunk[0], + ignore_status=ignore_status, # type: ignore[misc] + *args, + **kwargs, + ) + ), + _chunk_actions( + expanded_actions, chunk_size, max_chunk_bytes, serializer + ), + ): + yield from result + + finally: + pool.close() + pool.join() def scan( From 795d1bf119b07cbd82954195d0e09a0d8b9e7a47 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 21 Aug 2024 11:16:17 +0400 Subject: [PATCH 09/20] Remove 3.7 in nox otel session --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 3c1abf65e..600120bb3 100644 --- a/noxfile.py +++ b/noxfile.py @@ -52,7 +52,7 @@ def test(session): session.run(*pytest_argv()) -@nox.session(python=["3.7", "3.8", "3.12"]) +@nox.session(python=["3.8", "3.12"]) def test_otel(session): session.install( ".[dev]", From 7cfad71d52f6805faac6ec3c37503bb91505874d Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 21 Aug 2024 11:13:59 +0400 Subject: [PATCH 10/20] Switch from span(inject_context) to helpers_span The helpers spans are not regular DB spans. 
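
For context, a minimal standalone sketch of the carrier-based propagation that recover_parent_context() (and, at this point in the series, helpers_span()) relies on — not part of the patch itself. It assumes only opentelemetry-api and opentelemetry-sdk; `worker`, `carrier` and the chunk payloads are illustrative names, not this client's API.

    from concurrent.futures import ThreadPoolExecutor

    from opentelemetry import context as otel_context
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer("example")

    carrier = {}  # shared dict the parent span context is serialized into


    def worker(chunk):
        # New threads start with an empty OTel context, so restore the parent
        # from the carrier before creating the per-chunk span.
        token = otel_context.attach(
            TraceContextTextMapPropagator().extract(carrier=carrier)
        )
        try:
            with tracer.start_as_current_span("bulk") as span:
                return format(span.get_span_context().trace_id, "032x")
        finally:
            otel_context.detach(token)


    with tracer.start_as_current_span("helpers.parallel_bulk") as parent:
        TraceContextTextMapPropagator().inject(carrier)  # writes a W3C traceparent entry
        with ThreadPoolExecutor(max_workers=2) as pool:
            trace_ids = list(pool.map(worker, [[1, 2], [3, 4]]))

    # All chunk spans ended up in the parent's trace.
    assert set(trace_ids) == {format(parent.get_span_context().trace_id, "032x")}
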
--- elasticsearch/_otel.py | 18 +++++++++++++++--- elasticsearch/helpers/actions.py | 4 +--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 64b1e75e1..91dfcecbb 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -73,7 +73,6 @@ def span( *, endpoint_id: str | None, path_parts: Mapping[str, str], - inject_context: bool = False, ) -> Generator[OpenTelemetrySpan, None, None]: if not self.enabled or self.tracer is None: yield OpenTelemetrySpan(None) @@ -81,8 +80,6 @@ def span( span_name = endpoint_id or method with self.tracer.start_as_current_span(span_name) as otel_span: - if inject_context: - TraceContextTextMapPropagator().inject(self.context_carrier) otel_span.set_attribute("http.request.method", method) otel_span.set_attribute("db.system", "elasticsearch") if endpoint_id is not None: @@ -96,6 +93,21 @@ def span( body_strategy=self.body_strategy, ) + + @contextlib.contextmanager + def helpers_span(self, span_name: str): + if not self.enabled or self.tracer is None: + return + + with self.tracer.start_as_current_span(span_name) as otel_span: + TraceContextTextMapPropagator().inject(self.context_carrier) + otel_span.set_attribute("db.system", "elasticsearch") + otel_span.set_attribute("db.operation", span_name) + # Without a request method, Elastic APM does not display the traces + otel_span.set_attribute("http.request.method", "null") + yield + + @contextlib.contextmanager def recover_parent_context(self) -> Generator[None, None, None]: token = None diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index dfe30bf7f..804ee9428 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -590,9 +590,7 @@ def _setup_queues(self) -> None: ] = Queue(max(queue_size, thread_count)) self._quick_put = self._inqueue.put - with client._otel.span( - "parallel_bulk", endpoint_id="", path_parts={}, inject_context=True - ): + with client._otel.helpers_span("helpers.parallel_bulk"): pool = BlockingPool(thread_count) try: From 9eb549a1176d49b8cfac1d954f532627c91608bd Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 21 Aug 2024 11:56:48 +0400 Subject: [PATCH 11/20] Create OpenTelemetry parent traces for all sync bulk helpers --- elasticsearch/_otel.py | 13 ++-- elasticsearch/helpers/actions.py | 130 ++++++++++++++++--------------- 2 files changed, 71 insertions(+), 72 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 91dfcecbb..b79178fa2 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -110,14 +110,11 @@ def helpers_span(self, span_name: str): @contextlib.contextmanager def recover_parent_context(self) -> Generator[None, None, None]: - token = None - if self.context_carrier: - otel_parent_ctx = TraceContextTextMapPropagator().extract( - carrier=self.context_carrier - ) - token = otel_context.attach(otel_parent_ctx) + otel_parent_ctx = TraceContextTextMapPropagator().extract( + carrier=self.context_carrier + ) + token = otel_context.attach(otel_parent_ctx) try: yield finally: - if token: - otel_context.detach(token) + otel_context.detach(token) diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 804ee9428..f7de8289f 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -371,6 +371,7 @@ def streaming_bulk( max_backoff: float = 600, yield_ok: bool = True, ignore_status: Union[int, Collection[int]] = (), + span_name: str = 
"helpers.streaming_bulk", *args: Any, **kwargs: Any, ) -> Iterable[Tuple[bool, Dict[str, Any]]]: @@ -407,73 +408,74 @@ def streaming_bulk( :arg yield_ok: if set to False will skip successful documents in the output :arg ignore_status: list of HTTP status code that you want to ignore """ - client = client.options() - client._client_meta = (("h", "bp"),) + with client._otel.helpers_span(span_name): + client = client.options() + client._client_meta = (("h", "bp"),) - serializer = client.transport.serializers.get_serializer("application/json") + serializer = client.transport.serializers.get_serializer("application/json") - bulk_data: List[ - Union[ - Tuple[_TYPE_BULK_ACTION_HEADER], - Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], + bulk_data: List[ + Union[ + Tuple[_TYPE_BULK_ACTION_HEADER], + Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], + ] ] - ] - bulk_actions: List[bytes] - for bulk_data, bulk_actions in _chunk_actions( - map(expand_action_callback, actions), chunk_size, max_chunk_bytes, serializer - ): - for attempt in range(max_retries + 1): - to_retry: List[bytes] = [] - to_retry_data: List[ - Union[ - Tuple[_TYPE_BULK_ACTION_HEADER], - Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], - ] - ] = [] - if attempt: - time.sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1))) - - try: - for data, (ok, info) in zip( - bulk_data, - _process_bulk_chunk( - client, - bulk_actions, + bulk_actions: List[bytes] + for bulk_data, bulk_actions in _chunk_actions( + map(expand_action_callback, actions), chunk_size, max_chunk_bytes, serializer + ): + for attempt in range(max_retries + 1): + to_retry: List[bytes] = [] + to_retry_data: List[ + Union[ + Tuple[_TYPE_BULK_ACTION_HEADER], + Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], + ] + ] = [] + if attempt: + time.sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1))) + + try: + for data, (ok, info) in zip( bulk_data, - raise_on_exception, - raise_on_error, - ignore_status, - *args, - **kwargs, - ), - ): - if not ok: - action, info = info.popitem() - # retry if retries enabled, we get 429, and we are not - # in the last attempt - if ( - max_retries - and info["status"] == 429 - and (attempt + 1) <= max_retries - ): - # _process_bulk_chunk expects bytes so we need to - # re-serialize the data - to_retry.extend(map(serializer.dumps, data)) - to_retry_data.append(data) - else: - yield ok, {action: info} - elif yield_ok: - yield ok, info - - except ApiError as e: - # suppress 429 errors since we will retry them - if attempt == max_retries or e.status_code != 429: - raise - else: - if not to_retry: - break - # retry only subset of documents that didn't succeed - bulk_actions, bulk_data = to_retry, to_retry_data + _process_bulk_chunk( + client, + bulk_actions, + bulk_data, + raise_on_exception, + raise_on_error, + ignore_status, + *args, + **kwargs, + ), + ): + if not ok: + action, info = info.popitem() + # retry if retries enabled, we get 429, and we are not + # in the last attempt + if ( + max_retries + and info["status"] == 429 + and (attempt + 1) <= max_retries + ): + # _process_bulk_chunk expects bytes so we need to + # re-serialize the data + to_retry.extend(map(serializer.dumps, data)) + to_retry_data.append(data) + else: + yield ok, {action: info} + elif yield_ok: + yield ok, info + + except ApiError as e: + # suppress 429 errors since we will retry them + if attempt == max_retries or e.status_code != 429: + raise + else: + if not to_retry: + break + # retry only subset of documents that didn't 
succeed + bulk_actions, bulk_data = to_retry, to_retry_data def bulk( @@ -520,7 +522,7 @@ def bulk( # make streaming_bulk yield successful results so we can count them kwargs["yield_ok"] = True for ok, item in streaming_bulk( - client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc] + client, actions, ignore_status=ignore_status, span_name="helpers.bulk", *args, **kwargs # type: ignore[misc] ): # go through request-response pairs and detect failures if not ok: From 751b8111d9a4a74a9d3fb490e3732f7b3795c607 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 21 Aug 2024 13:16:33 +0400 Subject: [PATCH 12/20] Fix lint --- elasticsearch/_otel.py | 5 ++--- elasticsearch/helpers/actions.py | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index b79178fa2..04ec80347 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -93,10 +93,10 @@ def span( body_strategy=self.body_strategy, ) - @contextlib.contextmanager - def helpers_span(self, span_name: str): + def helpers_span(self, span_name: str) -> Generator[None, None, None]: if not self.enabled or self.tracer is None: + yield return with self.tracer.start_as_current_span(span_name) as otel_span: @@ -107,7 +107,6 @@ def helpers_span(self, span_name: str): otel_span.set_attribute("http.request.method", "null") yield - @contextlib.contextmanager def recover_parent_context(self) -> Generator[None, None, None]: otel_parent_ctx = TraceContextTextMapPropagator().extract( diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index f7de8289f..4ef528f8d 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -422,7 +422,10 @@ def streaming_bulk( ] bulk_actions: List[bytes] for bulk_data, bulk_actions in _chunk_actions( - map(expand_action_callback, actions), chunk_size, max_chunk_bytes, serializer + map(expand_action_callback, actions), + chunk_size, + max_chunk_bytes, + serializer, ): for attempt in range(max_retries + 1): to_retry: List[bytes] = [] From 464b306f3b345c996338f725138bacc0f96e4ca7 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 21 Aug 2024 16:32:59 +0400 Subject: [PATCH 13/20] Fix _recover_parent_context without OTel --- elasticsearch/_otel.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 04ec80347..cae195d3f 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -109,6 +109,10 @@ def helpers_span(self, span_name: str) -> Generator[None, None, None]: @contextlib.contextmanager def recover_parent_context(self) -> Generator[None, None, None]: + if not self.enabled or self.tracer is None: + yield + return + otel_parent_ctx = TraceContextTextMapPropagator().extract( carrier=self.context_carrier ) From a0689c8e0986646ae9a8e115e833641cfeada17b Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 29 Aug 2024 16:31:09 +0400 Subject: [PATCH 14/20] Switch test_otel to sync_client --- test_elasticsearch/test_server/test_otel.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index b90dab8d6..fb987b3be 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -20,7 +20,6 @@ import pytest import elasticsearch -from test_elasticsearch.utils import CA_CERTS try: from opentelemetry import trace @@ -39,7 +38,7 @@ ] -def 
test_otel_end_to_end(monkeypatch, elasticsearch_url: str): +def test_otel_end_to_end(monkeypatch, sync_client): # Sets the global default tracer provider tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() @@ -47,14 +46,7 @@ def test_otel_end_to_end(monkeypatch, elasticsearch_url: str): tracer_provider.add_span_processor(span_processor) trace.set_tracer_provider(tracer_provider) - # Once OpenTelemetry is enabled by default, we can use the sync_client fixture instead - monkeypatch.setenv("OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED", "true") - kw = {} - if elasticsearch_url.startswith("https://"): - kw["ca_certs"] = CA_CERTS - client = elasticsearch.Elasticsearch(elasticsearch_url, **kw) - - resp = client.search(index="logs-*", query={"match_all": {}}) + resp = sync_client.search(index="logs-*", query={"match_all": {}}) assert resp.meta.status == 200 spans = memory_exporter.get_finished_spans() From 6dbe873b753565df88e192188eea3d0c61055684 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 29 Aug 2024 16:56:45 +0400 Subject: [PATCH 15/20] Switch away from global tracer It's global and not practical to use with multiple tests. --- test_elasticsearch/test_server/test_otel.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index fb987b3be..6f8f32122 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -21,6 +21,8 @@ import elasticsearch +from ..test_otel import setup_tracing + try: from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider, export @@ -38,13 +40,9 @@ ] -def test_otel_end_to_end(monkeypatch, sync_client): - # Sets the global default tracer provider - tracer_provider = TracerProvider() - memory_exporter = InMemorySpanExporter() - span_processor = export.SimpleSpanProcessor(memory_exporter) - tracer_provider.add_span_processor(span_processor) - trace.set_tracer_provider(tracer_provider) +def test_otel_end_to_end(sync_client): + tracer, memory_exporter = setup_tracing() + sync_client._otel.tracer = tracer resp = sync_client.search(index="logs-*", query={"match_all": {}}) assert resp.meta.status == 200 From 4de43157c4e667101973ab030e1711e422fbb6c7 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Fri, 30 Aug 2024 17:52:54 +0400 Subject: [PATCH 16/20] Test all bulks --- test_elasticsearch/test_otel.py | 7 ++- test_elasticsearch/test_server/test_otel.py | 66 ++++++++++++++++++--- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/test_elasticsearch/test_otel.py b/test_elasticsearch/test_otel.py index d18864944..f92e7087d 100644 --- a/test_elasticsearch/test_otel.py +++ b/test_elasticsearch/test_otel.py @@ -101,10 +101,14 @@ def test_detailed_span(): @mock.patch("elasticsearch._otel.OpenTelemetry.recover_parent_context") +@mock.patch("elasticsearch._otel.OpenTelemetry.helpers_span") @mock.patch("elasticsearch.helpers.actions._process_bulk_chunk_success") @mock.patch("elasticsearch.Elasticsearch.bulk") def test_forward_otel_context_to_subthreads( - _call_bulk_mock, _process_bulk_success_mock, _mock_otel_recv_context + _call_bulk_mock, + _process_bulk_success_mock, + _mock_otel_helpers_span, + _mock_otel_recv_context, ): tracer, memory_exporter = setup_tracing() es_client = Elasticsearch("http://localhost:9200") @@ -114,4 +118,5 @@ def test_forward_otel_context_to_subthreads( actions = ({"x": i} for i in range(100)) 
list(helpers.parallel_bulk(es_client, actions, chunk_size=4)) # Ensures that the OTEL context has been forwarded to all chunks + assert es_client._otel.helpers_span.call_count == 1 assert es_client._otel.recover_parent_context.call_count == 25 diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index 6f8f32122..06ef382f2 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -20,18 +20,10 @@ import pytest import elasticsearch +import elasticsearch.helpers from ..test_otel import setup_tracing -try: - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider, export - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) -except ModuleNotFoundError: - pass - pytestmark = [ pytest.mark.skipif( "TEST_WITH_OTEL" not in os.environ, reason="TEST_WITH_OTEL is not set" @@ -59,3 +51,59 @@ def test_otel_end_to_end(sync_client): # Assert expected atttributes are here, but allow other attributes too # to make this test robust to elastic-transport changes assert expected_attributes.items() <= spans[0].attributes.items() + + +@pytest.mark.parametrize( + "bulk_helper_name", ["bulk", "streaming_bulk", "parallel_bulk"] +) +def test_otel_bulk(sync_client, bulk_helper_name): + tracer, memory_exporter = setup_tracing() + + # Create a new client with our tracer + sync_client = sync_client.options() + sync_client._otel.tracer = tracer + # "Disable" options to keep sync_client with tracer enabled + sync_client.options = lambda: sync_client + + docs = [{"answer": x, "helper": bulk_helper_name, "_id": x} for x in range(10)] + bulk_function = getattr(elasticsearch.helpers, bulk_helper_name) + if bulk_helper_name == "bulk": + success, failed = bulk_function( + sync_client, docs, index="test-index", chunk_size=2, refresh=True + ) + assert success, failed == (5, 0) + else: + for ok, resp in bulk_function( + sync_client, docs, index="test-index", chunk_size=2, refresh=True + ): + assert ok is True + + memory_exporter.shutdown() + + assert 10 == sync_client.count(index="test-index")["count"] + assert {"answer": 4, "helper": bulk_helper_name} == sync_client.get( + index="test-index", id=4 + )["_source"] + + spans = list(memory_exporter.get_finished_spans()) + parent_span = spans.pop() + assert parent_span.name == f"helpers.{bulk_helper_name}" + assert parent_span.attributes == { + "db.system": "elasticsearch", + "db.operation": f"helpers.{bulk_helper_name}", + "http.request.method": "null", + } + + assert len(spans) == 5 + for span in spans: + assert span.name == "bulk" + assert span.attributes == { + "http.request.method": "PUT", + "db.system": "elasticsearch", + "db.operation": "bulk", + "db.elasticsearch.path_parts.index": "test-index", + "url.full": "http://localhost:9200/test-index/_bulk?refresh=true", + "server.address": "localhost", + "server.port": 9200, + } + assert span.parent.trace_id == parent_span.context.trace_id From 280251dda6d5460d4e227a0eda31adb7f0fa6833 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Fri, 30 Aug 2024 19:20:31 +0400 Subject: [PATCH 17/20] Fix url.full comparison --- test_elasticsearch/test_server/test_otel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index 06ef382f2..91aa63597 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -56,7 
+56,7 @@ def test_otel_end_to_end(sync_client): @pytest.mark.parametrize( "bulk_helper_name", ["bulk", "streaming_bulk", "parallel_bulk"] ) -def test_otel_bulk(sync_client, bulk_helper_name): +def test_otel_bulk(sync_client, elasticsearch_url, bulk_helper_name): tracer, memory_exporter = setup_tracing() # Create a new client with our tracer @@ -102,7 +102,7 @@ def test_otel_bulk(sync_client, bulk_helper_name): "db.system": "elasticsearch", "db.operation": "bulk", "db.elasticsearch.path_parts.index": "test-index", - "url.full": "http://localhost:9200/test-index/_bulk?refresh=true", + "url.full": f"{elasticsearch_url}/test-index/_bulk?refresh=true", "server.address": "localhost", "server.port": 9200, } From 5dcec1a5fef8e805b67d2573cd946bc01e1a3f5f Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Mon, 2 Sep 2024 10:08:12 +0400 Subject: [PATCH 18/20] Fix attribute check again --- test_elasticsearch/test_server/test_otel.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index 91aa63597..2752d453b 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -97,13 +97,13 @@ def test_otel_bulk(sync_client, elasticsearch_url, bulk_helper_name): assert len(spans) == 5 for span in spans: assert span.name == "bulk" - assert span.attributes == { + expected_attributes = { "http.request.method": "PUT", "db.system": "elasticsearch", "db.operation": "bulk", "db.elasticsearch.path_parts.index": "test-index", - "url.full": f"{elasticsearch_url}/test-index/_bulk?refresh=true", - "server.address": "localhost", - "server.port": 9200, } + # Assert expected atttributes are here, but allow other attributes too + # to make this test robust to elastic-transport changes + assert expected_attributes.items() <= spans[0].attributes.items() assert span.parent.trace_id == parent_span.context.trace_id From a91e0594aa7e9184c4e623bfaf2022908fbf9d2f Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 3 Sep 2024 17:57:58 +0400 Subject: [PATCH 19/20] Use more robust approach to keep track of the parent context Instead of maintaining a global context, we simply pass the otel span down and use it manually. 
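
A minimal standalone sketch of the span-passing approach adopted here: the parent span object is handed to each worker and re-activated with trace.use_span(), so no mutable carrier state has to live on the client. It assumes only opentelemetry-api and opentelemetry-sdk; `process_chunk` and the chunk payloads are illustrative names, not this client's API.

    from concurrent.futures import ThreadPoolExecutor

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer("example")


    def process_chunk(parent_span, chunk):
        # Re-activate the parent span in this worker thread. end_on_exit
        # defaults to False, so ending the span stays with its creator.
        with trace.use_span(parent_span):
            with tracer.start_as_current_span("bulk"):
                pass  # a real worker would send one _bulk request for `chunk`


    with tracer.start_as_current_span("helpers.parallel_bulk") as parent:
        chunks = [[1, 2], [3, 4], [5, 6]]
        with ThreadPoolExecutor(max_workers=2) as pool:
            list(pool.map(lambda c: process_chunk(parent, c), chunks))
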
--- elasticsearch/_otel.py | 22 +++++---------------- elasticsearch/helpers/actions.py | 10 +++++++--- test_elasticsearch/test_otel.py | 6 +++--- test_elasticsearch/test_server/test_otel.py | 2 +- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index cae195d3f..59159369b 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -24,9 +24,6 @@ try: from opentelemetry import context as otel_context from opentelemetry import trace - from opentelemetry.trace.propagation.tracecontext import ( - TraceContextTextMapPropagator, - ) _tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api") except ModuleNotFoundError: @@ -45,8 +42,6 @@ class OpenTelemetry: - context_carrier: dict[str, str] = {} - def __init__( self, enabled: bool | None = None, @@ -94,30 +89,23 @@ def span( ) @contextlib.contextmanager - def helpers_span(self, span_name: str) -> Generator[None, None, None]: + def helpers_span(self, span_name: str) -> Generator[OpenTelemetrySpan, None, None]: if not self.enabled or self.tracer is None: - yield + yield OpenTelemetrySpan(None) return with self.tracer.start_as_current_span(span_name) as otel_span: - TraceContextTextMapPropagator().inject(self.context_carrier) otel_span.set_attribute("db.system", "elasticsearch") otel_span.set_attribute("db.operation", span_name) # Without a request method, Elastic APM does not display the traces otel_span.set_attribute("http.request.method", "null") - yield + yield otel_span @contextlib.contextmanager - def recover_parent_context(self) -> Generator[None, None, None]: + def use_span(self, span: OpenTelemetrySpan) -> Generator[None, None, None]: if not self.enabled or self.tracer is None: yield return - otel_parent_ctx = TraceContextTextMapPropagator().extract( - carrier=self.context_carrier - ) - token = otel_context.attach(otel_parent_ctx) - try: + with trace.use_span(span): yield - finally: - otel_context.detach(token) diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 4ef528f8d..60df0292a 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -34,6 +34,7 @@ Union, ) +from elastic_transport import OpenTelemetrySpan from .. import Elasticsearch from ..compat import to_bytes from ..exceptions import ApiError, NotFoundError, TransportError @@ -322,6 +323,7 @@ def _process_bulk_chunk( Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], ] ], + otel_span: OpenTelemetrySpan, raise_on_exception: bool = True, raise_on_error: bool = True, ignore_status: Union[int, Collection[int]] = (), @@ -331,7 +333,7 @@ def _process_bulk_chunk( """ Send a bulk request to elasticsearch and process the output. 
""" - with client._otel.recover_parent_context(): + with client._otel.use_span(otel_span): if isinstance(ignore_status, int): ignore_status = (ignore_status,) @@ -408,7 +410,7 @@ def streaming_bulk( :arg yield_ok: if set to False will skip successful documents in the output :arg ignore_status: list of HTTP status code that you want to ignore """ - with client._otel.helpers_span(span_name): + with client._otel.helpers_span(span_name) as otel_span: client = client.options() client._client_meta = (("h", "bp"),) @@ -445,6 +447,7 @@ def streaming_bulk( client, bulk_actions, bulk_data, + otel_span, raise_on_exception, raise_on_error, ignore_status, @@ -595,7 +598,7 @@ def _setup_queues(self) -> None: ] = Queue(max(queue_size, thread_count)) self._quick_put = self._inqueue.put - with client._otel.helpers_span("helpers.parallel_bulk"): + with client._otel.helpers_span("helpers.parallel_bulk") as otel_span: pool = BlockingPool(thread_count) try: @@ -605,6 +608,7 @@ def _setup_queues(self) -> None: client, bulk_chunk[1], bulk_chunk[0], + otel_span=otel_span, ignore_status=ignore_status, # type: ignore[misc] *args, **kwargs, diff --git a/test_elasticsearch/test_otel.py b/test_elasticsearch/test_otel.py index f92e7087d..48eb9ea58 100644 --- a/test_elasticsearch/test_otel.py +++ b/test_elasticsearch/test_otel.py @@ -100,7 +100,7 @@ def test_detailed_span(): } -@mock.patch("elasticsearch._otel.OpenTelemetry.recover_parent_context") +@mock.patch("elasticsearch._otel.OpenTelemetry.use_span") @mock.patch("elasticsearch._otel.OpenTelemetry.helpers_span") @mock.patch("elasticsearch.helpers.actions._process_bulk_chunk_success") @mock.patch("elasticsearch.Elasticsearch.bulk") @@ -108,7 +108,7 @@ def test_forward_otel_context_to_subthreads( _call_bulk_mock, _process_bulk_success_mock, _mock_otel_helpers_span, - _mock_otel_recv_context, + _mock_otel_use_span, ): tracer, memory_exporter = setup_tracing() es_client = Elasticsearch("http://localhost:9200") @@ -119,4 +119,4 @@ def test_forward_otel_context_to_subthreads( list(helpers.parallel_bulk(es_client, actions, chunk_size=4)) # Ensures that the OTEL context has been forwarded to all chunks assert es_client._otel.helpers_span.call_count == 1 - assert es_client._otel.recover_parent_context.call_count == 25 + assert es_client._otel.use_span.call_count == 25 diff --git a/test_elasticsearch/test_server/test_otel.py b/test_elasticsearch/test_server/test_otel.py index 2752d453b..3f8033d7b 100644 --- a/test_elasticsearch/test_server/test_otel.py +++ b/test_elasticsearch/test_server/test_otel.py @@ -62,7 +62,7 @@ def test_otel_bulk(sync_client, elasticsearch_url, bulk_helper_name): # Create a new client with our tracer sync_client = sync_client.options() sync_client._otel.tracer = tracer - # "Disable" options to keep sync_client with tracer enabled + # "Disable" options to keep our custom tracer sync_client.options = lambda: sync_client docs = [{"answer": x, "helper": bulk_helper_name, "_id": x} for x in range(10)] From 3caee68b6ca9bec2dde428456f8aff1192716e1e Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 3 Sep 2024 18:02:43 +0400 Subject: [PATCH 20/20] Fix lint --- elasticsearch/_otel.py | 1 - elasticsearch/helpers/actions.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py index 59159369b..49de9201c 100644 --- a/elasticsearch/_otel.py +++ b/elasticsearch/_otel.py @@ -22,7 +22,6 @@ from typing import Generator, Literal, Mapping try: - from opentelemetry import context as otel_context from 
opentelemetry import trace _tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api") diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 60df0292a..1d6b0a27e 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -35,6 +35,7 @@ ) from elastic_transport import OpenTelemetrySpan + from .. import Elasticsearch from ..compat import to_bytes from ..exceptions import ApiError, NotFoundError, TransportError
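
With the full series applied, a bulk helper call produces a helpers.* parent span plus one "bulk" child span per chunk, as asserted by test_otel_bulk. A minimal usage sketch, assuming a reachable Elasticsearch at http://localhost:9200 and opentelemetry-sdk installed; the in-memory exporter setup is an assumption standing in for the tests' setup_tracing() helper, and the environment variable is the opt-in switch shown in the earlier end-to-end test (it may already be on by default).

    import os

    os.environ["OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"] = "true"

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider, export
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

    from elasticsearch import Elasticsearch, helpers

    provider = TracerProvider()
    exporter = InMemorySpanExporter()
    provider.add_span_processor(export.SimpleSpanProcessor(exporter))
    trace.set_tracer_provider(provider)  # global provider, as in the end-to-end test

    client = Elasticsearch("http://localhost:9200")
    docs = [{"_index": "test-index", "_id": i, "answer": i} for i in range(10)]
    helpers.bulk(client, docs, chunk_size=2, refresh=True)

    spans = exporter.get_finished_spans()
    # Expect one "helpers.bulk" parent and one "bulk" child span per chunk.
    print([(span.name, span.parent is not None) for span in spans])
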