|
17 | 17 |
|
18 | 18 | import java.time.Duration;
|
19 | 19 | import java.util.ArrayList;
|
20 |
| -import java.util.Collections; |
21 | 20 | import java.util.List;
|
22 | 21 | import java.util.Map;
|
23 | 22 | import java.util.Optional;
|
@@ -58,15 +57,16 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration,
|
58 | 57 | this.maxBatchItems = batchConfiguration.maxBatchItems();
|
59 | 58 | this.sendRequestFrequency = batchConfiguration.sendRequestFrequency();
|
60 | 59 | this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor");
|
61 |
| - pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
62 |
| - pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| 60 | + pendingBatchResponses = ConcurrentHashMap.newKeySet(); |
| 61 | + pendingResponses = ConcurrentHashMap.newKeySet(); |
63 | 62 | this.requestsAndResponsesMaps = new BatchingMap<>(overrideConfiguration);
|
64 | 63 |
|
65 | 64 | }
|
66 | 65 |
|
67 | 66 | public CompletableFuture<ResponseT> batchRequest(RequestT request) {
|
68 | 67 | CompletableFuture<ResponseT> response = new CompletableFuture<>();
|
69 | 68 | pendingResponses.add(response);
|
| 69 | + response.whenComplete((r, t) -> pendingResponses.remove(response)); |
70 | 70 |
|
71 | 71 | try {
|
72 | 72 | String batchKey = getBatchKey(request);
|
@@ -120,16 +120,17 @@ private void flushBuffer(String batchKey, Map<String, BatchingExecutionContext<R
|
120 | 120 | flushableRequests.forEach((contextId, batchExecutionContext) ->
|
121 | 121 | requestEntries.add(new IdentifiableMessage<>(contextId, batchExecutionContext.request())));
|
122 | 122 | if (!requestEntries.isEmpty()) {
|
123 |
| - CompletableFuture<BatchResponseT> pendingBatchingRequest = batchAndSend(requestEntries, batchKey) |
124 |
| - .whenComplete((result, ex) -> handleAndCompleteResponses(result, ex, flushableRequests)); |
125 |
| - |
| 123 | + CompletableFuture<BatchResponseT> pendingBatchingRequest = batchAndSend(requestEntries, batchKey); |
126 | 124 | pendingBatchResponses.add(pendingBatchingRequest);
|
| 125 | + pendingBatchingRequest.whenComplete((result, ex) -> { |
| 126 | + handleAndCompleteResponses(result, ex, flushableRequests); |
| 127 | + pendingBatchResponses.remove(pendingBatchingRequest); |
| 128 | + }); |
127 | 129 | }
|
128 | 130 | }
|
129 | 131 |
|
130 | 132 | private void handleAndCompleteResponses(BatchResponseT batchResult, Throwable exception,
|
131 | 133 | Map<String, BatchingExecutionContext<RequestT, ResponseT>> requests) {
|
132 |
| - requests.forEach((contextId, batchExecutionContext) -> pendingResponses.add(batchExecutionContext.response())); |
133 | 134 | if (exception != null) {
|
134 | 135 | requests.forEach((contextId, batchExecutionContext) -> batchExecutionContext.response()
|
135 | 136 | .completeExceptionally(exception));
|
|
0 commit comments