diff --git a/README.md b/README.md index 24a65f6..9ffe265 100644 --- a/README.md +++ b/README.md @@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob // ... ``` +### Returning a stream of results from your batch publisher + +It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream. + +For example, let's say you wanted to load many users from a service without forcing the service to load all +users into its memory (which may exert considerable pressure on it). + +An `org.dataloader.BatchPublisher` may be used to load this data: + +```java + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + userManager.publishUsersById(userIds, userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + + // ... +``` + +Rather than waiting for all values to be returned, this `DataLoader` will complete +the `CompletableFuture` returned by `DataLoader#load(Long)` as each value is +processed. + +If an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +You *MUST* ensure that the values are streamed in the same order as the keys provided, +with the same cardinality (i.e. the number of values must match the number of keys). +Failing to do so will result in incorrect data being returned from `DataLoader#load`. + + +### Returning a mapped stream of results from your batch publisher + +Your publisher may not necessarily return values in the same order in which it processes keys. + +For example, let's say your batch publisher function loads user data which is spread across shards, +with some shards responding more quickly than others. 
+ +In instances like these, `org.dataloader.MappedBatchPublisher` can be used. + +```java + MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + userManager.publishUsersById(userIds, userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + + // ... +``` + +Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys, +or even the same number of values. + ### Error object is not a thing in a type safe Java world In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected @@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; ``` diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 62a7cb6..9cd38d6 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -536,7 +536,7 @@ private CompletableFuture> invokeBatchPublisher(List keys, List> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); Subscriber> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); - + Set setOfKeys = new LinkedHashSet<>(keys); BatchLoaderScheduler batchLoaderScheduler 
= loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { //noinspection unchecked @@ -551,10 +551,10 @@ private CompletableFuture> invokeMappedBatchPublisher(List keys, List //noinspection unchecked MappedBatchPublisher loadFunction = (MappedBatchPublisher) batchLoadFunction; if (batchLoaderScheduler != null) { - BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber); + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(setOfKeys, subscriber); batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null); } else { - loadFunction.load(keys, subscriber); + loadFunction.load(setOfKeys, subscriber); } } return loadResult; diff --git a/src/main/java/org/dataloader/MappedBatchPublisher.java b/src/main/java/org/dataloader/MappedBatchPublisher.java index 398e880..754ee52 100644 --- a/src/main/java/org/dataloader/MappedBatchPublisher.java +++ b/src/main/java/org/dataloader/MappedBatchPublisher.java @@ -2,8 +2,8 @@ import org.reactivestreams.Subscriber; -import java.util.List; import java.util.Map; +import java.util.Set; /** * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. 
@@ -26,5 +26,5 @@ public interface MappedBatchPublisher { * @param keys the collection of keys to load * @param subscriber as values arrive you must call the subscriber for each value */ - void load(List keys, Subscriber> subscriber); + void load(Set keys, Subscriber> subscriber); } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 31354ea..a20c0ea 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -1,11 +1,13 @@ import org.dataloader.BatchLoader; import org.dataloader.BatchLoaderEnvironment; import org.dataloader.BatchLoaderWithContext; +import org.dataloader.BatchPublisher; import org.dataloader.CacheMap; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoaderWithContext; +import org.dataloader.MappedBatchPublisher; import org.dataloader.Try; import org.dataloader.fixtures.SecurityCtx; import org.dataloader.fixtures.User; @@ -15,6 +17,7 @@ import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import org.reactivestreams.Subscriber; import java.time.Duration; import java.util.ArrayList; @@ -171,7 +174,7 @@ private void tryExample() { } } - private void tryBatcLoader() { + private void tryBatchLoader() { DataLoader dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader>() { @Override public CompletionStage>> load(List keys) { @@ -187,6 +190,26 @@ public CompletionStage>> load(List keys) { }); } + private void batchPublisher() { + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + userManager.publishUsersById(userIds, userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + } + + private void mappedBatchPublisher() { + MappedBatchPublisher 
mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + userManager.publishUsersById(userIds, userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + } + DataLoader userDataLoader; private void clearCacheOnError() { diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index 1f748fb..1ce34ea 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -740,7 +740,7 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo("B")); assertThat(future3.get(), equalTo("A")); - if (factory instanceof MappedDataLoaderFactory) { + if (factory instanceof MappedDataLoaderFactory || factory instanceof MappedPublisherDataLoaderFactory) { assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); } else { assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); diff --git a/src/test/java/org/dataloader/fixtures/UserManager.java b/src/test/java/org/dataloader/fixtures/UserManager.java index 24fee0d..4fed3f7 100644 --- a/src/test/java/org/dataloader/fixtures/UserManager.java +++ b/src/test/java/org/dataloader/fixtures/UserManager.java @@ -1,5 +1,8 @@ package org.dataloader.fixtures; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; + import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -52,6 +55,14 @@ public List loadUsersById(List userIds) { return userIds.stream().map(this::loadUserById).collect(Collectors.toList()); } + public void publishUsersById(List userIds, Subscriber userSubscriber) { + Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber); + } + + public void publishUsersById(Set userIds, Subscriber> 
userEntrySubscriber) { + Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber); + } + public Map loadMapOfUsersByIds(SecurityCtx callCtx, Set userIds) { Map map = new HashMap<>(); userIds.forEach(userId -> { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java index f5c1ad5..9c92330 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java @@ -10,8 +10,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; @@ -69,7 +73,7 @@ public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions opti return newMappedPublisherDataLoader((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); - List nKeys = keys.subList(0, N); + List nKeys = keys.stream().limit(N).collect(toList()); Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) .subscribe(subscriber); @@ -81,7 +85,7 @@ public DataLoader onlyReturnsNValues(int N, DataLoaderOptions op return newMappedPublisherDataLoader((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); - List nKeys = keys.subList(0, N); + List nKeys = keys.stream().limit(N).collect(toList()); Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) .subscribe(subscriber); }, options);