Skip to content

Commit ac5b037

Browse files
authored
Merge branch 'main' into rysweet-remove-ELSA
2 parents 33f4cc2 + 3058baf commit ac5b037

21 files changed

+529
-122
lines changed

protos/agent_worker.proto

+1-2
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,11 @@ message Message {
117117
oneof message {
118118
RpcRequest request = 1;
119119
RpcResponse response = 2;
120-
Event event = 3;
120+
cloudevent.CloudEvent cloudEvent = 3;
121121
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
122122
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
123123
AddSubscriptionRequest addSubscriptionRequest = 6;
124124
AddSubscriptionResponse addSubscriptionResponse = 7;
125-
cloudevent.CloudEvent cloudEvent = 8;
126125
}
127126
}
128127

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py

+106-1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,13 @@ async def run(
170170
:meth:`run_stream` to run the team and then returns the final result.
171171
Once the team is stopped, the termination condition is reset.
172172
173+
Args:
174+
task (str | ChatMessage | None): The task to run the team with.
175+
cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
176+
Setting the cancellation token potentially puts the team in an inconsistent state,
177+
and it may not reset the termination condition.
178+
To gracefully stop the team, use :class:`~autogen_agentchat.task.ExternalTermination` instead.
179+
173180
Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:
174181
175182
@@ -198,6 +205,47 @@ async def main() -> None:
198205
print(result)
199206
200207
208+
asyncio.run(main())
209+
210+
211+
Example using the :class:`~autogen_core.base.CancellationToken` to cancel the task:
212+
213+
.. code-block:: python
214+
215+
import asyncio
216+
from autogen_agentchat.agents import AssistantAgent
217+
from autogen_agentchat.task import MaxMessageTermination
218+
from autogen_agentchat.teams import RoundRobinGroupChat
219+
from autogen_core.base import CancellationToken
220+
from autogen_ext.models import OpenAIChatCompletionClient
221+
222+
223+
async def main() -> None:
224+
model_client = OpenAIChatCompletionClient(model="gpt-4o")
225+
226+
agent1 = AssistantAgent("Assistant1", model_client=model_client)
227+
agent2 = AssistantAgent("Assistant2", model_client=model_client)
228+
termination = MaxMessageTermination(3)
229+
team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
230+
231+
cancellation_token = CancellationToken()
232+
233+
# Create a task to run the team in the background.
234+
run_task = asyncio.create_task(
235+
team.run(
236+
task="Count from 1 to 10, respond one at a time.",
237+
cancellation_token=cancellation_token,
238+
)
239+
)
240+
241+
# Wait for 1 second and then cancel the task.
242+
await asyncio.sleep(1)
243+
cancellation_token.cancel()
244+
245+
# This will raise a cancellation error.
246+
await run_task
247+
248+
201249
asyncio.run(main())
202250
"""
203251
result: TaskResult | None = None
@@ -221,6 +269,13 @@ async def run_stream(
221269
of the type :class:`TaskResult` as the last item in the stream. Once the
222270
team is stopped, the termination condition is reset.
223271
272+
Args:
273+
task (str | ChatMessage | None): The task to run the team with.
274+
cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
275+
Setting the cancellation token potentially puts the team in an inconsistent state,
276+
and it may not reset the termination condition.
277+
To gracefully stop the team, use :class:`~autogen_agentchat.task.ExternalTermination` instead.
278+
224279
Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:
225280
226281
.. code-block:: python
@@ -251,7 +306,52 @@ async def main() -> None:
251306
252307
253308
asyncio.run(main())
309+
310+
311+
Example using the :class:`~autogen_core.base.CancellationToken` to cancel the task:
312+
313+
.. code-block:: python
314+
315+
import asyncio
316+
from autogen_agentchat.agents import AssistantAgent
317+
from autogen_agentchat.task import MaxMessageTermination, Console
318+
from autogen_agentchat.teams import RoundRobinGroupChat
319+
from autogen_core.base import CancellationToken
320+
from autogen_ext.models import OpenAIChatCompletionClient
321+
322+
323+
async def main() -> None:
324+
model_client = OpenAIChatCompletionClient(model="gpt-4o")
325+
326+
agent1 = AssistantAgent("Assistant1", model_client=model_client)
327+
agent2 = AssistantAgent("Assistant2", model_client=model_client)
328+
termination = MaxMessageTermination(3)
329+
team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
330+
331+
cancellation_token = CancellationToken()
332+
333+
# Create a task to run the team in the background.
334+
run_task = asyncio.create_task(
335+
Console(
336+
team.run_stream(
337+
task="Count from 1 to 10, respond one at a time.",
338+
cancellation_token=cancellation_token,
339+
)
340+
)
341+
)
342+
343+
# Wait for 1 second and then cancel the task.
344+
await asyncio.sleep(1)
345+
cancellation_token.cancel()
346+
347+
# This will raise a cancellation error.
348+
await run_task
349+
350+
351+
asyncio.run(main())
352+
254353
"""
354+
255355
# Create the first chat message if the task is a string or a chat message.
256356
first_chat_message: ChatMessage | None = None
257357
if task is None:
@@ -288,12 +388,17 @@ async def stop_runtime() -> None:
288388
await self._runtime.send_message(
289389
GroupChatStart(message=first_chat_message),
290390
recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
391+
cancellation_token=cancellation_token,
291392
)
292393
# Collect the output messages in order.
293394
output_messages: List[AgentMessage] = []
294395
# Yield the messages until the queue is empty.
295396
while True:
296-
message = await self._output_message_queue.get()
397+
message_future = asyncio.ensure_future(self._output_message_queue.get())
398+
if cancellation_token is not None:
399+
cancellation_token.link_future(message_future)
400+
# Wait for the next message, this will raise an exception if the task is cancelled.
401+
message = await message_future
297402
if message is None:
298403
break
299404
yield message

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from abc import ABC, abstractmethod
23
from typing import Any, List
34

@@ -78,7 +79,9 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> No
7879
await self.publish_message(message, topic_id=DefaultTopicId(type=self._output_topic_type))
7980

8081
# Relay the start message to the participants.
81-
await self.publish_message(message, topic_id=DefaultTopicId(type=self._group_topic_type))
82+
await self.publish_message(
83+
message, topic_id=DefaultTopicId(type=self._group_topic_type), cancellation_token=ctx.cancellation_token
84+
)
8285

8386
# Append the user message to the message thread.
8487
self._message_thread.append(message.message)
@@ -95,8 +98,16 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> No
9598
await self._termination_condition.reset()
9699
return
97100

98-
speaker_topic_type = await self.select_speaker(self._message_thread)
99-
await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type))
101+
# Select a speaker to start the conversation.
102+
speaker_topic_type_future = asyncio.ensure_future(self.select_speaker(self._message_thread))
103+
# Link the select speaker future to the cancellation token.
104+
ctx.cancellation_token.link_future(speaker_topic_type_future)
105+
speaker_topic_type = await speaker_topic_type_future
106+
await self.publish_message(
107+
GroupChatRequestPublish(),
108+
topic_id=DefaultTopicId(type=speaker_topic_type),
109+
cancellation_token=ctx.cancellation_token,
110+
)
100111

101112
@event
102113
async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
@@ -140,8 +151,15 @@ async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: Mess
140151
return
141152

142153
# Select a speaker to continue the conversation.
143-
speaker_topic_type = await self.select_speaker(self._message_thread)
144-
await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type))
154+
speaker_topic_type_future = asyncio.ensure_future(self.select_speaker(self._message_thread))
155+
# Link the select speaker future to the cancellation token.
156+
ctx.cancellation_token.link_future(speaker_topic_type_future)
157+
speaker_topic_type = await speaker_topic_type_future
158+
await self.publish_message(
159+
GroupChatRequestPublish(),
160+
topic_id=DefaultTopicId(type=speaker_topic_type),
161+
cancellation_token=ctx.cancellation_token,
162+
)
145163

146164
@rpc
147165
async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> None:

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async def handle_request(self, message: GroupChatRequestPublish, ctx: MessageCon
7171
await self.publish_message(
7272
GroupChatAgentResponse(agent_response=response),
7373
topic_id=DefaultTopicId(type=self._parent_topic_type),
74+
cancellation_token=ctx.cancellation_token,
7475
)
7576

7677
async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_orchestrator.py

+30-22
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
from typing import Any, List
33

4-
from autogen_core.base import MessageContext
4+
from autogen_core.base import AgentId, CancellationToken, MessageContext
55
from autogen_core.components import DefaultTopicId, Image, event, rpc
66
from autogen_core.components.models import (
77
AssistantMessage,
@@ -120,7 +120,7 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> No
120120
planning_conversation.append(
121121
UserMessage(content=self._get_task_ledger_facts_prompt(self._task), source=self._name)
122122
)
123-
response = await self._model_client.create(planning_conversation)
123+
response = await self._model_client.create(planning_conversation, cancellation_token=ctx.cancellation_token)
124124

125125
assert isinstance(response.content, str)
126126
self._facts = response.content
@@ -131,19 +131,19 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> No
131131
planning_conversation.append(
132132
UserMessage(content=self._get_task_ledger_plan_prompt(self._team_description), source=self._name)
133133
)
134-
response = await self._model_client.create(planning_conversation)
134+
response = await self._model_client.create(planning_conversation, cancellation_token=ctx.cancellation_token)
135135

136136
assert isinstance(response.content, str)
137137
self._plan = response.content
138138

139139
# Kick things off
140140
self._n_stalls = 0
141-
await self._reenter_inner_loop()
141+
await self._reenter_inner_loop(ctx.cancellation_token)
142142

143143
@event
144144
async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
145145
self._message_thread.append(message.agent_response.chat_message)
146-
await self._orchestrate_step()
146+
await self._orchestrate_step(ctx.cancellation_token)
147147

148148
@rpc
149149
async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> None:
@@ -162,12 +162,16 @@ async def reset(self) -> None:
162162
async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:
163163
raise ValueError(f"Unhandled message in group chat manager: {type(message)}")
164164

165-
async def _reenter_inner_loop(self) -> None:
165+
async def _reenter_inner_loop(self, cancellation_token: CancellationToken) -> None:
166166
# Reset the agents
167-
await self.publish_message(
168-
GroupChatReset(),
169-
topic_id=DefaultTopicId(type=self._group_topic_type),
170-
)
167+
for participant_topic_type in self._participant_topic_types:
168+
await self._runtime.send_message(
169+
GroupChatReset(),
170+
recipient=AgentId(type=participant_topic_type, key=self.id.key),
171+
cancellation_token=cancellation_token,
172+
)
173+
# Reset the group chat manager
174+
await self.reset()
171175
self._message_thread.clear()
172176

173177
# Prepare the ledger
@@ -192,12 +196,12 @@ async def _reenter_inner_loop(self) -> None:
192196
)
193197

194198
# Restart the inner loop
195-
await self._orchestrate_step()
199+
await self._orchestrate_step(cancellation_token=cancellation_token)
196200

197-
async def _orchestrate_step(self) -> None:
201+
async def _orchestrate_step(self, cancellation_token: CancellationToken) -> None:
198202
# Check if we reached the maximum number of rounds
199203
if self._max_turns is not None and self._n_rounds > self._max_turns:
200-
await self._prepare_final_answer("Max rounds reached.")
204+
await self._prepare_final_answer("Max rounds reached.", cancellation_token)
201205
return
202206
self._n_rounds += 1
203207

@@ -216,7 +220,7 @@ async def _orchestrate_step(self) -> None:
216220

217221
# Check for task completion
218222
if progress_ledger["is_request_satisfied"]["answer"]:
219-
await self._prepare_final_answer(progress_ledger["is_request_satisfied"]["reason"])
223+
await self._prepare_final_answer(progress_ledger["is_request_satisfied"]["reason"], cancellation_token)
220224
return
221225

222226
# Check for stalling
@@ -229,8 +233,8 @@ async def _orchestrate_step(self) -> None:
229233

230234
# Too much stalling
231235
if self._n_stalls >= self._max_stalls:
232-
await self._update_task_ledger()
233-
await self._reenter_inner_loop()
236+
await self._update_task_ledger(cancellation_token)
237+
await self._reenter_inner_loop(cancellation_token)
234238
return
235239

236240
# Broadcast the next step
@@ -247,20 +251,23 @@ async def _orchestrate_step(self) -> None:
247251
await self.publish_message( # Broadcast
248252
GroupChatAgentResponse(agent_response=Response(chat_message=message)),
249253
topic_id=DefaultTopicId(type=self._group_topic_type),
254+
cancellation_token=cancellation_token,
250255
)
251256

252257
# Request that the step be completed
253258
next_speaker = progress_ledger["next_speaker"]["answer"]
254-
await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=next_speaker))
259+
await self.publish_message(
260+
GroupChatRequestPublish(), topic_id=DefaultTopicId(type=next_speaker), cancellation_token=cancellation_token
261+
)
255262

256-
async def _update_task_ledger(self) -> None:
263+
async def _update_task_ledger(self, cancellation_token: CancellationToken) -> None:
257264
context = self._thread_to_context()
258265

259266
# Update the facts
260267
update_facts_prompt = self._get_task_ledger_facts_update_prompt(self._task, self._facts)
261268
context.append(UserMessage(content=update_facts_prompt, source=self._name))
262269

263-
response = await self._model_client.create(context)
270+
response = await self._model_client.create(context, cancellation_token=cancellation_token)
264271

265272
assert isinstance(response.content, str)
266273
self._facts = response.content
@@ -270,19 +277,19 @@ async def _update_task_ledger(self) -> None:
270277
update_plan_prompt = self._get_task_ledger_plan_update_prompt(self._team_description)
271278
context.append(UserMessage(content=update_plan_prompt, source=self._name))
272279

273-
response = await self._model_client.create(context)
280+
response = await self._model_client.create(context, cancellation_token=cancellation_token)
274281

275282
assert isinstance(response.content, str)
276283
self._plan = response.content
277284

278-
async def _prepare_final_answer(self, reason: str) -> None:
285+
async def _prepare_final_answer(self, reason: str, cancellation_token: CancellationToken) -> None:
279286
context = self._thread_to_context()
280287

281288
# Get the final answer
282289
final_answer_prompt = self._get_final_answer_prompt(self._task)
283290
context.append(UserMessage(content=final_answer_prompt, source=self._name))
284291

285-
response = await self._model_client.create(context)
292+
response = await self._model_client.create(context, cancellation_token=cancellation_token)
286293
assert isinstance(response.content, str)
287294
message = TextMessage(content=response.content, source=self._name)
288295

@@ -298,6 +305,7 @@ async def _prepare_final_answer(self, reason: str) -> None:
298305
await self.publish_message(
299306
GroupChatAgentResponse(agent_response=Response(chat_message=message)),
300307
topic_id=DefaultTopicId(type=self._group_topic_type),
308+
cancellation_token=cancellation_token,
301309
)
302310

303311
# Signal termination

0 commit comments

Comments
 (0)