diff --git a/.gitignore b/.gitignore index bd419b14d7c7..899d49f9dd67 100644 --- a/.gitignore +++ b/.gitignore @@ -195,4 +195,7 @@ samples/apps/autogen-studio/autogenstudio/models/test/ notebook/coding # dotnet artifacts -artifacts \ No newline at end of file +artifacts + +# project data +registry.json \ No newline at end of file diff --git a/dotnet/.editorconfig b/dotnet/.editorconfig index 3821c59cc19f..ce670b6c3aa2 100644 --- a/dotnet/.editorconfig +++ b/dotnet/.editorconfig @@ -701,3 +701,7 @@ generated_code = true # IDE1591 Missing XML comment for publicly visible type or member dotnet_diagnostic.CS1591.severity = none + +[I*.cs] +# dont warn on missing accessibility modifiers for interfaces +dotnet_diagnostic.IDE0040.severity = none \ No newline at end of file diff --git a/dotnet/src/Microsoft.AutoGen/Contracts/AgentsRegistryState.cs b/dotnet/src/Microsoft.AutoGen/Contracts/AgentsRegistryState.cs new file mode 100644 index 000000000000..836d4699cfce --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Contracts/AgentsRegistryState.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentsRegistryState.cs +using System.Collections.Concurrent; + +namespace Microsoft.AutoGen.Contracts; +public class AgentsRegistryState +{ + public ConcurrentDictionary> AgentsToEventsMap { get; set; } = new ConcurrentDictionary>(); + public ConcurrentDictionary> AgentsToTopicsMap { get; set; } = []; + public ConcurrentDictionary> TopicToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> EventsToAgentTypesMap { get; set; } = []; + public ConcurrentDictionary> GuidSubscriptionsMap { get; set; } = []; + public ConcurrentDictionary AgentTypes { get; set; } = []; + public string Etag { get; set; } = Guid.NewGuid().ToString(); +} diff --git a/dotnet/src/Microsoft.AutoGen/Contracts/IAgent.cs b/dotnet/src/Microsoft.AutoGen/Contracts/IAgent.cs new file mode 100644 index 000000000000..01a3624cf08e --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Contracts/IAgent.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IAgent.cs + +using Google.Protobuf; + +namespace Microsoft.AutoGen.Contracts; + +public interface IAgent +{ + AgentId AgentId { get; } + IAgentRuntime Worker { get; } + ValueTask> GetSubscriptionsAsync(); + ValueTask SubscribeAsync(string topic); + ValueTask UnsubscribeAsync(Guid id); + ValueTask UnsubscribeAsync(string topic); + Task StoreAsync(AgentState state, CancellationToken cancellationToken = default); + Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new(); + ValueTask PublishMessageAsync(IMessage message, string topic, string source, string key, CancellationToken token = default); + ValueTask PublishMessageAsync(T message, string topic, string source, CancellationToken token = default) where T : IMessage; + ValueTask PublishMessageAsync(T message, string topic, CancellationToken token = default) where T : IMessage; + ValueTask PublishMessageAsync(T message, CancellationToken token = default) where T : IMessage; + Task HandleRequestAsync(RpcRequest request); + Task HandleObjectAsync(object item, CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.AutoGen/Contracts/IAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Contracts/IAgentRuntime.cs new file mode 100644 index 000000000000..a2c771f208f9 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Contracts/IAgentRuntime.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IAgentRuntime.cs + +using Google.Protobuf; +namespace Microsoft.AutoGen.Contracts; + +/// +/// Defines the common surface for agent runtime implementations. +/// +public interface IAgentRuntime +{ + /// + /// Gets the dependency injection service provider for the runtime. + /// + IServiceProvider RuntimeServiceProvider { get; } + + /// + /// Registers a new agent type asynchronously. + /// + /// The request containing the agent type details. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation. + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default); + + /// + /// to be removed in favor of send_message + /// Sends a request to and agent. + /// + /// The agent sending the request. + /// The request to be sent. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation. + ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default); + + /// + /// Sends a response to the above request. + /// /// to be removed in favor of send_message + /// + /// The response to be sent. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation. + ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); + + /// + /// Publishes a message to a topic. + /// + /// The message to be published. + /// The topic to publish the message to. + /// The agent sending the message. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation. + ValueTask PublishMessageAsync(IMessage message, TopicId topic, IAgent? sender, CancellationToken? cancellationToken = default); + + /// + /// Saves the state of an agent asynchronously. + /// + /// The state to be saved. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation. + ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default); + + /// + /// Loads the state of an agent asynchronously. + /// + /// The ID of the agent whose state is to be loaded. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation, containing the agent state. + ValueTask LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default); + + /// + /// Adds a subscription to a topic. + /// + /// The request containing the subscription types. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation, containing the response. + ValueTask AddSubscriptionAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default); + + /// + /// Removes a subscription. + /// + /// The request containing the subscription id. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation, containing the response. + ValueTask RemoveSubscriptionAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default); + + /// + /// Gets the list of subscriptions. + /// + /// The request containing the subscription query details. + /// A token to cancel the operation. + /// A task that represents the asynchronous operation, containing the list of subscriptions. + ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistry.cs b/dotnet/src/Microsoft.AutoGen/Contracts/IRegistry.cs similarity index 53% rename from dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistry.cs rename to dotnet/src/Microsoft.AutoGen/Contracts/IRegistry.cs index 436fa038774e..bea5f657d8a8 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistry.cs +++ b/dotnet/src/Microsoft.AutoGen/Contracts/IRegistry.cs @@ -1,43 +1,18 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IRegistry.cs +namespace Microsoft.AutoGen.Contracts; -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; - -/// -/// Interface for managing agent registration, placement, and subscriptions. -/// public interface IRegistry { - /// - /// Gets or places an agent based on the provided agent ID. - /// - /// The ID of the agent. - /// A tuple containing the worker and a boolean indicating if it's a new placement. - ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId); - - /// - /// Removes a worker from the registry. - /// - /// The worker to remove. - /// A task representing the asynchronous operation. - ValueTask RemoveWorker(IGateway worker); - + //AgentsRegistryState State { get; set; } /// /// Registers a new agent type with the specified worker. /// /// The request containing agent type details. /// The worker to register the agent type with. /// A task representing the asynchronous operation. - ValueTask RegisterAgentType(RegisterAgentTypeRequest request, IGateway worker); - - /// - /// Adds a new worker to the registry. - /// - /// The worker to add. - /// A task representing the asynchronous operation. - ValueTask AddWorker(IGateway worker); + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker); /// /// Unregisters an agent type from the specified worker. @@ -45,14 +20,8 @@ public interface IRegistry /// The type of the agent to unregister. /// The worker to unregister the agent type from. /// A task representing the asynchronous operation. - ValueTask UnregisterAgentType(string type, IGateway worker); - - /// - /// Gets a compatible worker for the specified agent type. - /// - /// The type of the agent. - /// A task representing the asynchronous operation, with the compatible worker as the result. - ValueTask GetCompatibleWorker(string type); + /// removing CancellationToken from here as it is not compatible with Orleans Serialization + ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker); /// /// Gets a list of agents subscribed to and handling the specified topic and event type. @@ -60,13 +29,14 @@ public interface IRegistry /// The topic to check subscriptions for. /// The event type to check subscriptions for. /// A task representing the asynchronous operation, with the list of agent IDs as the result. - ValueTask> GetSubscribedAndHandlingAgents(string topic, string eventType); + ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType); /// /// Subscribes an agent to a topic. /// /// The subscription request. /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization ValueTask SubscribeAsync(AddSubscriptionRequest request); /// @@ -74,11 +44,12 @@ public interface IRegistry /// /// The unsubscription request. /// A task representing the asynchronous operation. + /// removing CancellationToken from here as it is not compatible with Orleans Serialization ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); // TODO: This should have its own request type. /// /// Gets the subscriptions for a specified agent type. /// /// A task representing the asynchronous operation, with the subscriptions as the result. - ValueTask> GetSubscriptions(GetSubscriptionsRequest request); + ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request); } diff --git a/dotnet/src/Microsoft.AutoGen/Contracts/IRegistryStorage.cs b/dotnet/src/Microsoft.AutoGen/Contracts/IRegistryStorage.cs new file mode 100644 index 000000000000..499fc9147215 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Contracts/IRegistryStorage.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IRegistryStorage.cs + +namespace Microsoft.AutoGen.Contracts; + +public interface IRegistryStorage +{ + /// + /// Populates the Registry state from the storage. + /// + /// + Task ReadStateAsync(CancellationToken cancellationToken = default); + /// + /// Writes the Registry state to the storage. + /// + /// + /// + /// the etag that was written + ValueTask WriteStateAsync(AgentsRegistryState state, CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs index 85d070f368b1..baf16f31981f 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs @@ -295,11 +295,10 @@ await WriteChannelAsync(new Message { await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false); } - - public new async ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default) + public new async ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default) { var requestId = Guid.NewGuid().ToString(); - _pendingRequests[requestId] = (agent, request.RequestId); + _pendingRequests[requestId] = ((Agent)agent, request.RequestId); request.RequestId = requestId; await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); } @@ -307,7 +306,6 @@ await WriteChannelAsync(new Message { await WriteChannelAsync(message, cancellationToken).ConfigureAwait(false); } - public async ValueTask RuntimePublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) { await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentWorkerHostBuilderExtension.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentWorkerHostBuilderExtension.cs index 2331d17023fa..4e9e0bbca8bf 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentWorkerHostBuilderExtension.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentWorkerHostBuilderExtension.cs @@ -63,6 +63,8 @@ public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBu }); var assemblies = AppDomain.CurrentDomain.GetAssemblies(); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) => diff --git a/dotnet/src/Microsoft.AutoGen/Core/Agent.cs b/dotnet/src/Microsoft.AutoGen/Core/Agent.cs index 865ff8593cf2..ee0abaad280a 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/Agent.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/Agent.cs @@ -16,7 +16,7 @@ namespace Microsoft.AutoGen.Core; /// /// Represents the base class for an agent in the AutoGen system. /// -public abstract class Agent +public abstract class Agent : IAgent { private readonly object _lock = new(); private readonly ConcurrentDictionary> _pendingRequests = []; diff --git a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntime.cs index 9e18d15faeee..3d824daa8da6 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntime.cs @@ -8,10 +8,10 @@ namespace Microsoft.AutoGen.Core; /// -/// Represents a worker that manages agents and handles messages. +/// An InMemory single-process implementation of . /// /// -/// Initializes a new instance of the class. +/// Responsible for message routing and delivery. /// /// The application lifetime. /// The service provider. @@ -29,6 +29,13 @@ public class AgentRuntime( private readonly ConcurrentDictionary> _subscriptionsByAgentType = new(); private readonly ConcurrentDictionary> _subscriptionsByTopic = new(); private readonly ConcurrentDictionary> _subscriptionsByGuid = new(); + private readonly IRegistry _registry = serviceProvider.GetRequiredService(); + + /// + public override async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default) + { + await _registry.RegisterAgentTypeAsync(request, this); + } /// public override ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default) @@ -52,16 +59,16 @@ public override ValueTask LoadStateAsync(AgentId agentId, Cancellati } } /// - public new async ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default) + public override async ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default) { var requestId = Guid.NewGuid().ToString(); - _pendingClientRequests[requestId] = (agent, request.RequestId); + _pendingClientRequests[requestId] = ((Agent)agent, request.RequestId); request.RequestId = requestId; await _mailbox.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false); } /// - public new ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) + public override ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) { return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken); } diff --git a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeBase.cs b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeBase.cs index 7f9b772e0b0f..6584e2b059b2 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeBase.cs @@ -132,11 +132,12 @@ public async Task RunMessagePump() } } } - public async ValueTask PublishMessageAsync(IMessage message, TopicId topic, Agent? sender, CancellationToken? cancellationToken = default) + public abstract ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default); + public async ValueTask PublishMessageAsync(IMessage message, TopicId topic, IAgent? sender, CancellationToken? cancellationToken = default) { var topicString = topic.Type + "." + topic.Source; sender ??= RuntimeServiceProvider.GetRequiredService(); - await PublishEventAsync(message.ToCloudEvent(key: sender.GetType().Name, topic: topicString), sender, cancellationToken.GetValueOrDefault()).ConfigureAwait(false); + await PublishEventAsync(message.ToCloudEvent(key: sender.GetType().Name, topic: topicString), (Agent)sender, cancellationToken.GetValueOrDefault()).ConfigureAwait(false); } public abstract ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default); public abstract ValueTask LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default); @@ -202,14 +203,7 @@ private async ValueTask DispatchEventsToAgentsAsync(CloudEvent cloudEvent, Cance } await Task.WhenAll(taskList).ConfigureAwait(false); } + public abstract ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default); - public ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default) - { - throw new NotImplementedException(); - } - - public ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) - { - throw new NotImplementedException(); - } + public abstract ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.AutoGen/Core/HostBuilderExtensions.cs b/dotnet/src/Microsoft.AutoGen/Core/HostBuilderExtensions.cs index 4cf1fe3ec93d..7df12c12fd27 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/HostBuilderExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/HostBuilderExtensions.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using Microsoft.AutoGen.Contracts; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; @@ -29,6 +30,8 @@ public static IHostApplicationBuilder AddAgentWorker(this IHostApplicationBuilde { var assemblies = AppDomain.CurrentDomain.GetAssemblies(); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) => diff --git a/dotnet/src/Microsoft.AutoGen/Core/IAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core/IAgentRuntime.cs deleted file mode 100644 index b2e9b45973f0..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Core/IAgentRuntime.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentRuntime.cs - -using Google.Protobuf; -using Microsoft.AutoGen.Contracts; -namespace Microsoft.AutoGen.Core; - -public interface IAgentRuntime -{ - IServiceProvider RuntimeServiceProvider { get; } - ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default); - ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); - ValueTask PublishMessageAsync(IMessage message, TopicId topic, Agent? sender, CancellationToken? cancellationToken = default); - ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default); - ValueTask LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default); - ValueTask AddSubscriptionAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default); - ValueTask RemoveSubscriptionAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default); - ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default); -} diff --git a/dotnet/src/Microsoft.AutoGen/Core/Registry.cs b/dotnet/src/Microsoft.AutoGen/Core/Registry.cs new file mode 100644 index 000000000000..5437844215c5 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Core/Registry.cs @@ -0,0 +1,268 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Registry.cs +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Core; +public class Registry : IRegistry +{ + public AgentsRegistryState State { get; set; } + private readonly IRegistryStorage Storage; + private readonly ILogger _logger; + private string _registryEtag; + private const int _retries = 5; + + public Registry(IRegistryStorage storage, ILogger logger) + { + _logger = logger; + Storage = storage; + State = Storage.ReadStateAsync().ConfigureAwait(true).GetAwaiter().GetResult(); + _registryEtag = State.Etag; + _logger.LogInformation("Registry initialized."); + } + + public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType) + { + UpdateStateIfStale(); + List agents = []; + // get all agent types that are subscribed to the topic + if (State.TopicToAgentTypesMap.TryGetValue(topic, out var subscribedAgentTypes)) + { + /*// get all agent types that are handling the event + if (State.EventsToAgentTypesMap.TryGetValue(eventType, out var handlingAgents)) + { + agents.AddRange(subscribedAgentTypes.Intersect(handlingAgents).ToList()); + }*/ + agents.AddRange(subscribedAgentTypes.ToList()); + } + if (State.TopicToAgentTypesMap.TryGetValue(eventType, out var eventHandlingAgents)) + { + agents.AddRange(eventHandlingAgents.ToList()); + } + if (State.TopicToAgentTypesMap.TryGetValue(topic + "." + eventType, out var combo)) + { + agents.AddRange(combo.ToList()); + } + // instead of an exact match, we can also check for a prefix match where key starts with the eventType + if (State.TopicToAgentTypesMap.Keys.Any(key => key.StartsWith(eventType))) + { + State.TopicToAgentTypesMap.Where( + kvp => kvp.Key.StartsWith(eventType)) + .SelectMany(kvp => kvp.Value) + .Distinct() + .ToList() + .ForEach(async agentType => + { + agents.Add(agentType); + }); + } + agents = agents.Distinct().ToList(); + + return new ValueTask>(agents); + } + public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest registration, IAgentRuntime runtime) + { + var retries = _retries; + while (!await RegisterAgentTypeWriteAsync(registration, runtime)) + { + if (retries == 0) + { + throw new IOException($"Failed to register agent type after {_retries} retries."); + } + _logger.LogWarning("Failed to register agent type, retrying..."); + retries--; + } + } + + private async ValueTask RegisterAgentTypeWriteAsync(RegisterAgentTypeRequest registration, IAgentRuntime runtime, CancellationToken cancellationToken = default) + { + UpdateStateIfStale(); + if (registration.Type is null) + { + throw new InvalidOperationException("RegisterAgentType: Agent type is required."); + } + var agentTypes = AgentTypes.GetAgentTypesFromAssembly() + ?? throw new InvalidOperationException("No agent types found in the assembly"); + + if (!agentTypes.Types.TryGetValue(registration.Type, out var value)) + { + throw new InvalidOperationException($"RegisterAgentType: Invalid agent type {registration.Type}."); + } + try + { + var agentInstance = (Agent)runtime.RuntimeServiceProvider.GetRequiredService(value); + _logger.LogWarning("Agent type {agentType} is already registered.", registration.Type); + State.AgentTypes.TryAdd(registration.Type, agentInstance.AgentId); + } + catch (InvalidOperationException) + { + // Agent type was not yet in the registry - it won't be available in DI + _logger.LogInformation("Agent type {agentType} is not yet registered, activating", registration.Type); + var agent = (Agent)ActivatorUtilities.CreateInstance(runtime.RuntimeServiceProvider, instanceType: value); + Agent.Initialize(runtime, agent); + State.AgentTypes.TryAdd(registration.Type, agent.AgentId); + } + return await WriteStateAsync(State, cancellationToken).ConfigureAwait(false); + } + public async ValueTask SubscribeAsync(AddSubscriptionRequest subscription) + { + var retries = _retries; + while (!await SubscribeWriteAsync(subscription)) + { + if (retries == 0) + { + throw new IOException($"Failed to subscribe after {_retries} retries."); + } + _logger.LogWarning("Failed to subscribe, retrying..."); + retries--; + } + } + private async ValueTask SubscribeWriteAsync(AddSubscriptionRequest subscription, CancellationToken cancellationToken = default) + { + UpdateStateIfStale(); + var guid = Guid.NewGuid().ToString(); + subscription.Subscription.Id = guid; + switch (subscription.Subscription.SubscriptionCase) + { + //TODO: this doesnt look right + case Subscription.SubscriptionOneofCase.TypePrefixSubscription: + break; + case Subscription.SubscriptionOneofCase.TypeSubscription: + { + // add the topic to the set of topics for the agent type + State.AgentsToTopicsMap.TryGetValue(subscription.Subscription.TypeSubscription.AgentType, out var topics); + if (topics is null) + { + topics = new HashSet(); + State.AgentsToTopicsMap[subscription.Subscription.TypeSubscription.AgentType] = topics; + } + topics.Add(subscription.Subscription.TypeSubscription.TopicType); + + // add the agent type to the set of agent types for the topic + State.TopicToAgentTypesMap.TryGetValue(subscription.Subscription.TypeSubscription.TopicType, out var agents); + if (agents is null) + { + agents = new HashSet(); + State.TopicToAgentTypesMap[subscription.Subscription.TypeSubscription.TopicType] = agents; + } + agents.Add(subscription.Subscription.TypeSubscription.AgentType); + + // add the subscription by Guid + State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); + if (existingSubscriptions is null) + { + existingSubscriptions = new HashSet(); + State.GuidSubscriptionsMap[guid] = existingSubscriptions; + } + existingSubscriptions.Add(subscription.Subscription); + break; + } + default: + throw new InvalidOperationException("Invalid subscription type"); + } + return await WriteStateAsync(State, cancellationToken).ConfigureAwait(false); + } + public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request) + { + var retries = _retries; + while (!await UnsubscribeWriteAsync(request)) + { + if (retries == 0) + { + throw new IOException($"Failed to unsubscribe after {_retries} retries."); + } + _logger.LogWarning("Failed to unsubscribe, retrying..."); + retries--; + } + } + private async ValueTask UnsubscribeWriteAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default) + { + UpdateStateIfStale(); + var guid = request.Id; + // does the guid parse? + if (!Guid.TryParse(guid, out var _)) + { + throw new InvalidOperationException("Invalid subscription id"); + } + if (State.GuidSubscriptionsMap.TryGetValue(guid, out var subscriptions)) + { + foreach (var subscription in subscriptions) + { + switch (subscription.SubscriptionCase) + { + case Subscription.SubscriptionOneofCase.TypeSubscription: + { + // remove the topic from the set of topics for the agent type + State.AgentsToTopicsMap.TryGetValue(subscription.TypeSubscription.AgentType, out var topics); + topics?.Remove(subscription.TypeSubscription.TopicType); + + // remove the agent type from the set of agent types for the topic + State.TopicToAgentTypesMap.TryGetValue(subscription.TypeSubscription.TopicType, out var agents); + agents?.Remove(subscription.TypeSubscription.AgentType); + + //remove the subscription by Guid + State.GuidSubscriptionsMap.TryGetValue(guid, out var existingSubscriptions); + existingSubscriptions?.Remove(subscription); + break; + } + case Subscription.SubscriptionOneofCase.TypePrefixSubscription: + break; + default: + throw new InvalidOperationException("Invalid subscription type"); + } + } + State.GuidSubscriptionsMap.Remove(guid, out _); + return await WriteStateAsync(State, cancellationToken).ConfigureAwait(false); + } + return true; + } + public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request) + { + var _ = request; + UpdateStateIfStale(); + var subscriptions = new List(); + foreach (var kvp in State.GuidSubscriptionsMap) + { + subscriptions.AddRange(kvp.Value); + } + return new(subscriptions); + } + /// + /// in case there is a write in between our last read and now... + /// + private void UpdateStateIfStale() + { + if (State.Etag != _registryEtag) + { + State = Storage.ReadStateAsync().ConfigureAwait(true).GetAwaiter().GetResult(); + _registryEtag = State.Etag; + } + } + /// + /// Writes the state to the storage. + /// + /// + /// bool true on success, false on failure + private async ValueTask WriteStateAsync(AgentsRegistryState state, CancellationToken cancellationToken = default) + { + try + { + await Storage.WriteStateAsync(state, cancellationToken).ConfigureAwait(false); + _registryEtag = state.Etag; + State = state; + return true; + } + catch (Exception e) + { + _logger.LogError(e, "Failed to write state to storage."); + return false; + } + } + + public ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker) + { + throw new NotImplementedException(); + } +} + diff --git a/dotnet/src/Microsoft.AutoGen/Core/RegistryStorage.cs b/dotnet/src/Microsoft.AutoGen/Core/RegistryStorage.cs new file mode 100644 index 000000000000..aab624ec8dde --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Core/RegistryStorage.cs @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// RegistryStorage.cs + +using System.Text.Json; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Core; +/// +/// Storage implementation for the RegistryState +/// Note: if you really care about the performance and resilience of this you should probably use the distributed runtime and persistent storage through Orleans +/// +public class RegistryStorage(ILogger logger) : IRegistryStorage +{ + /// a property representing the file path to the state file read from configuration + public string FilePath { get; set; } = Environment.GetEnvironmentVariable("AGENTS_REGISTRY") ?? "registry.json"; + protected internal ILogger _logger = logger; + private readonly object _lock = new(); + + public async Task ReadStateAsync(CancellationToken cancellationToken = default) + { + string json; + var state = new AgentsRegistryState(); + if (File.Exists(FilePath)) { json = await File.ReadAllTextAsync(FilePath, cancellationToken).ConfigureAwait(false); } + else + { + _logger.LogWarning("Registry file {FilePath} does not exist, starting with an empty Registry", FilePath); + await WriteStateAsync(state, cancellationToken, true).ConfigureAwait(false); + return state; + } + try + { + state = JsonSerializer.Deserialize(json) ?? new AgentsRegistryState(); + } + catch (Exception e) + { + _logger.LogWarning(e, "Failed to read the Registry from {FilePath}, starting with an empty Registry", FilePath); + await WriteStateAsync(state, cancellationToken, true).ConfigureAwait(false); + } + return state; + } + public async ValueTask WriteStateAsync(AgentsRegistryState state, CancellationToken cancellationToken = default) + { + return await WriteStateAsync(state, cancellationToken, false).ConfigureAwait(false); + } + + private async ValueTask WriteStateAsync(AgentsRegistryState state, CancellationToken cancellationToken = default, bool noRead = false) + { + lock (_lock) + { + var eTag = ""; + // etags for optimistic concurrency control + if (!noRead) + { + // read the current state to get the current ETag + eTag = ReadStateAsync().ConfigureAwait(false).GetAwaiter().GetResult().Etag; + } + else { eTag = state.Etag; } + // if the Etag is null, its a new state + // if both etags are set, they should match or it means that the state has changed since the last read. + if (string.IsNullOrEmpty(state.Etag) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) + { + state.Etag = Guid.NewGuid().ToString(); + // serialize to JSON and write to file + var json = JsonSerializer.Serialize(state); + File.WriteAllTextAsync(FilePath, json, cancellationToken).ConfigureAwait(false); + return state.Etag; + } + else + { + //TODO - this is probably not the correct behavior to just throw - I presume we want to somehow let the caller know that the state has changed and they need to re-read it + throw new ArgumentException( + "The provided ETag does not match the current ETag. The state has been modified by another request."); + } + } + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Core/UninitializedAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Core/UninitializedAgentWorker.cs index d8a8f63860ae..28d2c5f44ebc 100644 --- a/dotnet/src/Microsoft.AutoGen/Core/UninitializedAgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Core/UninitializedAgentWorker.cs @@ -10,13 +10,14 @@ public class UninitializedAgentWorker() : IAgentRuntime public IServiceProvider RuntimeServiceProvider => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); internal const string AgentNotInitializedMessage = "Agent not initialized correctly. An Agent should never be directly intialized - it is always started by the AgentWorker from the Runtime (using the static Initialize() method)."; public ValueTask LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); - public ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); + public ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public ValueTask AddSubscriptionAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public ValueTask RemoveSubscriptionAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); - public ValueTask PublishMessageAsync(IMessage message, TopicId topic, Agent? sender, CancellationToken? cancellationToken = null) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); + public ValueTask PublishMessageAsync(IMessage message, TopicId topic, IAgent? sender, CancellationToken? cancellationToken = null) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); + public ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default) => throw new AgentInitalizedIncorrectlyException(AgentNotInitializedMessage); public class AgentInitalizedIncorrectlyException(string message) : Exception(message) { } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs new file mode 100644 index 000000000000..cb3778418040 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IGatewayRegistry.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IGatewayRegistry.cs + +using Microsoft.AutoGen.Contracts; + +namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; + +/// +/// Interface for managing agent registration, placement, and subscriptions. +/// +public interface IGatewayRegistry : IRegistry +{ + /// + /// Gets or places an agent based on the provided agent ID. + /// + /// The ID of the agent. + /// A tuple containing the worker and a boolean indicating if it's a new placement. + ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId); + + /// + /// Removes a worker from the registry. + /// + /// The worker to remove. + /// A task representing the asynchronous operation. + ValueTask RemoveWorkerAsync(IGateway worker); + + /// + /// Registers a new agent type with the specified worker. + /// + /// The request containing agent type details. + /// The worker to register the agent type with. + /// A task representing the asynchronous operation. + ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IGateway worker); + + /// + /// Adds a new worker to the registry. + /// + /// The worker to add. + /// A task representing the asynchronous operation. + ValueTask AddWorkerAsync(IGateway worker); + + /// + /// Gets a compatible worker for the specified agent type. + /// + /// The type of the agent. + /// A task representing the asynchronous operation, with the compatible worker as the result. + ValueTask GetCompatibleWorkerAsync(string type); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs index 6a5a8e725ecd..81b59858619c 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Abstractions/IRegistryGrain.cs @@ -7,5 +7,5 @@ namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions; /// Orleans specific interface, needed to mark the key /// [Alias("Microsoft.AutoGen.Runtime.Grpc.Abstractions.IRegistryGrain")] -public interface IRegistryGrain : IRegistry, IGrainWithIntegerKey +public interface IRegistryGrain : IGatewayRegistry, IGrainWithIntegerKey { } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs index 0f730df718f2..26c99c894248 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Grpc/GrpcGateway.cs @@ -84,7 +84,7 @@ public async ValueTask RegisterAgentTypeAsync(Registe connection.AddSupportedType(request.Type); _supportedAgentTypes.GetOrAdd(request.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentType(request, _reference).ConfigureAwait(true); + await _gatewayRegistry.RegisterAgentTypeAsync(request, _reference).ConfigureAwait(true); return new RegisterAgentTypeResponse { Success = true, @@ -128,7 +128,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - await _gatewayRegistry.AddWorker(_reference); + await _gatewayRegistry.AddWorkerAsync(_reference); } catch (Exception exception) { @@ -138,7 +138,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } try { - await _gatewayRegistry.RemoveWorker(_reference); + await _gatewayRegistry.RemoveWorkerAsync(_reference); } catch (Exception exception) { @@ -198,7 +198,7 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, connection.AddSupportedType(msg.Type); _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - await _gatewayRegistry.RegisterAgentType(msg, _reference).ConfigureAwait(true); + await _gatewayRegistry.RegisterAgentTypeAsync(msg, _reference).ConfigureAwait(true); Message response = new() { RegisterAgentTypeResponse = new() @@ -214,7 +214,7 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can { var registry = _clusterClient.GetGrain(0); //intentionally blocking - var targetAgentTypes = await registry.GetSubscribedAndHandlingAgents(evt.Source, evt.Type).ConfigureAwait(true); + var targetAgentTypes = await registry.GetSubscribedAndHandlingAgentsAsync(evt.Source, evt.Type).ConfigureAwait(true); if (targetAgentTypes is not null && targetAgentTypes.Count > 0) { targetAgentTypes = targetAgentTypes.Distinct().ToList(); @@ -383,7 +383,7 @@ public async ValueTask UnsubscribeAsync(RemoveSubscr } public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default) { - return _gatewayRegistry.GetSubscriptions(request); + return _gatewayRegistry.GetSubscriptionsAsync(request); } async ValueTask IGateway.InvokeRequestAsync(RpcRequest request) { diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentsRegistryState.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentsRegistryState.cs deleted file mode 100644 index 8be5e8dd5873..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/AgentsRegistryState.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentsRegistryState.cs - -using Microsoft.AutoGen.Contracts; - -namespace Microsoft.AutoGen.Runtime.Grpc; - -public class AgentsRegistryState -{ - public Dictionary> AgentsToEventsMap { get; set; } = []; - public Dictionary> AgentsToTopicsMap { get; set; } = []; - public Dictionary> TopicToAgentTypesMap { get; set; } = []; - public Dictionary> EventsToAgentTypesMap { get; set; } = []; - public Dictionary> GuidSubscriptionsMap { get; set; } = []; -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs index 4129a0bc413b..9de7065fdb62 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime.Grpc/Services/Orleans/RegistryGrain.cs @@ -1,13 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // RegistryGrain.cs - using Microsoft.AutoGen.Contracts; using Microsoft.AutoGen.Runtime.Grpc.Abstractions; namespace Microsoft.AutoGen.Runtime.Grpc; internal sealed class RegistryGrain([PersistentState("state", "AgentRegistryStore")] IPersistentState state) : Grain, IRegistryGrain { - // TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively. private readonly Dictionary _workerStates = new(); private readonly Dictionary> _supportedAgentTypes = []; private readonly Dictionary<(string Type, string Key), IGateway> _agentDirectory = []; @@ -18,8 +16,7 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); return base.OnActivateAsync(cancellationToken); } - - public ValueTask> GetSubscribedAndHandlingAgents(string topic, string eventType) + public ValueTask> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType) { List agents = []; // get all agent types that are subscribed to the topic @@ -83,7 +80,7 @@ public ValueTask> GetSubscribedAndHandlingAgents(string topic, stri } return new((worker, isNewPlacement)); } - public ValueTask RemoveWorker(IGateway worker) + public ValueTask RemoveWorkerAsync(IGateway worker) { if (_workerStates.Remove(worker, out var state)) { @@ -97,7 +94,7 @@ public ValueTask RemoveWorker(IGateway worker) } return ValueTask.CompletedTask; } - public async ValueTask RegisterAgentType(RegisterAgentTypeRequest registration, IGateway gateway) + public async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest registration, IGateway gateway) { if (!_supportedAgentTypes.TryGetValue(registration.Type, out var supportedAgentTypes)) { @@ -111,53 +108,26 @@ public async ValueTask RegisterAgentType(RegisterAgentTypeRequest registration, var workerState = GetOrAddWorker(gateway); workerState.SupportedTypes.Add(registration.Type); - /* future - state.State.AgentsToEventsMap[registration.Type] = new HashSet(registration.Events); - state.State.AgentsToTopicsMap[registration.Type] = new HashSet(registration.Topics); - // construct the inverse map for topics and agent types - foreach (var topic in registration.Topics) - { - if (!state.State.TopicToAgentTypesMap.TryGetValue(topic, out var topicSet)) - { - topicSet = new HashSet(); - state.State.TopicToAgentTypesMap[topic] = topicSet; - } - - topicSet.Add(registration.Type); - } - - // construct the inverse map for events and agent types - foreach (var evt in registration.Events) - { - if (!state.State.EventsToAgentTypesMap.TryGetValue(evt, out var eventSet)) - { - eventSet = new HashSet(); - state.State.EventsToAgentTypesMap[evt] = eventSet; - } - - eventSet.Add(registration.Type); - } - */ await state.WriteStateAsync().ConfigureAwait(false); } - public ValueTask AddWorker(IGateway worker) + public ValueTask AddWorkerAsync(IGateway worker) { GetOrAddWorker(worker); return ValueTask.CompletedTask; } - public ValueTask UnregisterAgentType(string type, IGateway worker) + public async ValueTask UnregisterAgentType(string type, IGateway worker) { - if (_workerStates.TryGetValue(worker, out var state)) + if (_workerStates.TryGetValue(worker, out var workerState)) { - state.SupportedTypes.Remove(type); + workerState.SupportedTypes.Remove(type); } if (_supportedAgentTypes.TryGetValue(type, out var workers)) { workers.Remove(worker); } - return ValueTask.CompletedTask; + await state.WriteStateAsync().ConfigureAwait(false); } private Task PurgeInactiveWorkers() { @@ -190,7 +160,7 @@ private WorkerState GetOrAddWorker(IGateway worker) return workerState; } - public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); + public ValueTask GetCompatibleWorkerAsync(string type) => new(GetCompatibleWorkerCore(type)); private IGateway? GetCompatibleWorkerCore(string type) { @@ -202,7 +172,6 @@ private WorkerState GetOrAddWorker(IGateway worker) return null; } - public async ValueTask SubscribeAsync(AddSubscriptionRequest subscription) { var guid = Guid.NewGuid().ToString(); @@ -282,32 +251,13 @@ public async ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request) throw new InvalidOperationException("Invalid subscription type"); } } - state.State.GuidSubscriptionsMap.Remove(guid); + state.State.GuidSubscriptionsMap.Remove(guid, out _); } await state.WriteStateAsync().ConfigureAwait(false); } - - public ValueTask> GetSubscriptions(string agentType) - { - var subscriptions = new List(); - if (state.State.AgentsToTopicsMap.TryGetValue(agentType, out var topics)) - { - foreach (var topic in topics) - { - subscriptions.Add(new Subscription - { - TypeSubscription = new TypeSubscription - { - AgentType = agentType, - TopicType = topic - } - }); - } - } - return new(subscriptions); - } - public ValueTask> GetSubscriptions(GetSubscriptionsRequest request) + public ValueTask> GetSubscriptionsAsync(GetSubscriptionsRequest request) { + var _ = request; var subscriptions = new List(); foreach (var kvp in state.State.GuidSubscriptionsMap) { @@ -315,7 +265,17 @@ public ValueTask> GetSubscriptions(GetSubscriptionsRequest re } return new(subscriptions); } - + public ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker) + { + var (_, _) = (request, worker); + var e = "RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker) is not implemented when using the Grpc runtime."; + throw new NotImplementedException(e); + } + public ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker) + { + var e = "UnregisterAgentTypeAsync(string type, IAgentRuntime worker) is not implemented when using the Grpc runtime."; + throw new NotImplementedException(e); + } private sealed class WorkerState { public HashSet SupportedTypes { get; set; } = []; diff --git a/dotnet/test/Microsoft.AutoGen.Core.Tests/AgentTests.cs b/dotnet/test/Microsoft.AutoGen.Core.Tests/AgentTests.cs index 33d2a8eed90a..617a0a2b6293 100644 --- a/dotnet/test/Microsoft.AutoGen.Core.Tests/AgentTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Core.Tests/AgentTests.cs @@ -1,18 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // AgentTests.cs - -using System.Collections.Concurrent; -using System.Diagnostics; using System.Text.Json; using FluentAssertions; using Google.Protobuf.Reflection; using Microsoft.AutoGen.Contracts; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Xunit; -using static Microsoft.AutoGen.Core.Tests.AgentTests; namespace Microsoft.AutoGen.Core.Tests; @@ -26,8 +20,8 @@ public class AgentTests() [Fact] public async Task Agent_ShouldThrowException_WhenNotInitialized() { - using var runtime = new InMemoryAgentRuntimeFixture(); - var agent = ActivatorUtilities.CreateInstance(runtime.AppHost.Services); + using var fixture = new InMemoryAgentRuntimeFixture(); + var agent = ActivatorUtilities.CreateInstance(fixture.AppHost.Services); await Assert.ThrowsAsync( async () => { @@ -43,12 +37,12 @@ public async Task Agent_ShouldThrowException_WhenNotInitialized() [Fact] public async Task Agent_ShouldInitializeCorrectly() { - var runtime = new InMemoryAgentRuntimeFixture(); - var (worker, agent) = runtime.Start(); - Assert.Equal(nameof(AgentRuntime), worker.GetType().Name); + var fixture = new InMemoryAgentRuntimeFixture(); + var (runtime, agent) = fixture.Start(); + Assert.Equal(nameof(AgentRuntime), runtime.GetType().Name); var subscriptions = await agent.GetSubscriptionsAsync(); Assert.Equal(2, subscriptions.Count); - runtime.Stop(); + fixture.Stop(); } /// /// Test SubscribeAsync method @@ -57,8 +51,8 @@ public async Task Agent_ShouldInitializeCorrectly() [Fact] public async Task SubscribeAsync_UnsubscribeAsync_and_GetSubscriptionsTest() { - var runtime = new InMemoryAgentRuntimeFixture(); - var (_, agent) = runtime.Start(); + var fixture = new InMemoryAgentRuntimeFixture(); + var (_, agent) = fixture.Start(); await agent.SubscribeAsync("TestEvent"); await Task.Delay(100); var subscriptions = await agent.GetSubscriptionsAsync().ConfigureAwait(true); @@ -83,7 +77,7 @@ public async Task SubscribeAsync_UnsubscribeAsync_and_GetSubscriptionsTest() } } Assert.False(found); - runtime.Stop(); + fixture.Stop(); } /// @@ -93,8 +87,8 @@ public async Task SubscribeAsync_UnsubscribeAsync_and_GetSubscriptionsTest() [Fact] public async Task StoreAsync_and_ReadAsyncTest() { - var runtime = new InMemoryAgentRuntimeFixture(); - var (_, agent) = runtime.Start(); + var fixture = new InMemoryAgentRuntimeFixture(); + var (_, agent) = fixture.Start(); Dictionary state = new() { { "testdata", "Active" } @@ -108,7 +102,7 @@ await agent.StoreAsync(new AgentState var read = JsonSerializer.Deserialize>(readState.TextData) ?? new Dictionary { { "data", "No state data found" } }; read.TryGetValue("testdata", out var value); Assert.Equal("Active", value); - runtime.Stop(); + fixture.Stop(); } /// @@ -118,8 +112,8 @@ await agent.StoreAsync(new AgentState [Fact] public async Task PublishMessageAsync_and_ReceiveMessageTest() { - var runtime = new InMemoryAgentRuntimeFixture(); - var (_, agent) = runtime.Start(); + var fixture = new InMemoryAgentRuntimeFixture(); + var (_, agent) = fixture.Start(); var topicType = "TestTopic"; await agent.SubscribeAsync(topicType).ConfigureAwait(true); var subscriptions = await agent.GetSubscriptionsAsync().ConfigureAwait(true); @@ -139,17 +133,15 @@ await agent.PublishMessageAsync(new TextMessage() }, topicType).ConfigureAwait(true); await Task.Delay(100); Assert.True(TestAgent.ReceivedMessages.ContainsKey(topicType)); - runtime.Stop(); + fixture.Stop(); } [Fact] public async Task InvokeCorrectHandler() { var agent = new TestAgent(new AgentsMetadata(TypeRegistry.Empty, new Dictionary(), new Dictionary>(), new Dictionary>()), new Logger(new LoggerFactory())); - await agent.HandleObjectAsync("hello world"); await agent.HandleObjectAsync(42); - agent.ReceivedItems.Should().HaveCount(2); agent.ReceivedItems[0].Should().Be("hello world"); agent.ReceivedItems[1].Should().Be(42); @@ -176,88 +168,9 @@ await client.PublishMessageAsync(new TextMessage() TestAgent.ReceivedMessages[nameof(DelegateMessageToTestAgentAsync)].Should().NotBeNull(); } - /// - /// The test agent is a simple agent that is used for testing purposes. - /// - public class TestAgent( - [FromKeyedServices("AgentsMetadata")] AgentsMetadata eventTypes, - Logger? logger = null) : Agent(eventTypes, logger), IHandle - { - public Task Handle(TextMessage item, CancellationToken cancellationToken = default) - { - ReceivedMessages[item.Source] = item.TextMessage_; - return Task.CompletedTask; - } - public Task Handle(string item) - { - ReceivedItems.Add(item); - return Task.CompletedTask; - } - public Task Handle(int item) - { - ReceivedItems.Add(item); - return Task.CompletedTask; - } - public List ReceivedItems { get; private set; } = []; - - /// - /// Key: source - /// Value: message - /// - public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); - } -} - -/// -/// InMemoryAgentRuntimeFixture - provides a fixture for the agent runtime. -/// -/// -/// This fixture is used to provide a runtime for the agent tests. -/// However, it is shared between tests. So operations from one test can affect another. -/// -public sealed class InMemoryAgentRuntimeFixture : IDisposable -{ - public InMemoryAgentRuntimeFixture() - { - var builder = new HostApplicationBuilder(); - builder.Services.TryAddSingleton(DistributedContextPropagator.Current); - builder.AddAgentWorker() - .AddAgent(nameof(TestAgent)); - AppHost = builder.Build(); - AppHost.StartAsync().Wait(); - } - public IHost AppHost { get; } - - /// - /// Start - starts the agent - /// - /// IAgentWorker, TestAgent - public (IAgentRuntime, TestAgent) Start() + [CollectionDefinition(Name)] + public sealed class ClusterFixtureCollection : ICollectionFixture { - var agent = ActivatorUtilities.CreateInstance(AppHost.Services); - var worker = AppHost.Services.GetRequiredService(); - Agent.Initialize(worker, agent); - return (worker, agent); + public const string Name = nameof(ClusterFixtureCollection); } - /// - /// Stop - stops the agent and ensures cleanup - /// - public void Stop() - { - AppHost?.StopAsync().GetAwaiter().GetResult(); - } - - /// - /// Dispose - Ensures cleanup after each test - /// - public void Dispose() - { - Stop(); - } -} - -[CollectionDefinition(Name)] -public sealed class ClusterFixtureCollection : ICollectionFixture -{ - public const string Name = nameof(ClusterFixtureCollection); } diff --git a/dotnet/test/Microsoft.AutoGen.Core.Tests/InMemoryAgentRuntimeFixture.cs b/dotnet/test/Microsoft.AutoGen.Core.Tests/InMemoryAgentRuntimeFixture.cs new file mode 100644 index 000000000000..4231e3fb5c82 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Core.Tests/InMemoryAgentRuntimeFixture.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// InMemoryAgentRuntimeFixture.cs +using System.Diagnostics; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +namespace Microsoft.AutoGen.Core.Tests; +/// +/// InMemoryAgentRuntimeFixture - provides a fixture for the agent runtime. +/// +/// +/// This fixture is used to provide a runtime for the agent tests. +/// However, it is shared between tests. So operations from one test can affect another. +/// +public sealed class InMemoryAgentRuntimeFixture : IDisposable +{ + public InMemoryAgentRuntimeFixture() + { + var builder = new HostApplicationBuilder(); + builder.Services.TryAddSingleton(DistributedContextPropagator.Current); + builder.AddAgentWorker() + .AddAgent(nameof(TestAgent)); + AppHost = builder.Build(); + AppHost.StartAsync().Wait(); + } + public IHost AppHost { get; } + + /// + /// Start - starts the agent + /// + /// IAgentWorker, TestAgent + public (IAgentRuntime, TestAgent) Start() + { + var agent = ActivatorUtilities.CreateInstance(AppHost.Services); + var worker = AppHost.Services.GetRequiredService(); + Agent.Initialize(worker, agent); + return (worker, agent); + } + /// + /// Stop - stops the agent and ensures cleanup + /// + public void Stop() + { + AppHost?.StopAsync().GetAwaiter().GetResult(); + } + + /// + /// Dispose - Ensures cleanup after each test + /// + public void Dispose() + { + Stop(); + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryStorageTests.cs b/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryStorageTests.cs new file mode 100644 index 000000000000..f1024cefcd2e --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryStorageTests.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// RegistryStorageTests.cs +using System.Collections.Concurrent; +using System.Text.Json; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.AutoGen.Core.Tests; + +public class RegistryStorageTests +{ + private readonly Mock> _loggerMock; + private readonly RegistryStorage _registryStorage; + + public RegistryStorageTests() + { + _loggerMock = new Mock>(); + _registryStorage = new RegistryStorage(_loggerMock.Object) + { + FilePath = "test_registry.json" + }; + } + + [Fact] + public async Task ReadStateAsync_ShouldReturnEmptyState_WhenFileDoesNotExist() + { + // Arrange + if (File.Exists(_registryStorage.FilePath)) + { + File.Delete(_registryStorage.FilePath); + } + + // Act + var state = await _registryStorage.ReadStateAsync(); + + // Assert + Assert.NotNull(state); + Assert.Empty(state.AgentTypes); + } + + [Fact] + public async Task ReadStateAsync_ShouldReturnState_WhenFileExists() + { + // Arrange + if (File.Exists(_registryStorage.FilePath)) + { + File.Delete(_registryStorage.FilePath); + } + var agentType = "agent1"; + var expectedState = new AgentsRegistryState + { + Etag = Guid.NewGuid().ToString(), + AgentTypes = new ConcurrentDictionary + { + [agentType] = new AgentId() { Type = agentType, Key = Guid.NewGuid().ToString() } + } + }; + var json = JsonSerializer.Serialize(expectedState); + await File.WriteAllTextAsync(_registryStorage.FilePath, json); + + // Act + var state = await _registryStorage.ReadStateAsync(); + + // Assert + Assert.NotNull(state); + Assert.Single(state.AgentTypes); + Assert.Equal(agentType, state.AgentTypes.Keys.First()); + } + + [Fact] + public async Task WriteStateAsync_ShouldWriteStateToFile() + { + // Arrange + if (File.Exists(_registryStorage.FilePath)) + { + File.Delete(_registryStorage.FilePath); + } + var agentType = "agent1"; + var state = await _registryStorage.ReadStateAsync(); + state.AgentTypes.TryAdd(agentType, new AgentId() { Type = agentType, Key = Guid.NewGuid().ToString() }); + + // Act + await _registryStorage.WriteStateAsync(state); + + // Assert + var json = await File.ReadAllTextAsync(_registryStorage.FilePath); + var writtenState = JsonSerializer.Deserialize(json); + Assert.NotNull(writtenState); + Assert.Single(writtenState.AgentTypes); + Assert.Equal(agentType, writtenState.AgentTypes.Keys.First()); + } + + [Fact] + public async Task WriteStateAsync_ShouldThrowException_WhenETagMismatch() + { + // Arrange + // Arrange + if (File.Exists(_registryStorage.FilePath)) + { + File.Delete(_registryStorage.FilePath); + } + var initialState = await _registryStorage.ReadStateAsync(); + + var newState = new AgentsRegistryState { Etag = "mismatch" }; + + // Act & Assert + await Assert.ThrowsAsync(async () => await _registryStorage.WriteStateAsync(newState)); + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryTests.cs b/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryTests.cs new file mode 100644 index 000000000000..a6938b342b44 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Core.Tests/RegistryTests.cs @@ -0,0 +1,160 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// RegistryTests.cs + +using System.Collections.Concurrent; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.AutoGen.Core.Tests; + +public class RegistryTests +{ + private readonly Mock _storageMock; + private readonly Mock> _loggerMock; + private readonly Registry _registry; + + public RegistryTests() + { + _storageMock = new Mock(); + _loggerMock = new Mock>(); + _storageMock.Setup(s => s.ReadStateAsync(It.IsAny())).ReturnsAsync(new AgentsRegistryState()); + _registry = new Registry(_storageMock.Object, _loggerMock.Object); + } + + [Fact] + public async Task GetSubscribedAndHandlingAgentsAsync_ShouldReturnEmptyList_WhenNoAgentsSubscribed() + { + // Arrange + var state = new AgentsRegistryState + { + TopicToAgentTypesMap = new ConcurrentDictionary>() + }; + _storageMock.Setup(s => s.ReadStateAsync(CancellationToken.None)).ReturnsAsync(state); + + // Act + var agents = await _registry.GetSubscribedAndHandlingAgentsAsync("topic", "eventType"); + + // Assert + Assert.Empty(agents); + } + + [Fact] + public async Task GetSubscribedAndHandlingAgentsAsync_ShouldReturnAgents_WhenAgentsSubscribed() + { + // Arrange + var agent1 = "agent1"; + var agent2 = "agent2"; + var topic = "topic"; + var request = new AddSubscriptionRequest + { + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = agent1, + TopicType = topic + } + } + }; + await _registry.SubscribeAsync(request); + request.Subscription.TypeSubscription.AgentType = agent2; + await _registry.SubscribeAsync(request); + + // Act + var agents = await _registry.GetSubscribedAndHandlingAgentsAsync(topic, "eventType"); + + // Assert + Assert.Equal(2, agents.Count); + Assert.Contains(agent1, agents); + Assert.Contains(agent2, agents); + } + + [Fact] + public async Task RegisterAgentTypeAsync_ShouldAddAgentType() + { + // Arrange + var agentTypeName = "TestAgent"; + var agentType = Type.GetType(agentTypeName); + + var request = new RegisterAgentTypeRequest { Type = agentTypeName }; + var fixture = new InMemoryAgentRuntimeFixture(); + var (runtime, agent) = fixture.Start(); + + // Act + await _registry.RegisterAgentTypeAsync(request, runtime); + + // Assert + _storageMock.Verify(s => s.WriteStateAsync(It.IsAny(), It.IsAny()), Times.Once); + Assert.Contains(agentTypeName, _registry.State.AgentTypes.Keys); + fixture.Stop(); + } + + [Fact] + public async Task SubscribeAsync_ShouldAddSubscription() + { + // Arrange + var request = new AddSubscriptionRequest + { + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = "TestAgent", + TopicType = "TestTopic" + } + } + }; + + // Act + await _registry.SubscribeAsync(request); + + // Assert + _storageMock.Verify(s => s.WriteStateAsync(It.IsAny(), It.IsAny()), Times.Once); + Assert.Contains("TestAgent", _registry.State.AgentsToTopicsMap.Keys); + Assert.Contains("TestTopic", _registry.State.TopicToAgentTypesMap.Keys); + } + + [Fact] + public async Task UnsubscribeAsync_ShouldFail_WhenRequestIsInvalid() + { + // Arrange + var request = new RemoveSubscriptionRequest { Id = "invalid-guid" }; + + // Act + var exception = await Assert.ThrowsAsync(async () => await _registry.UnsubscribeAsync(request).AsTask()); + } + + [Fact] + public async Task UnsubscribeAsync_ShouldRemoveSubscription() + { + // Arrange + var topic = "TestTopic1"; + var agent = "TestAgent1"; + var request = new AddSubscriptionRequest + { + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = agent, + TopicType = topic + } + } + }; + await _registry.SubscribeAsync(request); + var subscriptions = await _registry.GetSubscriptionsAsync(new GetSubscriptionsRequest()); + var subscriptionId = subscriptions.Where( + s => s.TypeSubscription.AgentType == agent && s.TypeSubscription.TopicType == topic + ).Select(s => s.Id).FirstOrDefault(); + var removeRequest = new RemoveSubscriptionRequest { Id = subscriptionId }; + + //Act + await _registry.UnsubscribeAsync(removeRequest); + subscriptions = await _registry.GetSubscriptionsAsync(new GetSubscriptionsRequest()); + + // Assert subscriptions doesn't match for id + Assert.DoesNotContain(subscriptionId, subscriptions.Select(s => s.Id)); + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Core.Tests/TestAgent.cs b/dotnet/test/Microsoft.AutoGen.Core.Tests/TestAgent.cs new file mode 100644 index 000000000000..28b1334ba618 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Core.Tests/TestAgent.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// TestAgent.cs + +using System.Collections.Concurrent; +using Microsoft.AutoGen.Contracts; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +namespace Microsoft.AutoGen.Core.Tests; +/// +/// The test agent is a simple agent that is used for testing purposes. +/// +public class TestAgent( + [FromKeyedServices("AgentsMetadata")] AgentsMetadata eventTypes, + Logger? logger = null) : Agent(eventTypes, logger), IHandle +{ + public Task Handle(TextMessage item, CancellationToken cancellationToken = default) + { + ReceivedMessages[item.Source] = item.TextMessage_; + return Task.CompletedTask; + } + public Task Handle(string item) + { + ReceivedItems.Add(item); + return Task.CompletedTask; + } + public Task Handle(int item) + { + ReceivedItems.Add(item); + return Task.CompletedTask; + } + public List ReceivedItems { get; private set; } = []; + + /// + /// Key: source + /// Value: message + /// + public static ConcurrentDictionary ReceivedMessages { get; private set; } = new(); +}