diff --git a/dotnet/src/Experimental/Process.Abstractions/IKernelExternalProcessMessageChannel.cs b/dotnet/src/Experimental/Process.Abstractions/IKernelExternalProcessMessageChannel.cs new file mode 100644 index 000000000000..eaf9b0e92859 --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/IKernelExternalProcessMessageChannel.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; + +namespace Microsoft.SemanticKernel; + +/// <summary> +/// An interface that provides a channel for emitting external messages from a step. +/// In addition provide common methods like initialization and Uninitialization +/// </summary> +public interface IExternalKernelProcessMessageChannel +{ + /// <summary> + /// Initialization of the external messaging channel used + /// </summary> + /// <returns>A <see cref="ValueTask"/></returns> + public abstract ValueTask Initialize(); + + /// <summary> + /// Uninitialization of the external messaging channel used + /// </summary> + /// <returns>A <see cref="ValueTask"/></returns> + public abstract ValueTask Uninitialize(); + + /// <summary> + /// Emits the specified event from the step outside the SK process + /// </summary> + /// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param> + /// <param name="eventData">data to be transmitted externally</param> + /// <returns></returns> + public abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData); +} diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs index 6495eecbfdec..59f420642f0a 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs @@ -27,4 +27,10 @@ public abstract class KernelProcessContext /// </summary> /// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns> public abstract Task<KernelProcess> GetStateAsync(); + + /// <summary> + /// Gets the instance of <see cref="IExternalKernelProcessMessageChannel"/> used for external messages + /// </summary> + /// <returns></returns> + public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync(); } diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs index 6dfac0412d29..9beadf7b9896 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs @@ -10,18 +10,21 @@ namespace Microsoft.SemanticKernel; public sealed class KernelProcessStepContext { private readonly IKernelProcessMessageChannel _stepMessageChannel; + private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel; /// <summary> /// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class. /// </summary> /// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param> - public KernelProcessStepContext(IKernelProcessMessageChannel channel) + /// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param> + public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null) { this._stepMessageChannel = channel; + this._externalMessageChannel = externalMessageChannel; } /// <summary> - /// Emit an event from the current step. + /// Emit an SK process event from the current step. /// </summary> /// <param name="processEvent">An instance of <see cref="KernelProcessEvent"/> to be emitted from the <see cref="KernelProcessStep"/></param> /// <returns>A <see cref="ValueTask"/></returns> @@ -31,7 +34,7 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent) } /// <summary> - /// Emit an event from the current step with a simplified method signature. + /// Emit an SK process event from the current step with a simplified method signature. /// </summary> /// <param name="eventId"></param> /// <param name="data"></param> @@ -52,4 +55,22 @@ public ValueTask EmitEventAsync( Visibility = visibility }); } + + /// <summary> + /// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/> + /// component if connected from within the SK process + /// </summary> + /// <param name="topicName"></param> + /// <param name="processEventData"></param> + /// <returns></returns> + /// <exception cref="KernelException"></exception> + public async Task EmitExternalEventAsync(string topicName, object? processEventData = null) + { + if (this._externalMessageChannel == null) + { + throw new KernelException($"External message channel not configured for step with topic {topicName}"); + } + + await this._externalMessageChannel.EmitExternalEventAsync(topicName, processEventData).ConfigureAwait(false); + } } diff --git a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs index 187500e15dee..4df90bdd20a1 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs @@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.Process.Serialization; +using SemanticKernel.Process.IntegrationTests.CloudEvents; namespace SemanticKernel.Process.IntegrationTests.Controllers; @@ -72,6 +73,23 @@ public async Task<IActionResult> GetProcessAsync(string processId) return this.Ok(daprProcess); } + /// <summary> + /// Retrieves current state of the MockCloudEventClient used in the running process + /// </summary> + /// <param name="processId">The Id of the process.</param> + /// <param name="cloudClient">Mock Cloud client ingested via dependency injection</param> + /// <returns></returns> + [HttpGet("processes/{processId}/mockCloudClient")] + public Task<IActionResult> GetMockCloudClient(string processId, MockCloudEventClient cloudClient) + { + if (!s_processes.TryGetValue(processId, out DaprKernelProcessContext? context)) + { + return Task.FromResult<IActionResult>(this.NotFound()); + } + + return Task.FromResult<IActionResult>(this.Ok(cloudClient)); + } + /// <summary> /// Checks the health of the Dapr runtime by attempting to send a message to a health actor. /// </summary> diff --git a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Program.cs b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Program.cs index d1d66f317d50..6d3789bb2047 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Program.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Program.cs @@ -2,6 +2,7 @@ using Microsoft.SemanticKernel; using SemanticKernel.Process.IntegrationTests; +using SemanticKernel.Process.IntegrationTests.CloudEvents; var builder = WebApplication.CreateBuilder(args); @@ -15,6 +16,10 @@ // Configure the Kernel with DI. This is required for dependency injection to work with processes. builder.Services.AddKernel(); +// Configure IExternalKernelProcessMessageChannel used for testing purposes +builder.Services.AddSingleton<IExternalKernelProcessMessageChannel>(MockCloudEventClient.Instance); +builder.Services.AddSingleton(MockCloudEventClient.Instance); + // Configure Dapr builder.Services.AddActors(static options => { diff --git a/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/DaprTestProcessContext.cs b/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/DaprTestProcessContext.cs index 59401b1c2979..e7ca6292b16f 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/DaprTestProcessContext.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/DaprTestProcessContext.cs @@ -5,6 +5,7 @@ using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.Process; using Microsoft.SemanticKernel.Process.Serialization; +using SemanticKernel.Process.IntegrationTests.CloudEvents; namespace SemanticKernel.Process.IntegrationTests; internal sealed class DaprTestProcessContext : KernelProcessContext @@ -68,4 +69,14 @@ public override Task StopAsync() { throw new NotImplementedException(); } + + public override async Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync() + { + var response = await this._httpClient.GetFromJsonAsync<MockCloudEventClient>($"http://localhost:5200/processes/{this._processId}/mockCloudClient", options: this._serializerOptions); + return response switch + { + null => throw new InvalidOperationException("Process not found"), + _ => response + }; + } } diff --git a/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/ProcessTestFixture.cs b/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/ProcessTestFixture.cs index fa35cf1fe0fa..c6f55eb95f69 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/ProcessTestFixture.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestRunner.Dapr/ProcessTestFixture.cs @@ -121,9 +121,11 @@ private async Task WaitForHostStartupAsync() /// <param name="process">The process to start.</param> /// <param name="kernel">An instance of <see cref="Kernel"/></param> /// <param name="initialEvent">An optional initial event.</param> + /// <param name="externalMessageChannel">channel used for external messages</param> /// <returns>A <see cref="Task{KernelProcessContext}"/></returns> - public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent) + public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null) { + // Actual Kernel injection of Kernel and ExternalKernelProcessMessageChannel is in dotnet\src\Experimental\Process.IntegrationTestHost.Dapr\Program.cs var context = new DaprTestProcessContext(process, this._httpClient!); await context.StartWithEventAsync(initialEvent); return context; diff --git a/dotnet/src/Experimental/Process.IntegrationTestRunner.Local/ProcessTestFixture.cs b/dotnet/src/Experimental/Process.IntegrationTestRunner.Local/ProcessTestFixture.cs index 7fb4f7d72393..cbe202fdd7e0 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestRunner.Local/ProcessTestFixture.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestRunner.Local/ProcessTestFixture.cs @@ -17,9 +17,10 @@ public class ProcessTestFixture /// <param name="process">The process to start.</param> /// <param name="kernel">An instance of <see cref="Kernel"/></param> /// <param name="initialEvent">An optional initial event.</param> + /// <param name="externalMessageChannel">channel used for external messages</param> /// <returns>A <see cref="Task{KernelProcessContext}"/></returns> - public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent) + public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null) { - return await process.StartAsync(kernel, initialEvent); + return await process.StartAsync(kernel, initialEvent, externalMessageChannel); } } diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventClient.cs b/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventClient.cs new file mode 100644 index 000000000000..317a2fe545d2 --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventClient.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.SemanticKernel; + +namespace SemanticKernel.Process.IntegrationTests.CloudEvents; +/// <summary> +/// Class used for testing purposes to mock emitting external cloud events +/// </summary> +public class MockCloudEventClient : IExternalKernelProcessMessageChannel +{ + /// <summary> + /// Initialization counter for testing + /// </summary> + public int InitializationCounter { get; set; } = 0; + /// <summary> + /// Uninitialization counter for testing + /// </summary> + public int UninitializationCounter { get; set; } = 0; + /// <summary> + /// Captures cloud events emitted for testing + /// </summary> + public List<MockCloudEventData> CloudEvents { get; set; } = []; + + private static MockCloudEventClient? s_instance = null; + + /// <summary> + /// Instance of <see cref="MockCloudEventClient"/> when used as singleton + /// </summary> + public static MockCloudEventClient Instance + { + get + { + return s_instance ??= new MockCloudEventClient(); + } + } + + /// <inheritdoc/> + public Task EmitExternalEventAsync(string externalTopicEvent, object? eventData) + { + if (eventData != null) + { + this.CloudEvents.Add(new() { TopicName = externalTopicEvent, Data = (string)eventData }); + } + + return Task.CompletedTask; + } + + /// <inheritdoc/> + public ValueTask Initialize() + { + this.InitializationCounter++; + return ValueTask.CompletedTask; + } + + /// <inheritdoc/> + public ValueTask Uninitialize() + { + this.UninitializationCounter++; + return ValueTask.CompletedTask; + } +} diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventData.cs b/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventData.cs new file mode 100644 index 000000000000..97dd18e9de2d --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Resources/CloudEvents/MockCloudEventData.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace SemanticKernel.Process.IntegrationTests.CloudEvents; + +/// <summary> +/// Mock cloud event data used for testing purposes only +/// </summary> +public class MockCloudEventData +{ + /// <summary> + /// Name of the mock topic + /// </summary> + public required string TopicName { get; set; } + + /// <summary> + /// Data emitted in the mock cloud event + /// </summary> + public string? Data { get; set; } +} diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessCloudEventsResources.cs b/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessCloudEventsResources.cs new file mode 100644 index 000000000000..e54388269e1e --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessCloudEventsResources.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using Microsoft.SemanticKernel; + +namespace SemanticKernel.Process.IntegrationTests; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +/// <summary> +/// A step that emits messages externally +/// </summary> +public sealed class MockProxyStep : KernelProcessStep +{ + public static class FunctionNames + { + public const string OnRepeatMessage = nameof(OnRepeatMessage); + public const string OnEchoMessage = nameof(OnEchoMessage); + } + + public static class TopicNames + { + public const string RepeatExternalTopic = nameof(RepeatExternalTopic); + public const string EchoExternalTopic = nameof(EchoExternalTopic); + } + + [KernelFunction(FunctionNames.OnRepeatMessage)] + public async Task OnRepeatMessageAsync(KernelProcessStepContext context, string message) + { + await context.EmitExternalEventAsync(TopicNames.RepeatExternalTopic, message); + } + + [KernelFunction(FunctionNames.OnEchoMessage)] + public async Task OnEchoMessageAsync(KernelProcessStepContext context, string message) + { + await context.EmitExternalEventAsync(TopicNames.EchoExternalTopic, message); + } +} + +#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Shared/Process.IntegrationTests.Shared.props b/dotnet/src/Experimental/Process.IntegrationTests.Shared/Process.IntegrationTests.Shared.props index 9c4c35980463..b0be78e43a06 100644 --- a/dotnet/src/Experimental/Process.IntegrationTests.Shared/Process.IntegrationTests.Shared.props +++ b/dotnet/src/Experimental/Process.IntegrationTests.Shared/Process.IntegrationTests.Shared.props @@ -2,5 +2,6 @@ <ItemGroup> <Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/TestSettings/**/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" /> <Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/**/*Tests.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" /> + <Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/CloudEvents/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" /> </ItemGroup> </Project> \ No newline at end of file diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessCloudEventsTests.cs b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessCloudEventsTests.cs new file mode 100644 index 000000000000..ee262b50f7e9 --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessCloudEventsTests.cs @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft. All rights reserved. + +#pragma warning disable IDE0005 // Using directive is unnecessary. +using System; +using System.Linq; +using System.Runtime.Serialization; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.SemanticKernel; +using SemanticKernel.IntegrationTests.TestSettings; +using SemanticKernel.Process.IntegrationTests.CloudEvents; +using Xunit; +#pragma warning restore IDE0005 // Using directive is unnecessary. + +namespace SemanticKernel.Process.IntegrationTests; + +/// <summary> +/// Integration tests for processes. +/// </summary> +[Collection(nameof(ProcessTestGroup))] +public sealed class ProcessCloudEventsTests : IClassFixture<ProcessTestFixture> +{ + private readonly ProcessTestFixture _fixture; + private readonly IKernelBuilder _kernelBuilder = Kernel.CreateBuilder(); + private readonly IConfigurationRoot _configuration = new ConfigurationBuilder() + .AddJsonFile(path: "testsettings.json", optional: true, reloadOnChange: true) + .AddJsonFile(path: "testsettings.development.json", optional: true, reloadOnChange: true) + .AddEnvironmentVariables() + .AddUserSecrets<OpenAIConfiguration>() + .Build(); + + private readonly IExternalKernelProcessMessageChannel _externalMessageChannel = MockCloudEventClient.Instance; + + /// <summary> + /// Initializes a new instance of the <see cref="ProcessTests"/> class. This is called by the test framework. + /// </summary> + /// <param name="fixture"></param> + public ProcessCloudEventsTests(ProcessTestFixture fixture) + { + this._fixture = fixture; + } + + /// <summary> + /// Tests a simple linear process with two steps and no sub processes. + /// </summary> + /// <returns>A <see cref="Task"/></returns> + [Fact] + public async Task LinearProcessWithCloudEventSubscribersAsync() + { + // Arrange + OpenAIConfiguration configuration = this._configuration.GetSection("OpenAI").Get<OpenAIConfiguration>()!; + this._kernelBuilder.AddOpenAIChatCompletion( + modelId: configuration.ModelId!, + apiKey: configuration.ApiKey); + + Kernel kernel = this._kernelBuilder.Build(); + var process = this.CreateLinearProcess("SimpleWithCloudEvents").Build(); + + // Act + string testInput = "Test"; + var processHandle = await this._fixture.StartProcessAsync(process, kernel, new() { Id = ProcessTestsEvents.StartProcess, Data = testInput }, this._externalMessageChannel); + var externalMessageChannel = await processHandle.GetExternalMessageChannelAsync(); + + // Assert + Assert.NotNull(externalMessageChannel); + var mockClient = (MockCloudEventClient)externalMessageChannel; + Assert.NotNull(mockClient); + Assert.True(mockClient.InitializationCounter > 0); + Assert.Equal(2, mockClient.CloudEvents.Count); + Assert.Equal(testInput, mockClient.CloudEvents[0].Data); + Assert.Equal(MockProxyStep.TopicNames.EchoExternalTopic, mockClient.CloudEvents[0].TopicName); + Assert.Equal($"{testInput} {testInput}", mockClient.CloudEvents[1].Data); + Assert.Equal(MockProxyStep.TopicNames.RepeatExternalTopic, mockClient.CloudEvents[1].TopicName); + } + + /// <summary> + /// Creates a simple linear process with two steps and a proxy step to emit events externally<br/> + /// Input Event: <see cref="ProcessTestsEvents.StartProcess"/><br/> + /// Output Events: [<see cref="ProcessTestsEvents.OutputReadyInternal"/>, <see cref="ProcessTestsEvents.OutputReadyPublic"/>]<br/> + /// <code> + /// ┌────────┐ ┌────────┐ + /// │ echo ├─┬─►│ repeat ├───┐ + /// └────────┘ │ └────────┘ │ + /// │ │ + /// │ ┌───────┐ │ + /// └─►│ proxy │◄───┘ + /// └───────┘ + /// </code> + /// </summary> + private ProcessBuilder CreateLinearProcess(string name) + { + var processBuilder = new ProcessBuilder(name); + var echoStep = processBuilder.AddStepFromType<EchoStep>(); + var repeatStep = processBuilder.AddStepFromType<RepeatStep>(); + var proxyStep = processBuilder.AddStepFromType<MockProxyStep>(); + + processBuilder.OnInputEvent(ProcessTestsEvents.StartProcess) + .SendEventTo(new ProcessFunctionTargetBuilder(echoStep)); + + echoStep.OnFunctionResult(nameof(EchoStep.Echo)) + .SendEventTo(new ProcessFunctionTargetBuilder(repeatStep, parameterName: "message")); + + echoStep + .OnFunctionResult() + .SendEventTo(new ProcessFunctionTargetBuilder(proxyStep, functionName: MockProxyStep.FunctionNames.OnEchoMessage)); + + repeatStep + .OnEvent(ProcessTestsEvents.OutputReadyInternal) + .SendEventTo(new ProcessFunctionTargetBuilder(proxyStep, functionName: MockProxyStep.FunctionNames.OnRepeatMessage)); + + return processBuilder; + } +} diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTestFixture.cs b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTestFixture.cs index 1fd11bef274b..90dabb3c4bcd 100644 --- a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTestFixture.cs +++ b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTestFixture.cs @@ -17,6 +17,7 @@ public abstract class ProcessTestFixture /// <param name="process">The process to start.</param> /// <param name="kernel">An instance of <see cref="Kernel"/></param> /// <param name="initialEvent">An optional initial event.</param> + /// <param name="externalMessageChannel">channel used for external messages</param> /// <returns>A <see cref="Task{KernelProcessContext}"/></returns> - public abstract Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent); + public abstract Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null); } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index b59dd70211f4..9ddf0c4074cb 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -13,18 +13,17 @@ public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposabl private readonly LocalProcess _localProcess; private readonly Kernel _kernel; - internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, ProcessEventProxy? eventProxy = null) + internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, ProcessEventProxy? eventProxy = null, IExternalKernelProcessMessageChannel? externalMessageChannel = null) { Verify.NotNull(process, nameof(process)); Verify.NotNull(kernel, nameof(kernel)); Verify.NotNullOrWhiteSpace(process.State?.Name); this._kernel = kernel; - this._localProcess = new LocalProcess( - process, - kernel) + this._localProcess = new LocalProcess(process, kernel) { - EventProxy = eventProxy + EventProxy = eventProxy, + ExternalMessageChannel = externalMessageChannel, }; } @@ -55,4 +54,10 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) => /// Disposes of the resources used by the process. /// </summary> public void Dispose() => this._localProcess.Dispose(); + + /// <inheritdoc/> + public override Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync() + { + return Task.FromResult(this._localProcess.ExternalMessageChannel); + } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs index 4904366c9d39..eac8826b37a5 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs @@ -15,12 +15,13 @@ public static class LocalKernelProcessFactory /// <param name="process">Required: The <see cref="KernelProcess"/> to start running.</param> /// <param name="kernel">Required: An instance of <see cref="Kernel"/></param> /// <param name="initialEvent">Required: The initial event to start the process.</param> + /// <param name="externalMessageChannel">Optional: an instance of <see cref="IExternalKernelProcessMessageChannel"/>.</param> /// <returns>An instance of <see cref="KernelProcess"/> that can be used to interrogate or stop the running process.</returns> - public static async Task<LocalKernelProcessContext> StartAsync(this KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent) + public static async Task<LocalKernelProcessContext> StartAsync(this KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null) { Verify.NotNull(initialEvent, nameof(initialEvent)); - LocalKernelProcessContext processContext = new(process, kernel); + LocalKernelProcessContext processContext = new(process, kernel, null, externalMessageChannel); await processContext.StartWithEventAsync(initialEvent).ConfigureAwait(false); return processContext; } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index b7a6695996f4..aea736ceced0 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -211,6 +211,7 @@ private ValueTask InitializeProcessAsync() { ParentProcessId = this.Id, EventProxy = this.EventProxy, + ExternalMessageChannel = this.ExternalMessageChannel, }; } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 2fe9287bafda..c95ba287d0db 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -80,6 +80,8 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr /// </summary> internal ProcessEventProxy? EventProxy { get; init; } + internal IExternalKernelProcessMessageChannel? ExternalMessageChannel { get; init; } + /// <summary> /// Retrieves all events that have been emitted by this step in the previous superstep. /// </summary> @@ -231,6 +233,13 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message) /// <exception cref="KernelException"></exception> protected virtual async ValueTask InitializeStepAsync() { + if (this.ExternalMessageChannel != null) + { + // initialize external message channel + // TODO: in LocalRuntime need to ensure initialization only happens once + await this.ExternalMessageChannel.Initialize().ConfigureAwait(false); + } + // Instantiate an instance of the inner step object KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name); @@ -242,7 +251,7 @@ protected virtual async ValueTask InitializeStepAsync() } // Initialize the input channels - this._initialInputs = this.FindInputChannels(this._functions, this._logger); + this._initialInputs = this.FindInputChannels(this._functions, this._logger, this.ExternalMessageChannel); this._inputs = this._initialInputs.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)); // Activate the step with user-defined state if needed diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActor.cs new file mode 100644 index 000000000000..52b16051e070 --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActor.cs @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using Dapr.Actors.Runtime; + +namespace Microsoft.SemanticKernel; + +/// <summary> +/// An actor that represents en external event messaging buffer. +/// </summary> +internal sealed class ExternalMessageBufferActor : Actor, IExternalMessageBuffer +{ + private readonly IExternalKernelProcessMessageChannel _externalMessageChannel; + + /// <summary> + /// Required constructor for Dapr Actor. + /// </summary> + /// <param name="host">The actor host.</param> + /// <param name="externalMessageChannel">Instance of <see cref="IExternalKernelProcessMessageChannel"/></param> + public ExternalMessageBufferActor(ActorHost host, IExternalKernelProcessMessageChannel externalMessageChannel) : base(host) + { + this._externalMessageChannel = externalMessageChannel; + } + + public async Task EmitExternalEventAsync(string externalTopicEvent, object? eventData) + { + await this._externalMessageChannel.EmitExternalEventAsync(externalTopicEvent, eventData).ConfigureAwait(false); + } + + protected override async Task OnDeactivateAsync() + { + await this._externalMessageChannel.Uninitialize().ConfigureAwait(false); + } + + protected override async Task OnActivateAsync() + { + await this._externalMessageChannel.Initialize().ConfigureAwait(false); + } +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActorWrapper.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActorWrapper.cs new file mode 100644 index 000000000000..5de54a277d20 --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ExternalMessageBufferActorWrapper.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; + +namespace Microsoft.SemanticKernel; + +/// <summary> +/// Class used to allow using <see cref="IExternalEventBuffer"/> as <see cref="IExternalKernelProcessMessageChannel"/> +/// in SK Process shared abstractions +/// </summary> +public class ExternalMessageBufferActorWrapper : IExternalKernelProcessMessageChannel +{ + private readonly IExternalMessageBuffer _actor; + + /// <summary> + /// Constructor to wrap <see cref="IExternalMessageBuffer"/> as <see cref="IExternalKernelProcessMessageChannel"/> + /// </summary> + /// <param name="actor">The actor host.</param> + public ExternalMessageBufferActorWrapper(IExternalMessageBuffer actor) + { + this._actor = actor; + } + + /// <inheritdoc cref="IExternalMessageBuffer.EmitExternalEventAsync(string, object?)"/> + public async Task EmitExternalEventAsync(string externalTopicEvent, object? eventData) + { + await this._actor.EmitExternalEventAsync(externalTopicEvent, eventData).ConfigureAwait(false); + } + + /// <inheritdoc/> + public ValueTask Initialize() + { + // When using Dapr initialization is already taken care of by Dapr Actors + throw new System.NotImplementedException(); + } + + /// <inheritdoc/> + public ValueTask Uninitialize() + { + // When using Dapr uninitialization is already taken care of by Dapr Actors + throw new System.NotImplementedException(); + } +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs index f5445bdf0afc..479687f97077 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs @@ -338,8 +338,15 @@ protected virtual async ValueTask ActivateStepAsync() this._functions.Add(f.Name, f); } + // Creating external process channel actor to be used for external messaging by some steps + IExternalKernelProcessMessageChannel? externalMessageChannelActor = null; + var scopedExternalMessageBufferId = this.ScopedActorId(new ActorId(this.Id.GetId())); + var actor = this.ProxyFactory.CreateActorProxy<IExternalMessageBuffer>(scopedExternalMessageBufferId, nameof(ExternalMessageBufferActor)); + externalMessageChannelActor = new ExternalMessageBufferActorWrapper(actor); + // Initialize the input channels - this._initialInputs = this.FindInputChannels(this._functions, this._logger); + // TODO: Issue #10328 Cloud Events - new Step type dedicated to work as Proxy Step abstraction https://github.com/microsoft/semantic-kernel/issues/10328 + this._initialInputs = this.FindInputChannels(this._functions, this._logger, externalMessageChannelActor); this._inputs = this._initialInputs.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)); // Activate the step with user-defined state if needed diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs index f09fa4f39222..b7425516863a 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs @@ -67,4 +67,10 @@ public override async Task<KernelProcess> GetStateAsync() var daprProcessInfo = await this._daprProcess.GetProcessInfoAsync().ConfigureAwait(false); return daprProcessInfo.ToKernelProcess(); } + + /// <inheritdoc/> + public override Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync() + { + throw new NotImplementedException(); + } } diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IExternalMessageBuffer.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IExternalMessageBuffer.cs new file mode 100644 index 000000000000..5db64dbd6f68 --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IExternalMessageBuffer.cs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using Dapr.Actors; + +namespace Microsoft.SemanticKernel; + +// estenori-note: +// for some reason dapr doesn't like if instead public interface IExternalMessageBuffer : IActor, IExternalKernelProcessMessageChannelBase +// instead defining the interface component is necessary. To make it compatible with shared components a "casting" to IExternalKernelProcessMessageChannelEmitter +// is added in StepActor logic to make use of FindInputChannels + +/// <summary> +/// An interface for <see cref="IExternalKernelProcessMessageChannel"/> +/// </summary> +public interface IExternalMessageBuffer : IActor +{ + /// <summary> + /// Emits external events outside of the SK process + /// </summary> + /// <param name="externalTopicEvent"></param> + /// <param name="eventData"></param> + /// <returns></returns> + abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData); +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs index 52f86899d608..ad65b7f89c4f 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs @@ -22,5 +22,6 @@ public static void AddProcessActors(this ActorRuntimeOptions actorOptions) actorOptions.Actors.RegisterActor<EventBufferActor>(); actorOptions.Actors.RegisterActor<MessageBufferActor>(); actorOptions.Actors.RegisterActor<ExternalEventBufferActor>(); + actorOptions.Actors.RegisterActor<ExternalMessageBufferActor>(); } } diff --git a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs index ab74689a33db..fead79cde844 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs @@ -101,9 +101,14 @@ public static void InitializeUserState(this KernelProcessStepState stateObject, /// <param name="channel">The source channel to evaluate</param> /// <param name="functions">A dictionary of KernelFunction instances.</param> /// <param name="logger">An instance of <see cref="ILogger"/>.</param> + /// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param> /// <returns><see cref="Dictionary{TKey, TValue}"/></returns> /// <exception cref="InvalidOperationException"></exception> - public static Dictionary<string, Dictionary<string, object?>?> FindInputChannels(this IKernelProcessMessageChannel channel, Dictionary<string, KernelFunction> functions, ILogger? logger) + public static Dictionary<string, Dictionary<string, object?>?> FindInputChannels( + this IKernelProcessMessageChannel channel, + Dictionary<string, KernelFunction> functions, + ILogger? logger, + IExternalKernelProcessMessageChannel? externalMessageChannel = null) { if (functions is null) { @@ -126,7 +131,7 @@ public static void InitializeUserState(this KernelProcessStepState stateObject, // and are instantiated here. if (param.ParameterType == typeof(KernelProcessStepContext)) { - inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel); + inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel, externalMessageChannel); } else {