1
- using System . Collections . Immutable ;
1
+ using System . Collections . Immutable ;
2
2
using Chirper . Grains . Models ;
3
3
using Microsoft . Extensions . Logging ;
4
4
using Orleans . Concurrency ;
7
7
namespace Chirper . Grains ;
8
8
9
9
[ Reentrant ]
10
- public sealed class ChirperAccount : Grain , IChirperAccount
10
+ public sealed class ChirperAccount (
11
+ [ PersistentState ( stateName : "account" , storageName : "AccountState" ) ] IPersistentState < ChirperAccountState > state ,
12
+ ILogger < ChirperAccount > logger ) : Grain , IChirperAccount
11
13
{
14
+ private static string GrainType => nameof ( ChirperAccount ) ;
15
+
12
16
/// <summary>
13
17
/// Size for the recently received message cache.
14
18
/// </summary>
@@ -29,28 +33,17 @@ public sealed class ChirperAccount : Grain, IChirperAccount
29
33
/// This list is not part of state and will not survive grain deactivation.
30
34
/// </summary>
31
35
private readonly HashSet < IChirperViewer > _viewers = new ( ) ;
32
- private readonly ILogger < ChirperAccount > _logger ;
33
- private readonly IPersistentState < ChirperAccountState > _state ;
34
36
35
37
/// <summary>
36
38
/// Allows state writing to happen in the background.
37
39
/// </summary>
38
40
private Task ? _outstandingWriteStateOperation ;
39
41
40
- public ChirperAccount (
41
- [ PersistentState ( stateName : "account" , storageName : "AccountState" ) ] IPersistentState < ChirperAccountState > state ,
42
- ILogger < ChirperAccount > logger )
43
- {
44
- _state = state ;
45
- _logger = logger ;
46
- }
47
-
48
- private static string GrainType => nameof ( ChirperAccount ) ;
49
42
private string GrainKey => this . GetPrimaryKeyString ( ) ;
50
43
51
44
public override Task OnActivateAsync ( CancellationToken _ )
52
45
{
53
- _logger . LogInformation ( "{GrainType} {GrainKey} activated." , GrainType , GrainKey ) ;
46
+ logger . LogInformation ( "{GrainType} {GrainKey} activated." , GrainType , GrainKey ) ;
54
47
55
48
return Task . CompletedTask ;
56
49
}
@@ -59,49 +52,49 @@ public async ValueTask PublishMessageAsync(string message)
59
52
{
60
53
var chirp = CreateNewChirpMessage ( message ) ;
61
54
62
- _logger . LogInformation ( "{GrainType} {GrainKey} publishing new chirp message '{Chirp}'." ,
55
+ logger . LogInformation ( "{GrainType} {GrainKey} publishing new chirp message '{Chirp}'." ,
63
56
GrainType , GrainKey , chirp ) ;
64
57
65
- _state . State . MyPublishedMessages . Enqueue ( chirp ) ;
58
+ state . State . MyPublishedMessages . Enqueue ( chirp ) ;
66
59
67
- while ( _state . State . MyPublishedMessages . Count > PublishedMessagesCacheSize )
60
+ while ( state . State . MyPublishedMessages . Count > PublishedMessagesCacheSize )
68
61
{
69
- _state . State . MyPublishedMessages . Dequeue ( ) ;
62
+ state . State . MyPublishedMessages . Dequeue ( ) ;
70
63
}
71
64
72
65
await WriteStateAsync ( ) ;
73
66
74
67
// notify viewers of new message
75
- _logger . LogInformation ( "{GrainType} {GrainKey} sending new chirp message to {ViewerCount} viewers." ,
68
+ logger . LogInformation ( "{GrainType} {GrainKey} sending new chirp message to {ViewerCount} viewers." ,
76
69
GrainType , GrainKey , _viewers . Count ) ;
77
70
78
71
_viewers . ForEach ( _ => _ . NewChirp ( chirp ) ) ;
79
72
80
73
// notify followers of a new message
81
- _logger . LogInformation ( "{GrainType} {GrainKey} sending new chirp message to {FollowerCount} followers." ,
82
- GrainType , GrainKey , _state . State . Followers . Count ) ;
74
+ logger . LogInformation ( "{GrainType} {GrainKey} sending new chirp message to {FollowerCount} followers." ,
75
+ GrainType , GrainKey , state . State . Followers . Count ) ;
83
76
84
- await Task . WhenAll ( _state . State . Followers . Values . Select ( _ => _ . NewChirpAsync ( chirp ) ) . ToArray ( ) ) ;
77
+ await Task . WhenAll ( state . State . Followers . Values . Select ( _ => _ . NewChirpAsync ( chirp ) ) . ToArray ( ) ) ;
85
78
}
86
79
87
80
public ValueTask < ImmutableList < ChirperMessage > > GetReceivedMessagesAsync ( int number , int start )
88
81
{
89
82
if ( start < 0 ) start = 0 ;
90
- if ( start + number > _state . State . RecentReceivedMessages . Count )
83
+ if ( start + number > state . State . RecentReceivedMessages . Count )
91
84
{
92
- number = _state . State . RecentReceivedMessages . Count - start ;
85
+ number = state . State . RecentReceivedMessages . Count - start ;
93
86
}
94
87
95
88
return ValueTask . FromResult (
96
- _state . State . RecentReceivedMessages
89
+ state . State . RecentReceivedMessages
97
90
. Skip ( start )
98
91
. Take ( number )
99
92
. ToImmutableList ( ) ) ;
100
93
}
101
94
102
95
public async ValueTask FollowUserIdAsync ( string username )
103
96
{
104
- _logger . LogInformation (
97
+ logger . LogInformation (
105
98
"{GrainType} {UserName} > FollowUserName({TargetUserName})." ,
106
99
GrainType ,
107
100
GrainKey ,
@@ -111,7 +104,7 @@ public async ValueTask FollowUserIdAsync(string username)
111
104
112
105
await userToFollow . AddFollowerAsync ( GrainKey , this . AsReference < IChirperSubscriber > ( ) ) ;
113
106
114
- _state . State . Subscriptions [ username ] = userToFollow ;
107
+ state . State . Subscriptions [ username ] = userToFollow ;
115
108
116
109
await WriteStateAsync ( ) ;
117
110
@@ -121,7 +114,7 @@ public async ValueTask FollowUserIdAsync(string username)
121
114
122
115
public async ValueTask UnfollowUserIdAsync ( string username )
123
116
{
124
- _logger . LogInformation (
117
+ logger . LogInformation (
125
118
"{GrainType} {GrainKey} > UnfollowUserName({TargetUserName})." ,
126
119
GrainType ,
127
120
GrainKey ,
@@ -132,7 +125,7 @@ await GrainFactory.GetGrain<IChirperPublisher>(username)
132
125
. RemoveFollowerAsync ( GrainKey ) ;
133
126
134
127
// remove this publisher from the subscriptions list
135
- _state . State . Subscriptions . Remove ( username ) ;
128
+ state . State . Subscriptions . Remove ( username ) ;
136
129
137
130
// save now
138
131
await WriteStateAsync ( ) ;
@@ -142,10 +135,10 @@ await GrainFactory.GetGrain<IChirperPublisher>(username)
142
135
}
143
136
144
137
public ValueTask < ImmutableList < string > > GetFollowingListAsync ( ) =>
145
- ValueTask . FromResult ( _state . State . Subscriptions . Keys . ToImmutableList ( ) ) ;
138
+ ValueTask . FromResult ( state . State . Subscriptions . Keys . ToImmutableList ( ) ) ;
146
139
147
140
public ValueTask < ImmutableList < string > > GetFollowersListAsync ( ) =>
148
- ValueTask . FromResult ( _state . State . Followers . Keys . ToImmutableList ( ) ) ;
141
+ ValueTask . FromResult ( state . State . Followers . Keys . ToImmutableList ( ) ) ;
149
142
150
143
public ValueTask SubscribeAsync ( IChirperViewer viewer )
151
144
{
@@ -162,50 +155,50 @@ public ValueTask UnsubscribeAsync(IChirperViewer viewer)
162
155
public ValueTask < ImmutableList < ChirperMessage > > GetPublishedMessagesAsync ( int number , int start )
163
156
{
164
157
if ( start < 0 ) start = 0 ;
165
- if ( start + number > _state . State . MyPublishedMessages . Count )
158
+ if ( start + number > state . State . MyPublishedMessages . Count )
166
159
{
167
- number = _state . State . MyPublishedMessages . Count - start ;
160
+ number = state . State . MyPublishedMessages . Count - start ;
168
161
}
169
162
return ValueTask . FromResult (
170
- _state . State . MyPublishedMessages
163
+ state . State . MyPublishedMessages
171
164
. Skip ( start )
172
165
. Take ( number )
173
166
. ToImmutableList ( ) ) ;
174
167
}
175
168
176
169
public async ValueTask AddFollowerAsync ( string username , IChirperSubscriber follower )
177
170
{
178
- _state . State . Followers [ username ] = follower ;
171
+ state . State . Followers [ username ] = follower ;
179
172
await WriteStateAsync ( ) ;
180
173
_viewers . ForEach ( cv => cv . NewFollower ( username ) ) ;
181
174
}
182
175
183
176
public ValueTask RemoveFollowerAsync ( string username )
184
177
{
185
- _state . State . Followers . Remove ( username ) ;
178
+ state . State . Followers . Remove ( username ) ;
186
179
return WriteStateAsync ( ) ;
187
180
}
188
181
189
182
public async Task NewChirpAsync ( ChirperMessage chirp )
190
183
{
191
- _logger . LogInformation (
184
+ logger . LogInformation (
192
185
"{GrainType} {GrainKey} received chirp message = {Chirp}" ,
193
186
GrainType ,
194
187
GrainKey ,
195
188
chirp ) ;
196
189
197
- _state . State . RecentReceivedMessages . Enqueue ( chirp ) ;
190
+ state . State . RecentReceivedMessages . Enqueue ( chirp ) ;
198
191
199
192
// only relevant when not using fixed queue
200
- while ( _state . State . RecentReceivedMessages . Count > ReceivedMessagesCacheSize ) // to keep not more than the max number of messages
193
+ while ( state . State . RecentReceivedMessages . Count > ReceivedMessagesCacheSize ) // to keep not more than the max number of messages
201
194
{
202
- _state . State . RecentReceivedMessages . Dequeue ( ) ;
195
+ state . State . RecentReceivedMessages . Dequeue ( ) ;
203
196
}
204
197
205
198
await WriteStateAsync ( ) ;
206
199
207
200
// notify any viewers that a new chirp has been received
208
- _logger . LogInformation (
201
+ logger . LogInformation (
209
202
"{GrainType} {GrainKey} sending received chirp message to {ViewerCount} viewers" ,
210
203
GrainType ,
211
204
GrainKey ,
@@ -246,7 +239,7 @@ private async ValueTask WriteStateAsync()
246
239
if ( _outstandingWriteStateOperation is null )
247
240
{
248
241
// If after the initial write is completed, no other request initiated a new write operation, do it now.
249
- currentWriteStateOperation = _state . WriteStateAsync ( ) ;
242
+ currentWriteStateOperation = state . WriteStateAsync ( ) ;
250
243
_outstandingWriteStateOperation = currentWriteStateOperation ;
251
244
}
252
245
else
0 commit comments