1
+ using System ;
2
+ using System . Collections . Concurrent ;
3
+ using System . Collections . Generic ;
4
+ using System . Linq ;
5
+ using System . Threading ;
6
+ using System . Threading . Tasks ;
7
+ using Microsoft . Extensions . Logging ;
8
+
9
+ #nullable enable
10
+ namespace Orleans . Runtime . MembershipService . SiloMetadata ;
11
+
12
+ internal class SiloMetadataCache (
13
+ ISiloMetadataClient siloMetadataClient ,
14
+ MembershipTableManager membershipTableManager ,
15
+ ILogger < SiloMetadataCache > logger )
16
+ : ISiloMetadataCache , ILifecycleParticipant < ISiloLifecycle > , IDisposable
17
+ {
18
+ private readonly ConcurrentDictionary < SiloAddress , SiloMetadata > _metadata = new ( ) ;
19
+ private readonly CancellationTokenSource _cts = new ( ) ;
20
+
21
+ void ILifecycleParticipant < ISiloLifecycle > . Participate ( ISiloLifecycle lifecycle )
22
+ {
23
+ Task ? task = null ;
24
+ Task OnStart ( CancellationToken _ )
25
+ {
26
+ task = Task . Run ( ( ) => this . ProcessMembershipUpdates ( _cts . Token ) ) ;
27
+ return Task . CompletedTask ;
28
+ }
29
+
30
+ async Task OnStop ( CancellationToken ct )
31
+ {
32
+ await _cts . CancelAsync ( ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
33
+ if ( task is not null )
34
+ {
35
+ await task . WaitAsync ( ct ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
36
+ }
37
+ }
38
+
39
+ lifecycle . Subscribe (
40
+ nameof ( ClusterMembershipService ) ,
41
+ ServiceLifecycleStage . RuntimeServices ,
42
+ OnStart ,
43
+ OnStop ) ;
44
+ }
45
+
46
+ private async Task ProcessMembershipUpdates ( CancellationToken ct )
47
+ {
48
+ try
49
+ {
50
+ if ( logger . IsEnabled ( LogLevel . Debug ) ) logger . LogDebug ( "Starting to process membership updates." ) ;
51
+ await foreach ( var update in membershipTableManager . MembershipTableUpdates . WithCancellation ( ct ) )
52
+ {
53
+ // Add entries for members that aren't already in the cache
54
+ foreach ( var membershipEntry in update . Entries . Where ( e => e . Value . Status is SiloStatus . Active or SiloStatus . Joining ) )
55
+ {
56
+ if ( ! _metadata . ContainsKey ( membershipEntry . Key ) )
57
+ {
58
+ try
59
+ {
60
+ var metadata = await siloMetadataClient . GetSiloMetadata ( membershipEntry . Key ) . WaitAsync ( ct ) ;
61
+ _metadata . TryAdd ( membershipEntry . Key , metadata ) ;
62
+ }
63
+ catch ( Exception exception )
64
+ {
65
+ logger . LogError ( exception , "Error fetching metadata for silo {Silo}" , membershipEntry . Key ) ;
66
+ }
67
+ }
68
+ }
69
+
70
+ // Remove entries for members that are now dead
71
+ foreach ( var membershipEntry in update . Entries . Where ( e => e . Value . Status == SiloStatus . Dead ) )
72
+ {
73
+ _metadata . TryRemove ( membershipEntry . Key , out _ ) ;
74
+ }
75
+
76
+ // Remove entries for members that are no longer in the table
77
+ foreach ( var silo in _metadata . Keys . ToList ( ) )
78
+ {
79
+ if ( ! update . Entries . ContainsKey ( silo ) )
80
+ {
81
+ _metadata . TryRemove ( silo , out _ ) ;
82
+ }
83
+ }
84
+ }
85
+ }
86
+ catch ( OperationCanceledException ) when ( ct . IsCancellationRequested )
87
+ {
88
+ // Ignore and continue shutting down.
89
+ }
90
+ catch ( Exception exception )
91
+ {
92
+ logger . LogError ( exception , "Error processing membership updates" ) ;
93
+ }
94
+ finally
95
+ {
96
+ if ( logger . IsEnabled ( LogLevel . Debug ) ) logger . LogDebug ( "Stopping membership update processor" ) ;
97
+ }
98
+ }
99
+
100
+ public SiloMetadata GetSiloMetadata ( SiloAddress siloAddress ) => _metadata . GetValueOrDefault ( siloAddress ) ?? SiloMetadata . Empty ;
101
+
102
+ public void SetMetadata ( SiloAddress siloAddress , SiloMetadata metadata ) => _metadata . TryAdd ( siloAddress , metadata ) ;
103
+
104
+ public void Dispose ( ) => _cts . Cancel ( ) ;
105
+ }
0 commit comments