From 6e302206f43a40b44982ff2e2747a6dff579d5ff Mon Sep 17 00:00:00 2001 From: Alexandre Carlton Date: Sun, 12 May 2024 17:56:56 +1000 Subject: [PATCH] Add a proof-of-concept for "Observer-like" batch loading **Note**: This commit, as-is, is not (yet) intended for merge. It is created to provide a proof-of-concept and gauge interest as polishing/testing this requires a non-trivial amount of effort. Motivation ========== The current DataLoader mechanism completes the corresponding `CompletableFuture` for a given key when the corresponding value is returned. However, DataLoader's `BatchLoader` assumes that the underlying batch function can only return all of its requested items at once (as an example, a SQL database query). However, the batch function may be a service that can return items progressively using a subscription-like architecture. Some examples include: - Project Reactor's [Subscriber](https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html). - gRPC's [StreamObserver](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html). - RX Java's [Flowable](https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html). Streaming results in this fashion offers several advantages: - Certain values may be returned earlier than others (for example, the batch function may have cached values it can return early). - Memory load is lessened on the batch function (which may be an external service), as it does not need to keep hold of the retrieved values before it can send them out at once. - We are able to save the need to stream individual error values by providing an `onError` function to terminate the stream early. Proposal ======== We provide two new `BatchLoader`s and support for them in `java-dataloader`: - `ObserverBatchLoader`, with a load function that accepts: - a list of keys. - a `BatchObserver` intended as a delegate for publisher-like structures found in Project Reactor and Rx Java. This obviates the need to depend on external libraries. - `MappedObserverBatchLoader`, similar to `ObserverBatchLoader` but with an `onNext` that accepts a key _and_ value (to allow for early termination of streams without needing to process `null`s). - `*WithContext` variants for the above. The key value-add is that the implementation of `BatchObserver` (provided to the load functions) will immediately complete the queued future for a given key when `onNext` is called with a value. This means that if we have a batch function that can deliver values progressively, we can continue evaluating the query as the values arrive. As an arbitrary example, let's have a batch function that serves both the reporter and project fields on a Jira issue: ```graphql query { issue { project { issueTypes { ... } } reporter { ... } } } ``` If the batch function can return a `project` immediately but is delayed in when it can `reporter`, then our batch loader can return `project` and start evaluating the `issueTypes` immediately while we load the `reporter` in parallel. This would provide a more performant query evaluation. As mentioned above, this is not in a state to be merged - this is intended to gauge whether this is something the maintainers would be interested in owning. Should this be the case, the author is willing to test/polish this pull request so that it may be merged. --- .../java/org/dataloader/BatchObserver.java | 33 +++ .../java/org/dataloader/DataLoaderHelper.java | 267 +++++++++++++++++- .../org/dataloader/MappedBatchObserver.java | 34 +++ .../dataloader/MappedObserverBatchLoader.java | 17 ++ .../MappedObserverBatchLoaderWithContext.java | 10 + .../org/dataloader/ObserverBatchLoader.java | 19 ++ .../ObserverBatchLoaderWithContext.java | 10 + .../scheduler/BatchLoaderScheduler.java | 21 ++ src/test/java/ReadmeExamples.java | 6 + ...taLoaderMappedObserverBatchLoaderTest.java | 106 +++++++ .../DataLoaderObserverBatchLoaderTest.java | 108 +++++++ .../scheduler/BatchLoaderSchedulerTest.java | 20 ++ 12 files changed, 644 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/dataloader/BatchObserver.java create mode 100644 src/main/java/org/dataloader/MappedBatchObserver.java create mode 100644 src/main/java/org/dataloader/MappedObserverBatchLoader.java create mode 100644 src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java create mode 100644 src/main/java/org/dataloader/ObserverBatchLoader.java create mode 100644 src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java create mode 100644 src/test/java/org/dataloader/DataLoaderMappedObserverBatchLoaderTest.java create mode 100644 src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java diff --git a/src/main/java/org/dataloader/BatchObserver.java b/src/main/java/org/dataloader/BatchObserver.java new file mode 100644 index 0000000..14ef051 --- /dev/null +++ b/src/main/java/org/dataloader/BatchObserver.java @@ -0,0 +1,33 @@ +package org.dataloader; + +/** + * A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling + * {@link ObserverBatchLoader}. + *

+ * Some examples include: + *

+ * @param the value type of the {@link ObserverBatchLoader} + */ +public interface BatchObserver { + + /** + * To be called by the {@link ObserverBatchLoader} to load a new value. + */ + void onNext(V value); + + /** + * To be called by the {@link ObserverBatchLoader} to indicate all values have been successfully processed. + * This {@link BatchObserver} should not have any method invoked after this is called. + */ + void onCompleted(); + + /** + * To be called by the {@link ObserverBatchLoader} to indicate an unrecoverable error has been encountered. + * This {@link BatchObserver} should not have any method invoked after this is called. + */ + void onError(Throwable e); +} diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index d934de2..47d2d35 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -15,6 +15,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -241,10 +242,14 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts)); - CompletableFuture> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); + CompletableFuture> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled()); return batchLoad .thenApply(values -> { assertResultSize(keys, values); + if (isObserverLoader() || isMapObserverLoader()) { + // We have already completed the queued futures by the time the overall batchLoad future has completed. + return values; + } List clearCacheKeys = new ArrayList<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { @@ -342,14 +347,15 @@ private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, bool CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); - return invokeLoader(keys, keyContexts, cachingEnabled) + List> queuedFutures = singletonList(new CompletableFuture<>()); + return invokeLoader(keys, keyContexts, queuedFutures, cachingEnabled) .thenApply(list -> list.get(0)) .toCompletableFuture(); } - CompletableFuture> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures, boolean cachingEnabled) { if (!cachingEnabled) { - return invokeLoader(keys, keyContexts); + return invokeLoader(keys, keyContexts, queuedFutures); } CompletableFuture>> cacheCallCF = getFromValueCache(keys); return cacheCallCF.thenCompose(cachedValues -> { @@ -360,6 +366,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, List missedKeyIndexes = new ArrayList<>(); List missedKeys = new ArrayList<>(); List missedKeyContexts = new ArrayList<>(); + List> missedQueuedFutures = new ArrayList<>(); // if they return a ValueCachingNotSupported exception then we insert this special marker value, and it // means it's a total miss, we need to get all these keys via the batch loader @@ -369,6 +376,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, missedKeyIndexes.add(i); missedKeys.add(keys.get(i)); missedKeyContexts.add(keyContexts.get(i)); + missedQueuedFutures.add(queuedFutures.get(i)); } } else { assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); @@ -393,7 +401,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, // we missed some keys from cache, so send them to the batch loader // and then fill in their values // - CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts); + CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts, missedQueuedFutures); return batchLoad.thenCompose(missedValues -> { assertResultSize(missedKeys, missedValues); @@ -412,8 +420,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, }); } - - CompletableFuture> invokeLoader(List keys, List keyContexts) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures) { CompletableFuture> batchLoad; try { Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); @@ -421,6 +428,10 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) .context(context).keyContexts(keys, keyContexts).build(); if (isMapLoader()) { batchLoad = invokeMapBatchLoader(keys, environment); + } else if (isObserverLoader()) { + batchLoad = invokeObserverBatchLoader(keys, keyContexts, queuedFutures, environment); + } else if (isMapObserverLoader()) { + batchLoad = invokeMappedObserverBatchLoader(keys, keyContexts, queuedFutures, environment); } else { batchLoad = invokeListBatchLoader(keys, environment); } @@ -492,10 +503,68 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade }); } + private CompletableFuture> invokeObserverBatchLoader(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + BatchObserver observer = new BatchObserverImpl(loadResult, keys, keyContexts, queuedFutures); + + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof ObserverBatchLoaderWithContext) { + ObserverBatchLoaderWithContext loadFunction = (ObserverBatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer, environment); + batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, environment); + } else { + loadFunction.load(keys, observer, environment); + } + } else { + ObserverBatchLoader loadFunction = (ObserverBatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer); + batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, null); + } else { + loadFunction.load(keys, observer); + } + } + return loadResult; + } + + private CompletableFuture> invokeMappedObserverBatchLoader(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + MappedBatchObserver observer = new MappedBatchObserverImpl(loadResult, keys, keyContexts, queuedFutures); + + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof MappedObserverBatchLoaderWithContext) { + MappedObserverBatchLoaderWithContext loadFunction = (MappedObserverBatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer, environment); + batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, environment); + } else { + loadFunction.load(keys, observer, environment); + } + } else { + MappedObserverBatchLoader loadFunction = (MappedObserverBatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer); + batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, null); + } else { + loadFunction.load(keys, observer); + } + } + return loadResult; + } + private boolean isMapLoader() { return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext; } + private boolean isObserverLoader() { + return batchLoadFunction instanceof ObserverBatchLoader; + } + + private boolean isMapObserverLoader() { + return batchLoadFunction instanceof MappedObserverBatchLoader; + } + int dispatchDepth() { synchronized (dataLoader) { return loaderQueue.size(); @@ -546,4 +615,188 @@ private CompletableFuture> setToValueCache(List assembledValues, List private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } + + private class BatchObserverImpl implements BatchObserver { + private final CompletableFuture> valuesFuture; + private final List keys; + private final List callContexts; + private final List> queuedFutures; + + private final List clearCacheKeys = new ArrayList<>(); + private final List completedValues = new ArrayList<>(); + private int idx = 0; + private boolean onErrorCalled = false; + private boolean onCompletedCalled = false; + + private BatchObserverImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures + ) { + this.valuesFuture = valuesFuture; + this.keys = keys; + this.callContexts = callContexts; + this.queuedFutures = queuedFutures; + } + + @Override + public void onNext(V value) { + assert !onErrorCalled && !onCompletedCalled; + + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + if (value instanceof Throwable) { + stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + future.completeExceptionally((Throwable) value); + clearCacheKeys.add(keys.get(idx)); + } else if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + future.complete(tryValue.get()); + } else { + stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + future.completeExceptionally(tryValue.getThrowable()); + clearCacheKeys.add(keys.get(idx)); + } + } else { + future.complete(value); + } + + completedValues.add(value); + idx++; + } + + @Override + public void onCompleted() { + assert !onErrorCalled; + onCompletedCalled = true; + + assertResultSize(keys, completedValues); + + possiblyClearCacheEntriesOnExceptions(clearCacheKeys); + valuesFuture.complete(completedValues); + } + + @Override + public void onError(Throwable ex) { + assert !onCompletedCalled; + onErrorCalled = true; + + stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + future.completeExceptionally(ex); + // clear any cached view of this key because they all failed + dataLoader.clear(key); + } + } + } + + private class MappedBatchObserverImpl implements MappedBatchObserver { + private final CompletableFuture> valuesFuture; + private final List keys; + private final List callContexts; + private final List> queuedFutures; + private final Map callContextByKey; + private final Map> queuedFutureByKey; + + private final List clearCacheKeys = new ArrayList<>(); + private final Map completedValuesByKey = new HashMap<>(); + private boolean onErrorCalled = false; + private boolean onCompletedCalled = false; + + private MappedBatchObserverImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures + ) { + this.valuesFuture = valuesFuture; + this.keys = keys; + this.callContexts = callContexts; + this.queuedFutures = queuedFutures; + + this.callContextByKey = new HashMap<>(); + this.queuedFutureByKey = new HashMap<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture queuedFuture = queuedFutures.get(idx); + callContextByKey.put(key, callContext); + queuedFutureByKey.put(key, queuedFuture); + } + } + + @Override + public void onNext(K key, V value) { + assert !onErrorCalled && !onCompletedCalled; + + Object callContext = callContextByKey.get(key); + CompletableFuture future = queuedFutureByKey.get(key); + if (value instanceof Throwable) { + stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + future.completeExceptionally((Throwable) value); + clearCacheKeys.add(key); + } else if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + future.complete(tryValue.get()); + } else { + stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + future.completeExceptionally(tryValue.getThrowable()); + clearCacheKeys.add(key); + } + } else { + future.complete(value); + } + + completedValuesByKey.put(key, value); + } + + @Override + public void onCompleted() { + assert !onErrorCalled; + onCompletedCalled = true; + + possiblyClearCacheEntriesOnExceptions(clearCacheKeys); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); + } + valuesFuture.complete(values); + } + + @Override + public void onError(Throwable ex) { + assert !onCompletedCalled; + onErrorCalled = true; + + stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + CompletableFuture future = queuedFutureByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + future.completeExceptionally(ex); + // clear any cached view of this key because they all failed + dataLoader.clear(key); + } + } + } + } } diff --git a/src/main/java/org/dataloader/MappedBatchObserver.java b/src/main/java/org/dataloader/MappedBatchObserver.java new file mode 100644 index 0000000..59a0f73 --- /dev/null +++ b/src/main/java/org/dataloader/MappedBatchObserver.java @@ -0,0 +1,34 @@ +package org.dataloader; + +/** + * A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling + * {@link MappedObserverBatchLoader}. + *

+ * Some examples include: + *

+ * @param the key type of the calling {@link MappedObserverBatchLoader}. + * @param the value type of the calling {@link MappedObserverBatchLoader}. + */ +public interface MappedBatchObserver { + + /** + * To be called by the {@link MappedObserverBatchLoader} to process a new key/value pair. + */ + void onNext(K key, V value); + + /** + * To be called by the {@link MappedObserverBatchLoader} to indicate all values have been successfully processed. + * This {@link MappedBatchObserver} should not have any method invoked after this method is called. + */ + void onCompleted(); + + /** + * To be called by the {@link MappedObserverBatchLoader} to indicate an unrecoverable error has been encountered. + * This {@link MappedBatchObserver} should not have any method invoked after this method is called. + */ + void onError(Throwable e); +} diff --git a/src/main/java/org/dataloader/MappedObserverBatchLoader.java b/src/main/java/org/dataloader/MappedObserverBatchLoader.java new file mode 100644 index 0000000..d82ec75 --- /dev/null +++ b/src/main/java/org/dataloader/MappedObserverBatchLoader.java @@ -0,0 +1,17 @@ +package org.dataloader; + +import java.util.List; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function will call the provided {@link MappedBatchObserver} to process the key/value pairs it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + */ +public interface MappedObserverBatchLoader { + void load(List keys, MappedBatchObserver observer); +} diff --git a/src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java b/src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java new file mode 100644 index 0000000..6619198 --- /dev/null +++ b/src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java @@ -0,0 +1,10 @@ +package org.dataloader; + +import java.util.List; + +/** + * A {@link MappedObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}. + */ +public interface MappedObserverBatchLoaderWithContext { + void load(List keys, MappedBatchObserver observer, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/ObserverBatchLoader.java b/src/main/java/org/dataloader/ObserverBatchLoader.java new file mode 100644 index 0000000..0c481f9 --- /dev/null +++ b/src/main/java/org/dataloader/ObserverBatchLoader.java @@ -0,0 +1,19 @@ +package org.dataloader; + +import java.util.List; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function will call the provided {@link BatchObserver} to process the values it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + *

+ * It is required that values be returned in the same order as the keys provided. + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + */ +public interface ObserverBatchLoader { + void load(List keys, BatchObserver observer); +} diff --git a/src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java b/src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java new file mode 100644 index 0000000..14a3dd1 --- /dev/null +++ b/src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java @@ -0,0 +1,10 @@ +package org.dataloader; + +import java.util.List; + +/** + * An {@link ObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}. + */ +public interface ObserverBatchLoaderWithContext { + void load(List keys, BatchObserver observer, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 7cddd54..5b88d2c 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -5,6 +5,8 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedObserverBatchLoader; +import org.dataloader.ObserverBatchLoader; import java.util.List; import java.util.Map; @@ -42,6 +44,13 @@ interface ScheduledMappedBatchLoaderCall { CompletionStage> invoke(); } + /** + * This represents a callback that will invoke a {@link ObserverBatchLoader} or {@link MappedObserverBatchLoader} function under the covers + */ + interface ScheduledObserverBatchLoaderCall { + void invoke(); + } + /** * This is called to schedule a {@link BatchLoader} call. * @@ -71,4 +80,16 @@ interface ScheduledMappedBatchLoaderCall { * @return a promise to the values that come from the {@link BatchLoader} */ CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link ObserverBatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link ObserverBatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link ObserverBatchLoader}. + * This is provided only for informative reasons and, you can't change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link ObserverBatchLoader} call + * @param the key type + */ + void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index d25dfa7..df733ed 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -304,6 +304,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; } diff --git a/src/test/java/org/dataloader/DataLoaderMappedObserverBatchLoaderTest.java b/src/test/java/org/dataloader/DataLoaderMappedObserverBatchLoaderTest.java new file mode 100644 index 0000000..e6f1168 --- /dev/null +++ b/src/test/java/org/dataloader/DataLoaderMappedObserverBatchLoaderTest.java @@ -0,0 +1,106 @@ +package org.dataloader; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Arrays.asList; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.mkDataLoader; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class DataLoaderMappedObserverBatchLoaderTest { + + @Test + public void should_Build_a_really_really_simple_data_loader() { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + CompletionStage future1 = identityLoader.load(1); + + future1.thenAccept(value -> { + assertThat(value, equalTo(1)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + } + + @Test + public void should_Support_loading_multiple_keys_in_one_call() { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + CompletionStage> futureAll = identityLoader.loadMany(asList(1, 2)); + futureAll.thenAccept(promisedValues -> { + assertThat(promisedValues.size(), is(2)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); + } + + @Test + public void simple_dataloader() { + DataLoader loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + loader.load("A"); + loader.load("B"); + loader.loadMany(asList("C", "D")); + + List results = loader.dispatchAndJoin(); + + assertThat(results.size(), equalTo(4)); + assertThat(results, equalTo(asList("A", "B", "C", "D"))); + } + + @Test + public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException { + DataLoader identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions()); + + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.get(), equalTo(1)); + assertThat(future2.get(), equalTo(2)); + } + + // A simple wrapper class intended as a proof external libraries can leverage this. + private static class Publisher { + + private final MappedBatchObserver delegate; + private Publisher(MappedBatchObserver delegate) { this.delegate = delegate; } + void onNext(Map.Entry entry) { delegate.onNext(entry.getKey(), entry.getValue()); } + void onCompleted() { delegate.onCompleted(); } + void onError(Throwable e) { delegate.onError(e); } + // Mock 'subscribe' methods to simulate what would happen in the real thing. + void subscribe(Map valueByKey) { + valueByKey.entrySet().forEach(this::onNext); + this.onCompleted(); + } + void subscribe(Map valueByKey, Throwable e) { + valueByKey.entrySet().forEach(this::onNext); + this.onError(e); + } + } + + private static MappedObserverBatchLoader keysAsValues() { + return (keys, observer) -> { + Publisher publisher = new Publisher<>(observer); + Map valueByKey = keys.stream().collect(toMap(identity(), identity())); + publisher.subscribe(valueByKey); + }; + } +} diff --git a/src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java b/src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java new file mode 100644 index 0000000..eaeef8f --- /dev/null +++ b/src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java @@ -0,0 +1,108 @@ +package org.dataloader; + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Arrays.asList; +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.mkDataLoader; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class DataLoaderObserverBatchLoaderTest { + + @Test + public void should_Build_a_really_really_simple_data_loader() { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + CompletionStage future1 = identityLoader.load(1); + + future1.thenAccept(value -> { + assertThat(value, equalTo(1)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + } + + @Test + public void should_Support_loading_multiple_keys_in_one_call() { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + CompletionStage> futureAll = identityLoader.loadMany(asList(1, 2)); + futureAll.thenAccept(promisedValues -> { + assertThat(promisedValues.size(), is(2)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); + } + + @Test + public void simple_dataloader() { + DataLoader loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); + + loader.load("A"); + loader.load("B"); + loader.loadMany(asList("C", "D")); + + List results = loader.dispatchAndJoin(); + + assertThat(results.size(), equalTo(4)); + assertThat(results, equalTo(asList("A", "B", "C", "D"))); + } + + @Test + public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException { + DataLoader identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions()); + + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.get(), equalTo(1)); + assertThat(future2.get(), equalTo(2)); + } + + // A simple wrapper class intended as a proof external libraries can leverage this. + private static class Publisher { + private final BatchObserver delegate; + private Publisher(BatchObserver delegate) { this.delegate = delegate; } + void onNext(V value) { delegate.onNext(value); } + void onCompleted() { delegate.onCompleted(); } + void onError(Throwable e) { delegate.onError(e); } + // Mock 'subscribe' methods to simulate what would happen in the real thing. + void subscribe(List values) { + values.forEach(this::onNext); + this.onCompleted(); + } + void subscribe(List values, Throwable e) { + values.forEach(this::onNext); + this.onError(e); + } + } + + private static ObserverBatchLoader keysAsValues() { + return (keys, observer) -> { + Publisher publisher = new Publisher<>(observer); + publisher.subscribe(keys); + }; + } + + private static ObserverBatchLoader keysWithValuesAndException(List values, Throwable e) { + return (keys, observer) -> { + Publisher publisher = new Publisher<>(observer); + publisher.subscribe(values, e); + }; + } +} diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index beb7c18..b77026c 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -36,6 +36,11 @@ public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderC public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { return scheduledCall.invoke(); } + + @Override + public void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + scheduledCall.invoke(); + } }; private BatchLoaderScheduler delayedScheduling(int ms) { @@ -56,6 +61,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(ms); + scheduledCall.invoke(); + } }; } @@ -139,6 +150,15 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + CompletableFuture.supplyAsync(() -> { + snooze(10); + scheduledCall.invoke(); + return null; + }); + } }; DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler);