17
17
import io .dapr .utils .TypeRef ;
18
18
import reactor .core .publisher .Mono ;
19
19
20
+ import java .time .Duration ;
21
+ import java .time .Instant ;
20
22
import java .util .ArrayList ;
23
+ import java .util .Date ;
21
24
import java .util .List ;
22
25
import java .util .Map ;
23
26
import java .util .NoSuchElementException ;
@@ -66,12 +69,13 @@ public class ActorStateManager {
66
69
/**
67
70
* Adds a given key/value to the Actor's state store's cache.
68
71
*
69
- * @param stateName Name of the state being added.
70
- * @param value Value to be added.
71
- * @param <T> Type of the object being added.
72
+ * @param stateName Name of the state being added.
73
+ * @param value Value to be added.
74
+ * @param expiration State's expiration.
75
+ * @param <T> Type of the object being added.
72
76
* @return Asynchronous void operation.
73
77
*/
74
- public <T > Mono <Void > add (String stateName , T value ) {
78
+ public <T > Mono <Void > add (String stateName , T value , Instant expiration ) {
75
79
return Mono .fromSupplier (() -> {
76
80
if (stateName == null ) {
77
81
throw new IllegalArgumentException ("State's name cannot be null." );
@@ -84,7 +88,8 @@ public <T> Mono<Void> add(String stateName, T value) {
84
88
StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
85
89
86
90
if (metadata .kind == ActorStateChangeKind .REMOVE ) {
87
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .UPDATE , value ));
91
+ this .stateChangeTracker .put (
92
+ stateName , new StateChangeMetadata (ActorStateChangeKind .UPDATE , value , expiration ));
88
93
return true ;
89
94
}
90
95
@@ -95,7 +100,8 @@ public <T> Mono<Void> add(String stateName, T value) {
95
100
throw new IllegalStateException ("Duplicate state: " + stateName );
96
101
}
97
102
98
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .ADD , value ));
103
+ this .stateChangeTracker .put (
104
+ stateName , new StateChangeMetadata (ActorStateChangeKind .ADD , value , expiration ));
99
105
return true ;
100
106
}))
101
107
.then ();
@@ -130,6 +136,10 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
130
136
if (this .stateChangeTracker .containsKey (stateName )) {
131
137
StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
132
138
139
+ if (metadata .isExpired ()) {
140
+ throw new NoSuchElementException ("State is expired: " + stateName );
141
+ }
142
+
133
143
if (metadata .kind == ActorStateChangeKind .REMOVE ) {
134
144
throw new NoSuchElementException ("State is marked for removal: " + stateName );
135
145
}
@@ -142,20 +152,37 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
142
152
this .stateProvider .load (this .actorTypeName , this .actorId , stateName , type )
143
153
.switchIfEmpty (Mono .error (new NoSuchElementException ("State not found: " + stateName )))
144
154
.map (v -> {
145
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .NONE , v ));
146
- return (T ) v ;
155
+ this .stateChangeTracker .put (
156
+ stateName , new StateChangeMetadata (ActorStateChangeKind .NONE , v .getValue (), v .getExpiration ()));
157
+ return (T ) v .getValue ();
147
158
}));
148
159
}
149
160
150
161
/**
151
162
* Updates a given key/value pair in the state store's cache.
163
+ * Use the variation that takes in an TTL instead.
152
164
*
153
165
* @param stateName Name of the state being updated.
154
166
* @param value Value to be set for given state.
155
167
* @param <T> Type of the value being set.
156
168
* @return Asynchronous void result.
157
169
*/
170
+ @ Deprecated
158
171
public <T > Mono <Void > set (String stateName , T value ) {
172
+ return this .set (stateName , value , Duration .ZERO );
173
+ }
174
+
175
+ /**
176
+ * Updates a given key/value pair in the state store's cache.
177
+ * Using TTL is highly recommended to avoid state to be left in the state store forever.
178
+ *
179
+ * @param stateName Name of the state being updated.
180
+ * @param value Value to be set for given state.
181
+ * @param ttl Time to live.
182
+ * @param <T> Type of the value being set.
183
+ * @return Asynchronous void result.
184
+ */
185
+ public <T > Mono <Void > set (String stateName , T value , Duration ttl ) {
159
186
return Mono .fromSupplier (() -> {
160
187
if (stateName == null ) {
161
188
throw new IllegalArgumentException ("State's name cannot be null." );
@@ -165,20 +192,23 @@ public <T> Mono<Void> set(String stateName, T value) {
165
192
StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
166
193
167
194
ActorStateChangeKind kind = metadata .kind ;
168
- if ((kind == ActorStateChangeKind .NONE ) || (kind == ActorStateChangeKind .REMOVE )) {
195
+ if (metadata . isExpired () || (kind == ActorStateChangeKind .NONE ) || (kind == ActorStateChangeKind .REMOVE )) {
169
196
kind = ActorStateChangeKind .UPDATE ;
170
197
}
171
198
172
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (kind , value ));
199
+ var expiration = buildExpiration (ttl );
200
+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (kind , value , expiration ));
173
201
return true ;
174
202
}
175
203
176
204
return false ;
177
205
}).filter (x -> x )
178
206
.switchIfEmpty (this .stateProvider .contains (this .actorTypeName , this .actorId , stateName )
179
207
.map (exists -> {
208
+ var expiration = buildExpiration (ttl );
180
209
this .stateChangeTracker .put (stateName ,
181
- new StateChangeMetadata (exists ? ActorStateChangeKind .UPDATE : ActorStateChangeKind .ADD , value ));
210
+ new StateChangeMetadata (
211
+ exists ? ActorStateChangeKind .UPDATE : ActorStateChangeKind .ADD , value , expiration ));
182
212
return exists ;
183
213
}))
184
214
.then ();
@@ -208,7 +238,7 @@ public Mono<Void> remove(String stateName) {
208
238
return true ;
209
239
}
210
240
211
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null ));
241
+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null , null ));
212
242
return true ;
213
243
}
214
244
@@ -218,7 +248,7 @@ public Mono<Void> remove(String stateName) {
218
248
.switchIfEmpty (this .stateProvider .contains (this .actorTypeName , this .actorId , stateName ))
219
249
.filter (exists -> exists )
220
250
.map (exists -> {
221
- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null ));
251
+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null , null ));
222
252
return exists ;
223
253
})
224
254
.then ();
@@ -239,7 +269,7 @@ public Mono<Boolean> contains(String stateName) {
239
269
return this .stateChangeTracker .get (stateName );
240
270
}
241
271
).map (metadata -> {
242
- if (metadata .kind == ActorStateChangeKind .REMOVE ) {
272
+ if (metadata .isExpired () || ( metadata . kind == ActorStateChangeKind .REMOVE ) ) {
243
273
return Boolean .FALSE ;
244
274
}
245
275
@@ -264,7 +294,8 @@ public Mono<Void> save() {
264
294
continue ;
265
295
}
266
296
267
- changes .add (new ActorStateChange (tuple .getKey (), tuple .getValue ().value , tuple .getValue ().kind ));
297
+ var actorState = new ActorState <>(tuple .getKey (), tuple .getValue ().value , tuple .getValue ().expiration );
298
+ changes .add (new ActorStateChange (actorState , tuple .getValue ().kind ));
268
299
}
269
300
270
301
return changes .toArray (new ActorStateChange [0 ]);
@@ -288,12 +319,17 @@ private void flush() {
288
319
if (tuple .getValue ().kind == ActorStateChangeKind .REMOVE ) {
289
320
this .stateChangeTracker .remove (stateName );
290
321
} else {
291
- StateChangeMetadata metadata = new StateChangeMetadata (ActorStateChangeKind .NONE , tuple .getValue ().value );
322
+ StateChangeMetadata metadata =
323
+ new StateChangeMetadata (ActorStateChangeKind .NONE , tuple .getValue ().value , tuple .getValue ().expiration );
292
324
this .stateChangeTracker .put (stateName , metadata );
293
325
}
294
326
}
295
327
}
296
328
329
+ private static Instant buildExpiration (Duration ttl ) {
330
+ return (ttl != null ) && !ttl .isNegative () && !ttl .isZero () ? Instant .now ().plus (ttl ) : null ;
331
+ }
332
+
297
333
/**
298
334
* Internal class to represent value and change kind.
299
335
*/
@@ -309,15 +345,26 @@ private static final class StateChangeMetadata {
309
345
*/
310
346
private final Object value ;
311
347
348
+ /**
349
+ * Expiration.
350
+ */
351
+ private final Instant expiration ;
352
+
312
353
/**
313
354
* Creates a new instance of the metadata on state change.
314
355
*
315
356
* @param kind Kind of change.
316
357
* @param value Value to be set.
358
+ * @param expiration When the value is set to expire (recommended but accepts null).
317
359
*/
318
- private StateChangeMetadata (ActorStateChangeKind kind , Object value ) {
360
+ private StateChangeMetadata (ActorStateChangeKind kind , Object value , Instant expiration ) {
319
361
this .kind = kind ;
320
362
this .value = value ;
363
+ this .expiration = expiration ;
364
+ }
365
+
366
+ private boolean isExpired () {
367
+ return (this .expiration != null ) && Instant .now ().isAfter (this .expiration );
321
368
}
322
369
}
323
370
}
0 commit comments