Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: SK Process Cloud Events - Publish Interface abstractions scaffolding #10222

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An interface that provides a channel for emitting external messages from a step.
/// In addition provide common methods like initialization and Uninitialization
/// </summary>
public interface IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Initialize();

/// <summary>
/// Uninitialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Uninitialize();

/// <summary>
/// Emits the specified event from the step outside the SK process
/// </summary>
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
public abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ public abstract class KernelProcessContext
/// </summary>
/// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns>
public abstract Task<KernelProcess> GetStateAsync();

/// <summary>
/// Gets the instance of <see cref="IExternalKernelProcessMessageChannel"/> used for external messages
/// </summary>
/// <returns></returns>
public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync();
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ namespace Microsoft.SemanticKernel;
public sealed class KernelProcessStepContext
{
private readonly IKernelProcessMessageChannel _stepMessageChannel;
private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel;

/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class.
/// </summary>
/// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel)
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
this._stepMessageChannel = channel;
this._externalMessageChannel = externalMessageChannel;
}

/// <summary>
/// Emit an event from the current step.
/// Emit an SK process event from the current step.
/// </summary>
/// <param name="processEvent">An instance of <see cref="KernelProcessEvent"/> to be emitted from the <see cref="KernelProcessStep"/></param>
/// <returns>A <see cref="ValueTask"/></returns>
Expand All @@ -31,7 +34,7 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
}

/// <summary>
/// Emit an event from the current step with a simplified method signature.
/// Emit an SK process event from the current step with a simplified method signature.
/// </summary>
/// <param name="eventId"></param>
/// <param name="data"></param>
Expand All @@ -52,4 +55,22 @@ public ValueTask EmitEventAsync(
Visibility = visibility
});
}

/// <summary>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="topicName"></param>
/// <param name="processEventData"></param>
/// <returns></returns>
/// <exception cref="KernelException"></exception>
public async Task EmitExternalEventAsync(string topicName, object? processEventData = null)
{
if (this._externalMessageChannel == null)
{
throw new KernelException($"External message channel not configured for step with topic {topicName}");
}

await this._externalMessageChannel.EmitExternalEventAsync(topicName, processEventData).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests.Controllers;

Expand Down Expand Up @@ -72,6 +73,23 @@ public async Task<IActionResult> GetProcessAsync(string processId)
return this.Ok(daprProcess);
}

/// <summary>
/// Retrieves current state of the MockCloudEventClient used in the running process
/// </summary>
/// <param name="processId">The Id of the process.</param>
/// <param name="cloudClient">Mock Cloud client ingested via dependency injection</param>
/// <returns></returns>
[HttpGet("processes/{processId}/mockCloudClient")]
public Task<IActionResult> GetMockCloudClient(string processId, MockCloudEventClient cloudClient)
{
if (!s_processes.TryGetValue(processId, out DaprKernelProcessContext? context))
{
return Task.FromResult<IActionResult>(this.NotFound());
}

return Task.FromResult<IActionResult>(this.Ok(cloudClient));
}

/// <summary>
/// Checks the health of the Dapr runtime by attempting to send a message to a health actor.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using Microsoft.SemanticKernel;
using SemanticKernel.Process.IntegrationTests;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

var builder = WebApplication.CreateBuilder(args);

Expand All @@ -15,6 +16,10 @@
// Configure the Kernel with DI. This is required for dependency injection to work with processes.
builder.Services.AddKernel();

// Configure IExternalKernelProcessMessageChannel used for testing purposes
builder.Services.AddSingleton<IExternalKernelProcessMessageChannel>(MockCloudEventClient.Instance);
builder.Services.AddSingleton(MockCloudEventClient.Instance);

// Configure Dapr
builder.Services.AddActors(static options =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests;
internal sealed class DaprTestProcessContext : KernelProcessContext
Expand Down Expand Up @@ -68,4 +69,14 @@ public override Task StopAsync()
{
throw new NotImplementedException();
}

public override async Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync()
{
var response = await this._httpClient.GetFromJsonAsync<MockCloudEventClient>($"http://localhost:5200/processes/{this._processId}/mockCloudClient", options: this._serializerOptions);
return response switch
{
null => throw new InvalidOperationException("Process not found"),
_ => response
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,11 @@ private async Task WaitForHostStartupAsync()
/// <param name="process">The process to start.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <param name="externalMessageChannel">channel used for external messages</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
// Actual Kernel injection of Kernel and ExternalKernelProcessMessageChannel is in dotnet\src\Experimental\Process.IntegrationTestHost.Dapr\Program.cs
var context = new DaprTestProcessContext(process, this._httpClient!);
await context.StartWithEventAsync(initialEvent);
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public class ProcessTestFixture
/// <param name="process">The process to start.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <param name="externalMessageChannel">channel used for external messages</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
return await process.StartAsync(kernel, initialEvent);
return await process.StartAsync(kernel, initialEvent, externalMessageChannel);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;
/// <summary>
/// Class used for testing purposes to mock emitting external cloud events
/// </summary>
public class MockCloudEventClient : IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization counter for testing
/// </summary>
public int InitializationCounter { get; set; } = 0;
/// <summary>
/// Uninitialization counter for testing
/// </summary>
public int UninitializationCounter { get; set; } = 0;
/// <summary>
/// Captures cloud events emitted for testing
/// </summary>
public List<MockCloudEventData> CloudEvents { get; set; } = [];

private static MockCloudEventClient? s_instance = null;

/// <summary>
/// Instance of <see cref="MockCloudEventClient"/> when used as singleton
/// </summary>
public static MockCloudEventClient Instance
{
get
{
return s_instance ??= new MockCloudEventClient();
}
}

/// <inheritdoc/>
public Task EmitExternalEventAsync(string externalTopicEvent, object? eventData)
{
if (eventData != null)
{
this.CloudEvents.Add(new() { TopicName = externalTopicEvent, Data = (string)eventData });
}

return Task.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Initialize()
{
this.InitializationCounter++;
return ValueTask.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Uninitialize()
{
this.UninitializationCounter++;
return ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;

/// <summary>
/// Mock cloud event data used for testing purposes only
/// </summary>
public class MockCloudEventData
{
/// <summary>
/// Name of the mock topic
/// </summary>
public required string TopicName { get; set; }

/// <summary>
/// Data emitted in the mock cloud event
/// </summary>
public string? Data { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

/// <summary>
/// A step that emits messages externally
/// </summary>
public sealed class MockProxyStep : KernelProcessStep
{
public static class FunctionNames
{
public const string OnRepeatMessage = nameof(OnRepeatMessage);
public const string OnEchoMessage = nameof(OnEchoMessage);
}

public static class TopicNames
{
public const string RepeatExternalTopic = nameof(RepeatExternalTopic);
public const string EchoExternalTopic = nameof(EchoExternalTopic);
}

[KernelFunction(FunctionNames.OnRepeatMessage)]
public async Task OnRepeatMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.RepeatExternalTopic, message);
}

[KernelFunction(FunctionNames.OnEchoMessage)]
public async Task OnEchoMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.EchoExternalTopic, message);
}
}

#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
<ItemGroup>
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/TestSettings/**/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/**/*Tests.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/CloudEvents/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>
</Project>
Loading
Loading