Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ASIO examples #29

Open
ashvardanian opened this issue Jan 25, 2025 · 0 comments
Open

Fix ASIO examples #29

ashvardanian opened this issue Jan 25, 2025 · 0 comments
Labels
bug Something isn't working good first issue Good for newcomers help wanted Extra attention is needed

Comments

@ashvardanian
Copy link
Owner

I'm not a massive fan of ASIO, Boost.ASIO, or the Networking TS that builds on top of them — nor am I an experienced user of them. My current implementation on the asio-uring-web-server branch doesn't work. I'd love to see someone suggest a better way to use it 🤗

The current draft looks like this:

#include <asio.hpp>

class rpc_asio_server {

    asio::io_context &context_;
    asio::ip::udp::socket socket_;

    /// @brief Buffers, one per concurrent request
    std::vector<rpc_buffer_t> buffers_;
    /// @brief Where did the packets come from
    std::vector<asio::ip::udp::endpoint> clients_;
    /// @brief Flag to stop the server without corrupting the state
    std::atomic_bool should_stop_;
    /// @brief Maximum time for this entire batch
    std::chrono::microseconds max_cycle_duration_;

    std::size_t failed_receptions_ = 0;
    std::size_t failed_responses_ = 0;

  public:
    rpc_asio_server(                                                           //
        asio::io_context &ctx, std::string const &address, std::uint16_t port, //
        std::size_t max_concurrency, std::chrono::microseconds max_cycle_duration)
        : context_(ctx), socket_(context_, asio::ip::udp::endpoint(asio::ip::make_address(address), port)),
          buffers_(max_concurrency), clients_(max_concurrency), max_cycle_duration_(max_cycle_duration) {}

    void stop() { should_stop_.store(true, std::memory_order_relaxed); }

    void operator()() {
        while (!should_stop_.load(std::memory_order_relaxed)) one_batch();
    }

    void one_batch() {
        // For per-operation cancellations we could use the `asio::cancellation_signal`,
        // but this is the simple lucky case when we only want to cancel all the outstanding
        // transfers at once.
        std::atomic<std::size_t> remaining = 0;
        for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
            auto finalize = [this, &remaining](std::error_code error, std::size_t) {
                remaining--;
                if (error) failed_responses_++;
            };
            auto respond = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
                if (error) { remaining--; }
                else { socket_.async_send_to(asio::buffer(buffers_[job], bytes), clients_[job], finalize); }
            };
            socket_.async_receive_from(asio::buffer(buffers_[job]), clients_[job], respond);
        }
        std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now() + max_cycle_duration_;
        asio::steady_timer timer(context_, expiry);
        timer.wait();
        if (remaining) socket_.cancel(); // Forcibly abort all ops on this socket
    }
};

class rpc_asio_client {

    asio::io_context &context_;
    asio::ip::udp::socket socket_;
    asio::ip::udp::endpoint server_;

    /// @brief Buffers, one per concurrent request
    std::vector<rpc_buffer_t> buffers_;
    /// @brief Track the send timestamps for each slot to measure latency
    std::vector<std::chrono::steady_clock::time_point> send_times_;
    /// @brief Maximum time for this entire batch
    std::chrono::microseconds max_cycle_duration_;

  public:
    /// @brief Opens an ephemeral IPv4 UDP socket, resolves @p server_addr,
    ///        and fills every request buffer with a fixed payload pattern.
    rpc_asio_client(                                                               //
        asio::io_context &ctx, std::string const &server_addr, std::uint16_t port, //
        std::size_t concurrency, std::chrono::microseconds max_cycle_duration)
        : context_(ctx), socket_(ctx, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0)), buffers_(concurrency),
          send_times_(concurrency), max_cycle_duration_(max_cycle_duration) {

        // Resolve the server address
        asio::ip::udp::resolver resolver(context_);
        asio::ip::udp::resolver::results_type endpoints = resolver.resolve(server_addr, std::to_string(port));
        server_ = *endpoints.begin(); // Take the first resolved endpoint

        // Fill each buffer with some pattern (just 'X's, for example)
        for (auto &buf : buffers_) buf.fill('X');
    }

    /// @brief Executes one send/receive batch and returns its statistics.
    rpc_batch_result operator()() { return one_batch(); }

  private:
    /// @brief Sends one packet per buffer, awaits the echoes, and measures
    ///        per-packet and aggregate latency until the cycle deadline.
    rpc_batch_result one_batch() {
        rpc_batch_result result;

        // For per-operation cancellations we could use the `asio::cancellation_signal`,
        // but this is the simple lucky case when we only want to cancel all the outstanding
        // transfers at once.
        std::atomic<std::size_t> remaining = 0;
        for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
            send_times_[job] = std::chrono::steady_clock::now();
            auto finalize = [this, job, &result, &remaining](std::error_code error, std::size_t) {
                remaining--;
                if (error) return;

                // Measure latency
                auto response_time = std::chrono::steady_clock::now();
                auto diff = response_time - send_times_[job];
                result.batch_latency += diff;
                result.max_packet_latency = std::max(result.max_packet_latency, diff);
                result.received_packets++;
            };
            auto receive = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
                if (error) { remaining--; }
                // NOTE(review): `server_` doubles as the sender-endpoint *output*
                // argument here, so each reply overwrites it — harmless with a single
                // server, but worth a per-slot endpoint if that ever changes.
                else { socket_.async_receive_from(asio::buffer(buffers_[job], bytes), server_, finalize); }
            };
            socket_.async_send_to(asio::buffer(buffers_[job]), server_, receive);
            result.sent_packets++;
        }

        // Bug fix: the previous draft blocked on `steady_timer::wait()`, which never
        // dispatches completion handlers — nothing was running the `io_context`, so
        // no packet could ever be reported as received. Pump the context ourselves,
        // bounded by the cycle deadline; `restart()` clears any prior "stopped" state.
        context_.restart();
        context_.run_for(max_cycle_duration_);

        // Handlers capture the stack-locals `remaining` and `result` by reference,
        // so every outstanding operation must finish before this frame returns.
        if (remaining) {
            socket_.cancel(); // Forcibly abort all ops on this socket
            context_.restart();
            context_.run(); // Aborted ops complete immediately with `operation_aborted`
        }
        return result;
    }
};
@ashvardanian ashvardanian added bug Something isn't working good first issue Good for newcomers help wanted Extra attention is needed labels Jan 25, 2025
@ashvardanian ashvardanian linked a pull request Jan 25, 2025 that will close this issue
3 tasks
@ashvardanian ashvardanian removed a link to a pull request Jan 27, 2025
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working good first issue Good for newcomers help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

1 participant