From 07bdb554977a34a2e27c6f8ed50bb6dbcc19303c Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Thu, 30 Jan 2025 01:45:32 -0500 Subject: [PATCH] feat: Restore Runtime.Grpc project and get it building --- dotnet/AutoGen.sln | 19 ++- .../Runtime.Grpc/Abstractions/IAgentGrain.cs | 6 +- .../Runtime.Grpc/Abstractions/IGateway.cs | 18 +- .../Abstractions/IGatewayRegistry.cs | 55 +++++- .../Abstractions/IRegistryGrain.cs | 15 ++ .../Microsoft.AutoGen.Runtime.Grpc.csproj | 1 + .../Services/AgentWorkerHostingExtensions.cs | 10 +- .../Runtime.Grpc/Services/Grpc/GrpcGateway.cs | 156 ++++++++++++------ .../Services/Grpc/GrpcGatewayService.cs | 15 +- .../Services/Grpc/GrpcWorkerConnection.cs | 2 + .../Services/Orleans/AgentStateGrain.cs | 23 +++ .../Services/Orleans/RegistryGrain.cs | 4 +- .../AddSubscriptionRequestSurrogate.cs | 6 +- .../AddSubscriptionResponseSurrogate.cs | 14 +- .../Orleans/Surrogates/AgentIdSurrogate.cs | 2 +- .../Orleans/Surrogates/AgentStateSurrogate.cs | 2 +- .../Orleans/Surrogates/CloudEventSurrogate.cs | 1 + .../Surrogates/GetSubscriptionsRequest.cs | 2 + .../RegisterAgentTypeRequestSurrogate.cs | 5 +- .../RegisterAgentTypeResponseSurrogate.cs | 14 +- .../Surrogates/RemoveSubscriptionRequest.cs | 2 + .../Surrogates/RemoveSubscriptionResponse.cs | 10 +- .../Orleans/Surrogates/RpcRequestSurrogate.cs | 2 +- .../Surrogates/RpcResponseSurrogate.cs | 1 + .../Surrogates/SubscriptionSurrogate.cs | 93 +++++------ .../TypePrefixSubscriptionSurrogate.cs | 56 +++---- .../Surrogates/TypeSubscriptionSurrogate.cs | 56 +++---- 27 files changed, 393 insertions(+), 197 deletions(-) diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index ab7a07464c52..61fdd8bf4ae4 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -118,6 +118,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hello", "Hello", "{F42F9C8E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc", "src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj", "{3D83C6DB-ACEA-48F3-959F-145CCD2EE135}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Runtime.Grpc", "src\Microsoft.AutoGen\Runtime.Grpc\Microsoft.AutoGen.Runtime.Grpc.csproj", "{BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -296,16 +298,18 @@ Global {70A8D4B5-D0A6-4098-A6F3-6ED274B65E7D}.Debug|Any CPU.Build.0 = Debug|Any CPU {70A8D4B5-D0A6-4098-A6F3-6ED274B65E7D}.Release|Any CPU.ActiveCfg = Release|Any CPU {70A8D4B5-D0A6-4098-A6F3-6ED274B65E7D}.Release|Any CPU.Build.0 = Release|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.CoreOnly|Any CPU.ActiveCfg = Debug|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.CoreOnly|Any CPU.Build.0 = Debug|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Debug|Any CPU.Build.0 = Debug|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Release|Any CPU.ActiveCfg = Release|Any CPU - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Release|Any CPU.Build.0 = Release|Any CPU {AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Debug|Any CPU.Build.0 = Debug|Any CPU {AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.ActiveCfg = Release|Any CPU {AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.Build.0 = Release|Any CPU + {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3D83C6DB-ACEA-48F3-959F-145CCD2EE135}.Release|Any CPU.Build.0 = Release|Any CPU + {BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -356,9 +360,10 @@ Global {EAFFE339-26CB-4019-991D-BCCE8E7D33A1} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} {58AD8E1D-83BD-4950-A324-1A20677D78D9} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} {70A8D4B5-D0A6-4098-A6F3-6ED274B65E7D} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6} - {3D83C6DB-ACEA-48F3-959F-145CCD2EE135} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {AAD593FE-A49B-425E-A9FE-A0022CD25E3D} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6} + {3D83C6DB-ACEA-48F3-959F-145CCD2EE135} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {BEC2FDB8-5FC4-4B88-9D69-69759F63F4DC} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs index 947b6b0cbc0a..bc6c098a8d2a 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IAgentGrain.cs @@ -1,10 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IAgentGrain.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; internal interface IAgentGrain : IGrainWithStringKey { - ValueTask ReadStateAsync(); - ValueTask WriteStateAsync(Contracts.AgentState state, string eTag); + ValueTask ReadStateAsync(); + ValueTask WriteStateAsync(AgentState state, string eTag); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs index 33bb94f7c49b..6b6ff3bc7b14 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGateway.cs @@ -1,18 +1,28 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IGateway.cs using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; +public interface IConnection +{ +} + public interface IGateway : IGrainObserver { - ValueTask InvokeRequestAsync(RpcRequest request); ValueTask BroadcastEventAsync(CloudEvent evt); - ValueTask StoreAsync(Contracts.AgentState value); - ValueTask ReadAsync(AgentId agentId); - ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request); + + ValueTask InvokeRequestAsync(RpcRequest request); + + ValueTask StoreAsync(Protobuf.AgentState value); + ValueTask ReadAsync(Protobuf.AgentId agentId); + + ValueTask RegisterAgentTypeAsync(string requestId, RegisterAgentTypeRequest request); + ValueTask SubscribeAsync(AddSubscriptionRequest request); ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request); + Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs index cb3778418040..20f06e8b3636 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs @@ -2,9 +2,62 @@ // IGatewayRegistry.cs using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; +public interface IRegistry +{ + //AgentsRegistryState State { get; set; } + /// + /// Registers a new agent type with the specified worker. + /// + /// The request containing agent type details. + /// The worker to register the agent type with. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker); + + /// + /// Unregisters an agent type from the specified worker. + /// + /// The type of the agent to unregister. + /// The worker to unregister the agent type from. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker); + + /// + /// Gets a list of agents subscribed to and handling the specified topic and event type. + /// + /// The topic to check subscriptions for. + /// The event type to check subscriptions for. + /// A task representing the asynchronous operation, with the list of agent IDs as the result. + ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType); + + /// + /// Subscribes an agent to a topic. + /// + /// The subscription request. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask SubscribeAsync(AddSubscriptionRequest request); + + /// + /// Unsubscribes an agent from a topic. + /// + /// The unsubscription request. + /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); // TODO: This should have its own request type. + + /// + /// Gets the subscriptions for a specified agent type. + /// + /// A task representing the asynchronous operation, with the subscriptions as the result. + ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request); +} + /// /// Interface for managing agent registration, placement, and subscriptions. /// @@ -15,7 +68,7 @@ public interface IGatewayRegistry : IRegistry /// /// The ID of the agent. /// A tuple containing the worker and a boolean indicating if it's a new placement. - ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId); + ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(Protobuf.AgentId agentId); /// /// Removes a worker from the registry. diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs index 81b59858619c..ea7d99ffb8a5 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IRegistryGrain.cs +using Microsoft.AutoGen.Protobuf; +using System.Collections.Concurrent; + namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; /// @@ -9,3 +12,15 @@ namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; [Alias("Microsoft.AutoGen.Runtime.Grpc.Abstractions.IRegistryGrain")] public interface IRegistryGrain : IGatewayRegistry, IGrainWithIntegerKey { } + +public class AgentsRegistryState +{ + public ConcurrentDictionary> AgentsToEventsMap { get; set; } = new ConcurrentDictionary>(); + public ConcurrentDictionary> AgentsToTopicsMap { get; set; } = []; + public ConcurrentDictionary> TopicToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> EventsToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> GuidSubscriptionsMap { get; set; } = []; + public ConcurrentDictionary AgentTypes { get; set; } = []; + public string Etag { get; set; } = Guid.NewGuid().ToString(); +} + diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj index b874a657d8f2..caf2f64c55fd 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj @@ -6,6 +6,7 @@ + diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs index 3b130ca4bed5..5fbcca9fede4 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/AgentWorkerHostingExtensions.cs @@ -3,7 +3,7 @@ using System.Diagnostics; using Microsoft.AspNetCore.Builder; -using Microsoft.AutoGen.Core; +//using Microsoft.AutoGen.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; @@ -18,10 +18,10 @@ public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder b builder.Services.TryAddSingleton(DistributedContextPropagator.Current); builder.Services.AddGrpc(); - builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) => - { - return ReflectionHelper.GetAgentsMetadata(AppDomain.CurrentDomain.GetAssemblies()); - }); + //builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) => + //{ + // return ReflectionHelper.GetAgentsMetadata(AppDomain.CurrentDomain.GetAssemblies()); + //}); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs index 26c99c894248..afa4c3603b5b 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs @@ -4,6 +4,7 @@ using System.Collections.Concurrent; using Grpc.Core; using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; using Microsoft.AutoGen.Runtime.Grpc.Abstractions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -30,6 +31,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); // RPC private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource> _pendingRequests = new(); + public GrpcGateway(IClusterClient clusterClient, ILogger logger) { _logger = logger; @@ -38,8 +40,26 @@ public GrpcGateway(IClusterClient clusterClient, ILogger logger) _gatewayRegistry = clusterClient.GetGrain(0); _subscriptions = clusterClient.GetGrain(0); } + public async ValueTask InvokeRequestAsync(RpcRequest request, CancellationToken cancellationToken = default) { + //if (string.IsNullOrWhiteSpace(request.Target.Type) && string.IsNullOrWhiteSpace(request.Target.Key)) + //{ + // // Check if this is a request to the gateway itself. + // switch (request.Method) + // { + // case "RegisterAgentType": + // { + // if (!request.Payload.DataType.Equals(nameof(RegisterAgentTypeRequest))) + // { + // return new(new RpcResponse { Error = "Invalid payload type." }); + // } + + // //return await TryServiceRegisterAgentType(request.RequestId, request.Payload, cancellationToken).ConfigureAwait(false); + // } + // } + //} + var agentId = (request.Target.Type, request.Target.Key); if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted == true) { @@ -65,63 +85,89 @@ public async ValueTask InvokeRequestAsync(RpcRequest request, Cance response.RequestId = originalRequestId; return response; } + public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) { _ = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); var agentState = _clusterClient.GetGrain($"{value.AgentId.Type}:{value.AgentId.Key}"); await agentState.WriteStateAsync(value, value.ETag); } - public async ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) + + public async ValueTask ReadAsync(Protobuf.AgentId agentId, CancellationToken cancellationToken = default) { var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); return await agentState.ReadStateAsync(); } - public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default) + + public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken _ = default) { + string requestId = string.Empty; try { - var connection = _workersByConnection[request.RequestId]; + var connection = _workersByConnection[requestId]; connection.AddSupportedType(request.Type); _supportedAgentTypes.GetOrAdd(request.Type, _ => []).Add(connection); await _gatewayRegistry.RegisterAgentTypeAsync(request, _reference).ConfigureAwait(true); return new RegisterAgentTypeResponse { - Success = true, - RequestId = request.RequestId + //Success = true, + //RequestId = request.RequestId }; } - catch (Exception ex) + catch (Exception) { return new RegisterAgentTypeResponse { - Success = false, - RequestId = request.RequestId, - Error = ex.Message + //Success = false, + //RequestId = request.RequestId, + //Error = ex.Message }; } } - public async ValueTask SubscribeAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default) + + private async ValueTask RegisterAgentTypeAsync(string requestId, GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) + { + connection.AddSupportedType(msg.Type); + _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); + + await _gatewayRegistry.RegisterAgentTypeAsync(msg, _reference).ConfigureAwait(true); + + Message response = new() + { + Response = new RpcResponse + { + RequestId = requestId, + Error = "", + //Success = true + } + }; + + await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); + } + + public async ValueTask SubscribeAsync(AddSubscriptionRequest request, CancellationToken _ = default) { try { await _gatewayRegistry.SubscribeAsync(request).ConfigureAwait(true); return new AddSubscriptionResponse { - Success = true, - RequestId = request.RequestId + //Success = true, + //RequestId = request.RequestId }; } - catch (Exception ex) + catch (Exception) { return new AddSubscriptionResponse { - Success = false, - RequestId = request.RequestId, - Error = ex.Message + //Success = false, + //RequestId = request.RequestId, + //Error = ex.Message }; } } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) @@ -145,6 +191,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) _logger.LogWarning(exception, "Error removing worker from registry."); } } + internal async Task ConnectToWorkerProcess(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _logger.LogInformation("Received new connection from {Peer}.", context.Peer); @@ -153,10 +200,12 @@ internal async Task ConnectToWorkerProcess(IAsyncStreamReader requestSt _workersByConnection.GetOrAdd(context.Peer, workerProcess); await workerProcess.Connect().ConfigureAwait(false); } + internal async Task SendMessageAsync(GrpcWorkerConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) { await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); } + internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message, CancellationToken cancellationToken = default) { _logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection); @@ -171,18 +220,19 @@ internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Mess case Message.MessageOneofCase.CloudEvent: await DispatchEventAsync(message.CloudEvent, cancellationToken); break; - case Message.MessageOneofCase.RegisterAgentTypeRequest: - await RegisterAgentTypeAsync(connection, message.RegisterAgentTypeRequest); - break; - case Message.MessageOneofCase.AddSubscriptionRequest: - await AddSubscriptionAsync(connection, message.AddSubscriptionRequest); - break; + //case Message.MessageOneofCase.RegisterAgentTypeRequest: + // await RegisterAgentTypeAsync(connection, message.RegisterAgentTypeRequest); + // break; + //case Message.MessageOneofCase.AddSubscriptionRequest: + // await AddSubscriptionAsync(connection, message.AddSubscriptionRequest); + // break; default: // if it wasn't recognized return bad request await RespondBadRequestAsync(connection, $"Unknown message type for message '{message}'."); break; }; } + private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response) { if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion)) @@ -193,23 +243,7 @@ private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse respo // Complete the request. completion.SetResult(response); } - private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) - { - connection.AddSupportedType(msg.Type); - _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentTypeAsync(msg, _reference).ConfigureAwait(true); - Message response = new() - { - RegisterAgentTypeResponse = new() - { - RequestId = msg.RequestId, - Error = "", - Success = true - } - }; - await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); - } private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken cancellationToken = default) { var registry = _clusterClient.GetGrain(0); @@ -238,11 +272,21 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can _logger.LogWarning("No agent types found for event type {EventType}.", evt.Type); } } + private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request) { var requestId = request.RequestId; if (request.Target is null) { + // If the gateway knows how to service this request, treat the target as the "Gateway" + if (request.Method == "RegisterAgent") + { + //RegisterAgentTypeRequest request = + + //await RegisterAgentTypeAsync(requestId, connection, request.Payload).ConfigureAwait(false); + return; + } + throw new InvalidOperationException($"Request message is missing a target. Message: '{request}'."); } await InvokeRequestDelegate(connection, request, async request => @@ -260,6 +304,7 @@ await InvokeRequestDelegate(connection, request, async request => return await gateway.InvokeRequestAsync(request).ConfigureAwait(true); }).ConfigureAwait(false); } + private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func> func) { try @@ -273,6 +318,7 @@ private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false); } } + internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) { _workers.TryRemove(workerProcess, out _); @@ -293,10 +339,12 @@ internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) } } } + private static async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, string error) { throw new RpcException(new Status(StatusCode.InvalidArgument, error)); } + private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) { var topic = ""; @@ -317,15 +365,16 @@ private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, Ad //var response = new SubscriptionResponse { RequestId = request.RequestId, Error = "", Success = true }; Message response = new() { - AddSubscriptionResponse = new() + Response = new() { - RequestId = request.RequestId, + //RequestId = request.RequestId, Error = "", - Success = true + //Success = true } }; await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); } + private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentTypes, CloudEvent evt) { var tasks = new List(agentTypes.Count()); @@ -341,6 +390,7 @@ private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentType } await Task.WhenAll(tasks).ConfigureAwait(false); } + public async ValueTask BroadcastEventAsync(CloudEvent evt, CancellationToken cancellationToken = default) { var tasks = new List(_workers.Count); @@ -351,10 +401,12 @@ public async ValueTask BroadcastEventAsync(CloudEvent evt, CancellationToken can } await Task.WhenAll(tasks).ConfigureAwait(false); } + Task IGateway.SendMessageAsync(IConnection connection, CloudEvent cloudEvent) { return this.SendMessageAsync(connection, cloudEvent, default); } + public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) { var queue = (GrpcWorkerConnection)connection; @@ -367,52 +419,60 @@ public async ValueTask UnsubscribeAsync(RemoveSubscr { await _gatewayRegistry.UnsubscribeAsync(request).ConfigureAwait(true); return new RemoveSubscriptionResponse - { - Success = true, + //Success = true, }; } - catch (Exception ex) + catch (Exception) { return new RemoveSubscriptionResponse { - Success = false, - Error = ex.Message + //Success = false, + //Error = ex.Message }; } } + public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default) { return _gatewayRegistry.GetSubscriptionsAsync(request); } + async ValueTask IGateway.InvokeRequestAsync(RpcRequest request) { return await InvokeRequestAsync(request, default).ConfigureAwait(false); } + async ValueTask IGateway.BroadcastEventAsync(CloudEvent evt) { await BroadcastEventAsync(evt, default).ConfigureAwait(false); } + ValueTask IGateway.StoreAsync(AgentState value) { return StoreAsync(value, default); } - ValueTask IGateway.ReadAsync(AgentId agentId) + + ValueTask IGateway.ReadAsync(Protobuf.AgentId agentId) { return ReadAsync(agentId, default); } - ValueTask IGateway.RegisterAgentTypeAsync(RegisterAgentTypeRequest request) + + ValueTask IGateway.RegisterAgentTypeAsync(string requestId, RegisterAgentTypeRequest request) { return RegisterAgentTypeAsync(request, default); } + ValueTask IGateway.SubscribeAsync(AddSubscriptionRequest request) { return SubscribeAsync(request, default); } + ValueTask IGateway.UnsubscribeAsync(RemoveSubscriptionRequest request) { return UnsubscribeAsync(request, default); } + ValueTask> IGateway.GetSubscriptionsAsync(GetSubscriptionsRequest request) { return GetSubscriptionsAsync(request); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs index 9481922943c9..4be883854f4b 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGatewayService.cs @@ -2,7 +2,8 @@ // GrpcGatewayService.cs using Grpc.Core; -using Microsoft.AutoGen.Contracts; +//using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc; @@ -26,36 +27,42 @@ public override async Task OpenChannel(IAsyncStreamReader requestStream throw; } } + public override async Task GetState(AgentId request, ServerCallContext context) { var state = await Gateway.ReadAsync(request); return new GetStateResponse { AgentState = state }; } + public override async Task SaveState(AgentState request, ServerCallContext context) { await Gateway.StoreAsync(request); return new SaveStateResponse { - Success = true // TODO: Implement error handling + //Success = true // TODO: Implement error handling }; } + public override async Task AddSubscription(AddSubscriptionRequest request, ServerCallContext context) { - request.RequestId = context.Peer; + //request.RequestId = context.Peer; return await Gateway.SubscribeAsync(request).ConfigureAwait(true); } + public override async Task RemoveSubscription(RemoveSubscriptionRequest request, ServerCallContext context) { return await Gateway.UnsubscribeAsync(request).ConfigureAwait(true); } + public override async Task GetSubscriptions(GetSubscriptionsRequest request, ServerCallContext context) { var subscriptions = await Gateway.GetSubscriptionsAsync(request); return new GetSubscriptionsResponse { Subscriptions = { subscriptions } }; } + public override async Task RegisterAgent(RegisterAgentTypeRequest request, ServerCallContext context) { - request.RequestId = context.Peer; + //request.RequestId = context.Peer; return await Gateway.RegisterAgentTypeAsync(request).ConfigureAwait(true); } } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs index cba0f8c4772b..6b2c544a8e53 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcWorkerConnection.cs @@ -3,6 +3,8 @@ using System.Threading.Channels; using Grpc.Core; +using Microsoft.AutoGen.Protobuf; +using Microsoft.AutoGen.Runtime.Grpc.Abstractions; namespace Microsoft.AutoGen.Runtime.Grpc; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs index 97869cd91fd1..d745184f04e3 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentStateGrain.cs @@ -1,10 +1,33 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AgentStateGrain.cs +using Microsoft.AutoGen.Protobuf; using Microsoft.AutoGen.Runtime.Grpc.Abstractions; namespace Microsoft.AutoGen.Runtime.Grpc; +/// +/// Interface for managing the state of an agent. +/// +public interface IAgentState +{ + /// + /// Reads the current state of the agent asynchronously. + /// + /// A token to cancel the operation. + /// A task that represents the asynchronous read operation. The task result contains the current state of the agent. + ValueTask ReadStateAsync(CancellationToken cancellationToken = default); + + /// + /// Writes the specified state of the agent asynchronously. + /// + /// The state to write. + /// The ETag for concurrency control. + /// A token to cancel the operation. + /// A task that represents the asynchronous write operation. The task result contains the ETag of the written state. + ValueTask WriteStateAsync(AgentState state, string eTag, CancellationToken cancellationToken = default); +} + internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IAgentState, IAgentGrain { /// diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs index 9de7065fdb62..1faa29f39aff 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs @@ -1,9 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RegistryGrain.cs using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; using Microsoft.AutoGen.Runtime.Grpc.Abstractions; namespace Microsoft.AutoGen.Runtime.Grpc; + internal sealed class RegistryGrain([PersistentState("state", "AgentRegistryStore")] IPersistentState state) : Grain, IRegistryGrain { private readonly Dictionary _workerStates = new(); @@ -54,7 +56,7 @@ public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, return new ValueTask>(agents); } - public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId) + public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(Protobuf.AgentId agentId) { // TODO: Clarify the logic bool isNewPlacement; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs index 37e3af1b9d17..6bb6b90da4d2 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionRequestSurrogate.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AddSubscriptionRequestSurrogate.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] @@ -21,7 +23,7 @@ public AddSubscriptionRequest ConvertFromSurrogate( { var request = new AddSubscriptionRequest() { - RequestId = surrogate.RequestId, + //RequestId = surrogate.RequestId, Subscription = surrogate.Subscription }; return request; @@ -31,7 +33,7 @@ public AddSubscriptionRequestSurrogate ConvertToSurrogate( in AddSubscriptionRequest value) => new AddSubscriptionRequestSurrogate { - RequestId = value.RequestId, + //RequestId = value.RequestId, Subscription = value.Subscription }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs index 4c15784e0fcc..6313d4f8da7c 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AddSubscriptionResponseSurrogate.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AddSubscriptionResponseSurrogate.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] @@ -22,18 +24,18 @@ public AddSubscriptionResponse ConvertFromSurrogate( in AddSubscriptionResponseSurrogate surrogate) => new AddSubscriptionResponse { - RequestId = surrogate.RequestId, - Success = surrogate.Success, - Error = surrogate.Error + //RequestId = surrogate.RequestId, + //Success = surrogate.Success, + //Error = surrogate.Error }; public AddSubscriptionResponseSurrogate ConvertToSurrogate( in AddSubscriptionResponse value) => new AddSubscriptionResponseSurrogate { - RequestId = value.RequestId, - Success = value.Success, - Error = value.Error + //RequestId = value.RequestId, + //Success = value.Success, + //Error = value.Error }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs index ddef9e997575..d19c3b058025 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentIdSurrogate.cs @@ -3,7 +3,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AgentIdSurrogate.cs -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs index a5291f942155..67b35ef1e8b1 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/AgentStateSurrogate.cs @@ -2,7 +2,7 @@ // AgentStateSurrogate.cs using Google.Protobuf; -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs index 22359a08981c..7572ec3c31a3 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/CloudEventSurrogate.cs @@ -3,6 +3,7 @@ using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Microsoft.AutoGen.Contracts; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs index ab4722ff8c74..c9910ca19c16 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/GetSubscriptionsRequest.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // GetSubscriptionsRequest.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs index fa50e597fabe..b2abf686a121 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeRequestSurrogate.cs @@ -2,6 +2,7 @@ // RegisterAgentTypeRequestSurrogate.cs using Google.Protobuf.Collections; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; @@ -27,7 +28,7 @@ public RegisterAgentTypeRequest ConvertFromSurrogate( { var request = new RegisterAgentTypeRequest() { - RequestId = surrogate.RequestId, + //RequestId = surrogate.RequestId, Type = surrogate.Type }; /* future @@ -40,7 +41,7 @@ public RegisterAgentTypeRequestSurrogate ConvertToSurrogate( in RegisterAgentTypeRequest value) => new RegisterAgentTypeRequestSurrogate { - RequestId = value.RequestId, + //RequestId = value.RequestId, Type = value.Type, /* future Events = value.Events, diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs index 2c7d6788a76c..c6bf562bf8b6 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RegisterAgentTypeResponseSurrogate.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RegisterAgentTypeResponseSurrogate.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] @@ -22,18 +24,18 @@ public RegisterAgentTypeResponse ConvertFromSurrogate( in RegisterAgentTypeResponseSurrogate surrogate) => new RegisterAgentTypeResponse { - RequestId = surrogate.RequestId, - Success = surrogate.Success, - Error = surrogate.Error + //RequestId = surrogate.RequestId, + //Success = surrogate.Success, + //Error = surrogate.Error }; public RegisterAgentTypeResponseSurrogate ConvertToSurrogate( in RegisterAgentTypeResponse value) => new RegisterAgentTypeResponseSurrogate { - RequestId = value.RequestId, - Success = value.Success, - Error = value.Error + //RequestId = value.RequestId, + //Success = value.Success, + //Error = value.Error }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs index 27299728baa8..96edcc101715 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionRequest.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RemoveSubscriptionRequest.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs index 88253c99b916..27fcf5edb48d 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RemoveSubscriptionResponse.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RemoveSubscriptionResponse.cs +using Microsoft.AutoGen.Protobuf; + namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; [GenerateSerializer] @@ -22,16 +24,16 @@ public RemoveSubscriptionResponse ConvertFromSurrogate( in RemoveSubscriptionResponseSurrogate surrogate) => new RemoveSubscriptionResponse { - Success = surrogate.Success, - Error = surrogate.Error + //Success = surrogate.Success, + //Error = surrogate.Error }; public RemoveSubscriptionResponseSurrogate ConvertToSurrogate( in RemoveSubscriptionResponse value) => new RemoveSubscriptionResponseSurrogate { - Success = value.Success, - Error = value.Error + //Success = value.Success, + //Error = value.Error }; } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs index 9791a68d7952..a8cf07672a9d 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcRequestSurrogate.cs @@ -2,7 +2,7 @@ // RpcRequestSurrogate.cs using Google.Protobuf.Collections; -using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs index 5c9fac246f84..fee1f79f522b 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/RpcResponseSurrogate.cs @@ -2,6 +2,7 @@ // RpcResponseSurrogate.cs using Google.Protobuf.Collections; +using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs index 1fd56c176278..dc060023c160 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/SubscriptionSurrogate.cs @@ -1,54 +1,55 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // SubscriptionSurrogate.cs -using Microsoft.AutoGen.Contracts; +//using Microsoft.AutoGen.Contracts; +//using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; -[GenerateSerializer] -public struct SubscriptionSurrogate -{ - [Id(0)] - public TypeSubscription? TypeSubscription; - [Id(1)] - public TypePrefixSubscription? TypePrefixSubscription; - [Id(2)] - public string Id; -} +//[GenerateSerializer] +//public struct SubscriptionSurrogate +//{ +// [Id(0)] +// public TypeSubscription? TypeSubscription; +// [Id(1)] +// public TypePrefixSubscription? TypePrefixSubscription; +// [Id(2)] +// public string Id; +//} -[RegisterConverter] -public sealed class SubscriptionSurrogateConverter : - IConverter -{ - public Subscription ConvertFromSurrogate( - in SubscriptionSurrogate surrogate) - { - if (surrogate.TypeSubscription is not null) - { - return new Subscription - { - Id = surrogate.Id, - TypeSubscription = surrogate.TypeSubscription - }; - } - else - { - return new Subscription - { - Id = surrogate.Id, - TypePrefixSubscription = surrogate.TypePrefixSubscription - }; - } - } +//[RegisterConverter] +//public sealed class SubscriptionSurrogateConverter : +// IConverter +//{ +// public Subscription ConvertFromSurrogate( +// in SubscriptionSurrogate surrogate) +// { +// if (surrogate.TypeSubscription is not null) +// { +// return new Subscription +// { +// Id = surrogate.Id, +// TypeSubscription = surrogate.TypeSubscription +// }; +// } +// else +// { +// return new Subscription +// { +// Id = surrogate.Id, +// TypePrefixSubscription = surrogate.TypePrefixSubscription +// }; +// } +// } - public SubscriptionSurrogate ConvertToSurrogate( - in Subscription value) - { - return new SubscriptionSurrogate - { - Id = value.Id, - TypeSubscription = value.TypeSubscription, - TypePrefixSubscription = value.TypePrefixSubscription - }; - } -} +// public SubscriptionSurrogate ConvertToSurrogate( +// in Subscription value) +// { +// return new SubscriptionSurrogate +// { +// Id = value.Id, +// TypeSubscription = value.TypeSubscription, +// TypePrefixSubscription = value.TypePrefixSubscription +// }; +// } +//} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs index ca4d721315e8..ff2d684c6bac 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypePrefixSubscriptionSurrogate.cs @@ -1,36 +1,36 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // TypePrefixSubscriptionSurrogate.cs -using Microsoft.AutoGen.Contracts; +//using Microsoft.AutoGen.Contracts; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; -[GenerateSerializer] -public struct TypePrefixSubscriptionSurrogate -{ - [Id(0)] - public string TopicTypePrefix; - [Id(1)] - public string AgentType; -} +//[GenerateSerializer] +//public struct TypePrefixSubscriptionSurrogate +//{ +// [Id(0)] +// public string TopicTypePrefix; +// [Id(1)] +// public string AgentType; +//} -[RegisterConverter] -public sealed class TypePrefixSubscriptionConverter : - IConverter -{ - public TypePrefixSubscription ConvertFromSurrogate( - in TypePrefixSubscriptionSurrogate surrogate) => - new TypePrefixSubscription - { - TopicTypePrefix = surrogate.TopicTypePrefix, - AgentType = surrogate.AgentType - }; +//[RegisterConverter] +//public sealed class TypePrefixSubscriptionConverter : +// IConverter +//{ +// public TypePrefixSubscription ConvertFromSurrogate( +// in TypePrefixSubscriptionSurrogate surrogate) => +// new TypePrefixSubscription +// { +// TopicTypePrefix = surrogate.TopicTypePrefix, +// AgentType = surrogate.AgentType +// }; - public TypePrefixSubscriptionSurrogate ConvertToSurrogate( - in TypePrefixSubscription value) => - new TypePrefixSubscriptionSurrogate - { - TopicTypePrefix = value.TopicTypePrefix, - AgentType = value.AgentType - }; -} +// public TypePrefixSubscriptionSurrogate ConvertToSurrogate( +// in TypePrefixSubscription value) => +// new TypePrefixSubscriptionSurrogate +// { +// TopicTypePrefix = value.TopicTypePrefix, +// AgentType = value.AgentType +// }; +//} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs index 57fa202ebfc3..ff28bfcac617 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/Surrogates/TypeSubscriptionSurrogate.cs @@ -1,36 +1,36 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // TypeSubscriptionSurrogate.cs -using Microsoft.AutoGen.Contracts; +//using Microsoft.AutoGen.Contracts; namespace Microsoft.AutoGen.Runtime.Grpc.Orleans.Surrogates; -[GenerateSerializer] -public struct TypeSubscriptionSurrogate -{ - [Id(0)] - public string TopicType; - [Id(1)] - public string AgentType; -} +//[GenerateSerializer] +//public struct TypeSubscriptionSurrogate +//{ +// [Id(0)] +// public string TopicType; +// [Id(1)] +// public string AgentType; +//} -[RegisterConverter] -public sealed class TypeSubscriptionSurrogateConverter : - IConverter -{ - public TypeSubscription ConvertFromSurrogate( - in TypeSubscriptionSurrogate surrogate) => - new TypeSubscription - { - TopicType = surrogate.TopicType, - AgentType = surrogate.AgentType - }; +//[RegisterConverter] +//public sealed class TypeSubscriptionSurrogateConverter : +// IConverter +//{ +// public TypeSubscription ConvertFromSurrogate( +// in TypeSubscriptionSurrogate surrogate) => +// new TypeSubscription +// { +// TopicType = surrogate.TopicType, +// AgentType = surrogate.AgentType +// }; - public TypeSubscriptionSurrogate ConvertToSurrogate( - in TypeSubscription value) => - new TypeSubscriptionSurrogate - { - TopicType = value.TopicType, - AgentType = value.AgentType - }; -} +// public TypeSubscriptionSurrogate ConvertToSurrogate( +// in TypeSubscription value) => +// new TypeSubscriptionSurrogate +// { +// TopicType = value.TopicType, +// AgentType = value.AgentType +// }; +//}