From ea0b23e238e529b830d169c005058e26802405ce Mon Sep 17 00:00:00 2001 From: Hussein Ait Lahcen Date: Mon, 18 Nov 2024 16:39:41 +0100 Subject: [PATCH] fix: mempool reactor --- mempool/reactor.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/mempool/reactor.go b/mempool/reactor.go index 73b615dec6e..d9027b3ff9f 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -191,6 +191,8 @@ type PeerState interface { // Send new mempool txs to peer. func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { + memR.Logger.Debug("BroadcastTxRoutine start") + var next *clist.CElement // If the node is catching up, don't start this routine immediately. @@ -216,6 +218,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { select { case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available if next = memR.mempool.TxsFront(); next == nil { + memR.Logger.Debug("Waiting for transaction") continue } case <-peer.Quit(): @@ -234,6 +237,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // will be initialized before the consensus reactor. We should wait a few // milliseconds and retry. time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond) + memR.Logger.Debug("Invalid peer state, rechecking.") continue } @@ -252,18 +256,24 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - // Do not send this transaction if we receive it from peer. - if memTx.isSender(peer.ID()) { + // The entry may have been removed from the mempool since it was + // chosen at the beginning of the loop. Skip it if that's the case. + if !memR.mempool.InMempool(memTx.tx.Key()) { + memR.Logger.Debug("Transaction no longer in mempool, skipping.") + time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond) continue } - success := peer.Send(p2p.Envelope{ - ChannelID: MempoolChannel, - Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, - }) - if !success { - time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond) - continue + // Do not send this transaction if we receive it from peer. + if !memTx.isSender(peer.ID()) { + success := peer.Send(p2p.Envelope{ + ChannelID: MempoolChannel, + Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + }) + if !success { + time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond) + continue + } } select {