Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka/Confluent Kafka header bug #997

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
destination_type="Topic",
destination_name=topic or "Default",
source=wrapped,
) as trace:
dt_headers = {k: v.encode("utf-8") for k, v in trace.generate_request_headers(transaction)}
):
dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)}
# headers can be a list of tuples or a dict so convert to dict for consistency.
dt_headers.update(dict(headers) if headers else {})
if headers:
dt_headers.update(dict(headers))

try:
return wrapped(topic, headers=dt_headers, *args, **kwargs)
except Exception as error:
Expand Down
13 changes: 9 additions & 4 deletions newrelic/hooks/messagebroker_kafkapython.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
destination_name=topic or "Default",
source=wrapped,
terminal=False,
) as trace:
dt_headers = [(k, v.encode("utf-8")) for k, v in trace.generate_request_headers(transaction)]
headers.extend(dt_headers)
):
dt_headers = [(k, v.encode("utf-8")) for k, v in MessageTrace.generate_request_headers(transaction)]
# headers can be a list of tuples or a dict so convert to dict for consistency.
if headers:
dt_headers.extend(headers)

try:
return wrapped(topic, value=value, key=key, headers=headers, partition=partition, timestamp_ms=timestamp_ms)
return wrapped(
topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms
)
except Exception:
notice_error()
raise
Expand Down
22 changes: 21 additions & 1 deletion tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import threading
import time

import pytest
from conftest import cache_kafka_producer_headers
Expand All @@ -28,6 +28,7 @@
)

from newrelic.api.background_task import background_task
from newrelic.api.function_trace import FunctionTrace
from newrelic.common.object_names import callable_name
from newrelic.packages import six

Expand Down Expand Up @@ -137,6 +138,25 @@ def test():
test()


def test_distributed_tracing_headers_under_terminal(topic, send_producer_message):
@validate_transaction_metrics(
"test_distributed_tracing_headers_under_terminal",
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
background_task=True,
)
@background_task(name="test_distributed_tracing_headers_under_terminal")
@cache_kafka_producer_headers()
@validate_messagebroker_headers
def test():
with FunctionTrace(name="terminal_trace", terminal=True):
send_producer_message()

test()


def test_producer_errors(topic, producer, monkeypatch):
if hasattr(producer, "_value_serializer"):
# Remove serializer to intentionally cause a type error in underlying producer implementation
Expand Down
20 changes: 20 additions & 0 deletions tests/messagebroker_kafkapython/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)

from newrelic.api.background_task import background_task
from newrelic.api.function_trace import FunctionTrace
from newrelic.common.object_names import callable_name
from newrelic.packages import six

Expand Down Expand Up @@ -70,6 +71,25 @@ def test():
test()


def test_distributed_tracing_headers_under_terminal(topic, send_producer_message):
@validate_transaction_metrics(
"test_distributed_tracing_headers_under_terminal",
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
background_task=True,
)
@background_task(name="test_distributed_tracing_headers_under_terminal")
@cache_kafka_producer_headers
@validate_messagebroker_headers
def test():
with FunctionTrace(name="terminal_trace", terminal=True):
send_producer_message()

test()


def test_producer_errors(topic, producer, monkeypatch):
monkeypatch.setitem(producer.config, "value_serializer", None)
monkeypatch.setitem(producer.config, "key_serializer", None)
Expand Down
Loading