-
Notifications
You must be signed in to change notification settings - Fork 90
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a proof-of-concept for "Observer-like" batch loading
**Note**: This commit, as-is, is not (yet) intended for merge. It is created to provide a proof-of-concept and gauge interest as polishing/testing this requires a non-trivial amount of effort. Motivation ========== The current DataLoader mechanism completes the corresponding `CompletableFuture` for a given key when the corresponding value is returned. However, DataLoader's `BatchLoader` assumes that the underlying batch function can only return all of its requested items at once (as an example, a SQL database query). However, the batch function may be a service that can return items progressively using a subscription-like architecture. Some examples include: - Project Reactor's [Subscriber](https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html). - gRPC's [StreamObserver](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html). - RX Java's [Flowable](https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html). Streaming results in this fashion offers several advantages: - Certain values may be returned earlier than others (for example, the batch function may have cached values it can return early). - Memory load is lessened on the batch function (which may be an external service), as it does not need to keep hold of the retrieved values before it can send them out at once. - We are able to save the need to stream individual error values by providing an `onError` function to terminate the stream early. Proposal ======== We provide two new `BatchLoader`s and support for them in `java-dataloader`: - `ObserverBatchLoader`, with a load function that accepts: - a list of keys. - a `BatchObserver` intended as a delegate for publisher-like structures found in Project Reactor and Rx Java. This obviates the need to depend on external libraries. - `MappedObserverBatchLoader`, similar to `ObserverBatchLoader` but with an `onNext` that accepts a key _and_ value (to allow for early termination of streams without needing to process `null`s). - `*WithContext` variants for the above. The key value-add is that the implementation of `BatchObserver` (provided to the load functions) will immediately complete the queued future for a given key when `onNext` is called with a value. This means that if we have a batch function that can deliver values progressively, we can continue evaluating the query as the values arrive. As an arbitrary example, let's have a batch function that serves both the reporter and project fields on a Jira issue: ```graphql query { issue { project { issueTypes { ... } } reporter { ... } } } ``` If the batch function can return a `project` immediately but is delayed in when it can `reporter`, then our batch loader can return `project` and start evaluating the `issueTypes` immediately while we load the `reporter` in parallel. This would provide a more performant query evaluation. As mentioned above, this is not in a state to be merged - this is intended to gauge whether this is something the maintainers would be interested in owning. Should this be the case, the author is willing to test/polish this pull request so that it may be merged.
- Loading branch information
1 parent
1bb63aa
commit 6e30220
Showing
12 changed files
with
644 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.dataloader; | ||
|
||
/** | ||
* A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling | ||
* {@link ObserverBatchLoader}. | ||
* <p> | ||
* Some examples include: | ||
* <ul> | ||
* <li>Project Reactor's <a href="https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html">{@code Subscriber}</a> | ||
* <li>gRPC's <a href="https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html">{@code StreamObserver}</a> | ||
* <li>RX Java's <a href="https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html">{@code Flowable}</a> | ||
* </ul> | ||
* @param <V> the value type of the {@link ObserverBatchLoader} | ||
*/ | ||
public interface BatchObserver<V> { | ||
|
||
/** | ||
* To be called by the {@link ObserverBatchLoader} to load a new value. | ||
*/ | ||
void onNext(V value); | ||
|
||
/** | ||
* To be called by the {@link ObserverBatchLoader} to indicate all values have been successfully processed. | ||
* This {@link BatchObserver} should not have any method invoked after this is called. | ||
*/ | ||
void onCompleted(); | ||
|
||
/** | ||
* To be called by the {@link ObserverBatchLoader} to indicate an unrecoverable error has been encountered. | ||
* This {@link BatchObserver} should not have any method invoked after this is called. | ||
*/ | ||
void onError(Throwable e); | ||
} |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package org.dataloader; | ||
|
||
/** | ||
* A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling | ||
* {@link MappedObserverBatchLoader}. | ||
* <p> | ||
* Some examples include: | ||
* <ul> | ||
* <li>Project Reactor's <a href="https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html">{@code Subscriber}</a> | ||
* <li>gRPC's <a href="https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html">{@code StreamObserver}</a> | ||
* <li>RX Java's <a href="https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html">{@code Flowable}</a> | ||
* </ul> | ||
* @param <K> the key type of the calling {@link MappedObserverBatchLoader}. | ||
* @param <V> the value type of the calling {@link MappedObserverBatchLoader}. | ||
*/ | ||
public interface MappedBatchObserver<K, V> { | ||
|
||
/** | ||
* To be called by the {@link MappedObserverBatchLoader} to process a new key/value pair. | ||
*/ | ||
void onNext(K key, V value); | ||
|
||
/** | ||
* To be called by the {@link MappedObserverBatchLoader} to indicate all values have been successfully processed. | ||
* This {@link MappedBatchObserver} should not have any method invoked after this method is called. | ||
*/ | ||
void onCompleted(); | ||
|
||
/** | ||
* To be called by the {@link MappedObserverBatchLoader} to indicate an unrecoverable error has been encountered. | ||
* This {@link MappedBatchObserver} should not have any method invoked after this method is called. | ||
*/ | ||
void onError(Throwable e); | ||
} |
17 changes: 17 additions & 0 deletions
17
src/main/java/org/dataloader/MappedObserverBatchLoader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package org.dataloader; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. | ||
* <p> | ||
* The function will call the provided {@link MappedBatchObserver} to process the key/value pairs it has retrieved to allow | ||
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available | ||
* (rather than when all values have been retrieved). | ||
* | ||
* @param <K> type parameter indicating the type of keys to use for data load requests. | ||
* @param <V> type parameter indicating the type of values returned | ||
*/ | ||
public interface MappedObserverBatchLoader<K, V> { | ||
void load(List<K> keys, MappedBatchObserver<K, V> observer); | ||
} |
10 changes: 10 additions & 0 deletions
10
src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package org.dataloader; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* A {@link MappedObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}. | ||
*/ | ||
public interface MappedObserverBatchLoaderWithContext<K, V> { | ||
void load(List<K> keys, MappedBatchObserver<K, V> observer, BatchLoaderEnvironment environment); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package org.dataloader; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. | ||
* <p> | ||
* The function will call the provided {@link BatchObserver} to process the values it has retrieved to allow | ||
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available | ||
* (rather than when all values have been retrieved). | ||
* <p> | ||
* It is required that values be returned in the same order as the keys provided. | ||
* | ||
* @param <K> type parameter indicating the type of keys to use for data load requests. | ||
* @param <V> type parameter indicating the type of values returned | ||
*/ | ||
public interface ObserverBatchLoader<K, V> { | ||
void load(List<K> keys, BatchObserver<V> observer); | ||
} |
10 changes: 10 additions & 0 deletions
10
src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package org.dataloader; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* An {@link ObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}. | ||
*/ | ||
public interface ObserverBatchLoaderWithContext<K, V> { | ||
void load(List<K> keys, BatchObserver<V> observer, BatchLoaderEnvironment environment); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
src/test/java/org/dataloader/DataLoaderMappedObserverBatchLoaderTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package org.dataloader; | ||
|
||
import org.junit.Test; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static java.util.Arrays.asList; | ||
import static java.util.function.Function.identity; | ||
import static java.util.stream.Collectors.toMap; | ||
import static org.awaitility.Awaitility.await; | ||
import static org.dataloader.DataLoaderFactory.mkDataLoader; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.junit.Assert.assertThat; | ||
|
||
public class DataLoaderMappedObserverBatchLoaderTest { | ||
|
||
@Test | ||
public void should_Build_a_really_really_simple_data_loader() { | ||
AtomicBoolean success = new AtomicBoolean(); | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
CompletionStage<Integer> future1 = identityLoader.load(1); | ||
|
||
future1.thenAccept(value -> { | ||
assertThat(value, equalTo(1)); | ||
success.set(true); | ||
}); | ||
identityLoader.dispatch(); | ||
await().untilAtomic(success, is(true)); | ||
} | ||
|
||
@Test | ||
public void should_Support_loading_multiple_keys_in_one_call() { | ||
AtomicBoolean success = new AtomicBoolean(); | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
CompletionStage<List<Integer>> futureAll = identityLoader.loadMany(asList(1, 2)); | ||
futureAll.thenAccept(promisedValues -> { | ||
assertThat(promisedValues.size(), is(2)); | ||
success.set(true); | ||
}); | ||
identityLoader.dispatch(); | ||
await().untilAtomic(success, is(true)); | ||
assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); | ||
} | ||
|
||
@Test | ||
public void simple_dataloader() { | ||
DataLoader<String, String> loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
loader.load("A"); | ||
loader.load("B"); | ||
loader.loadMany(asList("C", "D")); | ||
|
||
List<String> results = loader.dispatchAndJoin(); | ||
|
||
assertThat(results.size(), equalTo(4)); | ||
assertThat(results, equalTo(asList("A", "B", "C", "D"))); | ||
} | ||
|
||
@Test | ||
public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException { | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions()); | ||
|
||
CompletableFuture<Integer> future1 = identityLoader.load(1); | ||
CompletableFuture<Integer> future2 = identityLoader.load(2); | ||
identityLoader.dispatch(); | ||
|
||
await().until(() -> future1.isDone() && future2.isDone()); | ||
assertThat(future1.get(), equalTo(1)); | ||
assertThat(future2.get(), equalTo(2)); | ||
} | ||
|
||
// A simple wrapper class intended as a proof external libraries can leverage this. | ||
private static class Publisher<K, V> { | ||
|
||
private final MappedBatchObserver<K, V> delegate; | ||
private Publisher(MappedBatchObserver<K, V> delegate) { this.delegate = delegate; } | ||
void onNext(Map.Entry<K, V> entry) { delegate.onNext(entry.getKey(), entry.getValue()); } | ||
void onCompleted() { delegate.onCompleted(); } | ||
void onError(Throwable e) { delegate.onError(e); } | ||
// Mock 'subscribe' methods to simulate what would happen in the real thing. | ||
void subscribe(Map<K, V> valueByKey) { | ||
valueByKey.entrySet().forEach(this::onNext); | ||
this.onCompleted(); | ||
} | ||
void subscribe(Map<K, V> valueByKey, Throwable e) { | ||
valueByKey.entrySet().forEach(this::onNext); | ||
this.onError(e); | ||
} | ||
} | ||
|
||
private static <K> MappedObserverBatchLoader<K, K> keysAsValues() { | ||
return (keys, observer) -> { | ||
Publisher<K, K> publisher = new Publisher<>(observer); | ||
Map<K, K> valueByKey = keys.stream().collect(toMap(identity(), identity())); | ||
publisher.subscribe(valueByKey); | ||
}; | ||
} | ||
} |
108 changes: 108 additions & 0 deletions
108
src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package org.dataloader; | ||
|
||
import org.junit.Test; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static java.util.Arrays.asList; | ||
import static org.awaitility.Awaitility.await; | ||
import static org.dataloader.DataLoaderFactory.mkDataLoader; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.junit.Assert.assertThat; | ||
|
||
public class DataLoaderObserverBatchLoaderTest { | ||
|
||
@Test | ||
public void should_Build_a_really_really_simple_data_loader() { | ||
AtomicBoolean success = new AtomicBoolean(); | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
CompletionStage<Integer> future1 = identityLoader.load(1); | ||
|
||
future1.thenAccept(value -> { | ||
assertThat(value, equalTo(1)); | ||
success.set(true); | ||
}); | ||
identityLoader.dispatch(); | ||
await().untilAtomic(success, is(true)); | ||
} | ||
|
||
@Test | ||
public void should_Support_loading_multiple_keys_in_one_call() { | ||
AtomicBoolean success = new AtomicBoolean(); | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
CompletionStage<List<Integer>> futureAll = identityLoader.loadMany(asList(1, 2)); | ||
futureAll.thenAccept(promisedValues -> { | ||
assertThat(promisedValues.size(), is(2)); | ||
success.set(true); | ||
}); | ||
identityLoader.dispatch(); | ||
await().untilAtomic(success, is(true)); | ||
assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); | ||
} | ||
|
||
@Test | ||
public void simple_dataloader() { | ||
DataLoader<String, String> loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions()); | ||
|
||
loader.load("A"); | ||
loader.load("B"); | ||
loader.loadMany(asList("C", "D")); | ||
|
||
List<String> results = loader.dispatchAndJoin(); | ||
|
||
assertThat(results.size(), equalTo(4)); | ||
assertThat(results, equalTo(asList("A", "B", "C", "D"))); | ||
} | ||
|
||
@Test | ||
public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException { | ||
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions()); | ||
|
||
CompletableFuture<Integer> future1 = identityLoader.load(1); | ||
CompletableFuture<Integer> future2 = identityLoader.load(2); | ||
identityLoader.dispatch(); | ||
|
||
await().until(() -> future1.isDone() && future2.isDone()); | ||
assertThat(future1.get(), equalTo(1)); | ||
assertThat(future2.get(), equalTo(2)); | ||
} | ||
|
||
// A simple wrapper class intended as a proof external libraries can leverage this. | ||
private static class Publisher<V> { | ||
private final BatchObserver<V> delegate; | ||
private Publisher(BatchObserver<V> delegate) { this.delegate = delegate; } | ||
void onNext(V value) { delegate.onNext(value); } | ||
void onCompleted() { delegate.onCompleted(); } | ||
void onError(Throwable e) { delegate.onError(e); } | ||
// Mock 'subscribe' methods to simulate what would happen in the real thing. | ||
void subscribe(List<V> values) { | ||
values.forEach(this::onNext); | ||
this.onCompleted(); | ||
} | ||
void subscribe(List<V> values, Throwable e) { | ||
values.forEach(this::onNext); | ||
this.onError(e); | ||
} | ||
} | ||
|
||
private static <K> ObserverBatchLoader<K, K> keysAsValues() { | ||
return (keys, observer) -> { | ||
Publisher<K> publisher = new Publisher<>(observer); | ||
publisher.subscribe(keys); | ||
}; | ||
} | ||
|
||
private static <K, V> ObserverBatchLoader<K, V> keysWithValuesAndException(List<V> values, Throwable e) { | ||
return (keys, observer) -> { | ||
Publisher<V> publisher = new Publisher<>(observer); | ||
publisher.subscribe(values, e); | ||
}; | ||
} | ||
} |
Oops, something went wrong.