Skip to content

Commit fa43b5e

Browse files
committed
llmobs-specific context manager
1 parent 25e7e2e commit fa43b5e

14 files changed

+302
-277
lines changed

ddtrace/contrib/internal/trace_utils.py

+1
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
595595
# We have parsed a trace id from headers, and we do not already
596596
# have a context with the same trace id active
597597
tracer.context_provider.activate(context)
598+
core.dispatch("http.activate_distributed_headers", (request_headers, context))
598599

599600

600601
def _flatten(

ddtrace/llmobs/_constants.py

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
DROPPED_IO_COLLECTION_ERROR = "dropped_io"
4646
DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]"
4747

48+
ROOT_PARENT_ID = "undefined"
49+
4850
# Set for traces of evaluator integrations e.g. `runner.integration:ragas`.
4951
# Used to differentiate traces of Datadog-run operations vs user-application operations.
5052
RUNNER_IS_INTEGRATION_SPAN_TAG = "runner.integration"

ddtrace/llmobs/_context.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import contextvars
2+
from typing import Optional
3+
from typing import Union
4+
5+
from ddtrace._trace.context import Context
6+
from ddtrace._trace.provider import DefaultContextProvider
7+
from ddtrace._trace.span import Span
8+
from ddtrace.ext import SpanTypes
9+
10+
11+
ContextTypeValue = Optional[Union[Context, Span]]
12+
13+
14+
_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar(
15+
"datadog_llmobs_contextvar",
16+
default=None,
17+
)
18+
19+
20+
class LLMObsContextProvider(DefaultContextProvider):
21+
"""Context provider that retrieves contexts from a context variable.
22+
It is suitable for synchronous programming and for asynchronous executors
23+
that support contextvars.
24+
"""
25+
26+
def __init__(self) -> None:
27+
super(DefaultContextProvider, self).__init__()
28+
_DD_LLMOBS_CONTEXTVAR.set(None)
29+
30+
def _has_active_context(self) -> bool:
31+
"""Returns whether there is an active context in the current execution."""
32+
ctx = _DD_LLMOBS_CONTEXTVAR.get()
33+
return ctx is not None
34+
35+
def _update_active(self, span: Span) -> Optional[Span]:
36+
"""Updates the active LLMObs span.
37+
The active span is updated to be the span's closest unfinished LLMObs ancestor span.
38+
"""
39+
if not span.finished:
40+
return span
41+
new_active: Optional[Span] = span
42+
while new_active and new_active.finished:
43+
new_active = new_active._parent
44+
if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM:
45+
break
46+
self.activate(new_active)
47+
return new_active
48+
49+
def activate(self, ctx: ContextTypeValue) -> None:
50+
"""Makes the given context active in the current execution."""
51+
_DD_LLMOBS_CONTEXTVAR.set(ctx)
52+
super(DefaultContextProvider, self).activate(ctx)
53+
54+
def active(self) -> ContextTypeValue:
55+
"""Returns the active span or context for the current execution."""
56+
item = _DD_LLMOBS_CONTEXTVAR.get()
57+
if isinstance(item, Span):
58+
return self._update_active(item)
59+
return item

ddtrace/llmobs/_integrations/base.py

+10-18
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
from ddtrace.internal.telemetry import telemetry_writer
1919
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
2020
from ddtrace.internal.utils.formats import asbool
21-
from ddtrace.llmobs._constants import PARENT_ID_KEY
22-
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
2321
from ddtrace.llmobs._llmobs import LLMObs
2422
from ddtrace.llmobs._log_writer import V2LogWriter
25-
from ddtrace.llmobs._utils import _get_llmobs_parent_id
2623
from ddtrace.settings import IntegrationConfig
2724
from ddtrace.trace import Pin
2825
from ddtrace.trace import Span
@@ -132,21 +129,16 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
132129
span.set_tag(_SPAN_MEASURED_KEY)
133130
self._set_base_span_tags(span, **kwargs)
134131
if submit_to_llmobs and self.llmobs_enabled:
135-
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
136-
# For non-distributed traces or spans in the first service of a distributed trace,
137-
# The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now
138-
# in these cases to avoid conflicting with the later propagated tags.
139-
parent_id = _get_llmobs_parent_id(span) or "undefined"
140-
span._set_ctx_item(PARENT_ID_KEY, str(parent_id))
141-
telemetry_writer.add_count_metric(
142-
namespace=TELEMETRY_NAMESPACE.MLOBS,
143-
name="span.start",
144-
value=1,
145-
tags=(
146-
("integration", self._integration_name),
147-
("autoinstrumented", "true"),
148-
),
149-
)
132+
LLMObs._instance._activate_llmobs_span(span)
133+
telemetry_writer.add_count_metric(
134+
namespace=TELEMETRY_NAMESPACE.MLOBS,
135+
name="span.start",
136+
value=1,
137+
tags=(
138+
("integration", self._integration_name),
139+
("autoinstrumented", "true"),
140+
),
141+
)
150142
return span
151143

152144
@classmethod

ddtrace/llmobs/_integrations/bedrock.py

+2-6
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,16 @@
44
from typing import Optional
55

66
from ddtrace.internal.logger import get_logger
7+
from ddtrace.llmobs import LLMObs
78
from ddtrace.llmobs._constants import INPUT_MESSAGES
89
from ddtrace.llmobs._constants import METADATA
910
from ddtrace.llmobs._constants import METRICS
1011
from ddtrace.llmobs._constants import MODEL_NAME
1112
from ddtrace.llmobs._constants import MODEL_PROVIDER
1213
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
13-
from ddtrace.llmobs._constants import PARENT_ID_KEY
14-
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
1514
from ddtrace.llmobs._constants import SPAN_KIND
1615
from ddtrace.llmobs._integrations import BaseLLMIntegration
1716
from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
18-
from ddtrace.llmobs._utils import _get_llmobs_parent_id
1917
from ddtrace.trace import Span
2018

2119

@@ -34,9 +32,7 @@ def _llmobs_set_tags(
3432
operation: str = "",
3533
) -> None:
3634
"""Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags."""
37-
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
38-
parent_id = _get_llmobs_parent_id(span) or "undefined"
39-
span._set_ctx_item(PARENT_ID_KEY, parent_id)
35+
LLMObs._instance._activate_llmobs_span(span)
4036
parameters = {}
4137
if span.get_tag("bedrock.request.temperature"):
4238
parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0)

ddtrace/llmobs/_integrations/langgraph.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
from ddtrace.llmobs._constants import INPUT_VALUE
99
from ddtrace.llmobs._constants import NAME
1010
from ddtrace.llmobs._constants import OUTPUT_VALUE
11+
from ddtrace.llmobs._constants import PARENT_ID_KEY
12+
from ddtrace.llmobs._constants import ROOT_PARENT_ID
1113
from ddtrace.llmobs._constants import SPAN_KIND
1214
from ddtrace.llmobs._constants import SPAN_LINKS
1315
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
1416
from ddtrace.llmobs._integrations.utils import format_langchain_io
1517
from ddtrace.llmobs._utils import _get_attr
16-
from ddtrace.llmobs._utils import _get_llmobs_parent_id
1718
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
1819
from ddtrace.trace import Span
1920
from ddtrace.trace import tracer
@@ -154,7 +155,7 @@ def _default_span_link(span: Span):
154155
the span is linked to its parent's input.
155156
"""
156157
return {
157-
"span_id": str(_get_llmobs_parent_id(span)) or "undefined",
158+
"span_id": span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID,
158159
"trace_id": "{:x}".format(span.trace_id),
159160
"attributes": {"from": "input", "to": "input"},
160161
}

0 commit comments

Comments
 (0)