From 78b42accc921990767a1074bc46270247ff80894 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Mon, 17 Mar 2025 09:35:06 +0000 Subject: [PATCH] Implement rlpx peer connections limit why The number of incoming connections were not restricted in the `p2p` handler. Even worse, there was a nimbus option `--max-peers` which was used to set up the minimum(sic) number of peers to keep connected with. Now, the re-organised nimbus options mean: * `--min-peers` -- minimum number of connections to hold * `--max-peers` -- maximum number of all peer connections caveat While the p2p module now rejects new incoming connections when the limit is reached it does allow more connection list entries for outgoing connection when explicitly using the `connectToNode()` function (as in `rpc/common.nim`, `sync/peers.nim` and probably others.) In practice, this lead to at most one more connections on the list. --- execution_chain/config.nim | 12 ++- execution_chain/networking/p2p.nim | 7 ++ execution_chain/networking/p2p_types.nim | 1 + execution_chain/networking/rlpx.nim | 87 ++++++++++++++++++++- execution_chain/nimbus_execution_client.nim | 4 +- tests/networking/test_rlpx_thunk.nim | 48 +++++++++--- 6 files changed, 144 insertions(+), 15 deletions(-) diff --git a/execution_chain/config.nim b/execution_chain/config.nim index 04b1e55ba..da45c9ee1 100644 --- a/execution_chain/config.nim +++ b/execution_chain/config.nim @@ -296,8 +296,13 @@ type defaultValueDesc: "default to --tcp-port" name: "udp-port" }: Port + minPeers* {. + desc: "Minimum number of peers to keep connected with" + defaultValue: 10 + name: "min-peers" }: int + maxPeers* {. - desc: "Maximum number of peers to connect to" + desc: "Maximum number of peer connections" defaultValue: 25 name: "max-peers" }: int @@ -833,6 +838,11 @@ proc makeConfig*(cmdLine = commandLineParams()): NimbusConf # if udpPort not set in cli, then result.udpPort = result.tcpPort + if not (0 <= result.minPeers and result.minPeers <= result.maxPeers): + fatal "Invalid min/max peers options", `min-peers`=result.minPeers, + `max-peers`=result.maxPeers + quit QuitFailure + # see issue #1346 if result.keyStore.string == defaultKeystoreDir() and result.dataDir.string != defaultDataDir(): diff --git a/execution_chain/networking/p2p.nim b/execution_chain/networking/p2p.nim index 414a2108b..1aa613b2b 100644 --- a/execution_chain/networking/p2p.nim +++ b/execution_chain/networking/p2p.nim @@ -82,6 +82,7 @@ proc newEthereumNode*( clientId = "nim-eth-p2p", addAllCapabilities = true, minPeers = 10, + maxPeers = 25, bootstrapNodes: seq[ENode] = @[], bindUdpPort: Port, bindTcpPort: Port, @@ -101,6 +102,7 @@ proc newEthereumNode*( result.connectionState = ConnectionState.None result.bindIp = bindIp result.bindPort = bindTcpPort + result.maxPeers = maxPeers result.discovery = newDiscoveryProtocol( keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng) @@ -121,6 +123,11 @@ proc processIncoming(server: StreamServer, remote: StreamTransport): Future[void] {.async: (raises: []).} = try: var node = getUserData[EthereumNode](server) + + if node.maxPeers <= node.peerPool.connectedNodes.len: + await node.rlpxReject(remote, TooManyPeers) + return + let peer = await node.rlpxAccept(remote) if not peer.isNil: trace "Connection established (incoming)", peer diff --git a/execution_chain/networking/p2p_types.nim b/execution_chain/networking/p2p_types.nim index 2cac17f9b..ff5766b72 100644 --- a/execution_chain/networking/p2p_types.nim +++ b/execution_chain/networking/p2p_types.nim @@ -39,6 +39,7 @@ type protocolStates*: seq[RootRef] discovery*: DiscoveryProtocol rng*: ref HmacDrbgContext + maxPeers*: int # limit on accepting incoming connection requests Peer* = ref object remote*: Node diff --git a/execution_chain/networking/rlpx.nim b/execution_chain/networking/rlpx.nim index 24d7ae011..6b2a31ab8 100644 --- a/execution_chain/networking/rlpx.nim +++ b/execution_chain/networking/rlpx.nim @@ -72,6 +72,19 @@ declarePublicCounter rlpx_accept_success, "Number of successful rlpx accepted pe declarePublicCounter rlpx_accept_failure, "Number of rlpx accept attempts that failed", labels = ["reason"] +declarePublicCounter rlpx_reject_success, + "Number of successfully rejected rlpx peers" + +declarePublicCounter rlpx_reject_failure, + "Number of failed rlpx reject attempts", labels = ["reason"] + + +formatIt(TransportAddress): + $it # remoteAddress=stream.remoteAddress() + +formatIt(cstring): + $it # exc=e.name + logScope: topics = "eth p2p rlpx" @@ -1304,7 +1317,7 @@ proc rlpxAccept*( debug "Could not get remote address", err = exc.msg return nil - trace "Incoming connection", remoteAddress = $remoteAddress + trace "Incoming connection to accept", remoteAddress = $remoteAddress peer.transport = try: @@ -1403,6 +1416,78 @@ proc rlpxAccept*( return peer +proc rlpxReject*( + node: EthereumNode; + stream: StreamTransport; + reason: DisconnectionReason; + ): Future[void] {.async: (raises: [CancelledError]).} = + ## Instead of accepting a connection running the Hello handshake, + ## a Disconnect message is sent. + initTracing(devp2pInfo, node.protocols) + + let + peer = Peer(network: node) + deadline = sleepAsync(connectionTimeout) + + defer: + deadline.cancelSoon() + stream.close() + + let remoteAddress = + try: + stream.remoteAddress() + except TransportOsError as e: + debug "Could not get remote address", exc=e.name, err=e.msg + return + + trace "Incoming connection to reject", remoteAddress, reason + + peer.transport = + try: + await RlpxTransport.accept(node.rng, node.keys, stream).wait(deadline) + except AsyncTimeoutError: + debug "Timeout while rejecting", remoteAddress + rlpx_reject_failure.inc(labelValues = ["timeout"]) + return + except RlpxTransportError as e: + debug "Rlpx error while rejecting", + remoteAddress, exc=e.name, err=e.msg + rlpx_reject_failure.inc(labelValues = [$BreachOfProtocol]) + return + except TransportError as e: + debug "Transport error while rejecting", + remoteAddress, exc=e.name, err=e.msg + rlpx_reject_failure.inc(labelValues = [$TcpError]) + return + + let + ip = try: + remoteAddress.address + except ValueError: + raiseAssert "only tcp sockets supported" + address = Address( + ip: ip, tcpPort: remoteAddress.port, udpPort: remoteAddress.port) + + peer.remote = newNode(ENode(pubkey: peer.transport.pubkey, address: address)) + + logDisconnectedPeer peer + logScope: remote=peer.remote + + try: + await peer.sendDisconnectMsg( + DisconnectionReasonList(value: reason)).wait(deadline) + except AsyncTimeoutError: + debug "Disconnect handshake timeout" + rlpx_reject_failure.inc(labelValues = ["timeout"]) + return + except EthP2PError as e: + debug "Disconnect handshake error", name=e.name, err=e.msg + rlpx_reject_failure.inc(labelValues = ["error"]) + return + + trace "DevP2P sent Disconnect", reason + rlpx_reject_success.inc() + #------------------------------------------------------------------------------ # Mini Protocol DSL #------------------------------------------------------------------------------ diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim index 730c4ecd7..dbaad81bb 100644 --- a/execution_chain/nimbus_execution_client.nim +++ b/execution_chain/nimbus_execution_client.nim @@ -96,7 +96,9 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, nimbus.ethNode = newEthereumNode( keypair, address, conf.networkId, conf.agentString, - addAllCapabilities = false, minPeers = conf.maxPeers, + addAllCapabilities = false, + minPeers = conf.minPeers, + maxPeers = conf.maxPeers, bootstrapNodes = bootstrapNodes, bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort, bindIp = conf.listenAddress, diff --git a/tests/networking/test_rlpx_thunk.nim b/tests/networking/test_rlpx_thunk.nim index 1cd86f276..eb42bb0a7 100644 --- a/tests/networking/test_rlpx_thunk.nim +++ b/tests/networking/test_rlpx_thunk.nim @@ -20,24 +20,35 @@ import let rng = newRng() -var - node1 = setupTestNode(rng, eth) - node2 = setupTestNode(rng, eth) +type ClientServerPair = tuple + client, server: EthereumNode -node2.startListening() -let res = waitFor node1.rlpxConnect(newNode(node2.toENode())) -check res.isOk() +proc setupClientServer(): ClientServerPair = + let + client = setupTestNode(rng, eth) + server = setupTestNode(rng, eth) + server.startListening() + (client, server) -let peer = res.get() +proc connectClient(cs: ClientServerPair): Result[Peer,RlpxError] = + waitFor cs.client.rlpxConnect(newNode(cs.server.toENode())) -proc testThunk(payload: openArray[byte]) = + +proc testThunk(peer: Peer, payload: openArray[byte]) = var (msgId, msgData) = recvMsgMock(payload) waitFor peer.invokeThunk(msgId, msgData) -proc testPayloads(filename: string) = - let js = json.parseFile(filename) +proc testPayloads(filename: string) = suite extractFilename(filename): + let + js = json.parseFile(filename) + cs = setupClientServer() + res = cs.connectClient() + + check res.isOk() + let peer = res.get() + for testname, testdata in js: test testname: let @@ -51,7 +62,7 @@ proc testPayloads(filename: string) = let payload = hexToSeqByte(payloadHex.str) if error.isNil: - testThunk(payload) + peer.testThunk(payload) else: if error.kind != JString: skip() @@ -60,9 +71,22 @@ proc testPayloads(filename: string) = # TODO: can I convert the error string to an Exception type at runtime? expect CatchableError: try: - testThunk(payload) + peer.testThunk(payload) except CatchableError as e: check: e.name == error.str raise e + +proc testRejectHello() = + suite "Reject incoming connection": + let cs = setupClientServer() + cs.server.maxPeers = 0 # so incoming messages will be rejected + + test "Hello message TooManyPeersError reply": + let rc = cs.connectClient() + check rc.isErr() + check rc.error == TooManyPeersError + + testPayloads(sourceDir / "test_rlpx_thunk.json") +testRejectHello()