diff --git a/.github/workflows/dotnet-build.yml b/.github/workflows/dotnet-build.yml index 8539502b9e33..ca55a6f142c7 100644 --- a/.github/workflows/dotnet-build.yml +++ b/.github/workflows/dotnet-build.yml @@ -65,6 +65,18 @@ jobs: - uses: actions/checkout@v4 with: lfs: true + - uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + version: "0.5.18" + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - run: uv sync --locked --all-extras + working-directory: ./python + - name: Prepare python venv + run: | + source ${{ github.workspace }}/python/.venv/bin/activate - name: Setup .NET 8.0 uses: actions/setup-dotnet@v4 with: diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index a267a11e6fa9..cb590f21dbd0 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -122,6 +122,22 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GettingStartedGrpc", "sampl EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc.Tests", "test\Microsoft.AutoGen.Core.Grpc.Tests\Microsoft.AutoGen.Core.Grpc.Tests.csproj", "{23A028D3-5EB1-4FA0-9CD1-A1340B830579}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.RuntimeGateway.Grpc", "src\Microsoft.AutoGen\RuntimeGateway.Grpc\Microsoft.AutoGen.RuntimeGateway.Grpc.csproj", "{BE420A71-7615-4DFD-BE94-9409397949F1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.RuntimeGateway.Grpc.Tests", "test\Microsoft.AutoGen.RuntimeGateway.Grpc.Tests\Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.csproj", "{CDD859F3-1B60-4ECE-8472-54DF8EFCA682}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Integration.Tests", "test\Microsoft.AutoGen.Integration.Tests\Microsoft.AutoGen.Integration.Tests.csproj", "{7A11022E-4E5D-4A4A-AADF-E715C2ECF800}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentHost", "src\Microsoft.AutoGen\AgentHost\Microsoft.AutoGen.AgentHost.csproj", "{50C2E8D5-68AB-45A3-B96F-355E1F8AC039}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Hello.AppHost", "samples\Hello\Hello.AppHost\Hello.AppHost.csproj", "{B8E77E57-C983-4EEA-9589-906271486D80}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Microsoft.AutoGen", "Microsoft.AutoGen", "{81BA12F2-2D2F-42C1-AF83-FBDAA1A78A45}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Agents", "src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj", "{EF954ED3-87D5-40F1-8557-E7179F43EA0E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgent.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgent.AppHost\HelloAgent.AppHost.csproj", "{50082F76-917F-42EE-8869-8C72630423A7}" + Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat", "src\Microsoft.AutoGen\AgentChat\Microsoft.AutoGen.AgentChat.csproj", "{7F828599-56E8-4597-8F68-EE26FD631417}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat.Tests", "test\Microsoft.AutoGen.AgentChat.Tests\Microsoft.AutoGen.AgentChat.Tests.csproj", "{217A4F86-8ADD-4998-90BA-880092A019F5}" @@ -320,6 +336,34 @@ Global {23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Debug|Any CPU.Build.0 = Debug|Any CPU {23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Release|Any CPU.ActiveCfg = Release|Any CPU {23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Release|Any CPU.Build.0 = Release|Any CPU + {BE420A71-7615-4DFD-BE94-9409397949F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BE420A71-7615-4DFD-BE94-9409397949F1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BE420A71-7615-4DFD-BE94-9409397949F1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BE420A71-7615-4DFD-BE94-9409397949F1}.Release|Any CPU.Build.0 = Release|Any CPU + {CDD859F3-1B60-4ECE-8472-54DF8EFCA682}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CDD859F3-1B60-4ECE-8472-54DF8EFCA682}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CDD859F3-1B60-4ECE-8472-54DF8EFCA682}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CDD859F3-1B60-4ECE-8472-54DF8EFCA682}.Release|Any CPU.Build.0 = Release|Any CPU + {7A11022E-4E5D-4A4A-AADF-E715C2ECF800}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7A11022E-4E5D-4A4A-AADF-E715C2ECF800}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7A11022E-4E5D-4A4A-AADF-E715C2ECF800}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7A11022E-4E5D-4A4A-AADF-E715C2ECF800}.Release|Any CPU.Build.0 = Release|Any CPU + {50C2E8D5-68AB-45A3-B96F-355E1F8AC039}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {50C2E8D5-68AB-45A3-B96F-355E1F8AC039}.Debug|Any CPU.Build.0 = Debug|Any CPU + {50C2E8D5-68AB-45A3-B96F-355E1F8AC039}.Release|Any CPU.ActiveCfg = Release|Any CPU + {50C2E8D5-68AB-45A3-B96F-355E1F8AC039}.Release|Any CPU.Build.0 = Release|Any CPU + {B8E77E57-C983-4EEA-9589-906271486D80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B8E77E57-C983-4EEA-9589-906271486D80}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B8E77E57-C983-4EEA-9589-906271486D80}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B8E77E57-C983-4EEA-9589-906271486D80}.Release|Any CPU.Build.0 = Release|Any CPU + {EF954ED3-87D5-40F1-8557-E7179F43EA0E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EF954ED3-87D5-40F1-8557-E7179F43EA0E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EF954ED3-87D5-40F1-8557-E7179F43EA0E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EF954ED3-87D5-40F1-8557-E7179F43EA0E}.Release|Any CPU.Build.0 = Release|Any CPU + {50082F76-917F-42EE-8869-8C72630423A7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {50082F76-917F-42EE-8869-8C72630423A7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {50082F76-917F-42EE-8869-8C72630423A7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {50082F76-917F-42EE-8869-8C72630423A7}.Release|Any CPU.Build.0 = Release|Any CPU {7F828599-56E8-4597-8F68-EE26FD631417}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {7F828599-56E8-4597-8F68-EE26FD631417}.Debug|Any CPU.Build.0 = Debug|Any CPU {7F828599-56E8-4597-8F68-EE26FD631417}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -383,6 +427,14 @@ Global {3D83C6DB-ACEA-48F3-959F-145CCD2EE135} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {C3740DF1-18B1-4607-81E4-302F0308C848} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6} {23A028D3-5EB1-4FA0-9CD1-A1340B830579} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} + {BE420A71-7615-4DFD-BE94-9409397949F1} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {CDD859F3-1B60-4ECE-8472-54DF8EFCA682} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} + {7A11022E-4E5D-4A4A-AADF-E715C2ECF800} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} + {50C2E8D5-68AB-45A3-B96F-355E1F8AC039} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {B8E77E57-C983-4EEA-9589-906271486D80} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} + {81BA12F2-2D2F-42C1-AF83-FBDAA1A78A45} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {EF954ED3-87D5-40F1-8557-E7179F43EA0E} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {50082F76-917F-42EE-8869-8C72630423A7} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} {7F828599-56E8-4597-8F68-EE26FD631417} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {217A4F86-8ADD-4998-90BA-880092A019F5} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} EndGlobalSection diff --git a/dotnet/src/Microsoft.AutoGen/AgentHost/Host.cs b/dotnet/src/Microsoft.AutoGen/AgentHost/Host.cs index 1ecf42c79589..0176b3faa3e3 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentHost/Host.cs +++ b/dotnet/src/Microsoft.AutoGen/AgentHost/Host.cs @@ -4,7 +4,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; public static class Host { diff --git a/dotnet/src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj b/dotnet/src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj index 33b051ad917b..dea603ebc668 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj +++ b/dotnet/src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj @@ -15,7 +15,7 @@ - + \ No newline at end of file diff --git a/dotnet/src/Microsoft.AutoGen/AgentHost/Program.cs b/dotnet/src/Microsoft.AutoGen/AgentHost/Program.cs index 024ca0d4309f..3e32c50a3a0f 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentHost/Program.cs +++ b/dotnet/src/Microsoft.AutoGen/AgentHost/Program.cs @@ -2,5 +2,5 @@ // Program.cs using Microsoft.Extensions.Hosting; -var app = await Microsoft.AutoGen.Runtime.Grpc.Host.StartAsync(local: false, useGrpc: true).ConfigureAwait(false); +var app = await Microsoft.AutoGen.RuntimeGateway.Grpc.Host.StartAsync(local: false, useGrpc: true).ConfigureAwait(false); await app.WaitForShutdownAsync(); diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AIAgent/InferenceAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/AIAgent/InferenceAgent.cs new file mode 100644 index 000000000000..d3dc100012eb --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/AIAgent/InferenceAgent.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// InferenceAgent.cs +using Google.Protobuf; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging; +namespace Microsoft.AutoGen.Agents; +/// +/// Base class for inference agents using the Microsoft.Extensions.AI library. +/// +/// +/// +/// +/// +/// +/// +public abstract class InferenceAgent( + AgentId id, + IAgentRuntime runtime, + string name, + ILogger>? logger, + IChatClient client) + : BaseAgent(id, runtime, name, logger) + where T : IMessage, new() +{ + protected IChatClient ChatClient { get; } = client; + private ILogger>? Logger => _logger as ILogger>; + private Task CompleteAsync( + IList chatMessages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + return ChatClient.CompleteAsync(chatMessages, options, cancellationToken); + } + private IAsyncEnumerable CompleteStreamingAsync( + IList chatMessages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + return ChatClient.CompleteStreamingAsync(chatMessages, options, cancellationToken); + } + +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs new file mode 100644 index 000000000000..651be87314e0 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IHandleConsole.cs +using Google.Protobuf; +using Microsoft.AutoGen.Contracts; + +namespace Microsoft.AutoGen.Agents; +/// +/// Default interface methods for an event handler for Input and Output that writes or reads from the console +/// Can be used inside your agents by inheriting from this interface +/// public class MyAgent : BaseAgent, IHandleConsole +/// +public interface IHandleConsole : IHandle, IHandle, IProcessIO +{ + /// + /// Prototype for Publish Message Async method + /// + /// + /// + /// + /// + /// + /// ValueTask + ValueTask PublishMessageAsync(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage; + + /// + /// Receives events of type Output and writes them to the console + /// then runs the ProcessOutputAsync method which you should implement in your agent + /// + /// + /// + /// ValueTask + async ValueTask IHandle.HandleAsync(Output item, MessageContext messageContext) + { + // Assuming item has a property `Message` that we want to write to the console + Console.WriteLine(item.Message); + await ProcessOutputAsync(item.Message); + + var evt = new OutputWritten + { + Route = "console" + }; + await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false); + } + + /// + /// Receives events of type Input and reads from the console, then runs the ProcessInputAsync method + /// which you should implement in your agent + /// + /// + /// + /// + async ValueTask IHandle.HandleAsync(Input item, MessageContext messageContext) + { + Console.WriteLine("Please enter input:"); + string content = Console.ReadLine() ?? string.Empty; + + await ProcessInputAsync(content); + + var evt = new InputProcessed + { + Route = "console" + }; + await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs new file mode 100644 index 000000000000..b3d670004e19 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IHandleFileIO.cs + +using Google.Protobuf; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; +/// +/// Default interface methods for an event handler for Input and Output that writes or reads from a file +/// Can be used inside your agents by inheriting from this interface +/// public class MyAgent : BaseAgent, IHandleFileIO +/// +public interface IHandleFileIO : IHandle, IHandle, IProcessIO +{ + // A Logger instance to log messages + ILogger LogTarget { get; } + // The path to the input file + string InputPath { get; } + // The path to the output file + string OutputPath { get; } + // The route of the agent (used in the post-process events) + const string Route = "Microsoft.AutoGen.Agents.IHandleFileIO"; + + /// + /// Prototype for Publish Message Async method + /// + /// + /// + /// + /// + /// + /// ValueTask + ValueTask PublishMessageAsync(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage; + async ValueTask IHandle.HandleAsync(Input item, MessageContext messageContext) + { + + // validate that the file exists + if (!File.Exists(InputPath)) + { + var errorMessage = $"File not found: {InputPath}"; + LogTarget.LogError(errorMessage); + //publish IOError event + var err = new IOError + { + Message = errorMessage + }; + await PublishMessageAsync(err, new TopicId("IOError"), null, token: CancellationToken.None).ConfigureAwait(false); + return; + } + string content; + using (var reader = new StreamReader(item.Message)) + { + content = await reader.ReadToEndAsync(CancellationToken.None); + } + await ProcessInputAsync(content); + var evt = new InputProcessed + { + Route = Route + }; + await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false); + } + async ValueTask IHandle.HandleAsync(Output item, MessageContext messageContext) + { + using (var writer = new StreamWriter(OutputPath, append: true)) + { + await writer.WriteLineAsync(item.Message); + } + var evt = new OutputWritten + { + Route = Route + }; + await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/IProcessIO.cs b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/IProcessIO.cs new file mode 100644 index 000000000000..e348f3e1ca71 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/IProcessIO.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IProcessIO.cs + +namespace Microsoft.AutoGen.Agents; + +/// +/// Default Interface methods for processing input and output shared by IOAgents that should be implemented in your agent +/// +public interface IProcessIO +{ + /// + /// Implement this method in your agent to process the input + /// + /// + /// Task + static Task ProcessOutputAsync(string message) { return Task.CompletedTask; } + /// + /// Implement this method in your agent to process the output + /// + /// + /// Task + static Task ProcessInputAsync(string message) { return Task.FromResult(message); } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj new file mode 100644 index 000000000000..5032e95a12a0 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj @@ -0,0 +1,24 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + + + + + diff --git a/dotnet/src/Microsoft.AutoGen/Agents/protos/agent_events.proto b/dotnet/src/Microsoft.AutoGen/Agents/protos/agent_events.proto new file mode 100644 index 000000000000..414d79f9678c --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/protos/agent_events.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package agents; + +option csharp_namespace = "Microsoft.AutoGen.Agents"; +message TextMessage { + string textMessage = 1; + string source = 2; +} +message Input { + string message = 1; +} +message InputProcessed { + string route = 1; +} +message Output { + string message = 1; +} +message OutputWritten { + string route = 1; +} +message IOError { + string message = 1; +} +message NewMessageReceived { + string message = 1; +} +message ResponseGenerated { + string response = 1; +} +message GoodBye { + string message = 1; +} +message MessageStored { + string message = 1; +} +message ConversationClosed { + string user_id = 1; + string user_message = 2; +} +message Shutdown { + string message = 1; +} diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs index f3dc619fdbef..324874a0005b 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs @@ -155,6 +155,7 @@ private async ValueTask HandleRequest(RpcRequest request, CancellationToken canc var messageContext = new MessageContext(request.RequestId, cancellationToken) { + Sender = request.Source?.FromProtobuf() ?? null, Topic = null, IsRpc = true @@ -275,6 +276,7 @@ public Task StopAsync(CancellationToken cancellationToken) var request = new RpcRequest { RequestId = Guid.NewGuid().ToString(), + Source = sender?.ToProtobuf() ?? null, Target = recepient.ToProtobuf(), Payload = payload, diff --git a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeExtensions.cs b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeExtensions.cs index fd7a479adde3..dfda552956ba 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeExtensions.cs @@ -62,7 +62,7 @@ private static ISubscriptionDefinition[] BindSubscriptionsForAgentType(AgentType var classSubscriptions = runtimeType.GetCustomAttributes().Select(t => t.Bind(agentType)); subscriptions.AddRange(classSubscriptions); - var prefixSubscriptions = runtimeType.GetCustomAttributes().Select(t => t.Bind(agentType)); + var prefixSubscriptions = runtimeType.GetCustomAttributes().Select(t => t.Bind(agentType)); subscriptions.AddRange(prefixSubscriptions); } diff --git a/dotnet/src/Microsoft.AutoGen/Core/TypePrefixSubscriptionAttribute.cs b/dotnet/src/Microsoft.AutoGen/Core/TypePrefixSubscriptionAttribute.cs index 57105c8b9629..be48ab8b195f 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/TypePrefixSubscriptionAttribute.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/TypePrefixSubscriptionAttribute.cs @@ -6,7 +6,7 @@ namespace Microsoft.AutoGen.Core; [AttributeUsage(AttributeTargets.All)] -public class TopicPrefixSubscriptionAttribute(string topic) : Attribute, IUnboundSubscriptionDefinition +public class TypePrefixSubscriptionAttribute(string topic) : Attribute, IUnboundSubscriptionDefinition { public string Topic { get; } = topic; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs deleted file mode 100644 index 947b6b0cbc0a..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentGrain.cs - -namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; - -internal interface IAgentGrain : IGrainWithStringKey -{ - ValueTask ReadStateAsync(); - ValueTask WriteStateAsync(Contracts.AgentState state, string eTag); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs deleted file mode 100644 index 9481922943c9..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// GrpcGatewayService.cs - -using Grpc.Core; -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc; - -// gRPC service which handles communication between the agent worker and the cluster. -public sealed class GrpcGatewayService(GrpcGateway gateway) : AgentRpc.AgentRpcBase -{ - private readonly GrpcGateway Gateway = (GrpcGateway)gateway; - - public override async Task OpenChannel(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) - { - try - { - await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); - } - catch - { - if (context.CancellationToken.IsCancellationRequested) - { - return; - } - throw; - } - } - public override async Task GetState(AgentId request, ServerCallContext context) - { - var state = await Gateway.ReadAsync(request); - return new GetStateResponse { AgentState = state }; - } - public override async Task SaveState(AgentState request, ServerCallContext context) - { - await Gateway.StoreAsync(request); - return new SaveStateResponse - { - Success = true // TODO: Implement error handling - }; - } - public override async Task AddSubscription(AddSubscriptionRequest request, ServerCallContext context) - { - request.RequestId = context.Peer; - return await Gateway.SubscribeAsync(request).ConfigureAwait(true); - } - public override async Task RemoveSubscription(RemoveSubscriptionRequest request, ServerCallContext context) - { - return await Gateway.UnsubscribeAsync(request).ConfigureAwait(true); - } - public override async Task GetSubscriptions(GetSubscriptionsRequest request, ServerCallContext context) - { - var subscriptions = await Gateway.GetSubscriptionsAsync(request); - return new GetSubscriptionsResponse { Subscriptions = { subscriptions } }; - } - public override async Task RegisterAgent(RegisterAgentTypeRequest request, ServerCallContext context) - { - request.RequestId = context.Peer; - return await Gateway.RegisterAgentTypeAsync(request).ConfigureAwait(true); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs deleted file mode 100644 index 97869cd91fd1..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentStateGrain.cs - -using Microsoft.AutoGen.Runtime.Grpc.Abstractions; - -namespace Microsoft.AutoGen.Runtime.Grpc; - -internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IAgentState, IAgentGrain -{ - /// - public async ValueTask WriteStateAsync(AgentState newState, string eTag, CancellationToken cancellationToken = default) - { - // etags for optimistic concurrency control - // if the Etag is null, its a new state - // if the passed etag is null or empty, we should not check the current state's Etag - caller doesnt care - // if both etags are set, they should match or it means that the state has changed since the last read. - if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) - { - state.State = newState; - await state.WriteStateAsync().ConfigureAwait(false); - } - else - { - //TODO - this is probably not the correct behavior to just throw - I presume we want to somehow let the caller know that the state has changed and they need to re-read it - throw new ArgumentException( - "The provided ETag does not match the current ETag. The state has been modified by another request."); - } - return state.Etag; - } - - /// - public ValueTask ReadStateAsync(CancellationToken cancellationToken = default) - { - return ValueTask.FromResult(state.State); - } - - ValueTask IAgentGrain.ReadStateAsync() - { - return ReadStateAsync(); - } - - ValueTask IAgentGrain.WriteStateAsync(AgentState state, string eTag) - { - return WriteStateAsync(state, eTag); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/ISubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/ISubscriptionsGrain.cs deleted file mode 100644 index 60c17b7c6597..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/ISubscriptionsGrain.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ISubscriptionsGrain.cs - -namespace Microsoft.AutoGen.Runtime.Grpc; -public interface ISubscriptionsGrain : IGrainWithIntegerKey -{ - ValueTask SubscribeAsync(string agentType, string topic); - ValueTask UnsubscribeAsync(string agentType, string topic); - ValueTask>> GetSubscriptions(string agentType); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/SubscriptionsGrain.cs deleted file mode 100644 index 632cc7cefde8..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/SubscriptionsGrain.cs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// SubscriptionsGrain.cs - -namespace Microsoft.AutoGen.Runtime.Grpc; - -internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore")] IPersistentState state) : Grain, ISubscriptionsGrain -{ - private readonly Dictionary> _subscriptions = new(); - public ValueTask>> GetSubscriptions(string? agentType = null) - { - //if agentType is null, return all subscriptions else filter on agentType - if (agentType != null) - { - return new ValueTask>>(_subscriptions.Where(x => x.Value.Contains(agentType)).ToDictionary(x => x.Key, x => x.Value)); - } - return new ValueTask>>(_subscriptions); - } - public async ValueTask SubscribeAsync(string agentType, string topic) - { - if (!_subscriptions.TryGetValue(topic, out var subscriptions)) - { - subscriptions = _subscriptions[topic] = []; - } - if (!subscriptions.Contains(agentType)) - { - subscriptions.Add(agentType); - } - _subscriptions[topic] = subscriptions; - state.State.Subscriptions = _subscriptions; - await state.WriteStateAsync().ConfigureAwait(false); - } - public async ValueTask UnsubscribeAsync(string agentType, string topic) - { - if (!_subscriptions.TryGetValue(topic, out var subscriptions)) - { - subscriptions = _subscriptions[topic] = []; - } - if (!subscriptions.Contains(agentType)) - { - subscriptions.Remove(agentType); - } - _subscriptions[topic] = subscriptions; - state.State.Subscriptions = _subscriptions; - await state.WriteStateAsync(); - } -} -public sealed class SubscriptionsState -{ - public Dictionary> Subscriptions { get; set; } = new(); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs deleted file mode 100644 index a5291f942155..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentStateSurrogate.cs - -using Google.Protobuf; -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; - -[GenerateSerializer] -public struct AgentStateSurrogate -{ - [Id(0)] - public string Id; - [Id(1)] - public string TextData; - [Id(2)] - public ByteString BinaryData; - [Id(3)] - public AgentId AgentId; - [Id(4)] - public string Etag; - [Id(5)] - public ByteString ProtoData; -} - -[RegisterConverter] -public sealed class AgentStateSurrogateConverter : - IConverter -{ - public AgentState ConvertFromSurrogate( - in AgentStateSurrogate surrogate) - { - var agentState = new AgentState - { - AgentId = surrogate.AgentId, - BinaryData = surrogate.BinaryData, - TextData = surrogate.TextData, - ETag = surrogate.Etag - }; - //agentState.ProtoData = surrogate.ProtoData; - return agentState; - } - - public AgentStateSurrogate ConvertToSurrogate( - in AgentState value) => - new AgentStateSurrogate - { - AgentId = value.AgentId, - BinaryData = value.BinaryData, - TextData = value.TextData, - Etag = value.ETag, - //ProtoData = value.ProtoData.Value - }; -} - diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs new file mode 100644 index 000000000000..2baa70b33ef8 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentsRegistryState.cs +using System.Collections.Concurrent; +using Microsoft.AutoGen.Protobuf; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; +public class AgentsRegistryState +{ + public ConcurrentDictionary> AgentsToTopicsMap { get; set; } = []; + public ConcurrentDictionary> AgentsToTopicsPrefixMap { get; set; } = []; + public ConcurrentDictionary> TopicToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> TopicPrefixToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> GuidSubscriptionsMap { get; set; } = []; + public string Etag { get; set; } = Guid.NewGuid().ToString(); +} diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IConnection.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IConnection.cs new file mode 100644 index 000000000000..a5044ff69b4f --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IConnection.cs @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IConnection.cs + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; +public interface IConnection +{ +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGateway.cs similarity index 58% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGateway.cs index 33bb94f7c49b..b8aeae041e86 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGateway.cs @@ -1,18 +1,15 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IGateway.cs -using Microsoft.AutoGen.Contracts; +using Grpc.Core; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; public interface IGateway : IGrainObserver { ValueTask InvokeRequestAsync(RpcRequest request); - ValueTask BroadcastEventAsync(CloudEvent evt); - ValueTask StoreAsync(Contracts.AgentState value); - ValueTask ReadAsync(AgentId agentId); - ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request); + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, ServerCallContext context); ValueTask SubscribeAsync(AddSubscriptionRequest request); ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request); - Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGatewayRegistry.cs similarity index 93% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGatewayRegistry.cs index cb3778418040..3d47696da3ee 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IGatewayRegistry.cs @@ -1,9 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IGatewayRegistry.cs +using Microsoft.AutoGen.Protobuf; -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; /// /// Interface for managing agent registration, placement, and subscriptions. @@ -30,7 +29,7 @@ public interface IGatewayRegistry : IRegistry /// The request containing agent type details. /// The worker to register the agent type with. /// A task representing the asynchronous operation. - ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IGateway worker); + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, string clientId, IGateway worker); /// /// Adds a new worker to the registry. diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistry.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistry.cs new file mode 100644 index 000000000000..215ace6e5dda --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistry.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IRegistry.cs +using Microsoft.AutoGen.Protobuf; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; + +public interface IRegistry +{ + + /// + /// Gets a list of agents subscribed to and handling the specified topic and event type. + /// + /// The topic to check subscriptions for. + /// The event type to check subscriptions for. + /// A task representing the asynchronous operation, with the list of agent IDs as the result. + ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string key); + + /// + /// Subscribes an agent to a topic. + /// + /// The subscription request. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask SubscribeAsync(AddSubscriptionRequest request); + + /// + /// Unsubscribes an agent from a topic. + /// + /// The unsubscription request. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); // TODO: This should have its own request type. + + /// + /// Gets the subscriptions for a specified agent type. + /// + /// A task representing the asynchronous operation, with the subscriptions as the result. + ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistryGrain.cs similarity index 63% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistryGrain.cs index 81b59858619c..a44da1ce5b22 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IRegistryGrain.cs @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IRegistryGrain.cs -namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; /// /// Orleans specific interface, needed to mark the key /// -[Alias("Microsoft.AutoGen.Runtime.Grpc.Abstractions.IRegistryGrain")] +[Alias("Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions.IRegistryGrain")] public interface IRegistryGrain : IGatewayRegistry, IGrainWithIntegerKey { } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Microsoft.AutoGen.RuntimeGateway.Grpc.csproj similarity index 85% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Microsoft.AutoGen.RuntimeGateway.Grpc.csproj index b874a657d8f2..f9a568e89ade 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Microsoft.AutoGen.RuntimeGateway.Grpc.csproj @@ -7,6 +7,7 @@ + @@ -29,4 +30,9 @@ + + + + + \ No newline at end of file diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/AgentWorkerHostingExtensions.cs similarity index 79% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/AgentWorkerHostingExtensions.cs index 3b130ca4bed5..37b9e1f1b6df 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/AgentWorkerHostingExtensions.cs @@ -3,12 +3,11 @@ using System.Diagnostics; using Microsoft.AspNetCore.Builder; -using Microsoft.AutoGen.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; public static class AgentWorkerHostingExtensions { public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder) @@ -18,10 +17,6 @@ public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder b builder.Services.TryAddSingleton(DistributedContextPropagator.Current); builder.Services.AddGrpc(); - builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) => - { - return ReflectionHelper.GetAgentsMetadata(AppDomain.CurrentDomain.GetAssemblies()); - }); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs similarity index 58% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs index 26c99c894248..e657ac36fef0 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs @@ -4,46 +4,81 @@ using System.Collections.Concurrent; using Grpc.Core; using Microsoft.AutoGen.Contracts; -using Microsoft.AutoGen.Runtime.Grpc.Abstractions; +using Microsoft.AutoGen.Protobuf; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; +/// +/// Represents the gRPC gateway service that handles communication between the agent worker and the cluster. +/// public sealed class GrpcGateway : BackgroundService, IGateway { private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); private readonly ILogger _logger; private readonly IClusterClient _clusterClient; - //private readonly ConcurrentDictionary _agentState = new(); private readonly IRegistryGrain _gatewayRegistry; private readonly IGateway _reference; - // The agents supported by each worker process. private readonly ConcurrentDictionary> _supportedAgentTypes = []; - public readonly ConcurrentDictionary _workers = new(); - internal readonly ConcurrentDictionary _workersByConnection = new(); - private readonly ConcurrentDictionary _subscriptionsByAgentType = new(); - private readonly ConcurrentDictionary> _subscriptionsByTopic = new(); - private readonly ISubscriptionsGrain _subscriptions; - - // The mapping from agent id to worker process. + public readonly ConcurrentDictionary _workers = new(); private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); - // RPC private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource> _pendingRequests = new(); + + /// + /// Initializes a new instance of the class. + /// + /// The cluster client. + /// The logger. public GrpcGateway(IClusterClient clusterClient, ILogger logger) { _logger = logger; _clusterClient = clusterClient; _reference = clusterClient.CreateObjectReference(this); _gatewayRegistry = clusterClient.GetGrain(0); - _subscriptions = clusterClient.GetGrain(0); } + + /// + /// Executes the background service. + /// + /// The cancellation token. + /// A task that represents the asynchronous operation. + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await _gatewayRegistry.AddWorkerAsync(_reference); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error adding worker to registry."); + } + await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken); + } + try + { + await _gatewayRegistry.RemoveWorkerAsync(_reference); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error removing worker from registry."); + } + } + + /// + /// Invokes a request asynchronously. + /// + /// The RPC request. + /// The cancellation token. + /// A task that represents the asynchronous operation. The task result contains the RPC response. public async ValueTask InvokeRequestAsync(RpcRequest request, CancellationToken cancellationToken = default) { var agentId = (request.Target.Type, request.Target.Key); if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted == true) { - // Activate the agent on a compatible worker process. if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers)) { connection = workers[Random.Shared.Next(workers.Count)]; @@ -54,109 +89,143 @@ public async ValueTask InvokeRequestAsync(RpcRequest request, Cance return new(new RpcResponse { Error = "Agent not found." }); } } - // Proxy the request to the agent. var originalRequestId = request.RequestId; var newRequestId = Guid.NewGuid().ToString(); var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); request.RequestId = newRequestId; await connection.ResponseStream.WriteAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); - // Wait for the response and send it back to the caller. var response = await completion.Task.WaitAsync(s_agentResponseTimeout); response.RequestId = originalRequestId; return response; } - public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) - { - _ = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); - var agentState = _clusterClient.GetGrain($"{value.AgentId.Type}:{value.AgentId.Key}"); - await agentState.WriteStateAsync(value, value.ETag); - } - public async ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) - { - var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); - return await agentState.ReadStateAsync(); - } - public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default) + + /// + /// Registers an agent type asynchronously. + /// + /// The register agent type request. + /// The server call context. + /// The cancellation token. + /// A task that represents the asynchronous operation. The task result contains the register agent type response. + public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, ServerCallContext context, CancellationToken cancellationToken = default) { try { - var connection = _workersByConnection[request.RequestId]; + var clientId = context.RequestHeaders.Get("client-id")?.Value ?? + throw new RpcException(new Status(StatusCode.InvalidArgument, "Grpc Client ID is required.")); + if (!_workers.TryGetValue(clientId, out var connection)) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, $"Grpc Worker Connection not found for ClientId {clientId}.")); + } connection.AddSupportedType(request.Type); _supportedAgentTypes.GetOrAdd(request.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentTypeAsync(request, _reference).ConfigureAwait(true); - return new RegisterAgentTypeResponse - { - Success = true, - RequestId = request.RequestId - }; + await _gatewayRegistry.RegisterAgentTypeAsync(request, clientId, _reference).ConfigureAwait(true); + return new RegisterAgentTypeResponse { }; } catch (Exception ex) { - return new RegisterAgentTypeResponse - { - Success = false, - RequestId = request.RequestId, - Error = ex.Message - }; + throw new RpcException(new Status(StatusCode.Internal, ex.Message)); } } + + /// + /// Subscribes to a topic asynchronously. + /// + /// The add subscription request. + /// The cancellation token. + /// A task that represents the asynchronous operation. The task result contains the add subscription response. public async ValueTask SubscribeAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default) { try { await _gatewayRegistry.SubscribeAsync(request).ConfigureAwait(true); - return new AddSubscriptionResponse - { - Success = true, - RequestId = request.RequestId - }; + return new AddSubscriptionResponse { }; } catch (Exception ex) { - return new AddSubscriptionResponse - { - Success = false, - RequestId = request.RequestId, - Error = ex.Message - }; + throw new RpcException(new Status(StatusCode.Internal, ex.Message)); } } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + + /// + /// Unsubscribes from a topic asynchronously. + /// + /// The remove subscription request. + /// The cancellation token. + /// A task that represents the asynchronous operation. The task result contains the remove subscription response. + public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default) { - while (!stoppingToken.IsCancellationRequested) - { - try - { - await _gatewayRegistry.AddWorkerAsync(_reference); - } - catch (Exception exception) - { - _logger.LogWarning(exception, "Error adding worker to registry."); - } - await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken); - } try { - await _gatewayRegistry.RemoveWorkerAsync(_reference); + await _gatewayRegistry.UnsubscribeAsync(request).ConfigureAwait(true); + return new RemoveSubscriptionResponse { }; } - catch (Exception exception) + catch (Exception ex) { - _logger.LogWarning(exception, "Error removing worker from registry."); + throw new RpcException(new Status(StatusCode.Internal, ex.Message)); } } + + /// + /// Gets the subscriptions asynchronously. + /// + /// The get subscriptions request. + /// The cancellation token. + /// A task that represents the asynchronous operation. The task result contains the list of subscriptions. + public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default) + { + return _gatewayRegistry.GetSubscriptionsAsync(request); + } + + async ValueTask IGateway.InvokeRequestAsync(RpcRequest request) + { + return await InvokeRequestAsync(request, default).ConfigureAwait(false); + } + + ValueTask IGateway.RegisterAgentTypeAsync(RegisterAgentTypeRequest request, ServerCallContext context) + { + return RegisterAgentTypeAsync(request, context, default); + } + + ValueTask IGateway.SubscribeAsync(AddSubscriptionRequest request) + { + return SubscribeAsync(request, default); + } + + ValueTask IGateway.UnsubscribeAsync(RemoveSubscriptionRequest request) + { + return UnsubscribeAsync(request, default); + } + + ValueTask> IGateway.GetSubscriptionsAsync(GetSubscriptionsRequest request) + { + return GetSubscriptionsAsync(request); + } + + /// + /// Connects to a worker process. + /// + /// The request stream. + /// The response stream. + /// The server call context. + /// A task that represents the asynchronous operation. internal async Task ConnectToWorkerProcess(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _logger.LogInformation("Received new connection from {Peer}.", context.Peer); + var clientId = (context.RequestHeaders.Get("client-id")?.Value) ?? + throw new RpcException(new Status(StatusCode.InvalidArgument, "Client ID is required.")); var workerProcess = new GrpcWorkerConnection(this, requestStream, responseStream, context); - _workers.GetOrAdd(workerProcess, workerProcess); - _workersByConnection.GetOrAdd(context.Peer, workerProcess); + _workers.GetOrAdd(clientId, workerProcess); await workerProcess.Connect().ConfigureAwait(false); } - internal async Task SendMessageAsync(GrpcWorkerConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) - { - await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); - } + + /// + /// Handles received messages from a worker connection. + /// + /// The worker connection. + /// The received message. + /// The cancellation token. + /// A task that represents the asynchronous operation. internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message, CancellationToken cancellationToken = default) { _logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection); @@ -171,18 +240,17 @@ internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Mess case Message.MessageOneofCase.CloudEvent: await DispatchEventAsync(message.CloudEvent, cancellationToken); break; - case Message.MessageOneofCase.RegisterAgentTypeRequest: - await RegisterAgentTypeAsync(connection, message.RegisterAgentTypeRequest); - break; - case Message.MessageOneofCase.AddSubscriptionRequest: - await AddSubscriptionAsync(connection, message.AddSubscriptionRequest); - break; default: - // if it wasn't recognized return bad request await RespondBadRequestAsync(connection, $"Unknown message type for message '{message}'."); break; }; } + + /// + /// Dispatches a response to a pending request. + /// + /// The worker connection. + /// The RPC response. private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response) { if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion)) @@ -190,26 +258,15 @@ private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse respo _logger.LogWarning("Received response for unknown request id: {RequestId}.", response.RequestId); return; } - // Complete the request. completion.SetResult(response); } - private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) - { - connection.AddSupportedType(msg.Type); - _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentTypeAsync(msg, _reference).ConfigureAwait(true); - Message response = new() - { - RegisterAgentTypeResponse = new() - { - RequestId = msg.RequestId, - Error = "", - Success = true - } - }; - await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); - } + /// + /// Dispatches an event to the appropriate agents. + /// + /// The cloud event. + /// The cancellation token. + /// A task that represents the asynchronous operation. private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken cancellationToken = default) { var registry = _clusterClient.GetGrain(0); @@ -227,7 +284,7 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can var activeConnections = connections.Where(c => c.Completion?.IsCompleted == false).ToList(); foreach (var connection in activeConnections) { - tasks.Add(this.SendMessageAsync(connection, evt, cancellationToken)); + tasks.Add(this.WriteResponseAsync(connection, evt, cancellationToken)); } } } @@ -238,6 +295,13 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can _logger.LogWarning("No agent types found for event type {EventType}.", evt.Type); } } + + /// + /// Dispatches a request to the appropriate agent. + /// + /// The worker connection. + /// The RPC request. + /// A task that represents the asynchronous operation. private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request) { var requestId = request.RequestId; @@ -260,6 +324,14 @@ await InvokeRequestDelegate(connection, request, async request => return await gateway.InvokeRequestAsync(request).ConfigureAwait(true); }).ConfigureAwait(false); } + + /// + /// Invokes a request delegate. + /// + /// The worker connection. + /// The RPC request. + /// The function to invoke. + /// A task that represents the asynchronous operation. private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func> func) { try @@ -273,9 +345,16 @@ private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false); } } + + /// + /// Handles the removal of a worker process. + /// + /// The worker process. internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) { - _workers.TryRemove(workerProcess, out _); + var clientId = workerProcess.ServerCallContext.RequestHeaders.Get("client-id")?.Value ?? + throw new RpcException(new Status(StatusCode.InvalidArgument, "Grpc Client ID is required.")); + _workers.TryRemove(clientId, out _); var types = workerProcess.GetSupportedTypes(); foreach (var type in types) { @@ -284,7 +363,6 @@ internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) supported.Remove(workerProcess); } } - // Any agents activated on that worker are also gone. foreach (var pair in _agentDirectory) { if (pair.Value == workerProcess) @@ -293,39 +371,24 @@ internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) } } } + + /// + /// Responds with a bad request error. + /// + /// The worker connection. + /// The error message. + /// A task that represents the asynchronous operation. private static async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, string error) { throw new RpcException(new Status(StatusCode.InvalidArgument, error)); } - private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) - { - var topic = ""; - var agentType = ""; - if (request.Subscription.TypePrefixSubscription is not null) - { - topic = request.Subscription.TypePrefixSubscription.TopicTypePrefix; - agentType = request.Subscription.TypePrefixSubscription.AgentType; - } - else if (request.Subscription.TypeSubscription is not null) - { - topic = request.Subscription.TypeSubscription.TopicType; - agentType = request.Subscription.TypeSubscription.AgentType; - } - _subscriptionsByAgentType[agentType] = request.Subscription; - _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); - await _subscriptions.SubscribeAsync(topic, agentType); - //var response = new SubscriptionResponse { RequestId = request.RequestId, Error = "", Success = true }; - Message response = new() - { - AddSubscriptionResponse = new() - { - RequestId = request.RequestId, - Error = "", - Success = true - } - }; - await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); - } + + /// + /// Dispatches an event to the specified agent types. + /// + /// The agent types. + /// The cloud event. + /// A task that represents the asynchronous operation. private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentTypes, CloudEvent evt) { var tasks = new List(agentTypes.Count()); @@ -335,86 +398,33 @@ private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentType { foreach (var connection in connections) { - tasks.Add(this.SendMessageAsync(connection, evt)); + tasks.Add(this.WriteResponseAsync(connection, evt)); } } } await Task.WhenAll(tasks).ConfigureAwait(false); } - public async ValueTask BroadcastEventAsync(CloudEvent evt, CancellationToken cancellationToken = default) - { - var tasks = new List(_workers.Count); - foreach (var (_, connection) in _supportedAgentTypes) - { - tasks.Add(this.SendMessageAsync((IConnection)connection[0], evt, default)); - } - await Task.WhenAll(tasks).ConfigureAwait(false); - } - Task IGateway.SendMessageAsync(IConnection connection, CloudEvent cloudEvent) - { - return this.SendMessageAsync(connection, cloudEvent, default); - } - public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) + /// + /// Writes a response to a worker connection. + /// + /// The worker connection. + /// The cloud event. + /// The cancellation token. + /// A task that represents the asynchronous operation. + private async Task WriteResponseAsync(GrpcWorkerConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) { - var queue = (GrpcWorkerConnection)connection; - await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); + await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); } - public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default) - { - try - { - await _gatewayRegistry.UnsubscribeAsync(request).ConfigureAwait(true); - return new RemoveSubscriptionResponse - - { - Success = true, - }; - } - catch (Exception ex) - { - return new RemoveSubscriptionResponse - { - Success = false, - Error = ex.Message - }; - } - } - public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default) - { - return _gatewayRegistry.GetSubscriptionsAsync(request); - } - async ValueTask IGateway.InvokeRequestAsync(RpcRequest request) - { - return await InvokeRequestAsync(request, default).ConfigureAwait(false); - } - async ValueTask IGateway.BroadcastEventAsync(CloudEvent evt) - { - await BroadcastEventAsync(evt, default).ConfigureAwait(false); - } - ValueTask IGateway.StoreAsync(AgentState value) - { - return StoreAsync(value, default); - } - ValueTask IGateway.ReadAsync(AgentId agentId) - { - return ReadAsync(agentId, default); - } - ValueTask IGateway.RegisterAgentTypeAsync(RegisterAgentTypeRequest request) - { - return RegisterAgentTypeAsync(request, default); - } - ValueTask IGateway.SubscribeAsync(AddSubscriptionRequest request) - { - return SubscribeAsync(request, default); - } - ValueTask IGateway.UnsubscribeAsync(RemoveSubscriptionRequest request) - { - return UnsubscribeAsync(request, default); - } - ValueTask> IGateway.GetSubscriptionsAsync(GetSubscriptionsRequest request) - { - return GetSubscriptionsAsync(request); + /// + /// Writes a response to a worker connection. + /// + /// The worker connection. + /// The cloud event. + /// A task that represents the asynchronous operation. + public async Task WriteResponseAsync(IConnection connection, CloudEvent cloudEvent) + { + await WriteResponseAsync((GrpcWorkerConnection)connection, cloudEvent, default).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs new file mode 100644 index 000000000000..1f04647db322 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GrpcGatewayService.cs +using Grpc.Core; +using Microsoft.AutoGen.Protobuf; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; + +/// +/// Represents the gRPC service which handles communication between the agent worker and the cluster. +/// +public sealed class GrpcGatewayService(GrpcGateway gateway) : AgentRpc.AgentRpcBase +{ + private readonly GrpcGateway Gateway = (GrpcGateway)gateway; + + /// + /// Method run on first connect from a worker process. + /// + /// The request stream. + /// The response stream. + /// The server call context. + /// A task that represents the asynchronous operation. + public override async Task OpenChannel(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + try + { + await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); + } + catch + { + if (context.CancellationToken.IsCancellationRequested) + { + return; + } + throw; + } + } + + /// + /// Adds a subscription. + /// + /// The add subscription request. + /// The server call context. + /// A task that represents the asynchronous operation. The task result contains the add subscription response. + public override async Task AddSubscription(AddSubscriptionRequest request, ServerCallContext context) + { + try + { + return await Gateway.SubscribeAsync(request).ConfigureAwait(true); + } + catch (Exception e) + { + throw new RpcException(new Status(StatusCode.Internal, e.Message)); + } + } + + /// + /// Removes a subscription. + /// + /// The remove subscription request. + /// The server call context. + /// A task that represents the asynchronous operation. The task result contains the remove subscription response. + public override async Task RemoveSubscription(RemoveSubscriptionRequest request, ServerCallContext context) + { + try + { + return await Gateway.UnsubscribeAsync(request).ConfigureAwait(true); + } + catch (Exception e) + { + throw new RpcException(new Status(StatusCode.Internal, e.Message)); + } + } + + /// + /// Gets the subscriptions. + /// + /// The get subscriptions request. + /// The server call context. + /// A task that represents the asynchronous operation. The task result contains the get subscriptions response. + public override async Task GetSubscriptions(GetSubscriptionsRequest request, ServerCallContext context) + { + try + { + var subscriptions = await Gateway.GetSubscriptionsAsync(request); + return new GetSubscriptionsResponse { Subscriptions = { subscriptions } }; + } + catch (Exception e) + { + throw new RpcException(new Status(StatusCode.Internal, e.Message)); + } + } + + /// + /// Registers an agent type (factory) + /// + /// The register agent type request. + /// The server call context. + /// A task that represents the asynchronous operation. The task result contains the register agent type response. + public override async Task RegisterAgent(RegisterAgentTypeRequest request, ServerCallContext context) + { + try + { + return await Gateway.RegisterAgentTypeAsync(request, context).ConfigureAwait(true); + } + catch (Exception e) + { + throw new RpcException(new Status(StatusCode.Internal, e.Message)); + } + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs similarity index 91% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs index cba0f8c4772b..0cdfc9a2898b 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs @@ -1,12 +1,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // GrpcWorkerConnection.cs - using System.Threading.Channels; using Grpc.Core; +using Microsoft.AutoGen.Protobuf; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; -internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection +public sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection { private static long s_nextConnectionId; private Task _readTask = Task.CompletedTask; @@ -102,9 +103,9 @@ public async Task RunWritePump() await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); try { - await foreach (var message in _outboundMessages.Reader.ReadAllAsync(_shutdownCancellationToken.Token)) + await foreach (var message in _outboundMessages.Reader.ReadAllAsync(_shutdownCancellationToken.Token).ConfigureAwait(false)) { - await ResponseStream.WriteAsync(message); + await ResponseStream.WriteAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs similarity index 98% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs index e83db26ad0b7..9abf9ac048c7 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/OrleansRuntimeHostingExtenions.cs @@ -9,7 +9,7 @@ using Orleans.Configuration; using Orleans.Serialization; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; public static class OrleansRuntimeHostingExtenions { diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/RegistryGrain.cs similarity index 75% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/RegistryGrain.cs index 9de7065fdb62..522c20614606 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/RegistryGrain.cs @@ -1,9 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RegistryGrain.cs -using Microsoft.AutoGen.Contracts; -using Microsoft.AutoGen.Runtime.Grpc.Abstractions; +using Microsoft.AutoGen.Protobuf; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; -namespace Microsoft.AutoGen.Runtime.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; internal sealed class RegistryGrain([PersistentState("state", "AgentRegistryStore")] IPersistentState state) : Grain, IRegistryGrain { private readonly Dictionary _workerStates = new(); @@ -16,7 +16,7 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); return base.OnActivateAsync(cancellationToken); } - public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType) + public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string key) { List agents = []; // get all agent types that are subscribed to the topic @@ -29,19 +29,19 @@ public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, }*/ agents.AddRange(subscribedAgentTypes.ToList()); } - if (state.State.TopicToAgentTypesMap.TryGetValue(eventType, out var eventHandlingAgents)) + if (state.State.TopicToAgentTypesMap.TryGetValue(key, out var eventHandlingAgents)) { agents.AddRange(eventHandlingAgents.ToList()); } - if (state.State.TopicToAgentTypesMap.TryGetValue(topic + "." + eventType, out var combo)) + if (state.State.TopicToAgentTypesMap.TryGetValue(topic + "." + key, out var combo)) { agents.AddRange(combo.ToList()); } - // instead of an exact match, we can also check for a prefix match where key starts with the eventType - if (state.State.TopicToAgentTypesMap.Keys.Any(key => key.StartsWith(eventType))) + // instead of an exact match, we can also check for a prefix match from the TopicPrefixToAgentTypesMap + if (state.State.TopicPrefixToAgentTypesMap.Keys.Any(key => key.StartsWith(topic))) { - state.State.TopicToAgentTypesMap.Where( - kvp => kvp.Key.StartsWith(eventType)) + state.State.TopicPrefixToAgentTypesMap.Where( + kvp => kvp.Key.StartsWith(topic)) .SelectMany(kvp => kvp.Value) .Distinct() .ToList() @@ -51,7 +51,6 @@ public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, }); } agents = agents.Distinct().ToList(); - return new ValueTask>(agents); } public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId) @@ -94,18 +93,8 @@ public ValueTask RemoveWorkerAsync(IGateway worker) } return ValueTask.CompletedTask; } - public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest registration, IGateway gateway) + public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest registration, string clientId, IGateway gateway) { - if (!_supportedAgentTypes.TryGetValue(registration.Type, out var supportedAgentTypes)) - { - supportedAgentTypes = _supportedAgentTypes[registration.Type] = []; - } - - if (!supportedAgentTypes.Contains(gateway)) - { - supportedAgentTypes.Add(gateway); - } - var workerState = GetOrAddWorker(gateway); workerState.SupportedTypes.Add(registration.Type); @@ -159,9 +148,7 @@ private WorkerState GetOrAddWorker(IGateway worker) workerState.LastSeen = DateTimeOffset.UtcNow; return workerState; } - public ValueTask GetCompatibleWorkerAsync(string type) => new(GetCompatibleWorkerCore(type)); - private IGateway? GetCompatibleWorkerCore(string type) { if (_supportedAgentTypes.TryGetValue(type, out var workers)) @@ -178,9 +165,27 @@ public async ValueTask SubscribeAsync(AddSubscriptionRequest subscription) subscription.Subscription.Id = guid; switch (subscription.Subscription.SubscriptionCase) { - //TODO: this doesnt look right case Subscription.SubscriptionOneofCase.TypePrefixSubscription: - break; + { + // add the topic to the set of topics for the agent type + state.State.AgentsToTopicsMap.TryGetValue(subscription.Subscription.TypePrefixSubscription.AgentType, out var topics); + if (topics is null) + { + topics = new HashSet(); + state.State.AgentsToTopicsPrefixMap[subscription.Subscription.TypePrefixSubscription.AgentType] = topics; + } + topics.Add(subscription.Subscription.TypePrefixSubscription.TopicTypePrefix); + + // add the agent type to the set of agent types for the topic + state.State.TopicPrefixToAgentTypesMap.TryGetValue(subscription.Subscription.TypePrefixSubscription.TopicTypePrefix, out var agents); + if (agents is null) + { + agents = new HashSet(); + state.State.TopicPrefixToAgentTypesMap[subscription.Subscription.TypePrefixSubscription.TopicTypePrefix] = agents; + } + agents.Add(subscription.Subscription.TypePrefixSubscription.AgentType); + break; + } case Subscription.SubscriptionOneofCase.TypeSubscription: { // add the topic to the set of topics for the agent type @@ -200,20 +205,19 @@ public async ValueTask SubscribeAsync(AddSubscriptionRequest subscription) state.State.TopicToAgentTypesMap[subscription.Subscription.TypeSubscription.TopicType] = agents; } agents.Add(subscription.Subscription.TypeSubscription.AgentType); - - // add the subscription by Guid - state.State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); - if (existingSubscriptions is null) - { - existingSubscriptions = new HashSet(); - state.State.GuidSubscriptionsMap[guid] = existingSubscriptions; - } - existingSubscriptions.Add(subscription.Subscription); break; } default: throw new InvalidOperationException("Invalid subscription type"); } + // add the subscription by Guid + state.State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); + if (existingSubscriptions is null) + { + existingSubscriptions = new HashSet(); + state.State.GuidSubscriptionsMap[guid] = existingSubscriptions; + } + existingSubscriptions.Add(subscription.Subscription); await state.WriteStateAsync().ConfigureAwait(false); } public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request) @@ -239,17 +243,25 @@ public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request) // remove the agent type from the set of agent types for the topic state.State.TopicToAgentTypesMap.TryGetValue(subscription.TypeSubscription.TopicType, out var agents); agents?.Remove(subscription.TypeSubscription.AgentType); - - //remove the subscription by Guid - state.State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); - existingSubscriptions?.Remove(subscription); break; } case Subscription.SubscriptionOneofCase.TypePrefixSubscription: - break; + { + // remove the topic from the set of topics for the agent type + state.State.AgentsToTopicsPrefixMap.TryGetValue(subscription.TypePrefixSubscription.AgentType, out var topics); + topics?.Remove(subscription.TypePrefixSubscription.TopicTypePrefix); + + // remove the agent type from the set of agent types for the topic + state.State.TopicPrefixToAgentTypesMap.TryGetValue(subscription.TypePrefixSubscription.TopicTypePrefix, out var agents); + agents?.Remove(subscription.TypePrefixSubscription.AgentType); + break; + } default: throw new InvalidOperationException("Invalid subscription type"); } + //remove the subscription by Guid + state.State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); + existingSubscriptions?.Remove(subscription); } state.State.GuidSubscriptionsMap.Remove(guid, out _); } @@ -265,17 +277,6 @@ public ValueTask> GetSubscriptionsAsync(GetSubscriptionsReque } return new(subscriptions); } - public ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker) - { - var (_, _) = (request, worker); - var e = "RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker) is not implemented when using the Grpc runtime."; - throw new NotImplementedException(e); - } - public ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker) - { - var e = "UnregisterAgentTypeAsync(string type, IAgentRuntime worker) is not implemented when using the Grpc runtime."; - throw new NotImplementedException(e); - } private sealed class WorkerState { public HashSet SupportedTypes { get; set; } = []; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs similarity index 86% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs index 37e3af1b9d17..793301cf4d5f 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs @@ -1,7 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AddSubscriptionRequestSurrogate.cs +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct AddSubscriptionRequestSurrogate @@ -21,7 +22,6 @@ public AddSubscriptionRequest ConvertFromSurrogate( { var request = new AddSubscriptionRequest() { - RequestId = surrogate.RequestId, Subscription = surrogate.Subscription }; return request; @@ -31,7 +31,6 @@ public AddSubscriptionRequestSurrogate ConvertToSurrogate( in AddSubscriptionRequest value) => new AddSubscriptionRequestSurrogate { - RequestId = value.RequestId, Subscription = value.Subscription }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs similarity index 61% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs index 4c15784e0fcc..6a07a114d3e6 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AddSubscriptionResponseSurrogate.cs -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +using Microsoft.AutoGen.Protobuf; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct AddSubscriptionResponseSurrogate @@ -20,20 +22,10 @@ public sealed class AddSubscriptionResponseSurrogateConverter : { public AddSubscriptionResponse ConvertFromSurrogate( in AddSubscriptionResponseSurrogate surrogate) => - new AddSubscriptionResponse - { - RequestId = surrogate.RequestId, - Success = surrogate.Success, - Error = surrogate.Error - }; + new AddSubscriptionResponse { }; public AddSubscriptionResponseSurrogate ConvertToSurrogate( in AddSubscriptionResponse value) => - new AddSubscriptionResponseSurrogate - { - RequestId = value.RequestId, - Success = value.Success, - Error = value.Error - }; + new AddSubscriptionResponseSurrogate { }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs similarity index 88% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs index ddef9e997575..af7728d1254c 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs @@ -3,9 +3,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AgentIdSurrogate.cs -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct AgentIdSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs similarity index 91% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs index 22359a08981c..a69f7cfc89a5 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs @@ -1,10 +1,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // CloudEventSurrogate.cs - using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Microsoft.AutoGen.Contracts; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; // TODO: Add the rest of the properties [GenerateSerializer] diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs similarity index 88% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs index ab4722ff8c74..e53948041828 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // GetSubscriptionsRequest.cs -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +using Microsoft.AutoGen.Protobuf; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct GetSubscriptionsRequestSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs similarity index 89% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs index fa50e597fabe..9bcba2391bb4 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs @@ -2,8 +2,9 @@ // RegisterAgentTypeRequestSurrogate.cs using Google.Protobuf.Collections; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RegisterAgentTypeRequestSurrogate @@ -27,7 +28,6 @@ public RegisterAgentTypeRequest ConvertFromSurrogate( { var request = new RegisterAgentTypeRequest() { - RequestId = surrogate.RequestId, Type = surrogate.Type }; /* future @@ -40,7 +40,6 @@ public RegisterAgentTypeRequestSurrogate ConvertToSurrogate( in RegisterAgentTypeRequest value) => new RegisterAgentTypeRequestSurrogate { - RequestId = value.RequestId, Type = value.Type, /* future Events = value.Events, diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs similarity index 62% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs index 2c7d6788a76c..c91fb3833c30 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RegisterAgentTypeResponseSurrogate.cs -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +using Microsoft.AutoGen.Protobuf; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RegisterAgentTypeResponseSurrogate @@ -20,20 +22,10 @@ public sealed class RegisterAgentTypeResponseSurrogateConverter : { public RegisterAgentTypeResponse ConvertFromSurrogate( in RegisterAgentTypeResponseSurrogate surrogate) => - new RegisterAgentTypeResponse - { - RequestId = surrogate.RequestId, - Success = surrogate.Success, - Error = surrogate.Error - }; + new RegisterAgentTypeResponse { }; public RegisterAgentTypeResponseSurrogate ConvertToSurrogate( in RegisterAgentTypeResponse value) => - new RegisterAgentTypeResponseSurrogate - { - RequestId = value.RequestId, - Success = value.Success, - Error = value.Error - }; + new RegisterAgentTypeResponseSurrogate { }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs similarity index 89% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs index 27299728baa8..9b397c2bb82d 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs @@ -1,7 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RemoveSubscriptionRequest.cs +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RemoveSubscriptionRequestSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs similarity index 67% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs index 88253c99b916..eec77162942e 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs @@ -1,7 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RemoveSubscriptionResponse.cs +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RemoveSubscriptionResponseSurrogate @@ -20,18 +21,10 @@ public sealed class SubscriptionResponseSurrogateConverter : { public RemoveSubscriptionResponse ConvertFromSurrogate( in RemoveSubscriptionResponseSurrogate surrogate) => - new RemoveSubscriptionResponse - { - Success = surrogate.Success, - Error = surrogate.Error - }; + new RemoveSubscriptionResponse { }; public RemoveSubscriptionResponseSurrogate ConvertToSurrogate( in RemoveSubscriptionResponse value) => - new RemoveSubscriptionResponseSurrogate - { - Success = value.Success, - Error = value.Error - }; + new RemoveSubscriptionResponseSurrogate { }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs similarity index 92% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs index 9791a68d7952..4b9fdb2500f6 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs @@ -2,9 +2,9 @@ // RpcRequestSurrogate.cs using Google.Protobuf.Collections; -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RpcRequestSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs similarity index 91% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs index 5c9fac246f84..999ae2bf6502 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs @@ -1,9 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RpcResponseSurrogate.cs - using Google.Protobuf.Collections; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct RpcResponseSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs similarity index 92% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs index 1fd56c176278..6942ada72b0b 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs @@ -1,9 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // SubscriptionSurrogate.cs +using Microsoft.AutoGen.Protobuf; -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct SubscriptionSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs similarity index 90% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs index ca4d721315e8..c38d84641b11 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs @@ -1,9 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // TypePrefixSubscriptionSurrogate.cs -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct TypePrefixSubscriptionSurrogate diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs similarity index 89% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs rename to dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs index 57fa202ebfc3..958c1c4d682f 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs @@ -1,9 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // TypeSubscriptionSurrogate.cs -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; +using Microsoft.AutoGen.Protobuf; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; [GenerateSerializer] public struct TypeSubscriptionSurrogate diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj index 1442ebe3d05d..441d48d18cb5 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj @@ -16,6 +16,6 @@ - + diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj index 320aa44deccb..b3d7cbf8776a 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj @@ -28,15 +28,15 @@ - - + + .venv - $(RepoRoot)..\python + $(RepoRoot)..\python @@ -44,10 +44,17 @@ $(PythonVenvRoot)\$(PythonVirtualEnvironmentName)\ True + ~/.local/bin/uv + True + uv + $(Uv) - + + + + diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs deleted file mode 100644 index fcae1ec3dcdb..000000000000 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// GrpcGatewayServiceTests.cs - -using FluentAssertions; -using Microsoft.AutoGen.Contracts; -using Microsoft.AutoGen.Core; -using Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; -using Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Orleans; -using Microsoft.Extensions.Logging; -using Moq; -using NewMessageReceived = Tests.Events.NewMessageReceived; - -namespace Microsoft.AutoGen.Runtime.Grpc.Tests; -[Collection(ClusterCollection.Name)] -[Trait("Category", "GRPC")] -public class GrpcGatewayServiceTests -{ - private readonly ClusterFixture _fixture; - - public GrpcGatewayServiceTests(ClusterFixture fixture) - { - _fixture = fixture; - } - [Fact] - public async Task Test_OpenChannel() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var client = new TestGrpcClient(); - - gateway._workers.Count.Should().Be(0); - var task = OpenChannel(service, client); - gateway._workers.Count.Should().Be(1); - client.Dispose(); - await task; - } - - [Fact] - public async Task Test_Message_Exchange_Through_Gateway() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var client = new TestGrpcClient(); - var task = OpenChannel(service: service, client); - await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(PBAgent), client.CallContext.Peer), client.CallContext); - await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(GMAgent), client.CallContext.Peer), client.CallContext); - - var inputEvent = new NewMessageReceived { Message = $"Start-{client.CallContext.Peer}" }.ToCloudEvent("gh-gh-gh", "gh-gh-gh"); - - client.AddMessage(new Message { CloudEvent = inputEvent }); - var newMessageReceived = await client.ReadNext(); - newMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived))); - newMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); - var secondMessage = await client.ReadNext(); - secondMessage!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived))); - - // Simulate an agent, by publishing a new message in the request stream - var helloEvent = new Hello { Message = $"Hello test-{client.CallContext.Peer}" }.ToCloudEvent("gh-gh-gh", "gh-gh-gh"); - client.AddMessage(new Message { CloudEvent = helloEvent }); - var helloMessageReceived = await client.ReadNext(); - helloMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello))); - helloMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); - client.Dispose(); - await task; - } - - [Fact] - public async Task Test_RegisterAgent_Should_Succeed() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var client = new TestGrpcClient(); - var task = OpenChannel(service: service, client); - var response = await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(PBAgent), client.CallContext.Peer), client.CallContext); - response.Success.Should().BeTrue(); - client.Dispose(); - await task; - } - - [Fact] - public async Task Test_RegisterAgent_Should_Fail_For_Wrong_ConnectionId() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var client = new TestGrpcClient(); - var response = await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(PBAgent), "faulty_connection_id"), client.CallContext); - response.Success.Should().BeFalse(); - client.Dispose(); - } - - [Fact] - public async Task Test_SaveState() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var callContext = TestServerCallContext.Create(); - var response = await service.SaveState(new AgentState { AgentId = new AgentId { Key = "Test", Type = "test" } }, callContext); - response.Should().NotBeNull(); - } - - [Fact] - public async Task Test_GetState() - { - var logger = Mock.Of>(); - var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); - var service = new GrpcGatewayService(gateway); - var callContext = TestServerCallContext.Create(); - var response = await service.GetState(new AgentId { Key = "", Type = "" }, callContext); - response.Should().NotBeNull(); - } - - private async Task CreateRegistrationRequest(GrpcGatewayService service, Type type, string requestId) - { - var registration = new RegisterAgentTypeRequest - { - Type = type.Name, - RequestId = requestId - }; - var assembly = type.Assembly; - var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly); - var events = eventTypes.GetEventsForAgent(type)?.ToList(); - var topics = eventTypes.GetTopicsForAgent(type)?.ToList(); - if (events is not null && topics is not null) { events.AddRange(topics); } - var client = new TestGrpcClient(); - - if (events != null) - { - foreach (var e in events) - { - var subscriptionRequest = new Message - { - AddSubscriptionRequest = new AddSubscriptionRequest - { - RequestId = Guid.NewGuid().ToString(), - Subscription = new Subscription - { - TypeSubscription = new TypeSubscription - { - AgentType = type.Name, - TopicType = type.Name + "." + e - } - } - } - }; - await service.AddSubscription(subscriptionRequest.AddSubscriptionRequest, client.CallContext); - } - } - var topicTypes = type.GetCustomAttributes(typeof(TopicSubscriptionAttribute), true).Cast().Select(t => t.Topic).ToList(); - if (topicTypes != null) - { - foreach (var topicType in topicTypes) - { - var subscriptionRequest = new Message - { - AddSubscriptionRequest = new AddSubscriptionRequest - { - RequestId = Guid.NewGuid().ToString(), - Subscription = new Subscription - { - TypeSubscription = new TypeSubscription - { - AgentType = type.Name, - TopicType = topicType - } - } - } - }; - await service.AddSubscription(subscriptionRequest.AddSubscriptionRequest, client.CallContext); - } - } - return registration; - } - - private Task OpenChannel(GrpcGatewayService service, TestGrpcClient client) - { - return service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext); - } - private string GetFullName(Type type) - { - return ReflectionHelper.GetMessageDescriptor(type)!.FullName; - } -} diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/TestAgent.cs b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/TestAgent.cs deleted file mode 100644 index e479a2cd7d33..000000000000 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/TestAgent.cs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// TestAgent.cs - -using System.Collections.Concurrent; -using Microsoft.AutoGen.Contracts; -using Microsoft.AutoGen.Core; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Microsoft.AutoGen.Runtime.Grpc.Tests; - -[TopicSubscription("gh-gh-gh")] -public class PBAgent([FromKeyedServices("AgentsMetadata")] AgentsMetadata eventTypes, ILogger? logger = null) - : Agent(eventTypes, logger) - , IHandle - , IHandle -{ - public async Task Handle(NewMessageReceived item, CancellationToken cancellationToken = default) - { - ReceivedMessages[AgentId.Key] = item.Message; - var hello = new Hello { Message = item.Message }; - await PublishMessageAsync(hello); - } - public Task Handle(GoodBye item, CancellationToken cancellationToken) - { - _logger.LogInformation($"Received GoodBye message {item.Message}"); - return Task.CompletedTask; - } - - public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); -} - -[TopicSubscription("gh-gh-gh")] -public class GMAgent([FromKeyedServices("AgentsMetadata")] AgentsMetadata eventTypes, ILogger? logger = null) - : Agent(eventTypes, logger) - , IHandle -{ - public async Task Handle(Hello item, CancellationToken cancellationToken) - { - _logger.LogInformation($"Received Hello message {item.Message}"); - ReceivedMessages[AgentId.Key] = item.Message; - await PublishMessageAsync(new GoodBye { Message = "" }); - } - - public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); -} diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/GrpcGatewayServiceTests.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/GrpcGatewayServiceTests.cs new file mode 100644 index 000000000000..e9009c92394e --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/GrpcGatewayServiceTests.cs @@ -0,0 +1,241 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GrpcGatewayServiceTests.cs + +using FluentAssertions; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.AutoGen.Protobuf; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Grpc; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans; +using Microsoft.Extensions.Logging; +using Moq; +using NewMessageReceived = Tests.Events.NewMessageReceived; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; +[Collection(ClusterCollection.Name)] +[Trait("Category", "UnitV2")] +public class GrpcGatewayServiceTests +{ + private readonly ClusterFixture _fixture; + + public GrpcGatewayServiceTests(ClusterFixture fixture) + { + _fixture = fixture; + } + [Fact] + public async Task Test_OpenChannel() + { + var logger = Mock.Of>(); + var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); + var service = new GrpcGatewayService(gateway); + var client = new TestGrpcClient(); + + gateway._workers.Count.Should().Be(0); + var task = OpenChannel(service, client); + gateway._workers.Count.Should().Be(1); + client.Dispose(); + await task; + } + + [Fact] + public async Task Test_Message_Exchange_Through_Gateway() + { + var logger = Mock.Of>(); + var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); + var service = new GrpcGatewayService(gateway); + var client = new TestGrpcClient(); + var task = OpenChannel(service: service, client); + await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(PBAgent)), client.CallContext); + await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(GMAgent)), client.CallContext); + + //var inputEvent = new NewMessageReceived { Message = $"Start-{client.CallContext.Peer}" }.ToCloudEvent("gh-gh-gh", "gh-gh-gh"); + var newMessage = new NewMessageReceived { Message = $"Start-{client.CallContext.Peer}" }; + var eventType = GetFullName(typeof(NewMessageReceived)); + var inputEvent = CloudEventExtensions.CreateCloudEvent( + Google.Protobuf.WellKnownTypes.Any.Pack(newMessage), + new TopicId(eventType, "gh-gh-gh"), + eventType, + null, + Guid.NewGuid().ToString()); + + client.AddMessage(new Message { CloudEvent = inputEvent }); + var newMessageReceived = await client.ReadNext(); + newMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived))); + newMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); + var secondMessage = await client.ReadNext(); + secondMessage!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived))); + + // Simulate an agent, by publishing a new message in the request stream + //var helloEvent = new Hello { Message = $"Hello test-{client.CallContext.Peer}" }.ToCloudEvent("gh-gh-gh", "gh-gh-gh"); + var hello = new Hello { Message = $"Hello test-{client.CallContext.Peer}" }; + var eventTypeHello = GetFullName(typeof(Hello)); + var helloEvent = CloudEventExtensions.CreateCloudEvent( + Google.Protobuf.WellKnownTypes.Any.Pack(message: hello), + new TopicId(eventTypeHello, "gh-gh-gh"), + eventTypeHello, + null, + Guid.NewGuid().ToString() + ); + client.AddMessage(new Message { CloudEvent = helloEvent }); + var helloMessageReceived = await client.ReadNext(); + helloMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello))); + helloMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); + client.Dispose(); + await task; + } + + [Fact] + public async Task Test_RegisterAgent_Should_Succeed() + { + var logger = Mock.Of>(); + var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); + var service = new GrpcGatewayService(gateway); + var client = new TestGrpcClient(); + var task = OpenChannel(service: service, client); + var response = await service.RegisterAgent(await CreateRegistrationRequest(service, typeof(PBAgent)), client.CallContext); + response.GetType().Should().Be(typeof(RegisterAgentTypeResponse)); + client.Dispose(); + await task; + } + + private async Task CreateRegistrationRequest(GrpcGatewayService service, Type type) + { + var registration = new RegisterAgentTypeRequest + { + Type = type.Name, + }; + var assembly = type.Assembly; + var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly); + var events = eventTypes.GetEventsForAgent(type)?.ToList(); + var topics = eventTypes.GetTopicsForAgent(type)?.ToList(); + var topicsPrefix = eventTypes.GetTopicsPrefixForAgent(type)?.ToList(); + if (events is not null && topics is not null) { events.AddRange(topics); } + var client = new TestGrpcClient(); + + if (events != null) + { + foreach (var e in events) + { + var subscriptionRequest = new AddSubscriptionRequest + { + Subscription = new Subscription + { + Id = Guid.NewGuid().ToString(), + TypeSubscription = new Protobuf.TypeSubscription + { + AgentType = type.Name, + TopicType = type.Name + "." + e + } + } + + }; + await service.AddSubscription(subscriptionRequest, client.CallContext); + } + } + var topicTypes = type.GetCustomAttributes(typeof(TypeSubscriptionAttribute), true).Cast().Select(t => t.Topic).ToList(); + if (topicTypes != null) + { + foreach (var topicType in topicTypes) + { + var subscriptionRequest = new AddSubscriptionRequest + { + Subscription = new Subscription + { + Id = Guid.NewGuid().ToString(), + TypeSubscription = new Protobuf.TypeSubscription + { + AgentType = type.Name, + TopicType = topicType + } + } + + }; + await service.AddSubscription(subscriptionRequest, client.CallContext); + } + } + var topicPrefixTypes = type.GetCustomAttributes(typeof(TypePrefixSubscriptionAttribute), true).Cast().Select(t => t.Topic).ToList(); + if (topicPrefixTypes != null) + { + foreach (var topicType in topicPrefixTypes) + { + var subscriptionRequest = new AddSubscriptionRequest + { + Subscription = new Subscription + { + Id = Guid.NewGuid().ToString(), + TypePrefixSubscription = new Protobuf.TypePrefixSubscription + { + AgentType = type.Name, + TopicTypePrefix = topicType + } + } + + }; + await service.AddSubscription(subscriptionRequest, client.CallContext); + } + } + return registration; + } + + private Task OpenChannel(GrpcGatewayService service, TestGrpcClient client) + { + return service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext); + } + private string GetFullName(Type type) + { + return ReflectionHelper.GetMessageDescriptor(type)!.FullName; + } + /// duplicate code here because I could not get InternalsVisibleTo to work + internal static class Constants + { + public const string DATA_CONTENT_TYPE_PROTOBUF_VALUE = "application/x-protobuf"; + public const string DATA_CONTENT_TYPE_JSON_VALUE = "application/json"; + public const string DATA_CONTENT_TYPE_TEXT_VALUE = "text/plain"; + + public const string DATA_CONTENT_TYPE_ATTR = "datacontenttype"; + public const string DATA_SCHEMA_ATTR = "dataschema"; + public const string AGENT_SENDER_TYPE_ATTR = "agagentsendertype"; + public const string AGENT_SENDER_KEY_ATTR = "agagentsenderkey"; + + public const string MESSAGE_KIND_ATTR = "agmsgkind"; + public const string MESSAGE_KIND_VALUE_PUBLISH = "publish"; + public const string MESSAGE_KIND_VALUE_RPC_REQUEST = "rpc_request"; + public const string MESSAGE_KIND_VALUE_RPC_RESPONSE = "rpc_response"; + } + internal static class CloudEventExtensions + { + // Convert an ISubscrptionDefinition to a Protobuf Subscription + internal static CloudEvent CreateCloudEvent(Google.Protobuf.WellKnownTypes.Any payload, TopicId topic, string dataType, Contracts.AgentId? sender, string messageId) + { + var attributes = new Dictionary + { + { + Constants.DATA_CONTENT_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.DATA_CONTENT_TYPE_PROTOBUF_VALUE } + }, + { + Constants.DATA_SCHEMA_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = dataType } + }, + { + Constants.MESSAGE_KIND_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.MESSAGE_KIND_VALUE_PUBLISH } + } + }; + + if (sender != null) + { + var senderNonNull = (Contracts.AgentId)sender; + attributes.Add(Constants.AGENT_SENDER_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Type }); + attributes.Add(Constants.AGENT_SENDER_KEY_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Key }); + } + + return new CloudEvent + { + ProtoData = payload, + Type = topic.Type, + Source = topic.Source, + Id = messageId, + Attributes = { attributes } + }; + + } + } +} diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentTypes.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentTypes.cs new file mode 100644 index 000000000000..d8e286b1fbb3 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentTypes.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentTypes.cs +using Microsoft.AutoGen.Core; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; +public sealed class AgentTypes(Dictionary types) +{ + public Dictionary Types { get; } = types; + public static AgentTypes? GetAgentTypesFromAssembly() + { + var agents = AppDomain.CurrentDomain.GetAssemblies() + .SelectMany(assembly => assembly.GetTypes()) + .Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(BaseAgent)) + && !type.IsAbstract) + .ToDictionary(type => type.Name, type => type); + + return new AgentTypes(agents); + } +} diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentsMetadata.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentsMetadata.cs new file mode 100644 index 000000000000..016bfc329bfe --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/AgentsMetadata.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentsMetadata.cs + +using System.Collections.Concurrent; +using Google.Protobuf.Reflection; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; + +/// +/// Represents a collection of event types and their associated metadata. +/// +public sealed class AgentsMetadata +{ + /// + /// Initializes a new instance of the class. + /// + /// The type registry containing protobuf type information. + /// A dictionary mapping event names to their corresponding types. + /// A dictionary mapping types to a set of event names associated with those types. + /// A dictionary mapping types to a set of topics associated with those types. + /// A dictionary mapping types to a set of topics associated with those types. + /// + public AgentsMetadata( + TypeRegistry typeRegistry, + Dictionary types, + Dictionary> eventsMap, + Dictionary> topicsMap, + Dictionary> topicsPrefixMap) + { + TypeRegistry = typeRegistry; + _types = new(types); + _eventsMap = new(eventsMap); + _topicsMap = new(topicsMap); + _topicsPrefixMap = new(topicsPrefixMap); + } + + /// + /// Gets the type registry containing protobuf type information. + /// + public TypeRegistry TypeRegistry { get; } + + private ConcurrentDictionary _types; + + private ConcurrentDictionary> _eventsMap; + private ConcurrentDictionary> _topicsMap; + private ConcurrentDictionary> _topicsPrefixMap; + + /// + /// Checks if a given type handles a specific event name. + /// + /// The type to check. + /// The event name to check. + /// true if the type handles the event name; otherwise, false. + public bool CheckIfTypeHandles(Type type, string eventName) + { + if (_eventsMap.TryGetValue(type, out var events)) + { + return events.Contains(eventName); + } + return false; + } + + /// + /// Gets the event type by its name. + /// + /// The name of the event type. + /// The event type if found; otherwise, null. + public Type? GetEventTypeByName(string type) + { + if (_types.TryGetValue(type, out var eventType)) + { + return eventType; + } + return null; + } + + public HashSet? GetEventsForAgent(Type agent) + { + if (_eventsMap.TryGetValue(agent, out var events)) + { + return events; + } + return null; + } + + public HashSet? GetTopicsForAgent(Type agent) + { + if (_topicsMap.TryGetValue(agent, out var topics)) + { + return topics; + } + return null; + } + + public HashSet? GetTopicsPrefixForAgent(Type type) + { + if (_topicsPrefixMap.TryGetValue(type, out var topics)) + { + return topics; + } + return null; + } +} + diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs similarity index 96% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs index 4f26711d149f..a0708a13b484 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestAsyncStreamReader.cs @@ -16,7 +16,7 @@ using System.Threading.Channels; using Grpc.Core; -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Grpc; public class TestAsyncStreamReader : IDisposable, IAsyncStreamReader where T : class diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs similarity index 91% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs index e47f26eda159..8c325678cbf2 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs @@ -1,15 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // TestGrpcClient.cs - -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; +using Microsoft.AutoGen.Protobuf; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Grpc; internal sealed class TestGrpcClient : IDisposable { public TestAsyncStreamReader RequestStream { get; } public TestServerStreamWriter ResponseStream { get; } public TestServerCallContext CallContext { get; } - private CancellationTokenSource CallContextCancellation = new(); public TestGrpcClient() { @@ -28,7 +25,6 @@ public void AddMessage(Message message) { RequestStream.AddMessage(message); } - public void Dispose() { CallContextCancellation.Cancel(); @@ -36,3 +32,4 @@ public void Dispose() ResponseStream.Dispose(); } } + diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs similarity index 94% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs index 47f25155602d..491eb112b4bb 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerCallContext.cs @@ -15,7 +15,7 @@ using Grpc.Core; -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Grpc; public class TestServerCallContext : ServerCallContext { @@ -68,6 +68,7 @@ protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) public static TestServerCallContext Create(Metadata? requestHeaders = null, CancellationToken cancellationToken = default) { + requestHeaders ??= new Metadata() { { "client-id", Guid.NewGuid().ToString() } }; return new TestServerCallContext(requestHeaders ?? new Metadata(), cancellationToken); } } diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs similarity index 97% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs index ca2aeab2e410..92074b2fabc6 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Grpc/TestServerStreamWriter.cs @@ -16,7 +16,7 @@ using System.Threading.Channels; using Grpc.Core; -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Grpc; public class TestServerStreamWriter : IDisposable, IServerStreamWriter where T : class { diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs similarity index 78% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs index d61dc7b21c50..e391a47f3e6c 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterCollection.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // ClusterCollection.cs -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Orleans; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans; [CollectionDefinition(Name)] public sealed class ClusterCollection : ICollectionFixture diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs similarity index 87% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs index 9db2f7f654d4..cb1320a251c3 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/ClusterFixture.cs @@ -3,7 +3,7 @@ using Orleans.TestingHost; -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Orleans; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans; public sealed class ClusterFixture : IDisposable { diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs similarity index 89% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs index bb960f7b1107..731ab83694c8 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/Orleans/SiloBuilderConfigurator.cs @@ -4,7 +4,7 @@ using Orleans.Serialization; using Orleans.TestingHost; -namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Orleans; +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans; public class SiloBuilderConfigurator : ISiloConfigurator { diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/ReflectionHelper.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/ReflectionHelper.cs new file mode 100644 index 000000000000..12e9b799b97c --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Helpers/ReflectionHelper.cs @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ReflectionHelper.cs +using System.Reflection; +using Google.Protobuf; +using Google.Protobuf.Reflection; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; +public sealed class ReflectionHelper +{ + public static bool IsSubclassOfGeneric(Type type, Type genericBaseType) + { + while (type != null && type != typeof(object)) + { + if (genericBaseType == (type.IsGenericType ? type.GetGenericTypeDefinition() : type)) + { + return true; + } + if (type.BaseType == null) + { + return false; + } + type = type.BaseType; + } + return false; + } + public static AgentsMetadata GetAgentsMetadata(params Assembly[] assemblies) + { + var interfaceType = typeof(IMessage); + var pairs = assemblies + .SelectMany(assembly => assembly.GetTypes()) + .Where(type => interfaceType.IsAssignableFrom(type) && type.IsClass && !type.IsAbstract) + .Select(t => (t, GetMessageDescriptor(t))); + + var descriptors = pairs.Select(t => t.Item2); + var typeRegistry = TypeRegistry.FromMessages(descriptors); + var types = pairs.ToDictionary(item => item.Item2?.FullName ?? "", item => item.t); + + var eventsMap = assemblies + .SelectMany(assembly => assembly.GetTypes()) + .Where(type => IsSubclassOfGeneric(type, typeof(BaseAgent)) && !type.IsAbstract) + .Select(t => (t, t.GetInterfaces() + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>)) + .Select(i => GetMessageDescriptor(i.GetGenericArguments().First())?.FullName ?? "").ToHashSet())) + .ToDictionary(item => item.t, item => item.Item2); + var topicsMap = assemblies + .SelectMany(assembly => assembly.GetTypes()) + .Where(type => IsSubclassOfGeneric(type, typeof(BaseAgent)) && !type.IsAbstract) + .Select(t => (t, t.GetCustomAttributes().Select(a => a.Topic).ToHashSet())) + .ToDictionary(item => item.t, item => item.Item2); + var topicsPrefixMap = assemblies + .SelectMany(assembly => assembly.GetTypes()) + .Where(type => IsSubclassOfGeneric(type, typeof(BaseAgent)) && !type.IsAbstract) + .Select(t => (t, t.GetCustomAttributes().Select(a => a.Topic).ToHashSet())) + .ToDictionary(item => item.t, item => item.Item2); + return new AgentsMetadata(typeRegistry, types, eventsMap, topicsMap, topicsPrefixMap); + } + + /// + /// Gets the message descriptor for the specified type. + /// + /// The type to get the message descriptor for. + /// The message descriptor if found; otherwise, null. + public static MessageDescriptor? GetMessageDescriptor(Type type) + { + var property = type.GetProperty("Descriptor", BindingFlags.Static | BindingFlags.Public); + return property?.GetValue(null) as MessageDescriptor; + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Microsoft.AutoGen.Runtime.Grpc.Tests.csproj b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.csproj similarity index 74% rename from dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Microsoft.AutoGen.Runtime.Grpc.Tests.csproj rename to dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.csproj index c8b00ee268b0..066a49c2de68 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Microsoft.AutoGen.Runtime.Grpc.Tests.csproj +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.csproj @@ -8,17 +8,13 @@ - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - + diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/TestAgent.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/TestAgent.cs new file mode 100644 index 000000000000..493b0370433a --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/TestAgent.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// TestAgent.cs +using System.Collections.Concurrent; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.AutoGen.Protobuf; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; + +[TypeSubscription("gh-gh-gh")] +public class PBAgent(Contracts.AgentId id, IAgentRuntime runtime, ILogger? logger = null) + : BaseAgent(id, runtime, "Test Agent", logger), + IHandle, + IHandle +{ + public async ValueTask HandleAsync(NewMessageReceived item, MessageContext messageContext) + { + var key = messageContext.MessageId ?? Guid.NewGuid().ToString(); + ReceivedMessages.AddOrUpdate(key, item.Message, (k, v) => item.Message); + var hello = new Hello { Message = item.Message }; + var typeFullName = typeof(Hello).FullName ?? throw new InvalidOperationException("Type full name is null"); + await PublishMessageAsync(hello, new TopicId(typeFullName), "gh-gh-gh"); + } + public async ValueTask HandleAsync(GoodBye item, MessageContext context) + { + _logger.LogInformation($"Received GoodBye message {item.Message}"); + } + public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); +} + +[TypeSubscription("gh-gh-gh")] +public class GMAgent(Contracts.AgentId id, IAgentRuntime runtime, ILogger? logger = null) + : BaseAgent(id, runtime, "Test Agent", logger), + IHandle +{ + public async ValueTask HandleAsync(Hello item, MessageContext messageContext) + { + var key = messageContext.MessageId ?? Guid.NewGuid().ToString(); + ReceivedMessages.AddOrUpdate(key, item.Message, (k, v) => item.Message); + var typeFullName = typeof(GoodBye).FullName ?? throw new InvalidOperationException("Type full name is null"); + await PublishMessageAsync(new GoodBye { Message = "" }, new TopicId(typeFullName, "gh-gh-gh")); + } + public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); +} diff --git a/python/pyproject.toml b/python/pyproject.toml index 29cf8f963f09..832f7518a870 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -103,7 +103,7 @@ gen-proto = [ ] -gen-proto-samples = "python -m grpc_tools.protoc --python_out=./samples/core_xlang_hello_python_agent/protos --grpc_python_out=./samples/core_xlang_hello_python_agent/protos --mypy_out=./samples/core_xlang_hello_python_agent/protos --mypy_grpc_out=./samples/core_xlang_hello_python_agent/protos --proto_path ../protos/ agent_events.proto" +gen-proto-samples = "python -m grpc_tools.protoc --python_out=./samples/core_xlang_hello_python_agent/protos --grpc_python_out=./samples/core_xlang_hello_python_agent/protos --mypy_out=./samples/core_xlang_hello_python_agent/protos --mypy_grpc_out=./samples/core_xlang_hello_python_agent/protos --proto_path ../dotnet/src/Microsoft.AutoGen/Agents/protos/ agent_events.proto" [[tool.poe.tasks.gen-test-proto.sequence]] cmd = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/tests/protos --grpc_python_out=./packages/autogen-core/tests/protos --mypy_out=./packages/autogen-core/tests/protos --mypy_grpc_out=./packages/autogen-core/tests/protos --proto_path ./packages/autogen-core/tests/protos serialization_test.proto" diff --git a/python/samples/core_xlang_hello_python_agent/protos/agent_events_pb2.py b/python/samples/core_xlang_hello_python_agent/protos/agent_events_pb2.py index fdd42804947d..4d65bcefd3cc 100644 --- a/python/samples/core_xlang_hello_python_agent/protos/agent_events_pb2.py +++ b/python/samples/core_xlang_hello_python_agent/protos/agent_events_pb2.py @@ -24,14 +24,14 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_events.proto\x12\x06\x61gents\"2\n\x0bTextMessage\x12\x13\n\x0btextMessage\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"\x18\n\x05Input\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1f\n\x0eInputProcessed\x12\r\n\x05route\x18\x01 \x01(\t\"\x19\n\x06Output\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1e\n\rOutputWritten\x12\r\n\x05route\x18\x01 \x01(\t\"\x1a\n\x07IOError\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x12NewMessageReceived\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x11ResponseGenerated\x12\x10\n\x08response\x18\x01 \x01(\t\"\x1a\n\x07GoodBye\x12\x0f\n\x07message\x18\x01 \x01(\t\" \n\rMessageStored\x12\x0f\n\x07message\x18\x01 \x01(\t\";\n\x12\x43onversationClosed\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x14\n\x0cuser_message\x18\x02 \x01(\t\"\x1b\n\x08Shutdown\x12\x0f\n\x07message\x18\x01 \x01(\tB\x1e\xaa\x02\x1bMicrosoft.AutoGen.Contractsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_events.proto\x12\x06\x61gents\"2\n\x0bTextMessage\x12\x13\n\x0btextMessage\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"\x18\n\x05Input\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1f\n\x0eInputProcessed\x12\r\n\x05route\x18\x01 \x01(\t\"\x19\n\x06Output\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1e\n\rOutputWritten\x12\r\n\x05route\x18\x01 \x01(\t\"\x1a\n\x07IOError\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x12NewMessageReceived\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x11ResponseGenerated\x12\x10\n\x08response\x18\x01 \x01(\t\"\x1a\n\x07GoodBye\x12\x0f\n\x07message\x18\x01 \x01(\t\" \n\rMessageStored\x12\x0f\n\x07message\x18\x01 \x01(\t\";\n\x12\x43onversationClosed\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x14\n\x0cuser_message\x18\x02 \x01(\t\"\x1b\n\x08Shutdown\x12\x0f\n\x07message\x18\x01 \x01(\tB\x1b\xaa\x02\x18Microsoft.AutoGen.Agentsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_events_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: _globals['DESCRIPTOR']._loaded_options = None - _globals['DESCRIPTOR']._serialized_options = b'\252\002\033Microsoft.AutoGen.Contracts' + _globals['DESCRIPTOR']._serialized_options = b'\252\002\030Microsoft.AutoGen.Agents' _globals['_TEXTMESSAGE']._serialized_start=30 _globals['_TEXTMESSAGE']._serialized_end=80 _globals['_INPUT']._serialized_start=82 diff --git a/python/uv.lock b/python/uv.lock index dbe382896347..eab21f4ab3cd 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -4308,7 +4308,6 @@ name = "nvidia-cublas-cu12" version = "12.4.5.8" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7f/7f/7fbae15a3982dc9595e49ce0f19332423b260045d0a6afe93cdbe2f1f624/nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0f8aa1706812e00b9f19dfe0cdb3999b092ccb8ca168c0db5b8ea712456fd9b3", size = 363333771 }, { url = "https://files.pythonhosted.org/packages/ae/71/1c91302526c45ab494c23f61c7a84aa568b8c1f9d196efa5993957faf906/nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl", hash = "sha256:2fc8da60df463fdefa81e323eef2e36489e1c94335b5358bcb38360adf75ac9b", size = 363438805 }, ] @@ -4317,7 +4316,6 @@ name = "nvidia-cuda-cupti-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/93/b5/9fb3d00386d3361b03874246190dfec7b206fd74e6e287b26a8fcb359d95/nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:79279b35cf6f91da114182a5ce1864997fd52294a87a16179ce275773799458a", size = 12354556 }, { url = "https://files.pythonhosted.org/packages/67/42/f4f60238e8194a3106d06a058d494b18e006c10bb2b915655bd9f6ea4cb1/nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:9dec60f5ac126f7bb551c055072b69d85392b13311fcc1bcda2202d172df30fb", size = 13813957 }, ] @@ -4326,7 +4324,6 @@ name = "nvidia-cuda-nvrtc-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/77/aa/083b01c427e963ad0b314040565ea396f914349914c298556484f799e61b/nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0eedf14185e04b76aa05b1fea04133e59f465b6f960c0cbf4e37c3cb6b0ea198", size = 24133372 }, { url = "https://files.pythonhosted.org/packages/2c/14/91ae57cd4db3f9ef7aa99f4019cfa8d54cb4caa7e00975df6467e9725a9f/nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:a178759ebb095827bd30ef56598ec182b85547f1508941a3d560eb7ea1fbf338", size = 24640306 }, ] @@ -4335,7 +4332,6 @@ name = "nvidia-cuda-runtime-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a1/aa/b656d755f474e2084971e9a297def515938d56b466ab39624012070cb773/nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:961fe0e2e716a2a1d967aab7caee97512f71767f852f67432d572e36cb3a11f3", size = 894177 }, { url = "https://files.pythonhosted.org/packages/ea/27/1795d86fe88ef397885f2e580ac37628ed058a92ed2c39dc8eac3adf0619/nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:64403288fa2136ee8e467cdc9c9427e0434110899d07c779f25b5c068934faa5", size = 883737 }, ] @@ -4358,7 +4354,6 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/7a/8a/0e728f749baca3fbeffad762738276e5df60851958be7783af121a7221e7/nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_aarch64.whl", hash = "sha256:5dad8008fc7f92f5ddfa2101430917ce2ffacd86824914c82e28990ad7f00399", size = 211422548 }, { url = "https://files.pythonhosted.org/packages/27/94/3266821f65b92b3138631e9c8e7fe1fb513804ac934485a8d05776e1dd43/nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f083fc24912aa410be21fa16d157fed2055dab1cc4b6934a0e03cba69eb242b9", size = 211459117 }, ] @@ -4367,7 +4362,6 @@ name = "nvidia-curand-cu12" version = "10.3.5.147" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/80/9c/a79180e4d70995fdf030c6946991d0171555c6edf95c265c6b2bf7011112/nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_aarch64.whl", hash = "sha256:1f173f09e3e3c76ab084aba0de819c49e56614feae5c12f69883f4ae9bb5fad9", size = 56314811 }, { url = "https://files.pythonhosted.org/packages/8a/6d/44ad094874c6f1b9c654f8ed939590bdc408349f137f9b98a3a23ccec411/nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_x86_64.whl", hash = "sha256:a88f583d4e0bb643c49743469964103aa59f7f708d862c3ddb0fc07f851e3b8b", size = 56305206 }, ] @@ -4381,7 +4375,6 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/46/6b/a5c33cf16af09166845345275c34ad2190944bcc6026797a39f8e0a282e0/nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_aarch64.whl", hash = "sha256:d338f155f174f90724bbde3758b7ac375a70ce8e706d70b018dd3375545fc84e", size = 127634111 }, { url = "https://files.pythonhosted.org/packages/3a/e1/5b9089a4b2a4790dfdea8b3a006052cfecff58139d5a4e34cb1a51df8d6f/nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl", hash = "sha256:19e33fa442bcfd085b3086c4ebf7e8debc07cfe01e11513cc6d332fd918ac260", size = 127936057 }, ] @@ -4393,7 +4386,6 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/96/a9/c0d2f83a53d40a4a41be14cea6a0bf9e668ffcf8b004bd65633f433050c0/nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux2014_aarch64.whl", hash = "sha256:9d32f62896231ebe0480efd8a7f702e143c98cfaa0e8a76df3386c1ba2b54df3", size = 207381987 }, { url = "https://files.pythonhosted.org/packages/db/f7/97a9ea26ed4bbbfc2d470994b8b4f338ef663be97b8f677519ac195e113d/nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux2014_x86_64.whl", hash = "sha256:ea4f11a2904e2a8dc4b1833cc1b5181cde564edd0d5cd33e3c168eff2d1863f1", size = 207454763 }, ] @@ -4410,7 +4402,6 @@ name = "nvidia-nvjitlink-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/02/45/239d52c05074898a80a900f49b1615d81c07fceadd5ad6c4f86a987c0bc4/nvidia_nvjitlink_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:4abe7fef64914ccfa909bc2ba39739670ecc9e820c83ccc7a6ed414122599b83", size = 20552510 }, { url = "https://files.pythonhosted.org/packages/ff/ff/847841bacfbefc97a00036e0fce5a0f086b640756dc38caea5e1bb002655/nvidia_nvjitlink_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:06b3b9b25bf3f8af351d664978ca26a16d2c5127dbd53c0497e28d1fb9611d57", size = 21066810 }, ] @@ -4419,7 +4410,6 @@ name = "nvidia-nvtx-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/06/39/471f581edbb7804b39e8063d92fc8305bdc7a80ae5c07dbe6ea5c50d14a5/nvidia_nvtx_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:7959ad635db13edf4fc65c06a6e9f9e55fc2f92596db928d169c0bb031e88ef3", size = 100417 }, { url = "https://files.pythonhosted.org/packages/87/20/199b8713428322a2f22b722c62b8cc278cc53dffa9705d744484b5035ee9/nvidia_nvtx_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:781e950d9b9f60d8241ccea575b32f5105a5baf4c2351cab5256a24869f12a1a", size = 99144 }, ]