5
5
using System . Threading ;
6
6
using System . Threading . Tasks ;
7
7
using Microsoft . Extensions . Logging ;
8
- using Orleans . Configuration ;
9
- using Orleans . Internal ;
10
8
11
9
#nullable enable
12
10
namespace Orleans . Runtime . MembershipService . SiloMetadata ;
@@ -22,44 +20,44 @@ internal class SiloMetadataCache(
22
20
23
21
void ILifecycleParticipant < ISiloLifecycle > . Participate ( ISiloLifecycle lifecycle )
24
22
{
25
- var tasks = new List < Task > ( 1 ) ;
26
- var cancellation = new CancellationTokenSource ( ) ;
27
- Task OnRuntimeInitializeStart ( CancellationToken _ )
23
+ Task ? task = null ;
24
+ Task OnStart ( CancellationToken _ )
28
25
{
29
- tasks . Add ( Task . Run ( ( ) => this . ProcessMembershipUpdates ( cancellation . Token ) ) ) ;
26
+ task = Task . Run ( ( ) => this . ProcessMembershipUpdates ( _cts . Token ) ) ;
30
27
return Task . CompletedTask ;
31
28
}
32
29
33
- async Task OnRuntimeInitializeStop ( CancellationToken ct )
30
+ async Task OnStop ( CancellationToken ct )
34
31
{
35
- cancellation . Cancel ( throwOnFirstException : false ) ;
36
- var shutdownGracePeriod = Task . WhenAll ( Task . Delay ( ClusterMembershipOptions . ClusteringShutdownGracePeriod ) , ct . WhenCancelled ( ) ) ;
37
- await Task . WhenAny ( shutdownGracePeriod , Task . WhenAll ( tasks ) ) ;
32
+ await _cts . CancelAsync ( ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
33
+ if ( task is not null )
34
+ {
35
+ await task . WaitAsync ( ct ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
36
+ }
38
37
}
39
38
40
39
lifecycle . Subscribe (
41
40
nameof ( ClusterMembershipService ) ,
42
- ServiceLifecycleStage . RuntimeInitialize ,
43
- OnRuntimeInitializeStart ,
44
- OnRuntimeInitializeStop ) ;
41
+ ServiceLifecycleStage . RuntimeServices ,
42
+ OnStart ,
43
+ OnStop ) ;
45
44
}
46
45
47
-
48
46
private async Task ProcessMembershipUpdates ( CancellationToken ct )
49
47
{
50
48
try
51
49
{
52
- if ( logger . IsEnabled ( LogLevel . Debug ) ) logger . LogDebug ( "Starting to process membership updates" ) ;
50
+ if ( logger . IsEnabled ( LogLevel . Debug ) ) logger . LogDebug ( "Starting to process membership updates. " ) ;
53
51
await foreach ( var update in membershipTableManager . MembershipTableUpdates . WithCancellation ( ct ) )
54
52
{
55
53
// Add entries for members that aren't already in the cache
56
- foreach ( var membershipEntry in update . Entries . Where ( e => e . Value . Status != SiloStatus . Dead ) )
54
+ foreach ( var membershipEntry in update . Entries . Where ( e => e . Value . Status is SiloStatus . Active or SiloStatus . Joining ) )
57
55
{
58
56
if ( ! _metadata . ContainsKey ( membershipEntry . Key ) )
59
57
{
60
58
try
61
59
{
62
- var metadata = await siloMetadataClient . GetSiloMetadata ( membershipEntry . Key ) ;
60
+ var metadata = await siloMetadataClient . GetSiloMetadata ( membershipEntry . Key ) . WaitAsync ( ct ) ;
63
61
_metadata . TryAdd ( membershipEntry . Key , metadata ) ;
64
62
}
65
63
catch ( Exception exception )
@@ -85,6 +83,10 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
85
83
}
86
84
}
87
85
}
86
+ catch ( OperationCanceledException ) when ( ct . IsCancellationRequested )
87
+ {
88
+ // Ignore and continue shutting down.
89
+ }
88
90
catch ( Exception exception )
89
91
{
90
92
logger . LogError ( exception , "Error processing membership updates" ) ;
@@ -95,7 +97,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
95
97
}
96
98
}
97
99
98
- public SiloMetadata GetMetadata ( SiloAddress siloAddress ) => _metadata . GetValueOrDefault ( siloAddress ) ?? SiloMetadata . Empty ;
100
+ public SiloMetadata GetSiloMetadata ( SiloAddress siloAddress ) => _metadata . GetValueOrDefault ( siloAddress ) ?? SiloMetadata . Empty ;
99
101
100
102
public void SetMetadata ( SiloAddress siloAddress , SiloMetadata metadata ) => _metadata . TryAdd ( siloAddress , metadata ) ;
101
103
0 commit comments