Skip to content

Commit 97062ac

Browse files
shmurchiJohnMormanReubenBond
authored
Close connections asynchronously and improve gracefulness (#7006)
* Close connections asynchronously and improve gracefulness (#6762) * Lock around connection cleanup so we don't call .Complete() and .CancelPendingFlush()/.CancelPendingRead() on the same pipe which results in an ObjectDisposedException * Close connections asynchronously and improve gracefulness Co-authored-by: ReubenBond <[email protected]> * Adding await for GatewayListNotification * Updating GatewayManager interfaces Co-authored-by: John Morman <[email protected]> Co-authored-by: ReubenBond <[email protected]>
1 parent 7ca97a2 commit 97062ac

13 files changed

+488
-343
lines changed

src/Orleans.Connections.Security/Security/DuplexPipeStream.cs

+215-141
Large diffs are not rendered by default.

src/Orleans.Connections.Security/Security/DuplexPipeStreamAdapter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> cre
2020
}
2121

2222
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
23-
base(duplexPipe.Input, duplexPipe.Output)
23+
base(duplexPipe)
2424
{
2525
var stream = createStream(this);
2626
Stream = stream;

src/Orleans.Connections.Security/Security/TlsClientConnectionMiddleware.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ await sslStream.AuthenticateAsClientAsync(
143143
}
144144
catch (Exception ex)
145145
{
146-
_logger?.LogWarning(1, ex, "Authentication failed: {Exception}", ex);
146+
_logger?.LogWarning(1, ex, "Authentication failed");
147147
#if NETCOREAPP
148148
await sslStream.DisposeAsync();
149149
#else

src/Orleans.Connections.Security/Security/TlsServerConnectionMiddleware.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ await sslStream.AuthenticateAsServerAsync(
186186
}
187187
catch (Exception ex)
188188
{
189-
_logger?.LogWarning(1, ex, "Authentication failed: {Exception}", ex);
189+
_logger?.LogWarning(1, ex, "Authentication failed");
190190
#if NETCOREAPP
191191
await sslStream.DisposeAsync();
192192
#else

src/Orleans.Core/Messaging/GatewayManager.cs

+14-13
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,11 @@ internal void ExpediteUpdateLiveGatewaysSnapshot()
234234
}
235235

236236
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
237-
public void GatewayListNotification(IEnumerable<Uri> gateways)
237+
public async Task GatewayListNotification(IEnumerable<Uri> gateways)
238238
{
239239
try
240240
{
241-
UpdateLiveGatewaysSnapshot(gateways.Select(gw => gw.ToGatewayAddress()), gatewayListProvider.MaxStaleness);
241+
await UpdateLiveGatewaysSnapshot(gateways.Select(gw => gw.ToGatewayAddress()), gatewayListProvider.MaxStaleness);
242242
}
243243
catch (Exception exc)
244244
{
@@ -261,8 +261,7 @@ internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
261261
logger.LogDebug("Discovered {GatewayCount} gateways: {Gateways}", refreshedGateways.Count, Utils.EnumerableToString(refreshedGateways));
262262
}
263263

264-
// the next one will grab the lock.
265-
UpdateLiveGatewaysSnapshot(refreshedGateways, gatewayListProvider.MaxStaleness);
264+
await UpdateLiveGatewaysSnapshot(refreshedGateways, gatewayListProvider.MaxStaleness);
266265
}
267266
catch (Exception exc)
268267
{
@@ -271,13 +270,14 @@ internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
271270
}
272271

273272
// This function is called asynchronously from gateway refresh timer.
274-
private void UpdateLiveGatewaysSnapshot(IEnumerable<SiloAddress> refreshedGateways, TimeSpan maxStaleness)
273+
private async Task UpdateLiveGatewaysSnapshot(IEnumerable<SiloAddress> refreshedGateways, TimeSpan maxStaleness)
275274
{
276-
// this is a short lock, protecting the access to knownDead, knownMasked and cachedLiveGateways.
275+
List<SiloAddress> connectionsToKeepAlive;
276+
277+
// This is a short lock, protecting the access to knownDead, knownMasked and cachedLiveGateways.
277278
lock (lockable)
278279
{
279280
// now take whatever listProvider gave us and exclude those we think are dead.
280-
281281
var live = new List<SiloAddress>();
282282
var now = DateTime.UtcNow;
283283

@@ -340,8 +340,8 @@ private void UpdateLiveGatewaysSnapshot(IEnumerable<SiloAddress> refreshedGatewa
340340
if (logger.IsEnabled(LogLevel.Information))
341341
{
342342
logger.Info(ErrorCode.GatewayManager_FoundKnownGateways,
343-
"Refreshed the live Gateway list. Found {0} gateways from Gateway listProvider: {1}. Picked only known live out of them. Now has {2} live Gateways: {3}. Previous refresh time was = {4}",
344-
knownGateways.Count,
343+
"Refreshed the live gateway list. Found {0} gateways from gateway list provider: {1}. Picked only known live out of them. Now has {2} live gateways: {3}. Previous refresh time was = {4}",
344+
knownGateways.Count,
345345
Utils.EnumerableToString(knownGateways),
346346
cachedLiveGateways.Count,
347347
Utils.EnumerableToString(cachedLiveGateways),
@@ -351,13 +351,14 @@ private void UpdateLiveGatewaysSnapshot(IEnumerable<SiloAddress> refreshedGatewa
351351
// Close connections to known dead connections, but keep the "masked" ones.
352352
// Client will not send any new request to the "masked" connections, but might still
353353
// receive responses
354-
var connectionsToKeepAlive = new List<SiloAddress>(live);
354+
connectionsToKeepAlive = new List<SiloAddress>(live);
355355
connectionsToKeepAlive.AddRange(knownMasked.Select(e => e.Key));
356-
this.CloseEvictedGatewayConnections(connectionsToKeepAlive);
357356
}
357+
358+
await this.CloseEvictedGatewayConnections(connectionsToKeepAlive);
358359
}
359360

360-
private void CloseEvictedGatewayConnections(List<SiloAddress> liveGateways)
361+
private async Task CloseEvictedGatewayConnections(List<SiloAddress> liveGateways)
361362
{
362363
if (this.connectionManager == null) return;
363364

@@ -381,7 +382,7 @@ private void CloseEvictedGatewayConnections(List<SiloAddress> liveGateways)
381382
this.logger.LogInformation("Closing connection to {Endpoint} because it has been marked as dead", address);
382383
}
383384

384-
this.connectionManager.Close(address);
385+
await this.connectionManager.CloseAsync(address);
385386
}
386387
}
387388
}

src/Orleans.Core/Messaging/IGatewayListProvider.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public interface IGatewayListProvider
3636
/// </summary>
3737
public interface IGatewayListListener
3838
{
39-
void GatewayListNotification(IEnumerable<Uri> gateways);
39+
Task GatewayListNotification(IEnumerable<Uri> gateways);
4040
}
4141

4242
/// <summary>

0 commit comments

Comments
 (0)