diff --git a/src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs b/src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs index a0374916087..9694279c02f 100644 --- a/src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs +++ b/src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs @@ -245,10 +245,11 @@ public async Task> GetMessages(int count = 1) if (count < 1) throw new ArgumentOutOfRangeException(nameof(count)); + var request = new ReceiveMessageRequest { QueueUrl = queueUrl, - MaxNumberOfMessages = count <= MAX_NUMBER_OF_MESSAGE_TO_PEAK ? count : MAX_NUMBER_OF_MESSAGE_TO_PEAK, + MaxNumberOfMessages = count <= MAX_NUMBER_OF_MESSAGE_TO_PEEK ? count : MAX_NUMBER_OF_MESSAGE_TO_PEEK, AttributeNames = sqsOptions.ReceiveAttributes, MessageAttributeNames = sqsOptions.ReceiveMessageAttributes, }; @@ -293,6 +294,52 @@ public async Task DeleteMessage(SQSMessage message) } } + public async Task DeleteMessages(IEnumerable messages) + { + try + { + foreach (var message in messages) + { + ValidateMessageForDeletion(message); + } + + foreach (var batch in messages.Chunk(MAX_NUMBER_OF_MESSAGE_TO_PEEK)) + { + var deleteRequest = new DeleteMessageBatchRequest + { + QueueUrl = queueUrl, + Entries = batch + .Select((m, i) => + new DeleteMessageBatchRequestEntry(i.ToString(), m.ReceiptHandle)) + .ToList() + }; + + var result = await sqsClient.DeleteMessageBatchAsync(deleteRequest); + foreach (var failed in result.Failed) + { + Logger.LogWarning("Failed to delete message {MessageId} from SQS queue {QueueName}. Error code: {ErrorCode}. Error message: {ErrorMessage}", + failed.Id, QueueName, failed.Code, failed.Message); + } + } + } + catch (Exception exc) + { + ReportErrorAndRethrow(exc, "GetMessages", ErrorCode.StreamProviderManagerBase); + } + } + + private void ValidateMessageForDeletion(SQSMessage message) + { + if (message == null) + throw new ArgumentNullException(nameof(message)); + + if (string.IsNullOrWhiteSpace(message.ReceiptHandle)) + throw new ArgumentNullException(nameof(message.ReceiptHandle)); + + if (string.IsNullOrWhiteSpace(queueUrl)) + throw new InvalidOperationException("Queue not initialized"); + } + private void ReportErrorAndRethrow(Exception exc, string operation, ErrorCode errorCode) { Logger.LogError((int)errorCode, exc, "Error doing {Operation} for SQS queue {QueueName}", operation, QueueName); diff --git a/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterReceiver.cs b/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterReceiver.cs index 045357ec7fc..7fb82f3d61f 100644 --- a/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterReceiver.cs +++ b/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterReceiver.cs @@ -100,7 +100,7 @@ public async Task MessagesDeliveredAsync(IList messages) var queueRef = queue; // store direct ref, in case we are somehow asked to shutdown while we are receiving. if (messages.Count == 0 || queueRef == null) return; List cloudQueueMessages = messages.Cast().Select(b => b.Message).ToList(); - outstandingTask = Task.WhenAll(cloudQueueMessages.Select(queueRef.DeleteMessage)); + outstandingTask = queue.DeleteMessages(cloudQueueMessages); try { await outstandingTask; diff --git a/test/Extensions/AWSUtils.Tests/Streaming/SQSAdapterTests.cs b/test/Extensions/AWSUtils.Tests/Streaming/SQSAdapterTests.cs index 6cf18f0cc28..259485a1e0a 100644 --- a/test/Extensions/AWSUtils.Tests/Streaming/SQSAdapterTests.cs +++ b/test/Extensions/AWSUtils.Tests/Streaming/SQSAdapterTests.cs @@ -101,7 +101,7 @@ private async Task SendAndReceiveFromQueueAdapter(IQueueAdapterFactory adapterFa { while (receivedBatches < NumBatches) { - var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEAK)).ToArray(); + var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEEK)).ToArray(); if (!messages.Any()) { await Task.Delay(QueuePollRate); diff --git a/test/Extensions/AWSUtils.Tests/Streaming/SQSDataAdapterTests.cs b/test/Extensions/AWSUtils.Tests/Streaming/SQSDataAdapterTests.cs index ec6424c459b..327e69e6d8a 100644 --- a/test/Extensions/AWSUtils.Tests/Streaming/SQSDataAdapterTests.cs +++ b/test/Extensions/AWSUtils.Tests/Streaming/SQSDataAdapterTests.cs @@ -103,7 +103,7 @@ private async Task SendAndReceiveFromQueueAdapter(IQueueAdapterFactory adapterFa { while (receivedBatches < NumBatches) { - var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEAK)).ToArray(); + var messages = (await receiver.GetQueueMessagesAsync(SQSStorage.MAX_NUMBER_OF_MESSAGE_TO_PEEK)).ToArray(); if (!messages.Any()) { await Task.Delay(QueuePollRate);