Skip to content

Commit

Permalink
Rysweet fix integration tests and xlang (microsoft#5107)
Browse files Browse the repository at this point in the history
* fix python path

* update HelloAgent to enable grpc

* agents app remove webapplication and add grpc

* add di for client

* adding events types to grpc di

* warnings to information

* improve logging, add some error handling; fix grpc startup

* improve error logging

* cleaning up publishing of messages and message handling.

* WbApplication->HostedApplication

* formatting

* WebApplication -> HostApplication

* ensure correct .NET versions are available for integration test
  • Loading branch information
rysweet authored Jan 20, 2025
1 parent d9fd39a commit 11461b7
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 33 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/dotnet-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ jobs:
- name: Prepare python venv
run: |
source ${{ github.workspace }}/python/.venv/bin/activate
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v4
with:
dotnet-version: '8.0.x'
- name: Setup .NET 9.0
uses: actions/setup-dotnet@v4
with:
Expand Down
3 changes: 1 addition & 2 deletions dotnet/samples/Hello/Hello.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
.WaitFor(backend);
#pragma warning disable ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
// xlang is over http for now - in prod use TLS between containers
builder.AddPythonApp("HelloAgentsPython", "../../../../python/packages/autogen-core/samples/xlang/hello_python_agent", "hello_python_agent.py", "../../../../../.venv")
.WithReference(backend)
builder.AddPythonApp("HelloAgentsPython", "../../../../python/samples/core_xlang_hello_python_agent", "hello_python_agent.py", "../../.venv").WithReference(backend)
.WithEnvironment("AGENT_HOST", backend.GetEndpoint("http"))
.WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true")
.WithEnvironment("GRPC_DNS_RESOLVER", "native")
Expand Down
2 changes: 1 addition & 1 deletion dotnet/samples/Hello/HelloAIAgents/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Microsoft.AutoGen.Core;

// send a message to the agent
var builder = WebApplication.CreateBuilder();
var builder = new HostApplicationBuilder();
// put these in your environment or appsettings.json
builder.Configuration["HelloAIAgents:ModelType"] = "azureopenai";
builder.Configuration["HelloAIAgents:LlmModelName"] = "gpt-3.5-turbo";
Expand Down
1 change: 1 addition & 0 deletions dotnet/samples/Hello/HelloAgent/HelloAgent.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/Core.Grpc/Microsoft.AutoGen.Core.Grpc.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core\Microsoft.AutoGen.Core.csproj" />
Expand Down
4 changes: 2 additions & 2 deletions dotnet/samples/Hello/HelloAgent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

var local = true;
if (Environment.GetEnvironmentVariable("AGENT_HOST") != null) { local = false; }
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
var app = await Microsoft.AutoGen.Core.Grpc.AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: local).ConfigureAwait(false);
await app.WaitForShutdownAsync();

namespace Hello
{
[TopicSubscription("agents")]
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentWorker worker, IHostApplicationLifetime hostApplicationLifetime,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : Agent(
Expand Down
2 changes: 1 addition & 1 deletion dotnet/samples/Hello/HelloAgent/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft": "Information",
"Microsoft.Orleans": "Warning"
}
}
Expand Down
5 changes: 3 additions & 2 deletions dotnet/src/Microsoft.AutoGen/AgentHost/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Orleans": "Warning"
"Microsoft": "Information",
"Microsoft.Orleans": "Warning",
"Orleans.Runtime": "Error"
}
},
"AllowedHosts": "*",
Expand Down
23 changes: 14 additions & 9 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Google.Protobuf;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
Expand All @@ -13,26 +12,32 @@ namespace Microsoft.AutoGen.Core.Grpc;
public static class AgentsApp
{
// need a variable to store the runtime instance
public static WebApplication? Host { get; private set; }
public static IHost? Host { get; private set; }

[MemberNotNull(nameof(Host))]
public static async ValueTask<WebApplication> StartAsync(WebApplicationBuilder? builder = null, AgentTypes? agentTypes = null, bool local = false)
public static async ValueTask<IHost> StartAsync(HostApplicationBuilder? builder = null, AgentTypes? agentTypes = null, bool local = false)
{
builder ??= WebApplication.CreateBuilder();
builder ??= new HostApplicationBuilder();
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.AddAgentWorker()
if (!local)
{
builder.AddGrpcAgentWorker()
.AddAgents(agentTypes);
builder.AddServiceDefaults();
}
else
{
builder.AddAgentWorker()
.AddAgents(agentTypes);
}
var app = builder.Build();
app.MapDefaultEndpoints();
Host = app;
await app.StartAsync().ConfigureAwait(false);
return Host;
}
public static async ValueTask<WebApplication> PublishMessageAsync(
public static async ValueTask<IHost> PublishMessageAsync(
string topic,
IMessage message,
WebApplicationBuilder? builder = null,
HostApplicationBuilder? builder = null,
AgentTypes? agents = null,
bool local = false)
{
Expand Down
16 changes: 15 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ private async Task RunReadPump()
agent.ReceiveMessage(message);
}

break;
case Message.MessageOneofCase.RegisterAgentTypeResponse:
if (!message.RegisterAgentTypeResponse.Success)
{
_logger.LogError($"Failed to register agent type '{message.RegisterAgentTypeResponse.Error}'");
}
break;
case Message.MessageOneofCase.AddSubscriptionResponse:
if (!message.AddSubscriptionResponse.Success)
{
_logger.LogError($"Failed to add subscription '{message.AddSubscriptionResponse.Error}'");
}
break;
default:
throw new InvalidOperationException($"Unexpected message '{message}'.");
Expand Down Expand Up @@ -146,7 +158,7 @@ private async Task RunWritePump()
{
// we could not connect to the endpoint - most likely we have the wrong port or failed ssl
// we need to let the user know what port we tried to connect to and then do backoff and retry
_logger.LogError(ex, "Error connecting to GRPC endpoint {Endpoint}.", channel.ToString());
_logger.LogError(ex, "Error connecting to GRPC endpoint {Endpoint}.", Environment.GetEnvironmentVariable("AGENT_HOST"));
break;
}
catch (Exception ex) when (!_shutdownCts.IsCancellationRequested)
Expand Down Expand Up @@ -316,6 +328,8 @@ private AsyncDuplexStreamingCall<Message, Message> RecreateChannel(AsyncDuplexSt
public async Task StartAsync(CancellationToken cancellationToken)
{
_channel = GetChannel();
_logger.LogInformation("Starting GrpcAgentWorker, connecting to gRPC endpoint " + Environment.GetEnvironmentVariable("AGENT_HOST"));

StartCore();

var tasks = new List<Task>(_agentTypes.Count);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GrpcAgentWorkerHostBuilderExtension.cs
using System.Reflection;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Contracts;
Expand Down Expand Up @@ -44,6 +47,86 @@ public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBu
});
});
builder.Services.AddSingleton<IAgentWorker, GrpcAgentWorker>();
builder.Services.AddKeyedSingleton("EventTypes", (sp, key) =>
{
var interfaceType = typeof(IMessage);
var pairs = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => interfaceType.IsAssignableFrom(type) && type.IsClass && !type.IsAbstract)
.Select(t => (t, GetMessageDescriptor(t)));

var descriptors = pairs.Select(t => t.Item2);
var typeRegistry = TypeRegistry.FromMessages(descriptors);
var types = pairs.ToDictionary(item => item.Item2?.FullName ?? "", item => item.t);

var eventsMap = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(Agent)) && !type.IsAbstract)
.Select(t => (t, t.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
.Select(i => (GetMessageDescriptor(i.GetGenericArguments().First())?.FullName ?? "")).ToHashSet()))
.ToDictionary(item => item.t, item => item.Item2);
// if the assembly contains any interfaces of type IHandler, then add all the methods of the interface to the eventsMap
var handlersMap = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(Agent)) && !type.IsAbstract)
.Select(t => (t, t.GetMethods()
.Where(m => m.Name == "Handle")
.Select(m => (GetMessageDescriptor(m.GetParameters().First().ParameterType)?.FullName ?? "")).ToHashSet()))
.ToDictionary(item => item.t, item => item.Item2);
// get interfaces implemented by the agent and get the methods of the interface if they are named Handle
var ifaceHandlersMap = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(Agent)) && !type.IsAbstract)
.Select(t => t.GetInterfaces()
.Select(i => (t, i, i.GetMethods()
.Where(m => m.Name == "Handle")
.Select(m => (GetMessageDescriptor(m.GetParameters().First().ParameterType)?.FullName ?? ""))
//to dictionary of type t and paramter type of the method
.ToDictionary(m => m, m => m).Keys.ToHashSet())).ToList());
// for each item in ifaceHandlersMap, add the handlers to eventsMap with item as the key
foreach (var item in ifaceHandlersMap)
{
foreach (var iface in item)
{
if (eventsMap.TryGetValue(iface.Item2, out var events))
{
events.UnionWith(iface.Item3);
}
else
{
eventsMap[iface.Item2] = iface.Item3;
}
}
}

// merge the handlersMap into the eventsMap
foreach (var item in handlersMap)
{
if (eventsMap.TryGetValue(item.Key, out var events))
{
events.UnionWith(item.Value);
}
else
{
eventsMap[item.Key] = item.Value;
}
}
return new EventTypes(typeRegistry, types, eventsMap);
});
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentWorker>());
builder.Services.AddSingleton((s) =>
{
var worker = s.GetRequiredService<IAgentWorker>();
var client = ActivatorUtilities.CreateInstance<Client>(s);
return client;
});
builder.Services.AddSingleton(new AgentApplicationBuilder(builder));
return builder;
}
private static MessageDescriptor? GetMessageDescriptor(Type type)
{
var property = type.GetProperty("Descriptor", BindingFlags.Static | BindingFlags.Public);
return property?.GetValue(null) as MessageDescriptor;
}
}
22 changes: 16 additions & 6 deletions dotnet/src/Microsoft.AutoGen/Core/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected Agent(IAgentWorker worker,
ILogger<Agent>? logger = null)
{
EventTypes = eventTypes;
AgentId = new AgentId(this.GetType().Name, new Guid().ToString());
AgentId = new AgentId(this.GetType().Name, Guid.NewGuid().ToString()); ;
_logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger<Agent>();
_handlersByMessageType = new(GetType().GetHandlersLookupTable());
Messenger = AgentMessengerFactory.Create(worker, DistributedContextPropagator.Current);
Expand Down Expand Up @@ -272,9 +272,19 @@ static async (state, ct) =>

public async ValueTask PublishMessageAsync<T>(T message, string? source = null, CancellationToken token = default) where T : IMessage
{
var src = string.IsNullOrWhiteSpace(source) ? this.AgentId.Key : source;
var evt = message.ToCloudEvent(src);
await PublishEventAsync(evt, token).ConfigureAwait(false);
var topicTypes = this.GetType().GetCustomAttributes<TopicSubscriptionAttribute>().Select(t => t.Topic);
if (!topicTypes.Any())
{
topicTypes = topicTypes.Append(string.IsNullOrWhiteSpace(source) ? this.AgentId.Type + "." + this.AgentId.Key : source);
}
foreach (var topic in topicTypes)
{
await PublishMessageAsync(topic, message, source, token).ConfigureAwait(false);
}
}
public async ValueTask PublishMessageAsync<T>(string topic, T message, string? source = null, CancellationToken token = default) where T : IMessage
{
await PublishEventAsync(topic, message, token).ConfigureAwait(false);
}

public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -353,8 +363,8 @@ public virtual Task HandleObject(object item)
// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
public async ValueTask PublishEventAsync(string topic, IMessage message, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
await PublishEventAsync(message.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}
13 changes: 5 additions & 8 deletions dotnet/src/Microsoft.AutoGen/Core/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Google.Protobuf;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
Expand All @@ -13,27 +12,25 @@ namespace Microsoft.AutoGen.Core;
public static class AgentsApp
{
// need a variable to store the runtime instance
public static WebApplication? Host { get; private set; }
public static IHost? Host { get; private set; }

[MemberNotNull(nameof(Host))]
public static async ValueTask<WebApplication> StartAsync(WebApplicationBuilder? builder = null, AgentTypes? agentTypes = null)
public static async ValueTask<IHost> StartAsync(HostApplicationBuilder? builder = null, AgentTypes? agentTypes = null)
{
builder ??= WebApplication.CreateBuilder();
builder ??= new HostApplicationBuilder();
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.AddAgentWorker()
.AddAgents(agentTypes);
builder.AddServiceDefaults();
var app = builder.Build();

app.MapDefaultEndpoints();
Host = app;
await app.StartAsync().ConfigureAwait(false);
return Host;
}
public static async ValueTask<WebApplication> PublishMessageAsync(
public static async ValueTask<IHost> PublishMessageAsync(
string topic,
IMessage message,
WebApplicationBuilder? builder = null,
HostApplicationBuilder? builder = null,
AgentTypes? agents = null,
bool local = false)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,14 @@ private async ValueTask DispatchEventAsync(CloudEvent evt)
{
// get the event type and then send to all agents that are subscribed to that event type
var eventType = evt.Type;
var source = evt.Source;
var agentTypes = new List<string>();
// ensure that we get agentTypes as an async enumerable list - try to get the value of agentTypes by topic and then cast it to an async enumerable list
if (_subscriptionsByTopic.TryGetValue(eventType, out var agentTypes))
if (_subscriptionsByTopic.TryGetValue(eventType, out var agentTypesList)) { agentTypes.AddRange(agentTypesList); }
if (_subscriptionsByTopic.TryGetValue(source, out var agentTypesList2)) { agentTypes.AddRange(agentTypesList2); }
if (_subscriptionsByTopic.TryGetValue(source + "." + eventType, out var agentTypesList3)) { agentTypes.AddRange(agentTypesList3); }
agentTypes = agentTypes.Distinct().ToList();
if (agentTypes.Count > 0)
{
await DispatchEventToAgentsAsync(agentTypes, evt);
}
Expand Down

0 comments on commit 11461b7

Please sign in to comment.