Skip to content

Commit 9bbcb5e

Browse files
committed
Ensure broker-originated channel closure completes
Fixes #1749 * Ensure `Dispose` and `DisposeAsync` are idempotent and thread-safe. * Use TaskCompletionSource when `HandleChannelCloseAsync` runs to allow dispose methods to wait. * Use `Interlocked` for thread safety. * I like `_isDisposing` better. So sue me! * Move the `Interlocked.Exchange` code to a getter, for readability. * Minor nullable change.
1 parent 5b1c9cc commit 9bbcb5e

File tree

6 files changed

+245
-47
lines changed

6 files changed

+245
-47
lines changed

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

+31-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics.CodeAnalysis;
3435
using System.Runtime.CompilerServices;
3536
using System.Threading;
3637
using System.Threading.Tasks;
@@ -48,6 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
4849
private AutorecoveringConnection _connection;
4950
private RecoveryAwareChannel _innerChannel;
5051
private bool _disposed;
52+
private int _isDisposing;
5153

5254
private ushort _prefetchCountConsumer;
5355
private ushort _prefetchCountGlobal;
@@ -256,19 +258,25 @@ public override string ToString()
256258

257259
public async ValueTask DisposeAsync()
258260
{
259-
if (_disposed)
261+
if (IsDisposing)
260262
{
261263
return;
262264
}
263265

264-
if (IsOpen)
266+
try
265267
{
266-
await this.AbortAsync()
267-
.ConfigureAwait(false);
268-
}
268+
if (IsOpen)
269+
{
270+
await this.AbortAsync()
271+
.ConfigureAwait(false);
272+
}
269273

270-
_recordedConsumerTags.Clear();
271-
_disposed = true;
274+
_recordedConsumerTags.Clear();
275+
}
276+
finally
277+
{
278+
_disposed = true;
279+
}
272280
}
273281

274282
public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
@@ -482,7 +490,23 @@ private void ThrowIfDisposed()
482490
ThrowDisposed();
483491
}
484492

493+
return;
494+
495+
[DoesNotReturn]
485496
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName);
486497
}
498+
499+
private bool IsDisposing
500+
{
501+
get
502+
{
503+
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
504+
{
505+
return true;
506+
}
507+
508+
return false;
509+
}
510+
}
487511
}
488512
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

+24-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics.CodeAnalysis;
3435
using System.Runtime.CompilerServices;
3536
using System.Threading;
3637
using System.Threading.Tasks;
@@ -50,6 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection
5051

5152
private Connection _innerConnection;
5253
private bool _disposed;
54+
private int _isDisposing;
5355

5456
private Connection InnerConnection
5557
{
@@ -272,7 +274,7 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
272274

273275
public async ValueTask DisposeAsync()
274276
{
275-
if (_disposed)
277+
if (IsDisposing)
276278
{
277279
return;
278280
}
@@ -281,17 +283,18 @@ public async ValueTask DisposeAsync()
281283
{
282284
await _innerConnection.DisposeAsync()
283285
.ConfigureAwait(false);
286+
287+
_channels.Clear();
288+
_recordedEntitiesSemaphore.Dispose();
289+
_channelsSemaphore.Dispose();
290+
_recoveryCancellationTokenSource.Dispose();
284291
}
285292
catch (OperationInterruptedException)
286293
{
287294
// ignored, see rabbitmq/rabbitmq-dotnet-client#133
288295
}
289296
finally
290297
{
291-
_channels.Clear();
292-
_recordedEntitiesSemaphore.Dispose();
293-
_channelsSemaphore.Dispose();
294-
_recoveryCancellationTokenSource.Dispose();
295298
_disposed = true;
296299
}
297300
}
@@ -307,7 +310,23 @@ private void ThrowIfDisposed()
307310
ThrowDisposed();
308311
}
309312

313+
return;
314+
315+
[DoesNotReturn]
310316
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName);
311317
}
318+
319+
private bool IsDisposing
320+
{
321+
get
322+
{
323+
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
324+
{
325+
return true;
326+
}
327+
328+
return false;
329+
}
330+
}
312331
}
313332
}

projects/RabbitMQ.Client/Impl/Channel.cs

+103-29
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,13 @@ internal partial class Channel : IChannel, IRecoverable
5959
private ShutdownEventArgs? _closeReason;
6060
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
6161

62+
private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;
63+
6264
internal readonly IConsumerDispatcher ConsumerDispatcher;
6365

66+
private bool _disposed;
67+
private int _isDisposing;
68+
6469
public Channel(ISession session, CreateChannelOptions createChannelOptions)
6570
{
6671
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
@@ -514,22 +519,41 @@ public override string ToString()
514519

515520
void IDisposable.Dispose()
516521
{
522+
if (_disposed)
523+
{
524+
return;
525+
}
526+
517527
Dispose(true);
518528
}
519529

520530
protected virtual void Dispose(bool disposing)
521531
{
532+
if (IsDisposing)
533+
{
534+
return;
535+
}
536+
522537
if (disposing)
523538
{
524-
if (IsOpen)
539+
try
525540
{
526-
this.AbortAsync().GetAwaiter().GetResult();
527-
}
541+
if (IsOpen)
542+
{
543+
this.AbortAsync().GetAwaiter().GetResult();
544+
}
528545

529-
ConsumerDispatcher.Dispose();
530-
_rpcSemaphore.Dispose();
531-
_confirmSemaphore.Dispose();
532-
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
546+
_serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5));
547+
548+
ConsumerDispatcher.Dispose();
549+
_rpcSemaphore.Dispose();
550+
_confirmSemaphore.Dispose();
551+
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
552+
}
553+
finally
554+
{
555+
_disposed = true;
556+
}
533557
}
534558
}
535559

@@ -543,18 +567,37 @@ await DisposeAsyncCore()
543567

544568
protected virtual async ValueTask DisposeAsyncCore()
545569
{
546-
if (IsOpen)
570+
if (IsDisposing)
547571
{
548-
await this.AbortAsync().ConfigureAwait(false);
572+
return;
549573
}
550574

551-
ConsumerDispatcher.Dispose();
552-
_rpcSemaphore.Dispose();
553-
_confirmSemaphore.Dispose();
554-
if (_outstandingPublisherConfirmationsRateLimiter is not null)
575+
try
555576
{
556-
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
557-
.ConfigureAwait(false);
577+
if (IsOpen)
578+
{
579+
await this.AbortAsync().ConfigureAwait(false);
580+
}
581+
582+
if (_serverOriginatedChannelCloseTcs is not null)
583+
{
584+
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
585+
.ConfigureAwait(false);
586+
}
587+
588+
ConsumerDispatcher.Dispose();
589+
_rpcSemaphore.Dispose();
590+
_confirmSemaphore.Dispose();
591+
592+
if (_outstandingPublisherConfirmationsRateLimiter is not null)
593+
{
594+
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
595+
.ConfigureAwait(false);
596+
}
597+
}
598+
finally
599+
{
600+
_disposed = true;
558601
}
559602
}
560603

@@ -651,23 +694,41 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
651694

652695
protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
653696
{
654-
var channelClose = new ChannelClose(cmd.MethodSpan);
655-
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
656-
channelClose._replyCode,
657-
channelClose._replyText,
658-
channelClose._classId,
659-
channelClose._methodId));
697+
TaskCompletionSource<bool>? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs;
698+
if (serverOriginatedChannelCloseTcs is null)
699+
{
700+
// Attempt to assign the new TCS only if _tcs is still null
701+
_ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs,
702+
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), null);
703+
}
660704

661-
await Session.CloseAsync(_closeReason, notify: false)
662-
.ConfigureAwait(false);
705+
try
706+
{
707+
var channelClose = new ChannelClose(cmd.MethodSpan);
708+
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
709+
channelClose._replyCode,
710+
channelClose._replyText,
711+
channelClose._classId,
712+
channelClose._methodId));
663713

664-
var method = new ChannelCloseOk();
665-
await ModelSendAsync(in method, cancellationToken)
666-
.ConfigureAwait(false);
714+
await Session.CloseAsync(_closeReason, notify: false)
715+
.ConfigureAwait(false);
667716

668-
await Session.NotifyAsync(cancellationToken)
669-
.ConfigureAwait(false);
670-
return true;
717+
var method = new ChannelCloseOk();
718+
await ModelSendAsync(in method, cancellationToken)
719+
.ConfigureAwait(false);
720+
721+
await Session.NotifyAsync(cancellationToken)
722+
.ConfigureAwait(false);
723+
724+
_serverOriginatedChannelCloseTcs?.TrySetResult(true);
725+
return true;
726+
}
727+
catch (Exception ex)
728+
{
729+
_serverOriginatedChannelCloseTcs?.TrySetException(ex);
730+
throw;
731+
}
671732
}
672733

673734
protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -1587,5 +1648,18 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
15871648
}
15881649
}
15891650
}
1651+
1652+
private bool IsDisposing
1653+
{
1654+
get
1655+
{
1656+
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
1657+
{
1658+
return true;
1659+
}
1660+
1661+
return false;
1662+
}
1663+
}
15901664
}
15911665
}

projects/RabbitMQ.Client/Impl/Connection.cs

+22-6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.Collections.Generic;
3434
using System.Diagnostics;
35+
using System.Diagnostics.CodeAnalysis;
3536
using System.IO;
3637
using System.Runtime.CompilerServices;
3738
using System.Threading;
@@ -46,6 +47,7 @@ namespace RabbitMQ.Client.Framing
4647
internal sealed partial class Connection : IConnection
4748
{
4849
private bool _disposed;
50+
private int _isDisposing;
4951
private volatile bool _closed;
5052

5153
private readonly ConnectionConfig _config;
@@ -489,7 +491,7 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
489491

490492
public async ValueTask DisposeAsync()
491493
{
492-
if (_disposed)
494+
if (IsDisposing)
493495
{
494496
return;
495497
}
@@ -523,23 +525,37 @@ private void ThrowIfDisposed()
523525
{
524526
if (_disposed)
525527
{
526-
ThrowObjectDisposedException();
528+
ThrowDisposed();
527529
}
528530

529-
static void ThrowObjectDisposedException()
530-
{
531-
throw new ObjectDisposedException(typeof(Connection).FullName);
532-
}
531+
return;
532+
533+
[DoesNotReturn]
534+
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName);
533535
}
534536

535537
public override string ToString()
536538
{
537539
return $"Connection({_id},{Endpoint})";
538540
}
539541

542+
[DoesNotReturn]
540543
private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason)
541544
{
542545
throw new AlreadyClosedException(closeReason);
543546
}
547+
548+
private bool IsDisposing
549+
{
550+
get
551+
{
552+
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
553+
{
554+
return true;
555+
}
556+
557+
return false;
558+
}
559+
}
544560
}
545561
}

0 commit comments

Comments
 (0)