Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bedrock): [MLOB-2181] fix bedrock token metrics #12230

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3ac7025
chore(openai): remove remaining v0 snapshot files (#12218)
Yun-Kim Feb 4, 2025
1245846
chore(tracing): remove deprecated properties and parameters (#12199)
mabdinur Feb 4, 2025
17c60dd
fix(tracing): add flag for aiohttp that disables potential for memory…
ZStriker19 Feb 4, 2025
be50fd7
fix(logging): remove unneeded forksafe unregister to avoid unneeded l…
ZStriker19 Feb 4, 2025
cdaa1e3
chore(openai): drop legacy integration metrics and logs (#12211)
Yun-Kim Feb 4, 2025
3081919
chore(langchain): drop legacy integration metrics and logs (#12220)
Yun-Kim Feb 4, 2025
a2b1d7b
chore(configurations): remove deprecated tracing env vars (#12176)
mabdinur Feb 5, 2025
a8760c6
chore(iast): xss vulnerability for jinja2 template engine (#12183)
avara1986 Feb 5, 2025
a54a55d
chore: remove more 3.7 related code (#12221)
taegyunkim Feb 5, 2025
e5055b7
chore(iast): header source in werkzeug 3.1 (#12213)
avara1986 Feb 5, 2025
b17990b
chore(llmobs): [MLOB-1944] generalize helper for extracting token met…
Kyle-Verhoog Feb 5, 2025
1ad06b3
set bedrock token counts as metrics
ncybul Feb 3, 2025
b4ecff2
update snapshots for bedrock tests
ncybul Feb 4, 2025
9abd718
attempt to fix streamed tests and float vs integer differences
ncybul Feb 4, 2025
13653f1
only set token metrics if defined
ncybul Feb 4, 2025
42289dd
fix minor bug by making sure token counts are defined
ncybul Feb 4, 2025
8931ffa
update token metric name and tests accordingly
ncybul Feb 5, 2025
712e15c
add release note
ncybul Feb 5, 2025
61d8724
handle potentially missing token metrics in tests
ncybul Feb 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
build_wheels:
uses: ./.github/workflows/build_python_3.yml
with:
cibw_build: 'cp37* cp38* cp39* cp310* cp311* cp312* cp313*'
cibw_build: 'cp38* cp39* cp310* cp311* cp312* cp313*'

build_sdist:
name: Build source distribution
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/bm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def process_trace(self, trace):


def drop_traces(tracer):
tracer.configure(settings={"FILTERS": [_DropTraces()]})
tracer.configure(trace_processors=[_DropTraces()])


def drop_telemetry_events():
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/rate_limiter/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def _(loops):
windows = [start + (i * self.time_window) for i in range(self.num_windows)]
per_window = math.floor(loops / self.num_windows)

for window in windows:
for _ in windows:
for _ in range(per_window):
rate_limiter.is_allowed(window)
rate_limiter.is_allowed()

yield _
14 changes: 5 additions & 9 deletions ddtrace/_trace/sampling_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from ddtrace.internal.glob_matching import GlobMatcher
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.cache import cachedmethod
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
from ddtrace.vendor.debtcollector import deprecate


if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -210,14 +208,12 @@ def choose_matcher(self, prop):
# We currently support the ability to pass in a function, a regular expression, or a string
# If a string is passed in we create a GlobMatcher to handle the matching
if callable(prop) or isinstance(prop, pattern_type):
# deprecated: passing a function or a regular expression'
deprecate(
"Using methods or regular expressions for SamplingRule matching is deprecated. ",
message="Please move to passing in a string for Glob matching.",
removal_version="3.0.0",
category=DDTraceDeprecationWarning,
log.error(
"Using methods or regular expressions for SamplingRule matching is not supported: %s ."
"Please move to passing in a string for Glob matching.",
str(prop),
)
return prop
return "None"
# Name and Resource will never be None, but service can be, since we str()
# whatever we pass into the GlobMatcher, we can just use its matching
elif prop is None:
Expand Down
25 changes: 0 additions & 25 deletions ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import SamplingMechanism
from ddtrace.internal.sampling import set_sampling_decision_maker
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
from ddtrace.vendor.debtcollector import deprecate


_NUMERIC_TAGS = (_ANALYTICS_SAMPLE_RATE_KEY,)
Expand Down Expand Up @@ -279,29 +277,6 @@ def duration(self) -> Optional[float]:
def duration(self, value: float) -> None:
self.duration_ns = int(value * 1e9)

@property
def sampled(self) -> Optional[bool]:
deprecate(
"span.sampled is deprecated and will be removed in a future version of the tracer.",
message="""span.sampled references the state of span.context.sampling_priority.
Please use span.context.sampling_priority instead to check if a span is sampled.""",
category=DDTraceDeprecationWarning,
)
if self.context.sampling_priority is None:
# this maintains original span.sampled behavior, where all spans would start
# with span.sampled = True until sampling runs
return True
return self.context.sampling_priority > 0

@sampled.setter
def sampled(self, value: bool) -> None:
deprecate(
"span.sampled is deprecated and will be removed in a future version of the tracer.",
message="""span.sampled has a no-op setter.
Please use span.set_tag('manual.keep'/'manual.drop') to keep or drop spans.""",
category=DDTraceDeprecationWarning,
)

def finish(self, finish_time: Optional[float] = None) -> None:
"""Mark the end time of the span and submit it to the tracer.
If the span has already been finished don't do anything.
Expand Down
11 changes: 8 additions & 3 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,10 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok
span = ctx.span
span.set_tag_str("bedrock.response.id", reqid)
span.set_tag_str("bedrock.response.duration", latency)
span.set_tag_str("bedrock.usage.prompt_tokens", input_token_count)
span.set_tag_str("bedrock.usage.completion_tokens", output_token_count)
if input_token_count:
span.set_metric("bedrock.response.usage.prompt_tokens", int(input_token_count))
if output_token_count:
span.set_metric("bedrock.response.usage.completion_tokens", int(output_token_count))


def _propagate_context(ctx, headers):
Expand Down Expand Up @@ -640,7 +642,10 @@ def _on_botocore_bedrock_process_response(
integration = ctx["bedrock_integration"]
if metadata is not None:
for k, v in metadata.items():
span.set_tag_str("bedrock.{}".format(k), str(v))
if k in ["usage.completion_tokens", "usage.prompt_tokens"] and v:
span.set_metric("bedrock.response{}".format(k), int(v))
else:
span.set_tag_str("bedrock.response{}".format(k), str(v))
if "embed" in model_name:
span.set_metric("bedrock.response.embedding_length", len(formatted_response["text"][0]))
span.finish()
Expand Down
174 changes: 11 additions & 163 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ddtrace._trace.processor import TraceProcessor
from ddtrace._trace.processor import TraceSamplingProcessor
from ddtrace._trace.processor import TraceTagsProcessor
from ddtrace._trace.provider import BaseContextProvider
from ddtrace._trace.provider import DefaultContextProvider
from ddtrace._trace.sampler import BasePrioritySampler
from ddtrace._trace.sampler import BaseSampler
Expand Down Expand Up @@ -200,7 +201,7 @@ def __init__(
self,
url: Optional[str] = None,
dogstatsd_url: Optional[str] = None,
context_provider: Optional[DefaultContextProvider] = None,
context_provider: Optional[BaseContextProvider] = None,
) -> None:
"""
Create a new ``Tracer`` instance. A global tracer is already initialized
Expand Down Expand Up @@ -328,28 +329,6 @@ def sample(self, span):
else:
log.error("No sampler available to sample span")

@property
def sampler(self):
deprecate(
"tracer.sampler is deprecated and will be removed.",
message="To manually sample call tracer.sample(span) instead.",
category=DDTraceDeprecationWarning,
)
return self._sampler

@sampler.setter
def sampler(self, value):
deprecate(
"Setting a custom sampler is deprecated and will be removed.",
message="""Please use DD_TRACE_SAMPLING_RULES to configure the sampler instead:
https://ddtrace.readthedocs.io/en/stable/configuration.html#DD_TRACE_SAMPLING_RULES""",
category=DDTraceDeprecationWarning,
)
if asm_config._apm_opt_out:
log.warning("Cannot set a custom sampler with Standalone ASM mode")
return
self._sampler = value

def on_start_span(self, func: Callable) -> Callable:
"""Register a function to execute when a span start.

Expand Down Expand Up @@ -441,21 +420,7 @@ def get_log_correlation_context(self, active: Optional[Union[Context, Span]] = N

def configure(
self,
enabled: Optional[bool] = None,
hostname: Optional[str] = None,
port: Optional[int] = None,
uds_path: Optional[str] = None,
https: Optional[bool] = None,
sampler: Optional[BaseSampler] = None,
context_provider: Optional[DefaultContextProvider] = None,
wrap_executor: Optional[Callable] = None,
priority_sampling: Optional[bool] = None,
settings: Optional[Dict[str, Any]] = None,
dogstatsd_url: Optional[str] = None,
writer: Optional[TraceWriter] = None,
partial_flush_enabled: Optional[bool] = None,
partial_flush_min_spans: Optional[int] = None,
api_version: Optional[str] = None,
context_provider: Optional[BaseContextProvider] = None,
compute_stats_enabled: Optional[bool] = None,
appsec_enabled: Optional[bool] = None,
iast_enabled: Optional[bool] = None,
Expand All @@ -472,58 +437,14 @@ def configure(
:param bool appsec_standalone_enabled: When tracing is disabled ensures ASM support is still enabled.
:param List[TraceProcessor] trace_processors: This parameter sets TraceProcessor (ex: TraceFilters).
Trace processors are used to modify and filter traces based on certain criteria.

:param bool enabled: If True, finished traces will be submitted to the API, else they'll be dropped.
This parameter is deprecated and will be removed.
:param str hostname: Hostname running the Trace Agent. This parameter is deprecated and will be removed.
:param int port: Port of the Trace Agent. This parameter is deprecated and will be removed.
:param str uds_path: The Unix Domain Socket path of the agent. This parameter is deprecated and will be removed.
:param bool https: Whether to use HTTPS or HTTP. This parameter is deprecated and will be removed.
:param object sampler: A custom Sampler instance, locally deciding to totally drop the trace or not.
This parameter is deprecated and will be removed.
:param object wrap_executor: callable that is used when a function is decorated with
``Tracer.wrap()``. This is an advanced option that usually doesn't need to be changed
from the default value. This parameter is deprecated and will be removed.
:param priority_sampling: This parameter is deprecated and will be removed in a future version.
:param bool settings: This parameter is deprecated and will be removed.
:param str dogstatsd_url: URL for UDP or Unix socket connection to DogStatsD
This parameter is deprecated and will be removed.
:param TraceWriter writer: This parameter is deprecated and will be removed.
:param bool partial_flush_enabled: This parameter is deprecated and will be removed.
:param bool partial_flush_min_spans: This parameter is deprecated and will be removed.
:param str api_version: This parameter is deprecated and will be removed.
:param bool compute_stats_enabled: This parameter is deprecated and will be removed.
"""
if settings is not None:
deprecate(
"Support for ``tracer.configure(...)`` with the settings parameter is deprecated",
message="Please use the trace_processors parameter instead of settings['FILTERS'].",
version="3.0.0",
category=DDTraceDeprecationWarning,
)
trace_processors = (trace_processors or []) + (settings.get("FILTERS") or [])

return self._configure(
enabled,
hostname,
port,
uds_path,
https,
sampler,
context_provider,
wrap_executor,
priority_sampling,
trace_processors,
dogstatsd_url,
writer,
partial_flush_enabled,
partial_flush_min_spans,
api_version,
compute_stats_enabled,
appsec_enabled,
iast_enabled,
appsec_standalone_enabled,
True,
context_provider=context_provider,
trace_processors=trace_processors,
compute_stats_enabled=compute_stats_enabled,
appsec_enabled=appsec_enabled,
iast_enabled=iast_enabled,
appsec_standalone_enabled=appsec_standalone_enabled,
)

def _configure(
Expand All @@ -534,7 +455,7 @@ def _configure(
uds_path: Optional[str] = None,
https: Optional[bool] = None,
sampler: Optional[BaseSampler] = None,
context_provider: Optional[DefaultContextProvider] = None,
context_provider: Optional[BaseContextProvider] = None,
wrap_executor: Optional[Callable] = None,
priority_sampling: Optional[bool] = None,
trace_processors: Optional[List[TraceProcessor]] = None,
Expand All @@ -547,48 +468,18 @@ def _configure(
appsec_enabled: Optional[bool] = None,
iast_enabled: Optional[bool] = None,
appsec_standalone_enabled: Optional[bool] = None,
log_deprecations: bool = False,
) -> None:
if enabled is not None:
self.enabled = enabled
if log_deprecations:
deprecate(
"Enabling/Disabling tracing after application start is deprecated",
message="Please use DD_TRACE_ENABLED instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

if priority_sampling is not None and log_deprecations:
deprecate(
"Disabling priority sampling is deprecated",
message="Calling `tracer.configure(priority_sampling=....) has no effect",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

if trace_processors is not None:
self._user_trace_processors = trace_processors

if partial_flush_enabled is not None:
self._partial_flush_enabled = partial_flush_enabled
if log_deprecations:
deprecate(
"Configuring partial flushing after application start is deprecated",
message="Please use DD_TRACE_PARTIAL_FLUSH_ENABLED to enable/disable the partial flushing instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

if partial_flush_min_spans is not None:
self._partial_flush_min_spans = partial_flush_min_spans
if log_deprecations:
deprecate(
"Configuring partial flushing after application start is deprecated",
message="Please use DD_TRACE_PARTIAL_FLUSH_MIN_SPANS to set the flushing threshold instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

if appsec_enabled is not None:
asm_config._asm_enabled = appsec_enabled
Expand Down Expand Up @@ -620,33 +511,11 @@ def _configure(
if sampler is not None:
self._sampler = sampler
self._user_sampler = self._sampler
if log_deprecations:
deprecate(
"Configuring custom samplers is deprecated",
message="Please use DD_TRACE_SAMPLING_RULES to configure the sample rates instead",
category=DDTraceDeprecationWarning,
removal_version="3.0.0",
)

if dogstatsd_url is not None:
if log_deprecations:
deprecate(
"Configuring dogstatsd_url after application start is deprecated",
message="Please use DD_DOGSTATSD_URL instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)
self._dogstatsd_url = dogstatsd_url

if any(x is not None for x in [hostname, port, uds_path, https]):
if log_deprecations:
deprecate(
"Configuring tracer agent connection after application start is deprecated",
message="Please use DD_TRACE_AGENT_URL instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

# If any of the parts of the URL have updated, merge them with
# the previous writer values.
prev_url_parsed = compat.parse.urlparse(self._agent_url)
Expand All @@ -670,13 +539,6 @@ def _configure(
new_url = None

if compute_stats_enabled is not None:
if log_deprecations:
deprecate(
"Configuring tracer stats computation after application start is deprecated",
message="Please use DD_TRACE_STATS_COMPUTATION_ENABLED instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)
self._compute_stats = compute_stats_enabled

try:
Expand All @@ -685,14 +547,6 @@ def _configure(
# It's possible the writer never got started
pass

if api_version is not None and log_deprecations:
deprecate(
"Configuring Tracer API version after application start is deprecated",
message="Please use DD_TRACE_API_VERSION instead.",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

if writer is not None:
self._writer = writer
elif any(x is not None for x in [new_url, api_version, sampler, dogstatsd_url, appsec_enabled]):
Expand Down Expand Up @@ -754,12 +608,6 @@ def _configure(

if wrap_executor is not None:
self._wrap_executor = wrap_executor
if log_deprecations:
deprecate(
"Support for tracer.configure(...) with the wrap_executor parameter is deprecated",
version="3.0.0",
category=DDTraceDeprecationWarning,
)

self._generate_diagnostic_logs()

Expand Down Expand Up @@ -1344,7 +1192,7 @@ def _handle_sampler_update(self, cfg: Config) -> None:
and self._user_sampler
):
# if we get empty configs from rc for both sample rate and rules, we should revert to the user sampler
self.sampler = self._user_sampler
self._sampler = self._user_sampler
return

if cfg._get_source("_trace_sample_rate") != "remote_config" and self._user_sampler:
Expand Down
Loading
Loading