Skip to content

Commit 2718d46

Browse files
committed
Fix very rare deadlock
Fixes #1751 * Drain and log pending work when AsyncConsumerDispatcher loop ends. * Make quiescing thread safe. * Ensure that dequeued RPC continuations are always disposed. Found by @DenisMayorko
1 parent 5f4621f commit 2718d46

File tree

4 files changed

+54
-23
lines changed

4 files changed

+54
-23
lines changed

Diff for: projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using RabbitMQ.Client.Events;
55
using RabbitMQ.Client.Impl;
6+
using RabbitMQ.Client.Logging;
67

78
namespace RabbitMQ.Client.ConsumerDispatching
89
{
@@ -71,6 +72,16 @@ await _channel.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(e, work
7172
throw;
7273
}
7374
}
75+
finally
76+
{
77+
while (_reader.TryRead(out WorkStruct work))
78+
{
79+
using (work)
80+
{
81+
ESLog.Warn($"discarding consumer work: {work.WorkType}");
82+
}
83+
}
84+
}
7485
}
7586
}
7687
}

Diff for: projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+21-8
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
4444
private readonly System.Threading.Channels.ChannelWriter<WorkStruct> _writer;
4545
private readonly Task _worker;
4646
private readonly ushort _concurrency;
47-
private bool _quiesce = false;
47+
private long _isQuiescing;
4848
private bool _disposed;
4949

5050
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
@@ -79,15 +79,15 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
7979
}
8080
}
8181

82-
public bool IsShutdown => _quiesce;
82+
public bool IsShutdown => IsQuiescing;
8383

8484
public ushort Concurrency => _concurrency;
8585

8686
public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
8787
{
8888
cancellationToken.ThrowIfCancellationRequested();
8989

90-
if (false == _disposed && false == _quiesce)
90+
if (false == _disposed && false == IsQuiescing)
9191
{
9292
try
9393
{
@@ -110,7 +110,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
110110
{
111111
cancellationToken.ThrowIfCancellationRequested();
112112

113-
if (false == _disposed && false == _quiesce)
113+
if (false == _disposed && false == IsQuiescing)
114114
{
115115
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
116116
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
@@ -123,7 +123,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
123123
{
124124
cancellationToken.ThrowIfCancellationRequested();
125125

126-
if (false == _disposed && false == _quiesce)
126+
if (false == _disposed && false == IsQuiescing)
127127
{
128128
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
129129
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
@@ -136,7 +136,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
136136
{
137137
cancellationToken.ThrowIfCancellationRequested();
138138

139-
if (false == _disposed && false == _quiesce)
139+
if (false == _disposed && false == IsQuiescing)
140140
{
141141
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
142142
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
@@ -147,7 +147,7 @@ await _writer.WriteAsync(work, cancellationToken)
147147

148148
public void Quiesce()
149149
{
150-
_quiesce = true;
150+
Interlocked.Exchange(ref _isQuiescing, 1);
151151
}
152152

153153
public async Task WaitForShutdownAsync()
@@ -157,7 +157,7 @@ public async Task WaitForShutdownAsync()
157157
return;
158158
}
159159

160-
if (_quiesce)
160+
if (IsQuiescing)
161161
{
162162
try
163163
{
@@ -193,6 +193,19 @@ await _worker
193193
}
194194
}
195195

196+
protected bool IsQuiescing
197+
{
198+
get
199+
{
200+
if (Interlocked.Read(ref _isQuiescing) == 1)
201+
{
202+
return true;
203+
}
204+
205+
return false;
206+
}
207+
}
208+
196209
protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
197210
{
198211
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));

Diff for: projects/RabbitMQ.Client/Impl/Channel.cs

+18-14
Original file line numberDiff line numberDiff line change
@@ -744,11 +744,13 @@ protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
744744
await FinishCloseAsync(cancellationToken)
745745
.ConfigureAwait(false);
746746

747-
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out ChannelCloseAsyncRpcContinuation? k))
747+
if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k))
748748
{
749-
_continuationQueue.Next();
750-
await k.HandleCommandAsync(cmd)
751-
.ConfigureAwait(false);
749+
using (IRpcContinuation c = _continuationQueue.Next())
750+
{
751+
await k.HandleCommandAsync(cmd)
752+
.ConfigureAwait(false);
753+
}
752754
}
753755

754756
return true;
@@ -818,10 +820,12 @@ await ModelSendAsync(in replyMethod, cancellationToken)
818820

819821
protected async Task<bool> HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken)
820822
{
821-
var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next();
822-
await k.HandleCommandAsync(new IncomingCommand())
823-
.ConfigureAwait(false); // release the continuation.
824-
return true;
823+
using (var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next())
824+
{
825+
await k.HandleCommandAsync(new IncomingCommand())
826+
.ConfigureAwait(false); // release the continuation.
827+
return true;
828+
}
825829
}
826830

827831
protected async Task<bool> HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -848,12 +852,12 @@ await Session.Connection.CloseAsync(reason, false,
848852

849853
protected async Task<bool> HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken)
850854
{
851-
// Note: `using` here to ensure instance is disposed
852-
using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next();
853-
854-
// Note: releases the continuation and returns the buffers
855-
await k.HandleCommandAsync(cmd)
856-
.ConfigureAwait(false);
855+
using (var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next())
856+
{
857+
// Note: releases the continuation and returns the buffers
858+
await k.HandleCommandAsync(cmd)
859+
.ConfigureAwait(false);
860+
}
857861

858862
return true;
859863
}

Diff for: projects/RabbitMQ.Client/Impl/RpcContinuationQueue.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ public void Enqueue(IRpcContinuation k)
9797
///</remarks>
9898
public void HandleChannelShutdown(ShutdownEventArgs reason)
9999
{
100-
Next().HandleChannelShutdown(reason);
100+
using (IRpcContinuation c = Next())
101+
{
102+
c.HandleChannelShutdown(reason);
103+
}
101104
}
102105

103106
///<summary>Retrieve the next waiting continuation.</summary>

0 commit comments

Comments
 (0)