Skip to content

Commit 30a244d

Browse files
committed
Ensure DisposeAsync and Dispose are idempotent
Fixes #1749 * Add test project * Add `FirstChanceException` logging. * Add code to repeatedly close connection. * Allow passing hostname to GH-1749 program as the first arg. * Toxiproxy tests fixup. * Standardize on `_disposedValue` name. * Ensure `_disposedValue` is set in `finally` block. * Add lock object for disposal of Channels and Connections. Note: a `SemaphoreSlim` can't be used because it must be disposed as well, and that can't happen cleanly in a `Dispose` method.
1 parent 5b64e27 commit 30a244d

14 files changed

+672
-103
lines changed

Diff for: RabbitMQDotNetClient.sln

+7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica
4646
EndProject
4747
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
4848
EndProject
49+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}"
50+
EndProject
4951
Global
5052
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5153
Debug|Any CPU = Debug|Any CPU
@@ -108,6 +110,10 @@ Global
108110
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
109111
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
110112
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
113+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
114+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
115+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
116+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU
111117
EndGlobalSection
112118
GlobalSection(SolutionProperties) = preSolution
113119
HideSolutionNode = FALSE
@@ -123,6 +129,7 @@ Global
123129
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
124130
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
125131
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
132+
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
126133
EndGlobalSection
127134
GlobalSection(ExtensibilityGlobals) = postSolution
128135
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}

Diff for: projects/Applications/GH-1647/GH-1647.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<ProjectReference Include="../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
12+
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
1313
</ItemGroup>
1414

1515
</Project>

Diff for: projects/Applications/GH-1749/GH-1749.csproj

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<RootNamespace>GH_1749</RootNamespace>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
<Nullable>enable</Nullable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="EasyNetQ.Management.Client" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
17+
</ItemGroup>
18+
19+
</Project>

Diff for: projects/Applications/GH-1749/Program.cs

+216
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
33+
34+
using System.Runtime.ExceptionServices;
35+
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
37+
38+
namespace GH_1749
39+
{
40+
class GH1749Consumer : AsyncDefaultBasicConsumer
41+
{
42+
public GH1749Consumer(IChannel channel) : base(channel)
43+
{
44+
}
45+
46+
protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
47+
{
48+
Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", Util.Now, consumerTags[0]);
49+
return base.OnCancelAsync(consumerTags, cancellationToken);
50+
}
51+
}
52+
53+
static class Program
54+
{
55+
const string DefaultHostName = "localhost";
56+
const string ConnectionClientProvidedName = "GH_1749";
57+
static readonly CancellationTokenSource s_cancellationTokenSource = new();
58+
static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token;
59+
60+
static Util? s_util;
61+
62+
static async Task Main(string[] args)
63+
{
64+
string hostname = DefaultHostName;
65+
if (args.Length > 0)
66+
{
67+
hostname = args[0];
68+
}
69+
70+
s_util = new Util(hostname, "guest", "guest");
71+
72+
AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;
73+
74+
ConnectionFactory connectionFactory = new()
75+
{
76+
HostName = hostname,
77+
AutomaticRecoveryEnabled = true,
78+
UserName = "guest",
79+
Password = "guest",
80+
ClientProvidedName = ConnectionClientProvidedName
81+
};
82+
83+
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
84+
await using var connection = await connectionFactory.CreateConnectionAsync();
85+
86+
connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) =>
87+
{
88+
Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea);
89+
_ = CloseConnectionAsync();
90+
return Task.CompletedTask;
91+
};
92+
93+
connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync;
94+
95+
connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync;
96+
connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync;
97+
98+
connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync;
99+
100+
connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) =>
101+
{
102+
Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea);
103+
return Task.CompletedTask;
104+
};
105+
106+
connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync;
107+
connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync;
108+
109+
connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync;
110+
111+
await using var channel = await connection.CreateChannelAsync(options: channelOptions);
112+
113+
channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync;
114+
channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
115+
116+
QueueDeclareOk queue = await channel.QueueDeclareAsync();
117+
118+
var consumer = new GH1749Consumer(channel);
119+
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);
120+
121+
_ = CloseConnectionAsync();
122+
123+
Console.WriteLine("{0} [INFO] consumer is running", Util.Now);
124+
Console.ReadLine();
125+
}
126+
127+
static async Task CloseConnectionAsync()
128+
{
129+
if (s_util is null)
130+
{
131+
throw new NullReferenceException("s_util");
132+
}
133+
134+
try
135+
{
136+
Console.WriteLine("{0} [INFO] start closing connection: {1}", Now, ConnectionClientProvidedName);
137+
await s_util.CloseConnectionAsync(ConnectionClientProvidedName);
138+
Console.WriteLine("{0} [INFO] done closing connection: {1}", Now, ConnectionClientProvidedName);
139+
}
140+
catch (Exception ex)
141+
{
142+
Console.Error.WriteLine("{0} [ERROR] error while closing connection: {1}", Now, ex);
143+
}
144+
}
145+
146+
private static string Now => Util.Now;
147+
148+
private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
149+
{
150+
Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea);
151+
Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
152+
return Task.CompletedTask;
153+
}
154+
155+
private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea)
156+
{
157+
Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea);
158+
return Task.CompletedTask;
159+
}
160+
161+
private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e)
162+
{
163+
if (e.Exception is ObjectDisposedException)
164+
{
165+
Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception);
166+
}
167+
}
168+
169+
private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
170+
{
171+
Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea);
172+
Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
173+
return Task.CompletedTask;
174+
}
175+
176+
private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea)
177+
{
178+
Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea);
179+
return Task.CompletedTask;
180+
}
181+
182+
private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea)
183+
{
184+
Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea);
185+
return Task.CompletedTask;
186+
}
187+
188+
private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea)
189+
{
190+
Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea);
191+
Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception);
192+
return Task.CompletedTask;
193+
}
194+
195+
private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea)
196+
{
197+
Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea);
198+
Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter);
199+
return Task.CompletedTask;
200+
}
201+
202+
private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea)
203+
{
204+
Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea);
205+
Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter);
206+
return Task.CompletedTask;
207+
}
208+
209+
private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea)
210+
{
211+
Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag);
212+
return Task.CompletedTask;
213+
}
214+
}
215+
}
216+

0 commit comments

Comments
 (0)