1
- use std:: { collections :: HashMap , io, time:: Duration } ;
1
+ use std:: io, time:: Duration ;
2
2
3
3
use bytes:: { BufMut , BytesMut } ;
4
4
use corro_types:: {
@@ -160,7 +160,7 @@ pub async fn serve_follow(
160
160
} ;
161
161
162
162
debug ! ( "sending cleared version since from - {from_ts}" ) ;
163
- let mut last_empty_ts : HashMap < ActorId , Timestamp > = HashMap :: new ( ) ;
163
+
164
164
loop {
165
165
let conn = agent. pool ( ) . read ( ) . await ?;
166
166
@@ -175,7 +175,7 @@ pub async fn serve_follow(
175
175
let mut bk_prepped = conn. prepare_cached ( & format ! ( "SELECT actor_id, start_version, end_version, db_version, last_seq, ts
176
176
FROM __corro_bookkeeping WHERE (db_version IS NOT NULL AND db_version > ?)
177
177
OR (db_version IS NULL and ts > ?) {extra_where_clause}
178
- ORDER BY db_version ASC" ) ) ?;
178
+ ORDER BY db_version ASC, ts ASC " ) ) ?;
179
179
180
180
let map = |row : & Row | {
181
181
Ok ( (
@@ -220,7 +220,7 @@ pub async fn serve_follow(
220
220
let last_seq = last_seq. unwrap ( ) ;
221
221
let db_version: CrsqlDbVersion = db_version. unwrap ( ) ;
222
222
let mut prepped = conn. prepare_cached (
223
- "SELECT \" table\" , pk, cid, val, col_version, db_version, seq, site_id, cl FROM crsql_changes WHERE db_version = ? ORDER BY db_version ASC, seq ASC, ts ASC " ,
223
+ "SELECT \" table\" , pk, cid, val, col_version, db_version, seq, site_id, cl FROM crsql_changes WHERE db_version = ? ORDER BY db_version ASC, seq ASC" ,
224
224
) ?;
225
225
// implicit read transaction
226
226
let rows = prepped. query_map ( [ db_version] , row_to_change) ?;
0 commit comments