diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service.hpp | 66 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/transport/offload_io_service.cpp | 998 | 
3 files changed, 1065 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/offload_io_service.hpp b/host/lib/include/uhdlib/transport/offload_io_service.hpp new file mode 100644 index 000000000..a7d9d211d --- /dev/null +++ b/host/lib/include/uhdlib/transport/offload_io_service.hpp @@ -0,0 +1,66 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP +#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP + +#include <uhdlib/transport/io_service.hpp> + +namespace uhd { namespace transport { + +/*! + * I/O service with offload thread + * + * Note: This I/O service can only be used with transports that allow releasing + * frame buffers out of order, since flow control packets are handled entirely + * within the offload thread. + */ +class offload_io_service : public io_service +{ +public: +    enum client_type_t +    { +        RECV_ONLY, +        SEND_ONLY, +        BOTH_SEND_AND_RECV +    }; + +    enum wait_mode_t +    { +        POLL, +        BLOCK +    }; + +    /*! +     * Options for configuring offload I/O service +     */ +    struct params_t +    { +        //! Array of CPU numbers to which to affinitize the offload thread. +        std::vector<size_t> cpu_affinity_list; +        //! The types of client that the I/O service needs to support. +        client_type_t client_type = BOTH_SEND_AND_RECV; +        //! The thread behavior when waiting for incoming packets If set to +        //! BLOCK, the client type must be set to either RECV_ONLY or SEND_ONLY. +        wait_mode_t wait_mode = POLL; +    }; + +    /*! +     * Creates an io service that offloads I/O to a worker thread and +     * passes configuration parameters to it. +     * +     *  \param io_srv The io service to perform the actual work in the worker +     *                thread. +     *  \param params Parameters to pass to the offload I/O service. +     *  \return A composite I/O service that executes the provided io service +     *          in its own thread. +     */ +    static sptr make(io_service::sptr io_srv, const params_t& params); +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index d21644f01..d39ca7336 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -123,6 +123,7 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/offload_io_service.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/adapter.cpp  ) diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp new file mode 100644 index 000000000..ed28a93f9 --- /dev/null +++ b/host/lib/transport/offload_io_service.cpp @@ -0,0 +1,998 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/offload_io_service.hpp> +#include <condition_variable> +#include <unordered_map> +#include <boost/lockfree/queue.hpp> +#include <atomic> +#include <chrono> +#include <functional> +#include <list> +#include <memory> +#include <thread> + +namespace uhd { namespace transport { + +namespace { + +constexpr int32_t blocking_timeout_ms = 100; + +// Struct to help keep track of frames reserved for each link +struct frame_reservation_t +{ +    recv_link_if::sptr recv_link; +    size_t num_recv_frames = 0; +    send_link_if::sptr send_link; +    size_t num_send_frames = 0; +}; + +// Helper class to keep track of frames reserved for each link +class frame_reservation_mgr +{ +public: +    void register_link(const recv_link_if::sptr& recv_link) +    { +        if (_recv_tbl[recv_link.get()] != 0) { +            throw uhd::runtime_error("Recv link already attached to I/O service"); +        } +        _recv_tbl[recv_link.get()] = 0; +    } + +    void register_link(const send_link_if::sptr& send_link) +    { +        if (_send_tbl[send_link.get()] != 0) { +            throw uhd::runtime_error("Send link already attached to I/O service"); +        } +        _send_tbl[send_link.get()] = 0; +    } + +    void reserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            const size_t capacity    = reservation.recv_link->get_num_recv_frames(); +            if (rsvd_frames + reservation.num_recv_frames > capacity) { +                throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity"); +            } + +            recv_link_if* link_ptr = reservation.recv_link.get(); +            _recv_tbl[link_ptr]    = rsvd_frames + reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            const size_t capacity    = reservation.send_link->get_num_send_frames(); +            if (rsvd_frames + reservation.num_send_frames > capacity) { +                throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity"); +            } + +            send_link_if* link_ptr = reservation.send_link.get(); +            _send_tbl[link_ptr]    = rsvd_frames + reservation.num_send_frames; +        } +    } + +    void unreserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            recv_link_if* link_ptr   = reservation.recv_link.get(); +            _recv_tbl[link_ptr]      = rsvd_frames - reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            send_link_if* link_ptr   = reservation.send_link.get(); +            _send_tbl[link_ptr]      = rsvd_frames - reservation.num_send_frames; +        } +    } + +private: +    std::unordered_map<recv_link_if*, size_t> _recv_tbl; +    std::unordered_map<send_link_if*, size_t> _send_tbl; +}; + + +// Semaphore used in blocking I/O for offload thread +class semaphore +{ +public: +    void notify() { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _count++; +        _cv.notify_one(); +    } + +    void wait() { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _cv.wait(lock, [this]() { return this->_count != 0; }); +        _count--; +    } + +    bool try_wait() { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_count != 0) { +            _count--; +            return true; +        } +        return false; +    } + +    bool wait_for(size_t timeout_ms) { +        std::chrono::milliseconds timeout(timeout_ms); +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { +            _count--; +            return true; +        } +        return false; +    } + +    size_t count() { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        return _count; +    } + +private: +    std::condition_variable _cv; +    std::mutex _cv_mutex; +    size_t _count = 0; +}; + +// Fixed-size queue that supports blocking semantics +template <typename queue_item_t> +class offload_thread_queue { +public: +    offload_thread_queue(size_t size) +        : _buffer(new queue_item_t[size]) +        , _capacity(size) +    { +    } + +    ~offload_thread_queue() +    { +        delete [] _buffer; +    } + +    void push(const queue_item_t& item) +    { +        _buffer[_write_index++] = item; +        _write_index %= _capacity; +        _item_sem.notify(); +    } + +    bool pop(queue_item_t& item) +    { +        if (_item_sem.try_wait()) { +            item = _buffer[_read_index++]; +            _read_index %= _capacity; +            return true; +        } else { +            return false; +        } +    } + +    bool pop(queue_item_t& item, int32_t timeout_ms) +    { +        if (_item_sem.wait_for(timeout_ms)) { +            item = _buffer[_read_index++]; +            _read_index %= _capacity; +            return true; +        } else { +            return false; +        } +    } + +    size_t read_available() +    { +        return _item_sem.count(); +    } + +private: +    queue_item_t* _buffer; +    const size_t _capacity; + +    size_t _read_index  = 0; +    size_t _write_index = 0; + +    // Semaphore gating number of items available to read +    semaphore _item_sem; +}; + +// Object that implements the communication between client and offload thread +struct client_port_t +{ +public: +    using sptr = std::shared_ptr<client_port_t>; + +    client_port_t(size_t size) +        : _from_offload_thread(size) +        , _to_offload_thread(size + 1) // add one for disconnect command +    { +    } + +    // +    // Client methods +    // +    frame_buff* client_pop() +    { +        from_offload_thread_t queue_element; +        _from_offload_thread.pop(queue_element); +        return queue_element.buff; +    } + +    size_t client_read_available() +    { +        return _from_offload_thread.read_available(); +    } + +    void client_push(frame_buff* buff) +    { +        to_offload_thread_t queue_element{buff, false}; +        _to_offload_thread.push(queue_element); +    } + +    void client_wait_until_connected() +    { +        std::unique_lock<std::mutex> lock(_connect_cv_mutex); +        _connect_cv.wait(lock, [this]() { return _connected; }); +    } + +    void client_disconnect() +    { +        to_offload_thread_t queue_element{nullptr, true}; +        _to_offload_thread.push(queue_element); + +        // Need to wait for the disconnect to occur before returning, since the +        // caller (the xport object) has callbacks installed in the inline I/O +        // service. After this method returns, the caller can be deallocated. +        std::unique_lock<std::mutex> lock(_connect_cv_mutex); +        _connect_cv.wait(lock, [this]() { return !_connected; }); +    } + +    // +    // Offload thread methods +    // +    void offload_thread_push(frame_buff* buff) +    { +        from_offload_thread_t queue_element{buff}; +        _from_offload_thread.push(queue_element); +    } + +    std::tuple<frame_buff*, bool> offload_thread_pop() +    { +        to_offload_thread_t queue_element; +        _to_offload_thread.pop(queue_element); +        return std::make_tuple(queue_element.buff, queue_element.disconnect); +    } + +    std::tuple<frame_buff*, bool> offload_thread_pop(int32_t timeout_ms) +    { +        to_offload_thread_t queue_element; +        _to_offload_thread.pop(queue_element, timeout_ms); +        return std::make_tuple(queue_element.buff, queue_element.disconnect); +    } + +    void offload_thread_set_connected(const bool value) +    { +        { +            std::lock_guard<std::mutex> lock(_connect_cv_mutex); +            _connected = value; +        } +        _connect_cv.notify_one(); +    } + +    // Flush should only be called once the client is no longer accessing the +    // queue going from the offload thread to the client, since it drains that +    // queue from the offload thread. +    template <typename fn_t> +    size_t offload_thread_flush(fn_t f) +    { +        size_t count = 0; +        from_offload_thread_t queue_element; +        while (_from_offload_thread.pop(queue_element)) { +            f(queue_element.buff); +            count++; +        } +        return count; +    } + +private: +    // Queue for frame buffers coming from the offload thread +    struct from_offload_thread_t +    { +        frame_buff* buff = nullptr; +    }; + +    using from_offload_thread_queue_t = offload_thread_queue<from_offload_thread_t>; + +    // Queue for frame buffers and disconnect requests to offload thread. Disconnect +    // requests must be inline with incoming buffers to avoid any race conditions +    // between the two. +    struct to_offload_thread_t +    { +        frame_buff* buff = nullptr; +        bool disconnect  = false; +    }; + +    using to_offload_thread_queue_t = offload_thread_queue<to_offload_thread_t>; + +    // Queues to carry frame buffers in both directions +    from_offload_thread_queue_t _from_offload_thread; +    to_offload_thread_queue_t _to_offload_thread; + +    // Mutex and condition variable to wait for connect and disconnect +    std::condition_variable _connect_cv; +    std::mutex _connect_cv_mutex; +    bool _connected = false; +}; + +} // namespace + +// Implementation of io service that executes an inline io service in an offload +// thread. The offload thread communicates with send and recv clients using a +// pair of spsc queues. One queue carries buffers from the offload thread to the +// client, and the other carries buffers in the opposite direction. +// +// Requests to create new clients are handled using a separate mpsc queue. Client +// requests to disconnect are sent in the same spsc queue as the buffers so that +// they are processed only after all buffer release requestss have been processed. +class offload_io_service_impl +    : public offload_io_service, +      public std::enable_shared_from_this<offload_io_service_impl> +{ +public: +    using sptr = std::shared_ptr<offload_io_service_impl>; + +    offload_io_service_impl( +        io_service::sptr io_srv, const offload_io_service::params_t& params); +    ~offload_io_service_impl(); + +    void attach_recv_link(recv_link_if::sptr link); +    void attach_send_link(send_link_if::sptr link); + +    recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link, +        size_t num_recv_frames, +        recv_callback_t cb, +        send_link_if::sptr fc_link, +        size_t num_send_frames, +        recv_io_if::fc_callback_t fc_cb); + +    send_io_if::sptr make_send_client(send_link_if::sptr send_link, +        size_t num_send_frames, +        send_io_if::send_callback_t send_cb, +        recv_link_if::sptr recv_link, +        size_t num_recv_frames, +        recv_callback_t recv_cb); + +private: +    offload_io_service_impl(const offload_io_service_impl&) = delete; + +    // Queue for new client creation, multiple producers allowed. Requests are +    // passed as heap-allocated pointers because boost lockfree queues require +    // simple types. +    struct client_req_t +    { +        std::function<void()>* req = nullptr; +    }; +    using client_req_queue_t = boost::lockfree::queue<client_req_t>; + +    // Values used by offload thread for each client +    struct recv_client_info_t +    { +        client_port_t::sptr port; +        recv_io_if::sptr inline_io; +        size_t num_frames_in_use = 0; +        frame_reservation_t frames_reserved; +    }; +    struct send_client_info_t +    { +        client_port_t::sptr port; +        send_io_if::sptr inline_io; +        size_t num_frames_in_use = 0; +        frame_reservation_t frames_reserved; +    }; + +    void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms); +    void _get_send_buff(send_client_info_t& info); +    void _release_recv_buff(recv_client_info_t& info, frame_buff* buff); +    void _release_send_buff(send_client_info_t& info, frame_buff* buff); +    void _disconnect_recv_client(recv_client_info_t& info); +    void _disconnect_send_client(send_client_info_t& info); + +    template <bool allow_recv, bool allow_send> +    void _do_work_polling(); + +    template <bool allow_recv, bool allow_send> +    void _do_work_blocking(); + +    // The I/O service that executes within the offload thread +    io_service::sptr _io_srv; + +    // Type of clients supported by this I/O service +    client_type_t _client_type; + +    // Offload thread, its stop flag, and thread-related parameters +    std::unique_ptr<std::thread> _offload_thread; +    std::atomic<bool> _stop_offload_thread{false}; +    offload_io_service::params_t _offload_thread_params; + +    // Lists of clients and their respective queues +    std::list<recv_client_info_t> _recv_clients; +    std::list<send_client_info_t> _send_clients; + +    // Queue for connect and disconnect client requests +    client_req_queue_t _client_connect_queue; + +    // Keep track of frame reservations +    frame_reservation_mgr _reservation_mgr; +}; + +class offload_recv_io : public recv_io_if +{ +public: +    offload_recv_io(offload_io_service_impl::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        client_port_t::sptr& port); + +    ~offload_recv_io(); + +    frame_buff::uptr get_recv_buff(int32_t timeout_ms); +    void release_recv_buff(frame_buff::uptr buff); + +private: +    offload_recv_io()                       = delete; +    offload_recv_io(const offload_recv_io&) = delete; + +    offload_io_service_impl::sptr _io_srv; +    client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +class offload_send_io : public send_io_if +{ +public: +    offload_send_io(offload_io_service_impl::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        client_port_t::sptr& port); + +    ~offload_send_io(); + +    frame_buff::uptr get_send_buff(int32_t timeout_ms); +    void release_send_buff(frame_buff::uptr buff); + +private: +    offload_send_io()                       = delete; +    offload_send_io(const offload_send_io&) = delete; + +    offload_io_service_impl::sptr _io_srv; +    client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +// Implementation of get_send_buff used below by send and recv clients +template <typename pop_func_t> +static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) +{ +    using namespace std::chrono; + +    if (timeout_ms == 0) { +        return frame_buff::uptr(pop()); +    } + +    const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + +    bool last_check = false; + +    while (true) { +        if (frame_buff* buff = pop()) { +            return frame_buff::uptr(buff); +        } + +        if (timeout_ms > 0 && steady_clock::now() > end_time) { +            if (last_check) { +                return nullptr; +            } else { +                last_check = true; +            } +        } +        std::this_thread::yield(); +    } +} + +// +// offload_recv_io methods +// +offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv, +    size_t num_recv_frames, +    size_t num_send_frames, +    client_port_t::sptr& port) +    : _io_srv(io_srv), _port(port) +{ +    _num_recv_frames = num_recv_frames; +    _num_send_frames = num_send_frames; +} + +offload_recv_io::~offload_recv_io() +{ +    assert(_num_frames_in_use == 0); + +    if (_io_srv) { +        _port->client_disconnect(); +    } +} + +frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms) +{ +    return client_get_buff( +        [this]() { +            frame_buff* buff = _port->client_pop(); +            _num_frames_in_use += buff ? 1 : 0; +            return buff; +        }, +        timeout_ms); +} + +void offload_recv_io::release_recv_buff(frame_buff::uptr buff) +{ +    assert(buff); +    _port->client_push(buff.release()); +    _num_frames_in_use--; +} + +// +// offload_send_io methods +// +offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv, +    size_t num_recv_frames, +    size_t num_send_frames, +    client_port_t::sptr& port) +    : _io_srv(io_srv), _port(port) +{ +    _num_recv_frames = num_recv_frames; +    _num_send_frames = num_send_frames; +} + +offload_send_io::~offload_send_io() +{ +    assert(_num_frames_in_use == 0); + +    if (_io_srv) { +        _port->client_disconnect(); +    } +} + +frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms) +{ +    return client_get_buff( +        [this]() { +            frame_buff* buff = _port->client_pop(); +            _num_frames_in_use += buff ? 1 : 0; +            return buff; +        }, +        timeout_ms); +} + +void offload_send_io::release_send_buff(frame_buff::uptr buff) +{ +    assert(buff); +    _port->client_push(buff.release()); +    _num_frames_in_use--; +} + +// +// offload_io_service methods +// +offload_io_service::sptr offload_io_service::make( +    io_service::sptr io_srv, const offload_io_service::params_t& params) +{ +    return std::make_shared<offload_io_service_impl>(io_srv, params); +} + +// +// offload_io_service_impl methods +// +offload_io_service_impl::offload_io_service_impl( +    io_service::sptr io_srv, const offload_io_service::params_t& params) +    : _io_srv(io_srv) +    , _offload_thread_params(params) +    , _client_connect_queue(10) // arbitrary initial size +{ +    if (params.wait_mode == BLOCK && params.client_type == BOTH_SEND_AND_RECV) { +        throw uhd::value_error( +            "An I/O service configured to block should only service either " +            "send or recv clients to prevent one client type from starving " +            "the other"); +    } + +    std::function<void()> thread_fn; + +    if (params.wait_mode == BLOCK) { +        if (params.client_type == RECV_ONLY) { +            thread_fn = [this]() { _do_work_blocking<true, false>(); }; +        } else if (params.client_type == SEND_ONLY) { +            thread_fn = [this]() { _do_work_blocking<false, true>(); }; +        } else { +            UHD_THROW_INVALID_CODE_PATH(); +        } +    } else if (params.wait_mode == POLL) { +        if (params.client_type == RECV_ONLY) { +            thread_fn = [this]() { _do_work_polling<true, false>(); }; +        } else if (params.client_type == SEND_ONLY) { +            thread_fn = [this]() { _do_work_polling<false, true>(); }; +        } else if (params.client_type == BOTH_SEND_AND_RECV) { +            thread_fn = [this]() { _do_work_polling<true, true>(); }; +        } else { +            UHD_THROW_INVALID_CODE_PATH(); +        } +    } else { +        UHD_THROW_INVALID_CODE_PATH(); +    } + +    _offload_thread = std::make_unique<std::thread>(thread_fn); +} + +offload_io_service_impl::~offload_io_service_impl() +{ +    _stop_offload_thread = true; + +    if (_offload_thread) { +        _offload_thread->join(); +    } + +    assert(_recv_clients.empty()); +    assert(_send_clients.empty()); +} + +void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link) +{ +    // Create a request to attach link in the offload thread +    auto req_fn = [this, link]() { +        _reservation_mgr.register_link(link); +        _io_srv->attach_recv_link(link); +    }; + +    client_req_t queue_element; +    queue_element.req  = {new std::function<void()>(req_fn)}; +    const bool success = _client_connect_queue.push(queue_element); +    if (!success) { +        throw uhd::runtime_error("Failed to push attach_recv_link request"); +    } +} + +void offload_io_service_impl::attach_send_link(send_link_if::sptr link) +{ +    // Create a request to attach link in the offload thread +    auto req_fn = [this, link]() { +        _reservation_mgr.register_link(link); +        _io_srv->attach_send_link(link); +    }; + +    client_req_t queue_element; +    queue_element.req  = {new std::function<void()>(req_fn)}; +    const bool success = _client_connect_queue.push(queue_element); +    if (!success) { +        throw uhd::runtime_error("Failed to push attach_send_link request"); +    } +} + +recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link, +    size_t num_recv_frames, +    recv_callback_t cb, +    send_link_if::sptr fc_link, +    size_t num_send_frames, +    recv_io_if::fc_callback_t fc_cb) +{ +    UHD_ASSERT_THROW(_offload_thread); + +    if (_client_type == SEND_ONLY) { +        throw uhd::runtime_error("Recv client not supported by this I/O service"); +    } + +    auto port = std::make_shared<client_port_t>(num_recv_frames); + +    // Create a request to create a new receiver in the offload thread +    auto req_fn = +        [this, recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb, port]() { +            frame_reservation_t frames = {recv_link, num_recv_frames, fc_link, num_send_frames}; +            _reservation_mgr.reserve_frames(frames); + +            auto inline_recv_io = _io_srv->make_recv_client( +                recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); + +            recv_client_info_t client_info; +            client_info.inline_io       = inline_recv_io; +            client_info.port            = port; +            client_info.frames_reserved = frames; + +            _recv_clients.push_back(client_info); + +            // Notify that the connection is created +            port->offload_thread_set_connected(true); +        }; + +    client_req_t queue_element; +    queue_element.req  = {new std::function<void()>(req_fn)}; +    const bool success = _client_connect_queue.push(queue_element); +    if (!success) { +        throw uhd::runtime_error("Failed to push make_recv_client request"); +    } + +    port->client_wait_until_connected(); + +    // Return a new recv client to the caller that just operates on the queues +    return std::make_shared<offload_recv_io>( +        shared_from_this(), num_recv_frames, num_send_frames, port); +} + +send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link, +    size_t num_send_frames, +    send_io_if::send_callback_t send_cb, +    recv_link_if::sptr recv_link, +    size_t num_recv_frames, +    recv_callback_t recv_cb) +{ +    UHD_ASSERT_THROW(_offload_thread); + +    if (_client_type == RECV_ONLY) { +        throw uhd::runtime_error("Send client not supported by this I/O service"); +    } + +    auto port = std::make_shared<client_port_t>(num_send_frames); + +    // Create a request to create a new receiver in the offload thread +    auto req_fn = [this, +                      send_link, +                      num_send_frames, +                      send_cb, +                      recv_link, +                      num_recv_frames, +                      recv_cb, +                      port]() { +        frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames}; +        _reservation_mgr.reserve_frames(frames); + +        auto inline_send_io = _io_srv->make_send_client( +            send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + +        send_client_info_t client_info; +        client_info.inline_io       = inline_send_io; +        client_info.port            = port; +        client_info.frames_reserved = frames; + +        _send_clients.push_back(client_info); + +        // Notify that the connection is created +        port->offload_thread_set_connected(true); +    }; + +    client_req_t queue_element; +    queue_element.req  = {new std::function<void()>(req_fn)}; +    const bool success = _client_connect_queue.push(queue_element); +    if (!success) { +        throw uhd::runtime_error("Failed to push make_send_client request"); +    } + +    port->client_wait_until_connected(); + +    // Wait for buffer queue to be full +    while (port->client_read_available() != num_send_frames) { +        std::this_thread::sleep_for(std::chrono::microseconds(100)); +    } + +    // Return a new recv client to the caller that just operates on the queues +    return std::make_shared<offload_send_io>( +        shared_from_this(), num_recv_frames, num_send_frames, port); +} + +// Get a single receive buffer if available and update client info +void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms) +{ +    if (info.num_frames_in_use < info.frames_reserved.num_recv_frames) { +        if (frame_buff::uptr buff = info.inline_io->get_recv_buff(timeout_ms)) { +            info.port->offload_thread_push(buff.release()); +            info.num_frames_in_use++; +        } +    } +} + +// Get a single send buffer if available and update client info +void offload_io_service_impl::_get_send_buff(send_client_info_t& info) +{ +    if (info.num_frames_in_use < info.frames_reserved.num_send_frames) { +        if (frame_buff::uptr buff = info.inline_io->get_send_buff(0)) { +            info.port->offload_thread_push(buff.release()); +            info.num_frames_in_use++; +        } +    } +} + +// Release a single recv buffer and update client info +void offload_io_service_impl::_release_recv_buff(recv_client_info_t& info, frame_buff* buff) +{ +    info.inline_io->release_recv_buff(frame_buff::uptr(buff)); +    assert(info.num_frames_in_use > 0); +    info.num_frames_in_use--; +} + +// Release a single send info +void offload_io_service_impl::_release_send_buff(send_client_info_t& info, frame_buff* buff) +{ +    info.inline_io->release_send_buff(frame_buff::uptr(buff)); +    assert(info.num_frames_in_use > 0); +    info.num_frames_in_use--; +} + +// Flush client queues and unreserve its frames +void offload_io_service_impl::_disconnect_recv_client(recv_client_info_t& info) +{ +    auto release_buff = [&info](frame_buff* buff) { +        info.inline_io->release_recv_buff(frame_buff::uptr(buff)); +    }; + +    info.num_frames_in_use -= info.port->offload_thread_flush(release_buff); +    assert(info.num_frames_in_use == 0); +    _reservation_mgr.unreserve_frames(info.frames_reserved); + +    // Client waits for a notification after requesting disconnect, so notify it +    info.port->offload_thread_set_connected(false); +} + +// Flush client queues and unreserve its frames +void offload_io_service_impl::_disconnect_send_client(send_client_info_t& info) +{ +    auto release_buff = [&info](frame_buff* buff) { +        info.inline_io->release_send_buff(frame_buff::uptr(buff)); +    }; +    info.num_frames_in_use -= info.port->offload_thread_flush(release_buff); +    assert(info.num_frames_in_use == 0); +    _reservation_mgr.unreserve_frames(info.frames_reserved); + +    // Client waits for a notification after requesting disconnect, so notify it +    info.port->offload_thread_set_connected(false); +} + +template <bool allow_recv, bool allow_send> +void offload_io_service_impl::_do_work_polling() +{ +    uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list); + +    client_req_t client_req; + +    while (!_stop_offload_thread) { +        if (allow_recv) { +            // Get recv buffers +            for (auto& recv_info : _recv_clients) { +                _get_recv_buff(recv_info, 0); +            } + +            // Release recv buffers +            for (auto it = _recv_clients.begin(); it != _recv_clients.end();) { +                frame_buff* buff; +                bool disconnect; +                std::tie(buff, disconnect) = it->port->offload_thread_pop(); +                if (buff) { +                    _release_recv_buff(*it, buff); +                } else if (disconnect) { +                    _disconnect_recv_client(*it); +                    it = _recv_clients.erase(it); // increments it +                    continue; +                } +                ++it; +            } +        } + +        if (allow_send) { +            // Get send buffers +            for (auto& send_info : _send_clients) { +                _get_send_buff(send_info); +            } + +            // Release send buffers +            for (auto it = _send_clients.begin(); it != _send_clients.end();) { +                frame_buff* buff; +                bool disconnect; +                std::tie(buff, disconnect) = it->port->offload_thread_pop(); +                if (buff) { +                    _release_send_buff(*it, buff); +                } else if (disconnect) { +                    _disconnect_send_client(*it); +                    it = _send_clients.erase(it); // increments it +                    continue; +                } +                ++it; +            } +        } + +        // Execute one client connect command per main loop iteration +        if (_client_connect_queue.pop(client_req)) { +            (*client_req.req)(); +            delete client_req.req; +        } +    } +} + +template <bool allow_recv, bool allow_send> +void offload_io_service_impl::_do_work_blocking() +{ +    uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list); + +    client_req_t client_req; + +    while (!_stop_offload_thread) { +        if (allow_recv) { +            // Get recv buffers +            for (auto& recv_info : _recv_clients) { +                _get_recv_buff(recv_info, blocking_timeout_ms); +            } + +            // Release recv buffers +            for (auto it = _recv_clients.begin(); it != _recv_clients.end();) { +                frame_buff* buff; +                bool disconnect; + +                if (it->num_frames_in_use == it->frames_reserved.num_recv_frames) { +                    // If all buffers are in use, block to avoid excessive CPU usage +                    std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); +                } else { +                    // Otherwise, just check current status +                    std::tie(buff, disconnect) = it->port->offload_thread_pop(); +                } + +                if (buff) { +                    _release_recv_buff(*it, buff); +                } else if (disconnect) { +                    _disconnect_recv_client(*it); +                    it = _recv_clients.erase(it); // increments it +                    continue; +                } +                ++it; +            } +        } + +        if (allow_send) { +            // Get send buffers +            for (auto& send_info : _send_clients) { +                _get_send_buff(send_info); +            } + +            // Release send buffers +            for (auto it = _send_clients.begin(); it != _send_clients.end();) { +                if (it->num_frames_in_use > 0) { +                    frame_buff* buff; +                    bool disconnect; +                    std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); + +                    if (buff) { +                        _release_send_buff(*it, buff); +                    } else if (disconnect) { +                        _disconnect_send_client(*it); +                        it = _send_clients.erase(it); // increments it +                        continue; +                    } +                } +                ++it; +            } +        } + +        // Execute one client connect command per main loop iteration +        // TODO: In a blocking I/O strategy, the loop can take a long time to +        // service these requests. Need to configure all clients up-front, +        // before starting the offload thread to avoid this. +        if (_client_connect_queue.pop(client_req)) { +            (*client_req.req)(); +            delete client_req.req; +        } +    } +} + +}} // namespace uhd::transport  | 
