Skip to content

Commit

Permalink
Delete SQS messages using BatchDelete
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescarter-le committed May 23, 2024
1 parent c30de5d commit 67c69c0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
49 changes: 48 additions & 1 deletion src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,11 @@ public async Task<IEnumerable<SQSMessage>> GetMessages(int count = 1)
if (count < 1)
throw new ArgumentOutOfRangeException(nameof(count));


var request = new ReceiveMessageRequest
{
QueueUrl = queueUrl,
MaxNumberOfMessages = count <= MAX_NUMBER_OF_MESSAGE_TO_PEAK ? count : MAX_NUMBER_OF_MESSAGE_TO_PEAK,
MaxNumberOfMessages = count <= MAX_NUMBER_OF_MESSAGE_TO_PEEK ? count : MAX_NUMBER_OF_MESSAGE_TO_PEEK,
AttributeNames = sqsOptions.ReceiveAttributes,
MessageAttributeNames = sqsOptions.ReceiveMessageAttributes,
};
Expand Down Expand Up @@ -293,6 +294,52 @@ public async Task DeleteMessage(SQSMessage message)
}
}

public async Task DeleteMessages(IEnumerable<SQSMessage> messages)
{
try
{
foreach (var message in messages)
{
ValidateMessageForDeletion(message);
}

foreach (var batch in messages.Chunk(MAX_NUMBER_OF_MESSAGE_TO_PEEK))
{
var deleteRequest = new DeleteMessageBatchRequest
{
QueueUrl = queueUrl,
Entries = batch
.Select((m, i) =>
new DeleteMessageBatchRequestEntry(i.ToString(), m.ReceiptHandle))
.ToList()
};

var result = await sqsClient.DeleteMessageBatchAsync(deleteRequest);
foreach (var failed in result.Failed)
{
Logger.LogWarning("Failed to delete message {MessageId} from SQS queue {QueueName}. Error code: {ErrorCode}. Error message: {ErrorMessage}",
failed.Id, QueueName, failed.Code, failed.Message);
}
}
}
catch (Exception exc)
{
ReportErrorAndRethrow(exc, "GetMessages", ErrorCode.StreamProviderManagerBase);
}
}

private void ValidateMessageForDeletion(SQSMessage message)
{
if (message == null)
throw new ArgumentNullException(nameof(message));

if (string.IsNullOrWhiteSpace(message.ReceiptHandle))
throw new ArgumentNullException(nameof(message.ReceiptHandle));

if (string.IsNullOrWhiteSpace(queueUrl))
throw new InvalidOperationException("Queue not initialized");
}

private void ReportErrorAndRethrow(Exception exc, string operation, ErrorCode errorCode)
{
Logger.LogError((int)errorCode, exc, "Error doing {Operation} for SQS queue {QueueName}", operation, QueueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
var queueRef = queue; // store direct ref, in case we are somehow asked to shutdown while we are receiving.
if (messages.Count == 0 || queueRef == null) return;
List<SQSMessage> cloudQueueMessages = messages.Cast<SQSBatchContainer>().Select(b => b.Message).ToList();
outstandingTask = Task.WhenAll(cloudQueueMessages.Select(queueRef.DeleteMessage));
outstandingTask = queue.DeleteMessages(cloudQueueMessages);
try
{
await outstandingTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private async Task SendAndReceiveFromQueueAdapter(IQueueAdapterFactory adapterFa
{
while (receivedBatches < NumBatches)
{
var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEAK)).ToArray();
var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEEK)).ToArray();
if (!messages.Any())
{
await Task.Delay(QueuePollRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private async Task SendAndReceiveFromQueueAdapter(IQueueAdapterFactory adapterFa
{
while (receivedBatches < NumBatches)
{
var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEAK)).ToArray();
var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEEK)).ToArray();
if (!messages.Any())
{
await Task.Delay(QueuePollRate);
Expand Down

0 comments on commit 67c69c0

Please sign in to comment.