Skip to content

Commit b2025e2

Browse files
committed
Drain and log pending work when AsyncConsumerDispatcher loop ends.
1 parent c48822c commit b2025e2

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

Diff for: projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using RabbitMQ.Client.Events;
55
using RabbitMQ.Client.Impl;
6+
using RabbitMQ.Client.Logging;
67

78
namespace RabbitMQ.Client.ConsumerDispatching
89
{
@@ -71,6 +72,16 @@ await _channel.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(e, work
7172
throw;
7273
}
7374
}
75+
finally
76+
{
77+
while (_reader.TryRead(out WorkStruct work))
78+
{
79+
using (work)
80+
{
81+
ESLog.Warn($"discarding consumer work: {work.WorkType}");
82+
}
83+
}
84+
}
7485
}
7586
}
7687
}

Diff for: projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+2-9
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,10 @@ public async Task WaitForShutdownAsync()
161161
{
162162
try
163163
{
164-
await _worker
165-
.ConfigureAwait(false);
166-
167-
/*
168-
* rabbitmq/rabbitmq-dotnet-client#1751
169-
*
170-
* Wait for the worker first to ensure all items have been read out of the channel,
171-
* otherwise the following will never return (https://stackoverflow.com/a/66521303)
172-
*/
173164
await _reader.Completion
174165
.ConfigureAwait(false);
166+
await _worker
167+
.ConfigureAwait(false);
175168
}
176169
catch (AggregateException aex)
177170
{

0 commit comments

Comments
 (0)