diff --git a/README.md b/README.md index b64453e..24a65f6 100644 --- a/README.md +++ b/README.md @@ -583,6 +583,95 @@ since it was last dispatched". The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less than or equal to 10 but if 200ms pass it will dispatch. +## Chaining DataLoader calls + +It's natural to want to have chained `DataLoader` calls. + +```java + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); +``` + +However, the challenge here is how to be efficient in batching terms. + +This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue. + +Since CompletableFuture's are async and can complete at some time in the future, when is the best time to call +`dispatch` again when a load call has completed to maximize batching? + +The most naive approach is to immediately dispatch the second chained call as follows : + +```java + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); +``` + +The above will work however the window of batching together multiple calls to `dataLoaderB` will be very small and since +it will likely result in batch sizes of 1. + +This is a very difficult problem to solve because you have to balance two competing design ideals which is to maximize the +batching window of secondary calls in a small window of time so you customer requests don't take longer than necessary. + +* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase. + +* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`. + + +### ScheduledDataLoaderRegistry ticker mode + +The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary +`DataLoader` calls after the initial `dispatch()` call is made. + +The batch window of time is controlled by the schedule duration setup at when the `ScheduledDataLoaderRegistry` is created. + +```java + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + +``` +When ticker mode is on the chained dataloader calls will complete but the batching window size will depend on how quickly +the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`. + +If you use ticker mode, then you MUST `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say) otherwise +it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry. + +You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry` +otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests +you may create too many threads. + +### ScheduledDataLoaderRegistry dispatching algorithm + +When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows : + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + +When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows: + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` ## Other information sources diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 5b1af76..fada66d 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -25,9 +25,31 @@ * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case * no rescheduling will occur, and you will need to call dispatch again to restart the process. *

+ * In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur, and you will need to call dispatch again to restart the process. + *

+ * However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}. + *

+ * This will allow you to chain together {@link DataLoader} load calls like this : + *

{@code
+ *   CompletableFuture future = dataLoaderA.load("A")
+ *                                          .thenCompose(value -> dataLoaderB.load(value));
+ * }
+ *

+ * However, it may mean your batching will not be as efficient as it might be. In environments + * like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if + * dispatch should happen however in ticker mode it will be continuously rescheduled. + *

+ * When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job + * on the {@link ScheduledExecutorService} that is continuously dispatching. + *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

+ * By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you + * are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService} + * to avoid creating a new thread per registry created. + *

* This code is currently marked as {@link ExperimentalApi} */ @ExperimentalApi @@ -37,6 +59,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private final DispatchPredicate dispatchPredicate; private final ScheduledExecutorService scheduledExecutorService; private final Duration schedule; + private final boolean tickerMode; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { @@ -44,6 +67,7 @@ private ScheduledDataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); this.scheduledExecutorService = builder.scheduledExecutorService; this.schedule = builder.schedule; + this.tickerMode = builder.tickerMode; this.closed = false; this.dispatchPredicate = builder.dispatchPredicate; this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); @@ -64,6 +88,13 @@ public Duration getScheduleDuration() { return schedule; } + /** + * @return true of the registry is in ticker mode or false otherwise + */ + public boolean isTickerMode() { + return tickerMode; + } + /** * This will combine all the current data loaders in this registry and all the data loaders from the specified registry * and return a new combined registry @@ -127,25 +158,6 @@ public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoa return this; } - /** - * Returns true if the dataloader has a predicate which returned true, OR the overall - * registry predicate returned true. - * - * @param dataLoaderKey the key in the dataloader map - * @param dataLoader the dataloader - * - * @return true if it should dispatch - */ - private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { - DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); - if (dispatchPredicate != null) { - if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { - return true; - } - } - return this.dispatchPredicate.test(dataLoaderKey, dataLoader); - } - @Override public void dispatchAll() { dispatchAllWithCount(); @@ -157,11 +169,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (shouldDispatch(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } else { - reschedule(key, dataLoader); - } + sum += dispatchOrReschedule(key, dataLoader); } return sum; } @@ -196,6 +204,25 @@ public void rescheduleNow() { dataLoaders.forEach(this::reschedule); } + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + private void reschedule(String key, DataLoader dataLoader) { if (!closed) { Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); @@ -203,12 +230,16 @@ private void reschedule(String key, DataLoader dataLoader) { } } - private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (shouldDispatch(key, dataLoader)) { - dataLoader.dispatch(); - } else { + private int dispatchOrReschedule(String key, DataLoader dataLoader) { + int sum = 0; + boolean shouldDispatch = shouldDispatch(key, dataLoader); + if (shouldDispatch) { + sum = dataLoader.dispatchWithCounts().getKeysCount(); + } + if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } + return sum; } /** @@ -228,6 +259,7 @@ public static class Builder { private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private Duration schedule = Duration.ofMillis(10); + private boolean tickerMode = false; public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -298,6 +330,20 @@ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { return this; } + /** + * This sets ticker mode on the registry. When ticker mode is true the registry will + * continuously reschedule the data loaders for possible dispatching after the first call + * to dispatchAll. + * + * @param tickerMode true or false + * + * @return this builder for a fluent pattern + */ + public Builder tickerMode(boolean tickerMode) { + this.tickerMode = tickerMode; + return this; + } + /** * @return the newly built {@link ScheduledDataLoaderRegistry} */ diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 40a0260..d25dfa7 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -19,11 +19,14 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -304,7 +307,7 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp }; } - private void ScheduledDispatche() { + private void ScheduledDispatcher() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); @@ -314,4 +317,35 @@ private void ScheduledDispatche() { .register("users", userDataLoader) .build(); } + + + DataLoader dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader); + DataLoader dataLoaderB = DataLoaderFactory.newDataLoader(keys -> { + return CompletableFuture.completedFuture(Collections.singletonList(1L)); + }); + + private void ScheduledDispatcherChained() { + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + + + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); + + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 1e7c02f..ac29a0c 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -8,6 +8,7 @@ import org.dataloader.MappedBatchLoader; import org.dataloader.MappedBatchLoaderWithContext; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -61,6 +62,23 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader keysAsValuesAsync(Duration delay) { + return keysAsValuesAsync(new ArrayList<>(), delay); + } + + public static BatchLoader keysAsValuesAsync(List> loadCalls, Duration delay) { + return keys -> CompletableFuture.supplyAsync(() -> { + snooze(delay.toMillis()); + List ks = new ArrayList<>(keys); + loadCalls.add(ks); + @SuppressWarnings("unchecked") + List values = keys.stream() + .map(k -> (V) k) + .collect(toList()); + return values; + }); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -73,6 +91,14 @@ public static DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderAsync(Duration delay) { + return idLoaderAsync(null, new ArrayList<>(), delay); + } + + public static DataLoader idLoaderAsync(DataLoaderOptions options, List> loadCalls, Duration delay) { + return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options); + } + public static Collection listFrom(int i, int max) { List ints = new ArrayList<>(); for (int j = i; j < max; j++) { @@ -85,7 +111,7 @@ public static CompletableFuture futureError() { return failedFuture(new IllegalStateException("Error")); } - public static void snooze(int millis) { + public static void snooze(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 527f419..5e0cd9a 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -1,6 +1,7 @@ package org.dataloader.registries; import junit.framework.TestCase; +import org.awaitility.core.ConditionTimeoutException; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; @@ -11,13 +12,17 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.TWO_SECONDS; import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.snooze; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; public class ScheduledDataLoaderRegistryTest extends TestCase { @@ -257,4 +262,65 @@ public void test_close_is_a_one_way_door() { snooze(200); assertEquals(counter.get(), countThen + 1); } + + public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) + .build(); + + assertThat(registry.isTickerMode(), equalTo(true)); + + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); + + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + + registry.close(); + } + + public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(false) + .build(); + + assertThat(registry.isTickerMode(), equalTo(false)); + + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); + + try { + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + fail("This should not have completed but rather timed out"); + } catch (ConditionTimeoutException expected) { + } + registry.close(); + } } \ No newline at end of file