Skip to content

Commit 4477b9d

Browse files
authored
RUST-1425 Clarify test event handling (#1188)
1 parent babe981 commit 4477b9d

File tree

18 files changed

+197
-218
lines changed

18 files changed

+197
-218
lines changed

src/client/session/test.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ async fn cluster_time_in_commands() {
244244
F: Fn(Client) -> G,
245245
G: Future<Output = Result<R>>,
246246
{
247-
let mut subscriber = event_buffer.subscribe();
247+
let mut event_stream = event_buffer.stream();
248248

249249
operation(client.clone())
250250
.await
@@ -254,8 +254,8 @@ async fn cluster_time_in_commands() {
254254
.await
255255
.expect("operation should succeed");
256256

257-
let (first_command_started, first_command_succeeded) = subscriber
258-
.wait_for_successful_command_execution(Duration::from_secs(5), command_name)
257+
let (first_command_started, first_command_succeeded) = event_stream
258+
.next_successful_command_execution(Duration::from_secs(5), command_name)
259259
.await
260260
.unwrap_or_else(|| {
261261
panic!(
@@ -270,8 +270,8 @@ async fn cluster_time_in_commands() {
270270
.get("$clusterTime")
271271
.expect("should get cluster time from command response");
272272

273-
let (second_command_started, _) = subscriber
274-
.wait_for_successful_command_execution(Duration::from_secs(5), command_name)
273+
let (second_command_started, _) = event_stream
274+
.next_successful_command_execution(Duration::from_secs(5), command_name)
275275
.await
276276
.unwrap_or_else(|| {
277277
panic!(
@@ -311,13 +311,13 @@ async fn cluster_time_in_commands() {
311311
}
312312
}
313313

314-
let mut subscriber = buffer.subscribe();
314+
let mut event_stream = buffer.stream();
315315

316316
let client = Client::with_options(options).unwrap();
317317

318318
// Wait for initial monitor check to complete and discover the server.
319-
subscriber
320-
.wait_for_event(Duration::from_secs(5), |event| match event {
319+
event_stream
320+
.next_match(Duration::from_secs(5), |event| match event {
321321
Event::Sdam(SdamEvent::ServerDescriptionChanged(e)) => {
322322
!e.previous_description.server_type().is_available()
323323
&& e.new_description.server_type().is_available()

src/cmap/test.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl Executor {
146146
}
147147

148148
async fn execute_test(self) {
149-
let mut subscriber = self.state.events.subscribe();
149+
let mut event_stream = self.state.events.stream();
150150

151151
let (updater, mut receiver) = TopologyUpdater::channel();
152152

@@ -202,8 +202,8 @@ impl Executor {
202202
let description = self.description;
203203
let filter = |e: &CmapEvent| !ignored_event_names.iter().any(|name| e.name() == name);
204204
for expected_event in self.events {
205-
let actual_event = subscriber
206-
.wait_for_event(EVENT_TIMEOUT, filter)
205+
let actual_event = event_stream
206+
.next_match(EVENT_TIMEOUT, filter)
207207
.await
208208
.unwrap_or_else(|| {
209209
panic!(
@@ -214,7 +214,12 @@ impl Executor {
214214
assert_matches(&actual_event, &expected_event, Some(description.as_str()));
215215
}
216216

217-
assert_eq!(subscriber.all(filter), Vec::new(), "{}", description);
217+
assert_eq!(
218+
event_stream.collect_now(filter),
219+
Vec::new(),
220+
"{}",
221+
description
222+
);
218223
}
219224
}
220225

@@ -262,16 +267,16 @@ impl Operation {
262267
}
263268
}
264269
Operation::CheckIn { connection } => {
265-
let mut subscriber = state.events.subscribe();
270+
let mut event_stream = state.events.stream();
266271
let conn = state.connections.write().await.remove(&connection).unwrap();
267272
let id = conn.id;
268273
// connections are checked in via tasks spawned in their drop implementation,
269274
// they are not checked in explicitly.
270275
drop(conn);
271276

272277
// wait for event to be emitted to ensure check in has completed.
273-
subscriber
274-
.wait_for_event(EVENT_TIMEOUT, |e| {
278+
event_stream
279+
.next_match(EVENT_TIMEOUT, |e| {
275280
matches!(e, CmapEvent::ConnectionCheckedIn(event) if event.connection_id == id)
276281
})
277282
.await
@@ -300,14 +305,14 @@ impl Operation {
300305
}
301306
}
302307
Operation::Close => {
303-
let mut subscriber = state.events.subscribe();
308+
let mut event_stream = state.events.stream();
304309

305310
// pools are closed via their drop implementation
306311
state.pool.write().await.take();
307312

308313
// wait for event to be emitted to ensure drop has completed.
309-
subscriber
310-
.wait_for_event(EVENT_TIMEOUT, |e| matches!(e, CmapEvent::PoolClosed(_)))
314+
event_stream
315+
.next_match(EVENT_TIMEOUT, |e| matches!(e, CmapEvent::PoolClosed(_)))
311316
.await
312317
.expect("did not receive ConnectionPoolClosed event after closing pool");
313318
}

src/cmap/test/integration.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async fn connection_error_during_establishment() {
199199

200200
let buffer = EventBuffer::<CmapEvent>::new();
201201

202-
let mut subscriber = buffer.subscribe();
202+
let mut event_stream = buffer.stream();
203203

204204
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
205205
options.ready = Some(true);
@@ -215,8 +215,8 @@ async fn connection_error_during_establishment() {
215215

216216
pool.check_out().await.expect_err("check out should fail");
217217

218-
subscriber
219-
.wait_for_event(EVENT_TIMEOUT, |e| match e {
218+
event_stream
219+
.next_match(EVENT_TIMEOUT, |e| match e {
220220
CmapEvent::ConnectionClosed(event) => {
221221
event.connection_id == 1 && event.reason == ConnectionClosedReason::Error
222222
}
@@ -249,16 +249,16 @@ async fn connection_error_during_operation() {
249249
FailPoint::fail_command(&["ping"], FailPointMode::Times(10)).close_connection(true);
250250
let _guard = client.enable_fail_point(fail_point).await.unwrap();
251251

252-
let mut subscriber = buffer.subscribe();
252+
let mut event_stream = buffer.stream();
253253

254254
client
255255
.database("test")
256256
.run_command(doc! { "ping": 1 })
257257
.await
258258
.expect_err("ping should fail due to fail point");
259259

260-
subscriber
261-
.wait_for_event(EVENT_TIMEOUT, |e| match e {
260+
event_stream
261+
.next_match(EVENT_TIMEOUT, |e| match e {
262262
CmapEvent::ConnectionClosed(event) => {
263263
event.connection_id == 1 && event.reason == ConnectionClosedReason::Error
264264
}

src/sdam/description/topology/server_selection/test/in_window.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ async fn load_balancing_test() {
221221
.build()
222222
.await;
223223

224-
let mut subscriber = client.events.subscribe_all();
224+
let mut subscriber = client.events.stream_all();
225225

226226
// wait for both servers pools to be saturated.
227227
for address in hosts {
@@ -243,7 +243,7 @@ async fn load_balancing_test() {
243243
let mut conns = 0;
244244
while conns < max_pool_size * 2 {
245245
subscriber
246-
.wait_for_event(Duration::from_secs(30), |event| {
246+
.next_match(Duration::from_secs(30), |event| {
247247
matches!(event, Event::Cmap(CmapEvent::ConnectionReady(_)))
248248
})
249249
.await

src/sdam/description/topology/test/sdam.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ async fn run_test(test_file: TestFile) {
277277
options.sdam_event_handler = Some(buffer.handler());
278278
options.test_options_mut().disable_monitoring_threads = true;
279279

280-
let mut event_subscriber = buffer.subscribe();
280+
let mut event_stream = buffer.stream();
281281
let mut topology = Topology::new(options.clone()).unwrap();
282282

283283
for (i, phase) in test_file.phases.into_iter().enumerate() {
@@ -374,8 +374,8 @@ async fn run_test(test_file: TestFile) {
374374
);
375375
}
376376
Outcome::Events(EventsOutcome { events: expected }) => {
377-
let actual = event_subscriber
378-
.collect_events(Duration::from_millis(500), |e| matches!(e, Event::Sdam(_)))
377+
let actual = event_stream
378+
.collect(Duration::from_millis(500), |e| matches!(e, Event::Sdam(_)))
379379
.await
380380
.into_iter()
381381
.map(|e| e.unwrap_sdam_event());
@@ -598,7 +598,7 @@ async fn topology_closed_event_last() {
598598
.await;
599599
let events = client.events.clone();
600600

601-
let mut subscriber = events.subscribe_all();
601+
let mut subscriber = events.stream_all();
602602

603603
client
604604
.database(function_name!())
@@ -609,15 +609,15 @@ async fn topology_closed_event_last() {
609609
drop(client);
610610

611611
subscriber
612-
.wait_for_event(Duration::from_millis(1000), |event| {
612+
.next_match(Duration::from_millis(1000), |event| {
613613
matches!(event, Event::Sdam(SdamEvent::TopologyClosed(_)))
614614
})
615615
.await
616616
.expect("should see topology closed event");
617617

618618
// no further SDAM events should be emitted after the TopologyClosedEvent
619619
let event = subscriber
620-
.wait_for_event(Duration::from_millis(1000), |event| {
620+
.next_match(Duration::from_millis(1000), |event| {
621621
matches!(event, Event::Sdam(_))
622622
})
623623
.await;
@@ -643,22 +643,22 @@ async fn heartbeat_events() {
643643
.build()
644644
.await;
645645

646-
let mut subscriber = client.events.subscribe_all();
646+
let mut subscriber = client.events.stream_all();
647647

648648
if client.is_load_balanced() {
649649
log_uncaptured("skipping heartbeat_events tests due to load-balanced topology");
650650
return;
651651
}
652652

653653
subscriber
654-
.wait_for_event(Duration::from_millis(500), |event| {
654+
.next_match(Duration::from_millis(500), |event| {
655655
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatStarted(_)))
656656
})
657657
.await
658658
.expect("should see server heartbeat started event");
659659

660660
subscriber
661-
.wait_for_event(Duration::from_millis(500), |event| {
661+
.next_match(Duration::from_millis(500), |event| {
662662
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatSucceeded(_)))
663663
})
664664
.await
@@ -681,7 +681,7 @@ async fn heartbeat_events() {
681681
let _guard = fp_client.enable_fail_point(fail_point).await.unwrap();
682682

683683
subscriber
684-
.wait_for_event(Duration::from_millis(500), |event| {
684+
.next_match(Duration::from_millis(500), |event| {
685685
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatFailed(_)))
686686
})
687687
.await

src/sdam/test.rs

+14-16
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async fn sdam_pool_management() {
103103
.build()
104104
.await;
105105

106-
let mut subscriber = client.events.subscribe_all();
106+
let mut subscriber = client.events.stream_all();
107107

108108
if !VersionReq::parse(">= 4.2.9")
109109
.unwrap()
@@ -116,14 +116,14 @@ async fn sdam_pool_management() {
116116
}
117117

118118
subscriber
119-
.wait_for_event(Duration::from_millis(500), |event| {
119+
.next_match(Duration::from_millis(500), |event| {
120120
matches!(event, Event::Cmap(CmapEvent::PoolReady(_)))
121121
})
122122
.await
123123
.expect("should see pool ready event");
124124

125125
subscriber
126-
.wait_for_event(Duration::from_millis(500), |event| {
126+
.next_match(Duration::from_millis(500), |event| {
127127
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatSucceeded(_)))
128128
})
129129
.await
@@ -143,9 +143,7 @@ async fn sdam_pool_management() {
143143

144144
// Since there is no deterministic ordering, simply collect all the events and check for their
145145
// presence.
146-
let events = subscriber
147-
.collect_events(Duration::from_secs(1), |_| true)
148-
.await;
146+
let events = subscriber.collect(Duration::from_secs(1), |_| true).await;
149147
assert!(events
150148
.iter()
151149
.any(|e| matches!(e, Event::Sdam(SdamEvent::ServerHeartbeatFailed(_)))));
@@ -186,7 +184,7 @@ async fn hello_ok_true() {
186184

187185
let buffer = EventBuffer::new();
188186

189-
let mut subscriber = buffer.subscribe();
187+
let mut event_stream = buffer.stream();
190188

191189
let mut options = setup_client_options.clone();
192190
options.sdam_event_handler = Some(buffer.handler());
@@ -195,8 +193,8 @@ async fn hello_ok_true() {
195193
let _client = Client::with_options(options).expect("client creation should succeed");
196194

197195
// first heartbeat should be legacy hello but contain helloOk
198-
subscriber
199-
.wait_for_event(Duration::from_millis(2000), |event| {
196+
event_stream
197+
.next_match(Duration::from_millis(2000), |event| {
200198
if let Event::Sdam(SdamEvent::ServerHeartbeatSucceeded(e)) = event {
201199
assert_eq!(e.reply.get_bool("helloOk"), Ok(true));
202200
assert!(e.reply.get(LEGACY_HELLO_COMMAND_NAME_LOWERCASE).is_some());
@@ -210,8 +208,8 @@ async fn hello_ok_true() {
210208

211209
// subsequent heartbeats should just be hello
212210
for _ in 0..3 {
213-
subscriber
214-
.wait_for_event(Duration::from_millis(2000), |event| {
211+
event_stream
212+
.next_match(Duration::from_millis(2000), |event| {
215213
if let Event::Sdam(SdamEvent::ServerHeartbeatSucceeded(e)) = event {
216214
assert!(e.reply.get("isWritablePrimary").is_some());
217215
assert!(e.reply.get(LEGACY_HELLO_COMMAND_NAME_LOWERCASE).is_none());
@@ -270,13 +268,13 @@ async fn removed_server_monitor_stops() -> crate::error::Result<()> {
270268
let hosts = options.hosts.clone();
271269
let set_name = options.repl_set_name.clone().unwrap();
272270

273-
let mut subscriber = buffer.subscribe();
271+
let mut event_stream = buffer.stream();
274272
let topology = Topology::new(options)?;
275273

276274
// Wait until all three monitors have started.
277275
let mut seen_monitors = HashSet::new();
278-
subscriber
279-
.wait_for_event(Duration::from_millis(500), |event| {
276+
event_stream
277+
.next_match(Duration::from_millis(500), |event| {
280278
if let Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) = event {
281279
seen_monitors.insert(e.server_address.clone());
282280
}
@@ -315,13 +313,13 @@ async fn removed_server_monitor_stops() -> crate::error::Result<()> {
315313
))
316314
.await;
317315

318-
subscriber.wait_for_event(Duration::from_secs(1), |event| {
316+
event_stream.next_match(Duration::from_secs(1), |event| {
319317
matches!(event, Event::Sdam(SdamEvent::ServerClosed(e)) if e.address == hosts[2])
320318
}).await.expect("should see server closed event");
321319

322320
// Capture heartbeat events for 1 second. The monitor for the removed server should stop
323321
// publishing them.
324-
let events = subscriber.collect_events(Duration::from_secs(1), |event| {
322+
let events = event_stream.collect(Duration::from_secs(1), |event| {
325323
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) if e.server_address == hosts[2])
326324
}).await;
327325

0 commit comments

Comments
 (0)