Skip to content

Commit

Permalink
Merge branch 'main' into sqlalchemy-connect
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Feb 1, 2023
2 parents 531df87 + d8788b6 commit d7b70f9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
- Add connection attributes to sqlalchemy connect span
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
- Fix aiohttp bug with unset `trace_configs`
([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def instrumented_init(wrapped, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)

client_trace_configs = list(kwargs.get("trace_configs", ()))
client_trace_configs = list(kwargs.get("trace_configs") or [])
client_trace_configs.extend(trace_configs)

trace_config = create_trace_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def lambda_handler(event, context):
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
meter_provider (MeterProvider) - an optional meter provider
event_context_extractor (Callable) - a function that returns an OTel Trace
Context given the Lambda Event the AWS Lambda was invoked with
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
Expand All @@ -68,6 +69,7 @@ def custom_event_context_extractor(lambda_event):

import logging
import os
import time
from importlib import import_module
from typing import Any, Callable, Collection
from urllib.parse import urlencode
Expand All @@ -79,6 +81,10 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.instrumentation.aws_lambda.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import (
MeterProvider,
get_meter_provider,
)
from opentelemetry.propagate import get_global_textmap
from opentelemetry.propagators.aws.aws_xray_propagator import (
TRACE_HEADER_KEY,
Expand Down Expand Up @@ -274,6 +280,7 @@ def _instrument(
event_context_extractor: Callable[[Any], Context],
tracer_provider: TracerProvider = None,
disable_aws_context_propagation: bool = False,
meter_provider: MeterProvider = None,
):
def _instrumented_lambda_handler_call(
call_wrapped, instance, args, kwargs
Expand Down Expand Up @@ -352,15 +359,33 @@ def _instrumented_lambda_handler_call(
result.get("statusCode"),
)

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
# Assumes we are using the OpenTelemetry SDK implementation of the
# `TracerProvider`.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.error(
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
if hasattr(_tracer_provider, "force_flush"):
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.exception(
f"TracerProvider failed to flush traces"
)
else:
logger.warning("TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation.")

_meter_provider = meter_provider or get_meter_provider()
if hasattr(_meter_provider, "force_flush"):
rem = flush_timeout - (time.time()-now)*1000
if rem > 0:
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_meter_provider.force_flush(rem)
except Exception: # pylint: disable=broad-except
logger.exception(
f"MeterProvider failed to flush metrics"
)
else:
logger.warning(
"MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
)

return result
Expand All @@ -385,6 +410,7 @@ def _instrument(self, **kwargs):
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``event_context_extractor``: a method which takes the Lambda
Event as input and extracts an OTel Context from it. By default,
the context is extracted from the HTTP headers of an API Gateway
Expand Down Expand Up @@ -432,6 +458,7 @@ def _instrument(self, **kwargs):
),
tracer_provider=kwargs.get("tracer_provider"),
disable_aws_context_propagation=disable_aws_context_propagation,
meter_provider=kwargs.get("meter_provider"),
)

def _uninstrument(self, **kwargs):
Expand Down

0 comments on commit d7b70f9

Please sign in to comment.