
SK assistant for agentchat #5134

Status: Open. Wants to merge 29 commits into main from feat/add-sk-assistant-agent.

Commits (29):
74c9671  initial sk assistant agent implementation (lpinheiroms, Jan 22, 2025)
3a1396a  handle chat message types (lpinheiroms, Jan 22, 2025)
6746e82  add unit tests (lpinheiroms, Jan 24, 2025)
573bb0d  add docstring and lint (lpinheiroms, Jan 24, 2025)
aa24748  add docs (lpinheiroms, Jan 24, 2025)
418b1b8  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Jan 24, 2025)
d6d7b15  doc example lint (lpinheiroms, Jan 24, 2025)
fea7ecc  update toctree (lpinheiroms, Jan 24, 2025)
b6e941b  remove partial messages (lpinheiroms, Jan 24, 2025)
05f8cd9  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Jan 24, 2025)
3debf1c  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Jan 25, 2025)
7e23d71  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Jan 27, 2025)
b91a09f  Merge branch 'main' into feat/add-sk-assistant-agent (rysweet, Jan 31, 2025)
fa1e2f2  Merge branch 'main' into feat/add-sk-assistant-agent (rysweet, Feb 2, 2025)
38afae4  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 11, 2025)
47e87cb  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 12, 2025)
3ba6661  validate prompt args to enforce function auto exec (lpinheiroms, Feb 13, 2025)
0842e5f  update stream call event processing (lpinheiroms, Feb 18, 2025)
f4122e4  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 18, 2025)
09d8ae1  fix test (lpinheiroms, Feb 19, 2025)
761c731  improve testing coverage (lpinheiroms, Feb 19, 2025)
1c847f0  add simpler example (lpinheiroms, Feb 19, 2025)
3b65aba  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 19, 2025)
13d4e5c  typing fixes (lpinheiroms, Feb 19, 2025)
50230a9  doc example typing fix (lpinheiroms, Feb 19, 2025)
b28543f  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 19, 2025)
5b6ef2e  add doc references (lpinheiroms, Feb 19, 2025)
15fd09c  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 19, 2025)
8027f89  Merge branch 'main' into feat/add-sk-assistant-agent (lspinheiro, Feb 21, 2025)
python/packages/autogen-core/docs/src/reference/index.md (1 addition, 0 deletions):

@@ -46,6 +46,7 @@ python/autogen_ext.agents.web_surfer
 python/autogen_ext.agents.file_surfer
 python/autogen_ext.agents.video_surfer
 python/autogen_ext.agents.video_surfer.tools
+python/autogen_ext.agents.semantic_kernel
 python/autogen_ext.auth.azure
 python/autogen_ext.teams.magentic_one
 python/autogen_ext.models.cache
New file (8 additions): docs reference page for autogen_ext.agents.semantic_kernel

@@ -0,0 +1,8 @@
autogen\_ext.agents.semantic_kernel
====================================


.. automodule:: autogen_ext.agents.semantic_kernel
    :members:
    :undoc-members:
    :show-inheritance:
New file (3 additions): package __init__ for autogen_ext.agents.semantic_kernel

@@ -0,0 +1,3 @@
from ._sk_assistant_agent import SKAssistantAgent

__all__ = ["SKAssistantAgent"]
New file (327 additions): _sk_assistant_agent.py (module name per the import above)

@@ -0,0 +1,327 @@
import base64
from collections.abc import AsyncGenerator
from dataclasses import asdict
from typing import Any, Optional, Sequence

from autogen_agentchat.agents._base_chat_agent import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import ChatMessage, HandoffMessage, StopMessage, TextMessage, ToolCallSummaryMessage
from autogen_core import CancellationToken

from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.contents import ImageContent, TextContent
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.exceptions import KernelServiceNotFoundError
from semantic_kernel.kernel import Kernel


class SKAssistantAgent(BaseChatAgent):
    """
    SKAssistantAgent is a specialized agent that leverages Semantic Kernel for
    conversation handling and response generation. It extends the autogen
    ``BaseChatAgent`` class and uses a single Semantic Kernel ``ChatHistory``
    to store and manage dialogue context.

    Installation:

    .. code-block:: bash

        pip install "autogen-ext[semantic-kernel-core]"

    For other model providers and Semantic Kernel features, install the
    appropriate extra, or install all providers with the
    ``semantic-kernel-all`` extra.
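
    For example, a sketch of the all-providers install (assuming the same
    ``autogen-ext`` extra pattern as above):

    .. code-block:: bash

        pip install "autogen-ext[semantic-kernel-all]"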

    This agent supports streaming responses (token by token) and final message
    generation by calling the configured Semantic Kernel chat completion service.

    Args:
        name (str): The name of the agent.
        description (str): A description of the agent's capabilities or purpose.
        kernel (Kernel): The Semantic Kernel instance to use for chat completions.
        service_id (str, optional): The ID of the chat completion service. Defaults to "default".
        instructions (str, optional): Optional system-level instructions for the assistant.
        execution_settings (PromptExecutionSettings, optional):
            Optional prompt execution settings to override defaults.

    Example usage:
Review comment (Collaborator):

Requiring a Bing search API key adds a barrier to entry. I think we should have another example before this one that shows the use of basic function-based tools, such as a calculator.

Also, we need to reference the relevant Semantic Kernel documentation whenever we mention a new SK concept.
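
A minimal sketch of the kind of function-based tool example suggested above (CalculatorPlugin is a hypothetical name; the decorator is SK's kernel_function API):

    from semantic_kernel import Kernel
    from semantic_kernel.functions import kernel_function


    class CalculatorPlugin:
        """Hypothetical plugin exposing basic arithmetic as kernel functions."""

        @kernel_function(description="Add two numbers and return the sum.")
        def add(self, a: float, b: float) -> float:
            return a + b

        @kernel_function(description="Multiply two numbers and return the product.")
        def multiply(self, a: float, b: float) -> float:
            return a * b


    kernel = Kernel()
    # Registering the plugin exposes its functions as tools the model can call.
    kernel.add_plugin(CalculatorPlugin(), plugin_name="calculator")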


    The following example demonstrates how to create and use an ``SKAssistantAgent``
    in conjunction with a Semantic Kernel. It sets up an Azure-based chat model,
    adds a Bing search plugin, and then streams the agent's response to the console:

    .. code-block:: python

        import asyncio
        import os

        from dotenv import load_dotenv
        from autogen_agentchat.ui._console import Console
        from semantic_kernel import Kernel
        from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
        from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
        from semantic_kernel.connectors.search.bing import BingSearch
        from semantic_kernel.functions import KernelArguments, KernelParameterMetadata, KernelPlugin
        from autogen_core import CancellationToken
        from autogen_ext.agents.semantic_kernel import SKAssistantAgent
        from autogen_agentchat.messages import TextMessage

        load_dotenv("../.env")


        async def main():
            # Initialize the kernel
            kernel = Kernel()

            # Configure the Azure OpenAI chat completion service
            ai_service = AzureChatCompletion(
                api_key=os.getenv("AZURE_OPENAI_API_KEY"),
                deployment_name="gpt-4o-mini",
                endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
                api_version=os.getenv("AZURE_OPENAI_VERSION"),
            )
            kernel.add_service(ai_service)

            # Configure Bing search and add it to the kernel as a plugin
            bing_api_key = os.getenv("BING_API_KEY")
            kernel.add_plugin(
                KernelPlugin.from_text_search_with_search(
                    BingSearch(bing_api_key),
                    plugin_name="bing",
                    description="Get details about Semantic Kernel concepts.",
                    parameters=[
                        KernelParameterMetadata(
                            name="query",
                            description="The search query.",
                            type="str",
                            is_required=True,
                            type_object=str,
                        ),
                        KernelParameterMetadata(
                            name="top",
                            description="Number of results to return.",
                            type="int",
                            is_required=False,
                            default_value=2,
                            type_object=int,
                        ),
                        KernelParameterMetadata(
                            name="skip",
                            description="Number of results to skip.",
                            type="int",
                            is_required=False,
                            default_value=0,
                            type_object=int,
                        ),
                    ],
                )
            )

            # Create the SKAssistantAgent
            agent = SKAssistantAgent(
                name="MyAssistant",
                description="An AI assistant that can search the web and answer questions",
                kernel=kernel,
                execution_settings=AzureChatPromptExecutionSettings(
                    function_choice_behavior=FunctionChoiceBehavior.Auto(auto_invoke=True)
                ),
            )

            query = "What are the latest news on autogen?"
            await Console(agent.run_stream(task=query))


        if __name__ == "__main__":
            asyncio.run(main())

    """

    def __init__(
        self,
        name: str,
        description: str,
        kernel: Kernel,
        service_id: str = "default",
        instructions: Optional[str] = None,
        execution_settings: Optional[PromptExecutionSettings] = None,
    ) -> None:
        super().__init__(name, description)
        self._kernel = kernel
        self._service_id = service_id
        self._instructions = instructions
        self._execution_settings = execution_settings

        # Maintain the entire conversation as a ChatHistory (SK concept).
        self._chat_history: ChatHistory = ChatHistory()

        # If instructions are provided, set them as the first system message
        if instructions:
            self._chat_history.add_system_message(instructions)

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return [TextMessage]
Review comment (ekzhu, Collaborator, Feb 14, 2025):

Looks like the agent can't produce HandoffMessage; in that case we need to add a TODO and reference a new issue, similar to #5496.

Reply (Author, Collaborator):

Are there any guides on how to implement handoffs? I can add support, but I'm not familiar with the conditions under which handoffs should occur.

Reply (ekzhu, Collaborator, Feb 14, 2025):

Handoffs are basically function calls that the model can make. We create the functions under the hood, while the user provides either plain strings naming the target agents or Handoff objects. The AssistantAgent implementation shows how handoff is implemented; see the flow diagram: https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.agents.html#autogen_agentchat.agents.AssistantAgent

    async def on_messages(
        self,
        messages: Sequence[ChatMessage],
        cancellation_token: CancellationToken,
    ) -> Response:
        """
        Handle incoming messages, add them to our ChatHistory, call SK for a response,
        and return a final single text response.
        """
        # 1) Convert & store new agent messages in ChatHistory
        for msg in messages:
            sk_msg = self._convert_chat_message_to_sk_chat_message_content(AuthorRole.USER, msg)
            self._chat_history.add_message(sk_msg)

        # 2) Retrieve the SK chat completion service
        chat_completion_service = self._kernel.get_service(
            service_id=self._service_id,
            type=ChatCompletionClientBase,
        )
        if not chat_completion_service:
            raise KernelServiceNotFoundError(f"Chat completion service not found with service_id: {self._service_id}")

        assert isinstance(chat_completion_service, ChatCompletionClientBase)

        # 3) Get or create the PromptExecutionSettings
        settings = (
            self._execution_settings
            or self._kernel.get_prompt_execution_settings_from_service_id(self._service_id)
            or chat_completion_service.instantiate_prompt_execution_settings(  # type: ignore
                service_id=self._service_id,
                extension_data={"ai_model_id": chat_completion_service.ai_model_id},
            )
        )

        # 4) Invoke SK to get an assistant response
        sk_responses = await chat_completion_service.get_chat_message_contents(
            chat_history=self._chat_history,
            settings=settings,
            kernel=self._kernel,
        )
        # Convert SK's list of responses into a single final text
        assistant_reply = "\n".join(r.content for r in sk_responses if r.content)
Review comment (Collaborator):

For SK's list of responses, are they separate messages or a single message? Is it possible for this list to contain messages with tool calls?

Reply (Author, Collaborator):

It returns a list:

    async def get_chat_message_contents(
        self,
        chat_history: "ChatHistory",
        settings: "PromptExecutionSettings",
        **kwargs: Any,
    ) -> list["ChatMessageContent"]:

But it can return tool calls; good catch. This depends on the configuration of the prompt settings. In the model adapter we force Semantic Kernel to return tool calls to keep the same contract as the chat completion client. Here in the sample I'm using Semantic Kernel to automatically execute the function, but this is not enforced in the configuration. Do you think we should force it, or allow it to return tool calls? We may need to be more explicit about this behavior in the docs and maybe log some warnings.

Reply (Author, Collaborator):

@ekzhu, do you have an idea of how you want to handle function calls? The output is similar to what we would get from the client. We have some options:

  1. Override the prompt settings to execute the call.
  2. Execute it manually by calling the kernel function (need to look into it, but it looks possible).
  3. Return a tool call message / handoff? (Unsure how handoffs work and what type of agent configuration would be needed.)

Thoughts?

Reply (Author, Collaborator):

Ended up not going with any of the suggestions above. If the execution settings are not configured to auto-invoke, we throw an exception.

Reply (Author, Collaborator):

There is also a validation that throws a ValueError with a descriptive message in case a function call message is returned by the model client, but this is not expected behavior now.
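
A rough sketch of the enforcement described in the last two replies (the function_choice_behavior and auto_invoke_kernel_functions attribute names are assumptions about SK's settings API, not the PR's exact code):

    from semantic_kernel.contents import FunctionCallContent

    # 1) Require execution settings that auto-invoke kernel functions.
    #    (Attribute names here are assumed; guarded with getattr.)
    fcb = getattr(settings, "function_choice_behavior", None)
    if fcb is None or not getattr(fcb, "auto_invoke_kernel_functions", False):
        raise ValueError(
            "SKAssistantAgent requires execution settings with "
            "FunctionChoiceBehavior.Auto(auto_invoke=True) so that tool calls "
            "are executed by Semantic Kernel rather than returned to the caller."
        )

    # 2) If the model still returns a raw function call, fail with a descriptive error.
    for response in sk_responses:
        if any(isinstance(item, FunctionCallContent) for item in response.items):
            raise ValueError("Model returned an unexecuted function call; expected auto-invoked results only.")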

Review comment (Collaborator):

Regarding the response message being a concatenation of multiple messages: do you think it should be a sequence of event messages (e.g., ToolCallRequestedEvent) followed by a final response, as in the AssistantAgent?

Reply (Author, Collaborator):

I asked Evan from the SK team, and he said it actually only returns a single message in the list; the list format was added just in case. We could add an assert/exception handler to check that the list has a single element and then update the code to process only the expected message. That would make the processing more explicit while giving a clear indication if this expectation is ever broken in the future.
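
A rough sketch of that single-element check (variable names follow the surrounding diff; the exception type and message are illustrative):

    if len(sk_responses) != 1:
        # SK is expected (per the SK team) to return exactly one message;
        # fail loudly if that assumption is ever broken.
        raise RuntimeError(f"Expected one ChatMessageContent, got {len(sk_responses)}")
    assistant_reply = sk_responses[0].content or ""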

        reply_message = TextMessage(content=assistant_reply, source=self.name)

        # 5) Add the new assistant message into our chat history
        if assistant_reply.strip():
            self._chat_history.add_message(
                self._convert_chat_message_to_sk_chat_message_content(AuthorRole.ASSISTANT, reply_message)
            )

        # 6) Return an autogen Response containing the text
        return Response(chat_message=reply_message)

    async def on_messages_stream(
        self,
        messages: Sequence[ChatMessage],
        cancellation_token: CancellationToken,
    ) -> AsyncGenerator[ChatMessage | Response, None]:
        """
        Handle new messages in streaming mode, yielding partial text messages
        as we receive them, then yield a final single Response.
        """
        # 1) Convert & store new agent messages
        for msg in messages:
            sk_msg = self._convert_chat_message_to_sk_chat_message_content(AuthorRole.USER, msg)
            self._chat_history.add_message(sk_msg)

        # 2) Retrieve chat completion service
        chat_completion_service = self._kernel.get_service(
            service_id=self._service_id,
            type=ChatCompletionClientBase,
        )
        if not chat_completion_service:
            raise KernelServiceNotFoundError(f"Chat completion service not found with service_id: {self._service_id}")

        assert isinstance(chat_completion_service, ChatCompletionClientBase)

        settings = (
            self._execution_settings
            or self._kernel.get_prompt_execution_settings_from_service_id(self._service_id)
            or chat_completion_service.instantiate_prompt_execution_settings(  # type: ignore
                service_id=self._service_id,
                extension_data={"ai_model_id": chat_completion_service.ai_model_id},
            )
        )

        # 3) Stream the SK response
        accumulated_reply: list[str] = []
        async for sk_message_list in chat_completion_service.get_streaming_chat_message_contents(
            chat_history=self._chat_history,
            settings=settings,
            kernel=self._kernel,
        ):
            for sk_message in sk_message_list:
                # If it's streaming text, yield partial text as a new TextMessage
                if sk_message.content:
                    partial_text = sk_message.content
                    accumulated_reply.append(partial_text)
                    yield TextMessage(content=partial_text, source=self.name)

        # 4) After streaming ends, save the entire assistant message
        final_text = "".join(accumulated_reply).strip()
        if final_text:
            self._chat_history.add_assistant_message(final_text, name=self.name)

        # 5) Finally, yield the single autogen Response
        yield Response(chat_message=TextMessage(content=final_text, source=self.name))

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Clear the entire conversation history."""
        self._chat_history.messages.clear()

    @staticmethod
    def _convert_chat_message_to_sk_chat_message_content(
        role: AuthorRole, chat_message: ChatMessage
    ) -> ChatMessageContent:
        # Prepare a place to store metadata (e.g., usage)
        metadata: dict[str, Any] = {}
        if chat_message.models_usage is not None:
            metadata["models_usage"] = asdict(chat_message.models_usage)

        items: list[TextContent | ImageContent] = []
        msg_type = chat_message.type

        match msg_type:
            case "TextMessage":
                assert isinstance(chat_message, TextMessage)
                items.append(TextContent(text=chat_message.content))

            case "MultiModalMessage":
                for entry in chat_message.content:
                    if isinstance(entry, str):
                        items.append(TextContent(text=entry))
                    else:
                        # entry is autogen_core.Image
                        # Convert to base64 and then into bytes for ImageContent
                        b64 = entry.to_base64()
                        img_bytes = base64.b64decode(b64)
                        items.append(
                            ImageContent(
                                data=img_bytes,  # type: ignore
                                data_format="base64",  # type: ignore
                                mime_type="image/png",  # type: ignore
                            )
                        )

            case "StopMessage":
                assert isinstance(chat_message, StopMessage)
                items.append(TextContent(text=chat_message.content))

            case "HandoffMessage":
                assert isinstance(chat_message, HandoffMessage)
                # Store handoff details as text
                text = f"Handoff target: {chat_message.target}\n\n{chat_message.content}"
                items.append(TextContent(text=text))

            case "ToolCallSummaryMessage":
                assert isinstance(chat_message, ToolCallSummaryMessage)
                items.append(TextContent(text=chat_message.content))

        return ChatMessageContent(role=role, items=items, metadata=metadata, name=chat_message.source)  # type: ignore