Skip to content

Commit

Permalink
[release/4.x] Cherry pick: Timeout idle node-to-node channels (#5266) (
Browse files Browse the repository at this point in the history
  • Loading branch information
CCF [bot] authored May 19, 2023
1 parent 1b0e89d commit f0f8915
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .threading_canary
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This looks like a "job" for Threading Canary!!!!!
THIS looks like a job for Threading Canary!!
3 changes: 3 additions & 0 deletions src/consensus/aft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ namespace aft
}

// Test stub: the message limit is accepted and ignored.
void set_message_limit(size_t message_limit) override {}
// Test stub: the idle timeout is accepted and ignored.
void set_idle_timeout(std::chrono::milliseconds idle_timeout) override {}

// Test stub: ticking does nothing, so this stub never times anything out.
void tick(std::chrono::milliseconds elapsed) override {}

bool recv_authenticated_with_load(
const ccf::NodeId& from, const uint8_t*& data, size_t& size) override
Expand Down
6 changes: 6 additions & 0 deletions src/enclave/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ namespace ccf

node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit);

// If we haven't heard from a node for multiple election timeouts, then clean
// up its node-to-node channel
const auto idle_timeout =
std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4;
node->set_n2n_idle_timeout(idle_timeout);

ccf::NodeCreateInfo r;
try
{
Expand Down
7 changes: 7 additions & 0 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,8 @@ namespace ccf
const auto tx_id = consensus->get_committed_txid();
indexer->update_strategies(elapsed, {tx_id.first, tx_id.second});
}

n2n_channels->tick(elapsed);
}

void tick_end()
Expand Down Expand Up @@ -2596,6 +2598,11 @@ namespace ccf
n2n_channels->set_message_limit(message_limit);
}

// Forwards the idle timeout to n2n_channels (set_idle_timeout).
void set_n2n_idle_timeout(std::chrono::milliseconds idle_timeout)
{
n2n_channels->set_idle_timeout(idle_timeout);
}

virtual const StartupConfig& get_node_config() const override
{
return config;
Expand Down
3 changes: 3 additions & 0 deletions src/node/node_to_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,8 @@ namespace ccf
size_t size) = 0;

// Sets the message limit used by the implementation.
virtual void set_message_limit(size_t message_limit) = 0;
// Sets how long a channel may go unused before the implementation may close
// it (NodeToNodeChannelManager closes and removes such channels in tick()).
virtual void set_idle_timeout(std::chrono::milliseconds idle_timeout) = 0;

// Notifies the implementation that `elapsed` time has passed since the
// previous tick.
virtual void tick(std::chrono::milliseconds elapsed) = 0;
};
}
100 changes: 77 additions & 23 deletions src/node/node_to_node_channel_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ namespace ccf
ringbuffer::AbstractWriterFactory& writer_factory;
ringbuffer::WriterPtr to_host;

std::unordered_map<NodeId, std::shared_ptr<Channel>> channels;
// Per-peer bookkeeping stored in the channels map: the channel itself plus
// how long it has gone without being fetched through get_channel().
struct ChannelInfo
{
  // The channel to the peer
  std::shared_ptr<Channel> channel;

  // Time accumulated by tick() since the channel was last fetched. Reset to
  // zero on every get_channel() hit and compared against idle_timeout in
  // tick(). Explicitly zero-initialised so that a default-constructed entry
  // never carries an indeterminate duration; aggregate initialisation
  // (ChannelInfo{channel, idle_time}) continues to work.
  std::chrono::milliseconds idle_time = std::chrono::milliseconds(0);
};

std::unordered_map<NodeId, ChannelInfo> channels;
ccf::pal::Mutex lock; //< Protects access to channels map

struct ThisNode
Expand All @@ -36,6 +42,10 @@ namespace ccf
std::nullopt;
#endif

// This is set during node startup, using a value derived from the run-time
// configuration. Before that, no timeout applies
std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;

std::shared_ptr<Channel> get_channel(const NodeId& peer_id)
{
CCF_ASSERT_FMT(
Expand All @@ -55,21 +65,23 @@ namespace ccf
auto search = channels.find(peer_id);
if (search != channels.end())
{
return search->second;
auto& channel_info = search->second;
channel_info.idle_time = std::chrono::milliseconds(0);
return channel_info.channel;
}

// Create channel
channels.try_emplace(
auto channel = std::make_shared<Channel>(
writer_factory,
this_node->service_cert,
this_node->node_kp,
this_node->endorsed_node_cert.value(),
this_node->node_id,
peer_id,
std::make_shared<Channel>(
writer_factory,
this_node->service_cert,
this_node->node_kp,
this_node->endorsed_node_cert.value(),
this_node->node_id,
peer_id,
message_limit.value()));
return channels.at(peer_id);
message_limit.value());
auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
channels.try_emplace(peer_id, info);
return channel;
}

public:
Expand Down Expand Up @@ -116,6 +128,41 @@ namespace ccf
message_limit = message_limit_;
}

// Stores the idle timeout applied by tick(). Until this has been called,
// idle_timeout is empty and tick() closes nothing.
// NOTE(review): this writes idle_timeout without holding `lock`, whereas
// tick() reads it under `lock` - presumably fine because it is only called
// during startup; confirm.
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
{
idle_timeout = idle_timeout_;
}

// Ages every known channel by `elapsed`, then closes and forgets those whose
// accumulated idle time has reached the configured idle timeout. While no
// idle timeout has been set, this does nothing (idle times do not advance).
void tick(std::chrono::milliseconds elapsed) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);

  if (!idle_timeout.has_value())
  {
    // No timeout configured yet, so channels cannot expire
    return;
  }

  for (auto it = channels.begin(); it != channels.end();)
  {
    auto& info = it->second;
    info.idle_time += elapsed;
    const auto idle_time = info.idle_time;

    if (idle_time >= idle_timeout.value())
    {
      LOG_DEBUG_FMT(
        "Closing idle channel to node {}. Was idle for {}, threshold for "
        "closure is {}",
        it->first,
        idle_time,
        idle_timeout.value());
      info.channel->close_channel();

      // erase() hands back the next valid iterator, so no increment here
      it = channels.erase(it);
    }
    else
    {
      ++it;
    }
  }
}

virtual void associate_node_address(
const NodeId& peer_id,
const std::string& peer_hostname,
Expand All @@ -129,22 +176,12 @@ namespace ccf
peer_service);
}

// Closes the channel to peer_id. The channel is obtained through
// get_channel(), which creates a new channel when none exists yet.
void close_channel(const NodeId& peer_id) override
{
get_channel(peer_id)->close_channel();
}

// Returns true if the channels map currently holds an entry for `nid`. Only
// inspects the map (under `lock`); it neither creates a channel nor touches
// its idle time.
bool have_channel(const ccf::NodeId& nid) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);
  return channels.count(nid) != 0;
}

// Returns whether the channel to peer_id reports itself as open. The channel
// is obtained through get_channel().
bool channel_open(const NodeId& peer_id)
{
return get_channel(peer_id)->channel_open();
}

bool send_authenticated(
const NodeId& to,
NodeMsgType type,
Expand Down Expand Up @@ -232,10 +269,27 @@ namespace ccf
return get_channel(from)->recv_key_exchange_message(data, size);
}

// NB: Only used by tests!
// NB: Following methods are only used by tests!
// Convenience overload taking the message as a vector: forwards the
// vector's buffer and length to the pointer + size overload.
bool recv_channel_message(const NodeId& from, std::vector<uint8_t>&& body)
{
return recv_channel_message(from, body.data(), body.size());
}

// Closes the channel to peer_id and removes it from the channels map. If no
// channel to that peer exists, this is a no-op (in particular, no channel is
// created just to be closed).
void close_channel(const NodeId& peer_id) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);

  const auto it = channels.find(peer_id);
  if (it == channels.end())
  {
    return;
  }

  it->second.channel->close_channel();
  channels.erase(it);
}

bool channel_open(const NodeId& peer_id)
{
return get_channel(peer_id)->channel_open();
}
};
}
147 changes: 147 additions & 0 deletions src/node/test/channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1490,3 +1490,150 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation")
REQUIRE(received_by_1 == expected_received_by_1);
REQUIRE(received_by_2 == expected_received_by_2);
}

// Checks idle-channel expiry in NodeToNodeChannelManager:
//  - a channel that is never used again is removed once its accumulated tick
//    time reaches the idle timeout,
//  - regular sends (on the sender) and receives (on the receiver) keep the
//    channel alive across ticks,
//  - once traffic stops, the channel expires as usual.
TEST_CASE_FIXTURE(IORingbuffersFixture, "Timeout idle channels")
{
  // Service identity, and one service-endorsed identity per node
  auto network_kp = crypto::make_key_pair(default_curve);
  auto service_cert = generate_self_signed_cert(network_kp, "CN=Network");

  auto kp1 = crypto::make_key_pair(default_curve);
  auto cert1 = generate_endorsed_cert(kp1, "CN=Node1", network_kp, service_cert);

  auto kp2 = crypto::make_key_pair(default_curve);
  auto cert2 = generate_endorsed_cert(kp2, "CN=Node2", network_kp, service_cert);

  const auto idle_timeout = std::chrono::milliseconds(10);
  // One tick of this length stays below the timeout, two in a row reach it
  const auto short_tick = 2 * idle_timeout / 3;

  NodeToNodeChannelManager channels1(wf1);
  channels1.initialize(nid1, service_cert, kp1, cert1);
  channels1.set_idle_timeout(idle_timeout);

  NodeToNodeChannelManager channels2(wf2);
  channels2.initialize(nid2, service_cert, kp2, cert2);
  channels2.set_idle_timeout(idle_timeout);

  MsgType payload;
  payload.fill(0x42);

  {
    INFO("Idle channels are destroyed");

    // Sending is what brings each channel into existence
    REQUIRE_FALSE(channels1.have_channel(nid2));
    REQUIRE(channels1.send_authenticated(
      nid2, NodeMsgType::consensus_msg, payload.begin(), payload.size()));

    REQUIRE_FALSE(channels2.have_channel(nid1));
    REQUIRE(channels2.send_authenticated(
      nid1, NodeMsgType::consensus_msg, payload.begin(), payload.size()));

    REQUIRE(channels1.have_channel(nid2));
    REQUIRE(channels2.have_channel(nid1));

    // First short tick on node 1: still below the timeout
    channels1.tick(short_tick);
    REQUIRE(channels1.have_channel(nid2));
    REQUIRE(channels2.have_channel(nid1));

    // Second short tick on node 1: its channel expires, node 2 is unaffected
    channels1.tick(short_tick);
    REQUIRE_FALSE(channels1.have_channel(nid2));
    REQUIRE(channels2.have_channel(nid1));

    // A single full-length tick is enough to expire node 2's channel
    channels2.tick(idle_timeout);
    REQUIRE_FALSE(channels1.have_channel(nid2));
    REQUIRE_FALSE(channels2.have_channel(nid1));

    // Flush previous messages
    read_outbound_msgs<MsgType>(eio1);
    read_outbound_msgs<MsgType>(eio2);
  }

  // Send some messages from 1 to 2. Confirm that those keep the channel (on
  // both ends) from being destroyed
  bool handshake_complete = false;
  constexpr size_t rounds = 20;

  for (size_t round = 0; round < rounds; ++round)
  {
    REQUIRE(channels1.send_authenticated(
      nid2, NodeMsgType::consensus_msg, payload.begin(), payload.size()));

    // Hand everything node 1 emitted over to node 2
    const auto from_node1 = read_outbound_msgs<MsgType>(eio1);
    for (const auto& outbound : from_node1)
    {
      if (outbound.type == NodeMsgType::channel_msg)
      {
        channels2.recv_channel_message(outbound.from, outbound.data());
      }
      else if (outbound.type == NodeMsgType::consensus_msg)
      {
        auto hdr = outbound.authenticated_hdr;
        const auto* data = outbound.payload.data();
        auto size = outbound.payload.size();

        REQUIRE(channels2.recv_authenticated(
          outbound.from, {hdr.data(), hdr.size()}, data, size));
      }
      else
      {
        // No other message type is expected from node 1
        REQUIRE(false);
      }
    }

    if (!handshake_complete)
    {
      // Deliver any responses from 2 to 1, to complete handshake. Once node 2
      // has nothing left to say, the handshake is considered done
      const auto from_node2 = read_outbound_msgs<MsgType>(eio2);
      handshake_complete = from_node2.empty();

      for (const auto& reply : from_node2)
      {
        if (reply.type == NodeMsgType::channel_msg)
        {
          channels1.recv_channel_message(reply.from, reply.data());
        }
        else
        {
          // Node 2 never sends application messages in this test
          REQUIRE(false);
        }
      }
    }

    {
      INFO("Sends preserve channels");
      REQUIRE(channels1.have_channel(nid2));
    }

    {
      INFO("Receives preserve channels");
      REQUIRE(channels2.have_channel(nid1));
    }

    // Age both ends, but by less than the timeout
    channels1.tick(short_tick);
    channels2.tick(short_tick);
  }

  REQUIRE(handshake_complete);

  {
    INFO("After comms, channels may still close due to idleness");

    // The loop ended with one short tick; a second one reaches the timeout
    channels1.tick(short_tick);
    REQUIRE_FALSE(channels1.have_channel(nid2));

    channels2.tick(short_tick);
    REQUIRE_FALSE(channels2.have_channel(nid1));
  }
}

0 comments on commit f0f8915

Please sign in to comment.