From 3620a7ad7de34c6571b4de59599faef967203f31 Mon Sep 17 00:00:00 2001 From: Daniel Gaias Malagurti Date: Thu, 19 Dec 2024 19:04:21 -0300 Subject: [PATCH 1/2] fix(tracing): resolve OpenTelemetry token type warnings --- .../promptflow/tracing/_trace.py | 136 +++++++++++------- 1 file changed, 86 insertions(+), 50 deletions(-) diff --git a/src/promptflow-tracing/promptflow/tracing/_trace.py b/src/promptflow-tracing/promptflow/tracing/_trace.py index ab53d6c986a..e2fd35e0105 100644 --- a/src/promptflow-tracing/promptflow/tracing/_trace.py +++ b/src/promptflow-tracing/promptflow/tracing/_trace.py @@ -109,56 +109,92 @@ def start_as_current_span( class TokenCollector: - _lock = Lock() - - def __init__(self): - self._span_id_to_tokens = {} - - def collect_openai_tokens(self, span, output): - span_id = span.get_span_context().span_id - if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None: - tokens = output.usage.dict() - if tokens: - with self._lock: - self._span_id_to_tokens[span_id] = tokens - - def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat): - span_id = span.get_span_context().span_id - calculator = OpenAIMetricsCalculator() - if is_chat: - tokens = calculator.get_openai_metrics_for_chat_api(inputs, output) - else: - tokens = calculator.get_openai_metrics_for_completion_api(inputs, output) - with self._lock: - self._span_id_to_tokens[span_id] = tokens - - def collect_openai_tokens_for_parent_span(self, span): - tokens = self.try_get_openai_tokens(span.get_span_context().span_id) - if tokens: - if not hasattr(span, "parent") or span.parent is None: - return - parent_span_id = span.parent.span_id - with self._lock: - if parent_span_id in self._span_id_to_tokens: - merged_tokens = {} - for key in set(self._span_id_to_tokens[parent_span_id]) | set(tokens): - parent_value = self._span_id_to_tokens[parent_span_id].get(key, 0) - token_value = tokens.get(key, 0) - if isinstance(parent_value, dict) and 
isinstance(token_value, dict): - # Handle the case where both values are dictionaries - merged_tokens[key] = {**parent_value, **token_value} - elif isinstance(parent_value, dict) or isinstance(token_value, dict): - # Handle the case where one value is a dictionary and the other is not - merged_tokens[key] = parent_value if isinstance(parent_value, dict) else token_value - else: - merged_tokens[key] = int(parent_value or 0) + int(token_value or 0) - self._span_id_to_tokens[parent_span_id] = merged_tokens - else: - self._span_id_to_tokens[parent_span_id] = tokens - - def try_get_openai_tokens(self, span_id): - with self._lock: - return self._span_id_to_tokens.get(span_id, None) + _lock = Lock() + + def __init__(self): + self._span_id_to_tokens = {} + + def collect_openai_tokens(self, span, output): + span_id = span.get_span_context().span_id + if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None: + try: + tokens = output.usage.dict() + if tokens: + safe_tokens = { + k: float(v) if isinstance(v, (int, float)) else 0 + for k, v in tokens.items() + } + with self._lock: + self._span_id_to_tokens[span_id] = safe_tokens + except: + logging.warning("Failed to process token usage") + + def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat): + span_id = span.get_span_context().span_id + calculator = OpenAIMetricsCalculator() + try: + if is_chat: + tokens = calculator.get_openai_metrics_for_chat_api(inputs, output) + else: + tokens = calculator.get_openai_metrics_for_completion_api(inputs, output) + safe_tokens = { + k: float(v) if isinstance(v, (int, float)) else 0 + for k, v in tokens.items() + } + with self._lock: + self._span_id_to_tokens[span_id] = safe_tokens + except Exception as e: + logging.warning(f"Failed to collect streaming tokens: {e}") + + def collect_openai_tokens_for_parent_span(self, span): + def safe_token_value(value): + if value is None: + return 0 + if isinstance(value, dict): + try: + return 
sum(safe_token_value(v) for v in value.values()) + except: + return 0 + if isinstance(value, (int, float)): + return float(value) + try: + return float(value) + except: + return 0 + + tokens = self.try_get_openai_tokens(span.get_span_context().span_id) + if not tokens: + return + + if not hasattr(span, "parent") or span.parent is None: + return + + parent_span_id = span.parent.span_id + with self._lock: + if parent_span_id in self._span_id_to_tokens: + current_tokens = self._span_id_to_tokens[parent_span_id] + merged_tokens = { + key: safe_token_value(current_tokens.get(key, 0)) + safe_token_value(tokens.get(key, 0)) + for key in set(current_tokens.keys()) | set(tokens.keys()) + } + for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']: + if field not in merged_tokens: + merged_tokens[field] = 0.0 + else: + merged_tokens[field] = float(merged_tokens[field]) + self._span_id_to_tokens[parent_span_id] = merged_tokens + else: + safe_tokens = {k: safe_token_value(v) for k, v in tokens.items()} + for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']: + if field not in safe_tokens: + safe_tokens[field] = 0.0 + else: + safe_tokens[field] = float(safe_tokens[field]) + self._span_id_to_tokens[parent_span_id] = safe_tokens + + def try_get_openai_tokens(self, span_id): + with self._lock: + return self._span_id_to_tokens.get(span_id, None) token_collector = TokenCollector() From 1210f77649e2737bc7f363c5a9619a3cfd8b4ad0 Mon Sep 17 00:00:00 2001 From: Daniel Gaias Malagurti Date: Thu, 19 Dec 2024 19:04:21 -0300 Subject: [PATCH 2/2] fix(tracing): resolve OpenTelemetry token type warnings --- src/promptflow-tracing/CHANGELOG.md | 4 + .../promptflow/tracing/_trace.py | 136 +++++++++++------- 2 files changed, 90 insertions(+), 50 deletions(-) diff --git a/src/promptflow-tracing/CHANGELOG.md b/src/promptflow-tracing/CHANGELOG.md index b840ed12e49..af13913e714 100644 --- a/src/promptflow-tracing/CHANGELOG.md +++ b/src/promptflow-tracing/CHANGELOG.md @@ -1,5 
+1,9 @@ # promptflow-tracing package +## v1.16.4 (2024.12.19) +- Fix OpenTelemetry warnings +- Fix token count issue + ## v1.16.3 (2024.12.14) - Fix token count issue when the value is None or it is a Dict diff --git a/src/promptflow-tracing/promptflow/tracing/_trace.py b/src/promptflow-tracing/promptflow/tracing/_trace.py index ab53d6c986a..e2fd35e0105 100644 --- a/src/promptflow-tracing/promptflow/tracing/_trace.py +++ b/src/promptflow-tracing/promptflow/tracing/_trace.py @@ -109,56 +109,92 @@ def start_as_current_span( class TokenCollector: - _lock = Lock() - - def __init__(self): - self._span_id_to_tokens = {} - - def collect_openai_tokens(self, span, output): - span_id = span.get_span_context().span_id - if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None: - tokens = output.usage.dict() - if tokens: - with self._lock: - self._span_id_to_tokens[span_id] = tokens - - def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat): - span_id = span.get_span_context().span_id - calculator = OpenAIMetricsCalculator() - if is_chat: - tokens = calculator.get_openai_metrics_for_chat_api(inputs, output) - else: - tokens = calculator.get_openai_metrics_for_completion_api(inputs, output) - with self._lock: - self._span_id_to_tokens[span_id] = tokens - - def collect_openai_tokens_for_parent_span(self, span): - tokens = self.try_get_openai_tokens(span.get_span_context().span_id) - if tokens: - if not hasattr(span, "parent") or span.parent is None: - return - parent_span_id = span.parent.span_id - with self._lock: - if parent_span_id in self._span_id_to_tokens: - merged_tokens = {} - for key in set(self._span_id_to_tokens[parent_span_id]) | set(tokens): - parent_value = self._span_id_to_tokens[parent_span_id].get(key, 0) - token_value = tokens.get(key, 0) - if isinstance(parent_value, dict) and isinstance(token_value, dict): - # Handle the case where both values are dictionaries - merged_tokens[key] = {**parent_value, 
**token_value} - elif isinstance(parent_value, dict) or isinstance(token_value, dict): - # Handle the case where one value is a dictionary and the other is not - merged_tokens[key] = parent_value if isinstance(parent_value, dict) else token_value - else: - merged_tokens[key] = int(parent_value or 0) + int(token_value or 0) - self._span_id_to_tokens[parent_span_id] = merged_tokens - else: - self._span_id_to_tokens[parent_span_id] = tokens - - def try_get_openai_tokens(self, span_id): - with self._lock: - return self._span_id_to_tokens.get(span_id, None) + _lock = Lock() + + def __init__(self): + self._span_id_to_tokens = {} + + def collect_openai_tokens(self, span, output): + span_id = span.get_span_context().span_id + if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None: + try: + tokens = output.usage.dict() + if tokens: + safe_tokens = { + k: float(v) if isinstance(v, (int, float)) else 0 + for k, v in tokens.items() + } + with self._lock: + self._span_id_to_tokens[span_id] = safe_tokens + except: + logging.warning("Failed to process token usage") + + def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat): + span_id = span.get_span_context().span_id + calculator = OpenAIMetricsCalculator() + try: + if is_chat: + tokens = calculator.get_openai_metrics_for_chat_api(inputs, output) + else: + tokens = calculator.get_openai_metrics_for_completion_api(inputs, output) + safe_tokens = { + k: float(v) if isinstance(v, (int, float)) else 0 + for k, v in tokens.items() + } + with self._lock: + self._span_id_to_tokens[span_id] = safe_tokens + except Exception as e: + logging.warning(f"Failed to collect streaming tokens: {e}") + + def collect_openai_tokens_for_parent_span(self, span): + def safe_token_value(value): + if value is None: + return 0 + if isinstance(value, dict): + try: + return sum(safe_token_value(v) for v in value.values()) + except: + return 0 + if isinstance(value, (int, float)): + return 
float(value) + try: + return float(value) + except: + return 0 + + tokens = self.try_get_openai_tokens(span.get_span_context().span_id) + if not tokens: + return + + if not hasattr(span, "parent") or span.parent is None: + return + + parent_span_id = span.parent.span_id + with self._lock: + if parent_span_id in self._span_id_to_tokens: + current_tokens = self._span_id_to_tokens[parent_span_id] + merged_tokens = { + key: safe_token_value(current_tokens.get(key, 0)) + safe_token_value(tokens.get(key, 0)) + for key in set(current_tokens.keys()) | set(tokens.keys()) + } + for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']: + if field not in merged_tokens: + merged_tokens[field] = 0.0 + else: + merged_tokens[field] = float(merged_tokens[field]) + self._span_id_to_tokens[parent_span_id] = merged_tokens + else: + safe_tokens = {k: safe_token_value(v) for k, v in tokens.items()} + for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']: + if field not in safe_tokens: + safe_tokens[field] = 0.0 + else: + safe_tokens[field] = float(safe_tokens[field]) + self._span_id_to_tokens[parent_span_id] = safe_tokens + + def try_get_openai_tokens(self, span_id): + with self._lock: + return self._span_id_to_tokens.get(span_id, None) token_collector = TokenCollector()