Skip to content

Commit 708405b

Browse files
uglide and sazzad16 authored
Add A-A failover scenario test (#3935)
* Add A-A failover scenario test
* Update src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java
  Co-authored-by: M Sazzadul Hoque <[email protected]>
* Update src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java
  Co-authored-by: M Sazzadul Hoque <[email protected]>
* Add missing import
---------
Co-authored-by: M Sazzadul Hoque <[email protected]>
1 parent 4c09472 commit 708405b

File tree

5 files changed

+251
-6
lines changed

5 files changed

+251
-6
lines changed

src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import redis.clients.jedis.*;
2525
import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig;
2626
import redis.clients.jedis.annots.Experimental;
27+
import redis.clients.jedis.annots.VisibleForTesting;
2728
import redis.clients.jedis.exceptions.JedisConnectionException;
2829
import redis.clients.jedis.exceptions.JedisValidationException;
2930
import redis.clients.jedis.util.Pool;
@@ -299,6 +300,11 @@ public Cluster getCluster() {
299300
return multiClusterMap.get(activeMultiClusterIndex);
300301
}
301302

303+
@VisibleForTesting
304+
public Cluster getCluster(int multiClusterIndex) {
305+
return multiClusterMap.get(multiClusterIndex);
306+
}
307+
302308
public CircuitBreaker getClusterCircuitBreaker() {
303309
return multiClusterMap.get(activeMultiClusterIndex).getCircuitBreaker();
304310
}

src/test/java/redis/clients/jedis/EndpointConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public HostAndPort getHostAndPort() {
3131
return JedisURIHelper.getHostAndPort(endpoints.get(0));
3232
}
3333

34+
public HostAndPort getHostAndPort(int index) {
35+
return JedisURIHelper.getHostAndPort(endpoints.get(index));
36+
}
37+
3438
public String getPassword() {
3539
return password;
3640
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package redis.clients.jedis.scenario;
2+
3+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
4+
import org.junit.BeforeClass;
5+
import org.junit.Test;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import redis.clients.jedis.*;
9+
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
10+
import redis.clients.jedis.exceptions.JedisConnectionException;
11+
12+
import java.io.IOException;
13+
import java.time.Duration;
14+
import java.time.Instant;
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
import java.util.concurrent.atomic.AtomicLong;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.function.Consumer;
20+
21+
import static org.junit.Assert.*;
22+
23+
public class ActiveActiveFailoverTest {
24+
private static final Logger log = LoggerFactory.getLogger(ActiveActiveFailoverTest.class);
25+
26+
private static EndpointConfig endpoint;
27+
28+
private final FaultInjectionClient faultClient = new FaultInjectionClient();
29+
30+
@BeforeClass
31+
public static void beforeClass() {
32+
try {
33+
ActiveActiveFailoverTest.endpoint = HostAndPorts.getRedisEndpoint("re-active-active");
34+
} catch (IllegalArgumentException e) {
35+
log.warn("Skipping test because no Redis endpoint is configured");
36+
org.junit.Assume.assumeTrue(false);
37+
}
38+
}
39+
40+
@Test
41+
public void testFailover() {
42+
43+
MultiClusterClientConfig.ClusterConfig[] clusterConfig = new MultiClusterClientConfig.ClusterConfig[2];
44+
45+
JedisClientConfig config = endpoint.getClientConfigBuilder()
46+
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
47+
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();
48+
49+
clusterConfig[0] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(0),
50+
config, RecommendedSettings.poolConfig);
51+
clusterConfig[1] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(1),
52+
config, RecommendedSettings.poolConfig);
53+
54+
MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(clusterConfig);
55+
56+
builder.circuitBreakerSlidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED);
57+
builder.circuitBreakerSlidingWindowSize(1); // SLIDING WINDOW SIZE IN SECONDS
58+
builder.circuitBreakerSlidingWindowMinCalls(1);
59+
builder.circuitBreakerFailureRateThreshold(10.0f); // percentage of failures to trigger circuit breaker
60+
61+
builder.retryWaitDuration(10);
62+
builder.retryMaxAttempts(1);
63+
builder.retryWaitDurationExponentialBackoffMultiplier(1);
64+
65+
class FailoverReporter implements Consumer<String> {
66+
67+
String currentClusterName = "not set";
68+
69+
boolean failoverHappened = false;
70+
71+
Instant failoverAt = null;
72+
73+
public String getCurrentClusterName() {
74+
return currentClusterName;
75+
}
76+
77+
@Override
78+
public void accept(String clusterName) {
79+
this.currentClusterName = clusterName;
80+
log.info(
81+
"\n\n====FailoverEvent=== \nJedis failover to cluster: {}\n====FailoverEvent===\n\n",
82+
clusterName);
83+
84+
failoverHappened = true;
85+
failoverAt = Instant.now();
86+
}
87+
}
88+
89+
MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(
90+
builder.build());
91+
FailoverReporter reporter = new FailoverReporter();
92+
provider.setClusterFailoverPostProcessor(reporter);
93+
provider.setActiveMultiClusterIndex(1);
94+
95+
UnifiedJedis client = new UnifiedJedis(provider);
96+
97+
AtomicLong retryingThreadsCounter = new AtomicLong(0);
98+
AtomicLong failedCommandsAfterFailover = new AtomicLong(0);
99+
AtomicReference<Instant> lastFailedCommandAt = new AtomicReference<>();
100+
101+
// Start thread that imitates an application that uses the client
102+
MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(client, (UnifiedJedis c) -> {
103+
104+
long threadId = Thread.currentThread().getId();
105+
106+
int attempt = 0;
107+
int maxTries = 500;
108+
int retryingDelay = 5;
109+
while (true) {
110+
try {
111+
Map<String, String> executionInfo = new HashMap<String, String>() {{
112+
put("threadId", String.valueOf(threadId));
113+
put("cluster", reporter.getCurrentClusterName());
114+
}};
115+
client.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo);
116+
117+
if (attempt > 0) {
118+
log.info("Thread {} recovered after {} ms. Threads still not recovered: {}", threadId,
119+
attempt * retryingDelay, retryingThreadsCounter.decrementAndGet());
120+
}
121+
122+
break;
123+
} catch (JedisConnectionException e) {
124+
125+
if (reporter.failoverHappened) {
126+
long failedCommands = failedCommandsAfterFailover.incrementAndGet();
127+
lastFailedCommandAt.set(Instant.now());
128+
log.warn(
129+
"Thread {} failed to execute command after failover. Failed commands after failover: {}",
130+
threadId, failedCommands);
131+
}
132+
133+
if (attempt == 0) {
134+
long failedThreads = retryingThreadsCounter.incrementAndGet();
135+
log.warn("Thread {} failed to execute command. Failed threads: {}", threadId,
136+
failedThreads);
137+
}
138+
try {
139+
Thread.sleep(retryingDelay);
140+
} catch (InterruptedException ie) {
141+
throw new RuntimeException(ie);
142+
}
143+
if (++attempt == maxTries) throw e;
144+
}
145+
}
146+
return true;
147+
}, 18);
148+
fakeApp.setKeepExecutingForSeconds(30);
149+
Thread t = new Thread(fakeApp);
150+
t.start();
151+
152+
HashMap<String, Object> params = new HashMap<>();
153+
params.put("bdb_id", endpoint.getBdbId());
154+
params.put("rlutil_command", "pause_bdb");
155+
156+
FaultInjectionClient.TriggerActionResponse actionResponse = null;
157+
158+
try {
159+
log.info("Triggering bdb_pause");
160+
actionResponse = faultClient.triggerAction("execute_rlutil_command", params);
161+
} catch (IOException e) {
162+
fail("Fault Injection Server error:" + e.getMessage());
163+
}
164+
165+
log.info("Action id: {}", actionResponse.getActionId());
166+
fakeApp.setAction(actionResponse);
167+
168+
try {
169+
t.join();
170+
} catch (InterruptedException e) {
171+
throw new RuntimeException(e);
172+
}
173+
174+
ConnectionPool pool = provider.getCluster(1).getConnectionPool();
175+
176+
log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(),
177+
pool.getNumIdle());
178+
log.info("Full failover time: {} s",
179+
Duration.between(reporter.failoverAt, lastFailedCommandAt.get()).getSeconds());
180+
181+
assertEquals(0, pool.getNumActive());
182+
assertTrue(fakeApp.capturedExceptions().isEmpty());
183+
184+
client.close();
185+
}
186+
187+
}

src/test/java/redis/clients/jedis/scenario/FakeApp.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@
1212

1313
public class FakeApp implements Runnable {
1414

15-
private static final Logger log = LoggerFactory.getLogger(FakeApp.class);
15+
protected static final Logger log = LoggerFactory.getLogger(FakeApp.class);
1616

1717
public void setKeepExecutingForSeconds(int keepExecutingForSeconds) {
1818
this.keepExecutingForSeconds = keepExecutingForSeconds;
1919
}
2020

21-
private int keepExecutingForSeconds = 60;
21+
protected int keepExecutingForSeconds = 60;
2222

23-
private FaultInjectionClient.TriggerActionResponse actionResponse = null;
24-
private final UnifiedJedis client;
25-
private final ExecutedAction action;
26-
private List<JedisException> exceptions = new ArrayList<>();
23+
protected FaultInjectionClient.TriggerActionResponse actionResponse = null;
24+
protected final UnifiedJedis client;
25+
protected final ExecutedAction action;
26+
protected List<JedisException> exceptions = new ArrayList<>();
2727

2828
@FunctionalInterface
2929
public interface ExecutedAction {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package redis.clients.jedis.scenario;
2+
3+
import redis.clients.jedis.UnifiedJedis;
4+
import redis.clients.jedis.exceptions.JedisConnectionException;
5+
6+
import java.time.Duration;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.TimeUnit;
10+
11+
public class MultiThreadedFakeApp extends FakeApp {
12+
13+
private final ExecutorService executorService;
14+
15+
public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads) {
16+
super(client, action);
17+
this.executorService = Executors.newFixedThreadPool(numThreads);
18+
}
19+
20+
@Override
21+
public void run() {
22+
log.info("Starting FakeApp");
23+
24+
int checkEachSeconds = 5;
25+
int timeoutSeconds = 120;
26+
27+
while (actionResponse == null || !actionResponse.isCompleted(
28+
Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
29+
Duration.ofSeconds(timeoutSeconds))) {
30+
try {
31+
executorService.submit(() -> action.run(client));
32+
} catch (JedisConnectionException e) {
33+
log.error("Error executing action", e);
34+
exceptions.add(e);
35+
}
36+
}
37+
38+
executorService.shutdown();
39+
40+
try {
41+
if (!executorService.awaitTermination(keepExecutingForSeconds, TimeUnit.SECONDS)) {
42+
executorService.shutdownNow();
43+
}
44+
} catch (InterruptedException e) {
45+
log.error("Error waiting for executor service to terminate", e);
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)