@@ -1463,20 +1463,20 @@ _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1463
1463
int result = ioctlsocket ((SOCKET )fd , (long )FIONBIO , & value );
1464
1464
(void )dispatch_assume_zero (result );
1465
1465
} else {
1466
- // Try to make writing nonblocking, although pipes not coming
1467
- // from Foundation.Pipe may not have FILE_WRITE_ATTRIBUTES.
1466
+ // The _dispatch_pipe_monitor_thread expects pipes to be
1467
+ // PIPE_WAIT and exploits this assumption by using a blocking
1468
+ // 0-byte read as a synchronization mechanism.
1468
1469
DWORD dwPipeMode = 0 ;
1469
1470
if (GetNamedPipeHandleState ((HANDLE )fd , & dwPipeMode , NULL ,
1470
- NULL , NULL , NULL , 0 ) && !(dwPipeMode & PIPE_NOWAIT )) {
1471
- dwPipeMode |= PIPE_NOWAIT ;
1471
+ NULL , NULL , NULL , 0 ) && !(dwPipeMode & PIPE_WAIT )) {
1472
+ dwPipeMode |= PIPE_WAIT ;
1472
1473
if (!SetNamedPipeHandleState ((HANDLE )fd , & dwPipeMode ,
1473
1474
NULL , NULL )) {
1474
- // We may end up blocking on subsequent writes, but we
1475
- // don't have a good alternative.
1476
- // The WriteQuotaAvailable from NtQueryInformationFile
1477
- // erroneously returns 0 when there is a blocking read
1478
- // on the other end of the pipe.
1479
- _dispatch_fd_entry_debug ("failed to set PIPE_NOWAIT" ,
1475
+ // If setting the pipe to PIPE_WAIT fails, the
1476
+ // monitoring thread will spin constantly, saturating
1477
+ // a core, which is undesirable but non-fatal.
1478
+ // The semantics will still be correct in this case.
1479
+ _dispatch_fd_entry_debug ("failed to set PIPE_WAIT" ,
1480
1480
fd_entry );
1481
1481
}
1482
1482
}
@@ -2551,13 +2551,40 @@ _dispatch_operation_perform(dispatch_operation_t op)
2551
2551
NTSTATUS status = _dispatch_NtQueryInformationFile (hFile ,
2552
2552
& iosb , & fpli , sizeof (fpli ), FilePipeLocalInformation );
2553
2553
if (NT_SUCCESS (status )) {
2554
- // WriteQuotaAvailable is unreliable in the presence
2555
- // of a blocking reader, when it can return zero, so only
2556
- // account for it otherwise
2557
- if (fpli .WriteQuotaAvailable > 0 ) {
2558
- len = MIN (len , fpli .WriteQuotaAvailable );
2554
+ // WriteQuotaAvailable is the free space in the output buffer
2555
+ // that has not already been reserved for reading. In other words,
2556
+ // WriteQuotaAvailable =
2557
+ // OutboundQuota - WriteQuotaUsed - QueuedReadSize.
2558
+ // It is not documented that QueuedReadSize is part of this
2559
+ // calculation, but this behavior has been observed experimentally.
2560
+ // Unfortunately, this means that it is not possible to distinguish
2561
+ // between a full output buffer and a reader blocked waiting for a
2562
+ // full buffer's worth of data. This is a problem because if the
2563
+ // output buffer is full and no reader is waiting for data, then
2564
+ // attempting to write to the buffer of a PIPE_WAIT, non-
2565
+ // overlapped I/O pipe will block the dispatch queue thread.
2566
+ //
2567
+ // In order to work around this idiosyncrasy, we bound the size of
2568
+ // the write to be OutboundQuota - 1. This affords us a sentinel value
2569
+ // in WriteQuotaAvailable that can be used to detect if a reader is
2570
+ // making progress or not.
2571
+ // WriteQuotaAvailable = 0 => a reader is blocked waiting for data.
2572
+ // WriteQuotaAvailable = 1 => the pipe has been written to, but no
2573
+ // reader is making progress.
2574
+ // When we detect that WriteQuotaAvailable == 1, we write 0 bytes to
2575
+ // avoid blocking the dispatch queue thread.
2576
+ if (fpli .WriteQuotaAvailable == 0 ) {
2577
+ // This condition can only occur when we have a reader blocked
2578
+ // waiting for data on the pipe. In this case, write a full
2579
+ // buffer's worth of data (less one byte to preserve this
2580
+ // sentinel value of WriteQuotaAvailable == 0).
2581
+ len = MIN (len , fpli .OutboundQuota - 1 );
2582
+ } else {
2583
+ // Subtract 1 from WriteQuotaAvailable to ensure we do not fill
2584
+ // the pipe and preserve the sentinel value of
2585
+ // WriteQuotaAvailable == 1.
2586
+ len = MIN (len , fpli .WriteQuotaAvailable - 1 );
2559
2587
}
2560
- len = MIN (len , fpli .OutboundQuota );
2561
2588
}
2562
2589
2563
2590
OVERLAPPED ovlOverlapped = {};
0 commit comments