-
-
Notifications
You must be signed in to change notification settings - Fork 182
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Patrick Reinhart <[email protected]>
- Loading branch information
Showing
1 changed file
with
89 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,12 +30,14 @@ | |
import org.junit.Test; | ||
import org.xmldb.api.base.XMLDBException; | ||
|
||
import java.util.List; | ||
import java.util.ArrayList; | ||
import java.util.Optional; | ||
import java.util.concurrent.*; | ||
import java.util.function.Consumer; | ||
|
||
import static junit.framework.TestCase.assertTrue; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.fail; | ||
import static org.junit.Assert.*; | ||
|
||
/** | ||
* @author <a href="mailto:[email protected]">Adam Retter</a> | ||
|
@@ -50,7 +52,7 @@ public void noPrivilegeEscalationThroughBrokerRelease() throws EXistException { | |
//take a broker with the guest user | ||
final BrokerPool pool = existEmbeddedServer.getBrokerPool(); | ||
final Subject guestUser = pool.getSecurityManager().getGuestSubject(); | ||
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) { | ||
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) { | ||
|
||
assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId()); | ||
|
||
|
@@ -70,7 +72,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException { | |
//take a broker with the SYSTEM user | ||
final BrokerPool pool = existEmbeddedServer.getBrokerPool(); | ||
final Subject sysUser = pool.getSecurityManager().getSystemSubject(); | ||
try(final DBBroker broker1 = pool.get(Optional.of(sysUser))) { | ||
try (final DBBroker broker1 = pool.get(Optional.of(sysUser))) { | ||
|
||
assertEquals("Expected `SYSTEM` user, but was: " + broker1.getCurrentSubject().getName(), sysUser.getId(), broker1.getCurrentSubject().getId()); | ||
|
||
|
@@ -88,7 +90,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException { | |
public void guestDefaultPriviledge() throws EXistException { | ||
//take a broker with default perms | ||
final BrokerPool pool = existEmbeddedServer.getBrokerPool(); | ||
try(final DBBroker broker1 = pool.getBroker()) { | ||
try (final DBBroker broker1 = pool.getBroker()) { | ||
|
||
final Subject guestUser = pool.getSecurityManager().getGuestSubject(); | ||
|
||
|
@@ -109,7 +111,7 @@ public void noPrivilegeEscalationThroughBrokerRelease_xmldb() throws EXistExcept | |
//take a broker with the guest user | ||
final BrokerPool pool = existEmbeddedServer.getBrokerPool(); | ||
final Subject guestUser = pool.getSecurityManager().getGuestSubject(); | ||
try(final DBBroker broker1 = pool.get(Optional.of(guestUser))) { | ||
try (final DBBroker broker1 = pool.get(Optional.of(guestUser))) { | ||
|
||
assertEquals("Expected `guest` user, but was: " + broker1.getCurrentSubject().getName(), guestUser.getId(), broker1.getCurrentSubject().getId()); | ||
|
||
|
@@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce | |
// test requires at least 2 leasedBrokers to prove the issue | ||
assertTrue(maxBrokers > 1); | ||
|
||
final ExecutorService executor = Executors.newFixedThreadPool(maxBrokers + 1); | ||
final List<Future<Void>> tasks = new ArrayList<>(maxBrokers); | ||
final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch(1); | ||
final CountDownLatch releaseLatch = new CountDownLatch(1); | ||
try { | ||
|
||
// lease all brokers | ||
final Thread brokerUsers[] = new Thread[maxBrokers]; | ||
final CountDownLatch acquiredLatch = new CountDownLatch(maxBrokers); | ||
|
||
final Thread firstBrokerUser = new Thread(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch), "first-brokerUser"); | ||
brokerUsers[0] = firstBrokerUser; | ||
brokerUsers[0].start(); | ||
for (int i = 1; i < maxBrokers; i++) { | ||
brokerUsers[i] = new Thread(new BrokerUser(pool, acquiredLatch, releaseLatch)); | ||
brokerUsers[i].start(); | ||
Future<Void> firstBrokerUser = executor.submit(new BrokerUser(pool, acquiredLatch, firstBrokerReleaseLatch)); | ||
for (int count = 1; count < maxBrokers; count++) { | ||
tasks.add(executor.submit(new BrokerUser(pool, acquiredLatch, releaseLatch))); | ||
} | ||
|
||
// wait for all brokers to be acquired | ||
|
@@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce | |
|
||
// create a new thread and attempt to get an additional broker | ||
final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch(1); | ||
final Thread additionalBrokerUser = new Thread(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch), "additional-brokerUser"); | ||
assertEquals(1, additionalBrokerAcquiredLatch.getCount()); | ||
additionalBrokerUser.start(); | ||
executor.submit(new BrokerUser(pool, additionalBrokerAcquiredLatch, releaseLatch)); | ||
|
||
// we should not be able to acquire an additional broker, as we have already leased max | ||
Thread.sleep(500); // just to ensure the other thread has done something | ||
|
@@ -172,23 +170,92 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce | |
assertEquals(1, firstBrokerReleaseLatch.getCount()); | ||
firstBrokerReleaseLatch.countDown(); | ||
assertEquals(0, firstBrokerReleaseLatch.getCount()); | ||
firstBrokerUser.join(); // wait for the first broker lease thread to complete | ||
firstBrokerUser.get(); // wait for the first broker lease thread to complete | ||
|
||
// check that the additional broker lease has now been acquired | ||
Thread.sleep(500); // just to ensure the other thread has done something | ||
assertEquals(0, additionalBrokerAcquiredLatch.getCount()); | ||
|
||
executor.shutdown(); | ||
} finally { | ||
// release all brokers from brokerUsers | ||
if(firstBrokerReleaseLatch.getCount() == 1) { | ||
if (firstBrokerReleaseLatch.getCount() == 1) { | ||
firstBrokerReleaseLatch.countDown(); | ||
} | ||
releaseLatch.countDown(); | ||
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); | ||
for (Future<Void> task : tasks) { | ||
task.get(); | ||
} | ||
for (Runnable task : executor.shutdownNow()) { | ||
assertNotNull(task); | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
public void concurrentShutdownAndUse() throws InterruptedException, ExecutionException { | ||
final BrokerPool pool = existEmbeddedServer.getBrokerPool(); | ||
final int maxBrokers = pool.getMax(); | ||
final int taskAmount = maxBrokers * 50; | ||
|
||
// test requires at least 5 leasedBrokers to prove the issue | ||
assertTrue(maxBrokers > 4); | ||
|
||
final CountDownLatch readyLatch = new CountDownLatch(1); | ||
final CountDownLatch executeLatch = new CountDownLatch(1); | ||
final ExecutorService executor = Executors.newFixedThreadPool(taskAmount); | ||
final List<Future<Void>> tasks = new ArrayList<>(taskAmount); | ||
final Consumer<BrokerPool> brokerAquire = brokerPool -> { | ||
try (final DBBroker broker = brokerPool.getBroker()) { | ||
TimeUnit.SECONDS.sleep(1); | ||
} catch (EXistException e) { | ||
throw new IllegalStateException(e); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
throw new IllegalStateException(e); | ||
} | ||
}; | ||
for (int count = 0; count < taskAmount; count++) { | ||
tasks.add(executor.submit(new PoolAction(pool, readyLatch, executeLatch, (count % 2 == 0) ? BrokerPool::shutdown : brokerAquire))); | ||
} | ||
executor.shutdown(); | ||
|
||
TimeUnit.SECONDS.sleep(2); | ||
readyLatch.countDown(); | ||
|
||
assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); | ||
for (Future<Void> task : tasks) { | ||
task.get(); | ||
} | ||
for (Runnable task : executor.shutdownNow()) { | ||
assertNotNull(task); | ||
} | ||
} | ||
|
||
static class PoolAction implements Callable<Void> { | ||
private final BrokerPool brokerPool; | ||
private final CountDownLatch readyLatch; | ||
private final CountDownLatch excuteLatch; | ||
private final Consumer<BrokerPool> action; | ||
|
||
PoolAction(final BrokerPool brokerPool, CountDownLatch readyLatch, CountDownLatch excuteLatch, Consumer<BrokerPool> action) { | ||
this.brokerPool = brokerPool; | ||
this.readyLatch = readyLatch; | ||
this.excuteLatch = excuteLatch; | ||
this.action = action; | ||
} | ||
|
||
@Override | ||
public Void call() throws InterruptedException { | ||
readyLatch.await(); | ||
action.accept(brokerPool); | ||
return null; | ||
} | ||
} | ||
|
||
public static class BrokerUser implements Runnable { | ||
|
||
public static class BrokerUser implements Callable<Void> { | ||
final BrokerPool brokerPool; | ||
private final CountDownLatch acquiredLatch; | ||
private final CountDownLatch releaseLatch; | ||
|
@@ -200,8 +267,8 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc | |
} | ||
|
||
@Override | ||
public void run() { | ||
try(final DBBroker broker = brokerPool.getBroker()) { | ||
public Void call() throws EXistException, InterruptedException { | ||
try (final DBBroker broker = brokerPool.getBroker()) { | ||
|
||
// signal that we have acquired the broker | ||
acquiredLatch.countDown(); | ||
|
@@ -210,9 +277,8 @@ public void run() { | |
// wait for signal to release the broker | ||
releaseLatch.await(); | ||
|
||
} catch(final EXistException | InterruptedException e) { | ||
fail(e.getMessage()); | ||
} | ||
return null; | ||
} | ||
} | ||
|
||
|