From 7c7deae499f67805bf242b40c1933131bedb1859 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 5 Sep 2023 17:34:13 -0400 Subject: [PATCH] Add Publisher.zip Motivation: The zip operator helps combine results from multiple Publishers into a single Publisher. --- .../servicetalk/concurrent/api/Publisher.java | 724 ++++++++++++++++++ .../concurrent/api/PublisherZipper.java | 302 ++++++++ .../io/servicetalk/concurrent/api/Single.java | 4 +- .../concurrent/api/SingleZipper.java | 8 +- .../concurrent/api/PublisherZip3Test.java | 365 +++++++++ .../concurrent/api/PublisherZip4Test.java | 403 ++++++++++ .../concurrent/api/PublisherZip5Test.java | 462 +++++++++++ .../concurrent/api/PublisherZipWithTest.java | 312 ++++++++ .../tck/AbstractPublisherZipTckTest.java | 30 + .../PublisherSwitchMapDelayErrorTckTest.java | 4 +- .../tck/PublisherSwitchMapTckTest.java | 27 +- .../tck/PublisherZip3DelayErrorTckTest.java | 31 + .../tck/PublisherZip3TckTest.java | 31 + .../tck/PublisherZip4DelayErrorTckTest.java | 32 + .../tck/PublisherZip4TckTest.java | 32 + .../tck/PublisherZip5DelayErrorTckTest.java | 32 + .../tck/PublisherZip5TckTest.java | 32 + .../PublisherZipWithDelayErrorTckTest.java | 30 + .../tck/PublisherZipWithTckTest.java | 30 + 19 files changed, 2872 insertions(+), 19 deletions(-) create mode 100644 servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherZipper.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip3Test.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip4Test.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip5Test.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZipWithTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/AbstractPublisherZipTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3DelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3TckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4DelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4TckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5DelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5TckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index f8cbb05870..2858361ea4 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -66,6 +66,7 @@ import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnNextSupplier; import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnRequestSupplier; import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnSubscribeSupplier; +import static io.servicetalk.concurrent.api.PublisherZipper.ZIP_MAX_CONCURRENCY; import static io.servicetalk.concurrent.api.ReplayPublisher.newReplayPublisher; import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; import static io.servicetalk.utils.internal.DurationUtils.toNanos; @@ -2370,6 +2371,134 @@ public final Publisher concatPropagateCancel(Completable next) { return new PublisherConcatWithCompletable<>(this, next, true); } + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List thisResults = resultOfThisPublisher();
+     *      List otherResults = resultOfOtherPublisher();
+     *      assert thisResults.size() == otherResults.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < thisResults.size(); ++i) {
+     *        zippedResults.add(zipper.apply(thisResults.get(i), otherResults.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param other The other {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from {@link Publisher}. + * @param The type of {@code other}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + * @see ReactiveX zip operator. + * @see #zipWith(Publisher, BiFunction, int) + * @see #zipWithDelayError(Publisher, BiFunction) + */ + public final Publisher zipWith(Publisher other, + BiFunction zipper) { + return zip(this, other, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List thisResults = resultOfThisPublisher();
+     *      List otherResults = resultOfOtherPublisher();
+     *      assert thisResults.size() == otherResults.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < thisResults.size(); ++i) {
+     *        zippedResults.add(zipper.apply(thisResults.get(i), otherResults.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param other The other {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from {@link Publisher}. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code other}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + * @see ReactiveX zip operator. + * @see #zipWithDelayError(Publisher, BiFunction) + */ + public final Publisher zipWith(Publisher other, + BiFunction zipper, + int maxOutstandingDemand) { + return zip(this, other, zipper, maxOutstandingDemand); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List thisResults = resultOfThisPublisher();
+     *      List otherResults = resultOfOtherPublisher();
+     *      assert thisResults.size() == otherResults.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < thisResults.size(); ++i) {
+     *        zippedResults.add(zipper.apply(thisResults.get(i), otherResults.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param other The other {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from each {@link Publisher}. + * @param The type of {@code other}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + * @see ReactiveX zip operator. + * @see #zipWithDelayError(Publisher, BiFunction, int) + * @see #zipWith(Publisher, BiFunction) + */ + public final Publisher zipWithDelayError(Publisher other, + BiFunction zipper) { + return zipDelayError(this, other, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List thisResults = resultOfThisPublisher();
+     *      List otherResults = resultOfOtherPublisher();
+     *      assert thisResults.size() == otherResults.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < thisResults.size(); ++i) {
+     *        zippedResults.add(zipper.apply(thisResults.get(i), otherResults.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param other The other {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from each {@link Publisher}. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code other}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + * @see ReactiveX zip operator. + * @see #zipWith(Publisher, BiFunction) + */ + public final Publisher zipWithDelayError(Publisher other, + BiFunction zipper, + int maxOutstandingDemand) { + return zipDelayError(this, other, zipper, maxOutstandingDemand); + } + /** * Re-subscribes to this {@link Publisher} if an error is emitted and the passed {@link BiIntPredicate} returns * {@code true}. @@ -4786,6 +4915,601 @@ public static Publisher mergeAllDelayError(int maxConcurrency, Publisher< return from(publishers).flatMapMergeDelayError(identity(), maxConcurrency); } + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      assert p1Results.size() == p2Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1} and {@code p2}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, BiFunction, int) + * @see #zipDelayError(Publisher, Publisher, BiFunction) + */ + public static Publisher zip(Publisher p1, Publisher p2, + BiFunction zipper) { + return zip(p1, p2, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      assert p1Results.size() == p2Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1} and {@code p2}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, BiFunction, int) + */ + public static Publisher zip(Publisher p1, Publisher p2, + BiFunction zipper, + int maxOutstandingDemand) { + return PublisherZipper.zip(false, maxOutstandingDemand, p1, p2, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      assert p1Results.size() == p2Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1} and {@code p2}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, BiFunction, int) + * @see #zip(Publisher, Publisher, BiFunction) + */ + public static Publisher zipDelayError(Publisher p1, Publisher p2, + BiFunction zipper) { + return zipDelayError(p1, p2, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      assert p1Results.size() == p2Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1} and {@code p2}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, BiFunction, int) + */ + public static Publisher zipDelayError(Publisher p1, Publisher p2, + BiFunction zipper, + int maxOutstandingDemand) { + return PublisherZipper.zip(true, maxOutstandingDemand, p1, p2, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, and {@code p3}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, Publisher, Function3, int) + * @see #zipDelayError(Publisher, Publisher, Publisher, Function3) + */ + public static Publisher zip( + Publisher p1, Publisher p2, Publisher p3, + Function3 zipper) { + return zip(p1, p2, p3, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, and {@code p3}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, Publisher, Function3, int) + */ + public static Publisher zip( + Publisher p1, Publisher p2, Publisher p3, + Function3 zipper, int maxOutstandingDemand) { + return PublisherZipper.zip(false, maxOutstandingDemand, p1, p2, p3, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, and {@code p3}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, Publisher, Function3, int) + * @see #zip(Publisher, Publisher, Publisher, Function3) + */ + public static Publisher zipDelayError( + Publisher p1, Publisher p2, Publisher p3, + Function3 zipper) { + return zipDelayError(p1, p2, p3, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, and {@code p3}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, Publisher, Function3, int) + */ + public static Publisher zipDelayError( + Publisher p1, Publisher p2, Publisher p3, + Function3 zipper, int maxOutstandingDemand) { + return PublisherZipper.zip(true, maxOutstandingDemand, p1, p2, p3, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      List p4Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size() &&
+     *             p1Results.size() == p4Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i), r4Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param p4 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param the type of {@code p4}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, {@code p3}, and {@code p4}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, Publisher, Publisher, Function4, int) + * @see #zipDelayError(Publisher, Publisher, Publisher, Publisher, Function4) + */ + public static Publisher zip( + Publisher p1, Publisher p2, Publisher p3, + Publisher p4, Function4 zipper) { + return zip(p1, p2, p3, p4, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      List p4Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size() &&
+     *             p1Results.size() == p4Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i), r4Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param p4 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param the type of {@code p4}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, {@code p3}, and {@code p4}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, Publisher, Publisher, Function4, int) + */ + public static Publisher zip( + Publisher p1, Publisher p2, Publisher p3, + Publisher p4, Function4 zipper, + int maxOutstandingDemand) { + return PublisherZipper.zip(false, maxOutstandingDemand, p1, p2, p3, p4, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      List p4Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size() &&
+     *             p1Results.size() == p4Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i), r4Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param p4 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param the type of {@code p4}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, {@code p3}, and {@code p4}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Publisher, Publisher, Publisher, Publisher, Function4, int) + * @see #zip(Publisher, Publisher, Publisher, Publisher, Function4) + */ + public static Publisher zipDelayError( + Publisher p1, Publisher p2, Publisher p3, + Publisher p4, Function4 zipper) { + return zipDelayError(p1, p2, p3, p4, zipper, ZIP_MAX_CONCURRENCY); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      List p1Results = resultOfP1Publisher();
+     *      List p2Results = resultOfP2Publisher();
+     *      List p3Results = resultOfP3Publisher();
+     *      List p4Results = resultOfP3Publisher();
+     *      assert p1Results.size() == p2Results.size() && p1Results.size() == p3Results.size() &&
+     *             p1Results.size() == p4Results.size();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < p1Results.size(); ++i) {
+     *        zippedResults.add(zipper.apply(p1Results.get(i), p2Results.get(i), p3Results.get(i), r4Results.get(i)));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param p1 The first {@link Publisher} to zip. + * @param p2 The second {@link Publisher} to zip. + * @param p3 The third {@link Publisher} to zip. + * @param p4 The third {@link Publisher} to zip. + * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param The type of {@code p1}. + * @param The type of {@code p2}. + * @param the type of {@code p3}. + * @param the type of {@code p4}. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code p1}, {@code p2}, {@code p3}, and {@code p4}. + * @see ReactiveX zip operator. + * @see #zip(Publisher, Publisher, Publisher, Publisher, Function4, int) + */ + public static Publisher zipDelayError( + Publisher p1, Publisher p2, Publisher p3, + Publisher p4, Function4 zipper, + int maxOutstandingDemand) { + return PublisherZipper.zip(true, maxOutstandingDemand, p1, p2, p3, p4, zipper); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      Queue[] results = resultOfAllPublishers();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < results.length - 1; ++i) {
+     *        assert results[i].size() == results[i + 1].size(); // each publisher emits the same number of signals.
+     *      }
+     *      for (;;) {
+     *        Object[] objects = new Object[results.size()];
+     *        for (int i = 0; i < objects.length; ++i) {
+     *          objects[i] = results[i].poll();
+     *        }
+     *        zippedResults.add(zipper.apply(objects));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param publishers The {@link Publisher}s to zip together. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code publishers}. + * @see ReactiveX zip operator. + * @see #zip(Function, int, Publisher[]) + * @see #zipDelayError(Function, Publisher[]) + */ + public static Publisher zip(Function zipper, Publisher... publishers) { + return zip(zipper, ZIP_MAX_CONCURRENCY, publishers); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      Queue[] results = resultOfAllPublishers();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < results.length - 1; ++i) {
+     *        assert results[i].size() == results[i + 1].size(); // each publisher emits the same number of signals.
+     *      }
+     *      for (;;) {
+     *        Object[] objects = new Object[results.size()];
+     *        for (int i = 0; i < objects.length; ++i) {
+     *          objects[i] = results[i].poll();
+     *        }
+     *        zippedResults.add(zipper.apply(objects));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param publishers The {@link Publisher}s to zip together. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code publishers}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Function, int, Publisher[]) + */ + public static Publisher zip(Function zipper, int maxOutstandingDemand, + Publisher... publishers) { + return PublisherZipper.zip(false, maxOutstandingDemand, zipper, publishers); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      Queue[] results = resultOfAllPublishers();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < results.length - 1; ++i) {
+     *        assert results[i].size() == results[i + 1].size(); // each publisher emits the same number of signals.
+     *      }
+     *      for (;;) {
+     *        Object[] objects = new Object[results.size()];
+     *        for (int i = 0; i < objects.length; ++i) {
+     *          objects[i] = results[i].poll();
+     *        }
+     *        zippedResults.add(zipper.apply(objects));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param publishers The {@link Publisher}s to zip together. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code publishers}. + * @see ReactiveX zip operator. + * @see #zipDelayError(Function, int, Publisher[]) + * @see #zip(Function, Publisher[]) + */ + public static Publisher zipDelayError( + Function zipper, Publisher... publishers) { + return zipDelayError(zipper, ZIP_MAX_CONCURRENCY, publishers); + } + + /** + * Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned + * {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and + * terminated, and then terminate with the first error. + *

+ * From a sequential programming point of view this method is roughly equivalent to the following: + *

{@code
+     *      Queue[] results = resultOfAllPublishers();
+     *      List zippedResults = ...;
+     *      for (int i = 0; i < results.length - 1; ++i) {
+     *        assert results[i].size() == results[i + 1].size(); // each publisher emits the same number of signals.
+     *      }
+     *      for (;;) {
+     *        Object[] objects = new Object[results.size()];
+     *        for (int i = 0; i < objects.length; ++i) {
+     *          objects[i] = results[i].poll();
+     *        }
+     *        zippedResults.add(zipper.apply(objects));
+     *      }
+     *      return zippedResults;
+     * }
+ * @param zipper Used to combine the completed results for each item from all {@link Publisher}s. + * @param maxOutstandingDemand maximum outstanding demand to each {@link Publisher}, used to limit queue sizes that + * are required between {@link Publisher}s that produce data at different rates. + * @param publishers The {@link Publisher}s to zip together. + * @param The result type of the zipper. + * @return a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted + * by {@code publishers}. + * @see ReactiveX zip operator. + * @see #zip(Function, int, Publisher[]) + */ + public static Publisher zipDelayError( + Function zipper, int maxOutstandingDemand, Publisher... publishers) { + return PublisherZipper.zip(true, maxOutstandingDemand, zipper, publishers); + } + // // Static Utility Methods End // diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherZipper.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherZipper.java new file mode 100644 index 0000000000..0f3b316e9a --- /dev/null +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherZipper.java @@ -0,0 +1,302 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.SingleZipper.ZipArg; +import io.servicetalk.concurrent.internal.ConcurrentSubscription; +import io.servicetalk.concurrent.internal.FlowControlUtils; + +import java.lang.reflect.Array; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiFunction; +import java.util.function.Function; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.defer; +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked; +import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull; +import static io.servicetalk.concurrent.internal.ConcurrentUtils.calculateSourceRequested; +import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; + +final class PublisherZipper { + static final int ZIP_MAX_CONCURRENCY = 64; + + private PublisherZipper() { + } + + @SuppressWarnings("unchecked") + static Publisher zip( + boolean delayError, int maxOutstandingDemand, Publisher p1, Publisher p2, + BiFunction zipper) { + return zip(delayError, maxOutstandingDemand, objects -> zipper.apply((T1) objects[0], (T2) objects[1]), p1, p2); + } + + @SuppressWarnings("unchecked") + static Publisher zip( + boolean delayError, int maxOutstandingDemand, Publisher p1, Publisher p2, + Publisher p3, Function3 zipper) { + return zip(delayError, maxOutstandingDemand, + objects -> zipper.apply((T1) objects[0], (T2) objects[1], (T3) objects[2]), p1, p2, p3); + } + + @SuppressWarnings("unchecked") + static Publisher zip( + boolean delayError, int maxOutstandingDemand, + Publisher p1, Publisher p2, Publisher p3, + Publisher p4, Function4 zipper) { + return zip(delayError, maxOutstandingDemand, + objects -> zipper.apply((T1) objects[0], (T2) objects[1], (T3) objects[2], (T4) objects[3]), + p1, p2, p3, p4); + } + + static Publisher zip(boolean delayError, int maxOutstandingDemand, + Function zipper, Publisher... publishers) { + if (maxOutstandingDemand <= 0) { + throw new IllegalArgumentException("maxOutstandingDemand: " + maxOutstandingDemand + " (expected>0)"); + } + return defer(() -> { + // flatMap doesn't require any ordering and so it always optimistically requests from all mapped Publishers + // as long as there is demand from downstream to ensure forward progress is made if some Publishers aren't + // producing. However, for the zip use case we queue all signals internally before applying the zipper, and + // request more from upstream if we don't have a signal from all Publishers. If we let flatMap request + // unrestricted we could end up queuing infinitely from 1 Publisher while none of the others are producing + // (or producing much more slowly) and run out of memory. To prevent this issue we limit upstream demand + // to each Publisher until we can deliver a zipped result downstream which means the zip queues are bounded. + @SuppressWarnings("unchecked") + RequestLimiterSubscriber[] demandSubscribers = (RequestLimiterSubscriber[]) + Array.newInstance(RequestLimiterSubscriber.class, publishers.length); + @SuppressWarnings("unchecked") + Publisher[] mappedPublishers = new Publisher[publishers.length]; + for (int i = 0; i < publishers.length; ++i) { + final int finalI = i; + mappedPublishers[i] = publishers[i].map(v -> new ZipArg(finalI, v)).liftSync(subscriber -> { + RequestLimiterSubscriber demandSubscriber = + new RequestLimiterSubscriber<>(subscriber, maxOutstandingDemand); + demandSubscribers[finalI] = demandSubscriber; + return demandSubscriber; + }); + } + + return (delayError ? from(mappedPublishers) + .flatMapMergeDelayError(identity(), mappedPublishers.length, mappedPublishers.length) : + from(mappedPublishers).flatMapMerge(identity(), mappedPublishers.length)) + .liftSync(new ZipPublisherOperator<>(mappedPublishers.length, zipper, demandSubscribers)) + .shareContextOnSubscribe(); + }); + } + + private static final class ZipPublisherOperator implements PublisherOperator { + private final int zipperArity; + private final Function zipper; + private final RequestLimiterSubscriber[] demandSubscribers; + + private ZipPublisherOperator(final int zipperArity, final Function zipper, + final RequestLimiterSubscriber[] demandSubscribers) { + if (zipperArity > 64 || zipperArity <= 0) { // long used as bit mask to check for non-empty queues. + throw new IllegalArgumentException("zipperArity " + zipperArity + "(expected: <64 && >0)"); + } + this.zipperArity = zipperArity; + this.zipper = requireNonNull(zipper); + this.demandSubscribers = requireNonNull(demandSubscribers); + } + + @Override + public Subscriber apply(final Subscriber subscriber) { + return new ZipSubscriber<>(subscriber, zipperArity, zipper, demandSubscribers); + } + + private static final class ZipSubscriber implements Subscriber { + private static final long ALL_NON_EMPTY_MASK = 0xFFFFFFFFFFFFFFFFL; + private final Subscriber subscriber; + private final Queue[] array; + private final Function zipper; + private final RequestLimiterSubscriber[] demandSubscribers; + private long nonEmptyQueueIndexes; + @Nullable + private Subscription subscription; + + @SuppressWarnings("unchecked") + private ZipSubscriber(final Subscriber subscriber, + final int zipperArity, + final Function zipper, + final RequestLimiterSubscriber[] demandSubscribers) { + this.subscriber = subscriber; + array = (Queue[]) Array.newInstance(Queue.class, zipperArity); + for (int i = 0; i < zipperArity; ++i) { + array[i] = new ArrayDeque<>(); + } + this.demandSubscribers = requireNonNull(demandSubscribers); + this.zipper = requireNonNull(zipper); + for (int i = 63; i >= zipperArity; --i) { + nonEmptyQueueIndexes |= (1L << i); + } + } + + @Override + public void onSubscribe(final Subscription s) { + this.subscription = ConcurrentSubscription.wrap(s); + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(@Nullable final ZipArg zipArg) { + assert zipArg != null; + array[zipArg.index].add(wrapNull(zipArg.value)); + nonEmptyQueueIndexes |= 1L << zipArg.index; + if (nonEmptyQueueIndexes == ALL_NON_EMPTY_MASK) { + Object[] zipArray = new Object[array.length]; + for (int i = 0; i < array.length; ++i) { + final Queue arrayQueue = array[i]; + final Object queuePoll = arrayQueue.poll(); + assert queuePoll != null; + if (arrayQueue.isEmpty()) { + nonEmptyQueueIndexes &= ~(1L << i); + } + zipArray[i] = unwrapNullUnchecked(queuePoll); + // Allow this subscriber to request more if demand is pending. + // Reentry: note that we call out to request more before we dequeued the current set of signals + // which in theory may result in out of order delivery. However, flatMap protects against + // reentry so no need to provide double protection in this method. + demandSubscribers[i].incrementSourceEmitted(); + } + subscriber.onNext(zipper.apply(zipArray)); + } else { + assert subscription != null; + subscription.request(1); + } + } + + @Override + public void onError(final Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + List nonEmptyIndexes = new ArrayList<>(); + for (int i = 0; i < array.length; ++i) { + if ((nonEmptyQueueIndexes & (1L << i)) != 0) { + nonEmptyIndexes.add(i); + } + } + if (nonEmptyIndexes.isEmpty()) { + subscriber.onComplete(); + } else { + StringBuilder sb = new StringBuilder(20 + 68 + nonEmptyIndexes.size() * 4); + sb.append("Publisher indexes: ["); + Iterator itr = nonEmptyIndexes.iterator(); + sb.append(itr.next()); // safe to call next(), already checked is not empty. + while (itr.hasNext()) { + sb.append(", ").append(itr.next()); + } + sb.append("] had onNext signals queued when onComplete terminal signal received"); + subscriber.onError(new IllegalStateException(sb.toString())); + } + } + } + } + + /** + * Limits the outstanding demand upstream to a positive {@link Integer} value. + * @param The type of data. + */ + private static final class RequestLimiterSubscriber implements Subscriber { + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater sourceEmittedUpdater = + AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "sourceEmitted"); + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater sourceRequestedUpdater = + AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "sourceRequested"); + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater requestedUpdater = + AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "requested"); + private final Subscriber subscriber; + private volatile long sourceEmitted; + @SuppressWarnings("unused") + private volatile long sourceRequested; + private volatile long requested; + private final int maxConcurrency; + @Nullable + private Subscription subscription; + + RequestLimiterSubscriber(final Subscriber subscriber, + final int maxConcurrency) { + this.subscriber = subscriber; + this.maxConcurrency = maxConcurrency; + } + + @Override + public void onSubscribe(final Subscription s) { + this.subscription = ConcurrentSubscription.wrap(s); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(final long n) { + if (isRequestNValid(n)) { + requestedUpdater.accumulateAndGet(RequestLimiterSubscriber.this, n, + FlowControlUtils::addWithOverflowProtection); + recalculateDemand(); + } else { + subscription.request(n); + } + } + + @Override + public void cancel() { + subscription.cancel(); + } + }); + } + + @Override + public void onNext(@Nullable final T t) { + subscriber.onNext(t); + } + + @Override + public void onError(final Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + + void incrementSourceEmitted() { + sourceEmittedUpdater.incrementAndGet(this); + recalculateDemand(); + } + + private void recalculateDemand() { + final long actualSourceRequestN = calculateSourceRequested(requestedUpdater, sourceRequestedUpdater, + sourceEmittedUpdater, maxConcurrency, this); + if (actualSourceRequestN != 0) { + assert subscription != null; + subscription.request(actualSourceRequestN); + } + } + } +} diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java index f6cf6cccca..fae5555bb9 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java @@ -858,7 +858,7 @@ public final Publisher concatPropagateCancel(Publisher next) { * return zipper.apply(f1.get(), other.get()); * } * @param other The other {@link Single} to zip with. - * @param zipper Used to combine the completed results for each item from {@code singles}. + * @param zipper Used to combine the completed results for each item from each {@link Single}. * @param The type of {@code other}. * @param The result type of the zipper. * @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by @@ -884,7 +884,7 @@ public final Single zipWith(Single other, * return zipper.apply(f1.get(), other.get()); * } * @param other The other {@link Single} to zip with. - * @param zipper Used to combine the completed results for each item from {@code singles}. + * @param zipper Used to combine the completed results for each item from each {@link Single}. * @param The type of {@code other}. * @param The result type of the zipper. * @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleZipper.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleZipper.java index 79eac6ff7c..f1c71ebeef 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleZipper.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleZipper.java @@ -127,12 +127,12 @@ static Single zipDelayError(Function zippe }).map(zipper); } - private static final class ZipArg { - private final int index; + static final class ZipArg { + final int index; @Nullable - private final Object value; + final Object value; - private ZipArg(final int index, final Object value) { + ZipArg(final int index, final Object value) { this.index = index; this.value = value; } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip3Test.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip3Test.java new file mode 100644 index 0000000000..92cdd5bd22 --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip3Test.java @@ -0,0 +1,365 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.internal.DeliberateException; +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.zip; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.lang.Math.max; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +final class PublisherZip3Test { + private TestSubscription firstSubscription; + private TestSubscription secondSubscription; + private TestSubscription thirdSubscription; + private TestPublisher first; + private TestPublisher second; + private TestPublisher third; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + firstSubscription = new TestSubscription(); + secondSubscription = new TestSubscription(); + thirdSubscription = new TestSubscription(); + first = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(firstSubscription); + return subscriber1; + }); + second = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(secondSubscription); + return subscriber1; + }); + third = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(thirdSubscription); + return subscriber1; + }); + subscriber = new TestPublisherSubscriber<>(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void allComplete(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + } else { + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] errorNum={0}") + @ValueSource(ints = {1, 2, 3}) + void justError(int errorNum) { + toSource(zip(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + if (errorNum == 1) { + first.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 2) { + second.onError(DELIBERATE_EXCEPTION); + } else { + third.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @Test + void justCancel() throws InterruptedException { + toSource(zip(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + thirdSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0}") + @ValueSource(booleans = {true, false}) + void cancelWhilePending(boolean inOrderCompletion) throws InterruptedException { + toSource(zip(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + PublisherSource.Subscription sub = subscriber.awaitSubscription(); + sub.request(1); + int i = 3; + double d = 10.23; + short s = 88; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + } else { + third.onNext(s); + second.onNext(d); + } + sub.cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + thirdSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void terminateWithOutstandingSignals(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + if (onError) { + third.onError(DELIBERATE_EXCEPTION); + } else { + first.onComplete(); + second.onComplete(); + third.onComplete(); + } + } else { + third.onNext(s); + second.onNext(d); + if (onError) { + first.onError(DELIBERATE_EXCEPTION); + } else { + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + } + if (onError) { + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void maxOutstandingDemand(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, PublisherZip3Test::combine, 1)).subscribe(subscriber); + subscriber.awaitSubscription().request(Long.MAX_VALUE); + int i = 3; + double d = 10.23; + short s = 88; + firstSubscription.awaitRequestN(1); + assertThat(firstSubscription.requested(), equalTo(1L)); + secondSubscription.awaitRequestN(1); + assertThat(secondSubscription.requested(), equalTo(1L)); + thirdSubscription.awaitRequestN(1); + assertThat(thirdSubscription.requested(), equalTo(1L)); + + if (inOrderCompletion) { + first.onNext(i); + assertThat(firstSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + third.onNext(s); + } else { + third.onNext(s); + assertThat(thirdSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s))); + + firstSubscription.awaitRequestN(2); + assertThat(firstSubscription.requested(), equalTo(2L)); + secondSubscription.awaitRequestN(2); + assertThat(secondSubscription.requested(), equalTo(2L)); + thirdSubscription.awaitRequestN(2); + assertThat(thirdSubscription.requested(), equalTo(2L)); + + i = 5; + d = 6.7; + s = 12; + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + } else { + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1}") + @CsvSource(value = {"0,2", "0,4", "0,10"}) + void reentry(int begin, int end) throws InterruptedException { + final String completeSignal = "complete"; + final BlockingQueue signals = new LinkedBlockingQueue<>(); + toSource(zip(fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + PublisherZip3Test::combineReentry, 1)) + .subscribe(new PublisherSource.Subscriber() { + @Nullable + private PublisherSource.Subscription subscription; + + @Override + public void onSubscribe(PublisherSource.Subscription s) { + subscription = s; + // Request more than 1 bcz a potential reentry condition is triggered from inside the zip + // operator due to how demand is managed, and we want pending downstream demand to allow for + // reentry delivery. + subscription.request(max(end, 10)); + } + + @Override + public void onNext(@Nullable String next) { + assert subscription != null; + signals.add(requireNonNull(next)); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + signals.add(t); + } + + @Override + public void onComplete() { + signals.add(completeSignal); + } + }); + Object signal; + for (int i = begin; i < end; ++i) { + signal = signals.take(); + assertThat(signal, equalTo(combineReentry(i, i, i))); + } + signal = signals.take(); + assertThat(signal, is(completeSignal)); + assertThat(signals.isEmpty(), equalTo(true)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void delayError(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zipDelayError(first, second, third, PublisherZip3Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + DeliberateException e1 = new DeliberateException(); + DeliberateException e2 = new DeliberateException(); + DeliberateException e3 = new DeliberateException(); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onError(e1); + } else { + third.onNext(s); + second.onNext(d); + first.onError(e1); + } + assertThat(subscriber.pollTerminal(10, MILLISECONDS), nullValue()); + if (inOrderCompletion) { + if (onError) { + first.onError(e2); + second.onError(e3); + } else { + first.onComplete(); + second.onComplete(); + } + } else if (onError) { + third.onError(e2); + second.onError(e3); + } else { + third.onComplete(); + second.onComplete(); + } + assertThat(subscriber.awaitOnError(), is(e1)); + if (onError) { + assertThat(asList(e1.getSuppressed()), contains(e2, e3)); + } + } + + private void terminateSubscribers(boolean inOrderCompletion, boolean onError) { + if (onError) { + if (inOrderCompletion) { + first.onError(DELIBERATE_EXCEPTION); + } else { + third.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + if (inOrderCompletion) { + first.onComplete(); + second.onComplete(); + third.onComplete(); + } else { + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + subscriber.awaitOnComplete(); + } + } + + private static String combineReentry(int i, int d, int s) { + return "first: " + i + " second: " + d + " third: " + s; + } + + private static String combine(int i, double d, short s) { + return "int: " + i + " double: " + d + " short: " + s; + } +} diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip4Test.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip4Test.java new file mode 100644 index 0000000000..bec1de50be --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip4Test.java @@ -0,0 +1,403 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.internal.DeliberateException; +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.zip; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.lang.Math.max; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +final class PublisherZip4Test { + private TestSubscription firstSubscription; + private TestSubscription secondSubscription; + private TestSubscription thirdSubscription; + private TestSubscription fourthSubscription; + private TestPublisher first; + private TestPublisher second; + private TestPublisher third; + private TestPublisher fourth; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + firstSubscription = new TestSubscription(); + secondSubscription = new TestSubscription(); + thirdSubscription = new TestSubscription(); + fourthSubscription = new TestSubscription(); + first = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(firstSubscription); + return subscriber1; + }); + second = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(secondSubscription); + return subscriber1; + }); + third = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(thirdSubscription); + return subscriber1; + }); + fourth = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(fourthSubscription); + return subscriber1; + }); + subscriber = new TestPublisherSubscriber<>(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void allComplete(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, fourth, PublisherZip4Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + } else { + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] errorNum={0}") + @ValueSource(ints = {1, 2, 3, 4}) + void justError(int errorNum) { + toSource(zip(first, second, third, fourth, PublisherZip4Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + if (errorNum == 1) { + first.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 2) { + second.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 3) { + third.onError(DELIBERATE_EXCEPTION); + } else { + fourth.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0}") + @ValueSource(booleans = {true, false}) + void cancelWhilePending(boolean inOrderCompletion) throws InterruptedException { + toSource(zip(first, second, third, fourth, PublisherZip4Test::combine)).subscribe(subscriber); + PublisherSource.Subscription sub = subscriber.awaitSubscription(); + sub.request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + } else { + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + } + sub.cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + thirdSubscription.awaitCancelled(); + fourthSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void terminateWithOutstandingSignals(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, fourth, PublisherZip4Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + if (onError) { + fourth.onError(DELIBERATE_EXCEPTION); + } else { + first.onComplete(); + second.onComplete(); + third.onComplete(); + fourth.onComplete(); + } + } else { + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + if (onError) { + first.onError(DELIBERATE_EXCEPTION); + } else { + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + } + if (onError) { + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void maxOutstandingDemand(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(first, second, third, fourth, PublisherZip4Test::combine, 1)).subscribe(subscriber); + subscriber.awaitSubscription().request(Long.MAX_VALUE); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + firstSubscription.awaitRequestN(1); + assertThat(firstSubscription.requested(), equalTo(1L)); + secondSubscription.awaitRequestN(1); + assertThat(secondSubscription.requested(), equalTo(1L)); + thirdSubscription.awaitRequestN(1); + assertThat(thirdSubscription.requested(), equalTo(1L)); + fourthSubscription.awaitRequestN(1); + assertThat(fourthSubscription.requested(), equalTo(1L)); + + if (inOrderCompletion) { + first.onNext(i); + assertThat(firstSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + third.onNext(s); + assertThat(thirdSubscription.requested(), equalTo(1L)); + fourth.onNext(b); + } else { + fourth.onNext(b); + assertThat(fourthSubscription.requested(), equalTo(1L)); + third.onNext(s); + assertThat(thirdSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b))); + + firstSubscription.awaitRequestN(2); + assertThat(firstSubscription.requested(), equalTo(2L)); + secondSubscription.awaitRequestN(2); + assertThat(secondSubscription.requested(), equalTo(2L)); + thirdSubscription.awaitRequestN(2); + assertThat(thirdSubscription.requested(), equalTo(2L)); + fourthSubscription.awaitRequestN(2); + assertThat(fourthSubscription.requested(), equalTo(2L)); + + i = 5; + d = 6.7; + s = 12; + b = 2; + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + } else { + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1}") + @CsvSource(value = {"0,2", "0,4", "0,10"}) + void reentry(int begin, int end) throws InterruptedException { + final String completeSignal = "complete"; + final BlockingQueue signals = new LinkedBlockingQueue<>(); + toSource(zip(fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + PublisherZip4Test::combineReentry, 1)) + .subscribe(new PublisherSource.Subscriber() { + @Nullable + private PublisherSource.Subscription subscription; + + @Override + public void onSubscribe(PublisherSource.Subscription s) { + subscription = s; + // Request more than 1 bcz a potential reentry condition is triggered from inside the zip + // operator due to how demand is managed, and we want pending downstream demand to allow for + // reentry delivery. + subscription.request(max(end, 10)); + } + + @Override + public void onNext(@Nullable String next) { + assert subscription != null; + signals.add(requireNonNull(next)); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + signals.add(t); + } + + @Override + public void onComplete() { + signals.add(completeSignal); + } + }); + Object signal; + for (int i = begin; i < end; ++i) { + signal = signals.take(); + assertThat(signal, equalTo(combineReentry(i, i, i, i))); + } + signal = signals.take(); + assertThat(signal, is(completeSignal)); + assertThat(signals.isEmpty(), equalTo(true)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void delayError(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zipDelayError(first, second, third, fourth, PublisherZip4Test::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + DeliberateException e1 = new DeliberateException(); + DeliberateException e2 = new DeliberateException(); + DeliberateException e3 = new DeliberateException(); + DeliberateException e4 = new DeliberateException(); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onError(e1); + } else { + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onError(e1); + } + assertThat(subscriber.pollTerminal(10, MILLISECONDS), nullValue()); + if (inOrderCompletion) { + if (onError) { + first.onError(e2); + second.onError(e3); + third.onError(e4); + } else { + first.onComplete(); + second.onComplete(); + third.onComplete(); + } + } else if (onError) { + fourth.onError(e2); + third.onError(e3); + second.onError(e4); + } else { + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + } + assertThat(subscriber.awaitOnError(), is(e1)); + if (onError) { + assertThat(asList(e1.getSuppressed()), contains(e2, e3, e4)); + } + } + + private void terminateSubscribers(boolean inOrderCompletion, boolean onError) { + if (onError) { + if (inOrderCompletion) { + first.onError(DELIBERATE_EXCEPTION); + } else { + fourth.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + if (inOrderCompletion) { + first.onComplete(); + second.onComplete(); + third.onComplete(); + fourth.onComplete(); + } else { + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + subscriber.awaitOnComplete(); + } + } + + private static String combineReentry(int i, int d, int s, int b) { + return "first: " + i + " second: " + d + " third: " + s + " fourth:" + b; + } + + private static String combine(int i, double d, short s, byte b) { + return "int: " + i + " double: " + d + " short: " + s + " byte: " + b; + } +} diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip5Test.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip5Test.java new file mode 100644 index 0000000000..8968529a6c --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZip5Test.java @@ -0,0 +1,462 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.internal.DeliberateException; +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.zip; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.lang.Math.max; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +final class PublisherZip5Test { + private TestSubscription firstSubscription; + private TestSubscription secondSubscription; + private TestSubscription thirdSubscription; + private TestSubscription fourthSubscription; + private TestSubscription fifthSubscription; + private TestPublisher first; + private TestPublisher second; + private TestPublisher third; + private TestPublisher fourth; + private TestPublisher fifth; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + firstSubscription = new TestSubscription(); + secondSubscription = new TestSubscription(); + thirdSubscription = new TestSubscription(); + fourthSubscription = new TestSubscription(); + fifthSubscription = new TestSubscription(); + first = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(firstSubscription); + return subscriber1; + }); + second = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(secondSubscription); + return subscriber1; + }); + third = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(thirdSubscription); + return subscriber1; + }); + fourth = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(fourthSubscription); + return subscriber1; + }); + fifth = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(fifthSubscription); + return subscriber1; + }); + subscriber = new TestPublisherSubscriber<>(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void allComplete(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(combineFunc(), first, second, third, fourth, fifth)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + boolean bool = true; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + fifthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + fifth.onNext(bool); + } else { + fifth.onNext(bool); + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b, bool))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] errorNum={0}") + @ValueSource(ints = {1, 2, 3, 4, 5}) + void justError(int errorNum) { + toSource(zip(combineFunc(), first, second, third, fourth, fifth)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + if (errorNum == 1) { + first.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 2) { + second.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 3) { + third.onError(DELIBERATE_EXCEPTION); + } else if (errorNum == 4) { + fourth.onError(DELIBERATE_EXCEPTION); + } else { + fifth.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0}") + @ValueSource(booleans = {true, false}) + void cancelWhilePending(boolean inOrderCompletion) throws InterruptedException { + toSource(zip(combineFunc(), first, second, third, fourth, fifth)).subscribe(subscriber); + PublisherSource.Subscription sub = subscriber.awaitSubscription(); + sub.request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + boolean bool = true; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + fifthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + } else { + fifth.onNext(bool); + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + } + sub.cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + thirdSubscription.awaitCancelled(); + fourthSubscription.awaitCancelled(); + fifthSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void terminateWithOutstandingSignals(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(combineFunc(), first, second, third, fourth, fifth)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + boolean bool = true; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + fifthSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + if (onError) { + fifth.onError(DELIBERATE_EXCEPTION); + } else { + first.onComplete(); + second.onComplete(); + third.onComplete(); + fourth.onComplete(); + fifth.onComplete(); + } + } else { + fifth.onNext(bool); + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + if (onError) { + first.onError(DELIBERATE_EXCEPTION); + } else { + fifth.onComplete(); + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + } + if (onError) { + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void maxOutstandingDemand(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zip(combineFunc(), 1, first, second, third, fourth, fifth)).subscribe(subscriber); + subscriber.awaitSubscription().request(Long.MAX_VALUE); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + boolean bool = true; + firstSubscription.awaitRequestN(1); + assertThat(firstSubscription.requested(), equalTo(1L)); + secondSubscription.awaitRequestN(1); + assertThat(secondSubscription.requested(), equalTo(1L)); + thirdSubscription.awaitRequestN(1); + assertThat(thirdSubscription.requested(), equalTo(1L)); + fourthSubscription.awaitRequestN(1); + assertThat(fourthSubscription.requested(), equalTo(1L)); + fifthSubscription.awaitRequestN(1); + assertThat(fifthSubscription.requested(), equalTo(1L)); + + if (inOrderCompletion) { + first.onNext(i); + assertThat(firstSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + third.onNext(s); + assertThat(thirdSubscription.requested(), equalTo(1L)); + fourth.onNext(b); + assertThat(fourthSubscription.requested(), equalTo(1L)); + fifth.onNext(bool); + } else { + fifth.onNext(bool); + assertThat(fifthSubscription.requested(), equalTo(1L)); + fourth.onNext(b); + assertThat(fourthSubscription.requested(), equalTo(1L)); + third.onNext(s); + assertThat(thirdSubscription.requested(), equalTo(1L)); + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b, bool))); + + firstSubscription.awaitRequestN(2); + assertThat(firstSubscription.requested(), equalTo(2L)); + secondSubscription.awaitRequestN(2); + assertThat(secondSubscription.requested(), equalTo(2L)); + thirdSubscription.awaitRequestN(2); + assertThat(thirdSubscription.requested(), equalTo(2L)); + fourthSubscription.awaitRequestN(2); + assertThat(fourthSubscription.requested(), equalTo(2L)); + fifthSubscription.awaitRequestN(2); + assertThat(fifthSubscription.requested(), equalTo(2L)); + + i = 5; + d = 6.7; + s = 12; + b = 2; + bool = false; + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + fifth.onNext(bool); + } else { + fifth.onNext(bool); + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d, s, b, bool))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1}") + @CsvSource(value = {"0,2", "0,4", "0,10"}) + void reentry(int begin, int end) throws InterruptedException { + final String completeSignal = "complete"; + final BlockingQueue signals = new LinkedBlockingQueue<>(); + toSource(zip(reentryCombineFunc(), 1, + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)), + fromSource(new ReentryPublisher(begin, end)))) + .subscribe(new PublisherSource.Subscriber() { + @Nullable + private PublisherSource.Subscription subscription; + + @Override + public void onSubscribe(PublisherSource.Subscription s) { + subscription = s; + // Request more than 1 bcz a potential reentry condition is triggered from inside the zip + // operator due to how demand is managed, and we want pending downstream demand to allow for + // reentry delivery. + subscription.request(max(end, 10)); + } + + @Override + public void onNext(@Nullable String next) { + assert subscription != null; + signals.add(requireNonNull(next)); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + signals.add(t); + } + + @Override + public void onComplete() { + signals.add(completeSignal); + } + }); + Object signal; + for (int i = begin; i < end; ++i) { + signal = signals.take(); + assertThat(signal, equalTo(combineReentry(i, i, i, i, i))); + } + signal = signals.take(); + assertThat(signal, is(completeSignal)); + assertThat(signals.isEmpty(), equalTo(true)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void delayError(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(zipDelayError(combineFunc(), first, second, third, fourth, fifth)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + short s = 88; + byte b = 9; + boolean bool = true; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + thirdSubscription.awaitRequestN(1); + fourthSubscription.awaitRequestN(1); + fifthSubscription.awaitRequestN(1); + DeliberateException e1 = new DeliberateException(); + DeliberateException e2 = new DeliberateException(); + DeliberateException e3 = new DeliberateException(); + DeliberateException e4 = new DeliberateException(); + DeliberateException e5 = new DeliberateException(); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + third.onNext(s); + fourth.onNext(b); + fifth.onError(e1); + } else { + fifth.onNext(bool); + fourth.onNext(b); + third.onNext(s); + second.onNext(d); + first.onError(e1); + } + assertThat(subscriber.pollTerminal(10, MILLISECONDS), nullValue()); + if (inOrderCompletion) { + if (onError) { + first.onError(e2); + second.onError(e3); + third.onError(e4); + fourth.onError(e5); + } else { + first.onComplete(); + second.onComplete(); + third.onComplete(); + fourth.onComplete(); + } + } else if (onError) { + fifth.onError(e2); + fourth.onError(e3); + third.onError(e4); + second.onError(e5); + } else { + fifth.onComplete(); + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + } + assertThat(subscriber.awaitOnError(), is(e1)); + if (onError) { + assertThat(asList(e1.getSuppressed()), contains(e2, e3, e4, e5)); + } + } + + private void terminateSubscribers(boolean inOrderCompletion, boolean onError) { + if (onError) { + if (inOrderCompletion) { + first.onError(DELIBERATE_EXCEPTION); + } else { + fifth.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + if (inOrderCompletion) { + first.onComplete(); + second.onComplete(); + third.onComplete(); + fourth.onComplete(); + fifth.onComplete(); + } else { + fifth.onComplete(); + fourth.onComplete(); + third.onComplete(); + second.onComplete(); + first.onComplete(); + } + subscriber.awaitOnComplete(); + } + } + + private static String combineReentry(int i, int d, int s, int b, int bool) { + return "first: " + i + " second: " + d + " third: " + s + " forth: " + b + " fifth: " + bool; + } + + private static String combine(int i, double d, short s, byte b, boolean bool) { + return "int: " + i + " double: " + d + " short: " + s + " byte: " + b + " boolean: " + bool; + } + + private static Function reentryCombineFunc() { + return objects -> combineReentry((Integer) objects[0], (Integer) objects[1], (Integer) objects[2], + (Integer) objects[3], (Integer) objects[4]); + } + + private static Function combineFunc() { + return objects -> combine((Integer) objects[0], (Double) objects[1], (Short) objects[2], (Byte) objects[3], + (Boolean) objects[4]); + } +} diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZipWithTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZipWithTest.java new file mode 100644 index 0000000000..bf2b6e481a --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherZipWithTest.java @@ -0,0 +1,312 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.internal.DeliberateException; +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.lang.Math.max; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +final class PublisherZipWithTest { + private TestSubscription firstSubscription; + private TestSubscription secondSubscription; + private TestPublisher first; + private TestPublisher second; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + firstSubscription = new TestSubscription(); + secondSubscription = new TestSubscription(); + first = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(firstSubscription); + return subscriber1; + }); + second = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(secondSubscription); + return subscriber1; + }); + subscriber = new TestPublisherSubscriber<>(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void allComplete(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(first.zipWith(second, PublisherZipWithTest::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + } else { + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0}") + @ValueSource(booleans = {true, false}) + void justError(boolean inOrderCompletion) { + toSource(first.zipWith(second, PublisherZipWithTest::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + if (inOrderCompletion) { + first.onError(DELIBERATE_EXCEPTION); + } else { + second.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @Test + void justCancel() throws InterruptedException { + toSource(first.zipWith(second, PublisherZipWithTest::combine)).subscribe(subscriber); + subscriber.awaitSubscription().cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0}") + @ValueSource(booleans = {true, false}) + void cancelWhilePending(boolean inOrderCompletion) throws InterruptedException { + toSource(first.zipWith(second, PublisherZipWithTest::combine)).subscribe(subscriber); + Subscription s = subscriber.awaitSubscription(); + s.request(1); + int i = 3; + double d = 10.23; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + } else { + second.onNext(d); + } + s.cancel(); + firstSubscription.awaitCancelled(); + secondSubscription.awaitCancelled(); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void terminateWithOutstandingSignals(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(first.zipWith(second, PublisherZipWithTest::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + if (inOrderCompletion) { + first.onNext(i); + if (onError) { + second.onError(DELIBERATE_EXCEPTION); + } else { + first.onComplete(); + second.onComplete(); + } + } else { + second.onNext(d); + if (onError) { + first.onError(DELIBERATE_EXCEPTION); + } else { + second.onComplete(); + first.onComplete(); + } + } + if (onError) { + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void maxOutstandingDemand(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(first.zipWith(second, PublisherZipWithTest::combine, 1)).subscribe(subscriber); + subscriber.awaitSubscription().request(Long.MAX_VALUE); + int i = 3; + double d = 10.23; + firstSubscription.awaitRequestN(1); + assertThat(firstSubscription.requested(), equalTo(1L)); + secondSubscription.awaitRequestN(1); + assertThat(secondSubscription.requested(), equalTo(1L)); + + if (inOrderCompletion) { + first.onNext(i); + assertThat(firstSubscription.requested(), equalTo(1L)); + second.onNext(d); + } else { + second.onNext(d); + assertThat(secondSubscription.requested(), equalTo(1L)); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d))); + + firstSubscription.awaitRequestN(2); + assertThat(firstSubscription.requested(), equalTo(2L)); + secondSubscription.awaitRequestN(2); + assertThat(secondSubscription.requested(), equalTo(2L)); + + i = 5; + d = 6.7; + if (inOrderCompletion) { + first.onNext(i); + second.onNext(d); + } else { + second.onNext(d); + first.onNext(i); + } + assertThat(subscriber.takeOnNext(), equalTo(combine(i, d))); + terminateSubscribers(inOrderCompletion, onError); + } + + @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1}") + @CsvSource(value = {"0,2", "0,4", "0,10"}) + void reentry(int begin, int end) throws InterruptedException { + final String completeSignal = "complete"; + final BlockingQueue signals = new LinkedBlockingQueue<>(); + toSource(fromSource(new ReentryPublisher(begin, end)).zipWith(fromSource(new ReentryPublisher(begin, end)), + PublisherZipWithTest::combineReentry, 1)) + .subscribe(new PublisherSource.Subscriber() { + @Nullable + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + // Request more than 1 bcz a potential reentry condition is triggered from inside the zip operator + // due to how demand is managed, and we want pending downstream demand to allow for reentry delivery. + subscription.request(max(end, 10)); + } + + @Override + public void onNext(@Nullable String next) { + assert subscription != null; + signals.add(requireNonNull(next)); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + signals.add(t); + } + + @Override + public void onComplete() { + signals.add(completeSignal); + } + }); + Object signal; + for (int i = begin; i < end; ++i) { + signal = signals.take(); + assertThat(signal, equalTo(combineReentry(i, i))); + } + signal = signals.take(); + assertThat(signal, is(completeSignal)); + assertThat(signals.isEmpty(), equalTo(true)); + } + + @ParameterizedTest(name = "{displayName} [{index}] inOrderCompletion={0} onError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void delayError(boolean inOrderCompletion, boolean onError) throws InterruptedException { + toSource(first.zipWithDelayError(second, PublisherZipWithTest::combine)).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + int i = 3; + double d = 10.23; + firstSubscription.awaitRequestN(1); + secondSubscription.awaitRequestN(1); + DeliberateException e1 = new DeliberateException(); + if (inOrderCompletion) { + first.onNext(i); + second.onError(e1); + } else { + second.onNext(d); + first.onError(e1); + } + assertThat(subscriber.pollTerminal(10, MILLISECONDS), nullValue()); + if (inOrderCompletion) { + if (onError) { + first.onError(DELIBERATE_EXCEPTION); + } else { + first.onComplete(); + } + } else if (onError) { + second.onError(DELIBERATE_EXCEPTION); + } else { + second.onComplete(); + } + assertThat(subscriber.awaitOnError(), is(e1)); + if (onError) { + assertThat(asList(e1.getSuppressed()), contains(DELIBERATE_EXCEPTION)); + } + } + + private void terminateSubscribers(boolean inOrderCompletion, boolean onError) { + if (onError) { + if (inOrderCompletion) { + first.onError(DELIBERATE_EXCEPTION); + } else { + second.onError(DELIBERATE_EXCEPTION); + } + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } else { + if (inOrderCompletion) { + first.onComplete(); + second.onComplete(); + } else { + second.onComplete(); + first.onComplete(); + } + subscriber.awaitOnComplete(); + } + } + + private static String combineReentry(int i, int d) { + return "first: " + i + " second: " + d; + } + + private static String combine(int i, double d) { + return "int: " + i + " double: " + d; + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/AbstractPublisherZipTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/AbstractPublisherZipTckTest.java new file mode 100644 index 0000000000..a550d73a60 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/AbstractPublisherZipTckTest.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import org.testng.annotations.Ignore; + +public abstract class AbstractPublisherZipTckTest extends AbstractPublisherOperatorTckTest { + @Ignore("zip operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + } + + @Ignore("zip operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapDelayErrorTckTest.java index cbf8b8c931..d9a8cfb8ff 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapDelayErrorTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapDelayErrorTckTest.java @@ -16,7 +16,7 @@ package io.servicetalk.concurrent.reactivestreams.tck; import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.reactivestreams.tck.PublisherSwitchMapTckTest.SingleUpstreamDemandOperator; +import io.servicetalk.concurrent.reactivestreams.tck.PublisherSwitchMapTckTest.OneUpstreamDemandOperator; import org.testng.annotations.Ignore; import org.testng.annotations.Test; @@ -29,7 +29,7 @@ public class PublisherSwitchMapDelayErrorTckTest extends AbstractPublisherOperat @Override protected Publisher composePublisher(Publisher publisher, int elements) { return defer(() -> { - final SingleUpstreamDemandOperator demandOperator = new SingleUpstreamDemandOperator<>(); + final OneUpstreamDemandOperator demandOperator = new OneUpstreamDemandOperator<>(); return publisher.liftAsync(demandOperator) .switchMapDelayError(i -> from(i).afterOnNext(x -> demandOperator.subscriberRef.get().decrementDemand())); diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapTckTest.java index 13cd340620..efe38fa9bf 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherSwitchMapTckTest.java @@ -19,12 +19,13 @@ import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.PublisherOperator; +import io.servicetalk.concurrent.internal.ConcurrentSubscription; import io.servicetalk.concurrent.internal.DuplicateSubscribeException; import io.servicetalk.concurrent.internal.FlowControlUtils; import org.testng.annotations.Test; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -37,17 +38,17 @@ public class PublisherSwitchMapTckTest extends AbstractPublisherOperatorTckTest< @Override protected Publisher composePublisher(Publisher publisher, int elements) { return defer(() -> { - final SingleUpstreamDemandOperator demandOperator = new SingleUpstreamDemandOperator<>(); + final OneUpstreamDemandOperator demandOperator = new OneUpstreamDemandOperator<>(); return publisher.liftAsync(demandOperator) .switchMap(i -> from(i).afterOnNext(x -> demandOperator.subscriberRef.get().decrementDemand())); }); } - static final class SingleUpstreamDemandOperator implements PublisherOperator { - final AtomicReference> subscriberRef = new AtomicReference<>(); + static final class OneUpstreamDemandOperator implements PublisherOperator { + final AtomicReference> subscriberRef = new AtomicReference<>(); @Override public PublisherSource.Subscriber apply(final PublisherSource.Subscriber subscriber) { - SingleUpstreamDemandSubscriber sub = new SingleUpstreamDemandSubscriber<>(subscriber); + OneUpstreamDemandSubscriber sub = new OneUpstreamDemandSubscriber<>(subscriber); if (subscriberRef.compareAndSet(null, sub)) { return sub; } else { @@ -73,25 +74,29 @@ public void onComplete() { } } - static final class SingleUpstreamDemandSubscriber implements PublisherSource.Subscriber { - private final AtomicLong demand = new AtomicLong(); + static final class OneUpstreamDemandSubscriber implements PublisherSource.Subscriber { + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater demandUpdater = + AtomicLongFieldUpdater.newUpdater(OneUpstreamDemandSubscriber.class, "demand"); private final PublisherSource.Subscriber subscriber; + private volatile long demand; @Nullable private Subscription subscription; - SingleUpstreamDemandSubscriber(final PublisherSource.Subscriber subscriber) { + OneUpstreamDemandSubscriber(final PublisherSource.Subscriber subscriber) { this.subscriber = subscriber; } @Override public void onSubscribe(final Subscription s) { - this.subscription = s; + this.subscription = ConcurrentSubscription.wrap(s); subscriber.onSubscribe(new Subscription() { @Override public void request(final long n) { if (n <= 0) { subscription.request(n); - } else if (demand.getAndAccumulate(n, FlowControlUtils::addWithOverflowProtection) == 0) { + } else if (demandUpdater.getAndAccumulate(OneUpstreamDemandSubscriber.this, n, + FlowControlUtils::addWithOverflowProtection) == 0) { subscription.request(1); } } @@ -119,7 +124,7 @@ public void onComplete() { } void decrementDemand() { - if (demand.decrementAndGet() > 0) { + if (demandUpdater.decrementAndGet(this) > 0) { assert subscription != null; subscription.request(1); } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3DelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3DelayErrorTckTest.java new file mode 100644 index 0000000000..7b8edbc99c --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3DelayErrorTckTest.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; + +@Test +public class PublisherZip3DelayErrorTckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zipDelayError(publisher, range(0, elements), range(0, elements), (result, ignored1, ignored2) -> result); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3TckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3TckTest.java new file mode 100644 index 0000000000..4a791e1254 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip3TckTest.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zip; + +@Test +public class PublisherZip3TckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zip(publisher, range(0, elements), range(0, elements), (result, ignored1, ignored2) -> result); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4DelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4DelayErrorTckTest.java new file mode 100644 index 0000000000..90b7e805b1 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4DelayErrorTckTest.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; + +@Test +public class PublisherZip4DelayErrorTckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zipDelayError(publisher, range(0, elements), range(0, elements), range(0, elements), + (result, ignored1, ignored2, ignored3) -> result); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4TckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4TckTest.java new file mode 100644 index 0000000000..60d896f095 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip4TckTest.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zip; + +@Test +public class PublisherZip4TckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zip(publisher, range(0, elements), range(0, elements), range(0, elements), + (result, ignored1, ignored2, ignored3) -> result); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5DelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5DelayErrorTckTest.java new file mode 100644 index 0000000000..7dfb6e6675 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5DelayErrorTckTest.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zipDelayError; + +@Test +public class PublisherZip5DelayErrorTckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zipDelayError(args -> (Integer) args[0], + publisher, range(0, elements), range(0, elements), range(0, elements), range(0, elements)); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5TckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5TckTest.java new file mode 100644 index 0000000000..9fff94b727 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZip5TckTest.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; +import static io.servicetalk.concurrent.api.Publisher.zip; + +@Test +public class PublisherZip5TckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return zip(args -> (Integer) args[0], + publisher, range(0, elements), range(0, elements), range(0, elements), range(0, elements)); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java new file mode 100644 index 0000000000..a2c7d5ceb2 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; + +@Test +public class PublisherZipWithDelayErrorTckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.zipWithDelayError(range(0, elements), (result, ignore) -> result); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java new file mode 100644 index 0000000000..1a6d301ea1 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import static io.servicetalk.concurrent.api.Publisher.range; + +@Test +public class PublisherZipWithTckTest extends AbstractPublisherZipTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.zipWith(range(0, elements), (result, ignore) -> result); + } +}