Skip to content

Commit

Permalink
start receive in run_once
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Mar 29, 2021
1 parent 404c012 commit 7dcf39a
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ def require_patches
require_relative 'patches/channel'
require_relative 'patches/consumer'
require_relative 'patches/queue'
require_relative 'patches/reader_loop'
end

def patch
::Bunny::Channel.prepend(Patches::Channel)
::Bunny::Consumer.prepend(Patches::Consumer)
::Bunny::Queue.prepend(Patches::Queue)
::Bunny::ReaderLoop.prepend(Patches::ReaderLoop)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Bunny
# See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes
module PatchHelpers
def self.with_send_span(channel, tracer, exchange, routing_key, &block)
attributes = basic_attributes(channel, exchange, routing_key)
attributes = basic_attributes(channel, channel.connection, exchange, routing_key)
destination = destination_name(exchange, routing_key)

tracer.in_span("#{destination} send", attributes: attributes, kind: :producer, &block)
Expand Down Expand Up @@ -44,15 +44,34 @@ def self.extract_context(properties)
[parent_context, links]
end

def self.basic_attributes(channel, exchange, routing_key)
def self.inject_context_into_property(properties, key)
properties[key] ||= {}
OpenTelemetry.propagation.inject(properties[key])
end

def self.trace_enrich_receive_span(span, channel, delivery_info, properties)
exchange = delivery_info.exchange
routing_key = delivery_info.routing_key
destination = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.destination_name(exchange, routing_key)
destination_kind = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.destination_kind(channel, exchange)
span.name = "#{destination} receive"
span['messaging.destination'] = exchange
span['messaging.destination_kind'] = destination_kind
span['messaging.rabbitmq.routing_key'] = routing_key if routing_key
span['messaging.operation'] = 'receive'

inject_context_into_property(properties, :tracer_receive_headers)
end

def self.basic_attributes(channel, transport, exchange, routing_key)
attributes = {
'messaging.system' => 'rabbitmq',
'messaging.destination' => exchange,
'messaging.destination_kind' => destination_kind(channel, exchange),
'messaging.protocol' => 'AMQP',
'messaging.protocol_version' => ::Bunny.protocol_version,
'net.peer.name' => channel.connection.host,
'net.peer.port' => channel.connection.port
'net.peer.name' => transport.host,
'net.peer.port' => transport.port
}
attributes['messaging.rabbitmq.routing_key'] = routing_key if routing_key
attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,30 @@ module Patches
# The Channel module contains the instrumentation patch the Channel#basic_get, Channel#basic_publish and Channel#handle_frameset methods
module Channel
def basic_get(queue, opts = { manual_ack: false })
attributes = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.basic_attributes(self, '', nil)
attributes = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.basic_attributes(self, connection, '', nil)

tracer.in_span("#{queue} receive", attributes: attributes, kind: :consumer) do |span, _ctx|
delivery_info, properties, payload = super

return [delivery_info, properties, payload] unless delivery_info

exchange = delivery_info.exchange
routing_key = delivery_info.routing_key
destination = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.destination_name(exchange, routing_key)
destination_kind = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.destination_kind(self, exchange)
span.name = "#{destination} receive"
span['messaging.destination'] = exchange
span['messaging.destination_kind'] = destination_kind
span['messaging.rabbitmq.routing_key'] = routing_key if routing_key
span['messaging.operation'] = 'receive'

inject_context_into_property(properties, :tracer_receive_headers)
OpenTelemetry::Instrumentation::Bunny::PatchHelpers.trace_enrich_receive_span(span, self, delivery_info, properties)

[delivery_info, properties, payload]
end
end

def basic_publish(payload, exchange, routing_key, opts = {})
OpenTelemetry::Instrumentation::Bunny::PatchHelpers.with_send_span(self, tracer, exchange, routing_key) do
inject_context_into_property(opts, :headers)
OpenTelemetry::Instrumentation::Bunny::PatchHelpers.inject_context_into_property(opts, :headers)

super(payload, exchange, routing_key, opts)
end
end

# This method is called when rabbitmq pushes messages to subscribed consumers
def handle_frameset(basic_deliver, properties, content)
exchange = basic_deliver.exchange
routing_key = basic_deliver.routing_key
attributes = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.basic_attributes(self, exchange, routing_key)
destination = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.destination_name(exchange, routing_key)

tracer.in_span("#{destination} receive", attributes: attributes, kind: :consumer) do
inject_context_into_property(properties, :tracer_receive_headers)
end
OpenTelemetry::Instrumentation::Bunny::PatchHelpers.trace_enrich_receive_span(OpenTelemetry::Trace.current_span, self, basic_deliver, properties) if basic_deliver

super
end
Expand All @@ -61,11 +44,6 @@ def handle_frameset(basic_deliver, properties, content)
def tracer
Bunny::Instrumentation.instance.tracer
end

def inject_context_into_property(properties, key)
properties[key] ||= {}
OpenTelemetry.propagation.inject(properties[key])
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module Bunny
module Patches
# The ReaderLoop module contains the instrumentation patch the ReaderLoop#run_once method
module ReaderLoop
def run_once
attributes = OpenTelemetry::Instrumentation::Bunny::PatchHelpers.basic_attributes(nil, @transport, '', nil)
tracer.in_span('Bunny::ReaderLoop#run_once', attributes: attributes, kind: :consumer) do
super
end
end

private

def tracer
Bunny::Instrumentation.instance.tracer
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,39 @@

queue.pop { |_msg| break }

_(spans.size).must_equal(3)
_(spans[0].name).must_equal("#{topic}.ruby.news send")
_(spans[0].kind).must_equal(:producer)
_(spans[0].attributes['messaging.system']).must_equal('rabbitmq')
_(spans[0].attributes['messaging.destination']).must_equal(topic)
_(spans[0].attributes['messaging.destination_kind']).must_equal('topic')
_(spans[0].attributes['messaging.protocol']).must_equal('AMQP')
_(spans[0].attributes['messaging.protocol_version']).must_equal('0.9.1')
_(spans[0].attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(spans[0].attributes['net.peer.name']).must_equal('rabbitmq')
_(spans[0].attributes['net.peer.port']).must_equal(5672)

_(spans[1].name).must_equal("#{topic}.ruby.news receive")
_(spans[1].kind).must_equal(:consumer)
_(spans[1].attributes['messaging.system']).must_equal('rabbitmq')
_(spans[1].attributes['messaging.destination']).must_equal(topic)
_(spans[1].attributes['messaging.destination_kind']).must_equal('topic')
_(spans[1].attributes['messaging.protocol']).must_equal('AMQP')
_(spans[1].attributes['messaging.protocol_version']).must_equal('0.9.1')
_(spans[1].attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(spans[1].attributes['net.peer.name']).must_equal('rabbitmq')
_(spans[1].attributes['net.peer.port']).must_equal(5672)

_(spans[2].name).must_equal("#{topic}.ruby.news process")
_(spans[2].kind).must_equal(:consumer)
_(spans[2].trace_id).must_equal(spans[1].trace_id)

linked_span_context = spans[2].links.first.span_context
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
_(linked_span_context.span_id).must_equal(spans[0].span_id)
_(spans.size >= 3).must_equal(true)

send_span = spans.find { |span| span.name == "#{topic}.ruby.news send" }
_(send_span).wont_be_nil
_(send_span.kind).must_equal(:producer)
_(send_span.attributes['messaging.system']).must_equal('rabbitmq')
_(send_span.attributes['messaging.destination']).must_equal(topic)
_(send_span.attributes['messaging.destination_kind']).must_equal('topic')
_(send_span.attributes['messaging.protocol']).must_equal('AMQP')
_(send_span.attributes['messaging.protocol_version']).must_equal('0.9.1')
_(send_span.attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(send_span.attributes['net.peer.name']).must_equal('rabbitmq')
_(send_span.attributes['net.peer.port']).must_equal(5672)

receive_span = spans.find { |span| span.name == "#{topic}.ruby.news receive" }
_(receive_span).wont_be_nil
_(receive_span.kind).must_equal(:consumer)
_(receive_span.attributes['messaging.system']).must_equal('rabbitmq')
_(receive_span.attributes['messaging.destination']).must_equal(topic)
_(receive_span.attributes['messaging.destination_kind']).must_equal('topic')
_(receive_span.attributes['messaging.protocol']).must_equal('AMQP')
_(receive_span.attributes['messaging.protocol_version']).must_equal('0.9.1')
_(receive_span.attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(receive_span.attributes['net.peer.name']).must_equal('rabbitmq')
_(receive_span.attributes['net.peer.port']).must_equal(5672)

process_span = spans.find { |span| span.name == "#{topic}.ruby.news process" }
_(process_span).wont_be_nil
_(process_span.kind).must_equal(:consumer)
_(process_span.trace_id).must_equal(receive_span.trace_id)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
_(linked_span_context.span_id).must_equal(send_span.span_id)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,24 @@

consumer.cancel

_(spans.size).must_equal(3)
_(spans[0].name).must_equal("#{topic}.ruby.news send")
_(spans[0].kind).must_equal(:producer)
_(spans.size >= 3).must_equal(true)

_(spans[1].name).must_equal("#{topic}.ruby.news receive")
_(spans[1].kind).must_equal(:consumer)
send_span = spans.find { |span| span.name == "#{topic}.ruby.news send" }
_(send_span).wont_be_nil
_(send_span.kind).must_equal(:producer)

_(spans[2].name).must_equal("#{topic}.ruby.news process")
_(spans[2].kind).must_equal(:consumer)
_(spans[2].trace_id).must_equal(spans[1].trace_id)
receive_span = spans.find { |span| span.name == "#{topic}.ruby.news receive" }
_(receive_span).wont_be_nil
_(receive_span.name).must_equal("#{topic}.ruby.news receive")
_(receive_span.kind).must_equal(:consumer)

linked_span_context = spans[2].links.first.span_context
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
_(linked_span_context.span_id).must_equal(spans[0].span_id)
process_span = spans.find { |span| span.name == "#{topic}.ruby.news process" }
_(process_span).wont_be_nil
_(process_span.kind).must_equal(:consumer)
_(process_span.trace_id).must_equal(receive_span.trace_id)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
_(linked_span_context.span_id).must_equal(send_span.span_id)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,30 @@

queue.pop { |_delivery_info, _metadata, _payload| break }

_(spans.last.name).must_equal(".#{queue_name} process")
_(spans.last.kind).must_equal(:consumer)
send_span = spans.find { |span| span.name == ".#{queue_name} send" }
_(send_span).wont_be_nil

linked_span_context = spans.last.links.first.span_context
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
receive_span = spans.find { |span| span.name == ".#{queue_name} receive" }
_(receive_span).wont_be_nil

process_span = spans.find { |span| span.name == ".#{queue_name} process" }
_(process_span).wont_be_nil
_(process_span.kind).must_equal(:consumer)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
end

it 'traces messages returned' do
queue.publish('Hello, opentelemetry!')

queue.pop

_(spans.last.name).must_equal(".#{queue_name} receive")
receive_span = spans.find { |span| span.name == ".#{queue_name} receive" }
_(receive_span).wont_be_nil

process_span = spans.find { |span| span.name == ".#{queue_name} process" }
_(process_span).must_be_nil
end
end
end

0 comments on commit 7dcf39a

Please sign in to comment.