Skip to content

Commit

Permalink
First go at dotnet worker
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits committed Jan 28, 2025
1 parent bd5a24b commit d784c17
Show file tree
Hide file tree
Showing 10 changed files with 925 additions and 0 deletions.
597 changes: 597 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/IAgentMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentMessageSerializer.cs

namespace Microsoft.AutoGen.Core.Grpc;
/// <summary>
/// Interface for serializing and deserializing agent messages.
/// </summary>
public interface IAgentMessageSerializer
{
/// <summary>
/// Serialize an agent message.
/// </summary>
/// <param name="message">The message to serialize.</param>
/// <returns>The serialized message.</returns>
Google.Protobuf.WellKnownTypes.Any Serialize(object message);

/// <summary>
/// Deserialize an agent message.
/// </summary>
/// <param name="message">The message to deserialize.</param>
/// <returns>The deserialized message.</returns>
object Deserialize(Google.Protobuf.WellKnownTypes.Any message);
}
101 changes: 101 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/IAgentRuntimeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentRuntimeExtensions.cs

using System.Diagnostics;
using Google.Protobuf.Collections;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.DependencyInjection;
using static Microsoft.AutoGen.Contracts.CloudEvent.Types;

namespace Microsoft.AutoGen.Core.Grpc;

public static class IAgentRuntimeExtensions
{
public static (string?, string?) GetTraceIdAndState(IAgentRuntime runtime, IDictionary<string, string> metadata)
{
var dcp = runtime.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
},
out var traceParent,
out var traceState);
return (traceParent, traceState);
}
public static (string?, string?) GetTraceIdAndState(IAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
},
out var traceParent,
out var traceState);
return (traceParent, traceState);
}
public static void Update(IAgentRuntime worker, RpcRequest request, Activity? activity = null)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.Inject(activity, request.Metadata, static (carrier, key, value) =>
{
var metadata = (IDictionary<string, string>)carrier!;
if (metadata.TryGetValue(key, out _))
{
metadata[key] = value;
}
else
{
metadata.Add(key, value);
}
});
}
public static void Update(IAgentRuntime worker, CloudEvent cloudEvent, Activity? activity = null)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
dcp.Inject(activity, cloudEvent.Attributes, static (carrier, key, value) =>
{
var mapField = (MapField<string, CloudEventAttributeValue>)carrier!;
if (mapField.TryGetValue(key, out var ceValue))
{
mapField[key] = new CloudEventAttributeValue { CeString = value };
}
else
{
mapField.Add(key, new CloudEventAttributeValue { CeString = value });
}
});
}

public static IDictionary<string, string> ExtractMetadata(IAgentRuntime worker, IDictionary<string, string> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var baggage = dcp.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
});

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
public static IDictionary<string, string> ExtractMetadata(IAgentRuntime worker, MapField<string, CloudEventAttributeValue> metadata)
{
var dcp = worker.RuntimeServiceProvider.GetRequiredService<DistributedContextPropagator>();
var baggage = dcp.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
});

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
}
10 changes: 10 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/IProtoMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IProtoMessageSerializer.cs

namespace Microsoft.AutoGen.Core.Grpc;

public interface IProtoMessageSerializer
{
Google.Protobuf.WellKnownTypes.Any Serialize(object input);
object Deserialize(Google.Protobuf.WellKnownTypes.Any input);
}
27 changes: 27 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/ISerializationRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ISerializationRegistry.cs

namespace Microsoft.AutoGen.Core.Grpc;

public interface IProtoSerializationRegistry
{
/// <summary>
/// Registers a serializer for the specified type.
/// </summary>
/// <param name="type">The type to register.</param>
void RegisterSerializer(System.Type type) => RegisterSerializer(type, new ProtobufMessageSerializer(type));

void RegisterSerializer(System.Type type, IProtoMessageSerializer serializer);

/// <summary>
/// Gets the serializer for the specified type.
/// </summary>
/// <param name="type">The type to get the serializer for.</param>
/// <returns>The serializer for the specified type.</returns>
IProtoMessageSerializer? GetSerializer(System.Type type) => GetSerializer(TypeNameResolver.ResolveTypeName(type));
IProtoMessageSerializer? GetSerializer(string typeName);

ITypeNameResolver TypeNameResolver { get; }

bool Exists(System.Type type);
}
9 changes: 9 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/ITypeNameResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ITypeNameResolver.cs

namespace Microsoft.AutoGen.Core.Grpc;

public interface ITypeNameResolver
{
string ResolveTypeName(object input);
}
21 changes: 21 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/ProtoTypeNameResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ITypeNameResolver.cs

using Google.Protobuf;

namespace Microsoft.AutoGen.Core.Grpc;

public class ProtoTypeNameResolver : ITypeNameResolver
{
public string ResolveTypeName(object input)
{
if (input is IMessage protoMessage)
{
return protoMessage.Descriptor.FullName;
}
else
{
throw new ArgumentException("Input must be a protobuf message.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ProtobufConversionExtensions.cs

using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Protobuf;

namespace Microsoft.AutoGen.Core.Grpc;

public static class ProtobufConversionExtensions
{
// Convert an ISubscrptionDefinition to a Protobuf Subscription
public static Subscription? ToProtobuf(this ISubscriptionDefinition subscriptionDefinition)
{
// Check if is a TypeSubscription
if (subscriptionDefinition is Contracts.TypeSubscription typeSubscription)
{
return new Subscription
{
Id = typeSubscription.Id,
TypeSubscription = new Protobuf.TypeSubscription
{
TopicType = typeSubscription.TopicType,
AgentType = typeSubscription.AgentType
}
};
}

// Check if is a TypePrefixSubscription
if (subscriptionDefinition is Contracts.TypePrefixSubscription typePrefixSubscription)
{
return new Subscription
{
Id = typePrefixSubscription.Id,
TypePrefixSubscription = new Protobuf.TypePrefixSubscription
{
TopicTypePrefix = typePrefixSubscription.TopicTypePrefix,
AgentType = typePrefixSubscription.AgentType
}
};
}

return null;
}

// Convert AgentId from Protobuf to AgentId
public static Contracts.AgentId FromProtobuf(this Protobuf.AgentId agentId)
{
return new Contracts.AgentId(agentId.Type, agentId.Key);
}

// Convert AgentId from AgentId to Protobuf
public static Protobuf.AgentId ToProtobuf(this Contracts.AgentId agentId)
{
return new Protobuf.AgentId
{
Type = agentId.Type,
Key = agentId.Key
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ProtobufMessageSerializer.cs

using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Microsoft.AutoGen.Core.Grpc;

/// <summary>
/// Interface for serializing and deserializing agent messages.
/// </summary>
public class ProtobufMessageSerializer : IProtoMessageSerializer
{
private System.Type _concreteType;

public ProtobufMessageSerializer(System.Type concreteType)
{
_concreteType = concreteType;
}

public object Deserialize(Any message)
{
// Check if the concrete type is a proto IMessage
if (typeof(IMessage).IsAssignableFrom(_concreteType))
{
var nameOfMethod = nameof(Any.Unpack);
var result = message.GetType().GetMethods().Where(m => m.Name == nameOfMethod && m.IsGenericMethod).First().MakeGenericMethod(_concreteType).Invoke(message, null);
return result as IMessage ?? throw new ArgumentException("Failed to deserialize", nameof(message));
}

// Raise an exception if the concrete type is not a proto IMessage
throw new ArgumentException("Concrete type must be a proto IMessage", nameof(_concreteType));
}

public Any Serialize(object message)
{
// Check if message is a proto IMessage
if (message is IMessage protoMessage)
{
return Any.Pack(protoMessage);
}

// Raise an exception if the message is not a proto IMessage
throw new ArgumentException("Message must be a proto IMessage", nameof(message));
}
}
30 changes: 30 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/SerializationRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SerializationRegistry.cs

namespace Microsoft.AutoGen.Core.Grpc;

public class ProtoSerializationRegistry : IProtoSerializationRegistry
{
private readonly Dictionary<Type, IProtoMessageSerializer> _serializers
= new Dictionary<Type, IProtoMessageSerializer>();

public bool Exists(Type type)
{
return _serializers.ContainsKey(type);
}

public IProtoMessageSerializer? GetSerializer(Type type)
{
_serializers.TryGetValue(type, out var serializer);
return serializer;
}

public void RegisterSerializer(Type type, IProtoMessageSerializer serializer)
{
if (_serializers.ContainsKey(type))
{
throw new InvalidOperationException($"Serializer already registered for {type.FullName}");
}
_serializers[type] = serializer;
}
}

0 comments on commit d784c17

Please sign in to comment.