Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d82b0c1

Browse files
committedJun 26, 2024·
Add actor state TTL support.
Signed-off-by: Artur Souza <[email protected]>
1 parent 363b6f9 commit d82b0c1

19 files changed

+488
-158
lines changed
 

‎.github/workflows/build.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ jobs:
107107
./dist/linux_amd64/release/placement &
108108
- name: Spin local environment
109109
run: |
110-
docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka
110+
docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka mysql
111111
docker ps
112112
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
113113
run: |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.actors.runtime;
15+
16+
import java.time.Instant;
17+
18+
/**
19+
* Represents a state change for an actor.
20+
*/
21+
final class ActorState<T> {
22+
23+
/**
24+
* Name of the state being changed.
25+
*/
26+
private final String name;
27+
28+
/**
29+
* New value for the state being changed.
30+
*/
31+
private final T value;
32+
33+
/**
34+
* Expiration.
35+
*/
36+
private final Instant expiration;
37+
38+
/**
39+
* Creates a new instance of the metadata on actor state.
40+
*
41+
* @param name Name of the state being changed.
42+
* @param value Value to be set.
43+
*/
44+
ActorState(String name, T value) {
45+
this(name, value, null);
46+
}
47+
48+
/**
49+
* Creates a new instance of the metadata on actor state.
50+
*
51+
* @param name Name of the state being changed.
52+
* @param value Value to be set.
53+
* @param expiration When the value is set to expire (recommended but accepts null).
54+
*/
55+
ActorState(String name, T value, Instant expiration) {
56+
this.name = name;
57+
this.value = value;
58+
this.expiration = expiration;
59+
}
60+
61+
/**
62+
* Gets the name of the state being changed.
63+
*
64+
* @return Name of the state.
65+
*/
66+
String getName() {
67+
return name;
68+
}
69+
70+
/**
71+
* Gets the new value of the state being changed.
72+
*
73+
* @return New value.
74+
*/
75+
T getValue() {
76+
return value;
77+
}
78+
79+
/**
80+
* Gets the expiration of the state.
81+
*
82+
* @return State expiration.
83+
*/
84+
Instant getExpiration() {
85+
return expiration;
86+
}
87+
88+
}

‎sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateChange.java

+9-25
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@
1919
public final class ActorStateChange {
2020

2121
/**
22-
* Name of the state being changed.
22+
* State being changed.
2323
*/
24-
private final String stateName;
25-
26-
/**
27-
* New value for the state being changed.
28-
*/
29-
private final Object value;
24+
private final ActorState state;
3025

3126
/**
3227
* Type of change {@link ActorStateChangeKind}.
@@ -36,32 +31,21 @@ public final class ActorStateChange {
3631
/**
3732
* Creates an actor state change.
3833
*
39-
* @param stateName Name of the state being changed.
40-
* @param value New value for the state being changed.
34+
* @param state State being changed.
4135
* @param changeKind Kind of change.
4236
*/
43-
ActorStateChange(String stateName, Object value, ActorStateChangeKind changeKind) {
44-
this.stateName = stateName;
45-
this.value = value;
37+
ActorStateChange(ActorState state, ActorStateChangeKind changeKind) {
38+
this.state = state;
4639
this.changeKind = changeKind;
4740
}
4841

4942
/**
50-
* Gets the name of the state being changed.
51-
*
52-
* @return Name of the state.
53-
*/
54-
String getStateName() {
55-
return stateName;
56-
}
57-
58-
/**
59-
* Gets the new value of the state being changed.
43+
* Gets the state being changed.
6044
*
61-
* @return New value.
45+
* @return state.
6246
*/
63-
Object getValue() {
64-
return value;
47+
ActorState getState() {
48+
return state;
6549
}
6650

6751
/**

‎sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java

+64-17
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
import io.dapr.utils.TypeRef;
1818
import reactor.core.publisher.Mono;
1919

20+
import java.time.Duration;
21+
import java.time.Instant;
2022
import java.util.ArrayList;
23+
import java.util.Date;
2124
import java.util.List;
2225
import java.util.Map;
2326
import java.util.NoSuchElementException;
@@ -66,12 +69,13 @@ public class ActorStateManager {
6669
/**
6770
* Adds a given key/value to the Actor's state store's cache.
6871
*
69-
* @param stateName Name of the state being added.
70-
* @param value Value to be added.
71-
* @param <T> Type of the object being added.
72+
* @param stateName Name of the state being added.
73+
* @param value Value to be added.
74+
* @param expiration State's expiration.
75+
* @param <T> Type of the object being added.
7276
* @return Asynchronous void operation.
7377
*/
74-
public <T> Mono<Void> add(String stateName, T value) {
78+
public <T> Mono<Void> add(String stateName, T value, Instant expiration) {
7579
return Mono.fromSupplier(() -> {
7680
if (stateName == null) {
7781
throw new IllegalArgumentException("State's name cannot be null.");
@@ -84,7 +88,8 @@ public <T> Mono<Void> add(String stateName, T value) {
8488
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
8589

8690
if (metadata.kind == ActorStateChangeKind.REMOVE) {
87-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value));
91+
this.stateChangeTracker.put(
92+
stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value, expiration));
8893
return true;
8994
}
9095

@@ -95,7 +100,8 @@ public <T> Mono<Void> add(String stateName, T value) {
95100
throw new IllegalStateException("Duplicate state: " + stateName);
96101
}
97102

98-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value));
103+
this.stateChangeTracker.put(
104+
stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value, expiration));
99105
return true;
100106
}))
101107
.then();
@@ -130,6 +136,10 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
130136
if (this.stateChangeTracker.containsKey(stateName)) {
131137
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
132138

139+
if (metadata.isExpired()) {
140+
throw new NoSuchElementException("State is expired: " + stateName);
141+
}
142+
133143
if (metadata.kind == ActorStateChangeKind.REMOVE) {
134144
throw new NoSuchElementException("State is marked for removal: " + stateName);
135145
}
@@ -142,20 +152,37 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
142152
this.stateProvider.load(this.actorTypeName, this.actorId, stateName, type)
143153
.switchIfEmpty(Mono.error(new NoSuchElementException("State not found: " + stateName)))
144154
.map(v -> {
145-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v));
146-
return (T) v;
155+
this.stateChangeTracker.put(
156+
stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v.getValue(), v.getExpiration()));
157+
return (T) v.getValue();
147158
}));
148159
}
149160

150161
/**
151162
* Updates a given key/value pair in the state store's cache.
163+
* Use the variation that takes in an TTL instead.
152164
*
153165
* @param stateName Name of the state being updated.
154166
* @param value Value to be set for given state.
155167
* @param <T> Type of the value being set.
156168
* @return Asynchronous void result.
157169
*/
170+
@Deprecated
158171
public <T> Mono<Void> set(String stateName, T value) {
172+
return this.set(stateName, value, Duration.ZERO);
173+
}
174+
175+
/**
176+
* Updates a given key/value pair in the state store's cache.
177+
* Using TTL is highly recommended to avoid state to be left in the state store forever.
178+
*
179+
* @param stateName Name of the state being updated.
180+
* @param value Value to be set for given state.
181+
* @param ttl Time to live.
182+
* @param <T> Type of the value being set.
183+
* @return Asynchronous void result.
184+
*/
185+
public <T> Mono<Void> set(String stateName, T value, Duration ttl) {
159186
return Mono.fromSupplier(() -> {
160187
if (stateName == null) {
161188
throw new IllegalArgumentException("State's name cannot be null.");
@@ -165,20 +192,23 @@ public <T> Mono<Void> set(String stateName, T value) {
165192
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
166193

167194
ActorStateChangeKind kind = metadata.kind;
168-
if ((kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
195+
if (metadata.isExpired() || (kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
169196
kind = ActorStateChangeKind.UPDATE;
170197
}
171198

172-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value));
199+
var expiration = buildExpiration(ttl);
200+
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value, expiration));
173201
return true;
174202
}
175203

176204
return false;
177205
}).filter(x -> x)
178206
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
179207
.map(exists -> {
208+
var expiration = buildExpiration(ttl);
180209
this.stateChangeTracker.put(stateName,
181-
new StateChangeMetadata(exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value));
210+
new StateChangeMetadata(
211+
exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value, expiration));
182212
return exists;
183213
}))
184214
.then();
@@ -208,7 +238,7 @@ public Mono<Void> remove(String stateName) {
208238
return true;
209239
}
210240

211-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
241+
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
212242
return true;
213243
}
214244

@@ -218,7 +248,7 @@ public Mono<Void> remove(String stateName) {
218248
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName))
219249
.filter(exists -> exists)
220250
.map(exists -> {
221-
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
251+
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
222252
return exists;
223253
})
224254
.then();
@@ -239,7 +269,7 @@ public Mono<Boolean> contains(String stateName) {
239269
return this.stateChangeTracker.get(stateName);
240270
}
241271
).map(metadata -> {
242-
if (metadata.kind == ActorStateChangeKind.REMOVE) {
272+
if (metadata.isExpired() || (metadata.kind == ActorStateChangeKind.REMOVE)) {
243273
return Boolean.FALSE;
244274
}
245275

@@ -264,7 +294,8 @@ public Mono<Void> save() {
264294
continue;
265295
}
266296

267-
changes.add(new ActorStateChange(tuple.getKey(), tuple.getValue().value, tuple.getValue().kind));
297+
var actorState = new ActorState<>(tuple.getKey(), tuple.getValue().value, tuple.getValue().expiration);
298+
changes.add(new ActorStateChange(actorState, tuple.getValue().kind));
268299
}
269300

270301
return changes.toArray(new ActorStateChange[0]);
@@ -288,12 +319,17 @@ private void flush() {
288319
if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) {
289320
this.stateChangeTracker.remove(stateName);
290321
} else {
291-
StateChangeMetadata metadata = new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value);
322+
StateChangeMetadata metadata =
323+
new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value, tuple.getValue().expiration);
292324
this.stateChangeTracker.put(stateName, metadata);
293325
}
294326
}
295327
}
296328

329+
private static Instant buildExpiration(Duration ttl) {
330+
return (ttl != null) && !ttl.isNegative() && !ttl.isZero() ? Instant.now().plus(ttl) : null;
331+
}
332+
297333
/**
298334
* Internal class to represent value and change kind.
299335
*/
@@ -309,15 +345,26 @@ private static final class StateChangeMetadata {
309345
*/
310346
private final Object value;
311347

348+
/**
349+
* Expiration.
350+
*/
351+
private final Instant expiration;
352+
312353
/**
313354
* Creates a new instance of the metadata on state change.
314355
*
315356
* @param kind Kind of change.
316357
* @param value Value to be set.
358+
* @param expiration When the value is set to expire (recommended but accepts null).
317359
*/
318-
private StateChangeMetadata(ActorStateChangeKind kind, Object value) {
360+
private StateChangeMetadata(ActorStateChangeKind kind, Object value, Instant expiration) {
319361
this.kind = kind;
320362
this.value = value;
363+
this.expiration = expiration;
364+
}
365+
366+
private boolean isExpired() {
367+
return (this.expiration != null) && Instant.now().isAfter(this.expiration);
321368
}
322369
}
323370
}

‎sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java

+9-26
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,19 @@ final class ActorStateOperation {
2525
private String operationType;
2626

2727
/**
28-
* Key for the state to be persisted.
28+
* State to be persisted.
2929
*/
30-
private String key;
31-
32-
/**
33-
* Value of the state to be persisted.
34-
*/
35-
private Object value;
30+
private ActorState state;
3631

3732
/**
3833
* Instantiates a new Actor Timer.
3934
*
4035
* @param operationType Type of state operation.
41-
* @param key Key to be persisted.
42-
* @param value Value to be persisted.
36+
* @param state Key to be persisted.
4337
*/
44-
ActorStateOperation(String operationType,
45-
String key,
46-
Object value) {
38+
ActorStateOperation(String operationType, ActorState state) {
4739
this.operationType = operationType;
48-
this.key = key;
49-
this.value = value;
40+
this.state = state;
5041
}
5142

5243
/**
@@ -59,20 +50,12 @@ public String getOperationType() {
5950
}
6051

6152
/**
62-
* Gets the key to be persisted.
53+
* Gets the state to be persisted.
6354
*
64-
* @return Key to be persisted.
55+
* @return State to be persisted.
6556
*/
66-
public String getKey() {
67-
return key;
57+
public ActorState getState() {
58+
return state;
6859
}
6960

70-
/**
71-
* Gets the value to be persisted.
72-
*
73-
* @return Value to be persisted.
74-
*/
75-
public Object getValue() {
76-
return value;
77-
}
7861
}

‎sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ interface DaprClient {
2828
* @param actorType Type of actor.
2929
* @param actorId Actor Identifier.
3030
* @param keyName State name.
31-
* @return Asynchronous result with current state value.
31+
* @return Asynchronous result with current state.
3232
*/
33-
Mono<byte[]> getState(String actorType, String actorId, String keyName);
33+
Mono<ActorState<byte[]>> getState(String actorType, String actorId, String keyName);
3434

3535
/**
3636
* Saves state batch to Dapr.

‎sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.io.IOException;
3131
import java.nio.charset.Charset;
32+
import java.time.Instant;
3233
import java.util.ArrayList;
3334
import java.util.List;
3435
import java.util.concurrent.ExecutionException;
@@ -77,7 +78,7 @@ class DaprClientImpl implements DaprClient {
7778
* {@inheritDoc}
7879
*/
7980
@Override
80-
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
81+
public Mono<ActorState<byte[]>> getState(String actorType, String actorId, final String keyName) {
8182
DaprProtos.GetActorStateRequest req =
8283
DaprProtos.GetActorStateRequest.newBuilder()
8384
.setActorType(actorType)
@@ -86,7 +87,14 @@ public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
8687
.build();
8788

8889
return Mono.<DaprProtos.GetActorStateResponse>create(it ->
89-
client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray());
90+
client.getActorState(req, createStreamObserver(it))).map(r -> {
91+
var expirationStr = r.getMetadataOrDefault("ttlExpireTime", null);
92+
Instant expiration = null;
93+
if ((expirationStr != null) && !expirationStr.isEmpty()) {
94+
expiration = Instant.parse(expirationStr);
95+
}
96+
return new ActorState<>(keyName, r.getData().toByteArray(), expiration);
97+
});
9098
}
9199

92100
/**
@@ -100,12 +108,26 @@ public Mono<Void> saveStateTransactionally(
100108
List<DaprProtos.TransactionalActorStateOperation> grpcOps = new ArrayList<>();
101109
for (ActorStateOperation op : operations) {
102110
String operationType = op.getOperationType();
103-
String key = op.getKey();
104-
Object value = op.getValue();
111+
String key = op.getState().getName();
112+
Object value = op.getState().getValue();
113+
Instant expiration = op.getState().getExpiration();
114+
Long ttlInSeconds = null;
115+
if (expiration != null) {
116+
ttlInSeconds = expiration.getEpochSecond() - Instant.now().getEpochSecond();
117+
}
105118
DaprProtos.TransactionalActorStateOperation.Builder opBuilder =
106119
DaprProtos.TransactionalActorStateOperation.newBuilder()
107120
.setOperationType(operationType)
108121
.setKey(key);
122+
123+
if (ttlInSeconds != null) {
124+
if (ttlInSeconds <= 0) {
125+
// already expired, min is 1s.
126+
ttlInSeconds = 1L;
127+
}
128+
opBuilder.putMetadata("ttlInSeconds", ttlInSeconds.toString());
129+
}
130+
109131
if (value != null) {
110132
if (value instanceof String) {
111133
opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom((String) value, CHARSET)));

‎sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020
import io.dapr.serializer.DefaultObjectSerializer;
2121
import io.dapr.utils.TypeRef;
2222
import reactor.core.publisher.Mono;
23+
import reactor.util.function.Tuple2;
24+
import reactor.util.function.Tuples;
2325

2426
import java.io.IOException;
2527
import java.nio.charset.Charset;
28+
import java.time.Instant;
29+
import java.util.AbstractMap;
2630
import java.util.ArrayList;
2731

2832
/**
@@ -67,37 +71,37 @@ class DaprStateAsyncProvider {
6771
this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
6872
}
6973

70-
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
71-
Mono<byte[]> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
74+
<T> Mono<ActorState<T>> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
75+
Mono<ActorState<byte[]>> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
7276

7377
return result.flatMap(s -> {
7478
try {
7579
if (s == null) {
7680
return Mono.empty();
7781
}
7882

79-
T response = this.stateSerializer.deserialize(s, type);
83+
T response = this.stateSerializer.deserialize(s.getValue(), type);
8084
if (this.isStateSerializerDefault && (response instanceof byte[])) {
81-
if (s.length == 0) {
85+
if ((s.getValue() == null) || (s.getValue().length == 0)) {
8286
return Mono.empty();
8387
}
8488
// Default serializer just passes through byte arrays, so we need to decode it here.
85-
response = (T) OBJECT_MAPPER.readValue(s, byte[].class);
89+
response = (T) OBJECT_MAPPER.readValue(s.getValue(), byte[].class);
8690
}
8791
if (response == null) {
8892
return Mono.empty();
8993
}
9094

91-
return Mono.just(response);
95+
return Mono.just(new ActorState<>(s.getName(), response, s.getExpiration()));
9296
} catch (IOException e) {
9397
return Mono.error(new RuntimeException(e));
9498
}
9599
});
96100
}
97101

98102
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
99-
Mono<byte[]> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
100-
return result.map(s -> s.length > 0).defaultIfEmpty(false);
103+
var result = this.daprClient.getState(actorType, actorId.toString(), stateName);
104+
return result.map(s -> (s.getValue() != null) && (s.getValue().length > 0)).defaultIfEmpty(false);
101105
}
102106

103107
/**
@@ -139,14 +143,15 @@ Mono<Void> apply(String actorType, ActorId actorId, ActorStateChange... stateCha
139143
continue;
140144
}
141145

142-
String key = stateChange.getStateName();
146+
var state = stateChange.getState();
147+
String key = state.getName();
143148
Object value = null;
144149
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE)
145150
|| (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
146151
try {
147-
byte[] data = this.stateSerializer.serialize(stateChange.getValue());
152+
byte[] data = this.stateSerializer.serialize(state.getValue());
148153
if (data != null) {
149-
if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) {
154+
if (this.isStateSerializerDefault && !(state.getValue() instanceof byte[])) {
150155
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
151156
value = new String(data, CHARSET);
152157
} else {
@@ -160,7 +165,7 @@ Mono<Void> apply(String actorType, ActorId actorId, ActorStateChange... stateCha
160165
}
161166
}
162167

163-
operations.add(new ActorStateOperation(operationName, key, value));
168+
operations.add(new ActorStateOperation(operationName, new ActorState(key, value, state.getExpiration())));
164169
}
165170

166171
return this.daprClient.saveStateTransactionally(actorType, actorId.toString(), operations);

‎sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java

+72-10
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public interface MyActor {
6868

6969
Mono<String> setMessage(String message);
7070

71+
Mono<Boolean> setMessageFor1s(String message);
72+
73+
Mono<Boolean> setMessageAndWait(String message);
74+
7175
Mono<String> getMessage();
7276

7377
Mono<Boolean> hasMessage();
@@ -197,14 +201,28 @@ public Mono<Integer> getCountButThrowsException() {
197201

198202
@Override
199203
public Mono<Void> addMessage(String message) {
200-
return super.getActorStateManager().add("message", message);
204+
return super.getActorStateManager().add("message", message, null);
201205
}
202206

203207
@Override
204208
public Mono<String> setMessage(String message) {
205209
return super.getActorStateManager().set("message", message).thenReturn(executeSayMethod(message));
206210
}
207211

212+
@Override
213+
public Mono<Boolean> setMessageFor1s(String message) {
214+
return super
215+
.getActorStateManager().set("message", message, Duration.ofSeconds(1))
216+
.then(super.getActorStateManager().contains("message"));
217+
}
218+
219+
@Override
220+
public Mono<Boolean> setMessageAndWait(String message) {
221+
return super.getActorStateManager().set("message", message, Duration.ofSeconds(1))
222+
.then(Mono.delay(Duration.ofMillis(1100)))
223+
.then(super.getActorStateManager().contains("message"));
224+
}
225+
208226
@Override
209227
public Mono<String> getMessage() {
210228
return super.getActorStateManager().get("message", String.class);
@@ -223,20 +241,20 @@ public Mono<Void> deleteMessage() {
223241
@Override
224242
public Mono<Void> forceDuplicateException() {
225243
// Second add should throw exception.
226-
return super.getActorStateManager().add("message", "anything")
227-
.then(super.getActorStateManager().add("message", "something else"));
244+
return super.getActorStateManager().add("message", "anything", null)
245+
.then(super.getActorStateManager().add("message", "something else", null));
228246
}
229247

230248
@Override
231249
public Mono<Void> forcePartialChange() {
232-
return super.getActorStateManager().add("message", "first message")
250+
return super.getActorStateManager().add("message", "first message", null)
233251
.then(super.saveState())
234-
.then(super.getActorStateManager().add("message", "second message"));
252+
.then(super.getActorStateManager().add("message", "second message", null));
235253
}
236254

237255
@Override
238256
public Mono<Void> throwsWithoutSaving() {
239-
return super.getActorStateManager().add("message", "first message")
257+
return super.getActorStateManager().add("message", "first message", null)
240258
.then(Mono.error(new IllegalCharsetNameException("random")));
241259
}
242260

@@ -298,23 +316,67 @@ public MyMethodContext setName(String name) {
298316
public void happyGetSetDeleteContains() {
299317
ActorProxy proxy = newActorProxy();
300318
Assertions.assertEquals(
301-
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
319+
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
302320
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
303321

304322
proxy.invokeMethod("setMessage", "hello world").block();
305323
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
306324

307325
Assertions.assertEquals(
308-
"hello world", proxy.invokeMethod("getMessage", String.class).block());
326+
"hello world", proxy.invokeMethod("getMessage", String.class).block());
309327

310328
Assertions.assertEquals(
311-
executeSayMethod("hello world"),
312-
proxy.invokeMethod("setMessage", "hello world", String.class).block());
329+
executeSayMethod("hello world"),
330+
proxy.invokeMethod("setMessage", "hello world", String.class).block());
313331

314332
proxy.invokeMethod("deleteMessage").block();
315333
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
316334
}
317335

336+
@Test
337+
public void actorStateTTL() throws Exception {
338+
ActorProxy proxy = newActorProxy();
339+
Assertions.assertEquals(
340+
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
341+
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
342+
343+
Assertions.assertTrue(
344+
proxy.invokeMethod("setMessageFor1s", "hello world expires in 1s", Boolean.class).block());
345+
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
346+
347+
Assertions.assertEquals(
348+
"hello world expires in 1s", proxy.invokeMethod("getMessage", String.class).block());
349+
350+
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
351+
352+
Thread.sleep(1100);
353+
354+
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
355+
}
356+
357+
@Test
358+
public void actorStateTTLExpiresInLocalCache() throws Exception {
359+
ActorProxy proxy = newActorProxy();
360+
Assertions.assertEquals(
361+
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
362+
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
363+
364+
//First, sets a message without TTL and checks it is saved.
365+
proxy.invokeMethod("setMessage", "hello world").block();
366+
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
367+
Assertions.assertEquals(
368+
"hello world", proxy.invokeMethod("getMessage", String.class).block());
369+
370+
// Now, sets a message that expires still in local cache, before it is sent to state store.
371+
Assertions.assertFalse(
372+
proxy.invokeMethod("setMessageAndWait", "expires while still in cache", Boolean.class).block());
373+
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
374+
375+
Thread.sleep(1100);
376+
377+
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
378+
}
379+
318380
@Test
319381
public void lazyGet() {
320382
ActorProxy proxy = newActorProxy();

‎sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public class DaprGrpcClientTest {
6363
private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes();
6464

6565
private static final List<ActorStateOperation> OPERATIONS = Arrays.asList(
66-
new ActorStateOperation("upsert", "mykey", "hello world".getBytes()),
67-
new ActorStateOperation("delete", "mykey", null));
66+
new ActorStateOperation("upsert", new ActorState("mykey", "hello world".getBytes(), null)),
67+
new ActorStateOperation("delete", new ActorState("mykey", null, null)));
6868

6969
private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient();
7070

@@ -92,7 +92,7 @@ public void setup() throws IOException {
9292

9393
@Test
9494
public void getActorStateException() {
95-
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY);
95+
Mono<ActorState<byte[]>> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY);
9696
assertThrowsDaprException(
9797
ExecutionException.class,
9898
"UNKNOWN",
@@ -102,8 +102,8 @@ public void getActorStateException() {
102102

103103
@Test
104104
public void getActorState() {
105-
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY);
106-
assertArrayEquals(RESPONSE_PAYLOAD, result.block());
105+
Mono<ActorState<byte[]>> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY);
106+
assertArrayEquals(RESPONSE_PAYLOAD, result.block().getValue());
107107
}
108108

109109
@Test
@@ -130,8 +130,8 @@ public void saveActorStateTransactionallyByteArray() {
130130
@Test
131131
public void saveActorStateTransactionallyInvalidValueType() {
132132
ActorStateOperation[] operations = new ActorStateOperation[]{
133-
new ActorStateOperation("upsert", "mykey", 123),
134-
new ActorStateOperation("delete", "mykey", null),
133+
new ActorStateOperation("upsert", new ActorState("mykey", 123, null)),
134+
new ActorStateOperation("delete", new ActorState("mykey", null, null)),
135135
};
136136

137137
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
@@ -327,9 +327,9 @@ public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument)
327327
for (ActorStateOperation operation : operations) {
328328
boolean found = false;
329329
for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) {
330-
if (operation.getKey().equals(grpcOperation.getKey())
330+
if (operation.getState().getName().equals(grpcOperation.getKey())
331331
&& operation.getOperationType().equals(grpcOperation.getOperationType())
332-
&& nullableEquals(operation.getValue(), grpcOperation.getValue())) {
332+
&& nullableEquals(operation.getState().getValue(), grpcOperation.getValue())) {
333333
found = true;
334334
break;
335335
}

‎sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java

+31-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import reactor.core.publisher.Mono;
2020

2121
import java.io.IOException;
22+
import java.time.Instant;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425

@@ -27,7 +28,7 @@
2728
*/
2829
public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
2930

30-
private static final Map<String, byte[]> stateStore = new HashMap<>();
31+
private static final Map<String, ActorState<byte[]>> stateStore = new HashMap<>();
3132

3233
private final DaprObjectSerializer serializer;
3334

@@ -37,40 +38,63 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
3738
}
3839

3940
@Override
40-
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
41+
<T> Mono<ActorState<T>> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
4142
return Mono.fromSupplier(() -> {
4243
try {
4344
String stateId = this.buildId(actorType, actorId, stateName);
4445
if (!stateStore.containsKey(stateId)) {
4546
throw new IllegalStateException("State not found.");
4647
}
4748

48-
return this.serializer.deserialize(this.stateStore.get(stateId), type);
49+
var state = this.stateStore.get(stateId);
50+
if (state.getExpiration() != null) {
51+
if (!state.getExpiration().isAfter(Instant.now())) {
52+
throw new IllegalStateException("State expired.");
53+
}
54+
}
55+
var v = this.serializer.deserialize(state.getValue(), type);
56+
return new ActorState<>(stateName, v, state.getExpiration());
4957
} catch (IOException e) {
5058
throw new RuntimeException(e);
5159
}
5260
});
5361
}
5462

63+
private boolean contains(String stateId) {
64+
if (!stateStore.containsKey(stateId)) {
65+
return false;
66+
}
67+
68+
var state = this.stateStore.get(stateId);
69+
if (state.getExpiration() != null) {
70+
if (!state.getExpiration().isAfter(Instant.now())) {
71+
return false;
72+
}
73+
}
74+
75+
return true;
76+
}
77+
5578
@Override
5679
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
57-
return Mono.fromSupplier(() -> stateStore.containsKey(this.buildId(actorType, actorId, stateName)));
80+
return Mono.fromSupplier(() -> contains(this.buildId(actorType, actorId, stateName)));
5881
}
5982

6083
@Override
6184
Mono<Void> apply(String actorType, ActorId actorId, ActorStateChange... stateChanges) {
6285
return Mono.fromRunnable(() -> {
6386
try {
6487
for (ActorStateChange stateChange : stateChanges) {
65-
String stateId = buildId(actorType, actorId, stateChange.getStateName());
88+
String stateId = buildId(actorType, actorId, stateChange.getState().getName());
6689
switch (stateChange.getChangeKind()) {
6790
case REMOVE:
6891
stateStore.remove(stateId);
6992
break;
7093
case ADD:
7194
case UPDATE:
72-
byte[] raw = this.serializer.serialize(stateChange.getValue());
73-
stateStore.put(stateId, raw);
95+
byte[] raw = this.serializer.serialize(stateChange.getState().getValue());
96+
stateStore.put(stateId,
97+
new ActorState<>(stateChange.getState().getName(), raw, stateChange.getState().getExpiration()));
7498
break;
7599
}
76100
}

‎sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java

+40-33
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,13 @@ public void happyCaseApply() {
110110
if (operation.getOperationType() == null) {
111111
return false;
112112
}
113-
if (operation.getKey() == null) {
113+
if (operation.getState().getName() == null) {
114114
return false;
115115
}
116116

117117
String opName = operation.getOperationType();
118-
String key = operation.getKey();
119-
Object value = operation.getValue();
118+
String key = operation.getState().getName();
119+
Object value = operation.getState().getValue();
120120

121121
foundInsertName |= "upsert".equals(opName) &&
122122
"name".equals(key) &&
@@ -153,58 +153,59 @@ public void happyCaseLoad() throws Exception {
153153
DaprClient daprClient = mock(DaprClient.class);
154154
when(daprClient
155155
.getState(any(), any(), eq("name")))
156-
.thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe")));
156+
.thenReturn(Mono.just(new ActorState<>("name", SERIALIZER.serialize("Jon Doe"))));
157157
when(daprClient
158158
.getState(any(), any(), eq("zipcode")))
159-
.thenReturn(Mono.just(SERIALIZER.serialize(98021)));
159+
.thenReturn(Mono.just(new ActorState<>("zipcode", SERIALIZER.serialize(98021))));
160160
when(daprClient
161161
.getState(any(), any(), eq("goals")))
162-
.thenReturn(Mono.just(SERIALIZER.serialize(98)));
162+
.thenReturn(Mono.just(new ActorState<>("goals", SERIALIZER.serialize(98))));
163163
when(daprClient
164164
.getState(any(), any(), eq("balance")))
165-
.thenReturn(Mono.just(SERIALIZER.serialize(46.55)));
165+
.thenReturn(Mono.just(new ActorState<>("balance", SERIALIZER.serialize(46.55))));
166166
when(daprClient
167167
.getState(any(), any(), eq("active")))
168-
.thenReturn(Mono.just(SERIALIZER.serialize(true)));
168+
.thenReturn(Mono.just(new ActorState<>("active", SERIALIZER.serialize(true))));
169169
when(daprClient
170170
.getState(any(), any(), eq("customer")))
171-
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes()));
171+
.thenReturn(Mono.just(new ActorState<>("customer", "{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes())));
172172
when(daprClient
173173
.getState(any(), any(), eq("anotherCustomer")))
174-
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes()));
174+
.thenReturn(Mono.just(new ActorState<>("anotherCustomer", "{ \"id\": 2000, \"name\": \"Max\"}".getBytes())));
175175
when(daprClient
176176
.getState(any(), any(), eq("nullCustomer")))
177177
.thenReturn(Mono.empty());
178178
when(daprClient
179179
.getState(any(), any(), eq("bytes")))
180-
.thenReturn(Mono.just("\"QQ==\"".getBytes()));
180+
.thenReturn(Mono.just(new ActorState<>("bytes", "\"QQ==\"".getBytes())));
181181
when(daprClient
182182
.getState(any(), any(), eq("emptyBytes")))
183-
.thenReturn(Mono.just(new byte[0]));
183+
.thenReturn(Mono.just(new ActorState<>("emptyBytes", new byte[0])));
184184

185185
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
186186

187187
Assertions.assertEquals("Jon Doe",
188-
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block());
188+
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block().getValue());
189189
Assertions.assertEquals(98021,
190-
(int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block());
190+
(int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block().getValue());
191191
Assertions.assertEquals(98,
192-
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
192+
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue());
193193
Assertions.assertEquals(98,
194-
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
194+
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue());
195195
Assertions.assertEquals(46.55,
196-
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(),
196+
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block().getValue(),
197197
EPSILON);
198198
Assertions.assertEquals(true,
199-
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block());
199+
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block().getValue());
200200
Assertions.assertEquals(new Customer().setId(1000).setName("Roxane"),
201-
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block());
201+
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block().getValue());
202202
Assertions.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
203-
provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block());
203+
provider.load(
204+
"MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block().getValue());
204205
Assertions.assertNull(
205206
provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block());
206207
Assertions.assertArrayEquals("A".getBytes(),
207-
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block());
208+
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block().getValue());
208209
Assertions.assertNull(
209210
provider.load("MyActor", new ActorId("123"), "emptyBytes", TypeRef.get(byte[].class)).block());
210211
}
@@ -216,22 +217,28 @@ public void happyCaseContains() {
216217
// Keys that exists.
217218
when(daprClient
218219
.getState(any(), any(), eq("name")))
219-
.thenReturn(Mono.just("Jon Doe".getBytes()));
220+
.thenReturn(Mono.just(
221+
new ActorState<>("name", "Jon Doe".getBytes())));
220222
when(daprClient
221223
.getState(any(), any(), eq("zipcode")))
222-
.thenReturn(Mono.just("98021".getBytes()));
224+
.thenReturn(Mono.just(
225+
new ActorState<>("zipcode", "98021".getBytes())));
223226
when(daprClient
224227
.getState(any(), any(), eq("goals")))
225-
.thenReturn(Mono.just("98".getBytes()));
228+
.thenReturn(Mono.just(
229+
new ActorState<>("goals", "98".getBytes())));
226230
when(daprClient
227231
.getState(any(), any(), eq("balance")))
228-
.thenReturn(Mono.just("46.55".getBytes()));
232+
.thenReturn(Mono.just(
233+
new ActorState<>("balance", "46.55".getBytes())));
229234
when(daprClient
230235
.getState(any(), any(), eq("active")))
231-
.thenReturn(Mono.just("true".getBytes()));
236+
.thenReturn(Mono.just(
237+
new ActorState<>("active", "true".getBytes())));
232238
when(daprClient
233239
.getState(any(), any(), eq("customer")))
234-
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes()));
240+
.thenReturn(Mono.just(
241+
new ActorState<>("customer", "{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes())));
235242

236243
// Keys that do not exist.
237244
when(daprClient
@@ -257,15 +264,15 @@ public void happyCaseContains() {
257264
Assertions.assertFalse(provider.contains("MyActor", new ActorId("123"), null).block());
258265
}
259266

260-
private final <T> ActorStateChange createInsertChange(String name, T value) {
261-
return new ActorStateChange(name, value, ActorStateChangeKind.ADD);
267+
private <T> ActorStateChange createInsertChange(String name, T value) {
268+
return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.ADD);
262269
}
263270

264-
private final <T> ActorStateChange createUpdateChange(String name, T value) {
265-
return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE);
271+
private <T> ActorStateChange createUpdateChange(String name, T value) {
272+
return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.UPDATE);
266273
}
267274

268-
private final ActorStateChange createDeleteChange(String name) {
269-
return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE);
275+
private ActorStateChange createDeleteChange(String name) {
276+
return new ActorStateChange(new ActorState(name, null), ActorStateChangeKind.REMOVE);
270277
}
271278
}
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: mysql-actorstatestore
5+
spec:
6+
type: state.mysql
7+
version: v1
8+
metadata:
9+
- name: connectionString
10+
value: "root:test@tcp(127.0.0.1:3306)/?allowNativePasswords=true"
11+
- name: cleanupIntervalInSeconds
12+
value: "1"
13+
- name: actorStateStore
14+
value: "true"
15+
scopes:
16+
- actorstateit-statefulactorservice

‎sdk-tests/components/statestore.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,3 @@ spec:
1010
value: localhost:6379
1111
- name: redisPassword
1212
value: ""
13-
- name: actorStateStore
14-
value: "true"

‎sdk-tests/configurations/configuration.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,7 @@ spec:
66
tracing:
77
samplingRate: "1"
88
zipkin:
9-
endpointAddress: http://localhost:9411/api/v2/spans
9+
endpointAddress: http://localhost:9411/api/v2/spans
10+
features:
11+
- name: ActorStateTTL
12+
enabled: true

‎sdk-tests/deploy/local-test.yml

+7
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,10 @@ services:
2626
image: mongo
2727
ports:
2828
- "27017:27017"
29+
30+
mysql:
31+
image: mysql
32+
environment:
33+
MYSQL_ROOT_PASSWORD: test
34+
ports:
35+
- '3306:3306'

‎sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java

+84-9
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti
5656
logger.debug("Starting actor runtime ...");
5757
// The call below will fail if service cannot start successfully.
5858
DaprRun runtime = startDaprApp(
59-
this.getClass().getSimpleName(),
60-
StatefulActorService.SUCCESS_MESSAGE,
61-
StatefulActorService.class,
62-
true,
63-
60000,
64-
serviceAppProtocol);
59+
this.getClass().getSimpleName(),
60+
StatefulActorService.SUCCESS_MESSAGE,
61+
StatefulActorService.class,
62+
true,
63+
60000,
64+
serviceAppProtocol);
6565

6666
String message = "This is a message to be saved and retrieved.";
6767
String name = "Jon Doe";
@@ -75,8 +75,8 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti
7575
ActorProxy proxy = proxyBuilder.build(actorId);
7676

7777
// wating for actor to be activated
78-
Thread.sleep(2000);
79-
78+
Thread.sleep(2000);
79+
8080
// Validate conditional read works.
8181
callWithRetry(() -> {
8282
logger.debug("Invoking readMessage where data is not present yet ... ");
@@ -162,7 +162,7 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti
162162
ActorProxy newProxy = proxyBuilder.build(actorId);
163163

164164
// wating for actor to be activated
165-
Thread.sleep(2000);
165+
Thread.sleep(2000);
166166

167167
callWithRetry(() -> {
168168
logger.debug("Invoking readMessage where data is not cached ... ");
@@ -189,4 +189,79 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti
189189
assertArrayEquals(bytes, result);
190190
}, 5000);
191191
}
192+
193+
@ParameterizedTest
194+
@MethodSource("data")
195+
public void stateTTL(AppRun.AppProtocol serviceAppProtocol) throws Exception {
196+
logger.debug("Starting actor runtime ...");
197+
// The call below will fail if service cannot start successfully.
198+
DaprRun runtime = startDaprApp(
199+
this.getClass().getSimpleName(),
200+
StatefulActorService.SUCCESS_MESSAGE,
201+
StatefulActorService.class,
202+
true,
203+
60000,
204+
serviceAppProtocol);
205+
206+
String message = "This is a message to be saved and retrieved.";
207+
String name = "Jon Doe";
208+
byte[] bytes = new byte[] { 0x1 };
209+
ActorId actorId = new ActorId(
210+
String.format("%d-%b-state-ttl", System.currentTimeMillis(), serviceAppProtocol));
211+
String actorType = "StatefulActorTest";
212+
logger.debug("Building proxy ...");
213+
ActorProxyBuilder<ActorProxy> proxyBuilder =
214+
new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient());
215+
ActorProxy proxy = proxyBuilder.build(actorId);
216+
217+
// wating for actor to be activated
218+
Thread.sleep(2000);
219+
220+
// Validate conditional read works.
221+
callWithRetry(() -> {
222+
logger.debug("Invoking readMessage where data is not present yet ... ");
223+
String result = proxy.invokeMethod("readMessage", String.class).block();
224+
assertNull(result);
225+
}, 5000);
226+
227+
callWithRetry(() -> {
228+
logger.debug("Invoking writeMessageFor1s ... ");
229+
proxy.invokeMethod("writeMessageFor1s", message).block();
230+
}, 5000);
231+
232+
callWithRetry(() -> {
233+
logger.debug("Invoking readMessage where data is probably still cached ... ");
234+
String result = proxy.invokeMethod("readMessage", String.class).block();
235+
assertEquals(message, result);
236+
}, 5000);
237+
238+
239+
logger.debug("Waiting, so actor can be deactivated ...");
240+
Thread.sleep(10000);
241+
242+
logger.debug("Stopping service ...");
243+
runtime.stop();
244+
245+
logger.debug("Starting service ...");
246+
DaprRun run2 = startDaprApp(
247+
this.getClass().getSimpleName(),
248+
StatefulActorService.SUCCESS_MESSAGE,
249+
StatefulActorService.class,
250+
true,
251+
60000,
252+
serviceAppProtocol);
253+
254+
// Need new proxy builder because the proxy builder holds the channel.
255+
proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient());
256+
ActorProxy newProxy = proxyBuilder.build(actorId);
257+
258+
// waiting for actor to be activated
259+
Thread.sleep(2000);
260+
261+
callWithRetry(() -> {
262+
logger.debug("Invoking readMessage where data is not cached and expired ... ");
263+
String result = newProxy.invokeMethod("readMessage", String.class).block();
264+
assertNull(result);
265+
}, 5000);
266+
}
192267
}

‎sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActor.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public interface StatefulActor {
1717

1818
void writeMessage(String something);
1919

20+
void writeMessageFor1s(String something);
21+
2022
String readMessage();
2123

2224
void writeName(String something);

‎sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.dapr.actors.runtime.AbstractActor;
1919
import io.dapr.actors.runtime.ActorRuntimeContext;
2020

21+
import java.time.Duration;
22+
2123
@ActorType(name = "StatefulActorTest")
2224
public class StatefulActorImpl extends AbstractActor implements StatefulActor {
2325

@@ -30,6 +32,11 @@ public void writeMessage(String something) {
3032
super.getActorStateManager().set("message", something).block();
3133
}
3234

35+
@Override
36+
public void writeMessageFor1s(String something) {
37+
super.getActorStateManager().set("message", something, Duration.ofSeconds(1)).block();
38+
}
39+
3340
@Override
3441
public String readMessage() {
3542
if (super.getActorStateManager().contains("message").block()) {

0 commit comments

Comments
 (0)
Please sign in to comment.