diff --git a/src/ds/non_blocking.h b/src/ds/non_blocking.h index 99026e501104..fa2e0a65bb0a 100644 --- a/src/ds/non_blocking.h +++ b/src/ds/non_blocking.h @@ -102,10 +102,12 @@ namespace ringbuffer { for (auto& it : pending) { - const auto buffer_end = it.buffer.data() + it.buffer.size(); - if ( - it.marker == marker.value() && - marker.value() != reinterpret_cast(buffer_end)) + if (it.finished) + { + continue; + } + + if (it.marker == marker.value()) { // This is a pending write - dump data directly to write marker, // which should be within the appropriate buffer @@ -118,6 +120,7 @@ namespace ringbuffer (size_t)it.buffer.data())); } + const auto buffer_end = it.buffer.data() + it.buffer.size(); if (dest + size > buffer_end) { throw std::runtime_error(fmt::format( @@ -128,7 +131,11 @@ namespace ringbuffer (size_t)buffer_end)); } - std::memcpy(dest, bytes, size); + if (size != 0) + { + std::memcpy(dest, bytes, size); + } + dest += size; it.marker = (size_t)dest; return {it.marker}; diff --git a/src/node/channels.h b/src/node/channels.h index 909eda4efc91..352712be6703 100644 --- a/src/node/channels.h +++ b/src/node/channels.h @@ -145,9 +145,6 @@ namespace ccf } } - class KeyExchangeProtocol - {}; - // Key exchange states are: // - Have nothing // - Initiated (have my own share) @@ -251,6 +248,8 @@ namespace ccf send_key = nullptr; recv_key = nullptr; + reset_key_exchange(); + initiate(); } } @@ -542,6 +541,8 @@ namespace ccf // shares back to the initiator send_key_exchange_response(); + flush_pending_outgoing(); + return true; } @@ -633,6 +634,8 @@ namespace ccf send_key_exchange_final(); + flush_pending_outgoing(); + update_recv_key(); establish(); @@ -804,7 +807,10 @@ namespace ccf "Node certificate serial numbers: node={} peer={}", node_cv->serial_number(), peer_cv->serial_number()); + } + void flush_pending_outgoing() + { if (outgoing_consensus_msg.has_value()) { send_unsafe( diff --git a/src/node/test/channels.cpp b/src/node/test/channels.cpp index 
506c17a7b271..20885ca31e6a 100644 --- a/src/node/test/channels.cpp +++ b/src/node/test/channels.cpp @@ -30,7 +30,14 @@ namespace ccf::enclavetime namespace ccf { - std::chrono::microseconds Channel::min_gap_between_initiation_attempts(0); + std::chrono::microseconds Channel::min_gap_between_initiation_attempts(5'000); +} + +void sleep_to_reinitiate() +{ + ccf::enclavetime::last_value.store( + ccf::enclavetime::last_value.load() + + 2 * ccf::Channel::min_gap_between_initiation_attempts); } std::unique_ptr @@ -267,6 +274,7 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Client/Server key exchange") // Queue 2 messages on channel1 REQUIRE(channels1.send_authenticated( nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size())); + sleep_to_reinitiate(); REQUIRE(channels1.send_authenticated( nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size())); } @@ -1069,6 +1077,7 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Stuttering handshake") nid2, NodeMsgType::forwarded_msg, {aad.begin(), aad.size()}, msg_body)); INFO("Send a second request, triggering a second handshake"); + sleep_to_reinitiate(); REQUIRE(channels1.send_encrypted( nid2, NodeMsgType::forwarded_msg, {aad.begin(), aad.size()}, msg_body)); @@ -1208,8 +1217,10 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Robust key exchange") channels1.send_encrypted( nid2, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); channels1.send_encrypted( nid3, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); channels1.send_encrypted( nid2, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); @@ -1225,8 +1236,10 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Robust key exchange") channels1.close_channel(nid3); channels1.send_encrypted( nid2, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); channels1.send_encrypted( nid3, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); 
channels1.send_encrypted( nid2, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); @@ -1242,8 +1255,10 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Robust key exchange") channels2.send_encrypted( nid1, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); channels2.send_encrypted( nid1, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); + sleep_to_reinitiate(); channels2.send_encrypted( nid3, NodeMsgType::consensus_msg, {aad.data(), aad.size()}, payload); @@ -1403,58 +1418,53 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Robust key exchange") } // Run separate threads simulating each node, sending many messages in both -// direction. Goal is that the message stream is uninterrupted, despite multiple -// key rotation exchanges happening during the sequence +// directions. Goal is that the message stream is largely uninterrupted, despite +// multiple key rotation exchanges happening during the sequence TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation") { auto network_kp = ccf::crypto::make_key_pair(default_curve); auto service_cert = generate_self_signed_cert(network_kp, "CN=Network"); - MsgType aad; - aad.fill(0x42); - - struct QueueWithLock - { - std::mutex lock; - std::vector>> to_send; - }; + using SendQueue = std::queue>; - using ReceivedMessages = std::vector>; + using ReceivedMessages = std::vector>>; static constexpr auto message_limit = 40; + static constexpr auto messages_each = 5 * message_limit; + + std::atomic finished_reading = 0; + std::atomic workers_stop = false; - std::atomic finished = false; - auto run_channel = [&]( - ccf::NodeId my_node_id, - ringbuffer::Circuit& source_buffer, - ringbuffer::AbstractWriterFactory& writer_factory, - QueueWithLock& send_queue, - ReceivedMessages& received_results) { - auto kp = ccf::crypto::make_key_pair(default_curve); - auto cert = generate_endorsed_cert( - kp, fmt::format("CN={}", my_node_id), network_kp, service_cert); - - auto channels = 
NodeToNodeChannelManager(writer_factory); - channels.initialize(my_node_id, service_cert, kp, cert); - channels.set_message_limit(message_limit); - - while (!finished) + struct TmpChannel + { + ccf::NodeId my_node_id; + ccf::NodeId peer_node_id; + ringbuffer::Circuit& source_buffer; + ringbuffer::NonBlockingWriterFactory& nbwf; + NodeToNodeChannelManager& channels; + SendQueue& send_queue; + + ReceivedMessages received_results; + + TmpChannel( + ccf::NodeId my_node_id_, + ccf::NodeId peer_node_id_, + ringbuffer::Circuit& source_buffer_, + ringbuffer::NonBlockingWriterFactory& nbwf_, + NodeToNodeChannelManager& channels_, + SendQueue& send_queue_) : + my_node_id(my_node_id_), + peer_node_id(peer_node_id_), + source_buffer(source_buffer_), + nbwf(nbwf_), + channels(channels_), + send_queue(send_queue_) + {} + + void process(std::atomic& signal_when_done, bool wrap_it_up = false) { - { - // Send any new messages added to your work queue - std::lock_guard guard(send_queue.lock); - for (auto& queued_msg : send_queue.to_send) - { - auto peer_id = queued_msg.first; - auto& msg_body = queued_msg.second; - REQUIRE(channels.send_encrypted( - peer_id, - NodeMsgType::forwarded_msg, - {aad.begin(), aad.size()}, - msg_body)); - } - send_queue.to_send.clear(); - } + MsgType aad; + aad.fill(0x42); // Read and process all messages from peer auto msgs = read_outbound_msgs(source_buffer); @@ -1469,14 +1479,31 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation") break; } + case consensus_msg: + { + break; + } + case forwarded_msg: { - auto decrypted = channels.recv_encrypted( - msg.from, - {msg.authenticated_hdr.data(), msg.authenticated_hdr.size()}, - msg.payload.data(), - msg.payload.size()); - received_results.emplace_back(decrypted); + try + { + auto decrypted = channels.recv_encrypted( + msg.from, + {msg.authenticated_hdr.data(), msg.authenticated_hdr.size()}, + msg.payload.data(), + msg.payload.size()); + received_results.emplace_back(decrypted); + } + catch (const 
ccf::NodeToNode::DroppedMessageException& e) + { + received_results.emplace_back(std::nullopt); + } + + if (received_results.size() == messages_each) + { + ++signal_when_done; + } break; } @@ -1487,109 +1514,180 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation") } } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // Send some messages from start of your work queue + while (!send_queue.empty()) + { + // Sometimes randomly give up on sending any more + if (!wrap_it_up && rand() % 3 == 0) + { + break; + } + + if (channels.send_encrypted( + peer_node_id, + NodeMsgType::forwarded_msg, + {aad.begin(), aad.size()}, + send_queue.front())) + { + send_queue.pop(); + } + else + { + break; + } + } + + if (wrap_it_up || rand() % 5 == 0) + { + // Occasionally send a dummy consensus msg to flush the pipes. + // Forwarded messages may be queued until something else comes along + // to push them, which in a real system is periodic consensus traffic + std::vector dummy_consensus_msg; + dummy_consensus_msg.push_back(0x12); + channels.send_authenticated( + peer_node_id, + ccf::NodeMsgType::consensus_msg, + dummy_consensus_msg.data(), + dummy_consensus_msg.size()); + + if (!channels.channel_open(peer_node_id)) + { + sleep_to_reinitiate(); + } + } + + LOG_INFO_FMT( + "{} (sent {}, received {}, goal is {})", + my_node_id, + messages_each - send_queue.size(), + received_results.size(), + messages_each); + + nbwf.flush_all_outbound(); } }; - QueueWithLock sent_by_1; + auto run_channel = [&](TmpChannel& tc) { + do + { + tc.process(finished_reading); + + std::this_thread::yield(); + } while (!workers_stop.load()); + }; + + SendQueue to_send_from_1; ringbuffer::NonBlockingWriterFactory nbwf1(wf1); - ReceivedMessages received_by_1; ReceivedMessages expected_received_by_1; - std::thread channels1( - run_channel, - std::ref(nid1), - std::ref(eio2), - std::ref(nbwf1), - std::ref(sent_by_1), - std::ref(received_by_1)); - - QueueWithLock sent_by_2; + + SendQueue 
to_send_from_2; ringbuffer::NonBlockingWriterFactory nbwf2(wf2); - ReceivedMessages received_by_2; ReceivedMessages expected_received_by_2; - std::thread channels2( - run_channel, - std::ref(nid2), - std::ref(eio1), - std::ref(nbwf2), - std::ref(sent_by_2), - std::ref(received_by_2)); // Submit a randomly generated workload - for (auto i = 0; i < 5 * message_limit; ++i) + for (auto i = 0; i < 2 * messages_each; ++i) { - ccf::NodeId peer_nid; - QueueWithLock* send_queue; - ReceivedMessages* expected_results; - if (rand() % 2 == 0) + std::vector msg_body(rand() % 20); + for (auto& n : msg_body) { - peer_nid = nid2; - send_queue = &sent_by_1; - expected_results = &expected_received_by_2; + n = rand(); } - else + + if (i < messages_each) { - peer_nid = nid1; - send_queue = &sent_by_2; - expected_results = &expected_received_by_1; + to_send_from_1.emplace(msg_body); + expected_received_by_2.emplace_back(msg_body); } - + else { - std::lock_guard guard(send_queue->lock); - std::vector msg_body(rand() % 20); - for (auto& n : msg_body) - { - n = rand(); - } - send_queue->to_send.emplace_back(std::make_pair(peer_nid, msg_body)); - expected_results->emplace_back(msg_body); + to_send_from_2.emplace(msg_body); + expected_received_by_1.emplace_back(msg_body); } - - // If we do not sleep here, we submit many messages at once, resulting in a - // node's pending send queue filling up and a failure result when it calls - // `send_encrypted`. A real application should handle this error code, and - // set parameters so it is extremely rare. Here we simply sleep so that the - // sends happen one-at-a-time. - std::this_thread::sleep_for(std::chrono::milliseconds(20)); } - // Wait for channel threads to fully catch up. 
- constexpr int wait_finish_attempts = 100; - for (int attempt = 0; attempt < wait_finish_attempts; attempt++) + auto kp1 = ccf::crypto::make_key_pair(default_curve); + NodeToNodeChannelManager channels1(nbwf1); + channels1.initialize( + nid1, + service_cert, + kp1, + generate_endorsed_cert( + kp1, fmt::format("CN={}", nid1), network_kp, service_cert)); + channels1.set_message_limit(message_limit); + TmpChannel tc1(nid1, nid2, eio2, nbwf1, channels1, to_send_from_1); + + auto kp2 = ccf::crypto::make_key_pair(default_curve); + NodeToNodeChannelManager channels2(nbwf2); + channels2.initialize( + nid2, + service_cert, + kp2, + generate_endorsed_cert( + kp2, fmt::format("CN={}", nid2), network_kp, service_cert)); + TmpChannel tc2(nid2, nid1, eio1, nbwf2, channels2, to_send_from_2); + + std::thread thread1(run_channel, std::ref(tc1)); + std::thread thread2(run_channel, std::ref(tc2)); + + // Run in parallel threads for a while + std::chrono::milliseconds elapsed(0); + const std::chrono::milliseconds timeout(500); + + while (finished_reading.load() < 2 && elapsed < timeout) { - bool skip_waiting{true}; - { - std::lock_guard guard(sent_by_1.lock); - skip_waiting &= sent_by_1.to_send.empty(); - } - { - std::lock_guard guard(sent_by_2.lock); - skip_waiting &= sent_by_2.to_send.empty(); - } - if (skip_waiting) - { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + constexpr auto sleep_time = std::chrono::milliseconds(10); + std::this_thread::sleep_for(sleep_time); + elapsed += sleep_time; } - finished.store(true); + LOG_INFO_FMT("Exited main loop"); - channels1.join(); - channels2.join(); + workers_stop.store(true); + thread1.join(); + thread2.join(); + + // Run a few more iterations manually interleaved, simulating a synchronous + // period, to reach quiescence + static constexpr size_t worst_case = 2 * messages_each / 10; + for (auto i = 0; i < worst_case; ++i) { - std::lock_guard guard(sent_by_1.lock); - REQUIRE(sent_by_1.to_send.empty()); - } - 
{ - std::lock_guard guard(sent_by_2.lock); - REQUIRE(sent_by_2.to_send.empty()); + LOG_INFO_FMT("Catchup loop #{}/{}", i, worst_case); + tc1.process(finished_reading, true); + tc2.process(finished_reading, true); + nbwf1.flush_all_outbound(); + nbwf2.flush_all_outbound(); + + if ( + to_send_from_1.empty() && to_send_from_2.empty() && + finished_reading.load() == 2) + { + LOG_INFO_FMT("Early out after {}/{} iterations\n", i, worst_case); + break; + } } + REQUIRE(to_send_from_1.empty()); + REQUIRE(to_send_from_2.empty()); + // Validate results - REQUIRE(received_by_1 == expected_received_by_1); - REQUIRE(received_by_2 == expected_received_by_2); + auto equal_modulo_holes = + [](const ReceivedMessages& actual, const ReceivedMessages& expected) { + REQUIRE(actual.size() == expected.size()); + REQUIRE(actual.size() == messages_each); + size_t i = 0; + for (const auto& msg_opt : actual) + { + if (msg_opt.has_value()) + { + REQUIRE(msg_opt == expected[i]); + } + ++i; + } + }; + + equal_modulo_holes(tc1.received_results, expected_received_by_1); + equal_modulo_holes(tc2.received_results, expected_received_by_2); } TEST_CASE_FIXTURE(IORingbuffersFixture, "Timeout idle channels")