
Commit

Merge bitcoin/bitcoin#30807: Fix peers abruptly disconnecting from AssumeUTXO nodes during IBD

992f83b test: add coverage for assumeUTXO honest peers disconnection (furszy)
6d5812e assumeUTXO: fix peers disconnection during sync (furszy)

Pull request description:

  Because AssumeUTXO nodes prioritize tip synchronization, they relay their local
  address through the network before completing the background chain sync.
  This, combined with the advertising of full-node service (`NODE_NETWORK`), can
  result in an honest peer in IBD connecting to the AssumeUTXO node (while syncing)
  and requesting a historical block that the node does not have. This behavior leads to
  an abrupt disconnection due to perceived unresponsiveness from the AssumeUTXO
  node.

  This lack of response occurs because nodes ignore `getdata` requests when they do
  not have the block data available (further discussion can be found in #30385).
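  A simplified sketch of the requester-side rule that makes the advertised flags matter (not the actual `net_processing.cpp` code; the helper name and structure are illustrative, and the service-bit values plus the ~288-block window follow BIP 159):

```cpp
// Toy model of how a downloading peer picks sources for historical blocks.
// Illustrative only; not the real net_processing logic.
#include <cassert>
#include <cstdint>

constexpr uint64_t NODE_NETWORK{1ULL << 0};           // serves the full historical chain
constexpr uint64_t NODE_NETWORK_LIMITED{1ULL << 10};  // serves roughly the last 288 blocks
constexpr int NODE_NETWORK_LIMITED_MIN_BLOCKS{288};

// Should we send a getdata for the block at `block_height` to a peer that
// advertised `peer_services`, given our current tip height?
bool CanRequestBlockFrom(uint64_t peer_services, int block_height, int tip_height)
{
    if (peer_services & NODE_NETWORK) return true;  // assumed to have every block
    if (peer_services & NODE_NETWORK_LIMITED) {
        return block_height >= tip_height - NODE_NETWORK_LIMITED_MIN_BLOCKS;
    }
    return false;
}

int main()
{
    const int tip{100'000};
    // A peer advertising NODE_NETWORK is asked for arbitrarily deep blocks...
    assert(CanRequestBlockFrom(NODE_NETWORK, 5'000, tip));
    // ...while a NODE_NETWORK_LIMITED-only peer is never asked for deep history.
    assert(!CanRequestBlockFrom(NODE_NETWORK_LIMITED, 5'000, tip));
    assert(CanRequestBlockFrom(NODE_NETWORK_LIMITED, tip - 100, tip));
    return 0;
}
```

  Under such a rule, an AssumeUTXO node that keeps advertising `NODE_NETWORK` gets asked for blocks it has not downloaded yet and silently drops the `getdata`, while a peer that only sees `NODE_NETWORK_LIMITED` never sends those deep requests in the first place.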

  Fix this by refraining from signaling full-node service support while the
  background chain is being synced. During this period, the node will only
  signal `NODE_NETWORK_LIMITED` support. Then, full-node (`NODE_NETWORK`)
  support will be re-enabled once the background chain sync is completed.

  Thanks to mzumsande for a post-#30385 conversation as well.

  Testing notes:
  Just cherry-pick the second commit (bb08c22) onto master.
  It will fail there because the IBD node requests historical blocks from the snapshot
  node, which the snapshot node ignores; the requests stall and the IBD node eventually
  disconnects the snapshot node.

ACKs for top commit:
  achow101:
    ACK 992f83b
  naumenkogs:
    ACK 992f83b
  mzumsande:
    ACK 992f83b

Tree-SHA512: fef525d1cf3200c2dd89a346be9c82d77f2e28ddaaea1f490a435e180d1a47a371cadea508349777d740ab56e94be536ad8f7d61cc81f6550c58b609b3779ed3
achow101 committed Sep 11, 2024
2 parents f6298a8 + 992f83b commit 349632e
Showing 7 changed files with 147 additions and 11 deletions.
16 changes: 13 additions & 3 deletions src/init.cpp
@@ -1579,7 +1579,12 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
     // This is defined and set here instead of inline in validation.h to avoid a hard
     // dependency between validation and index/base, since the latter is not in
     // libbitcoinkernel.
-    chainman.restart_indexes = [&node]() {
+    chainman.snapshot_download_completed = [&node]() {
+        if (!node.chainman->m_blockman.IsPruneMode()) {
+            LogPrintf("[snapshot] re-enabling NODE_NETWORK services\n");
+            node.connman->AddLocalServices(NODE_NETWORK);
+        }
+
         LogPrintf("[snapshot] restarting indexes\n");
 
         // Drain the validation interface queue to ensure that the old indexes
@@ -1716,8 +1721,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
             }
         }
     } else {
-        LogPrintf("Setting NODE_NETWORK on non-prune mode\n");
-        nLocalServices = ServiceFlags(nLocalServices | NODE_NETWORK);
+        // Prior to setting NODE_NETWORK, check if we can provide historical blocks.
+        if (!WITH_LOCK(chainman.GetMutex(), return chainman.BackgroundSyncInProgress())) {
+            LogPrintf("Setting NODE_NETWORK on non-prune mode\n");
+            nLocalServices = ServiceFlags(nLocalServices | NODE_NETWORK);
+        } else {
+            LogPrintf("Running node in NODE_NETWORK_LIMITED mode until snapshot background sync completes\n");
+        }
     }
 
     // ********************************************************* Step 11: import blocks
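The new check above uses `WITH_LOCK(chainman.GetMutex(), return chainman.BackgroundSyncInProgress())`, which evaluates the call while the chainstate mutex is held. A rough stand-alone approximation of that idiom (the macro body below is illustrative, not the project's actual definition, and `DummyChainman` is a made-up stand-in):

```cpp
// Approximation of the WITH_LOCK idiom used in the init.cpp hunk above:
// evaluate an expression while a mutex is held and yield its value.
#include <mutex>

#define WITH_LOCK(cs, code) ([&]() -> decltype(auto) { std::lock_guard lock(cs); code; }())

struct DummyChainman {
    mutable std::mutex m_mutex;
    bool m_background_sync_in_progress{true};

    std::mutex& GetMutex() const { return m_mutex; }
    bool BackgroundSyncInProgress() const { return m_background_sync_in_progress; }
};

bool ShouldSignalFullNode(const DummyChainman& chainman)
{
    // Mirrors the new init.cpp check: only claim NODE_NETWORK when no
    // background (historical) sync is still running.
    return !WITH_LOCK(chainman.GetMutex(), return chainman.BackgroundSyncInProgress());
}

int main()
{
    DummyChainman chainman;
    bool signal_full_node = ShouldSignalFullNode(chainman);  // false while syncing
    (void)signal_full_node;
    return 0;
}
```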
5 changes: 3 additions & 2 deletions src/net.cpp
@@ -1791,7 +1791,8 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
     const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end();
     // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is
     // detected, so use it whenever we signal NODE_P2P_V2.
-    const bool use_v2transport(nLocalServices & NODE_P2P_V2);
+    ServiceFlags local_services = GetLocalServices();
+    const bool use_v2transport(local_services & NODE_P2P_V2);
 
     CNode* pnode = new CNode(id,
                              std::move(sock),
@@ -1809,7 +1810,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
                                  .use_v2transport = use_v2transport,
                              });
     pnode->AddRef();
-    m_msgproc->InitializeNode(*pnode, nLocalServices);
+    m_msgproc->InitializeNode(*pnode, local_services);
     {
         LOCK(m_nodes_mutex);
         m_nodes.push_back(pnode);
10 changes: 8 additions & 2 deletions src/net.h
@@ -1221,6 +1221,11 @@ class CConnman
     //! that peer during `net_processing.cpp:PushNodeVersion()`.
     ServiceFlags GetLocalServices() const;
 
+    //! Updates the local services that this node advertises to other peers
+    //! during connection handshake.
+    void AddLocalServices(ServiceFlags services) { nLocalServices = ServiceFlags(nLocalServices | services); };
+    void RemoveLocalServices(ServiceFlags services) { nLocalServices = ServiceFlags(nLocalServices & ~services); }
+
     uint64_t GetMaxOutboundTarget() const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex);
     std::chrono::seconds GetMaxOutboundTimeframe() const;
 
@@ -1460,11 +1465,12 @@
      * This data is replicated in each Peer instance we create.
      *
      * This data is not marked const, but after being set it should not
-     * change.
+     * change. Unless AssumeUTXO is started, in which case, the peer
+     * will be limited until the background chain sync finishes.
      *
      * \sa Peer::our_services
      */
-    ServiceFlags nLocalServices;
+    std::atomic<ServiceFlags> nLocalServices;
 
     std::unique_ptr<CSemaphore> semOutbound;
     std::unique_ptr<CSemaphore> semAddnode;
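A compact, self-contained model of the bookkeeping these two `net.h` hunks introduce: the advertised services live in an atomic bitmask, the new helpers set and clear bits, and each connection captures a snapshot of the mask at handshake time (the "replicated in each Peer instance" noted in the comment). Class and member names below are toy stand-ins, not the real `CConnman`:

```cpp
// Toy model of the net.h changes: an atomic services bitmask with add/remove
// helpers, and a per-peer snapshot taken when the connection is set up.
#include <atomic>
#include <cassert>
#include <cstdint>

constexpr uint64_t NODE_NETWORK{1ULL << 0};
constexpr uint64_t NODE_NETWORK_LIMITED{1ULL << 10};

class ToyConnman {
public:
    uint64_t GetLocalServices() const { return m_local_services.load(); }
    // Like the new helpers, these are a plain load-then-store rather than a
    // single atomic read-modify-write; that is fine as long as only one
    // thread (init / RPC) updates the flags while the net thread only reads.
    void AddLocalServices(uint64_t s) { m_local_services = m_local_services.load() | s; }
    void RemoveLocalServices(uint64_t s) { m_local_services = m_local_services.load() & ~s; }

private:
    std::atomic<uint64_t> m_local_services{NODE_NETWORK | NODE_NETWORK_LIMITED};
};

struct ToyPeer {
    uint64_t our_services;  // snapshot taken at handshake, never updated afterwards
};

int main()
{
    ToyConnman connman;
    ToyPeer early{connman.GetLocalServices()};     // connected before the snapshot load

    connman.RemoveLocalServices(NODE_NETWORK);     // loadtxoutset path
    ToyPeer during{connman.GetLocalServices()};    // connected while background sync runs

    assert(early.our_services & NODE_NETWORK);     // existing peers keep the view they were given
    assert(!(during.our_services & NODE_NETWORK)); // new peers see NETWORK_LIMITED only
    assert(during.our_services & NODE_NETWORK_LIMITED);

    connman.AddLocalServices(NODE_NETWORK);        // background sync finished
    ToyPeer after{connman.GetLocalServices()};
    assert(after.our_services & NODE_NETWORK);
    return 0;
}
```

Because the per-peer value is a snapshot, the downgrade only affects connections made after `loadtxoutset`; that matches the updated comment about the data no longer being effectively immutable.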
7 changes: 7 additions & 0 deletions src/rpc/blockchain.cpp
@@ -3042,6 +3042,13 @@ static RPCHelpMan loadtxoutset()
         throw JSONRPCError(RPC_INTERNAL_ERROR, strprintf("Unable to load UTXO snapshot: %s. (%s)", util::ErrorString(activation_result).original, path.utf8string()));
     }
 
+    // Because we can't provide historical blocks during tip or background sync,
+    // update the local services to reflect that we are a limited peer until fully synced.
+    node.connman->RemoveLocalServices(NODE_NETWORK);
+    // Setting the limited state is usually redundant because the node can always
+    // provide the last 288 blocks, but it doesn't hurt to set it.
+    node.connman->AddLocalServices(NODE_NETWORK_LIMITED);
+
     CBlockIndex& snapshot_index{*CHECK_NONFATAL(*activation_result)};
 
     UniValue result(UniValue::VOBJ);
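To make the "last 288 blocks" remark concrete, here is a small worked check of the window a `NODE_NETWORK_LIMITED` peer is expected to cover (heights are made up; 288 is the BIP 159 minimum depth, which is also why setting the limited flag is "usually redundant"):

```cpp
// Worked example of the NODE_NETWORK_LIMITED serving window referenced above.
#include <cassert>

int main()
{
    constexpr int NODE_NETWORK_LIMITED_MIN_BLOCKS{288};
    const int tip_height{1000};
    const int oldest_expected{tip_height - NODE_NETWORK_LIMITED_MIN_BLOCKS};  // 712

    assert(oldest_expected == 712);
    assert(950 >= oldest_expected);     // recent block: a limited peer should have it
    assert(!(400 >= oldest_expected));  // deep block: don't ask a limited-only peer
    return 0;
}
```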
4 changes: 2 additions & 2 deletions src/validation.cpp
@@ -3575,8 +3575,8 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
             //
             // This cannot be done while holding cs_main (within
             // MaybeCompleteSnapshotValidation) or a cs_main deadlock will occur.
-            if (m_chainman.restart_indexes) {
-                m_chainman.restart_indexes();
+            if (m_chainman.snapshot_download_completed) {
+                m_chainman.snapshot_download_completed();
             }
             break;
         }
2 changes: 1 addition & 1 deletion src/validation.h
@@ -977,7 +977,7 @@ class ChainstateManager

     //! Function to restart active indexes; set dynamically to avoid a circular
     //! dependency on `base/index.cpp`.
-    std::function<void()> restart_indexes = std::function<void()>();
+    std::function<void()> snapshot_download_completed = std::function<void()>();
 
     const CChainParams& GetParams() const { return m_options.chainparams; }
     const Consensus::Params& GetConsensus() const { return m_options.chainparams.GetConsensus(); }
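The renamed member keeps the same inversion-of-control pattern: validation only holds an opaque `std::function` slot, and `AppInitMain` decides what "snapshot download completed" means, which is how the kernel code avoids a hard dependency on the index and networking code. A minimal stand-alone illustration (toy type names, not the real `ChainstateManager`):

```cpp
// Minimal illustration of the callback-injection pattern used by
// snapshot_download_completed: lower-layer code exposes a std::function slot,
// upper-layer code fills it in, and the lower layer calls it if set.
#include <functional>
#include <iostream>

// "Kernel"-side class: knows nothing about indexes or networking.
class ToyChainstateManager {
public:
    std::function<void()> snapshot_download_completed = std::function<void()>();

    void FinishBackgroundSync()
    {
        // ... validation work ...
        if (snapshot_download_completed) snapshot_download_completed();
    }
};

int main()
{
    ToyChainstateManager chainman;
    // "Application"-side wiring, analogous to what AppInitMain does:
    chainman.snapshot_download_completed = [] {
        std::cout << "re-enable NODE_NETWORK, restart indexes\n";
    };
    chainman.FinishBackgroundSync();
    return 0;
}
```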
114 changes: 113 additions & 1 deletion test/functional/feature_assumeutxo.py
@@ -9,20 +9,30 @@
The assumeutxo value generated and used here is committed to in
`CRegTestParams::m_assumeutxo_data` in `src/kernel/chainparams.cpp`.
"""
import time
from shutil import rmtree

from dataclasses import dataclass
from test_framework.blocktools import (
create_block,
create_coinbase
)
from test_framework.messages import tx_from_hex
from test_framework.messages import (
CBlockHeader,
from_hex,
msg_headers,
tx_from_hex
)
from test_framework.p2p import (
P2PInterface,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_approx,
assert_equal,
assert_raises_rpc_error,
sha256sum_file,
try_rpc,
)
from test_framework.wallet import (
getnewdestination,
@@ -248,6 +258,74 @@ def test_snapshot_not_on_most_work_chain(self, dump_output_path):
node1.submitheader(main_block1)
node1.submitheader(main_block2)

def test_sync_from_assumeutxo_node(self, snapshot):
"""
This test verifies that:
1. An IBD node can sync headers from an AssumeUTXO node at any time.
2. IBD nodes do not request historical blocks from AssumeUTXO nodes while they are syncing the background-chain.
3. The assumeUTXO node dynamically adjusts the network services it offers according to its state.
4. IBD nodes can fully sync from AssumeUTXO nodes after they finish the background-chain sync.
"""
self.log.info("Testing IBD-sync from assumeUTXO node")
# Node2 starts clean and loads the snapshot.
# Node3 starts clean and seeks to sync-up from snapshot_node.
miner = self.nodes[0]
snapshot_node = self.nodes[2]
ibd_node = self.nodes[3]

# Start test fresh by cleaning up node directories
for node in (snapshot_node, ibd_node):
self.stop_node(node.index)
rmtree(node.chain_path)
self.start_node(node.index, extra_args=self.extra_args[node.index])

# Sync-up headers chain on snapshot_node to load snapshot
headers_provider_conn = snapshot_node.add_p2p_connection(P2PInterface())
headers_provider_conn.wait_for_getheaders()
msg = msg_headers()
for block_num in range(1, miner.getblockcount()+1):
msg.headers.append(from_hex(CBlockHeader(), miner.getblockheader(miner.getblockhash(block_num), verbose=False)))
headers_provider_conn.send_message(msg)

# Ensure headers arrived
default_value = {'status': ''} # No status
headers_tip_hash = miner.getbestblockhash()
self.wait_until(lambda: next(filter(lambda x: x['hash'] == headers_tip_hash, snapshot_node.getchaintips()), default_value)['status'] == "headers-only")
snapshot_node.disconnect_p2ps()

# Load snapshot
snapshot_node.loadtxoutset(snapshot['path'])

# Connect nodes and verify the ibd_node can sync-up the headers-chain from the snapshot_node
self.connect_nodes(ibd_node.index, snapshot_node.index)
snapshot_block_hash = snapshot['base_hash']
self.wait_until(lambda: next(filter(lambda x: x['hash'] == snapshot_block_hash, ibd_node.getchaintips()), default_value)['status'] == "headers-only")

# Once the headers-chain is synced, the ibd_node must avoid requesting historical blocks from the snapshot_node.
# If it does request such blocks, the snapshot_node will ignore requests it cannot fulfill, causing the ibd_node
# to stall. This stall could last for up to 10 min, ultimately resulting in an abrupt disconnection due to the
# snapshot_node's perceived unresponsiveness.
time.sleep(3) # Sleep here because we can't detect when a node avoids requesting blocks from another peer.
assert_equal(len(ibd_node.getpeerinfo()[0]['inflight']), 0)

# Now disconnect nodes and finish background chain sync
self.disconnect_nodes(ibd_node.index, snapshot_node.index)
self.connect_nodes(snapshot_node.index, miner.index)
self.sync_blocks(nodes=(miner, snapshot_node))
# Check the base snapshot block was stored and ensure node signals full-node service support
self.wait_until(lambda: not try_rpc(-1, "Block not found", snapshot_node.getblock, snapshot_block_hash))
assert 'NETWORK' in snapshot_node.getnetworkinfo()['localservicesnames']

# Now that the snapshot_node is synced, verify the ibd_node can sync from it
self.connect_nodes(snapshot_node.index, ibd_node.index)
assert 'NETWORK' in ibd_node.getpeerinfo()[0]['servicesnames']
self.sync_blocks(nodes=(ibd_node, snapshot_node))

def assert_only_network_limited_service(self, node):
node_services = node.getnetworkinfo()['localservicesnames']
assert 'NETWORK' not in node_services
assert 'NETWORK_LIMITED' in node_services

def run_test(self):
"""
Bring up two (disconnected) nodes, mine some new blocks on the first,
@@ -381,13 +459,20 @@ def check_dump_output(output):
self.test_snapshot_block_invalidated(dump_output['path'])
self.test_snapshot_not_on_most_work_chain(dump_output['path'])

# Prune-node sanity check
assert 'NETWORK' not in n1.getnetworkinfo()['localservicesnames']

self.log.info(f"Loading snapshot into second node from {dump_output['path']}")
# This node's tip is on an ancestor block of the snapshot, which should
# be the normal case
loaded = n1.loadtxoutset(dump_output['path'])
assert_equal(loaded['coins_loaded'], SNAPSHOT_BASE_HEIGHT)
assert_equal(loaded['base_height'], SNAPSHOT_BASE_HEIGHT)

self.log.info("Confirm that local services remain unchanged")
# Since n1 is a pruned node, the 'NETWORK' service flag must always be unset.
self.assert_only_network_limited_service(n1)

self.log.info("Check that UTXO-querying RPCs operate on snapshot chainstate")
snapshot_hash = loaded['tip_hash']
snapshot_num_coins = loaded['coins_loaded']
@@ -491,6 +576,9 @@ def check_tx_counts(final: bool) -> None:
self.restart_node(1, extra_args=[
f"-stopatheight={PAUSE_HEIGHT}", *self.extra_args[1]])

# Upon restart during snapshot tip sync, the node must remain in 'limited' mode.
self.assert_only_network_limited_service(n1)

# Finally connect the nodes and let them sync.
#
# Set `wait_for_connect=False` to avoid a race between performing connection
@@ -507,6 +595,9 @@ def check_tx_counts(final: bool) -> None:
self.log.info("Restarted node before snapshot validation completed, reloading...")
self.restart_node(1, extra_args=self.extra_args[1])

# Upon restart, the node must remain in 'limited' mode
self.assert_only_network_limited_service(n1)

# Send snapshot block to n1 out of order. This makes the test less
# realistic because normally the snapshot block is one of the last
# blocks downloaded, but it's useful to test because it triggers more
@@ -525,6 +616,10 @@ def check_tx_counts(final: bool) -> None:
self.log.info("Ensuring background validation completes")
self.wait_until(lambda: len(n1.getchainstates()['chainstates']) == 1)

# Since n1 is a pruned node, it will not signal NODE_NETWORK after
# completing the background sync.
self.assert_only_network_limited_service(n1)

# Ensure indexes have synced.
completed_idx_state = {
'basic block filter index': COMPLETE_IDX,
@@ -555,12 +650,18 @@ def check_tx_counts(final: bool) -> None:

self.log.info("-- Testing all indexes + reindex")
assert_equal(n2.getblockcount(), START_HEIGHT)
assert 'NETWORK' in n2.getnetworkinfo()['localservicesnames'] # sanity check

self.log.info(f"Loading snapshot into third node from {dump_output['path']}")
loaded = n2.loadtxoutset(dump_output['path'])
assert_equal(loaded['coins_loaded'], SNAPSHOT_BASE_HEIGHT)
assert_equal(loaded['base_height'], SNAPSHOT_BASE_HEIGHT)

# Even though n2 is a full node, it will unset the 'NETWORK' service flag during snapshot loading.
# This indicates to other peers that the node will temporarily not provide historical blocks.
self.log.info("Check node2 updated the local services during snapshot load")
self.assert_only_network_limited_service(n2)

for reindex_arg in ['-reindex=1', '-reindex-chainstate=1']:
self.log.info(f"Check that restarting with {reindex_arg} will delete the snapshot chainstate")
self.restart_node(2, extra_args=[reindex_arg, *self.extra_args[2]])
@@ -584,13 +685,21 @@ def check_tx_counts(final: bool) -> None:
msg = "Unable to load UTXO snapshot: Can't activate a snapshot-based chainstate more than once"
assert_raises_rpc_error(-32603, msg, n2.loadtxoutset, dump_output['path'])

# Upon restart, the node must stay in 'limited' mode until the background
# chain sync completes.
self.restart_node(2, extra_args=self.extra_args[2])
self.assert_only_network_limited_service(n2)

self.connect_nodes(0, 2)
self.wait_until(lambda: n2.getchainstates()['chainstates'][-1]['blocks'] == FINAL_HEIGHT)
self.sync_blocks(nodes=(n0, n2))

self.log.info("Ensuring background validation completes")
self.wait_until(lambda: len(n2.getchainstates()['chainstates']) == 1)

# Once background chain sync completes, the full node must start offering historical blocks again.
assert {'NETWORK', 'NETWORK_LIMITED'}.issubset(n2.getnetworkinfo()['localservicesnames'])

completed_idx_state = {
'basic block filter index': COMPLETE_IDX,
'coinstatsindex': COMPLETE_IDX,
@@ -625,6 +734,9 @@ def check_tx_counts(final: bool) -> None:

self.test_snapshot_in_a_divergent_chain(dump_output['path'])

# The following test cleans node2 and node3 chain directories.
self.test_sync_from_assumeutxo_node(snapshot=dump_output)

@dataclass
class Block:
hash: str
