From 1044d9c27648360383d85e2717c8635f217383eb Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 14:31:38 +1000 Subject: [PATCH 1/6] This adds a ticker mode to ScheduledDataLoaderRegistry --- .../ScheduledDataLoaderRegistry.java | 63 ++++++++++++++---- .../java/org/dataloader/fixtures/TestKit.java | 28 +++++++- .../ScheduledDataLoaderRegistryTest.java | 64 +++++++++++++++++++ 3 files changed, 143 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4be317e..9f40d62 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -15,15 +15,34 @@ /** * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called - * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled - * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. + * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, + * then a task is scheduled to perform that predicate dispatch again via the {@link ScheduledExecutorService}. *

- * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case - * no rescheduling will occur and you will need to call dispatch again to restart the process. + * In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur, and you will need to call dispatch again to restart the process. + *

+ * However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}. + *

+ * This will allow you to chain together {@link DataLoader} load calls like this : + *

{@code
+ *   CompletableFuture future = dataLoaderA.load("A")
+ *                                          .thenCompose(value -> dataLoaderB.load(value));
+ * }
+ *

+ * However, it may mean your batching will not be as efficient as it might be. In environments + * like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if + * dispatch should happen however in ticker mode it will be continuously rescheduled. + *

+ * When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job + * on the {@link ScheduledExecutorService} that is continuously dispatching. *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

+ * By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you + * are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService} + * to avoid creating a new thread per registry created. + *

* This code is currently marked as {@link ExperimentalApi} */ @ExperimentalApi @@ -32,6 +51,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private final ScheduledExecutorService scheduledExecutorService; private final DispatchPredicate dispatchPredicate; private final Duration schedule; + private final boolean tickerMode; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { @@ -39,6 +59,7 @@ private ScheduledDataLoaderRegistry(Builder builder) { this.scheduledExecutorService = builder.scheduledExecutorService; this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; + this.tickerMode = builder.tickerMode; this.closed = false; } @@ -57,6 +78,13 @@ public Duration getScheduleDuration() { return schedule; } + /** + * @return true of the registry is in ticker mode or false otherwise + */ + public boolean isTickerMode() { + return tickerMode; + } + @Override public void dispatchAll() { dispatchAllWithCount(); @@ -68,11 +96,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (dispatchPredicate.test(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } else { - reschedule(key, dataLoader); - } + dispatchOrReschedule(key, dataLoader); } return sum; } @@ -111,9 +135,11 @@ private void reschedule(String key, DataLoader dataLoader) { } private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (dispatchPredicate.test(key, dataLoader)) { + boolean shouldDispatch = dispatchPredicate.test(key, dataLoader); + if (shouldDispatch) { dataLoader.dispatch(); - } else { + } + if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } } @@ -134,6 +160,7 @@ public static class Builder { private DispatchPredicate dispatchPredicate = (key, dl) -> true; private Duration schedule = Duration.ofMillis(10); private final Map> dataLoaders = new HashMap<>(); + private boolean tickerMode = false; public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -176,6 +203,20 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { return this; } + /** + * This sets ticker mode on the registry. When ticker mode is true the registry will + * continuously reschedule the data loaders for possible dispatching after the first call + * to dispatchAll. + * + * @param tickerMode true or false + * + * @return this builder for a fluent pattern + */ + public Builder tickerMode(boolean tickerMode) { + this.tickerMode = tickerMode; + return this; + } + /** * @return the newly built {@link ScheduledDataLoaderRegistry} */ diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..d40fdc7 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -5,6 +5,7 @@ import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -31,6 +32,23 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader keysAsValuesAsync(Duration delay) { + return keysAsValuesAsync(new ArrayList<>(), delay); + } + + public static BatchLoader keysAsValuesAsync(List> loadCalls, Duration delay) { + return keys -> CompletableFuture.supplyAsync(() -> { + snooze(delay.toMillis()); + List ks = new ArrayList<>(keys); + loadCalls.add(ks); + @SuppressWarnings("unchecked") + List values = keys.stream() + .map(k -> (V) k) + .collect(toList()); + return values; + }); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -43,6 +61,14 @@ public static DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderAsync(Duration delay) { + return idLoaderAsync(null, new ArrayList<>(), delay); + } + + public static DataLoader idLoaderAsync(DataLoaderOptions options, List> loadCalls, Duration delay) { + return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options); + } + public static Collection listFrom(int i, int max) { List ints = new ArrayList<>(); for (int j = i; j < max; j++) { @@ -55,7 +81,7 @@ public static CompletableFuture futureError() { return failedFuture(new IllegalStateException("Error")); } - public static void snooze(int millis) { + public static void snooze(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 527f419..18ba41e 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -1,6 +1,7 @@ package org.dataloader.registries; import junit.framework.TestCase; +import org.awaitility.core.ConditionTimeoutException; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; @@ -11,13 +12,17 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.TWO_SECONDS; import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.snooze; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; public class ScheduledDataLoaderRegistryTest extends TestCase { @@ -257,4 +262,63 @@ public void test_close_is_a_one_way_door() { snooze(200); assertEquals(counter.get(), countThen + 1); } + + public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) + .build(); + + assertThat(registry.isTickerMode(), equalTo(true)); + + registry.dispatchAll(); + + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + + registry.close(); + } + + public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(false) + .build(); + + assertThat(registry.isTickerMode(), equalTo(false)); + + registry.dispatchAll(); + + try { + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + fail("This should not have completed but rather timed out"); + } catch (ConditionTimeoutException expected) { + } + registry.close(); + } } \ No newline at end of file From 37c7a5f8fd88a430ddf724382fcae80efd0e789e Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 16:07:08 +1000 Subject: [PATCH 2/6] This adds a ticker mode to ScheduledDataLoaderRegistry - added readme info --- README.md | 69 +++++++++++++++++++++++++++++++ src/test/java/ReadmeExamples.java | 36 +++++++++++++++- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e48de1d..afbadb7 100644 --- a/README.md +++ b/README.md @@ -538,6 +538,75 @@ since it was last dispatched". The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less than or equal to 10 but if 200ms pass it will dispatch. +## Chaining DataLoader calls + +It's natural to want to have chained `DataLoader` calls. + +```java + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); +``` + +However, the challenge here is how to be efficient in batching terms. + +This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue. + +Since CompletableFuture's are async and can complete at some time in the future, when is the best time to call +`dispatch` again when a load call has completed to maximize batching? + +The most naive approach is to immediately dispatch the second chained call as follows : + +```java + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); +``` + +The above will work however the window of batching together multiple calls to `dataLoaderB` will be very small and since +it will likely result in batch sizes of 1. + +This is a very difficult problem to solve because you have to balance two competing design ideals which is to maximize the +batching window of secondary calls in a small window of time so you customer requests don't take longer than necessary. + +* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase. + +* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`. + + +### ScheduledDataLoaderRegistry ticker mode + +The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary +`DataLoader` calls after the initial `dispatch()` call is made. + +The batch window of time is controlled by the schedule duration setup at when the `ScheduledDataLoaderRegistry` is created. + +```java + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + +``` +When ticker mode is on the chained dataloader calls will complete but the batching window size will depend on how quickly +the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`. + +If you use ticker mode, then you MUST `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say) otherwise +it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry. + +You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry` +otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests +you may create too many threads. ## Other information sources diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index e37550e..3127a43 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -18,11 +18,14 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import static java.lang.String.format; @@ -278,7 +281,7 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } - private void ScheduledDispatche() { + private void ScheduledDispatcher() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); @@ -288,4 +291,35 @@ private void ScheduledDispatche() { .register("users", userDataLoader) .build(); } + + + DataLoader dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader); + DataLoader dataLoaderB = DataLoaderFactory.newDataLoader(keys -> { + return CompletableFuture.completedFuture(Collections.singletonList(1L)); + }); + + private void ScheduledDispatcherChained() { + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + + + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); + + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + } } From 1d11d87dfce7f216aa3b4d63854ef4447b0e8111 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 17:17:22 +1000 Subject: [PATCH 3/6] This adds a ticker mode to ScheduledDataLoaderRegistry - testing works bitches! Found a bug in the sum code that I refactored way --- .../registries/ScheduledDataLoaderRegistry.java | 8 +++++--- .../registries/ScheduledDataLoaderRegistryTest.java | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 9f40d62..5b58aa6 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -96,7 +96,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - dispatchOrReschedule(key, dataLoader); + sum += dispatchOrReschedule(key, dataLoader); } return sum; } @@ -134,14 +134,16 @@ private void reschedule(String key, DataLoader dataLoader) { } } - private void dispatchOrReschedule(String key, DataLoader dataLoader) { + private int dispatchOrReschedule(String key, DataLoader dataLoader) { + int sum = 0; boolean shouldDispatch = dispatchPredicate.test(key, dataLoader); if (shouldDispatch) { - dataLoader.dispatch(); + sum = dataLoader.dispatchWithCounts().getKeysCount(); } if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } + return sum; } /** diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 18ba41e..5e0cd9a 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -284,7 +284,8 @@ public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { assertThat(registry.isTickerMode(), equalTo(true)); - registry.dispatchAll(); + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); @@ -312,7 +313,8 @@ public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { assertThat(registry.isTickerMode(), equalTo(false)); - registry.dispatchAll(); + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); try { await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); From 5ee388c1e2892ebe4946e8cef6deac7c9aa038e3 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 26 Sep 2023 09:10:40 +1000 Subject: [PATCH 4/6] This adds a ticker mode to ScheduledDataLoaderRegistry - testing works bitches! More doco based on PR feedback --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 5a1c78d..24a65f6 100644 --- a/README.md +++ b/README.md @@ -653,6 +653,26 @@ You will want to look at sharing the `ScheduledExecutorService` in some way betw otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests you may create too many threads. +### ScheduledDataLoaderRegistry dispatching algorithm + +When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows : + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + +When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows: + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + ## Other information sources - [Facebook DataLoader Github repo](https://github.com/facebook/dataloader) From f5d79b471989924919b16a9d262c050558590967 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Oct 2023 17:17:53 +1100 Subject: [PATCH 5/6] Merged in master --- .../ScheduledDataLoaderRegistry.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 2fbec3d..6d3b910 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -157,25 +157,6 @@ public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoa return this; } - /** - * Returns true if the dataloader has a predicate which returned true, OR the overall - * registry predicate returned true. - * - * @param dataLoaderKey the key in the dataloader map - * @param dataLoader the dataloader - * - * @return true if it should dispatch - */ - private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { - DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); - if (dispatchPredicate != null) { - if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { - return true; - } - } - return this.dispatchPredicate.test(dataLoaderKey, dataLoader); - } - @Override public void dispatchAll() { dispatchAllWithCount(); @@ -222,6 +203,25 @@ public void rescheduleNow() { dataLoaders.forEach(this::reschedule); } + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + private void reschedule(String key, DataLoader dataLoader) { if (!closed) { Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); From 46ce1736dd609005e800ebded87cdb046dc55a43 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Oct 2023 17:20:27 +1100 Subject: [PATCH 6/6] Merged in master - tweaked doco --- .../org/dataloader/registries/ScheduledDataLoaderRegistry.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 6d3b910..fada66d 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -42,7 +42,8 @@ *

* When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job * on the {@link ScheduledExecutorService} that is continuously dispatching. - *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and + *

+ * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you