-
Notifications
You must be signed in to change notification settings - Fork 431
/
Copy pathpatch.py
91 lines (69 loc) · 3.12 KB
/
patch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import wrapt
import yaaredis
from ddtrace import config
from ddtrace._trace.utils_redis import _instrument_redis_cmd
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
from ddtrace.contrib.internal.redis_utils import _run_redis_command_async
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
from ddtrace.internal.utils.formats import CMD_MAX_LEN
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import stringify_cache_args
from ddtrace.internal.utils.wrappers import unwrap
from ddtrace.trace import Pin
from ddtrace.vendor.debtcollector import deprecate
config._add(
"yaaredis",
dict(
_default_service=schematize_service_name("redis"),
cmd_max_length=int(os.getenv("DD_YAAREDIS_CMD_MAX_LENGTH", CMD_MAX_LEN)),
resource_only_command=asbool(os.getenv("DD_REDIS_RESOURCE_ONLY_COMMAND", True)),
),
)
def get_version():
# type: () -> str
return getattr(yaaredis, "__version__", "")
def patch():
"""Patch the instrumented methods"""
deprecate(
prefix="The yaaredis module is deprecated.",
message="The yaaredis module is deprecated and will be deleted.",
category=DDTraceDeprecationWarning,
removal_version="3.0.0",
)
if getattr(yaaredis, "_datadog_patch", False):
return
yaaredis._datadog_patch = True
_w = wrapt.wrap_function_wrapper
_w("yaaredis.client", "StrictRedis.execute_command", traced_execute_command)
_w("yaaredis.client", "StrictRedis.pipeline", traced_pipeline)
_w("yaaredis.pipeline", "StrictPipeline.execute", traced_execute_pipeline)
_w("yaaredis.pipeline", "StrictPipeline.immediate_execute_command", traced_execute_command)
Pin().onto(yaaredis.StrictRedis)
def unpatch():
if getattr(yaaredis, "_datadog_patch", False):
yaaredis._datadog_patch = False
unwrap(yaaredis.client.StrictRedis, "execute_command")
unwrap(yaaredis.client.StrictRedis, "pipeline")
unwrap(yaaredis.pipeline.StrictPipeline, "execute")
unwrap(yaaredis.pipeline.StrictPipeline, "immediate_execute_command")
async def traced_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)
with _instrument_redis_cmd(pin, config.yaaredis, instance, args) as ctx:
return await _run_redis_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)
async def traced_pipeline(func, instance, args, kwargs):
pipeline = await func(*args, **kwargs)
pin = Pin.get_from(instance)
if pin:
pin.onto(pipeline)
return pipeline
async def traced_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)
cmds = [stringify_cache_args(c, cmd_max_len=config.yaaredis.cmd_max_length) for c, _ in instance.command_stack]
with _instrument_redis_execute_pipeline(pin, config.yaaredis, cmds, instance):
return await func(*args, **kwargs)