From 783a928e27e080f5232279be4baf6aa2a48c1f00 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Wed, 4 Mar 2020 00:09:51 -0500 Subject: [PATCH 1/4] Implement mergeDelayError --- src/Observable.php | 27 ++++ .../Operator/MergeDelayErrorTest.php | 129 ++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 test/Rx/Functional/Operator/MergeDelayErrorTest.php diff --git a/src/Observable.php b/src/Observable.php index 79db73ba..f06109bc 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -293,6 +293,33 @@ public function mergeAll(): Observable }); } + /** + * Combine an Observable together with another Observable by merging their emissions into a single Observable. + * This variant of merge will hold errors emitted from component observables until all observables have + * either completed or errored. + * + * @return Observable + * + * @operator + * @reactivex mergeDelayError + */ + public function mergeDelayError(Observable $o) : Observable { + return Observable::fromArray([$this, $o]) + ->reduce(function ($a, Observable $o) { + $s = new Subject(); + return [ + $a[0]->merge($o->catch(function (\Throwable $e) use ($s) { + $s->onError($e); + return Observable::empty(); + })), + $a[1]->merge($s) + ]; + }, [Observable::empty(), Observable::empty()]) + ->flatMap(function ($a) { + return $a[0]->concat($a[1]); + }); + } + /** * Converts an array to an observable sequence * diff --git a/test/Rx/Functional/Operator/MergeDelayErrorTest.php b/test/Rx/Functional/Operator/MergeDelayErrorTest.php new file mode 100644 index 00000000..48ebdfe7 --- /dev/null +++ b/test/Rx/Functional/Operator/MergeDelayErrorTest.php @@ -0,0 +1,129 @@ +createColdObservable(array( + onNext(100, 4), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onError(160, new \Exception()), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onNext(400, 2), + onNext(500, 3), + onNext(600, 1), + onError(700, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 700)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 360)), $ys->getSubscriptions()); + } + + /** + * @test + */ + public function it_waits_for_complete_before_emitting_error_2() + { + $xs = $this->createColdObservable(array( + onNext(100, 4), + onError(160, new \Exception()), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onNext(400, 'qux'), + onError(450, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 360)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 450)), $ys->getSubscriptions()); + } + + /** + * @test + */ + public function it_works_when_both_sources_error() + { + $xs = $this->createColdObservable(array( + onNext(100, 4), + onError(160, new \Exception()), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onError(161, new \Exception()), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onError(361, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 360)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 361)), $ys->getSubscriptions()); + } +} \ No newline at end of file From 3607b96bdcb8fa8c75d53507948e48b5b5d134f4 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Wed, 4 Mar 2020 00:23:55 -0500 Subject: [PATCH 2/4] Support PHP 8 and add more 7s to travis --- .travis.yml | 12 ++++++++++++ composer.json | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2a63b16e..7f3cb8c3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,6 +40,18 @@ matrix: env: - dependencies=highest - dropPlatform=false + - php: 7.3 + env: + - dependencies=highest + - php: 7.3 + env: + - dependencies=lowest + - php: 7.4 + env: + - dependencies=highest + - php: 7.4 + env: + - dependencies=lowest before_script: composer install diff --git a/composer.json b/composer.json index 02612e4f..cd8bc87c 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,7 @@ } ], "require": { - "php": "~7.0", + "php": "~7.0 || ~8.0", "react/promise": "~2.2" }, "require-dev": { From 5b78c8bc1949a80892b101430b54cf9dcbd8fc3a Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Wed, 4 Mar 2020 00:34:59 -0500 Subject: [PATCH 3/4] Remove PHP versions from travis because tests are incompatible at the moment --- .travis.yml | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7f3cb8c3..ff7262a7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,9 +9,6 @@ matrix: - php: 7.2 env: - dropPlatform=false - - php: nightly - env: - - dropPlatform=false - php: 7.0 env: - dependencies=lowest @@ -22,10 +19,6 @@ matrix: env: - dependencies=lowest - dropPlatform=false - - php: nightly - env: - - dependencies=lowest - - dropPlatform=false - php: 7.0 env: - dependencies=highest @@ -36,22 +29,19 @@ matrix: env: - dependencies=highest - dropPlatform=false - - php: nightly - env: - - dependencies=highest - - dropPlatform=false - php: 7.3 env: - dependencies=highest - php: 7.3 env: - dependencies=lowest - - php: 7.4 - env: - - dependencies=highest - - php: 7.4 - env: - - dependencies=lowest +# php 7.4 causes error with: "Function ReflectionType::__toString() is deprecated" in several tests +# - php: 7.4 +# env: +# - dependencies=highest +# - php: 7.4 +# env: +# - dependencies=lowest before_script: composer install From c4ebbed429a8e2931a0252c73e912d4792abe217 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Fri, 10 Dec 2021 11:57:03 -0500 Subject: [PATCH 4/4] Delete .travis.yml --- .travis.yml | 55 ----------------------------------------------------- 1 file changed, 55 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index ff7262a7..00000000 --- a/.travis.yml +++ /dev/null @@ -1,55 +0,0 @@ -language: php - -matrix: - include: - - php: 7.0 - env: - - qaExtended=true - - php: 7.1 - - php: 7.2 - env: - - dropPlatform=false - - php: 7.0 - env: - - dependencies=lowest - - php: 7.1 - env: - - dependencies=lowest - - php: 7.2 - env: - - dependencies=lowest - - dropPlatform=false - - php: 7.0 - env: - - dependencies=highest - - php: 7.1 - env: - - dependencies=highest - - php: 7.2 - env: - - dependencies=highest - - dropPlatform=false - - php: 7.3 - env: - - dependencies=highest - - php: 7.3 - env: - - dependencies=lowest -# php 7.4 causes error with: "Function ReflectionType::__toString() is deprecated" in several tests -# - php: 7.4 -# env: -# - dependencies=highest -# - php: 7.4 -# env: -# - dependencies=lowest - -before_script: composer install - -script: - - mkdir -p build/logs - - vendor/bin/phpunit - - php demo/test.php - - php docs/build-docs.php lint - -after_success: - - travis_retry php vendor/bin/coveralls -v