Skip to content

Commit 6c1ec23

Browse files
committed
Ensure DisposeAsync and Dispose are idempotent
Fixes #1749 * Add test project for GH #1749. * Add `FirstChanceException` logging in #1749 test project. * Add code to repeatedly close connection. * Allow passing hostname to GH-1749 program as the first arg. * Toxiproxy tests fixup. * Ensure `_disposed` is set in `finally` block. * Add lock object for disposal of Channels and Connections. Note: a `SemaphoreSlim` can't be used because it must be disposed as well, and that can't happen cleanly in a `Dispose` method. * Add basic test to see what dispose after a channel exception does. * Modify `CreateChannel` app to try and trigger GH1751 * Add TCS for server-originated channel closure. This is the "real" fix for the issue.
1 parent fdc03ca commit 6c1ec23

File tree

11 files changed

+770
-369
lines changed

11 files changed

+770
-369
lines changed

RabbitMQDotNetClient.sln

+7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica
4646
EndProject
4747
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
4848
EndProject
49+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}"
50+
EndProject
4951
Global
5052
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5153
Debug|Any CPU = Debug|Any CPU
@@ -108,6 +110,10 @@ Global
108110
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
109111
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
110112
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
113+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
114+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
115+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
116+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU
111117
EndGlobalSection
112118
GlobalSection(SolutionProperties) = preSolution
113119
HideSolutionNode = FALSE
@@ -123,6 +129,7 @@ Global
123129
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
124130
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
125131
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
132+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
126133
EndGlobalSection
127134
GlobalSection(ExtensibilityGlobals) = postSolution
128135
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}

projects/Applications/CreateChannel/Program.cs

+59-18
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Diagnostics;
34-
using System.Threading;
3535
using System.Threading.Tasks;
3636

3737
using RabbitMQ.Client;
38+
using RabbitMQ.Client.Exceptions;
3839

3940
namespace CreateChannel
4041
{
@@ -44,49 +45,89 @@ public static class Program
4445
private const int ChannelsToOpen = 50;
4546

4647
private static int channelsOpened;
47-
private static AutoResetEvent doneEvent;
4848

4949
public static async Task Main()
5050
{
51-
doneEvent = new AutoResetEvent(false);
51+
var doneTcs = new TaskCompletionSource<bool>();
5252

5353
var connectionFactory = new ConnectionFactory { };
5454
await using IConnection connection = await connectionFactory.CreateConnectionAsync();
5555

5656
var watch = Stopwatch.StartNew();
57-
_ = Task.Run(async () =>
57+
var workTask = Task.Run(async () =>
5858
{
59-
var channels = new IChannel[ChannelsToOpen];
60-
for (int i = 0; i < Repeats; i++)
59+
try
6160
{
62-
for (int j = 0; j < channels.Length; j++)
61+
var channelOpenTasks = new List<Task<IChannel>>();
62+
var channelDisposeTasks = new List<ValueTask>();
63+
var channels = new List<IChannel>();
64+
for (int i = 0; i < Repeats; i++)
6365
{
64-
channels[j] = await connection.CreateChannelAsync();
65-
channelsOpened++;
66-
}
66+
for (int j = 0; j < ChannelsToOpen; j++)
67+
{
68+
channelOpenTasks.Add(connection.CreateChannelAsync());
69+
}
6770

68-
for (int j = 0; j < channels.Length; j++)
69-
{
70-
await channels[j].DisposeAsync();
71+
for (int j = 0; j < channelOpenTasks.Count; j++)
72+
{
73+
IChannel ch = await channelOpenTasks[j];
74+
if (j % 8 == 0)
75+
{
76+
try
77+
{
78+
await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
79+
}
80+
catch (OperationInterruptedException)
81+
{
82+
await ch.DisposeAsync();
83+
}
84+
catch (Exception ex)
85+
{
86+
_ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}");
87+
}
88+
}
89+
else
90+
{
91+
channels.Add(ch);
92+
channelsOpened++;
93+
}
94+
}
95+
channelOpenTasks.Clear();
96+
97+
for (int j = 0; j < channels.Count; j++)
98+
{
99+
channelDisposeTasks.Add(channels[j].DisposeAsync());
100+
}
101+
102+
for (int j = 0; j < channels.Count; j++)
103+
{
104+
await channelDisposeTasks[j];
105+
}
106+
channelDisposeTasks.Clear();
71107
}
72-
}
73108

74-
doneEvent.Set();
109+
doneTcs.SetResult(true);
110+
}
111+
catch (Exception ex)
112+
{
113+
doneTcs.SetException(ex);
114+
}
75115
});
76116

77117
Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
78118
Console.WriteLine();
79119
Console.WriteLine("Opened");
80-
while (!doneEvent.WaitOne(500))
120+
while (false == doneTcs.Task.IsCompleted)
81121
{
82122
Console.WriteLine($"{channelsOpened,5}");
123+
await Task.Delay(150);
83124
}
84125
watch.Stop();
85126
Console.WriteLine($"{channelsOpened,5}");
86127
Console.WriteLine();
87-
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
128+
Console.WriteLine($"Took {watch.Elapsed}");
88129

89-
Console.ReadLine();
130+
await workTask;
90131
}
91132
}
92133
}

projects/Applications/GH-1647/GH-1647.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<ProjectReference Include="../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
12+
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
1313
</ItemGroup>
1414

1515
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<RootNamespace>GH_1749</RootNamespace>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
<Nullable>enable</Nullable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
13+
</ItemGroup>
14+
15+
</Project>
+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
33+
34+
using System.Runtime.ExceptionServices;
35+
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
37+
using RabbitMQ.Client.Exceptions;
38+
39+
namespace GH_1749
40+
{
41+
class GH1749Consumer : AsyncDefaultBasicConsumer
42+
{
43+
public GH1749Consumer(IChannel channel) : base(channel)
44+
{
45+
}
46+
47+
protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
48+
{
49+
Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", DateTime.Now.ToString("s"), consumerTags[0]);
50+
return base.OnCancelAsync(consumerTags, cancellationToken);
51+
}
52+
}
53+
54+
static class Program
55+
{
56+
const string DefaultHostName = "localhost";
57+
const string ConnectionClientProvidedName = "GH_1749";
58+
static readonly CancellationTokenSource s_cancellationTokenSource = new();
59+
static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token;
60+
61+
static async Task Main(string[] args)
62+
{
63+
string hostname = DefaultHostName;
64+
if (args.Length > 0)
65+
{
66+
hostname = args[0];
67+
}
68+
69+
AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;
70+
71+
ConnectionFactory connectionFactory = new()
72+
{
73+
HostName = hostname,
74+
AutomaticRecoveryEnabled = false,
75+
TopologyRecoveryEnabled = false,
76+
RequestedConnectionTimeout = TimeSpan.FromSeconds(600),
77+
RequestedHeartbeat = TimeSpan.FromSeconds(600),
78+
UserName = "guest",
79+
Password = "guest",
80+
ClientProvidedName = ConnectionClientProvidedName
81+
};
82+
83+
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
84+
await using var connection = await connectionFactory.CreateConnectionAsync();
85+
86+
connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) =>
87+
{
88+
Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea);
89+
return Task.CompletedTask;
90+
};
91+
92+
connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync;
93+
94+
connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync;
95+
connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync;
96+
97+
connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync;
98+
99+
connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) =>
100+
{
101+
Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea);
102+
return Task.CompletedTask;
103+
};
104+
105+
connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync;
106+
connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync;
107+
108+
connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync;
109+
110+
await using var channel = await connection.CreateChannelAsync(options: channelOptions);
111+
112+
channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync;
113+
channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
114+
115+
try
116+
{
117+
await channel.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
118+
}
119+
catch (OperationInterruptedException)
120+
{
121+
await channel.DisposeAsync();
122+
// rabbitmq-dotnet-client-1749
123+
// await Task.Delay(2000);
124+
}
125+
}
126+
127+
private static string Now => DateTime.Now.ToString("s");
128+
129+
private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
130+
{
131+
Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea);
132+
Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
133+
return Task.CompletedTask;
134+
}
135+
136+
private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea)
137+
{
138+
Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea);
139+
return Task.CompletedTask;
140+
// rabbitmq-dotnet-client-1749
141+
// return Task.Delay(1000);
142+
}
143+
144+
private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e)
145+
{
146+
if (e.Exception is ObjectDisposedException)
147+
{
148+
Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception);
149+
}
150+
}
151+
152+
private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
153+
{
154+
Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea);
155+
Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
156+
return Task.CompletedTask;
157+
}
158+
159+
private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea)
160+
{
161+
Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea);
162+
return Task.CompletedTask;
163+
}
164+
165+
private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea)
166+
{
167+
Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea);
168+
return Task.CompletedTask;
169+
}
170+
171+
private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea)
172+
{
173+
Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea);
174+
Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception);
175+
return Task.CompletedTask;
176+
}
177+
178+
private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea)
179+
{
180+
Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea);
181+
Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter);
182+
return Task.CompletedTask;
183+
}
184+
185+
private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea)
186+
{
187+
Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea);
188+
Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter);
189+
return Task.CompletedTask;
190+
}
191+
192+
private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea)
193+
{
194+
Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag);
195+
return Task.CompletedTask;
196+
}
197+
}
198+
}
199+

0 commit comments

Comments
 (0)