Skip to content

Commit 0574a23

Browse files
authored
[Membership] Ignore stale silos when picking a probing intermediary (#9335)
1 parent ac738a4 commit 0574a23

File tree

6 files changed

+129
-81
lines changed

6 files changed

+129
-81
lines changed

src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -323,12 +323,12 @@ private async Task OnProbeResultInternal(SiloHealthMonitor monitor, ProbeResult
323323
{
324324
if (probeResult.Status == ProbeResultStatus.Failed && probeResult.FailedProbeCount >= this.clusterMembershipOptions.CurrentValue.NumMissedProbesLimit)
325325
{
326-
await this.membershipService.TryToSuspectOrKill(monitor.SiloAddress).ConfigureAwait(false);
326+
await this.membershipService.TryToSuspectOrKill(monitor.TargetSiloAddress).ConfigureAwait(false);
327327
}
328328
}
329329
else if (probeResult.Status == ProbeResultStatus.Failed)
330330
{
331-
await this.membershipService.TryToSuspectOrKill(monitor.SiloAddress, probeResult.Intermediary).ConfigureAwait(false);
331+
await this.membershipService.TryToSuspectOrKill(monitor.TargetSiloAddress, probeResult.Intermediary).ConfigureAwait(false);
332332
}
333333
}
334334

@@ -341,7 +341,7 @@ bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason)
341341
ok &= monitor.CheckHealth(lastCheckTime, out var monitorReason);
342342
if (!string.IsNullOrWhiteSpace(monitorReason))
343343
{
344-
var siloReason = $"Monitor for {monitor.SiloAddress} is degraded with: {monitorReason}.";
344+
var siloReason = $"Monitor for {monitor.TargetSiloAddress} is degraded with: {monitorReason}.";
345345
if (string.IsNullOrWhiteSpace(reason))
346346
{
347347
reason = siloReason;
@@ -375,7 +375,7 @@ public void Dispose()
375375
}
376376
catch (Exception exception)
377377
{
378-
log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.SiloAddress);
378+
log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.TargetSiloAddress);
379379
}
380380
}
381381
}
@@ -400,7 +400,7 @@ public async ValueTask DisposeAsync()
400400
}
401401
catch (Exception exception)
402402
{
403-
log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.SiloAddress);
403+
log.LogError(exception, "Error disposing monitor for {SiloAddress}.", monitor.TargetSiloAddress);
404404
}
405405
}
406406

src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs

+33-28
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable, IDisposable,
2222
private readonly IOptionsMonitor<ClusterMembershipOptions> _clusterMembershipOptions;
2323
private readonly IRemoteSiloProber _prober;
2424
private readonly ILocalSiloHealthMonitor _localSiloHealthMonitor;
25-
private readonly IClusterMembershipService _membershipService;
25+
private readonly MembershipTableManager _membershipService;
2626
private readonly ILocalSiloDetails _localSiloDetails;
2727
private readonly CancellationTokenSource _stoppingCancellation = new();
2828
private readonly object _lockObj = new();
@@ -59,10 +59,10 @@ public SiloHealthMonitor(
5959
IRemoteSiloProber remoteSiloProber,
6060
IAsyncTimerFactory asyncTimerFactory,
6161
ILocalSiloHealthMonitor localSiloHealthMonitor,
62-
IClusterMembershipService membershipService,
62+
MembershipTableManager membershipService,
6363
ILocalSiloDetails localSiloDetails)
6464
{
65-
SiloAddress = siloAddress;
65+
TargetSiloAddress = siloAddress;
6666
_clusterMembershipOptions = clusterMembershipOptions;
6767
_prober = remoteSiloProber;
6868
_localSiloHealthMonitor = localSiloHealthMonitor;
@@ -84,7 +84,7 @@ internal interface ITestAccessor
8484
/// <summary>
8585
/// The silo which this instance is responsible for.
8686
/// </summary>
87-
public SiloAddress SiloAddress { get; }
87+
public SiloAddress TargetSiloAddress { get; }
8888

8989
/// <summary>
9090
/// Whether or not this monitor is canceled.
@@ -153,28 +153,33 @@ public async ValueTask DisposeAsync()
153153

154154
private async Task Run()
155155
{
156-
ClusterMembershipSnapshot? activeMembersSnapshot = default;
156+
MembershipTableSnapshot? activeMembersSnapshot = default;
157157
SiloAddress[]? otherNodes = default;
158-
TimeSpan? overrideDelay = RandomTimeSpan.Next(_clusterMembershipOptions.CurrentValue.ProbeTimeout);
158+
var options = _clusterMembershipOptions.CurrentValue;
159+
TimeSpan? overrideDelay = RandomTimeSpan.Next(options.ProbeTimeout);
159160
while (await _pingTimer.NextTick(overrideDelay))
160161
{
161162
ProbeResult probeResult;
162163
overrideDelay = default;
164+
var now = DateTime.UtcNow;
163165

164166
try
165167
{
166168
// Discover the other active nodes in the cluster, if there are any.
167-
var membershipSnapshot = _membershipService.CurrentSnapshot;
169+
var membershipSnapshot = _membershipService.MembershipTableSnapshot;
168170
if (otherNodes is null || !ReferenceEquals(activeMembersSnapshot, membershipSnapshot))
169171
{
170172
activeMembersSnapshot = membershipSnapshot;
171-
otherNodes = membershipSnapshot.Members.Values
172-
.Where(v => v.Status == SiloStatus.Active && v.SiloAddress != this.SiloAddress && v.SiloAddress != _localSiloDetails.SiloAddress)
173+
otherNodes = membershipSnapshot.Entries.Values
174+
.Where(v => v.Status == SiloStatus.Active
175+
&& !v.SiloAddress.Equals(TargetSiloAddress)
176+
&& !v.SiloAddress.Equals(_localSiloDetails.SiloAddress)
177+
&& !v.HasMissedIAmAlives(options, now))
173178
.Select(s => s.SiloAddress)
174179
.ToArray();
175180
}
176181

177-
var isDirectProbe = !_clusterMembershipOptions.CurrentValue.EnableIndirectProbes || _failedProbes < _clusterMembershipOptions.CurrentValue.NumMissedProbesLimit - 1 || otherNodes.Length == 0;
182+
var isDirectProbe = !options.EnableIndirectProbes || _failedProbes < options.NumMissedProbesLimit - 1 || otherNodes.Length == 0;
178183
var timeout = GetTimeout(isDirectProbe);
179184
using var cancellation = new CancellationTokenSource(timeout);
180185

@@ -198,7 +203,7 @@ private async Task Run()
198203
if (probeResult.Status != ProbeResultStatus.Succeeded && probeResult.IntermediaryHealthDegradationScore > 0)
199204
{
200205
_log.LogInformation("Recusing unhealthy intermediary '{Intermediary}' and trying again with remaining nodes", intermediary);
201-
otherNodes = otherNodes.Where(node => !node.Equals(intermediary)).ToArray();
206+
otherNodes = [.. otherNodes.Where(node => !node.Equals(intermediary))];
202207
overrideDelay = TimeSpan.FromMilliseconds(250);
203208
}
204209
}
@@ -210,15 +215,15 @@ private async Task Run()
210215
}
211216
catch (Exception exception)
212217
{
213-
_log.LogError(exception, "Exception monitoring silo {SiloAddress}", SiloAddress);
218+
_log.LogError(exception, "Exception monitoring silo {SiloAddress}", TargetSiloAddress);
214219
}
215220
}
216221

217222
TimeSpan GetTimeout(bool isDirectProbe)
218223
{
219224
var additionalTimeout = 0;
220225

221-
if (_clusterMembershipOptions.CurrentValue.ExtendProbeTimeoutDuringDegradation)
226+
if (options.ExtendProbeTimeoutDuringDegradation)
222227
{
223228
// Attempt to account for local health degradation by extending the timeout period.
224229
var localDegradationScore = _localSiloHealthMonitor.GetLocalHealthDegradationScore(DateTime.UtcNow);
@@ -231,7 +236,7 @@ TimeSpan GetTimeout(bool isDirectProbe)
231236
additionalTimeout += 1;
232237
}
233238

234-
return _clusterMembershipOptions.CurrentValue.ProbeTimeout.Multiply(1 + additionalTimeout);
239+
return options.ProbeTimeout.Multiply(1 + additionalTimeout);
235240
}
236241
}
237242

@@ -245,15 +250,15 @@ private async Task<ProbeResult> ProbeDirectly(CancellationToken cancellation)
245250
var id = ++_nextProbeId;
246251
if (_log.IsEnabled(LogLevel.Trace))
247252
{
248-
_log.LogTrace("Going to send Ping #{Id} to probe silo {Silo}", id, SiloAddress);
253+
_log.LogTrace("Going to send Ping #{Id} to probe silo {Silo}", id, TargetSiloAddress);
249254
}
250255

251256
var roundTripTimer = ValueStopwatch.StartNew();
252257
ProbeResult probeResult;
253258
Exception? failureException;
254259
try
255260
{
256-
await _prober.Probe(SiloAddress, id, cancellation).WaitAsync(cancellation);
261+
await _prober.Probe(TargetSiloAddress, id, cancellation).WaitAsync(cancellation);
257262
failureException = null;
258263
}
259264
catch (OperationCanceledException exception)
@@ -271,14 +276,14 @@ private async Task<ProbeResult> ProbeDirectly(CancellationToken cancellation)
271276

272277
if (failureException is null)
273278
{
274-
MessagingInstruments.OnPingReplyReceived(SiloAddress);
279+
MessagingInstruments.OnPingReplyReceived(TargetSiloAddress);
275280

276281
if (_log.IsEnabled(LogLevel.Trace))
277282
{
278283
_log.LogTrace(
279284
"Got successful ping response for ping #{Id} from {Silo} with round trip time of {RoundTripTime}",
280285
id,
281-
SiloAddress,
286+
TargetSiloAddress,
282287
roundTripTimer.Elapsed);
283288
}
284289

@@ -289,15 +294,15 @@ private async Task<ProbeResult> ProbeDirectly(CancellationToken cancellation)
289294
}
290295
else
291296
{
292-
MessagingInstruments.OnPingReplyMissed(SiloAddress);
297+
MessagingInstruments.OnPingReplyMissed(TargetSiloAddress);
293298

294299
var failedProbes = ++_failedProbes;
295300
_log.LogWarning(
296301
(int)ErrorCode.MembershipMissedPing,
297302
failureException,
298303
"Did not get response for probe #{Id} to silo {Silo} after {Elapsed}. Current number of consecutive failed probes is {FailedProbeCount}",
299304
id,
300-
SiloAddress,
305+
TargetSiloAddress,
301306
roundTripTimer.Elapsed,
302307
failedProbes);
303308

@@ -319,15 +324,15 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
319324
var id = ++_nextProbeId;
320325
if (_log.IsEnabled(LogLevel.Trace))
321326
{
322-
_log.LogTrace("Going to send indirect ping #{Id} to probe silo {Silo} via {Intermediary}", id, SiloAddress, intermediary);
327+
_log.LogTrace("Going to send indirect ping #{Id} to probe silo {Silo} via {Intermediary}", id, TargetSiloAddress, intermediary);
323328
}
324329

325330
var roundTripTimer = ValueStopwatch.StartNew();
326331
ProbeResult probeResult;
327332
try
328333
{
329334
using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, _stoppingCancellation.Token);
330-
var indirectResult = await _prober.ProbeIndirectly(intermediary, SiloAddress, directProbeTimeout, id, cancellationSource.Token).WaitAsync(cancellationSource.Token);
335+
var indirectResult = await _prober.ProbeIndirectly(intermediary, TargetSiloAddress, directProbeTimeout, id, cancellationSource.Token).WaitAsync(cancellationSource.Token);
331336
roundTripTimer.Stop();
332337
var roundTripTime = roundTripTimer.Elapsed - indirectResult.ProbeResponseTime;
333338

@@ -340,26 +345,26 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
340345
_log.LogInformation(
341346
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} succeeded after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}.",
342347
id,
343-
SiloAddress,
348+
TargetSiloAddress,
344349
intermediary,
345350
roundTripTimer.Elapsed,
346351
indirectResult.ProbeResponseTime);
347352

348-
MessagingInstruments.OnPingReplyReceived(SiloAddress);
353+
MessagingInstruments.OnPingReplyReceived(TargetSiloAddress);
349354

350355
_failedProbes = 0;
351356
probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult, intermediary);
352357
}
353358
else
354359
{
355-
MessagingInstruments.OnPingReplyMissed(SiloAddress);
360+
MessagingInstruments.OnPingReplyMissed(TargetSiloAddress);
356361

357362
if (indirectResult.IntermediaryHealthScore > 0)
358363
{
359364
_log.LogInformation(
360365
"Ignoring failure result for ping #{Id} from {Silo} since the intermediary used to probe the silo is not healthy. Intermediary health degradation score: {IntermediaryHealthScore}.",
361366
id,
362-
SiloAddress,
367+
TargetSiloAddress,
363368
indirectResult.IntermediaryHealthScore);
364369
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult, intermediary);
365370
}
@@ -368,7 +373,7 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
368373
_log.LogWarning(
369374
"Indirect probe request #{Id} to silo {SiloAddress} via silo {IntermediarySiloAddress} failed after {RoundTripTime} with a direct probe response time of {ProbeResponseTime}. Failure message: {FailureMessage}. Intermediary health score: {IntermediaryHealthScore}.",
370375
id,
371-
SiloAddress,
376+
TargetSiloAddress,
372377
intermediary,
373378
roundTripTimer.Elapsed,
374379
indirectResult.ProbeResponseTime,
@@ -382,7 +387,7 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
382387
}
383388
catch (Exception exception)
384389
{
385-
MessagingInstruments.OnPingReplyMissed(SiloAddress);
390+
MessagingInstruments.OnPingReplyMissed(TargetSiloAddress);
386391
_log.LogWarning(exception, "Indirect probe request failed.");
387392
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default, intermediary);
388393
}

test/NonSilo.Tests/Membership/ClusterHealthMonitorTests.cs

+4-10
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirect
224224
Assert.DoesNotContain(testRig.TestAccessor.MonitoredSilos, s => s.Key.Equals(this.localSilo));
225225
var expectedNumProbedSilos = otherSilosAreStale ? otherSilos.Length : clusterMembershipOptions.NumProbedSilos;
226226
Assert.Equal(expectedNumProbedSilos, testRig.TestAccessor.MonitoredSilos.Count);
227-
Assert.All(testRig.TestAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.SiloAddress));
227+
Assert.All(testRig.TestAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.TargetSiloAddress));
228228
Assert.Empty(probeCalls);
229229

230230
// Check that those silos are actually being probed periodically
@@ -290,7 +290,7 @@ await UntilEqual(expectedNumProbedSilos, () =>
290290
{
291291
Assert.Equal(expectedMissedProbes, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
292292

293-
var entry = table.Members.Single(m => m.Item1.SiloAddress.Equals(siloMonitor.SiloAddress)).Item1;
293+
var entry = table.Members.Single(m => m.Item1.SiloAddress.Equals(siloMonitor.TargetSiloAddress)).Item1;
294294
var votes = entry.GetFreshVotes(now.UtcDateTime, clusterMembershipOptions.DeathVoteExpirationTimeout);
295295
if (expectedMissedProbes < clusterMembershipOptions.NumMissedProbesLimit)
296296
{
@@ -359,7 +359,7 @@ await UntilEqual(testRig.TestAccessor.MonitoredSilos.Count, () =>
359359

360360
foreach (var siloMonitor in testRig.TestAccessor.MonitoredSilos.Values)
361361
{
362-
this.output.WriteLine($"Checking missed probes on {siloMonitor.SiloAddress}: {((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes}");
362+
this.output.WriteLine($"Checking missed probes on {siloMonitor.TargetSiloAddress}: {((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes}");
363363
Assert.Equal(0, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
364364
}
365365

@@ -533,12 +533,10 @@ private async Task StopLifecycle(CancellationToken cancellation = default)
533533

534534
private class ClusterHealthMonitorTestRig(
535535
MembershipTableManager manager,
536-
IClusterMembershipService membershipService,
537536
IOptionsMonitor<ClusterMembershipOptions> optionsMonitor,
538537
ClusterHealthMonitor.ITestAccessor testAccessor)
539538
{
540539
public readonly MembershipTableManager Manager = manager;
541-
public readonly IClusterMembershipService MembershipService = membershipService;
542540
public readonly IOptionsMonitor<ClusterMembershipOptions> OptionsMonitor = optionsMonitor;
543541
public readonly ClusterHealthMonitor.ITestAccessor TestAccessor = testAccessor;
544542
}
@@ -557,9 +555,6 @@ private ClusterHealthMonitorTestRig CreateClusterHealthMonitorTestRig(ClusterMem
557555

558556
((ILifecycleParticipant<ISiloLifecycle>)manager).Participate(this.lifecycle);
559557

560-
var membershipService = Substitute.For<IClusterMembershipService>();
561-
membershipService.CurrentSnapshot.ReturnsForAnyArgs(info => manager.MembershipTableSnapshot.CreateClusterMembershipSnapshot());
562-
563558
var optionsMonitor = Substitute.For<IOptionsMonitor<ClusterMembershipOptions>>();
564559
optionsMonitor.CurrentValue.ReturnsForAnyArgs(clusterMembershipOptions);
565560

@@ -582,12 +577,11 @@ private ClusterHealthMonitorTestRig CreateClusterHealthMonitorTestRig(ClusterMem
582577
this.prober,
583578
this.timerFactory,
584579
this.localSiloHealthMonitor,
585-
membershipService,
580+
manager,
586581
this.localSiloDetails);
587582

588583
return new(
589584
manager: manager,
590-
membershipService: membershipService,
591585
optionsMonitor: optionsMonitor,
592586
testAccessor: testAccessor);
593587
}

0 commit comments

Comments
 (0)