Skip to content

Commit

Permalink
message registry buffer size limit #5582 (#5603)
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet authored Feb 21, 2025
1 parent 2f43005 commit 3a239c5
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,152 +4,79 @@
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;

namespace Microsoft.AutoGen.RuntimeGateway.Grpc;

internal sealed class MessageRegistryGrain(
[PersistentState("state", "PubSubStore")] IPersistentState<MessageRegistryState> state,
ILogger<MessageRegistryGrain> logger
) : Grain, IMessageRegistryGrain
[Reentrant]
internal sealed class MessageRegistryGrain : Grain, IMessageRegistryGrain
{
// <summary>
// Helper class for managing state writes.
// </summary>
private readonly StateManager _stateManager = new(state);
public enum QueueType
{
DeadLetterQueue,
EventBuffer
}

// <summary>
// The number of times to retry writing the state before giving up.
// </summary>
private const int _retries = 5;
/// <summary>
/// The time to wait before removing a message from the event buffer.
/// in milliseconds
/// </summary>
private const int _bufferTime = 5000;
private readonly ILogger<MessageRegistryGrain> _logger = logger;

// <inheritdoc />
public async Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message)
{
await TryWriteMessageAsync("dlq", topic, message).ConfigureAwait(true);
}

///<inheritdoc />
public async Task AddMessageToEventBufferAsync(string topic, CloudEvent message)
{
await TryWriteMessageAsync("eb", topic, message).ConfigureAwait(true);
// Schedule the removal task to run in the background after bufferTime
RemoveMessageAfterDelay(topic, message).Ignore();
}

/// <summary>
/// remove a specific message from the buffer for a given topic
/// maximum size of a message we will write to the state store in bytes
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns>ValueTask<bool></returns>
private async ValueTask<bool> RemoveMessage(string topic, CloudEvent message)
{
if (state.State.EventBuffer != null && state.State.EventBuffer.TryGetValue(topic, out List<CloudEvent>? events))
{
if (events != null && events.Remove(message))
{
state.State.EventBuffer.AddOrUpdate(topic, events, (_, _) => events);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
}
}
return false;
}
/// <remarks>set this to HALF your intended limit as protobuf strings are UTF8 but .NET UTF16</remarks>
private const int _maxMessageSize = 1024 * 1024 * 10; // 10MB

/// <summary>
/// remove a specific message from the buffer for a given topic after a delay
/// maximum size of a each queue
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
private async Task RemoveMessageAfterDelay(string topic, CloudEvent message)
/// <remarks>set this to HALF your intended limit as protobuf strings are UTF8 but .NET UTF16</remarks>
private const int _maxQueueSize = 1024 * 1024 * 10; // 10MB

private readonly MessageRegistryQueue _dlqQueue;
private readonly MessageRegistryQueue _ebQueue;

public MessageRegistryGrain(
[PersistentState("state", "PubSubStore")] IPersistentState<MessageRegistryState> state,
ILogger<MessageRegistryGrain> logger)
{
await Task.Delay(_bufferTime);
await RemoveMessage(topic, message);
var stateManager = new StateManager(state);
_dlqQueue = new MessageRegistryQueue(
QueueType.DeadLetterQueue,
state,
stateManager,
logger,
_maxMessageSize,
_maxQueueSize);

_ebQueue = new MessageRegistryQueue(
QueueType.EventBuffer,
state,
stateManager,
logger,
_maxMessageSize,
_maxQueueSize);
}

/// <summary>
/// Tries to write a message to the given queue in Orleans state.
/// Allows for retries using etag for optimistic concurrency.
/// </summary>
/// <param name="whichQueue"></param>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
private async ValueTask<bool> TryWriteMessageAsync(string whichQueue, string topic, CloudEvent message)
// <inheritdoc />
public async Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message)
{
var retries = _retries;
while (!await WriteMessageAsync(whichQueue, topic, message, state.Etag).ConfigureAwait(false))
{
if (retries-- <= 0)
{
throw new InvalidOperationException($"Failed to write MessageRegistryState after {_retries} retries.");
}
_logger.LogWarning("Failed to write MessageRegistryState. Retrying...");
retries--;
}
if (retries == 0) { return false; } else { return true; }
await _dlqQueue.AddMessageAsync(topic, message);
}
/// <summary>
/// Writes a message to the given queue in Orleans state.
/// </summary>
/// <param name="whichQueue"></param>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <param name="etag"></param>
/// <returns>ValueTask<bool></returns>
/// <exception cref="ArgumentException"></exception>
private async ValueTask<bool> WriteMessageAsync(string whichQueue, string topic, CloudEvent message, string etag)

///<inheritdoc />
public async Task AddMessageToEventBufferAsync(string topic, CloudEvent message)
{
if (state.Etag != null && state.Etag != etag)
{
return false;
}
switch (whichQueue)
{
case "dlq":
var dlqQueue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new());
dlqQueue.Add(message);
state.State.DeadLetterQueue.AddOrUpdate(topic, dlqQueue, (_, _) => dlqQueue);
break;
case "eb":
var ebQueue = state.State.EventBuffer.GetOrAdd(topic, _ => new());
ebQueue.Add(message);
state.State.EventBuffer.AddOrUpdate(topic, ebQueue, (_, _) => ebQueue);
break;
default:
throw new ArgumentException($"Invalid queue name: {whichQueue}");
}
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
await _ebQueue.AddMessageAsync(topic, message);
_ebQueue.RemoveMessageAfterDelayAsync(topic, message, _bufferTime).Ignore();
}

// <inheritdoc />
public async Task<List<CloudEvent>> RemoveMessagesAsync(string topic)
{
var messages = new List<CloudEvent>();
if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List<CloudEvent>? letters))
{
await _stateManager.WriteStateAsync().ConfigureAwait(true);
if (letters != null)
{
messages.AddRange(letters);
}
}
if (state.State.EventBuffer != null && state.State.EventBuffer.Remove(topic, out List<CloudEvent>? events))
{
await _stateManager.WriteStateAsync().ConfigureAwait(true);
if (events != null)
{
messages.AddRange(events);
}
}
return messages;
var removedDeadLetter = await _dlqQueue.RemoveMessagesAsync(topic);
var removedBuffer = await _ebQueue.RemoveMessagesAsync(topic);
return removedDeadLetter.Concat(removedBuffer).ToList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// MessageRegistryQueue.cs

using System.Collections.Concurrent;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
using Microsoft.Extensions.Logging;

namespace Microsoft.AutoGen.RuntimeGateway.Grpc;

public sealed class MessageRegistryQueue
{
private ConcurrentDictionary<string, List<CloudEvent>> _queue = new();
private readonly int _maxMessageSize;
private readonly int _maxQueueSize;
private readonly Dictionary<DateTime, string> _timestamps = new();
private int _currentSize;
private readonly IPersistentState<MessageRegistryState> _state;
private readonly ILogger _logger;
private readonly StateManager _stateManager;
private readonly MessageRegistryGrain.QueueType _queueType;

internal MessageRegistryQueue(MessageRegistryGrain.QueueType queueType,
IPersistentState<MessageRegistryState> state,
StateManager stateManager,
ILogger logger,
int maxMessageSize,
int maxQueueSize)
{
if (state.State == null)
{
state.State = new MessageRegistryState();
}
_queueType = queueType;
_state = state;
// use the queueType to get the correct queue from state.State.
_queue = GetQueue();
_stateManager = stateManager;
_logger = logger;
_maxMessageSize = maxMessageSize;
_maxQueueSize = maxQueueSize;
}

public async Task AddMessageAsync(string topic, CloudEvent message)
{
var size = message.CalculateSize();
if (size > _maxMessageSize)
{
_logger.LogWarning("Message size {Size} for topic {Topic} in queue {Name} exceeds the maximum message size {Max}.",
size, topic, _queueType.ToString(), _maxMessageSize);
return;
}
if (_currentSize + size > _maxQueueSize)
{
while (_currentSize + size > _maxQueueSize && _timestamps.Count > 0)
{
var oldest = _timestamps.OrderBy(x => x.Key).First();
if (await RemoveOldestMessage(oldest.Value))
{
_timestamps.Remove(oldest.Key);
}
}
}
await AddOrUpdate(topic, message);
_currentSize += size;
}

public async Task<List<CloudEvent>> RemoveMessagesAsync(string topic)
{
var removed = new List<CloudEvent>();
var queue = GetQueue();
if (queue.Remove(topic, out var events))
{
removed.AddRange(events);
var total = 0;
foreach (var e in events) { total += e.CalculateSize(); }
_currentSize -= total;
}
// Remove timestamps that refer to this topic
var toRemove = _timestamps.Where(x => x.Value == topic).Select(x => x.Key).ToList();
foreach (var t in toRemove) { _timestamps.Remove(t); }
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return removed;
}

public async Task<bool> RemoveMessageAsync(string topic, CloudEvent message)
{
var queue = GetQueue();
if (queue.TryGetValue(topic, out var events) && events.Remove(message))
{
_currentSize -= message.CalculateSize();
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
}
return false;
}

private async Task<bool> RemoveOldestMessage(string topic)
{
var queue = GetQueue();
if (queue.TryGetValue(topic, out var events) && events != null && events.Count > 0)
{
var oldestEvent = events[0];
events.RemoveAt(0);
_currentSize -= oldestEvent.CalculateSize();
_timestamps.Remove(_timestamps.OrderBy(x => x.Key).First().Key);
queue[topic] = events;
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
}
return false;
}

private async Task AddOrUpdate(string topic, CloudEvent message)
{
var queue = GetQueue();
var list = queue.GetOrAdd(topic, _ => new());
list.Add(message);
queue.AddOrUpdate(topic, list, (_, _) => list);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
_timestamps.Add(DateTime.UtcNow, topic);
}

private ConcurrentDictionary<string, List<CloudEvent>> GetQueue()
{
return _queueType switch
{
MessageRegistryGrain.QueueType.DeadLetterQueue => _state.State.DeadLetterQueue,
MessageRegistryGrain.QueueType.EventBuffer => _state.State.EventBuffer,
_ => throw new ArgumentException($"Invalid queue type: {_queueType}.")
};
}

public async Task RemoveMessageAfterDelayAsync(string topic, CloudEvent message, int delay)
{
await Task.Delay(delay);
await RemoveMessageAsync(topic, message);
_currentSize -= message.CalculateSize();
}
}
Loading

0 comments on commit 3a239c5

Please sign in to comment.