From 8848c403201867e8558036f4d0bfe979aa8678bb Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Sat, 1 Feb 2025 12:05:57 +0200 Subject: [PATCH] Ensure ActorRuntime can be instantiated using Properties Signed-off-by: Artur Ciocanu --- .../io/dapr/actors/client/ActorClient.java | 25 +------- .../io/dapr/actors/runtime/ActorRuntime.java | 61 +++++++++++-------- .../dapr/actors/runtime/ActorRuntimeTest.java | 40 ++++++------ .../actors/ActorTurnBasedConcurrencyIT.java | 26 ++------ 4 files changed, 63 insertions(+), 89 deletions(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index 08c0a1c9ec..5ae955bfc2 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -15,11 +15,10 @@ import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; -import io.dapr.utils.Version; +import io.dapr.utils.NetworkUtils; import io.dapr.v1.DaprGrpc; import io.grpc.Channel; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import reactor.core.publisher.Mono; import java.util.Collections; @@ -83,7 +82,7 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp * @param resiliencyOptions Client resiliency options. */ public ActorClient(Properties overrideProperties, Map metadata, ResiliencyOptions resiliencyOptions) { - this(buildManagedChannel(overrideProperties), + this(NetworkUtils.buildGrpcManagedChannel(overrideProperties), metadata, resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN)); @@ -129,26 +128,6 @@ public void close() { } } - /** - * Creates a GRPC managed channel (or null, if not applicable). - * - * @param overrideProperties Overrides - * @return GRPC managed channel or null. - */ - private static ManagedChannel buildManagedChannel(Properties overrideProperties) { - int port = overrideProperties.getValue(Properties.GRPC_PORT); - if (port <= 0) { - throw new IllegalArgumentException("Invalid port."); - } - - var sidecarHost = overrideProperties.getValue(Properties.SIDECAR_IP); - - return ManagedChannelBuilder.forAddress(sidecarHost, port) - .usePlaintext() - .userAgent(Version.getSdkVersion()) - .build(); - } - /** * Build an instance of the Client based on the provided setup. * diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 8eb09bc7db..b12c4d9244 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -18,9 +18,8 @@ import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; -import io.dapr.utils.Version; +import io.dapr.utils.NetworkUtils; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import reactor.core.publisher.Mono; import java.io.Closeable; @@ -77,20 +76,30 @@ public class ActorRuntime implements Closeable { /** * The default constructor. This should not be called directly. * - * @throws IllegalStateException If cannot instantiate Runtime. + * @throws IllegalStateException If you cannot instantiate Runtime. */ private ActorRuntime() throws IllegalStateException { - this(buildManagedChannel()); + this(new Properties()); + } + + /** + * Constructor once channel is available. This should not be called directly. + * + * @param properties Properties to use. + * @throws IllegalStateException If you cannot instantiate Runtime. + */ + private ActorRuntime(Properties properties) throws IllegalStateException { + this(NetworkUtils.buildGrpcManagedChannel(properties)); } /** * Constructor once channel is available. This should not be called directly. * * @param channel GRPC managed channel to be closed (or null). - * @throws IllegalStateException If cannot instantiate Runtime. + * @throws IllegalStateException If you cannot instantiate Runtime. */ private ActorRuntime(ManagedChannel channel) throws IllegalStateException { - this(channel, buildDaprClient(channel)); + this(channel, new DaprClientImpl(channel)); } /** @@ -112,7 +121,7 @@ private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws Illeg } /** - * Returns an ActorRuntime object. + * Creates or returns an existing ActorRuntime object. * * @return An ActorRuntime object. */ @@ -128,6 +137,24 @@ public static ActorRuntime getInstance() { return instance; } + /** + * Creates or returns an existing ActorRuntime object. + * + * @param properties Properties to use. + * @return An ActorRuntime object. + */ + public static ActorRuntime getInstance(Properties properties) { + if (instance == null) { + synchronized (ActorRuntime.class) { + if (instance == null) { + instance = new ActorRuntime(properties); + } + } + } + + return instance; + } + /** * Gets the Actor configuration for this runtime. * @@ -149,7 +176,6 @@ public byte[] serializeConfig() throws IOException { /** * Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}. - * * {@link DefaultObjectSerializer} is not recommended for production scenarios. * * @param clazz The type of actor. @@ -314,27 +340,10 @@ private ActorManager getActorManager(String actorTypeName) { * @return an instance of the setup Client * @throws java.lang.IllegalStateException if any required field is missing */ - private static DaprClient buildDaprClient(ManagedChannel channel) { + private DaprClient buildDaprClient(ManagedChannel channel) { return new DaprClientImpl(channel); } - /** - * Creates a GRPC managed channel (or null, if not applicable). - * - * @return GRPC managed channel or null. - */ - private static ManagedChannel buildManagedChannel() { - int port = Properties.GRPC_PORT.get(); - if (port <= 0) { - throw new IllegalStateException("Invalid port."); - } - - return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port) - .usePlaintext() - .userAgent(Version.getSdkVersion()) - .build(); - } - /** * {@inheritDoc} */ diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java index 49167116fe..2ebccb4cc1 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java @@ -16,7 +16,6 @@ import io.dapr.actors.ActorId; import io.dapr.actors.ActorType; import io.dapr.serializer.DefaultObjectSerializer; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +26,11 @@ import java.util.Arrays; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; public class ActorRuntimeTest { @@ -97,8 +100,6 @@ public int count() { private static Constructor constructor; - private DaprClient mockDaprClient; - private ActorRuntime runtime; @BeforeAll @@ -113,8 +114,7 @@ public static void beforeAll() throws Exception { @BeforeEach public void setup() throws Exception { - this.mockDaprClient = mock(DaprClient.class); - this.runtime = constructor.newInstance(null, this.mockDaprClient); + this.runtime = constructor.newInstance(null, mock(DaprClient.class)); } @Test @@ -143,21 +143,21 @@ public void registerActorNullStateSerializer() { @Test public void setActorIdleTimeout() throws Exception { this.runtime.getConfig().setActorIdleTimeout(Duration.ofSeconds(123)); - Assertions.assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}", + assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}", new String(this.runtime.serializeConfig())); } @Test public void setActorScanInterval() throws Exception { this.runtime.getConfig().setActorScanInterval(Duration.ofSeconds(123)); - Assertions.assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}", + assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}", new String(this.runtime.serializeConfig())); } @Test public void setDrainBalancedActors() throws Exception { this.runtime.getConfig().setDrainBalancedActors(true); - Assertions.assertEquals("{\"entities\":[],\"drainBalancedActors\":true}", + assertEquals("{\"entities\":[],\"drainBalancedActors\":true}", new String(this.runtime.serializeConfig())); } @@ -183,7 +183,7 @@ public void addActorTypeConfig() throws Exception { this.runtime.getConfig().addActorTypeConfig(actorTypeConfig2); this.runtime.getConfig().addRegisteredActorType("actor2"); - Assertions.assertEquals( + assertEquals( "{\"entities\":[\"actor1\",\"actor2\"],\"entitiesConfig\":[{\"entities\":[\"actor1\"],\"actorIdleTimeout\":\"0h2m3s0ms\",\"actorScanInterval\":\"0h2m3s0ms\",\"drainOngoingCallTimeout\":\"0h2m3s0ms\",\"drainBalancedActors\":true,\"remindersStoragePartitions\":1},{\"entities\":[\"actor2\"],\"actorIdleTimeout\":\"0h2m3s0ms\",\"actorScanInterval\":\"0h2m3s0ms\",\"drainOngoingCallTimeout\":\"0h2m3s0ms\",\"drainBalancedActors\":false,\"remindersStoragePartitions\":2}]}", new String(this.runtime.serializeConfig()) ); @@ -194,28 +194,28 @@ public void addNullActorTypeConfig() throws Exception { try { this.runtime.getConfig().addActorTypeConfig(null); } catch (Exception ex) { - Assertions.assertTrue(ex instanceof IllegalArgumentException); - Assertions.assertTrue(ex.getMessage().contains("Add actor type config failed.")); + assertInstanceOf(IllegalArgumentException.class, ex); + assertTrue(ex.getMessage().contains("Add actor type config failed.")); } try { this.runtime.getConfig().addRegisteredActorType(null); } catch (Exception ex) { - Assertions.assertTrue(ex instanceof IllegalArgumentException); - Assertions.assertTrue(ex.getMessage().contains("Registered actor must have a type name.")); + assertInstanceOf(IllegalArgumentException.class, ex); + assertTrue(ex.getMessage().contains("Registered actor must have a type name.")); } } @Test public void setDrainOngoingCallTimeout() throws Exception { this.runtime.getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(123)); - Assertions.assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}", + assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}", new String(this.runtime.serializeConfig())); } @Test public void setRemindersStoragePartitions() throws Exception { this.runtime.getConfig().setRemindersStoragePartitions(12); - Assertions.assertEquals("{\"entities\":[],\"remindersStoragePartitions\":12}", + assertEquals("{\"entities\":[],\"remindersStoragePartitions\":12}", new String(this.runtime.serializeConfig())); } @@ -226,7 +226,7 @@ public void invokeActor() throws Exception { byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); String message = ACTOR_STATE_SERIALIZER.deserialize(response, String.class); - Assertions.assertEquals("Nothing to say.", message); + assertEquals("Nothing to say.", message); } @Test @@ -256,8 +256,8 @@ public void lazyDeactivate() throws Exception { deactivateCall.block(); this.runtime.invoke(ACTOR_NAME, actorId, "say", null) - .doOnError(e -> Assertions.assertTrue(e.getMessage().contains("Could not find actor"))) - .doOnSuccess(s -> Assertions.fail()).onErrorReturn("".getBytes()).block(); + .doOnError(e -> assertTrue(e.getMessage().contains("Could not find actor"))) + .doOnSuccess(s -> fail()).onErrorReturn("".getBytes()).block(); } @Test @@ -269,13 +269,13 @@ public void lazyInvoke() throws Exception { byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); int count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); - Assertions.assertEquals(0, count); + assertEquals(0, count); invokeCall.block(); response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); - Assertions.assertEquals(1, count); + assertEquals(1, count); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java index dd021d98b9..d4f8bb2013 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java @@ -20,9 +20,7 @@ import io.dapr.config.Properties; import io.dapr.it.BaseIT; import io.dapr.it.actors.app.MyActorService; -import io.dapr.utils.Version; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; +import io.dapr.utils.NetworkUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -36,8 +34,8 @@ import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs; import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; public class ActorTurnBasedConcurrencyIT extends BaseIT { @@ -56,7 +54,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { @AfterEach public void cleanUpTestCase() { // Delete the reminder in case the test failed, otherwise it may interfere with future tests since it is persisted. - var channel = buildManagedChannel(); + var channel = NetworkUtils.buildGrpcManagedChannel(new Properties()); try { System.out.println("Invoking during cleanup"); DaprClientHttpUtils.unregisterActorReminder(channel, ACTOR_TYPE, ACTOR_ID, REMINDER_NAME); @@ -120,7 +118,7 @@ public void invokeOneActorMethodReminderAndTimer() throws Exception { String msg = "message" + i; String reversedString = new StringBuilder(msg).reverse().toString(); String output = proxy.invokeMethod("say", "message" + i, String.class).block(); - assertTrue(reversedString.equals(output)); + assertEquals(reversedString, output); expectedSayMethodInvocations.incrementAndGet(); }); @@ -166,7 +164,7 @@ public void invokeOneActorMethodReminderAndTimer() throws Exception { * @param logs logs with info about method entries and exits returned from the app */ void validateTurnBasedConcurrency(List logs) { - if (logs.size() == 0) { + if (logs.isEmpty()) { logger.warn("No logs"); return; } @@ -176,7 +174,7 @@ void validateTurnBasedConcurrency(List logs) { if (s.getIsEnter()) { currentMethodName = s.getMethodName(); } else { - assertTrue(currentMethodName.equals(s.getMethodName())); + assertEquals(currentMethodName, s.getMethodName()); } } @@ -228,16 +226,4 @@ void validateEventNotObserved(List logs, String startingPoin } } } - - private static ManagedChannel buildManagedChannel() { - int port = Properties.GRPC_PORT.get(); - if (port <= 0) { - throw new IllegalStateException("Invalid port."); - } - - return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port) - .usePlaintext() - .userAgent(Version.getSdkVersion()) - .build(); - } }