Commit 98d42db
pipe: add server backlog for concurrent Accept()
Teach `pipe.go:ListenPipe()` to create multiple instances of the server pipe in the kernel so that client connections are less likely to receive a `windows.ERROR_PIPE_BUSY` error. This is conceptually similar to the `backlog` argument of the Unix `listen(2)` function.

The current `listenerRoutine()` function works sequentially and only in response to calls to `Accept()`, so there is at most one unbound server pipe present at any time. Even if the server application calls `Accept()` concurrently from a pool of application threads, `listenerRoutine()` processes them sequentially. In this model, and because there is only one `listenerRoutine()` instance, there is an interval of time during which there are no available unbound/free server pipes: when `ConnectNamedPipe()` returns, `listenerRoutine()` sends the new pipe handle over a channel to the caller of `Accept()`, the application then has an opportunity to dispatch/process it and call `Accept()` again, and only then does `listenerRoutine()` create a new unbound server pipe and wait for the next connection. At any time during this interval, a client will get a pipe-busy error. Code in `DialPipe()` hides this from Go callers because it includes a busy-retry loop, but clients written in other languages without this assistance are likely to see the error and have to deal with it.

This change introduces an "accept queue" using a buffered channel and splits `listenerRoutine()` into a pool of listener worker threads. Each worker creates a new unbound pipe and waits for a client connection. The NPFS and kernel handle connection delivery to a random listener worker. The resulting connected pipe is delivered back to the caller of `Accept()` as before.

A `PipeConfig.QueueSize` variable controls the number of listener worker threads and the maximum number of unbound/free server pipes that will be present at any given time. Note that a listener worker will normally have an unbound/free pipe except during that same delivery interval. Having multiple active workers gives us extra capacity to handle rapidly arriving connections.

The application is encouraged to call `Accept()` from a pool of application workers. The size of the application pool should be the same as or larger than the queue size to take full advantage of the listener queue.

To preserve backwards compatibility, a queue size of 0 or 1 will behave as before. Also for backwards compatibility, listener workers are required to wait for an `Accept()` call so that the worker has a return channel on which to send the connected pipe and error code. This implies that the number of unbound pipes will be the smaller of the queue size and the application pool size.

Finally, a Mutex was added to `l.Close()` to ensure that concurrent threads do not simultaneously try to shut down the pipe.

Signed-off-by: Jeff Hostetler <[email protected]>
1 parent 4f41be6 commit 98d42db
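
To illustrate the usage pattern the commit message describes, here is a minimal server-side sketch (not part of the commit). It assumes the `github.com/Microsoft/go-winio` import path and a hypothetical pipe name; the queue size is an illustrative value. With this patch applied, `PipeConfig.QueueSize` sets the listener backlog, and the application calls `Accept()` from a pool of goroutines at least that large.

package main

import (
	"log"
	"net"
	"sync"

	winio "github.com/Microsoft/go-winio" // assumed import path for this package
)

func handle(c net.Conn) {
	defer c.Close()
	// ... application protocol ...
}

func main() {
	const queueSize = 4 // illustrative backlog; not a value taken from the commit

	// Ask ListenPipe() for `queueSize` listener workers / unbound server pipes.
	l, err := winio.ListenPipe(`\\.\pipe\example`, &winio.PipeConfig{
		QueueSize: queueSize,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()

	// Per the commit message, Accept() should be called from a pool at least
	// as large as QueueSize so every listener worker has a return channel.
	var wg sync.WaitGroup
	for i := 0; i < queueSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				conn, err := l.Accept()
				if err != nil {
					return // e.g. ErrPipeListenerClosed after l.Close()
				}
				go handle(conn)
			}
		}()
	}
	wg.Wait()
}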

File tree: 2 files changed (+373, -41)
pipe.go (+152, -41)

@@ -11,6 +11,7 @@ import (
 	"net"
 	"os"
 	"runtime"
+	"sync"
 	"syscall"
 	"time"
 	"unsafe"
@@ -258,9 +259,30 @@ type win32PipeListener struct {
 	firstHandle syscall.Handle
 	path        string
 	config      PipeConfig
-	acceptCh    chan (chan acceptResponse)
-	closeCh     chan int
-	doneCh      chan int
+
+	// `acceptQueueCh` is a buffered channel (of channels). Calls to
+	// Accept() will append to this queue to schedule a listener-worker
+	// to create a new named pipe instance in the named pipe file system
+	// (NPFS) and then listen for a connection from a client.
+	//
+	// The resulting connected pipe (or error) will be signalled (back
+	// to `Accept()`) on the channel value's channel.
+	acceptQueueCh chan (chan acceptResponse)
+
+	// `shutdownStartedCh` will be closed to indicate that all listener
+	// workers should shutdown. `l.Close()` will signal this to begin
+	// a shutdown.
+	shutdownStartedCh chan struct{}
+
+	// `shutdownFinishedCh` will be closed to indicate that `l.listenerRoutine()`
+	// has stopped all of the listener worker threads and has finished the
+	// shutdown. `l.Close()` must wait for this signal before returning.
+	shutdownFinishedCh chan struct{}
+
+	// `closeMux` is used to create a critical section in `l.Close()` and
+	// coordinate the shutdown and prevent problems if a second thread calls
+	// `l.Close()` while a shutdown is in progress.
+	closeMux sync.Mutex
 }
 
 func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
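
The three new channels implement a "channel of channels" accept queue. The stripped-down, standalone sketch below (hypothetical names, not the pipe code itself) shows the pattern: callers enqueue a private response channel on a buffered queue, worker goroutines take requests off the queue and reply on that private channel, and a closed shutdown channel stops the workers.

package main

import "fmt"

// acceptResponse mirrors the role of the real struct: a worker sends the
// result of one accept attempt back on the requester's private channel.
type acceptResponse struct {
	conn string // stands in for the connected pipe
	err  error
}

func main() {
	const queueSize = 2

	// Buffered queue of per-request response channels (like acceptQueueCh).
	queue := make(chan chan acceptResponse, queueSize)
	shutdown := make(chan struct{}) // plays the role of shutdownStartedCh

	// Worker pool: each worker takes one request off the queue, "connects",
	// and replies on that request's own channel.
	for w := 0; w < queueSize; w++ {
		go func(id int) {
			for {
				select {
				case <-shutdown:
					return
				case respCh := <-queue:
					respCh <- acceptResponse{conn: fmt.Sprintf("conn from worker %d", id)}
				}
			}
		}(w)
	}

	// An Accept()-style caller: enqueue a private channel, then wait on it.
	ch := make(chan acceptResponse)
	queue <- ch
	resp := <-ch
	fmt.Println(resp.conn)

	close(shutdown)
}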
@@ -383,7 +405,7 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
 			p.Close()
 			p = nil
 		}
-	case <-l.closeCh:
+	case <-l.shutdownStartedCh:
 		// Abort the connect request by closing the handle.
 		p.Close()
 		p = nil
@@ -395,33 +417,44 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
 	return p, err
 }
 
-func (l *win32PipeListener) listenerRoutine() {
-	closed := false
-	for !closed {
+func (l *win32PipeListener) listenerWorker(wg *sync.WaitGroup) {
+	var stop bool
+	for !stop {
 		select {
-		case <-l.closeCh:
-			closed = true
-		case responseCh := <-l.acceptCh:
-			var (
-				p   *win32File
-				err error
-			)
-			for {
-				p, err = l.makeConnectedServerPipe()
-				// If the connection was immediately closed by the client, try
-				// again.
-				if err != windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
-					break
-				}
-			}
+		case <-l.shutdownStartedCh:
+			stop = true
+		case responseCh := <-l.acceptQueueCh:
+			p, err := l.makeConnectedServerPipe()
 			responseCh <- acceptResponse{p, err}
-			closed = err == ErrPipeListenerClosed //nolint:errorlint // err is Errno
 		}
 	}
+
+	wg.Done()
+}
+
+func (l *win32PipeListener) listenerRoutine(queueSize int) {
+	var wg sync.WaitGroup
+
+	for k := 0; k < queueSize; k++ {
+		wg.Add(1)
+		go l.listenerWorker(&wg)
+	}
+
+	wg.Wait() // for all listenerWorkers to finish.
+
+	// We can assert here that `l.shutdownStartedCh` has been
+	// signalled (since `l.Close()` closed it).
+	//
+	// We might consider draining the `l.acceptQueueCh` and
+	// closing each of the channel instances, but that is not
+	// necessary since the second "select" in `l.Accept()` is
+	// waiting on the `requestCh` and `l.shutdownFinishedCh`.
+	// And we're going to signal the latter in a moment.
+
 	syscall.Close(l.firstHandle)
 	l.firstHandle = 0
 	// Notify Close() and Accept() callers that the handle has been closed.
-	close(l.doneCh)
+	close(l.shutdownFinishedCh)
 }
 
 // PipeConfig contain configuration for the pipe listener.
@@ -442,6 +475,19 @@ type PipeConfig struct {
 
 	// OutputBufferSize specifies the size of the output buffer, in bytes.
 	OutputBufferSize int32
+
+	// QueueSize specifies the maximum number of concurrently active pipe server
+	// handles to allow. This is conceptually similar to the `backlog` argument
+	// to `listen(2)` on Unix systems. Increasing this value reduces the likelihood
+	// of a connecting client receiving a `windows.ERROR_PIPE_BUSY` error.
+	// (Assuming that the server is written to call `l.Accept()` using a pool of
+	// application worker threads.)
+	//
+	// This value should be larger than your expected client arrival rate so that
+	// there are always a few extra listener worker threads and (more importantly)
+	// unbound server pipes in the kernel, so that a client "CreateFile()" should
+	// not get a busy signal.
+	QueueSize int32
 }
 
 // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
@@ -460,19 +506,30 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
 			return nil, err
 		}
 	}
+
+	queueSize := int(c.QueueSize)
+	if queueSize < 1 {
+		// Legacy calls will pass 0 since they won't know to set the queue size.
+		// Default to legacy behavior where we never have more than 1 available
+		// unbound pipe and that is only present when an application thread is
+		// blocked in `l.Accept()`.
+		queueSize = 1
+	}
+
 	h, err := makeServerPipeHandle(path, sd, c, true)
 	if err != nil {
 		return nil, err
 	}
 	l := &win32PipeListener{
-		firstHandle: h,
-		path:        path,
-		config:      *c,
-		acceptCh:    make(chan (chan acceptResponse)),
-		closeCh:     make(chan int),
-		doneCh:      make(chan int),
-	}
-	go l.listenerRoutine()
+		firstHandle:        h,
+		path:               path,
+		config:             *c,
+		acceptQueueCh:      make(chan chan acceptResponse, queueSize),
+		shutdownStartedCh:  make(chan struct{}),
+		shutdownFinishedCh: make(chan struct{}),
+		closeMux:           sync.Mutex{},
+	}
+	go l.listenerRoutine(queueSize)
 	return l, nil
 }
 
@@ -492,31 +549,85 @@ func connectPipe(p *win32File) error {
 }
 
 func (l *win32PipeListener) Accept() (net.Conn, error) {
+
+tryAgain:
 	ch := make(chan acceptResponse)
+
 	select {
-	case l.acceptCh <- ch:
-		response := <-ch
-		err := response.err
-		if err != nil {
-			return nil, err
+	case l.acceptQueueCh <- ch:
+		// We have queued a request for a worker thread to listen
+		// for a connection.
+	case <-l.shutdownFinishedCh:
+		// The shutdown completed before we could request a connection.
+		return nil, ErrPipeListenerClosed
+	case <-l.shutdownStartedCh:
+		// The shutdown is already in progress. Don't bother trying to
+		// schedule a new request.
+		return nil, ErrPipeListenerClosed
+	}
+
+	// We queued a request. Now wait for a connection signal or a
+	// shutdown while we were waiting.
+
+	select {
+	case response := <-ch:
+		if response.f == nil && response.err == nil {
+			// The listener worker could close our channel instance
+			// to indicate that the listener is shut down.
+			return nil, ErrPipeListenerClosed
+		}
+		if response.err == ErrPipeListenerClosed {
+			return nil, ErrPipeListenerClosed
+		}
+		if response.err == windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
+			// If the connection was immediately closed by the client,
+			// try again (without reporting an error or a dead connection
+			// to the `Accept()` caller). This avoids spurious
+			// "The pipe is being closed." messages.
+			goto tryAgain
+		}
+		if response.err != nil {
+			return nil, response.err
 		}
 		if l.config.MessageMode {
 			return &win32MessageBytePipe{
 				win32Pipe: win32Pipe{win32File: response.f, path: l.path},
 			}, nil
 		}
 		return &win32Pipe{win32File: response.f, path: l.path}, nil
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown started and completed while we were waiting for a
+		// connection.
 		return nil, ErrPipeListenerClosed
+
+		// case <-l.shutdownStartedCh:
+		// We DO NOT watch for `l.shutdownStartedCh` because we need
+		// to keep listening on our local `ch` so that the associated
+		// listener worker can signal it without blocking when throwing
+		// an ErrPipeListenerClosed error.
 	}
 }
 
 func (l *win32PipeListener) Close() error {
+	l.closeMux.Lock()
 	select {
-	case l.closeCh <- 1:
-		<-l.doneCh
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown has already completed. Nothing to do.
+	default:
+		select {
+		case <-l.shutdownStartedCh:
+			// The shutdown is in progress. We should not get here because
+			// of the Mutex, but either way, we don't want to race here
+			// and accidentally close `l.shutdownStartedCh` twice.
+		default:
+			// Cause all listener workers to abort.
+			close(l.shutdownStartedCh)
+			// Wait for listenerRoutine to stop the workers and clean up.
+			<-l.shutdownFinishedCh
+		}
 	}
+	l.closeMux.Unlock()
+
 	return nil
 }
 
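For completeness, on the client side the commit message notes that `DialPipe()` already hides `windows.ERROR_PIPE_BUSY` behind a retry loop, so Go clients need no changes; the larger server backlog mainly helps clients in other languages that call `CreateFile()` directly. A minimal Go client sketch (same assumed import path and hypothetical pipe name as above):

package main

import (
	"log"
	"time"

	winio "github.com/Microsoft/go-winio" // assumed import path for this package
)

func main() {
	// DialPipe() retries internally while the pipe is busy, up to the timeout;
	// the server-side backlog makes that retry path less likely to be needed.
	timeout := 2 * time.Second
	conn, err := winio.DialPipe(`\\.\pipe\example`, &timeout)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// ... application protocol ...
}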