From 6d5171b7ab1f9479cb1a6a42f4f83925d248c999 Mon Sep 17 00:00:00 2001 From: Kosta Petan Date: Sat, 7 Dec 2024 17:55:03 +0100 Subject: [PATCH] add TestGrpcClient --- .../GrpcGatewayServiceTests.cs | 58 +++++++++++++------ .../Helpers/Grpc/TestGrpcClient.cs | 36 ++++++++++++ 2 files changed, 75 insertions(+), 19 deletions(-) create mode 100644 dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs index f6800bf5a205..a303d175ef7a 100644 --- a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/GrpcGatewayServiceTests.cs @@ -27,13 +27,10 @@ public async Task Test_OpenChannel() var logger = Mock.Of>(); var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); var service = new GrpcGatewayService(gateway); - var callContext = TestServerCallContext.Create(); - - using var requestStream = new TestAsyncStreamReader(callContext); - using var responseStream = new TestServerStreamWriter(callContext); - + using var client = new TestGrpcClient(); + gateway.WorkersCount.Should().Be(0); - await service.OpenChannel(requestStream, responseStream, callContext); + await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext); gateway.WorkersCount.Should().Be(1); } @@ -43,30 +40,53 @@ public async Task Test_Message_Exchange_Through_Gateway() var logger = Mock.Of>(); var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); var service = new GrpcGatewayService(gateway); - var callContext = TestServerCallContext.Create(); - - using var requestStream = new TestAsyncStreamReader(callContext); - using var responseStream = new TestServerStreamWriter(callContext); + using var client = new TestGrpcClient(); var assembly = typeof(PBAgent).Assembly; var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly); - await service.OpenChannel(requestStream, responseStream, callContext); - var responseMessage = await responseStream.ReadNextAsync(); + await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext); + var responseMessage = await client.ReadNext(); - await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), responseMessage!.Response.RequestId), callContext); - await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), responseMessage!.Response.RequestId), callContext); + var connectionId = responseMessage!.Response.RequestId; - var inputEvent = new NewMessageReceived { Message = "Hello" }.ToCloudEvent("gh-gh-gh"); + await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), connectionId), client.CallContext); + await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), connectionId), client.CallContext); - requestStream.AddMessage(new Message { CloudEvent = inputEvent }); - var newMessageReceived = await responseStream.ReadNextAsync(); + var inputEvent = new NewMessageReceived { Message = $"Start-{connectionId}" }.ToCloudEvent("gh-gh-gh"); + + client.AddMessage(new Message { CloudEvent = inputEvent }); + var newMessageReceived = await client.ReadNext(); newMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived))); + newMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); // Simulate an agent, by publishing a new message in the request stream + var helloEvent = new Hello { Message = $"Hello test-{connectionId}" }.ToCloudEvent("gh-gh-gh"); + client.AddMessage(new Message { CloudEvent = helloEvent }); + + var helloMessageReceived = await client.ReadNext(); + helloMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello))); + helloMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh"); + } + + [Fact] + public async Task Test_Message_Goes_To_Right_Worker() + { + var logger = Mock.Of>(); + var gateway = new GrpcGateway(_fixture.Cluster.Client, logger); + var service = new GrpcGatewayService(gateway); + using var client = new TestGrpcClient(); + + var assembly = typeof(PBAgent).Assembly; + var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly); + + await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext); + var responseMessage = await client.ReadNext(); + + var connectionId = responseMessage!.Response.RequestId; - var outputEvent = await responseStream.ReadNextAsync(); - outputEvent!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello))); + await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), connectionId), client.CallContext); + await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), connectionId), client.CallContext); } diff --git a/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs new file mode 100644 index 000000000000..f839b318b85f --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Runtime.Grpc.Tests/Helpers/Grpc/TestGrpcClient.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// TestGrpcClient.cs + +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc; +internal class TestGrpcClient: IDisposable +{ + public TestAsyncStreamReader RequestStream { get; } + public TestServerStreamWriter ResponseStream { get; } + public TestServerCallContext CallContext { get; } + + public TestGrpcClient() + { + CallContext = TestServerCallContext.Create(); + RequestStream = new TestAsyncStreamReader(CallContext); + ResponseStream = new TestServerStreamWriter(CallContext); + } + + public async Task ReadNext() + { + var response = await ResponseStream.ReadNextAsync(); + return response!; + } + + public void AddMessage(Message message) + { + RequestStream.AddMessage(message); + } + + public void Dispose() + { + RequestStream.Dispose(); + ResponseStream.Dispose(); + } +}