|
| 1 | +import contextvars |
| 2 | +from typing import Optional |
| 3 | +from typing import Union |
| 4 | + |
| 5 | +from ddtrace._trace.context import Context |
| 6 | +from ddtrace._trace.provider import DefaultContextProvider |
| 7 | +from ddtrace._trace.span import Span |
| 8 | +from ddtrace.ext import SpanTypes |
| 9 | + |
| 10 | + |
| 11 | +ContextTypeValue = Optional[Union[Context, Span]] |
| 12 | + |
| 13 | + |
| 14 | +_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar( |
| 15 | + "datadog_llmobs_contextvar", |
| 16 | + default=None, |
| 17 | +) |
| 18 | + |
| 19 | + |
| 20 | +class LLMObsContextProvider(DefaultContextProvider): |
| 21 | + """Context provider that retrieves contexts from a context variable. |
| 22 | + It is suitable for synchronous programming and for asynchronous executors |
| 23 | + that support contextvars. |
| 24 | + """ |
| 25 | + |
| 26 | + def __init__(self) -> None: |
| 27 | + super(DefaultContextProvider, self).__init__() |
| 28 | + _DD_LLMOBS_CONTEXTVAR.set(None) |
| 29 | + |
| 30 | + def _has_active_context(self) -> bool: |
| 31 | + """Returns whether there is an active context in the current execution.""" |
| 32 | + ctx = _DD_LLMOBS_CONTEXTVAR.get() |
| 33 | + return ctx is not None |
| 34 | + |
| 35 | + def _update_active(self, span: Span) -> Optional[Span]: |
| 36 | + """Updates the active LLMObs span. |
| 37 | + The active span is updated to be the span's closest unfinished LLMObs ancestor span. |
| 38 | + """ |
| 39 | + if not span.finished: |
| 40 | + return span |
| 41 | + new_active: Optional[Span] = span |
| 42 | + while new_active and new_active.finished: |
| 43 | + new_active = new_active._parent |
| 44 | + if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM: |
| 45 | + break |
| 46 | + self.activate(new_active) |
| 47 | + return new_active |
| 48 | + |
| 49 | + def activate(self, ctx: ContextTypeValue) -> None: |
| 50 | + """Makes the given context active in the current execution.""" |
| 51 | + _DD_LLMOBS_CONTEXTVAR.set(ctx) |
| 52 | + super(DefaultContextProvider, self).activate(ctx) |
| 53 | + |
| 54 | + def active(self) -> ContextTypeValue: |
| 55 | + """Returns the active span or context for the current execution.""" |
| 56 | + item = _DD_LLMOBS_CONTEXTVAR.get() |
| 57 | + if isinstance(item, Span): |
| 58 | + return self._update_active(item) |
| 59 | + return item |
0 commit comments