Skip to content

Commit

Permalink
Basic_consume callback parameter should be called on_message_callback (
Browse files Browse the repository at this point in the history
…#341)

* fixing issue with kwargs, when customers were calling basic_consume with kwargs the instrumentation was failing as the callback parameter was having a different name in the instrumentation

* remove duplicated code

* added popping queue, callback from args
  • Loading branch information
pdimitra authored Oct 27, 2021
1 parent f3d3ab8 commit 1005ad0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
16 changes: 9 additions & 7 deletions instana/instrumentation/pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs):


def basic_get_with_instana(wrapped, instance, args, kwargs):
def _bind_args(queue, callback, *args, **kwargs):
return (queue, callback, args, kwargs)
def _bind_args(*args, **kwargs):
args = list(args)
queue = kwargs.pop('queue', None) or args.pop(0)
callback = kwargs.pop('callback', None) or kwargs.pop('on_message_callback', None) or args.pop(0)
return (queue, callback, tuple(args), kwargs)

queue, callback, args, kwargs = _bind_args(*args, **kwargs)

Expand All @@ -102,13 +105,12 @@ def _cb_wrapper(channel, method, properties, body):
args = (queue, _cb_wrapper) + args
return wrapped(*args, **kwargs)


@wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume')
def basic_consume_with_instana(wrapped, instance, args, kwargs):
def _bind_args(queue, on_consume_callback, *args, **kwargs):
return (queue, on_consume_callback, args, kwargs)
def _bind_args(queue, on_message_callback, *args, **kwargs):
return (queue, on_message_callback, args, kwargs)

queue, on_consume_callback, args, kwargs = _bind_args(*args, **kwargs)
queue, on_message_callback, args, kwargs = _bind_args(*args, **kwargs)

def _cb_wrapper(channel, method, properties, body):
parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers,
Expand All @@ -123,7 +125,7 @@ def _cb_wrapper(channel, method, properties, body):
logger.debug("basic_consume_with_instana: ", exc_info=True)

try:
on_consume_callback(channel, method, properties, body)
on_message_callback(channel, method, properties, body)
except Exception as e:
scope.span.log_exception(e)
raise
Expand Down
2 changes: 1 addition & 1 deletion instana/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

# Module version file. Used by setup.py and snapshot reporting.

VERSION = '1.36.0'
VERSION = '1.36.1'
2 changes: 1 addition & 1 deletion tests/clients/test_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def test_basic_consume_with_trace_context(self, _unused):

cb = mock.Mock()

self.obj.basic_consume("test.queue", cb, consumer_tag="test")
self.obj.basic_consume(queue="test.queue", on_message_callback=cb, consumer_tag="test")
self.obj._on_deliver(method_frame, header_frame, body)

spans = self.recorder.queued_spans()
Expand Down

0 comments on commit 1005ad0

Please sign in to comment.