Skip to content

Commit

Permalink
fixes #198 negative send/receive deadlines do not work
Browse files Browse the repository at this point in the history
This actually affected pretty much all the protocols and both send
and receive deadlines.  While here we've also made sure that
a non-blocking check will not fail if a message can immediately
be sent or received.
  • Loading branch information
gdamore committed Sep 27, 2020
1 parent e0b0da2 commit 4d69051
Show file tree
Hide file tree
Showing 20 changed files with 448 additions and 164 deletions.
60 changes: 35 additions & 25 deletions protocol/rep/rep.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -96,7 +96,7 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
expireTime := c.recvExpire
s.Unlock()

if expireTime > 0 {
if expireTime != 0 {
wq = time.After(expireTime)
}

Expand All @@ -107,10 +107,15 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
select {
case entry := <-s.recvQ:
m, p = entry.m, entry.p
case <-wq:
err = protocol.ErrRecvTimeout
case <-cq:
err = protocol.ErrClosed
default:
select {
case entry := <-s.recvQ:
m, p = entry.m, entry.p
case <-wq:
err = protocol.ErrRecvTimeout
case <-cq:
err = protocol.ErrClosed
}
}

s.Lock()
Expand Down Expand Up @@ -144,7 +149,7 @@ func (c *context) SendMsg(m *protocol.Message) error {
timeQ := nilQ
if bestEffort {
timeQ = closedQ
} else if c.sendExpire > 0 {
} else if c.sendExpire != 0 {
timeQ = time.After(c.sendExpire)
}

Expand All @@ -154,26 +159,31 @@ func (c *context) SendMsg(m *protocol.Message) error {
r.Unlock()

select {
case <-cq:
m.Header = nil
return protocol.ErrClosed
case <-p.closeQ:
// Pipe closed, so no way to get it to the recipient.
// Just discard the message.
m.Free()
case p.sendQ <- m:
return nil
case <-timeQ:
if bestEffort {
// No way to report to caller, so just discard
// the message.
default:
select {
case <-cq:
m.Header = nil
return protocol.ErrClosed
case <-p.closeQ:
// Pipe closed, so no way to get it to the recipient.
// Just discard the message.
m.Free()
return nil
}
m.Header = nil
return protocol.ErrSendTimeout
case <-timeQ:
if bestEffort {
// No way to report to caller, so just discard
// the message.
m.Free()
return nil
}
m.Header = nil
return protocol.ErrSendTimeout

case p.sendQ <- m:
return nil
case p.sendQ <- m:
return nil
}
}
}

Expand Down Expand Up @@ -228,7 +238,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionSendDeadline:
if val, ok := v.(time.Duration); ok && val > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.sendExpire = val
c.s.Unlock()
Expand All @@ -237,7 +247,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionRecvDeadline:
if val, ok := v.(time.Duration); ok && val > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.recvExpire = val
c.s.Unlock()
Expand Down
55 changes: 53 additions & 2 deletions protocol/rep/rep_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -121,6 +121,40 @@ func TestRepBestEffortSend(t *testing.T) {
MustSucceed(t, p.Close())
}

func TestRepSendNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xreq.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1))
MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1))
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1)))

ConnectPair(t, s, p)
for i := 0; i < 100; i++ {
// We have to make a raw message when using xreq. We
// use xreq because normal req will simply discard
// messages for requests it doesn't have outstanding.
m := mangos.NewMessage(0)
m.Header = make([]byte, 4)
binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000)
MustSucceed(t, p.SendMsg(m))
m = MustRecvMsg(t, s)
start := time.Now()
e := s.SendMsg(m)
if e != nil {
MustBeError(t, e, mangos.ErrSendTimeout)
m.Free()
MustBeTrue(t, time.Since(start) < time.Second)
break
} else if i > 20 {
MustBeError(t, e, mangos.ErrSendTimeout)
}
// NB: We never ask the peer to receive it -- this ensures we
// encounter back-pressure.
}
MustSucceed(t, s.Close())
MustSucceed(t, p.Close())
}

// This verifies that closing the socket aborts a blocking send.
// We use a context because closing the socket also closes pipes
// making it less reproducible.
Expand Down Expand Up @@ -190,6 +224,23 @@ func TestRepRecvJunk(t *testing.T) {
MustSucceed(t, self.Close())
}

func TestRepRecvNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
defer MustClose(t, s)
p := GetSocket(t, req.NewSocket)
defer MustClose(t, p)

MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))

ConnectPair(t, s, p)

start := time.Now()
m, e := s.Recv()
MustBeNil(t, m)
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeTrue(t, time.Since(start) < time.Second)
}

func TestRepDoubleRecv(t *testing.T) {
self := GetSocket(t, NewSocket)
MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Second))
Expand Down Expand Up @@ -266,7 +317,7 @@ func TestRepPipeRecvCloseSocket(t *testing.T) {

// This sets up a bunch of contexts to run in parallel, and verifies that
// they all seem to run with no mis-deliveries.
func TestRespondentMultiContexts(t *testing.T) {
func TestRepMultiContexts(t *testing.T) {
count := 30
repeat := 20

Expand Down
6 changes: 6 additions & 0 deletions protocol/req/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func (c *context) SendMsg(m *protocol.Message) error {
}
s.Unlock()
})
} else if c.sendExpire < 0 {
expired = true
}

s.send()
Expand Down Expand Up @@ -322,6 +324,10 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
}

for id == c.reqID && c.repMsg == nil {
if c.recvExpire < 0 {
c.cancel()
return nil, protocol.ErrRecvTimeout
}
c.cond.Wait()
}

Expand Down
31 changes: 30 additions & 1 deletion protocol/req/req_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -78,6 +78,22 @@ func TestReqRecvDeadline(t *testing.T) {
MustSucceed(t, peer.Close())
}

func TestReqRecvNonBlocking(t *testing.T) {
self := GetSocket(t, NewSocket)
peer := GetSocket(t, rep.NewSocket)
ConnectPair(t, self, peer)
MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))
MustSucceed(t, self.Send([]byte{}))
_ = MustRecv(t, peer)
start := time.Now()
m, e := self.RecvMsg()
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeTrue(t, time.Since(start) < time.Second)
MustBeNil(t, m)
MustSucceed(t, self.Close())
MustSucceed(t, peer.Close())
}

func TestReqContextClosed(t *testing.T) {
s := GetSocket(t, NewSocket)
c, e := s.OpenContext()
Expand Down Expand Up @@ -160,8 +176,21 @@ func TestReqBestEffort(t *testing.T) {
MustSucceed(t, s.SetOption(mangos.OptionBestEffort, false))
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustClose(t, s)
}

func TestReqSendNonBlocking(t *testing.T) {
timeout := -time.Millisecond
msg := []byte{'0', '1', '2', '3'}

s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, timeout))
MustSucceed(t, s.Listen(AddrTestInp()))
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustClose(t, s)
}


// This test demonstrates cancellation before calling receive but after the
// message is received causes the original message to be discarded.
func TestReqRetry(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions protocol/respondent/respondent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -102,6 +102,8 @@ func (c *context) RecvMsg() (*protocol.Message, error) {

if expTime > 0 {
tq = time.After(expTime)
} else if expTime < 0 {
tq = closedQ
}

select {
Expand Down Expand Up @@ -147,6 +149,8 @@ func (c *context) SendMsg(m *protocol.Message) error {
tq = closedQ
} else if c.sendExpire > 0 {
tq = time.After(c.sendExpire)
} else if c.sendExpire < 0 {
tq = closedQ
}

m.Header = bt
Expand Down Expand Up @@ -220,7 +224,7 @@ func (c *context) GetOption(name string) (interface{}, error) {
func (c *context) SetOption(name string, v interface{}) error {
switch name {
case protocol.OptionSendDeadline:
if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.sendExpire = val
c.s.Unlock()
Expand All @@ -229,7 +233,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionRecvDeadline:
if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.recvExpire = val
c.s.Unlock()
Expand Down
48 changes: 47 additions & 1 deletion protocol/respondent/respondent_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -264,6 +264,15 @@ func TestRespondentRecvExpire(t *testing.T) {
MustSucceed(t, s.Close())
}

func TestRespondentRecvNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))
v, e := s.RecvMsg()
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeNil(t, v)
MustSucceed(t, s.Close())
}

func TestRespondentSendState(t *testing.T) {
s := GetSocket(t, NewSocket)
MustBeError(t, s.Send([]byte{}), mangos.ErrProtoState)
Expand Down Expand Up @@ -297,6 +306,43 @@ func TestRespondentBestEffortSend(t *testing.T) {
MustSucceed(t, p.Close())
}

func TestRespondentSendNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xsurveyor.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1))
MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1))
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1)))

ConnectPair(t, s, p)
for i := 0; i < 100; i++ {
// We have to make a raw message when using xsurveyor. We
// use xsurveyor because normal surveyor will simply discard
// messages for surveys it doesn't have outstanding.
m := mangos.NewMessage(0)
m.Header = make([]byte, 4)
binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000)
MustSucceed(t, p.SendMsg(m))
m, e := s.RecvMsg()
MustSucceed(t, e)
MustNotBeNil(t, m)
start := time.Now()
e = s.SendMsg(m)
if e != nil {
MustBeError(t, e, mangos.ErrSendTimeout)
m.Free()
MustBeTrue(t, time.Since(start) < time.Second)
break
} else if i > 20 {
MustBeError(t, e, mangos.ErrSendTimeout)
}

// NB: We never ask the peer to receive it -- this ensures we
// encounter backpressure.
}
MustSucceed(t, s.Close())
MustSucceed(t, p.Close())
}

func TestRespondentSendBackPressure(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xsurveyor.NewSocket)
Expand Down
Loading

0 comments on commit 4d69051

Please sign in to comment.