Skip to content

Commit

Permalink
Adds test to produce deadlock
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Reinhart <[email protected]>
  • Loading branch information
reinhapa committed Feb 5, 2023
1 parent e0282b1 commit 9f04558
Showing 1 changed file with 89 additions and 23 deletions.
112 changes: 89 additions & 23 deletions exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.junit.Test;
import org.xmldb.api.base.XMLDBException;

import java.util.List;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

/**
* @author <a href="mailto:[email protected]">Adam Retter</a>
Expand All @@ -50,7 +52,7 @@ public void noPrivilegeEscalationThroughBrokerRelease() throws EXistException {
//take a broker with the guest user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject guestUser = pool.getSecurityManager().getGuestSubject();
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) {

assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -70,7 +72,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
//take a broker with the SYSTEM user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject sysUser = pool.getSecurityManager().getSystemSubject();
try(final DBBroker broker1 = pool.get(Optional.of(sysUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(sysUser))) {

assertEquals("Expected `SYSTEM` user, but was: " + broker1.getCurrentSubject().getName(), sysUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -88,7 +90,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
public void guestDefaultPriviledge() throws EXistException {
//take a broker with default perms
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
try(final DBBroker broker1 = pool.getBroker()) {
try (final DBBroker broker1 = pool.getBroker()) {

final Subject guestUser = pool.getSecurityManager().getGuestSubject();

Expand All @@ -109,7 +111,7 @@ public void noPrivilegeEscalationThroughBrokerRelease_xmldb() throws EXistExcept
//take a broker with the guest user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject guestUser = pool.getSecurityManager().getGuestSubject();
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) {

assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
// test requires at least 2 leasedBrokers to prove the issue
assertTrue(maxBrokers > 1);

final ExecutorService executor = Executors.newFixedThreadPool(maxBrokers + 1);
final List<Future<Void>> tasks = new ArrayList<>(maxBrokers);
final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch(1);
final CountDownLatch releaseLatch = new CountDownLatch(1);
try {

// lease all brokers
final Thread brokerUsers[] = new Thread[maxBrokers];
final CountDownLatch acquiredLatch = new CountDownLatch(maxBrokers);

final Thread firstBrokerUser = new Thread(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch), "first-brokerUser");
brokerUsers[0] = firstBrokerUser;
brokerUsers[0].start();
for (int i = 1; i < maxBrokers; i++) {
brokerUsers[i] = new Thread(new BrokerUser(pool, acquiredLatch, releaseLatch));
brokerUsers[i].start();
Future<Void> firstBrokerUser = executor.submit(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch));
for (int count = 1; count < maxBrokers; count++) {
tasks.add(executor.submit(new BrokerUser(pool, acquiredLatch, releaseLatch)));
}

// wait for all brokers to be acquired
Expand All @@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce

// create a new thread and attempt to get an additional broker
final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch(1);
final Thread additionalBrokerUser = new Thread(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch), "additional-brokerUser");
assertEquals(1, additionalBrokerAcquiredLatch.getCount());
additionalBrokerUser.start();
executor.submit(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch));

// we should not be able to acquire an additional broker, as we have already leased max
Thread.sleep(500); // just to ensure the other thread has done something
Expand All @@ -172,23 +170,92 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
assertEquals(1, firstBrokerReleaseLatch.getCount());
firstBrokerReleaseLatch.countDown();
assertEquals(0, firstBrokerReleaseLatch.getCount());
firstBrokerUser.join(); // wait for the first broker lease thread to complete
firstBrokerUser.get(); // wait for the first broker lease thread to complete

// check that the additional broker lease has now been acquired
Thread.sleep(500); // just to ensure the other thread has done something
assertEquals(0, additionalBrokerAcquiredLatch.getCount());

executor.shutdown();
} finally {
// release all brokers from brokerUsers
if(firstBrokerReleaseLatch.getCount() == 1) {
if (firstBrokerReleaseLatch.getCount() == 1) {
firstBrokerReleaseLatch.countDown();
}
releaseLatch.countDown();
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
for (Future<Void> task : tasks) {
task.get();
}
for (Runnable task : executor.shutdownNow()) {
assertNotNull(task);
}
}
}

@Test
public void concurrentShutdownAndUse() throws InterruptedException, ExecutionException {
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final int maxBrokers = pool.getMax();
final int taskAmount = maxBrokers * 50;

// test requires at least 5 leasedBrokers to prove the issue
assertTrue(maxBrokers > 4);

final CountDownLatch readyLatch = new CountDownLatch(1);
final CountDownLatch executeLatch = new CountDownLatch(1);
final ExecutorService executor = Executors.newFixedThreadPool(taskAmount);
final List<Future<Void>> tasks = new ArrayList<>(taskAmount);
final Consumer<BrokerPool> brokerAquire = brokerPool -> {
try (final DBBroker broker = brokerPool.getBroker()) {
TimeUnit.SECONDS.sleep(1);
} catch (EXistException e) {
throw new IllegalStateException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
};
for (int count = 0; count < taskAmount; count++) {
tasks.add(executor.submit(new PoolAction(pool, readyLatch, executeLatch, (count % 2 == 0) ? BrokerPool::shutdown : brokerAquire)));
}
executor.shutdown();

TimeUnit.SECONDS.sleep(2);
readyLatch.countDown();

assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
for (Future<Void> task : tasks) {
task.get();
}
for (Runnable task : executor.shutdownNow()) {
assertNotNull(task);
}
}

static class PoolAction implements Callable<Void> {
private final BrokerPool brokerPool;
private final CountDownLatch readyLatch;
private final CountDownLatch excuteLatch;
private final Consumer<BrokerPool> action;

PoolAction(final BrokerPool brokerPool, CountDownLatch readyLatch, CountDownLatch excuteLatch, Consumer<BrokerPool> action) {
this.brokerPool = brokerPool;
this.readyLatch = readyLatch;
this.excuteLatch = excuteLatch;
this.action = action;
}

@Override
public Void call() throws InterruptedException {
readyLatch.await();
action.accept(brokerPool);
return null;
}
}

public static class BrokerUser implements Runnable {

public static class BrokerUser implements Callable<Void> {
final BrokerPool brokerPool;
private final CountDownLatch acquiredLatch;
private final CountDownLatch releaseLatch;
Expand All @@ -200,8 +267,8 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc
}

@Override
public void run() {
try(final DBBroker broker = brokerPool.getBroker()) {
public Void call() throws EXistException, InterruptedException {
try (final DBBroker broker = brokerPool.getBroker()) {

// signal that we have acquired the broker
acquiredLatch.countDown();
Expand All @@ -210,9 +277,8 @@ public void run() {
// wait for signal to release the broker
releaseLatch.await();

} catch(final EXistException | InterruptedException e) {
fail(e.getMessage());
}
return null;
}
}

Expand Down

0 comments on commit 9f04558

Please sign in to comment.