
Commit 1750e70

pipe: add server backlog for concurrent Accept()
Teach `pipe.go:ListenPipe()` to create multiple instances of the server
pipe in the kernel so that client connections are less likely to receive
a `windows.ERROR_PIPE_BUSY` error. This is conceptually similar to the
`backlog` argument of the Unix `listen(2)` function.

The current `listenerRoutine()` function works sequentially and in
response to calls to `Accept()`, such that there will never be more than
one unbound server pipe in the NPFS present at any time. Even if the
server application calls `Accept()` concurrently from a pool of
application threads, `listenerRoutine()` will process them sequentially.

In this model, because there is only one `listenerRoutine()` instance,
there is an interval of time during which there are no available
unbound/free server pipes. This happens when `ConnectNamedPipe()`
returns and `listenerRoutine()` sends the new pipe handle via a channel
to the caller of `Accept()`, where the application code has an
opportunity to use the pipe or give it to another goroutine and then
call `Accept()` again. The subsequent `Accept()` call causes
`listenerRoutine()` to create a new unbound server pipe instance in the
file system and wait for the next connection. At any time during this
interval, a client will get a pipe-busy error. Code in `DialPipe()`
hides this from Go callers because it includes a busy-retry loop.
However, clients written in other languages without this assistance are
likely to see the busy error and be forced to deal with it.

This change introduces an "accept queue" using a buffered channel and
splits `listenerRoutine()` into a pool of listener worker threads. Each
worker creates a new unbound pipe instance in the file system and waits
for a client connection. The NPFS and kernel handle connection delivery
to a random listener worker. The resulting connected pipe is then
delivered back to the caller of `Accept()` as before.

A `PipeConfig.QueueSize` variable controls the number of listener worker
threads and the maximum number of unbound/free server pipes that will be
present at any given time. Note that a listener worker will normally
have an unbound/free pipe except during that same delivery interval.
Having multiple active workers (and unbound pipes in the file system)
gives us extra capacity to handle rapidly arriving connections and
minimizes the odds of a client seeing a busy error.

The application is encouraged to call `Accept()` from a pool of
application workers. The size of the application pool should be the same
as or larger than the queue size to take full advantage of the listener
queue.

To preserve backwards compatibility, a queue size of 0 or 1 will behave
as before. Also for backwards compatibility, listener workers are
required to wait for an `Accept()` call so that the worker has a return
channel on which to send the connected pipe and/or error code. This
implies that the number of unbound pipes will be the smaller of the
queue size and the application pool size.

Finally, a mutex was added to `l.Close()` to ensure that concurrent
threads do not simultaneously try to shut down the pipe.

Signed-off-by: Jeff Hostetler <[email protected]>
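To make the recommended usage concrete, here is a minimal server sketch against the go-winio public API, assuming the `QueueSize` field added by this commit; the pipe name, pool size, and `serve` handler are illustrative only:

```go
package main

import (
	"net"

	winio "github.com/Microsoft/go-winio"
)

func serve(c net.Conn) {
	defer c.Close()
	// ... application protocol ...
}

func main() {
	const poolSize = 4 // illustrative; should match or exceed QueueSize

	l, err := winio.ListenPipe(`\\.\pipe\demo`, &winio.PipeConfig{
		QueueSize: poolSize, // keep up to poolSize unbound server pipes ready
	})
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// Accept from a pool of goroutines at least as large as QueueSize so
	// that every listener worker has a return channel to deliver into.
	for i := 0; i < poolSize; i++ {
		go func() {
			for {
				conn, err := l.Accept()
				if err != nil {
					return // e.g. ErrPipeListenerClosed after l.Close()
				}
				go serve(conn)
			}
		}()
	}

	select {} // block forever; a real server would wait for a shutdown signal
}
```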
1 parent 9f0d5dc commit 1750e70

File tree

2 files changed (+377 -42 lines)


pipe.go

+152 -42
@@ -11,6 +11,7 @@ import (
 	"net"
 	"os"
 	"runtime"
+	"sync"
 	"time"
 	"unsafe"
 
@@ -297,9 +298,30 @@ type win32PipeListener struct {
 	firstHandle windows.Handle
 	path        string
 	config      PipeConfig
-	acceptCh    chan (chan acceptResponse)
-	closeCh     chan int
-	doneCh      chan int
+
+	// `acceptQueueCh` is a buffered channel (of channels). Calls to
+	// Accept() will append to this queue to schedule a listener-worker
+	// to create a new named pipe instance in the named pipe file system
+	// (NPFS) and then listen for a connection from a client.
+	//
+	// The resulting connected pipe (or error) will be signalled (back
+	// to `Accept()`) on the channel value's channel.
+	acceptQueueCh chan (chan acceptResponse)
+
+	// `shutdownStartedCh` will be closed to indicate that all listener
+	// workers should shut down. `l.Close()` will signal this to begin
+	// a shutdown.
+	shutdownStartedCh chan struct{}
+
+	// `shutdownFinishedCh` will be closed to indicate that `l.listenerRoutine()`
+	// has stopped all of the listener worker threads and has finished the
+	// shutdown. `l.Close()` must wait for this signal before returning.
+	shutdownFinishedCh chan struct{}
+
+	// `closeMux` is used to create a critical section in `l.Close()` to
+	// coordinate the shutdown and prevent problems if a second thread calls
+	// `l.Close()` while a shutdown is in progress.
+	closeMux sync.Mutex
 }
 
 func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (windows.Handle, error) {
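The accept queue above is a channel of channels: each `Accept()` call enqueues a private reply channel, and a listener worker answers on it. A standalone sketch of the pattern in plain Go (the names here are illustrative, not from pipe.go):

```go
package main

import "fmt"

type reply struct {
	val int
	err error
}

func main() {
	// A buffered queue of per-request reply channels, like acceptQueueCh.
	queue := make(chan chan reply, 2)

	// Worker side: take a reply channel off the queue, do the work,
	// and answer on that caller's private channel.
	go func() {
		for replyCh := range queue {
			replyCh <- reply{val: 42}
		}
	}()

	// Caller side (what Accept() does): enqueue a fresh reply channel,
	// then block until the worker delivers a result on it.
	replyCh := make(chan reply)
	queue <- replyCh
	fmt.Println(<-replyCh) // {42 <nil>}
}
```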
@@ -426,45 +448,56 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
 			p.Close()
 			p = nil
 		}
-	case <-l.closeCh:
+	case <-l.shutdownStartedCh:
 		// Abort the connect request by closing the handle.
 		p.Close()
 		p = nil
 		err = <-ch
-		if err == nil || err == ErrFileClosed { //nolint:errorlint // err is Errno
+		if err == nil || err == ErrFileClosed || err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
 			err = ErrPipeListenerClosed
 		}
 	}
 	return p, err
 }
 
-func (l *win32PipeListener) listenerRoutine() {
-	closed := false
-	for !closed {
+func (l *win32PipeListener) listenerWorker(wg *sync.WaitGroup) {
+	var stop bool
+	for !stop {
 		select {
-		case <-l.closeCh:
-			closed = true
-		case responseCh := <-l.acceptCh:
-			var (
-				p   *win32File
-				err error
-			)
-			for {
-				p, err = l.makeConnectedServerPipe()
-				// If the connection was immediately closed by the client, try
-				// again.
-				if err != windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
-					break
-				}
-			}
+		case <-l.shutdownStartedCh:
+			stop = true
+		case responseCh := <-l.acceptQueueCh:
+			p, err := l.makeConnectedServerPipe()
 			responseCh <- acceptResponse{p, err}
-			closed = err == ErrPipeListenerClosed //nolint:errorlint // err is Errno
 		}
 	}
+
+	wg.Done()
+}
+
+func (l *win32PipeListener) listenerRoutine(queueSize int) {
+	var wg sync.WaitGroup
+
+	for k := 0; k < queueSize; k++ {
+		wg.Add(1)
+		go l.listenerWorker(&wg)
+	}
+
+	wg.Wait() // for all listenerWorkers to finish.
+
+	// We can assert here that `l.shutdownStartedCh` has been
+	// signalled (since `l.Close()` closed it).
+	//
+	// We might consider draining the `l.acceptQueueCh` and
+	// closing each of the channel instances, but that is not
+	// necessary since the second "select" in `l.Accept()` is
+	// waiting on its response channel `ch` and `l.shutdownFinishedCh`,
+	// and we're going to signal the latter in a moment.
+
 	windows.Close(l.firstHandle)
 	l.firstHandle = 0
 	// Notify Close() and Accept() callers that the handle has been closed.
-	close(l.doneCh)
+	close(l.shutdownFinishedCh)
 }
 
 // PipeConfig contain configuration for the pipe listener.
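The worker shutdown above relies on the Go idiom that closing a channel unblocks every receiver at once, which is why `shutdownStartedCh` and `shutdownFinishedCh` are signalled with `close()` rather than with sends. A minimal standalone illustration (not from pipe.go):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	shutdownStarted := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-shutdownStarted // every worker unblocks on the same close()
			fmt.Println("worker", id, "stopping")
		}(i)
	}

	close(shutdownStarted) // one close() broadcasts to all receivers
	wg.Wait()
}
```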
@@ -485,6 +518,19 @@ type PipeConfig struct {
 
 	// OutputBufferSize specifies the size of the output buffer, in bytes.
 	OutputBufferSize int32
+
+	// QueueSize specifies the maximum number of concurrently active pipe server
+	// handles to allow. This is conceptually similar to the `backlog` argument
+	// to `listen(2)` on Unix systems. Increasing this value reduces the likelihood
+	// of a connecting client receiving a `windows.ERROR_PIPE_BUSY` error.
+	// (This assumes that the server is written to call `l.Accept()` using a pool
+	// of application worker threads.)
+	//
+	// This value should be larger than your expected client arrival rate so that
+	// there are always a few extra listener worker threads and (more importantly)
+	// unbound server pipes in the kernel, so that a client `CreateFile()` does
+	// not get a busy signal.
+	QueueSize int32
 }
 
 // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
@@ -503,19 +549,30 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
 			return nil, err
 		}
 	}
+
+	queueSize := int(c.QueueSize)
+	if queueSize < 1 {
+		// Legacy calls will pass 0 since they won't know to set the queue size.
+		// Default to legacy behavior where we never have more than 1 available
+		// unbound pipe and that is only present when an application thread is
+		// blocked in `l.Accept()`.
+		queueSize = 1
+	}
+
 	h, err := makeServerPipeHandle(path, sd, c, true)
 	if err != nil {
 		return nil, err
 	}
 	l := &win32PipeListener{
-		firstHandle: h,
-		path:        path,
-		config:      *c,
-		acceptCh:    make(chan (chan acceptResponse)),
-		closeCh:     make(chan int),
-		doneCh:      make(chan int),
-	}
-	go l.listenerRoutine()
+		firstHandle:        h,
+		path:               path,
+		config:             *c,
+		acceptQueueCh:      make(chan chan acceptResponse, queueSize),
+		shutdownStartedCh:  make(chan struct{}),
+		shutdownFinishedCh: make(chan struct{}),
+		closeMux:           sync.Mutex{},
+	}
+	go l.listenerRoutine(queueSize)
 	return l, nil
 }

@@ -535,31 +592,84 @@ func connectPipe(p *win32File) error {
 }
 
 func (l *win32PipeListener) Accept() (net.Conn, error) {
+tryAgain:
 	ch := make(chan acceptResponse)
+
 	select {
-	case l.acceptCh <- ch:
-		response := <-ch
-		err := response.err
-		if err != nil {
-			return nil, err
+	case l.acceptQueueCh <- ch:
+		// We have queued a request for a worker thread to listen
+		// for a connection.
+	case <-l.shutdownFinishedCh:
+		// The shutdown completed before we could request a connection.
+		return nil, ErrPipeListenerClosed
+	case <-l.shutdownStartedCh:
+		// The shutdown is already in progress. Don't bother trying to
+		// schedule a new request.
+		return nil, ErrPipeListenerClosed
+	}
+
+	// We queued a request. Now wait for a connection signal or a
+	// shutdown that started while we were waiting.
+
+	select {
+	case response := <-ch:
+		if response.f == nil && response.err == nil {
+			// The listener worker could close our channel instance
+			// to indicate that the listener is shut down.
+			return nil, ErrPipeListenerClosed
+		}
+		if errors.Is(response.err, ErrPipeListenerClosed) {
+			return nil, ErrPipeListenerClosed
+		}
+		if response.err == windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
+			// If the connection was immediately closed by the client,
+			// try again (without reporting an error or a dead connection
+			// to the `Accept()` caller). This avoids spurious
+			// "The pipe is being closed." messages.
+			goto tryAgain
+		}
+		if response.err != nil {
+			return nil, response.err
 		}
 		if l.config.MessageMode {
 			return &win32MessageBytePipe{
 				win32Pipe: win32Pipe{win32File: response.f, path: l.path},
 			}, nil
 		}
 		return &win32Pipe{win32File: response.f, path: l.path}, nil
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown started and completed while we were waiting for a
+		// connection.
 		return nil, ErrPipeListenerClosed
+
+		// case <-l.shutdownStartedCh:
+		// We DO NOT watch for `l.shutdownStartedCh` because we need
+		// to keep listening on our local `ch` so that the associated
+		// listener worker can signal it without blocking when throwing
+		// an ErrPipeListenerClosed error.
 	}
 }
 
 func (l *win32PipeListener) Close() error {
+	l.closeMux.Lock()
 	select {
-	case l.closeCh <- 1:
-		<-l.doneCh
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown has already completed. Nothing to do.
+	default:
+		select {
+		case <-l.shutdownStartedCh:
+			// The shutdown is in progress. We should not get here because
+			// of the Mutex, but either way, we don't want to race here
+			// and accidentally close `l.shutdownStartedCh` twice.
+		default:
+			// Cause all listener workers to abort.
+			close(l.shutdownStartedCh)
+			// Wait for listenerRoutine to stop the workers and clean up.
+			<-l.shutdownFinishedCh
+		}
 	}
+	l.closeMux.Unlock()
+
 	return nil
 }
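The new `Close()` pairs a mutex with nested `select`/`default` blocks so that a second caller neither blocks forever nor closes `shutdownStartedCh` twice. A reduced sketch of the idiom follows; the names are illustrative, and the inline `close(l.finished)` stands in for the real code, where `listenerRoutine()` closes the finished channel after the workers exit:

```go
package main

import "sync"

type listener struct {
	mu       sync.Mutex
	started  chan struct{} // closed when shutdown begins
	finished chan struct{} // closed when shutdown completes
}

func (l *listener) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	select {
	case <-l.finished:
		// Shutdown already completed; nothing to do.
	default:
		select {
		case <-l.started:
			// Shutdown already in progress; the mutex normally prevents
			// reaching here, but never risk a double close().
		default:
			close(l.started) // tell the workers to stop
			// Stand-in for waiting on the real cleanup goroutine,
			// which would close `finished` after the workers exit.
			close(l.finished)
		}
	}
	return nil
}

func main() {
	l := &listener{
		started:  make(chan struct{}),
		finished: make(chan struct{}),
	}
	_ = l.Close()
	_ = l.Close() // idempotent: the second call is a no-op
}
```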
