From 1e3b765e3acd4c1ca19cf80a7c6747b86e45b38f Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Tue, 19 Nov 2024 11:00:48 -0800 Subject: [PATCH] .net changes to re-enable xlang support, add subscription apis (#4159) * add subscription response * fix send subscription response * add register agent type response * adding a test * working on shaping up a test * appsettins update for backend * another appsettings * fixup aspire hosting * enable AGENT_HOST var from aspire * add SendMessageAsync * remove broken test * test compiles and runs but is not (yet) correct * subscriptions grain wireup. * temp assert true. * remove DI for SubscriptionGrain * add xlang python code * add subscription response * rebond * Update to .NET 9.0 * Fix Backend project SDK * Package updates * get RegisterAgentTypeRequest working * fix exceptions * add error handling for requests * whoops * send cloud event message type * processing cloudevents * trying tosend proto data - doesn't work * trying to pack proto_data * fix (#4238) * pack the Message from agents_events * format - not sure why these? * format * cleanup, error handling, xlang sample publishes messages that can be heard by .NET and vice versa * format * sdk version * sdk vers * net8 * back to net8 * remove netstandard2 * fix used * remove unused * more cleanup * remove unneeded package * I'm terrible at writing tests * deserialize the cloud events and sent them as events * comment * cleanup * await * Delete dotnet/samples/Hello/Backend/Backend.csproj unneeded change * whoops * merge main python back into here * revert back to local * revert some of the helloAgents changes. * [.NET] Add happy path test for in-memory agent && Simplify HelloAgent example && some clean-up in extension APIs (#4227) * add happy path test * remove unnecessary namespace * fix build error * Update AgentBaseTests.cs * revert changes --------- * fix busted merge from main * addressing review comments * make internal * case sensitive rename step 1 * case sensitive rename step 2 * remove! --------- Co-authored-by: Peter Chang Co-authored-by: Reuben Bond Co-authored-by: Eric Zhu Co-authored-by: Xiaoyun Zhang --- dotnet/Directory.Packages.props | 60 +++++++++---------- dotnet/global.json | 2 +- dotnet/samples/Hello/Backend/Backend.csproj | 2 +- dotnet/samples/Hello/Backend/Program.cs | 2 - dotnet/samples/Hello/Backend/appsettings.json | 20 ++++--- .../Hello/Hello.AppHost/Hello.AppHost.csproj | 5 +- dotnet/samples/Hello/Hello.AppHost/Program.cs | 18 +++++- .../Hello/HelloAIAgents/HelloAIAgents.csproj | 2 +- .../Hello/HelloAgent/HelloAgent.csproj | 2 +- dotnet/samples/Hello/HelloAgent/Program.cs | 18 +++--- .../HelloAgentState/HelloAgentState.csproj | 2 +- .../samples/Hello/HelloAgentState/Program.cs | 2 +- .../DevTeam.AgentHost.csproj | 2 +- .../DevTeam.Agents/DevTeam.Agents.csproj | 2 +- .../DevTeam.AppHost/DevTeam.AppHost.csproj | 4 +- .../DevTeam.Backend/DevTeam.Backend.csproj | 2 +- .../DevTeam.Shared/DevTeam.Shared.csproj | 2 +- .../Function/FunctionAttribute.cs | 2 +- .../Abstractions/IAgentBase.cs | 1 + .../Abstractions/IAgentRuntime.cs | 1 + .../Abstractions/IAgentWorker.cs | 1 + .../Microsoft.AutoGen.Abstractions.csproj | 2 +- .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 21 +++++++ .../Microsoft.AutoGen/Agents/AgentRuntime.cs | 4 ++ .../Agents/Microsoft.AutoGen.Agents.csproj | 2 +- .../Agents/Services/AgentWorker.cs | 10 +++- .../Services/AgentWorkerHostingExtensions.cs | 2 +- .../Agents/Services/Grpc/GrpcAgentWorker.cs | 19 ++++++ .../GrpcAgentWorkerHostBuilderExtension.cs | 6 +- .../Agents/Services/Grpc/GrpcGateway.cs | 46 +++++++++++++- .../Microsoft.AutoGen/Agents/Services/Host.cs | 4 +- .../Agents/Services/HostBuilderExtensions.cs | 6 +- .../Services/Orleans/ISubscriptionsGrain.cs | 10 ++++ .../Agents/Services/Orleans/RegistryGrain.cs | 2 +- .../Services/Orleans/SubscriptionsGrain.cs | 48 +++++++++++++++ .../AIModelClientHostingExtensions.csproj | 4 +- ...t.AutoGen.Extensions.SemanticKernel.csproj | 2 +- .../Microsoft.AutoGen.ServiceDefaults.csproj | 2 +- .../AutoGen.AotCompatibility.Tests.csproj | 2 +- .../test/AutoGen.Tests/AutoGen.Tests.csproj | 2 +- python/.gitignore | 3 + 41 files changed, 265 insertions(+), 84 deletions(-) create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index c1b26f2f9b91..5321860e427f 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -4,26 +4,25 @@ 1.22.0 1.22.0-alpha 9.0.0-preview.9.24525.1 - direct - + - - - + + + - - + + - + - + - + @@ -43,25 +42,25 @@ - + - + - - - - + + + + - + runtime; build; native; contentfiles; analyzers; buildtransitive @@ -78,17 +77,17 @@ - + - + - - + + @@ -96,28 +95,29 @@ - - + + - - + + - + - + - + + - + - + \ No newline at end of file diff --git a/dotnet/global.json b/dotnet/global.json index 5f78cce063fa..4f9e9b79a15a 100644 --- a/dotnet/global.json +++ b/dotnet/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "8.0.104", + "version": "8.0.401", "rollForward": "latestMinor" } } diff --git a/dotnet/samples/Hello/Backend/Backend.csproj b/dotnet/samples/Hello/Backend/Backend.csproj index d502d7260d15..360459334805 100644 --- a/dotnet/samples/Hello/Backend/Backend.csproj +++ b/dotnet/samples/Hello/Backend/Backend.csproj @@ -1,4 +1,4 @@ - + diff --git a/dotnet/samples/Hello/Backend/Program.cs b/dotnet/samples/Hello/Backend/Program.cs index b913d39d643f..b74dba139826 100644 --- a/dotnet/samples/Hello/Backend/Program.cs +++ b/dotnet/samples/Hello/Backend/Program.cs @@ -1,7 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -using Microsoft.Extensions.Hosting; - var app = await Microsoft.AutoGen.Agents.Host.StartAsync(local: false, useGrpc: true); await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/Backend/appsettings.json b/dotnet/samples/Hello/Backend/appsettings.json index 3bb8d882550c..ae32fe371a70 100644 --- a/dotnet/samples/Hello/Backend/appsettings.json +++ b/dotnet/samples/Hello/Backend/appsettings.json @@ -1,9 +1,15 @@ { - "Logging": { - "LogLevel": { - "Default": "Warning", - "Microsoft": "Warning", - "Microsoft.Orleans": "Warning" - } + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" } - } \ No newline at end of file + }, + "AllowedHosts": "*", + "Kestrel": { + "EndpointDefaults": { + "Protocols": "Http2" + } + } +} diff --git a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj index 88d23268c44d..5ce0d0531faf 100644 --- a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj +++ b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj @@ -1,7 +1,10 @@ + + + Exe - net8.0 + net8.0 enable enable true diff --git a/dotnet/samples/Hello/Hello.AppHost/Program.cs b/dotnet/samples/Hello/Hello.AppHost/Program.cs index d9acc3ea3f12..326eddbcc9ec 100644 --- a/dotnet/samples/Hello/Hello.AppHost/Program.cs +++ b/dotnet/samples/Hello/Hello.AppHost/Program.cs @@ -1,7 +1,19 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs +using Microsoft.Extensions.Hosting; + var builder = DistributedApplication.CreateBuilder(args); -var backend = builder.AddProject("backend"); -builder.AddProject("client").WithReference(backend).WaitFor(backend); -builder.Build().Run(); +var backend = builder.AddProject("backend").WithExternalHttpEndpoints(); +builder.AddProject("client") + .WithReference(backend) + .WithEnvironment("AGENT_HOST", $"{backend.GetEndpoint("https").Property(EndpointProperty.Url)}") + .WaitFor(backend); + +using var app = builder.Build(); + +await app.StartAsync(); +var url = backend.GetEndpoint("http").Url; +Console.WriteLine("Backend URL: " + url); + +await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj index f17ab0c9f0a5..e49cfd456abc 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj @@ -1,7 +1,7 @@ Exe - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj index 88893ccc874a..dcb693a52225 100644 --- a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj +++ b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj @@ -1,7 +1,7 @@ Exe - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index 506d91502328..4f74520a71e0 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -19,7 +19,7 @@ { Message = "World" }, local: true); - +//var app = await AgentsApp.StartAsync(); await app.WaitForShutdownAsync(); namespace Hello @@ -33,7 +33,8 @@ public class HelloAgent( ISayHello, IHandleConsole, IHandle, - IHandle + IHandle, + IHandle { public async Task Handle(NewMessageReceived item) { @@ -50,13 +51,14 @@ public async Task Handle(NewMessageReceived item) public async Task Handle(ConversationClosed item) { var goodbye = $"********************* {item.UserId} said {item.UserMessage} ************************"; - var evt = new Output - { - Message = goodbye - }; - await PublishMessageAsync(evt).ConfigureAwait(false); + var evt = new Output { Message = goodbye }; + await PublishMessageAsync(evt).ConfigureAwait(true); + await PublishMessageAsync(new Shutdown()).ConfigureAwait(false); + } - // Signal shutdown. + public async Task Handle(Shutdown item) + { + Console.WriteLine("Shutting down..."); hostApplicationLifetime.StopApplication(); } diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj index 797fe957bb75..e26b6c9521c2 100644 --- a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -1,7 +1,7 @@ Exe - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs index 7c15c4c54df0..664689de824d 100644 --- a/dotnet/samples/Hello/HelloAgentState/Program.cs +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -9,7 +9,7 @@ var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: true); +}, local: false); await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj index d8d7ebf8e48b..bf1bed178248 100644 --- a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj +++ b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj @@ -1,7 +1,7 @@ - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj index 8dfd6912e547..d8034a01b99a 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj +++ b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj @@ -1,7 +1,7 @@ - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/dev-team/DevTeam.AppHost/DevTeam.AppHost.csproj b/dotnet/samples/dev-team/DevTeam.AppHost/DevTeam.AppHost.csproj index a9227ea9516d..89d121b303ea 100644 --- a/dotnet/samples/dev-team/DevTeam.AppHost/DevTeam.AppHost.csproj +++ b/dotnet/samples/dev-team/DevTeam.AppHost/DevTeam.AppHost.csproj @@ -1,8 +1,10 @@ + + Exe - net8.0 + net8.0 enable enable true diff --git a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj index 8296f7aa670e..c486471984fa 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj +++ b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj @@ -5,7 +5,7 @@ - net8.0 + net8.0 enable enable diff --git a/dotnet/samples/dev-team/DevTeam.Shared/DevTeam.Shared.csproj b/dotnet/samples/dev-team/DevTeam.Shared/DevTeam.Shared.csproj index bc739135da95..18fcb9745238 100644 --- a/dotnet/samples/dev-team/DevTeam.Shared/DevTeam.Shared.csproj +++ b/dotnet/samples/dev-team/DevTeam.Shared/DevTeam.Shared.csproj @@ -5,7 +5,7 @@ - net8.0 + net8.0 enable enable diff --git a/dotnet/src/AutoGen.Core/Function/FunctionAttribute.cs b/dotnet/src/AutoGen.Core/Function/FunctionAttribute.cs index 9418dc7fd6ae..9367f5c6f297 100644 --- a/dotnet/src/AutoGen.Core/Function/FunctionAttribute.cs +++ b/dotnet/src/AutoGen.Core/Function/FunctionAttribute.cs @@ -98,7 +98,7 @@ public static implicit operator AIFunctionMetadata(FunctionContract contract) [NamespaceKey] = contract.Namespace, [ClassNameKey] = contract.ClassName, }, - Parameters = [.. contract.Parameters?.Select(p => (AIFunctionParameterMetadata)p)], + Parameters = [.. contract.Parameters?.Select(p => (AIFunctionParameterMetadata)p)!], }; } } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs index 14c2688c236a..ee7b9e74583c 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs @@ -19,4 +19,5 @@ public interface IAgentBase Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new(); ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default); ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default); + List Subscribe(string topic); } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs index 2125e57a8b96..aa5b5a13a6dc 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs @@ -13,6 +13,7 @@ public interface IAgentRuntime ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default); ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); + ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default); ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default); void Update(Activity? activity, RpcRequest request); void Update(Activity? activity, CloudEvent cloudEvent); diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs index 67a867d87dfa..adce9be60c9e 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs @@ -8,6 +8,7 @@ public interface IAgentWorker ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default); ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); + ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default); ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default); ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj index e24b52187c82..c680e201301b 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj @@ -1,7 +1,7 @@ - net8.0 + net8.0 enable enable AutoGen.Core diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 6fffdaadf1d8..13b2e851969e 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -114,6 +114,27 @@ await this.InvokeWithActivityAsync( break; } } + public List Subscribe(string topic) + { + Message message = new() + { + AddSubscriptionRequest = new() + { + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + TopicType = topic, + AgentType = this.AgentId.Key + } + } + } + }; + _context.SendMessageAsync(message).AsTask().Wait(); + + return new List { topic }; + } public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default) { await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs index 86944cad3ab3..fad372ce2f93 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs @@ -45,6 +45,10 @@ public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, Ca { await worker.SendRequestAsync(agent, request, cancellationToken).ConfigureAwait(false); } + public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default) + { + await worker.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); + } public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) { await worker.PublishEventAsync(@event, cancellationToken).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj index 3bc2b3acb012..68b26f88b9c2 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj +++ b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj @@ -1,7 +1,7 @@ - net8.0 + net8.0 enable enable Microsoft.AutoGen.Agents diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs index 490051490315..a69da96fb3d4 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs @@ -47,7 +47,7 @@ public async ValueTask PublishEventAsync(CloudEvent cloudEvent, CancellationToke { foreach (var (typeName, _) in _agentTypes) { - if (typeName == "Client") { continue; } + if (typeName == nameof(Client)) { continue; } var agent = GetOrActivateAgent(new AgentId(typeName, cloudEvent.Source)); agent.ReceiveMessage(new Message { CloudEvent = cloudEvent }); } @@ -63,6 +63,10 @@ public ValueTask SendResponseAsync(RpcResponse response, CancellationToken cance { return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken); } + public ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default) + { + return _mailbox.Writer.WriteAsync(message, cancellationToken); + } public ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) { var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); @@ -92,6 +96,10 @@ public async Task RunMessagePump() if (message == null) { continue; } switch (message) { + case Message.MessageOneofCase.AddSubscriptionResponse: + break; + case Message.MessageOneofCase.RegisterAgentTypeResponse: + break; case Message msg: var item = msg.CloudEvent; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs index 3736fc76cb61..fab29e86ce71 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs @@ -29,7 +29,7 @@ public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuild public static IHostApplicationBuilder AddLocalAgentService(this IHostApplicationBuilder builder, bool useGrpc = true) { - return builder.AddAgentService(local: true, useGrpc); + return builder.AddAgentService(local: false, useGrpc); } public static WebApplication MapAgentService(this WebApplication app, bool local = false, bool useGrpc = true) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs index 431a5629c142..48f07573430d 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs @@ -85,6 +85,13 @@ private async Task RunReadPump() } break; + case Message.MessageOneofCase.AddSubscriptionResponse: + if (!message.AddSubscriptionResponse.Success) + { + throw new InvalidOperationException($"Failed to add subscription: '{message.AddSubscriptionResponse.Error}'."); + } + break; + case Message.MessageOneofCase.CloudEvent: // HACK: Send the message to an instance of each agent type @@ -153,6 +160,13 @@ private async Task RunWritePump() item.WriteCompletionSource?.TrySetCanceled(); break; } + catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) + { + // we could not connect to the endpoint - most likely we have the wrong port or failed ssl + // we need to let the user know what port we tried to connect to and then do backoff and retry + _logger.LogError(ex, "Error connecting to GRPC endpoint {Endpoint}.", channel.ToString()); + break; + } catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) { item.WriteCompletionSource?.TrySetException(ex); @@ -230,6 +244,11 @@ await WriteChannelAsync(new Message await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); } // new is intentional + public new async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default) + { + await WriteChannelAsync(message, cancellationToken).ConfigureAwait(false); + } + // new is intentional public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) { await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs index 670411b33677..6757428302f4 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs @@ -11,12 +11,12 @@ namespace Microsoft.AutoGen.Agents; public static class GrpcAgentWorkerHostBuilderExtensions { - private const string _defaultAgentServiceAddress = "https://localhost:5001"; - public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress) + private const string _defaultAgentServiceAddress = "https://localhost:53071"; + public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBuilder builder, string? agentServiceAddress = null) { builder.Services.AddGrpcClient(options => { - options.Address = new Uri(agentServiceAddress); + options.Address = new Uri(agentServiceAddress ?? builder.Configuration["AGENT_HOST"] ?? _defaultAgentServiceAddress); options.ChannelOptionsActions.Add(channelOptions => { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index 89e9c55c4648..45477c8eb5a6 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -16,10 +16,13 @@ public sealed class GrpcGateway : BackgroundService, IGateway private readonly IClusterClient _clusterClient; private readonly ConcurrentDictionary _agentState = new(); private readonly IRegistryGrain _gatewayRegistry; + private readonly ISubscriptionsGrain _subscriptions; private readonly IGateway _reference; // The agents supported by each worker process. private readonly ConcurrentDictionary> _supportedAgentTypes = []; public readonly ConcurrentDictionary _workers = new(); + private readonly ConcurrentDictionary _subscriptionsByAgentType = new(); + private readonly ConcurrentDictionary> _subscriptionsByTopic = new(); // The mapping from agent id to worker process. private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); @@ -33,6 +36,7 @@ public GrpcGateway(IClusterClient clusterClient, ILogger logger) _clusterClient = clusterClient; _reference = clusterClient.CreateObjectReference(this); _gatewayRegistry = clusterClient.GetGrain(0); + _subscriptions = clusterClient.GetGrain(0); } public async ValueTask BroadcastEvent(CloudEvent evt) { @@ -102,16 +106,54 @@ internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Mess case Message.MessageOneofCase.RegisterAgentTypeRequest: await RegisterAgentTypeAsync(connection, message.RegisterAgentTypeRequest); break; + case Message.MessageOneofCase.AddSubscriptionRequest: + await AddSubscriptionAsync(connection, message.AddSubscriptionRequest); + break; default: - throw new InvalidOperationException($"Unknown message type for message '{message}'."); + // if it wasn't recognized return bad request + await RespondBadRequestAsync(connection, $"Unknown message type for message '{message}'."); + break; + }; + } + private async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, string error) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, error)); + } + private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) + { + var topic = request.Subscription.TypeSubscription.TopicType; + var agentType = request.Subscription.TypeSubscription.AgentType; + _subscriptionsByAgentType[agentType] = request.Subscription; + _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); + await _subscriptions.Subscribe(topic, agentType); + //var response = new AddSubscriptionResponse { RequestId = request.RequestId, Error = "", Success = true }; + Message response = new() + { + AddSubscriptionResponse = new() + { + RequestId = request.RequestId, + Error = "", + Success = true + } }; + await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); } private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) { connection.AddSupportedType(msg.Type); _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentType(msg.Type, _reference); + await _gatewayRegistry.RegisterAgentType(msg.Type, _reference).ConfigureAwait(true); + Message response = new() + { + RegisterAgentTypeResponse = new() + { + RequestId = msg.RequestId, + Error = "", + Success = true + } + }; + await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); } private async ValueTask DispatchEventAsync(CloudEvent evt) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs index 5b725af0c9a9..464536d54b21 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs @@ -14,11 +14,11 @@ public static async Task StartAsync(bool local = false, bool use builder.AddServiceDefaults(); if (local) { - builder.AddLocalAgentService(useGrpc); + builder.AddLocalAgentService(useGrpc: useGrpc); } else { - builder.AddAgentService(useGrpc); + builder.AddAgentService(useGrpc: useGrpc); } var app = builder.Build(); app.MapAgentService(local, useGrpc); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs index f020f0bb6670..f21096ccfbdb 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs @@ -15,7 +15,7 @@ namespace Microsoft.AutoGen.Agents; public static class HostBuilderExtensions { - private const string _defaultAgentServiceAddress = "https://localhost:5001"; + private const string _defaultAgentServiceAddress = "https://localhost:53071"; public static IHostApplicationBuilder AddAgent< [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TAgent>(this IHostApplicationBuilder builder, string typeName) where TAgent : AgentBase @@ -28,12 +28,12 @@ public static IHostApplicationBuilder AddAgent< public static IHostApplicationBuilder AddAgent(this IHostApplicationBuilder builder, string typeName, Type agentType) { builder.Services.AddKeyedSingleton("AgentTypes", (sp, key) => Tuple.Create(typeName, agentType)); - return builder; } - public static IHostApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress, bool local = false) + public static IHostApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string? agentServiceAddress = null, bool local = false) { + agentServiceAddress ??= builder.Configuration["AGENT_HOST"] ?? _defaultAgentServiceAddress; builder.Services.TryAddSingleton(DistributedContextPropagator.Current); // if !local, then add the gRPC client diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs new file mode 100644 index 000000000000..302df9ebff98 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ISubscriptionsGrain.cs + +namespace Microsoft.AutoGen.Agents; +public interface ISubscriptionsGrain : IGrainWithIntegerKey +{ + ValueTask Subscribe(string agentType, string topic); + ValueTask Unsubscribe(string agentType, string topic); + ValueTask>> GetSubscriptions(string agentType); +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs index c5114e3e7423..cb7523126436 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs @@ -5,7 +5,7 @@ namespace Microsoft.AutoGen.Agents; -public sealed class RegistryGrain : Grain, IRegistryGrain +internal sealed class RegistryGrain : Grain, IRegistryGrain { // TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively. private readonly Dictionary _workerStates = new(); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs new file mode 100644 index 000000000000..905dc8e914ac --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SubscriptionsGrain.cs + +namespace Microsoft.AutoGen.Agents; + +internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore")] IPersistentState state) : Grain, ISubscriptionsGrain +{ + private readonly Dictionary> _subscriptions = new(); + public ValueTask>> GetSubscriptions(string agentType) + { + return new ValueTask>>(_subscriptions); + } + public ValueTask Subscribe(string agentType, string topic) + { + if (!_subscriptions.TryGetValue(topic, out var subscriptions)) + { + subscriptions = _subscriptions[topic] = []; + } + if (!subscriptions.Contains(agentType)) + { + subscriptions.Add(agentType); + } + _subscriptions[topic] = subscriptions; + state.State.Subscriptions = _subscriptions; + state.WriteStateAsync(); + + return ValueTask.CompletedTask; + } + public ValueTask Unsubscribe(string agentType, string topic) + { + if (!_subscriptions.TryGetValue(topic, out var subscriptions)) + { + subscriptions = _subscriptions[topic] = []; + } + if (!subscriptions.Contains(agentType)) + { + subscriptions.Remove(agentType); + } + _subscriptions[topic] = subscriptions; + state.State.Subscriptions = _subscriptions; + state.WriteStateAsync(); + return ValueTask.CompletedTask; + } +} +public sealed class SubscriptionsState +{ + public Dictionary> Subscriptions { get; set; } = new(); +} diff --git a/dotnet/src/Microsoft.AutoGen/Extensions/AIModelClientHostingExtensions/AIModelClientHostingExtensions.csproj b/dotnet/src/Microsoft.AutoGen/Extensions/AIModelClientHostingExtensions/AIModelClientHostingExtensions.csproj index 2358351deb6c..970ae5db4b78 100644 --- a/dotnet/src/Microsoft.AutoGen/Extensions/AIModelClientHostingExtensions/AIModelClientHostingExtensions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Extensions/AIModelClientHostingExtensions/AIModelClientHostingExtensions.csproj @@ -1,6 +1,6 @@ - net8.0 + net8.0 enable enable @@ -14,6 +14,6 @@ - + diff --git a/dotnet/src/Microsoft.AutoGen/Extensions/SemanticKernel/Microsoft.AutoGen.Extensions.SemanticKernel.csproj b/dotnet/src/Microsoft.AutoGen/Extensions/SemanticKernel/Microsoft.AutoGen.Extensions.SemanticKernel.csproj index fb47750fd44d..3c7fe517799b 100644 --- a/dotnet/src/Microsoft.AutoGen/Extensions/SemanticKernel/Microsoft.AutoGen.Extensions.SemanticKernel.csproj +++ b/dotnet/src/Microsoft.AutoGen/Extensions/SemanticKernel/Microsoft.AutoGen.Extensions.SemanticKernel.csproj @@ -6,7 +6,7 @@ - net8.0 + net8.0 enable enable diff --git a/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj index cf2446f93349..b70161c7e776 100644 --- a/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj +++ b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj @@ -1,6 +1,6 @@ - net8.0 + net8.0 enable enable true diff --git a/dotnet/test/AutoGen.AotCompatibility.Tests/AutoGen.AotCompatibility.Tests.csproj b/dotnet/test/AutoGen.AotCompatibility.Tests/AutoGen.AotCompatibility.Tests.csproj index aec9660bb922..379bca541012 100644 --- a/dotnet/test/AutoGen.AotCompatibility.Tests/AutoGen.AotCompatibility.Tests.csproj +++ b/dotnet/test/AutoGen.AotCompatibility.Tests/AutoGen.AotCompatibility.Tests.csproj @@ -2,7 +2,7 @@ Exe - net8.0 + net8.0 enable enable true diff --git a/dotnet/test/AutoGen.Tests/AutoGen.Tests.csproj b/dotnet/test/AutoGen.Tests/AutoGen.Tests.csproj index 248a9e29b00d..367d74619bb4 100644 --- a/dotnet/test/AutoGen.Tests/AutoGen.Tests.csproj +++ b/dotnet/test/AutoGen.Tests/AutoGen.Tests.csproj @@ -14,7 +14,7 @@ - + diff --git a/python/.gitignore b/python/.gitignore index 186e847cc125..677a888f2f49 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -172,3 +172,6 @@ docs/**/jupyter_execute # Temporary files tmp_code_*.py + +# .NET Development settings +appsettings.Development.json \ No newline at end of file