Skip to content

Commit 73a5a1d

Browse files
committed
fix: make handle_unilateral non-async
The check for `unsolicited.is_full()` at the beginning of `handle_unilateral` is not sufficient if the function is called from multiple threads parallel. This normally should not happen, but not guaranteed. Instead of checking if the channel is full in advance, use `tr_send` and ignore the error if the channel happens to be full when we try to send into it. We also ignore the error when the channel is closed instead of panic because the library should never panic.
1 parent ddbf1e9 commit 73a5a1d

File tree

5 files changed

+31
-44
lines changed

5 files changed

+31
-44
lines changed

src/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1434,7 +1434,7 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
14341434
}
14351435

14361436
if let Some(unsolicited) = unsolicited.clone() {
1437-
handle_unilateral(response, unsolicited).await;
1437+
handle_unilateral(response, unsolicited);
14381438
}
14391439

14401440
if let Some(res) = self.stream.next().await {

src/extensions/id.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub(crate) async fn parse_id<T: Stream<Item = io::Result<ResponseData>> + Unpin>
5656
})
5757
}
5858
_ => {
59-
handle_unilateral(resp, unsolicited.clone()).await;
59+
handle_unilateral(resp, unsolicited.clone());
6060
}
6161
}
6262
}

src/extensions/idle.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
168168
// continuation, wait for it
169169
}
170170
Response::Done { .. } => {
171-
handle_unilateral(resp, sender.clone()).await;
171+
handle_unilateral(resp, sender.clone());
172172
}
173173
_ => return Ok(IdleResponse::NewData(resp)),
174174
}
@@ -203,10 +203,10 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
203203
.into());
204204
}
205205
}
206-
handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await;
206+
handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
207207
}
208208
_ => {
209-
handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await;
209+
handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
210210
}
211211
}
212212
}

src/extensions/quota.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub(crate) async fn parse_get_quota<T: Stream<Item = io::Result<ResponseData>> +
3030
match resp.parsed() {
3131
Response::Quota(q) => quota = Some(q.clone().into()),
3232
_ => {
33-
handle_unilateral(resp, unsolicited.clone()).await;
33+
handle_unilateral(resp, unsolicited.clone());
3434
}
3535
}
3636
}
@@ -65,7 +65,7 @@ pub(crate) async fn parse_get_quota_root<T: Stream<Item = io::Result<ResponseDat
6565
quotas.push(q.clone().into());
6666
}
6767
_ => {
68-
handle_unilateral(resp, unsolicited.clone()).await;
68+
handle_unilateral(resp, unsolicited.clone());
6969
}
7070
}
7171
}

src/parse.rs

+24-37
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub(crate) fn parse_names<T: Stream<Item = io::Result<ResponseData>> + Unpin + S
2929
Some(Ok(name))
3030
}
3131
_ => {
32-
handle_unilateral(resp, unsolicited).await;
32+
handle_unilateral(resp, unsolicited);
3333
None
3434
}
3535
},
@@ -79,7 +79,7 @@ pub(crate) fn parse_fetches<T: Stream<Item = io::Result<ResponseData>> + Unpin +
7979
Ok(resp) => match resp.parsed() {
8080
Response::Fetch(..) => Some(Ok(Fetch::new(resp))),
8181
_ => {
82-
handle_unilateral(resp, unsolicited).await;
82+
handle_unilateral(resp, unsolicited);
8383
None
8484
}
8585
},
@@ -157,7 +157,7 @@ pub(crate) async fn parse_status<T: Stream<Item = io::Result<ResponseData>> + Un
157157
}
158158
}
159159
_ => {
160-
handle_unilateral(resp, unsolicited.clone()).await;
160+
handle_unilateral(resp, unsolicited.clone());
161161
}
162162
}
163163
}
@@ -182,7 +182,7 @@ pub(crate) fn parse_expunge<T: Stream<Item = io::Result<ResponseData>> + Unpin +
182182
Ok(resp) => match resp.parsed() {
183183
Response::Expunge(id) => Some(Ok(*id)),
184184
_ => {
185-
handle_unilateral(resp, unsolicited).await;
185+
handle_unilateral(resp, unsolicited);
186186
None
187187
}
188188
},
@@ -213,7 +213,7 @@ pub(crate) async fn parse_capabilities<T: Stream<Item = io::Result<ResponseData>
213213
}
214214
}
215215
_ => {
216-
handle_unilateral(resp, unsolicited.clone()).await;
216+
handle_unilateral(resp, unsolicited.clone());
217217
}
218218
}
219219
}
@@ -232,7 +232,7 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi
232232
.await
233233
{
234234
let resp = resp?;
235-
handle_unilateral(resp, unsolicited.clone()).await;
235+
handle_unilateral(resp, unsolicited.clone());
236236
}
237237

238238
Ok(())
@@ -338,7 +338,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
338338
}
339339
}
340340
Response::MailboxData(m) => match m {
341-
MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()).await,
341+
MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()),
342342
MailboxDatum::Exists(e) => {
343343
mailbox.exists = *e;
344344
}
@@ -358,7 +358,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
358358
_ => {}
359359
},
360360
_ => {
361-
handle_unilateral(resp, unsolicited.clone()).await;
361+
handle_unilateral(resp, unsolicited.clone());
362362
}
363363
}
364364
}
@@ -386,7 +386,7 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin
386386
}
387387
}
388388
_ => {
389-
handle_unilateral(resp, unsolicited.clone()).await;
389+
handle_unilateral(resp, unsolicited.clone());
390390
}
391391
}
392392
}
@@ -421,57 +421,44 @@ pub(crate) async fn parse_metadata<T: Stream<Item = io::Result<ResponseData>> +
421421
// [Unsolicited METADATA Response without Values](https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.4.2),
422422
// they go to unsolicited channel with other unsolicited responses.
423423
_ => {
424-
handle_unilateral(resp, unsolicited.clone()).await;
424+
handle_unilateral(resp, unsolicited.clone());
425425
}
426426
}
427427
}
428428
Ok(res_values)
429429
}
430430

431-
// check if this is simply a unilateral server response
432-
// (see Section 7 of RFC 3501):
433-
pub(crate) async fn handle_unilateral(
431+
// Sends unilateral server response
432+
// (see Section 7 of RFC 3501)
433+
// into the channel.
434+
//
435+
// If the channel is full or closed,
436+
// i.e. the responses are not being consumed,
437+
// ignores new responses.
438+
pub(crate) fn handle_unilateral(
434439
res: ResponseData,
435440
unsolicited: channel::Sender<UnsolicitedResponse>,
436441
) {
437-
// ignore these if they are not being consumed
438-
if unsolicited.is_full() {
439-
return;
440-
}
441-
442442
match res.parsed() {
443443
Response::MailboxData(MailboxDatum::Status { mailbox, status }) => {
444444
unsolicited
445-
.send(UnsolicitedResponse::Status {
445+
.try_send(UnsolicitedResponse::Status {
446446
mailbox: (mailbox.as_ref()).into(),
447447
attributes: status.to_vec(),
448448
})
449-
.await
450-
.expect("Channel closed unexpectedly");
449+
.ok();
451450
}
452451
Response::MailboxData(MailboxDatum::Recent(n)) => {
453-
unsolicited
454-
.send(UnsolicitedResponse::Recent(*n))
455-
.await
456-
.expect("Channel closed unexpectedly");
452+
unsolicited.try_send(UnsolicitedResponse::Recent(*n)).ok();
457453
}
458454
Response::MailboxData(MailboxDatum::Exists(n)) => {
459-
unsolicited
460-
.send(UnsolicitedResponse::Exists(*n))
461-
.await
462-
.expect("Channel closed unexpectedly");
455+
unsolicited.try_send(UnsolicitedResponse::Exists(*n)).ok();
463456
}
464457
Response::Expunge(n) => {
465-
unsolicited
466-
.send(UnsolicitedResponse::Expunge(*n))
467-
.await
468-
.expect("Channel closed unexpectedly");
458+
unsolicited.try_send(UnsolicitedResponse::Expunge(*n)).ok();
469459
}
470460
_ => {
471-
unsolicited
472-
.send(UnsolicitedResponse::Other(res))
473-
.await
474-
.expect("Channel closed unexpectedly");
461+
unsolicited.try_send(UnsolicitedResponse::Other(res)).ok();
475462
}
476463
}
477464
}

0 commit comments

Comments
 (0)