Skip to content

Commit

Permalink
Merge pull request #160 from AlexandreCarlton/add-documentation-for-p…
Browse files Browse the repository at this point in the history
…ublishers

Have MappedBatchPublisher take in a Set<K> keys (and add README sections)
  • Loading branch information
bbakerman authored May 26, 2024
2 parents 4b9356e + 2e82858 commit c3e6ee5
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 9 deletions.
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob
// ...
```

### Returning a stream of results from your batch publisher

It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.

For example, let's say you wanted to load many users from a service without forcing the service to load all
users into its memory (which may exert considerable pressure on it).

A `org.dataloader.BatchPublisher` may be used to load this data:

```java
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);

// ...
```

Rather than waiting for all values to be returned, this `DataLoader` will complete
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
processed.

If an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

You *MUST* ensure that the values are streamed in the same order as the keys provided,
with the same cardinality (i.e. the number of values must match the number of keys).
Failing to do so will result in incorrect data being returned from `DataLoader#load`.


### Returning a mapped stream of results from your batch publisher

Your publisher may not necessarily return values in the same order in which it processes keys.

For example, let's say your batch publisher function loads user data which is spread across shards,
with some shards responding more quickly than others.

In instances like these, `org.dataloader.MappedBatchPublisher` can be used.

```java
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);

// ...
```

Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
or even the same number of values.

### Error object is not a thing in a type safe Java world

In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected
Expand Down Expand Up @@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}

@Override
public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
snooze(10);
scheduledCall.invoke();
}
};
```

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
Subscriber<Map.Entry<K, V>> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());

Set<K> setOfKeys = new LinkedHashSet<>(keys);
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
if (batchLoadFunction instanceof MappedBatchPublisherWithContext) {
//noinspection unchecked
Expand All @@ -551,10 +551,10 @@ private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List
//noinspection unchecked
MappedBatchPublisher<K, V> loadFunction = (MappedBatchPublisher<K, V>) batchLoadFunction;
if (batchLoaderScheduler != null) {
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber);
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(setOfKeys, subscriber);
batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null);
} else {
loadFunction.load(keys, subscriber);
loadFunction.load(setOfKeys, subscriber);
}
}
return loadResult;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/dataloader/MappedBatchPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import org.reactivestreams.Subscriber;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
Expand All @@ -26,5 +26,5 @@ public interface MappedBatchPublisher<K, V> {
* @param keys the collection of keys to load
* @param subscriber as values arrive you must call the subscriber for each value
*/
void load(List<K> keys, Subscriber<Map.Entry<K, V>> subscriber);
void load(Set<K> keys, Subscriber<Map.Entry<K, V>> subscriber);
}
25 changes: 24 additions & 1 deletion src/test/java/ReadmeExamples.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import org.dataloader.BatchLoader;
import org.dataloader.BatchLoaderEnvironment;
import org.dataloader.BatchLoaderWithContext;
import org.dataloader.BatchPublisher;
import org.dataloader.CacheMap;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderFactory;
import org.dataloader.DataLoaderOptions;
import org.dataloader.MappedBatchLoaderWithContext;
import org.dataloader.MappedBatchPublisher;
import org.dataloader.Try;
import org.dataloader.fixtures.SecurityCtx;
import org.dataloader.fixtures.User;
Expand All @@ -15,6 +17,7 @@
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.ThreadLocalStatisticsCollector;
import org.reactivestreams.Subscriber;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -171,7 +174,7 @@ private void tryExample() {
}
}

private void tryBatcLoader() {
private void tryBatchLoader() {
DataLoader<String, User> dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader<String, Try<User>>() {
@Override
public CompletionStage<List<Try<User>>> load(List<String> keys) {
Expand All @@ -187,6 +190,26 @@ public CompletionStage<List<Try<User>>> load(List<String> keys) {
});
}

private void batchPublisher() {
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
}

private void mappedBatchPublisher() {
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
}

DataLoader<String, User> userDataLoader;

private void clearCacheOnError() {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/dataloader/DataLoaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader
assertThat(future1.get(), equalTo("A"));
assertThat(future2.get(), equalTo("B"));
assertThat(future3.get(), equalTo("A"));
if (factory instanceof MappedDataLoaderFactory) {
if (factory instanceof MappedDataLoaderFactory || factory instanceof MappedPublisherDataLoaderFactory) {
assertThat(loadCalls, equalTo(singletonList(asList("A", "B"))));
} else {
assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A"))));
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/dataloader/fixtures/UserManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.dataloader.fixtures;

import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -52,6 +55,14 @@ public List<User> loadUsersById(List<Long> userIds) {
return userIds.stream().map(this::loadUserById).collect(Collectors.toList());
}

public void publishUsersById(List<Long> userIds, Subscriber<? super User> userSubscriber) {
Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber);
}

public void publishUsersById(Set<Long> userIds, Subscriber<? super Map.Entry<Long, User>> userEntrySubscriber) {
Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber);
}

public Map<Long, User> loadMapOfUsersByIds(SecurityCtx callCtx, Set<Long> userIds) {
Map<Long, User> map = new HashMap<>();
userIds.forEach(userId -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader;
import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry;

Expand Down Expand Up @@ -69,7 +73,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions opti
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<K> nKeys = keys.subList(0, N);
List<K> nKeys = keys.stream().limit(N).collect(toList());
Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
.subscribe(subscriber);
Expand All @@ -81,7 +85,7 @@ public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions op
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<String> nKeys = keys.subList(0, N);
List<String> nKeys = keys.stream().limit(N).collect(toList());
Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
.subscribe(subscriber);
}, options);
Expand Down

0 comments on commit c3e6ee5

Please sign in to comment.