fix: streaming token mode cannot work in function calls and will infi… #5396

Open
wants to merge 2 commits into main
Conversation

so2liu commented Feb 6, 2025

Fix: Prevent accumulation of empty messages in streaming mode

Why are these changes needed?

In the newly released autogen 0.4.5, there is a bug in streaming token mode that causes the system to hang when agents attempt to use tool calls. The current condition if choice.delta.content is not None: lets empty strings through, so empty messages accumulate in the conversation history:

[
    {'content': "You are the user's assistant...", 'role': 'system'},
    {'content': "Tomorrow's weather", 'role': 'user', 'name': 'user'},
    {'content': '', 'role': 'assistant', 'name': 'user_agent'},  # Empty message
    {'content': '', 'role': 'assistant', 'name': 'user_agent'}   # Empty message
]

By changing the condition to if choice.delta.content:, we prevent these empty messages from accumulating, fixing the hanging issue during tool calls.
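
Below is a minimal, self-contained sketch of what this one-character change does in a streaming loop; FakeDelta, accumulate, and the sample chunks are hypothetical stand-ins for illustration, not the actual _openai_client.py code:

# Hypothetical illustration of the old vs. new condition (not the real client code).
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeDelta:
    content: Optional[str]


def accumulate(deltas: List[FakeDelta], skip_empty: bool) -> List[str]:
    """Collect the content pieces that the condition lets through."""
    emitted: List[str] = []
    for delta in deltas:
        passes = bool(delta.content) if skip_empty else delta.content is not None
        if passes:
            emitted.append(delta.content)
    return emitted


# As described above, during a tool call the streamed deltas carry content=''
# (the actual payload is in tool_calls), so the old check still emits them.
chunks = [FakeDelta(""), FakeDelta(""), FakeDelta(None)]
print(accumulate(chunks, skip_empty=False))  # ['', ''] -> empty messages accumulate
print(accumulate(chunks, skip_empty=True))   # []       -> nothing is emitted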

Reproduce Code

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import (
    ModelClientStreamingChunkEvent,
    TextMessage,
    HandoffMessage,
    ToolCallRequestEvent,
    ToolCallExecutionEvent,
    ToolCallSummaryMessage,
)
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ModelFamily

LARGE_LLM_MODEL = "gpt-4o"
LARGE_LLM_API_KEY = "sk-or-v1-"
LARGE_LLM_API_URL = "https://openrouter.ai/api/v1"

model_client = OpenAIChatCompletionClient(
    model=LARGE_LLM_MODEL,
    api_key=LARGE_LLM_API_KEY,
    base_url=LARGE_LLM_API_URL,
    model_info={
        "family": ModelFamily.ANY,
        "function_calling": True,
        "json_output": True,
        "vision": False,
    },
)


# Define a tool
async def get_weather(city: str) -> str:
    return f"The weather in {city} is 73 degrees and Sunny."


async def get_users_location() -> str:
    return "Beijing"


MODEL_CLIENT_STREAM = True

user_agent = AssistantAgent(
    "user_agent",
    model_client=model_client,
    system_message="You are the user's assistant. You are responsible for forwarding the user's questions. If you believe the user's question has been answered, summarize the complete answer for the user and use TERMINATE to end.",
    model_client_stream=MODEL_CLIENT_STREAM,
    handoffs=["locate_agent", "weather_agent"],
)

locate_agent = AssistantAgent(
    "locate_agent",
    model_client=model_client,
    system_message="You can use the get_users_location tool to get the user's location.",
    model_client_stream=MODEL_CLIENT_STREAM,
    tools=[get_users_location],
    handoffs=["weather_agent"],
)

weather_agent = AssistantAgent(
    "weather_agent",
    model_client=model_client,
    system_message="You can use the get_weather tool to get the weather of a city. "
    "If the location is missing, hand over to locate_agent to get the location. "
    "If the user's question has been answered, hand over to user_agent to confirm task completion.",
    model_client_stream=MODEL_CLIENT_STREAM,
    tools=[get_weather],
    handoffs=["user_agent", "locate_agent"],
)


async def main() -> None:
    # Use Swarm instead of RoundRobinGroupChat
    agent_team = Swarm(
        participants=[
            user_agent,
            locate_agent,
            weather_agent,
        ],
        termination_condition=TextMentionTermination("TERMINATE"),
    )

    stream = agent_team.run_stream(
        task="Tomorrow's weather",
    )
    current_speaker = ""
    async for message in stream:
        # Define a custom print to show streaming tokens
        if isinstance(message, ModelClientStreamingChunkEvent):
            if message.source != current_speaker:
                current_speaker = message.source
                print(f"\n{current_speaker}: ", end="")
            print(message.content, flush=True, end="")
            await asyncio.sleep(0.1)
        elif isinstance(message, TaskResult):
            print(f"\nTaskResult: {message}")
        elif isinstance(message, TextMessage):
            if message.source != current_speaker:
                current_speaker = message.source
                print(f"\n{current_speaker}: ", end="")
            else:
                # Skip the full TextMessage when this speaker's content was already streamed as chunks above.
                continue
            print(message.content)
            await asyncio.sleep(0.1)
        elif isinstance(message, ToolCallRequestEvent):
            print(f"\nToolCallRequestEvent: {message.content[0].name}")
        elif isinstance(message, ToolCallExecutionEvent):
            print(f"\nToolCallExecutionEvent: {message.content[0].content}")
        elif isinstance(message, ToolCallSummaryMessage):
            print(f"\nToolCallSummaryMessage: {message.content}")
        elif isinstance(message, HandoffMessage):
            print(f"\nHandoffMessage: {message.content}")
        else:
            print(f"Unknown message type: {type(message)}")


# NOTE: asyncio.run(main()) is required when running this as a Python script; in a notebook, use await main() instead.
asyncio.run(main())

Changes Made

Changed in python/packages/autogen-ext/src/autogen_ext/models/openai/_openai_client.py:

- if choice.delta.content is not None:
+ if choice.delta.content:
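
For reference, the Python semantics behind this one-character diff: an empty string is not None, but it is falsy, so only the new check filters out the empty chunks.

assert "" is not None   # old check: empty-string chunks still pass through
assert not ""           # new check: empty-string chunks are skipped
assert not None         # None chunks are skipped by both checks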

Checks

Note: There are 3 existing test failures in tests/test_db_manager.py that are unrelated to this change and were present before this modification:

ERROR tests/test_db_manager.py::TestDatabaseOperations::test_basic_entity_creation
ERROR tests/test_db_manager.py::TestDatabaseOperations::test_upsert_operations
ERROR tests/test_db_manager.py::TestDatabaseOperations::test_delete_operations

These errors are related to OpenAI API key configuration and are outside the scope of this fix.

FYI, I can't access the OpenAI API directly, so I can't run these tests myself.



@ekzhu ekzhu enabled auto-merge (squash) February 7, 2025 06:23
@ekzhu ekzhu disabled auto-merge February 7, 2025 06:25
so2liu (Author) commented Feb 7, 2025

@ekzhu Thanks for the approval. Do you know why merging is blocked?
I see the CI run was cancelled, but I don't know the reason.
https://github.com/microsoft/autogen/actions/runs/13194403114/job/36833008745?pr=5396

codecov bot commented Feb 7, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 78.09%. Comparing base (3c30d89) to head (23cfc64).
Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5396      +/-   ##
==========================================
+ Coverage   78.08%   78.09%   +0.01%     
==========================================
  Files         158      158              
  Lines        9576     9576              
==========================================
+ Hits         7477     7478       +1     
+ Misses       2099     2098       -1     
Flag Coverage Δ
unittests 78.09% <100.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.

