Skip to content

Commit

Permalink
Merge pull request #131 from graphql-java/ticker_mode_on_scheduled_re…
Browse files Browse the repository at this point in the history
…gistry

Ticker mode on ScheduledDataLoaderRegistry
  • Loading branch information
dondonz authored Oct 17, 2023
2 parents 442edf4 + 46ce173 commit 5f8ec4a
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 30 deletions.
89 changes: 89 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,95 @@ since it was last dispatched".
The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less
than or equal to 10 but if 200ms pass it will dispatch.

## Chaining DataLoader calls

It's natural to want to have chained `DataLoader` calls.

```java
CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
.thenCompose(userAsKey -> dataLoaderB.load(userAsKey));
```

However, the challenge here is how to be efficient in batching terms.

This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue.

Since CompletableFuture's are async and can complete at some time in the future, when is the best time to call
`dispatch` again when a load call has completed to maximize batching?

The most naive approach is to immediately dispatch the second chained call as follows :

```java
CompletableFuture<Object> chainedWithImmediateDispatch = dataLoaderA.load("user1")
.thenCompose(userAsKey -> {
CompletableFuture<Object> loadB = dataLoaderB.load(userAsKey);
dataLoaderB.dispatch();
return loadB;
});
```

The above will work however the window of batching together multiple calls to `dataLoaderB` will be very small and since
it will likely result in batch sizes of 1.

This is a very difficult problem to solve because you have to balance two competing design ideals which is to maximize the
batching window of secondary calls in a small window of time so you customer requests don't take longer than necessary.

* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase.

* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`.


### ScheduledDataLoaderRegistry ticker mode

The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary
`DataLoader` calls after the initial `dispatch()` call is made.

The batch window of time is controlled by the schedule duration setup at when the `ScheduledDataLoaderRegistry` is created.

```java
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
.register("a", dataLoaderA)
.register("b", dataLoaderB)
.scheduledExecutorService(executorService)
.schedule(Duration.ofMillis(10))
.tickerMode(true) // ticker mode is on
.build();

CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
.thenCompose(userAsKey -> dataLoaderB.load(userAsKey));

```
When ticker mode is on the chained dataloader calls will complete but the batching window size will depend on how quickly
the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`.

If you use ticker mode, then you MUST `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say) otherwise
it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry.

You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry`
otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests
you may create too many threads.

### ScheduledDataLoaderRegistry dispatching algorithm

When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows :

* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time
* Then for every `DataLoader` in the registry
* The `DispatchPredicate` is called to test if the data loader should be dispatched
* if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future
* If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows:

* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time
* Then for every `DataLoader` in the registry
* The `DispatchPredicate` is called to test if the data loader should be dispatched
* if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future
* If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

## Other information sources

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,31 @@
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}.
* <p>
* This will allow you to chain together {@link DataLoader} load calls like this :
* <pre>{@code
* CompletableFuture<String> future = dataLoaderA.load("A")
* .thenCompose(value -> dataLoaderB.load(value));
* }</pre>
* <p>
* However, it may mean your batching will not be as efficient as it might be. In environments
* like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if
* dispatch should happen however in ticker mode it will be continuously rescheduled.
* <p>
* When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job
* on the {@link ScheduledExecutorService} that is continuously dispatching.
* <p>
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
* <p>
* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you
* are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService}
* to avoid creating a new thread per registry created.
* <p>
* This code is currently marked as {@link ExperimentalApi}
*/
@ExperimentalApi
Expand All @@ -37,13 +59,15 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A
private final DispatchPredicate dispatchPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private final boolean tickerMode;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
super();
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.schedule = builder.schedule;
this.tickerMode = builder.tickerMode;
this.closed = false;
this.dispatchPredicate = builder.dispatchPredicate;
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
Expand All @@ -64,6 +88,13 @@ public Duration getScheduleDuration() {
return schedule;
}

/**
* @return true of the registry is in ticker mode or false otherwise
*/
public boolean isTickerMode() {
return tickerMode;
}

/**
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
* and return a new combined registry
Expand Down Expand Up @@ -127,25 +158,6 @@ public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoa
return this;
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
Expand All @@ -157,11 +169,7 @@ public int dispatchAllWithCount() {
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
}
sum += dispatchOrReschedule(key, dataLoader);
}
return sum;
}
Expand Down Expand Up @@ -196,19 +204,42 @@ public void rescheduleNow() {
dataLoaders.forEach(this::reschedule);
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

private void reschedule(String key, DataLoader<?, ?> dataLoader) {
if (!closed) {
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
}
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
private int dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
int sum = 0;
boolean shouldDispatch = shouldDispatch(key, dataLoader);
if (shouldDispatch) {
sum = dataLoader.dispatchWithCounts().getKeysCount();
}
if (tickerMode || !shouldDispatch) {
reschedule(key, dataLoader);
}
return sum;
}

/**
Expand All @@ -228,6 +259,7 @@ public static class Builder {
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private Duration schedule = Duration.ofMillis(10);
private boolean tickerMode = false;

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
Expand Down Expand Up @@ -298,6 +330,20 @@ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
return this;
}

/**
* This sets ticker mode on the registry. When ticker mode is true the registry will
* continuously reschedule the data loaders for possible dispatching after the first call
* to dispatchAll.
*
* @param tickerMode true or false
*
* @return this builder for a fluent pattern
*/
public Builder tickerMode(boolean tickerMode) {
this.tickerMode = tickerMode;
return this;
}

/**
* @return the newly built {@link ScheduledDataLoaderRegistry}
*/
Expand Down
36 changes: 35 additions & 1 deletion src/test/java/ReadmeExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -304,7 +307,7 @@ public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMapp
};
}

private void ScheduledDispatche() {
private void ScheduledDispatcher() {
DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));

Expand All @@ -314,4 +317,35 @@ private void ScheduledDispatche() {
.register("users", userDataLoader)
.build();
}


DataLoader<String, User> dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader<User, Object> dataLoaderB = DataLoaderFactory.newDataLoader(keys -> {
return CompletableFuture.completedFuture(Collections.singletonList(1L));
});

private void ScheduledDispatcherChained() {
CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
.thenCompose(userAsKey -> dataLoaderB.load(userAsKey));


CompletableFuture<Object> chainedWithImmediateDispatch = dataLoaderA.load("user1")
.thenCompose(userAsKey -> {
CompletableFuture<Object> loadB = dataLoaderB.load(userAsKey);
dataLoaderB.dispatch();
return loadB;
});


ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
.register("a", dataLoaderA)
.register("b", dataLoaderB)
.scheduledExecutorService(executorService)
.schedule(Duration.ofMillis(10))
.tickerMode(true) // ticker mode is on
.build();

}
}
28 changes: 27 additions & 1 deletion src/test/java/org/dataloader/fixtures/TestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.dataloader.MappedBatchLoader;
import org.dataloader.MappedBatchLoaderWithContext;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -61,6 +62,23 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
};
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(Duration delay) {
return keysAsValuesAsync(new ArrayList<>(), delay);
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(List<List<K>> loadCalls, Duration delay) {
return keys -> CompletableFuture.supplyAsync(() -> {
snooze(delay.toMillis());
List<K> ks = new ArrayList<>(keys);
loadCalls.add(ks);
@SuppressWarnings("unchecked")
List<V> values = keys.stream()
.map(k -> (V) k)
.collect(toList());
return values;
});
}

public static <K, V> DataLoader<K, V> idLoader() {
return idLoader(null, new ArrayList<>());
}
Expand All @@ -73,6 +91,14 @@ public static <K, V> DataLoader<K, V> idLoader(DataLoaderOptions options, List<L
return DataLoaderFactory.newDataLoader(keysAsValues(loadCalls), options);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(Duration delay) {
return idLoaderAsync(null, new ArrayList<>(), delay);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(DataLoaderOptions options, List<List<K>> loadCalls, Duration delay) {
return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options);
}

public static Collection<Integer> listFrom(int i, int max) {
List<Integer> ints = new ArrayList<>();
for (int j = i; j < max; j++) {
Expand All @@ -85,7 +111,7 @@ public static <V> CompletableFuture<V> futureError() {
return failedFuture(new IllegalStateException("Error"));
}

public static void snooze(int millis) {
public static void snooze(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit 5f8ec4a

Please sign in to comment.