From 5fb0fc409e8fe9b7e48e54bb3cf496ae6567c477 Mon Sep 17 00:00:00 2001
From: Jonathan Vusich <31666175+JonathanVusich@users.noreply.github.com>
Date: Tue, 15 Oct 2019 22:23:00 -0500
Subject: [PATCH 1/3] Updated ObservableListSource to include indexes from list
 changes

---
 .../sources/ObservableListSource.java         | 68 ++++++++++++++-----
 1 file changed, 51 insertions(+), 17 deletions(-)

diff --git a/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java b/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java
index 53ea5ae..11b611b 100644
--- a/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java
+++ b/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java
@@ -103,14 +103,20 @@ public static <T> Observable<ListChange<T>> fromObservableListChanges(final Obse
             ListChangeListener<T> listener = c -> {
                 while (c.next()) {
                     if (c.wasAdded()) {
-                        c.getAddedSubList().forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
+                        for (int i = c.getFrom(); i < c.getTo(); i++) {
+                            subscriber.onNext(ListChange.of(c.getList().get(i), Flag.ADDED, i));
+                        }
                     }
                     if (c.wasRemoved()) {
-                        c.getRemoved().forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
+                        int removedIdx = 0;
+                        for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
+                            subscriber.onNext(ListChange.of(c.getRemoved().get(removedIdx), Flag.REMOVED, i));
+                            removedIdx++;
+                        }
                     }
                     if (c.wasUpdated()) {
                         for (int i = c.getFrom(); i < c.getTo(); i++) {
-                            subscriber.onNext(ListChange.of(c.getList().get(i),Flag.UPDATED));
+                            subscriber.onNext(ListChange.of(c.getList().get(i), Flag.UPDATED, i));
                         }
                     }
                 }
@@ -132,12 +138,22 @@ public static <T> Observable<ListChange<T>> fromObservableListDistinctChanges(fi
 
                 while (c.next()) {
                     if (c.wasAdded()) {
-                        c.getAddedSubList().stream().filter(v -> dupeCounter.add(v) == 1)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
+                        for (int i = c.getFrom(); i < c.getTo(); i++) {
+                            var item = c.getList().get(i);
+                            if (dupeCounter.add(item) == 1) {
+                                subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
+                            }
+                        }
                     }
                     if (c.wasRemoved()) {
-                        c.getRemoved().stream().filter(v -> dupeCounter.remove(v) == 0)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
+                        int removedIdx = 0;
+                        for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
+                            var item = c.getRemoved().get(removedIdx);
+                            if (dupeCounter.remove(item) == 0) {
+                                subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
+                            }
+                            removedIdx++;
+                        }
                     }
                 }
             };
@@ -157,12 +173,22 @@ public static <T,R> Observable<ListChange<T>> fromObservableListDistinctChanges(
 
                 while (c.next()) {
                     if (c.wasAdded()) {
-                        c.getAddedSubList().stream().filter(v -> dupeCounter.add(mapper.apply(v)) == 1)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
+                        for (int i = c.getFrom(); i < c.getTo(); i++) {
+                            var item = c.getList().get(i);
+                            if (dupeCounter.add(mapper.apply(item)) == 1) {
+                                subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
+                            }
+                        }
                     }
                     if (c.wasRemoved()) {
-                        c.getRemoved().stream().filter(v -> dupeCounter.remove(mapper.apply(v)) == 0)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
+                        int removedIdx = 0;
+                        for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
+                            var item = c.getRemoved().get(removedIdx);
+                            if (dupeCounter.remove(mapper.apply(item)) == 0) {
+                                subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
+                            }
+                            removedIdx++;
+                        }
                     }
                 }
             };
@@ -181,14 +207,22 @@ public static <T,R> Observable<ListChange<R>> fromObservableListDistinctMappings
 
                 while (c.next()) {
                     if (c.wasAdded()) {
-                        c.getAddedSubList().stream().map(mapper)
-                                .filter(v -> dupeCounter.add(v) == 1)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
+                        for (int i = c.getFrom(); i < c.getTo(); i++) {
+                            var item = mapper.apply(c.getList().get(i));
+                            if (dupeCounter.add(item) == 1) {
+                                subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
+                            }
+                        }
                     }
                     if (c.wasRemoved()) {
-                        c.getRemoved().stream().map(mapper)
-                                .filter(v -> dupeCounter.remove(v) == 0)
-                                .forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
+                        int removedIdx = 0;
+                        for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
+                            var item = mapper.apply(c.getRemoved().get(removedIdx));
+                            if (dupeCounter.remove(item) == 0) {
+                                subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
+                            }
+                            removedIdx++;
+                        }
                     }
                 }
             };

From d7a35fed57140ae0f92b21ff153d33d152b2daad Mon Sep 17 00:00:00 2001
From: Jonathan Vusich <31666175+JonathanVusich@users.noreply.github.com>
Date: Tue, 15 Oct 2019 22:23:31 -0500
Subject: [PATCH 2/3] Added index field to ListChange

Also implemented hashcode + equals for testing purposes.
---
 .../rxjavafx/sources/ListChange.java          | 31 ++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/main/java/io/reactivex/rxjavafx/sources/ListChange.java b/src/main/java/io/reactivex/rxjavafx/sources/ListChange.java
index 8baf9af..83abf29 100644
--- a/src/main/java/io/reactivex/rxjavafx/sources/ListChange.java
+++ b/src/main/java/io/reactivex/rxjavafx/sources/ListChange.java
@@ -15,6 +15,8 @@
  */
 package io.reactivex.rxjavafx.sources;
 
+import java.util.Objects;
+
 /**
  * Holds an ADDED, REMOVED, or UPDATED flag with the associated value
  * @param <T>
@@ -22,13 +24,15 @@
 public final class ListChange<T> {
     private final T value;
     private final Flag flag;
+    private final int index;
 
-    private ListChange(T value, Flag flag) {
+    private ListChange(T value, Flag flag, int index) {
         this.value = value;
         this.flag = flag;
+        this.index = index;
     }
-    public static <T> ListChange<T> of(T value, Flag flag) {
-        return new ListChange<>(value, flag);
+    public static <T> ListChange<T> of(T value, Flag flag, int index) {
+        return new ListChange<>(value, flag, index);
     }
     public T getValue() {
         return value;
@@ -36,8 +40,27 @@ public T getValue() {
     public Flag getFlag() {
         return flag;
     }
+    public int getIndex() {
+        return index;
+    }
+
     @Override
     public String toString() {
-        return flag + " " + value;
+        return flag + " " + value + " " + index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ListChange<?> that = (ListChange<?>) o;
+        return index == that.index &&
+                value.equals(that.value) &&
+                flag == that.flag;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, flag, index);
     }
 }

From 9eb73a172ba0f0274bdd24df5bffb5b8e8e6b1b5 Mon Sep 17 00:00:00 2001
From: Jonathan Vusich <31666175+JonathanVusich@users.noreply.github.com>
Date: Tue, 15 Oct 2019 22:23:51 -0500
Subject: [PATCH 3/3] Modify tests to check for indexes

---
 .../sources/JavaFxObservableTest.java         | 96 +++++++++----------
 1 file changed, 46 insertions(+), 50 deletions(-)

diff --git a/src/test/java/io/reactivex/rxjavafx/sources/JavaFxObservableTest.java b/src/test/java/io/reactivex/rxjavafx/sources/JavaFxObservableTest.java
index 1ede5f0..44227a2 100644
--- a/src/test/java/io/reactivex/rxjavafx/sources/JavaFxObservableTest.java
+++ b/src/test/java/io/reactivex/rxjavafx/sources/JavaFxObservableTest.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public final class JavaFxObservableTest {
@@ -180,31 +181,28 @@ public void testRxObservableListRemoves() {
 
     @Test
     public void testRxObservableListChanges() {
-        //new JFXPanel()();
 
         ObservableList<String> sourceList = FXCollections.observableArrayList();
         Observable<ListChange<String>> emissions = JavaFxObservable.changesOf(sourceList);
 
         CountDownLatch gate = new CountDownLatch(1);
 
-        class FlagAndCount {
-            final Flag flag;
-            final long count;
-            FlagAndCount(Flag flag, long count) {
-                this.flag = flag;
-                this.count = count;
-            }
-
-        }
+        var expected = List.of(
+                ListChange.of("Alpha", Flag.ADDED, 0),
+                ListChange.of("Beta", Flag.ADDED, 1),
+                ListChange.of("Alpha", Flag.REMOVED, 0),
+                ListChange.of("Gamma", Flag.ADDED, 1),
+                ListChange.of("Gamma", Flag.REMOVED, 1),
+                ListChange.of("Epsilon", Flag.ADDED, 0)
+        );
         emissions.observeOn(Schedulers.io())
                 .observeOn(JavaFxScheduler.platform())
-                .take(5)
-                .groupBy(ListChange::getFlag)
-                .flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
-                .subscribe(l -> {
-                    if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 3); }
-                    if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 2); }
-                },Throwable::printStackTrace,gate::countDown);
+                .take(6)
+                .toList()
+                .toObservable()
+                .subscribe(l -> assertEquals(expected, l),
+                                Throwable::printStackTrace,
+                                gate::countDown);
 
         Platform.runLater(() -> {
             sourceList.add("Alpha");
@@ -212,6 +210,7 @@ class FlagAndCount {
             sourceList.remove("Alpha");
             sourceList.add("Gamma");
             sourceList.remove("Gamma");
+            sourceList.add(0, "Epsilon");
         });
 
         try {
@@ -230,24 +229,21 @@ public void testRxObservableListDistinctChangeMappings() {
 
         CountDownLatch gate = new CountDownLatch(1);
 
-        class FlagAndCount {
-            final Flag flag;
-            final long count;
-            FlagAndCount(Flag flag, long count) {
-                this.flag = flag;
-                this.count = count;
-            }
-
-        }
+        var expected = List.of(
+                ListChange.of(5, Flag.ADDED, 0),
+                ListChange.of(4, Flag.ADDED, 1),
+                ListChange.of(5, Flag.REMOVED, 1),
+                ListChange.of(7, Flag.ADDED, 1)
+                );
         emissions.observeOn(Schedulers.io())
                 .observeOn(JavaFxScheduler.platform())
-                .take(3)
-                .groupBy(ListChange::getFlag)
-                .flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
-                .subscribe(l -> {
-                    if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 2); }
-                    if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 1); }
-                },Throwable::printStackTrace,gate::countDown);
+                .take(4)
+                .toList()
+                .toObservable()
+                .subscribe(l -> assertEquals(expected, l),
+                                Throwable::printStackTrace,
+                                gate::countDown
+                );
 
         Platform.runLater(() -> {
             sourceList.add("Alpha");
@@ -257,6 +253,7 @@ class FlagAndCount {
             sourceList.add("Gamma");
             sourceList.remove("Gamma");
             sourceList.remove("Alpha");
+            sourceList.add(1, "Epsilon");
         });
 
         try {
@@ -269,31 +266,29 @@ class FlagAndCount {
 
     @Test
     public void testRxObservableListDistinctChanges() {
-        //new JFXPanel()();
-
         ObservableList<String> sourceList = FXCollections.observableArrayList();
         Observable<ListChange<String>> emissions = JavaFxObservable.distinctChangesOf(sourceList);
 
         CountDownLatch gate = new CountDownLatch(1);
 
-        class FlagAndCount {
-            final Flag flag;
-            final long count;
-            FlagAndCount(Flag flag, long count) {
-                this.flag = flag;
-                this.count = count;
-            }
+        var expected = List.of(
+                ListChange.of("Alpha", Flag.ADDED, 0),
+                ListChange.of("Beta", Flag.ADDED, 1),
+                ListChange.of("Gamma", Flag.ADDED, 2),
+                ListChange.of("Gamma", Flag.REMOVED, 2),
+                ListChange.of("Alpha", Flag.REMOVED, 1),
+                ListChange.of("Epsilon", Flag.ADDED, 1)
+        );
 
-        }
         emissions.observeOn(Schedulers.io())
                 .observeOn(JavaFxScheduler.platform())
-                .take(5)
-                .groupBy(ListChange::getFlag)
-                .flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
-                .subscribe(l -> {
-                    if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 3); }
-                    if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 2); }
-                },Throwable::printStackTrace,gate::countDown);
+                .take(6)
+                .toList()
+                .toObservable()
+                .subscribe(l -> assertEquals(expected, l),
+                                Throwable::printStackTrace,
+                                gate::countDown
+                );
 
         Platform.runLater(() -> {
             sourceList.add("Alpha");
@@ -303,6 +298,7 @@ class FlagAndCount {
             sourceList.add("Gamma");
             sourceList.remove("Gamma");
             sourceList.remove("Alpha");
+            sourceList.add(1, "Epsilon");
         });
 
         try {