@@ -827,7 +827,135 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w
827
827
<div algorithm>
828
828
The <dfn for=Observable method><code>flatMap(|mapper|)</code></dfn> method steps are:
829
829
830
- 1. <span class=XXX> TODO: Spec this and use |mapper|.</span>
830
+ 1. Let |sourceObservable| be [=this=] .
831
+
832
+ 1. Let |observable| be a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an
833
+ algorithm that takes a {{Subscriber}} |subscriber| and does the following:
834
+
835
+ 1. Let |outerSubscriptionHasCompleted| to a [=boolean=] , initially false.
836
+
837
+ 1. Let |queue| be a new [=list=] of {{any}} values, initially empty.
838
+
839
+ Note: This |queue| is used to store any {{Observable}} s emitted by |sourceObservable|,
840
+ while |observable| is currently subscribed to an {{Observable}} emitted earlier by
841
+ |sourceObservable| that has not yet been exhausted.
842
+
843
+ 1. Let |activeInnerSubscription| be a [=boolean=] , initially false.
844
+
845
+ 1. Let |sourceObserver| be a new [=internal observer=] , initialized as follows:
846
+
847
+ : [=internal observer/next steps=]
848
+ :: 1. If |activeInnerSubscription| is true, then:
849
+
850
+ 1. [=list/Append=] |value| to |queue|.
851
+
852
+ Note: This |value| will eventually be processed once the {{Observable}} that is
853
+ currently subscribed-to (as indicated by |activeInnerSubscription|) is exhausted.
854
+
855
+ 1. Otherwise:
856
+
857
+ 1. Set |activeInnerSubscription| to true.
858
+
859
+ 1. Run the [=flatmap process next value steps=] with |value|, |subscriber|,
860
+ |mapper|, and <b> references</b> to all of the following: |queue|,
861
+ |activeInnerSubscription|, and |outerSubscriptionHasCompleted|.
862
+
863
+ <div class=note>
864
+ <p> Note: This [=flatmap process next value steps=] will subscribe to the
865
+ {{Observable}} derived from |value| (if one such can be derived) and keep
866
+ processing values from it until its subscription becomes inactive (either by
867
+ error or completion). If this "inner" {{Observable}} completes, then the
868
+ processing steps will recursively invoke themselves with the next {{any}} in
869
+ |queue|.</p>
870
+
871
+ <p> If no such value [=list/exists=] , then the processing steps will terminate,
872
+ <b> unsetting</b> |activeInnerSubscription|, so that future values emitted from
873
+ |sourceObservable| are processed correctly.</p>
874
+ </div>
875
+
876
+ : [=internal observer/error steps=]
877
+ :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in <var
878
+ ignore> error</var> .
879
+
880
+ : [=internal observer/complete steps=]
881
+ :: 1. Set |outerSubscriptionHasCompleted| to true.
882
+
883
+ Note: If |activeInnerSubscription| is true, then the below step will *not* complete
884
+ |subscriber|. In that case, the [=flatmap process next value steps=] will be
885
+ responsible for completing |subscriber| when |queue| is [=list/empty=] , after the
886
+ "inner" subscription becomes inactive.
887
+
888
+ 1. If |activeInnerSubscription| is false and |queue| is [=list/empty=] , run
889
+ |subscriber|'s {{Subscriber/complete()}} method.
890
+
891
+ 1. Let |options| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is
892
+ |subscriber|'s [=Subscriber/signal=] .
893
+
894
+ 1. <a for=Observable lt="subscribe to an Observable">Subscribe</a> to |sourceObservable|
895
+ given |sourceObserver| and |options|.
896
+
897
+ 1. Return |observable|.
898
+ </div>
899
+
900
+ <div algorithm>
901
+ The <dfn>flatmap process next value steps</dfn> , given an {{any}} |value|, a {{Subscriber}}
902
+ |subscriber|, a {{Mapper}} |mapper|, and <b> references</b> to all of the following: a [=list=] of
903
+ {{any}} values |queue|, a [=boolean=] |activeInnerSubscription|, and a [=boolean=]
904
+ |outerSubscriptionHasCompleted|:
905
+
906
+ 1. Let |mappedResult| be the result of [=invoking=] |mapper| with |value|.
907
+
908
+ If <a spec=webidl lt="an exception was thrown">an exception |E| was thrown</a> , then run
909
+ |subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these steps.
910
+
911
+ 1. Let |innerObservable| be the result of calling {{Observable/from()}} with |mappedResult|.
912
+
913
+ If <a spec=webidl lt="an exception was thrown">an exception |E| was thrown</a> , then run
914
+ |subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these steps.
915
+
916
+ Issue: We shouldn't invoke {{Observable/from()}} directly. Rather, we should
917
+ call some internal algorithm that passes-back the exceptions for us to handle
918
+ properly here, since we want to pipe them to |subscriber|.
919
+
920
+ 1. Let |innerObserver| be a new [=internal observer=] , initialized as follows:
921
+
922
+ : [=internal observer/next steps=]
923
+ :: Run |subscriber|'s {{Subscriber/next()}} method, given the passed in |value|.
924
+
925
+ : [=internal observer/error steps=]
926
+ :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in <var
927
+ ignore> error</var> .
928
+
929
+ : [=internal observer/complete steps=]
930
+ :: 1. If |queue| is not empty, then:
931
+
932
+ 1. Let |nextValue| be the first item in |queue|; [=list/remove=] remove this item from
933
+ |queue|.
934
+
935
+ 1. Run [=flatmap process next value steps=] given |nextValue|, |subscriber|, |mapper|,
936
+ and <b> references</b> to |queue| and |activeInnerSubscription|.
937
+
938
+ 1. Otherwise:
939
+
940
+ 1. Set |activeInnerSubscription| to false.
941
+
942
+ Note: Because |activeInnerSubscription| is a reference, this has the effect of
943
+ ensuring that all subsequent values emitted from the "outer" {{Observable}} (called
944
+ <var ignore> sourceObservable</var> .
945
+
946
+ 1. If |outerSubscriptionHasCompleted| is true, run |subscriber|'s
947
+ {{Subscriber/complete()}} method.
948
+
949
+ Note: This means the "outer" {{Observable}} has already completed, but did not
950
+ proceed to complete |subscriber| yet because there was at least one more pending
951
+ "inner" {{Observable}} (i.e., |innerObservable|) that had already been queued and
952
+ had not yet completed. Until right now!
953
+
954
+ 1. Let |innerOptions| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is
955
+ |subscriber|'s [=Subscriber/signal=] .
956
+
957
+ 1. <a for=Observable lt="subscribe to an Observable">Subscribe</a> to |innerObservable| given
958
+ |innerObserver| and |innerOptions|.
831
959
</div>
832
960
833
961
<div algorithm>
0 commit comments