Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds test to produce deadlock #4471

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 89 additions & 23 deletions exist-core/src/test/java/org/exist/storage/BrokerPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.junit.Test;
import org.xmldb.api.base.XMLDBException;

import java.util.List;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

/**
* @author <a href="mailto:[email protected]">Adam Retter</a>
Expand All @@ -50,7 +52,7 @@ public void noPrivilegeEscalationThroughBrokerRelease() throws EXistException {
//take a broker with the guest user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject guestUser = pool.getSecurityManager().getGuestSubject();
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) {

assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -70,7 +72,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
//take a broker with the SYSTEM user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject sysUser = pool.getSecurityManager().getSystemSubject();
try(final DBBroker broker1 = pool.get(Optional.of(sysUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(sysUser))) {

assertEquals("Expected `SYSTEM` user, but was: " + broker1.getCurrentSubject().getName(), sysUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -88,7 +90,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
public void guestDefaultPriviledge() throws EXistException {
//take a broker with default perms
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
try(final DBBroker broker1 = pool.getBroker()) {
try (final DBBroker broker1 = pool.getBroker()) {

final Subject guestUser = pool.getSecurityManager().getGuestSubject();

Expand All @@ -109,7 +111,7 @@ public void noPrivilegeEscalationThroughBrokerRelease_xmldb() throws EXistExcept
//take a broker with the guest user
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final Subject guestUser = pool.getSecurityManager().getGuestSubject();
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) {
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) {

assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId());

Expand All @@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
// test requires at least 2 leasedBrokers to prove the issue
assertTrue(maxBrokers > 1);

final ExecutorService executor = Executors.newFixedThreadPool(maxBrokers + 1);
final List<Future<Void>> tasks = new ArrayList<>(maxBrokers);
final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch(1);
final CountDownLatch releaseLatch = new CountDownLatch(1);
try {

// lease all brokers
final Thread brokerUsers[] = new Thread[maxBrokers];
final CountDownLatch acquiredLatch = new CountDownLatch(maxBrokers);

final Thread firstBrokerUser = new Thread(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch), "first-brokerUser");
brokerUsers[0] = firstBrokerUser;
brokerUsers[0].start();
for (int i = 1; i < maxBrokers; i++) {
brokerUsers[i] = new Thread(new BrokerUser(pool, acquiredLatch, releaseLatch));
brokerUsers[i].start();
Future<Void> firstBrokerUser = executor.submit(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch));
for (int count = 1; count < maxBrokers; count++) {
tasks.add(executor.submit(new BrokerUser(pool, acquiredLatch, releaseLatch)));
}

// wait for all brokers to be acquired
Expand All @@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce

// create a new thread and attempt to get an additional broker
final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch(1);
final Thread additionalBrokerUser = new Thread(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch), "additional-brokerUser");
assertEquals(1, additionalBrokerAcquiredLatch.getCount());
additionalBrokerUser.start();
executor.submit(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch));

// we should not be able to acquire an additional broker, as we have already leased max
Thread.sleep(500); // just to ensure the other thread has done something
Expand All @@ -172,23 +170,92 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
assertEquals(1, firstBrokerReleaseLatch.getCount());
firstBrokerReleaseLatch.countDown();
assertEquals(0, firstBrokerReleaseLatch.getCount());
firstBrokerUser.join(); // wait for the first broker lease thread to complete
firstBrokerUser.get(); // wait for the first broker lease thread to complete

// check that the additional broker lease has now been acquired
Thread.sleep(500); // just to ensure the other thread has done something
assertEquals(0, additionalBrokerAcquiredLatch.getCount());

executor.shutdown();
} finally {
// release all brokers from brokerUsers
if(firstBrokerReleaseLatch.getCount() == 1) {
if (firstBrokerReleaseLatch.getCount() == 1) {
firstBrokerReleaseLatch.countDown();
}
releaseLatch.countDown();
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
for (Future<Void> task : tasks) {
task.get();
}
for (Runnable task : executor.shutdownNow()) {
assertNotNull(task);
}
}
}

@Test
public void concurrentShutdownAndUse() throws InterruptedException, ExecutionException {
final BrokerPool pool = existEmbeddedServer.getBrokerPool();
final int maxBrokers = pool.getMax();
final int taskAmount = maxBrokers * 50;

// test requires at least 5 leasedBrokers to prove the issue
assertTrue(maxBrokers > 4);

final CountDownLatch readyLatch = new CountDownLatch(1);
final CountDownLatch executeLatch = new CountDownLatch(1);
final ExecutorService executor = Executors.newFixedThreadPool(taskAmount);
final List<Future<Void>> tasks = new ArrayList<>(taskAmount);
final Consumer<BrokerPool> brokerAquire = brokerPool -> {
try (final DBBroker broker = brokerPool.getBroker()) {
TimeUnit.SECONDS.sleep(1);
} catch (EXistException e) {
throw new IllegalStateException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
};
for (int count = 0; count < taskAmount; count++) {
tasks.add(executor.submit(new PoolAction(pool, readyLatch, executeLatch, (count % 2 == 0) ? BrokerPool::shutdown : brokerAquire)));
}
executor.shutdown();

TimeUnit.SECONDS.sleep(2);
readyLatch.countDown();

assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
for (Future<Void> task : tasks) {
task.get();
}
for (Runnable task : executor.shutdownNow()) {
assertNotNull(task);
}
}

static class PoolAction implements Callable<Void> {
private final BrokerPool brokerPool;
private final CountDownLatch readyLatch;
private final CountDownLatch excuteLatch;
private final Consumer<BrokerPool> action;

PoolAction(final BrokerPool brokerPool, CountDownLatch readyLatch, CountDownLatch excuteLatch, Consumer<BrokerPool> action) {
this.brokerPool = brokerPool;
this.readyLatch = readyLatch;
this.excuteLatch = excuteLatch;
this.action = action;
}

@Override
public Void call() throws InterruptedException {
readyLatch.await();
action.accept(brokerPool);
return null;
}
}

public static class BrokerUser implements Runnable {

public static class BrokerUser implements Callable<Void> {
final BrokerPool brokerPool;
private final CountDownLatch acquiredLatch;
private final CountDownLatch releaseLatch;
Expand All @@ -200,8 +267,8 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc
}

@Override
public void run() {
try(final DBBroker broker = brokerPool.getBroker()) {
public Void call() throws EXistException, InterruptedException {
try (final DBBroker broker = brokerPool.getBroker()) {

// signal that we have acquired the broker
acquiredLatch.countDown();
Expand All @@ -210,9 +277,8 @@ public void run() {
// wait for signal to release the broker
releaseLatch.await();

} catch(final EXistException | InterruptedException e) {
fail(e.getMessage());
}
return null;
}
}

Expand Down