|
1 |
| -using System.Buffers.Binary; |
| 1 | +using RabbitMQ.Client; |
| 2 | +using System.Buffers.Binary; |
2 | 3 | using System.Diagnostics;
|
3 | 4 | using System.Text;
|
4 |
| -using RabbitMQ.Client; |
5 | 5 |
|
6 | 6 | const ushort MAX_OUTSTANDING_CONFIRMS = 256;
|
7 | 7 |
|
@@ -94,27 +94,34 @@ async Task PublishMessagesInBatchAsync()
|
94 | 94 | ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props);
|
95 | 95 | publishTasks.Add(publishTask);
|
96 | 96 |
|
97 |
| - // NOTE: [publishTasks] should be published after the final message has been added, |
98 |
| - // even if the # of tasks it contains isn't equal to [batchSize]. |
99 |
| - if (publishTasks.Count == batchSize || i+1 == MESSAGE_COUNT) |
| 97 | + await MaybeAwaitPublishes(publishTasks, batchSize); |
| 98 | + } |
| 99 | + |
| 100 | + // Await any remaining tasks in case message count was not |
| 101 | + // evenly divisible by batch size. |
| 102 | + await MaybeAwaitPublishes(publishTasks, 0); |
| 103 | + |
| 104 | + sw.Stop(); |
| 105 | + Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); |
| 106 | +} |
| 107 | + |
| 108 | +static async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize) |
| 109 | +{ |
| 110 | + if (publishTasks.Count >= batchSize) |
| 111 | + { |
| 112 | + foreach (ValueTask pt in publishTasks) |
100 | 113 | {
|
101 |
| - foreach (ValueTask pt in publishTasks) |
| 114 | + try |
102 | 115 | {
|
103 |
| - try |
104 |
| - { |
105 |
| - await pt; |
106 |
| - } |
107 |
| - catch (Exception ex) |
108 |
| - { |
109 |
| - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); |
110 |
| - } |
| 116 | + await pt; |
| 117 | + } |
| 118 | + catch (Exception ex) |
| 119 | + { |
| 120 | + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); |
111 | 121 | }
|
112 |
| - publishTasks.Clear(); |
113 | 122 | }
|
| 123 | + publishTasks.Clear(); |
114 | 124 | }
|
115 |
| - |
116 |
| - sw.Stop(); |
117 |
| - Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); |
118 | 125 | }
|
119 | 126 |
|
120 | 127 | async Task HandlePublishConfirmsAsynchronously()
|
|
0 commit comments