Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits committed Jan 29, 2025
1 parent d784c17 commit 1b9ad7a
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GrpcAgentWorkerHostBuilderExtension.cs
using System.Diagnostics;
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Protobuf;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Core.Grpc;

public static class GrpcAgentWorkerHostBuilderExtensions
{
private const string _defaultAgentServiceAddress = "https://localhost:53071";

// TODO: How do we ensure AddGrpcAgentWorker and UseInProcessRuntime are mutually exclusive?
public static AgentsAppBuilder AddGrpcAgentWorker(this AgentsAppBuilder builder, string? agentServiceAddress = null)
{
builder.Services.AddGrpcClient<AgentRpc.AgentRpcClient>(options =>
{
options.Address = new Uri(agentServiceAddress ?? builder.Configuration["AGENT_HOST"] ?? _defaultAgentServiceAddress);
options.ChannelOptionsActions.Add(channelOptions =>
{
var loggerFactory = new LoggerFactory();
if (Debugger.IsAttached)
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = false,
KeepAlivePingDelay = TimeSpan.FromSeconds(200),
KeepAlivePingTimeout = TimeSpan.FromSeconds(100),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
};
}
else
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true,
KeepAlivePingDelay = TimeSpan.FromSeconds(20),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
};
}

var methodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};

channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } };
channelOptions.ThrowOperationCanceledOnCancellation = true;
});
});
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<IAgentRuntime, GrpcAgentRuntime>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentRuntime>());
return builder;
}
}
27 changes: 14 additions & 13 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/IAgentRuntimeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
using System.Diagnostics;
using Google.Protobuf.Collections;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Protobuf;
using Microsoft.Extensions.DependencyInjection;
using static Microsoft.AutoGen.Contracts.CloudEvent.Types;

namespace Microsoft.AutoGen.Core.Grpc;

public static class IAgentRuntimeExtensions
public static class GrpcAgentRuntimeExtensions
{
public static (string?, string?) GetTraceIdAndState(IAgentRuntime runtime, IDictionary<string, string> metadata)
public static (string?, string?) GetTraceIdAndState(GrpcAgentRuntime runtime, IDictionary<string, string> metadata)
{
var dcp = runtime.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = runtime.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
Expand All @@ -25,9 +26,9 @@ public static (string?, string?) GetTraceIdAndState(IAgentRuntime runtime, IDict
out var traceState);
return (traceParent, traceState);
}
public static (string?, string?) GetTraceIdAndState(IAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
public static (string?, string?) GetTraceIdAndState(GrpcAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = worker.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
Expand All @@ -40,9 +41,9 @@ public static (string?, string?) GetTraceIdAndState(IAgentRuntime worker, MapFie
out var traceState);
return (traceParent, traceState);
}
public static void Update(IAgentRuntime worker, RpcRequest request, Activity? activity = null)
public static void Update(GrpcAgentRuntime worker, RpcRequest request, Activity? activity = null)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = worker.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.Inject(activity, request.Metadata, static (carrier, key, value) =>
{
var metadata = (IDictionary<string, string>)carrier!;
Expand All @@ -56,9 +57,9 @@ public static void Update(IAgentRuntime worker, RpcRequest request, Activity? ac
}
});
}
public static void Update(IAgentRuntime worker, CloudEvent cloudEvent, Activity? activity = null)
public static void Update(GrpcAgentRuntime worker, CloudEvent cloudEvent, Activity? activity = null)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = worker.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.Inject(activity, cloudEvent.Attributes, static (carrier, key, value) =>
{
var mapField = (MapField<string, CloudEventAttributeValue>)carrier!;
Expand All @@ -73,9 +74,9 @@ public static void Update(IAgentRuntime worker, CloudEvent cloudEvent, Activity?
});
}

public static IDictionary<string, string> ExtractMetadata(IAgentRuntime worker, IDictionary<string, string> metadata)
public static IDictionary<string, string> ExtractMetadata(GrpcAgentRuntime worker, IDictionary<string, string> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = worker.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
var baggage = dcp.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
Expand All @@ -85,9 +86,9 @@ public static IDictionary<string, string> ExtractMetadata(IAgentRuntime worker,

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
public static IDictionary<string, string> ExtractMetadata(IAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
public static IDictionary<string, string> ExtractMetadata(GrpcAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var dcp = worker.ServiceProvider.GetRequiredService<DistributedContextPropagator>();
var baggage = dcp.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ProtoSerializationRegistry.cs

namespace Microsoft.AutoGen.Core.Grpc;

public class ProtoSerializationRegistry : IProtoSerializationRegistry
{
private readonly Dictionary<string, IProtoMessageSerializer> _serializers
= new Dictionary<string, IProtoMessageSerializer>();

public ITypeNameResolver TypeNameResolver => new ProtoTypeNameResolver();

public bool Exists(Type type)
{
return _serializers.ContainsKey(TypeNameResolver.ResolveTypeName(type));
}

public IProtoMessageSerializer? GetSerializer(Type type)
{
return GetSerializer(TypeNameResolver.ResolveTypeName(type));
}

public IProtoMessageSerializer? GetSerializer(string typeName)
{
_serializers.TryGetValue(typeName, out var serializer);
return serializer;
}

public void RegisterSerializer(Type type, IProtoMessageSerializer serializer)
{
if (_serializers.ContainsKey(TypeNameResolver.ResolveTypeName(type)))
{
throw new InvalidOperationException($"Serializer already registered for {type.FullName}");
}
_serializers[TypeNameResolver.ResolveTypeName(type)] = serializer;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ITypeNameResolver.cs
// ProtoTypeNameResolver.cs

using Google.Protobuf;

Expand Down
30 changes: 0 additions & 30 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/SerializationRegistry.cs

This file was deleted.

2 changes: 2 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core/AgentsApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Reflection;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand All @@ -21,6 +22,7 @@ public AgentsAppBuilder(HostApplicationBuilder? baseBuilder = null)
}

public IServiceCollection Services => this.builder.Services;
public IConfiguration Configuration => this.builder.Configuration;

public void AddAgentsFromAssemblies()
{
Expand Down

0 comments on commit 1b9ad7a

Please sign in to comment.