Skip to content

Commit b7ba556

Browse files
mbonneaudavidwdan
authored andcommittedOct 26, 2017
Reroute exceptions in onNext to onError by default (#193)
* Send exceptions in onNext to error handler by default * Verify proper disposal on onNext throw defaulting to onError
1 parent a5d7c1e commit b7ba556

File tree

3 files changed

+96
-2
lines changed

3 files changed

+96
-2
lines changed
 

‎src/Observable.php

+20-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use React\Promise\Deferred;
88
use React\Promise\PromiseInterface;
9+
use Rx\Disposable\EmptyDisposable;
910
use Rx\Observable\AnonymousObservable;
1011
use Rx\Observable\ArrayObservable;
1112
use Rx\Observable\ConnectableObservable;
@@ -97,9 +98,26 @@ public function subscribe($onNextOrObserver = null, callable $onError = null, ca
9798
throw new \InvalidArgumentException('The first argument needs to be a "callable" or "Observer"');
9899
}
99100

100-
$observer = new CallbackObserver($onNextOrObserver, $onError, $onCompleted);
101+
$disposable = new EmptyDisposable();
102+
103+
$observer = new CallbackObserver(
104+
$onNextOrObserver === null
105+
? null
106+
: function ($value) use ($onNextOrObserver, &$observer, &$disposable) {
107+
try {
108+
$onNextOrObserver($value);
109+
} catch (\Throwable $throwable) {
110+
$disposable->dispose();
111+
$observer->onError($throwable);
112+
}
113+
},
114+
$onError,
115+
$onCompleted
116+
);
117+
118+
$disposable = $this->_subscribe($observer);
101119

102-
return $this->_subscribe($observer);
120+
return $disposable;
103121
}
104122

105123
/**

‎test/Rx/Functional/ObservableTest.php

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
namespace Rx\Functional;
4+
5+
use Rx\Observer\TestException;
6+
use Rx\Testing\MockObserver;
7+
8+
class ObservableTest extends FunctionalTestCase
9+
{
10+
public function testExceptionInOnNextByDefaultGoesToErrorAndDisposes()
11+
{
12+
$xs = $this->createHotObservable(
13+
[
14+
onNext(150, 1),
15+
onNext(210, 2),
16+
onNext(220, 3),
17+
onNext(230, 4),
18+
onCompleted(250)
19+
]);
20+
21+
$results = new MockObserver($this->scheduler);
22+
23+
$disposable = null;
24+
25+
$this->scheduler->scheduleAbsolute(200, function () use ($xs, $results, &$disposable) {
26+
$disposable = $xs->subscribe(
27+
function ($value) use ($results) {
28+
if ($value === 3) {
29+
throw new TestException();
30+
}
31+
$results->onNext($value);
32+
},
33+
[$results, 'onError'],
34+
[$results, 'onCompleted']
35+
);
36+
});
37+
38+
$this->scheduler->scheduleAbsolute(1000, function () use (&$disposable) {
39+
$disposable->dispose();
40+
});
41+
42+
$this->scheduler->start();
43+
44+
$this->assertMessages(
45+
[
46+
onNext(210, 2),
47+
onError(220, new TestException()),
48+
],
49+
$results->getMessages());
50+
51+
$this->assertSubscriptions([subscribe(200, 220)], $xs->getSubscriptions());
52+
}
53+
}

‎test/Rx/ObservableTest.php

+23
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Rx\Observable\EmptyObservable;
99
use Rx\Observable\RefCountObservable;
1010
use Rx\Observable\ReturnObservable;
11+
use Rx\Observer\TestException;
1112
use Rx\Scheduler\ImmediateScheduler;
1213
use Rx\Testing\TestScheduler;
1314

@@ -179,4 +180,26 @@ public function testSwitchLatestCallsSwitch()
179180

180181
$o->switchLatest();
181182
}
183+
184+
/**
185+
* @test
186+
*/
187+
public function it_sends_throwables_in_onnext_to_onerror()
188+
{
189+
$onNext = function ($x) {
190+
throw new TestException();
191+
};
192+
193+
$error = null;
194+
195+
Observable::of(0)
196+
->subscribe(
197+
$onNext,
198+
function (\Throwable $e) use (&$error) {
199+
$error = $e;
200+
}
201+
);
202+
203+
$this->assertInstanceOf(TestException::class, $error);
204+
}
182205
}

0 commit comments

Comments
 (0)
Please sign in to comment.