diff options
| -rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 121 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/transport/inline_io_service.cpp | 415 | ||||
| -rw-r--r-- | host/tests/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | host/tests/common/mock_transport.hpp | 369 | ||||
| -rw-r--r-- | host/tests/transport_test.cpp | 188 | 
6 files changed, 1100 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp new file mode 100644 index 000000000..f10e7018d --- /dev/null +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -0,0 +1,121 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP +#define INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP + +#include <uhdlib/transport/io_service.hpp> +#include <unordered_map> +#include <list> + +namespace uhd { namespace transport { + +class inline_recv_mux; +class inline_recv_cb; + +/*! + * Single-threaded I/O service + * Note this is not an appropriate io_service to use with polling-mode drivers, + * since such drivers require a thread to poll them and not block (i.e. + * timeouts are not allowed at the link interface) + */ +class inline_io_service : public virtual io_service, +                          public std::enable_shared_from_this<inline_io_service> +{ +public: +    using sptr = std::shared_ptr<inline_io_service>; +    static sptr make(void) +    { +        return sptr(new inline_io_service()); +    } + +    ~inline_io_service(); + +    void attach_recv_link(recv_link_if::sptr link); +    void attach_send_link(send_link_if::sptr link); + +    recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, +        size_t num_recv_frames, +        recv_callback_t cb, +        send_link_if::sptr fc_link, +        size_t num_send_frames, +        recv_io_if::fc_callback_t fc_cb); + +    send_io_if::sptr make_send_client(send_link_if::sptr send_link, +        size_t num_send_frames, +        send_io_if::send_callback_t send_cb, +        recv_link_if::sptr recv_link, +        size_t num_recv_frames, +        recv_callback_t recv_cb); + +private: +    friend class inline_recv_io; +    friend class inline_send_io; + +    inline_io_service()                         = default; +    inline_io_service(const inline_io_service&) = delete; + +    /*! +     * Senders are free to mux a send_link, but the total reserved send_frames +     * must be less than or equal to the link's capacity +     * +     * \param link the link used for sending data +     * \param num_frames number of frames to reserve for this connection +     */ +    void connect_sender(send_link_if* link, size_t num_frames); + +    /*! +     * Disconnect the sender and free resources +     * +     * \param link the link that was used for sending data +     * \param num_frames number of frames to release (same as reservation) +     */ +    void disconnect_sender(send_link_if* link, size_t num_frames); + +    /*! +     * Connect a receiver to the link and reserve resources +     * \param link the recv link to use for getting data +     * \param cb a callback for processing received data +     * \param num_frames the number of frames to reserve for this receiver +     */ +    void connect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + +    /*! +     * Disconnect the receiver from the provided link and free resources +     * \param link the recv link that was used for reception +     * \param cb the callback to disassociate +     * \param num_frames the number of frames that was reserved for the cb +     */ +    void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + +    /* +     * Function to perform recv operations on a link, which is potentially +     * muxed. Packets are forwarded to the appropriate mux or callback. +     * +     * \param recv_io_cb the callback+interface initiating the operation +     * \param recv_link link to perform receive on +     * \param timeout_ms timeout to wait for a buffer on the link +     * \return a frame_buff uptr with either a buffer with data or no buffer +     */ +    frame_buff::uptr recv( +        inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); + +    /* Track whether link is muxed, the callback, and buffer reservations */ +    std::unordered_map<recv_link_if*, +        std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>> +        _recv_tbl; + +    /* Track how many send_frames have been reserved for each link */ +    std::unordered_map<send_link_if*, size_t> _send_tbl; + +    /* Shared ptr kept to avoid untimely release */ +    std::list<send_link_if::sptr> _send_links; +    std::list<recv_link_if::sptr> _recv_links; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index df94f42be..a9663c89a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -121,6 +121,7 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp  )  if(ENABLE_X300) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp new file mode 100644 index 000000000..72acea738 --- /dev/null +++ b/host/lib/transport/inline_io_service.cpp @@ -0,0 +1,415 @@ +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/inline_io_service.hpp> +#include <boost/circular_buffer.hpp> +#include <cassert> + +namespace uhd { namespace transport { + +/*! + * Interface class for unifying callback processing between both inline_send_io + * and inline_recv_io + */ +class inline_recv_cb +{ +public: +    /*! +     * Function to call the callback method +     * +     * \param buff buffer received +     * \param recv_link pointer to recv link used with the callback +     * \return whether the packet was destined for this callback +     */ +    UHD_FORCE_INLINE bool callback(frame_buff::uptr& buff, recv_link_if* recv_link) +    { +        return _recv_cb(buff, recv_link, _cb_send_link); +    } + +protected: +    inline_recv_cb(recv_callback_t cb, send_link_if* send_link) +        : _recv_cb(cb), _cb_send_link(send_link) +    { +    } + +    recv_callback_t _recv_cb; +    // pointer to send link used with the callback +    send_link_if* _cb_send_link; +}; + +/*! + * Mux class that intercepts packets from the link and distributes them to + * queues for each client that is not the caller of the recv() function + */ +class inline_recv_mux +{ +public: +    inline_recv_mux(recv_link_if* link) : _link(link){}; + +    ~inline_recv_mux(){}; + +    /*! +     * Connect a new receiver to the recv link +     * +     * \param cb pointer to the callback for the receiver +     */ +    void connect(inline_recv_cb* cb) +    { +        UHD_ASSERT_THROW(_queues.count(cb) == 0); +        /* Always create queue of max size, since we don't know when there are +         * virtual channels (which share frames) +         */ +        auto queue = +            new boost::circular_buffer<frame_buff*>(_link->get_num_recv_frames()); +        _queues[cb] = queue; +        _callbacks.push_back(cb); +    } + +    /*! +     * Disconnect a receiver currently connected to the recv link +     * \param cb a pointer to the callback to disconnect +     */ +    void disconnect(inline_recv_cb* cb) +    { +        auto queue = _queues.at(cb); +        while (!queue->empty()) { +            frame_buff* buff = queue->front(); +            _link->release_recv_buff(frame_buff::uptr(buff)); +            queue->pop_front(); +        } +        delete queue; +        _queues.erase(cb); +        _callbacks.remove(cb); +    } + +    /*! +     * Check if there are callbacks registered to this mux +     * \return whether there are no callbacks registered +     */ +    UHD_FORCE_INLINE bool is_empty(void) const +    { +        return _callbacks.empty(); +    } + +    /*! +     * Do receive processing for the mux +     * \param cb the callback that is currently seeking a buffer +     * \param recv_link the link to do recv on +     * \param timeout_ms the timeout for the recv operation +     * \return a frame_buff with data if a packet was received (else empty) +     */ +    frame_buff::uptr recv(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) +    { +        auto queue = _queues.at(cb); +        if (!queue->empty()) { +            frame_buff* buff = queue->front(); +            queue->pop_front(); +            return frame_buff::uptr(buff); +        } +        while (true) { +            frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); +            /* Process buffer */ +            if (buff) { +                bool rcvr_found = false; +                for (auto& rcvr : _callbacks) { +                    if (rcvr->callback(buff, recv_link)) { +                        rcvr_found = true; +                        if (buff) { +                            if (rcvr == cb) { +                                return frame_buff::uptr(std::move(buff)); +                            } else { +                                /* NOTE: Should not overflow, by construction +                                 * Every queue can hold link->get_num_recv_frames() +                                 */ +                                _queues[rcvr]->push_back(buff.release()); +                            } +                        } +                        /* Continue looping if buffer was consumed */ +                        break; +                    } +                } +                if (not rcvr_found) { +                    UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); +                    recv_link->release_recv_buff(std::move(buff)); +                } +            } else { /* Timeout */ +                return frame_buff::uptr(); +            } +        } +    } + +private: +    recv_link_if* _link; +    std::list<inline_recv_cb*> _callbacks; +    std::unordered_map<inline_recv_cb*, boost::circular_buffer<frame_buff*>*> _queues; +}; + +class inline_recv_io : public virtual recv_io_if, public virtual inline_recv_cb +{ +public: +    using sptr = std::shared_ptr<inline_recv_io>; + +    inline_recv_io(inline_io_service::sptr io_srv, +        recv_link_if::sptr data_link, +        size_t num_recv_frames, +        recv_callback_t recv_cb, +        send_link_if::sptr fc_link, +        size_t num_send_frames, +        fc_callback_t fc_cb) +        : inline_recv_cb(recv_cb, fc_link.get()) +        , _io_srv(io_srv) +        , _data_link(data_link) +        , _num_recv_frames(num_recv_frames) +        , _fc_link(fc_link) +        , _num_send_frames(num_send_frames) +        , _fc_cb(fc_cb) +    { +    } + +    ~inline_recv_io() +    { +        _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); +        if (_fc_link) { +            _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); +        } +    } + +    frame_buff::uptr get_recv_buff(int32_t timeout_ms) +    { +        return _io_srv->recv(this, _data_link.get(), timeout_ms); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); +    } + +private: +    inline_io_service::sptr _io_srv; +    recv_link_if::sptr _data_link; +    size_t _num_recv_frames; +    send_link_if::sptr _fc_link; +    size_t _num_send_frames; +    fc_callback_t _fc_cb; +}; + +class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb +{ +public: +    using sptr = std::shared_ptr<inline_send_io>; + +    inline_send_io(inline_io_service::sptr io_srv, +        send_link_if::sptr send_link, +        size_t num_send_frames, +        send_callback_t send_cb, +        recv_link_if::sptr recv_link, +        size_t num_recv_frames, +        recv_callback_t fc_cb) +        : inline_recv_cb(fc_cb, send_link.get()) +        , _io_srv(io_srv) +        , _send_link(send_link) +        , _num_send_frames(num_send_frames) +        , _send_cb(send_cb) +        , _recv_link(recv_link) +        , _num_recv_frames(num_recv_frames) +    { +    } + +    ~inline_send_io() +    { +        _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); +        if (_recv_link) { +            _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); +        } +    } + +    frame_buff::uptr get_send_buff(int32_t timeout_ms) +    { +        /* Check initial flow control result */ +        frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); +        if (buff) { +            return frame_buff::uptr(std::move(buff)); +        } +        return frame_buff::uptr(); +    } + +    void release_send_buff(frame_buff::uptr buff) +    { +        while (buff) { /* TODO: Possibly don't loop indefinitely here */ +            if (_recv_link) { +                _io_srv->recv(this, _recv_link.get(), 0); +            } +            _send_cb(buff, _send_link.get()); +        } +    } + +private: +    inline_io_service::sptr _io_srv; +    send_link_if::sptr _send_link; +    size_t _num_send_frames; +    send_callback_t _send_cb; +    recv_link_if::sptr _recv_link; +    size_t _num_recv_frames; +    recv_callback_t _recv_cb; +}; + +inline_io_service::~inline_io_service(){}; + +void inline_io_service::attach_recv_link(recv_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); +    _recv_tbl[link_ptr] = +        std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0); +    _recv_links.push_back(link); +}; + +recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link, +    size_t num_recv_frames, +    recv_callback_t cb, +    send_link_if::sptr fc_link, +    size_t num_send_frames, +    recv_io_if::fc_callback_t fc_cb) +{ +    UHD_ASSERT_THROW(data_link); +    UHD_ASSERT_THROW(num_recv_frames > 0); +    UHD_ASSERT_THROW(cb); +    if (fc_link) { +        UHD_ASSERT_THROW(num_send_frames > 0); +        UHD_ASSERT_THROW(fc_cb); +        connect_sender(fc_link.get(), num_send_frames); +    } +    sptr io_srv  = shared_from_this(); +    auto recv_io = std::make_shared<inline_recv_io>( +        io_srv, data_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); +    connect_receiver(data_link.get(), recv_io.get(), num_recv_frames); +    return recv_io; +} + +void inline_io_service::attach_send_link(send_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); +    _send_tbl[link_ptr] = 0; +    _send_links.push_back(link); +}; + +send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link, +    size_t num_send_frames, +    send_io_if::send_callback_t send_cb, +    recv_link_if::sptr recv_link, +    size_t num_recv_frames, +    recv_callback_t recv_cb) +{ +    UHD_ASSERT_THROW(send_link); +    UHD_ASSERT_THROW(num_send_frames > 0); +    UHD_ASSERT_THROW(send_cb); +    connect_sender(send_link.get(), num_send_frames); +    sptr io_srv  = shared_from_this(); +    auto send_io = std::make_shared<inline_send_io>( +        io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); +    if (recv_link) { +        UHD_ASSERT_THROW(num_recv_frames > 0); +        UHD_ASSERT_THROW(recv_cb); +        connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); +    } +    return send_io; +} + +/* + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + */ +void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) +{ +    size_t rsvd_frames    = _send_tbl.at(link); +    size_t frame_capacity = link->get_num_send_frames(); +    UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); +    _send_tbl[link] = rsvd_frames + num_frames; +} + +void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +{ +    size_t rsvd_frames = _send_tbl.at(link); +    UHD_ASSERT_THROW(rsvd_frames >= num_frames); +    _send_tbl[link] = rsvd_frames - num_frames; +} + +void inline_io_service::connect_receiver( +    recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ +    inline_recv_mux* mux; +    inline_recv_cb* rcvr; +    size_t rsvd_frames; +    std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); +    if (mux) { +        mux->connect(cb); +    } else if (rcvr) { +        mux = new inline_recv_mux(link); +        mux->connect(rcvr); +        mux->connect(cb); +        rcvr = nullptr; +    } else { +        rcvr = cb; +    } +    size_t capacity = link->get_num_recv_frames(); +    UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); +    _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); +} + +void inline_io_service::disconnect_receiver( +    recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ +    inline_recv_mux* mux; +    inline_recv_cb* rcvr; +    size_t rsvd_frames; +    std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); +    UHD_ASSERT_THROW(rsvd_frames >= num_frames); +    if (mux) { +        mux->disconnect(cb); +        if (mux->is_empty()) { +            delete mux; +            mux = nullptr; +        } +    } else { +        rcvr = nullptr; +    } +    _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); +} + +frame_buff::uptr inline_io_service::recv( +    inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ +    inline_recv_mux* mux; +    inline_recv_cb* rcvr; +    size_t num_frames; +    std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + +    if (mux) { +        /* Defer to mux's recv() if present */ +        return mux->recv(recv_io_cb, recv_link, timeout_ms); +    } else { +        assert(recv_io_cb == rcvr); +    } + +    while (true) { +        frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); +        /* Process buffer */ +        if (buff) { +            if (rcvr->callback(buff, recv_link)) { +                if (buff) { +                    return frame_buff::uptr(std::move(buff)); +                } +                /* Retry receive if got buffer but it got consumed */ +            } else { +                UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); +                recv_link->release_recv_buff(std::move(buff)); +            } +        } else { /* Timeout */ +            return frame_buff::uptr(); +        } +    } +    return frame_buff::uptr(); +} + +}} // namespace uhd::transport diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 431c380c3..0c48ae058 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -252,6 +252,12 @@ UHD_ADD_NONAPI_TEST(      ${CMAKE_SOURCE_DIR}/lib/usrp/cores/dsp_core_utils.cpp  ) +UHD_ADD_NONAPI_TEST( +    TARGET "transport_test.cpp" +    EXTRA_SOURCES +    ${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp +) +  ########################################################################  # demo of a loadable module  ######################################################################## diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp new file mode 100644 index 000000000..321f22830 --- /dev/null +++ b/host/tests/common/mock_transport.hpp @@ -0,0 +1,369 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP +#define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP + +#include <uhdlib/transport/io_service.hpp> +#include <boost/lockfree/spsc_queue.hpp> +#include <utility> + +namespace uhd { namespace transport { + +namespace { +constexpr size_t ADDR_OFFSET  = 0; +constexpr size_t TYPE_OFFSET  = 1; /* 0 for data, 1 for FC, 2 for msg */ +constexpr size_t SEQNO_OFFSET = 2; /* For FC, this is last seen seqno */ +constexpr size_t LEN_OFFSET   = 3; +constexpr size_t DATA_OFFSET  = 4; +constexpr size_t MSG_BUFFS    = 8; +}; // namespace +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, null, data] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_send_transport +{ +public: +    using sptr = std::shared_ptr<mock_send_transport>; + +    mock_send_transport(io_service::sptr io_srv, +        send_link_if::sptr send_link, +        recv_link_if::sptr recv_link, +        uint16_t dst_addr, +        uint16_t src_addr, +        uint32_t credits) +        : _credits(credits) +    { +        _send_addr = (dst_addr << 16) | (src_addr << 0); +        _recv_addr = (src_addr << 16) | (dst_addr << 0); + +        /* Make message client for sending side-band messages */ +        send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr& buff, +                                                      send_link_if* link) { +            uint32_t* data    = (uint32_t*)buff->data(); +            data[ADDR_OFFSET] = this->_send_addr; +            data[TYPE_OFFSET] = 2; /* MSG type */ +            link->release_send_buff(std::move(buff)); +        }; +        _msg_if = io_srv->make_send_client( +            send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr); + +        /* Make client for sending streaming data */ +        send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, +                                                  send_link_if* link) { +            this->send_buff(buff, link); +        }; +        recv_callback_t recv_cb = [this](frame_buff::uptr& buff, +                                      recv_link_if* link, +                                      send_link_if* /*send_link*/) { +            return this->recv_buff(buff, link); +        }; +        /* Pretend get 1 flow control message per sent packet */ +        _send_if = io_srv->make_send_client( +            send_link, credits, send_cb, recv_link, credits, recv_cb); +    } + +    ~mock_send_transport() {} + +    /*! +     * Get a buffer for creating a non-flow-controlled message +     */ +    bool put_msg(uint32_t msg, int32_t timeout_ms) +    { +        frame_buff::uptr buff = _msg_if->get_send_buff(timeout_ms); +        if (!buff) { +            return false; +        } +        uint32_t* data    = (uint32_t*)buff->data(); +        data[TYPE_OFFSET] = 2; +        data[DATA_OFFSET] = msg; +        buff->set_packet_size((1 + DATA_OFFSET) * sizeof(uint32_t)); +        _msg_if->release_send_buff(std::move(buff)); +        return true; +    } + +    /*! +      * Get an empty frame buffer in which to write packet contents. +      * +      * \param timeout_ms a positive timeout value specifies the maximum number +                          of ms to wait, a negative value specifies to block +                          until successful, and a value of 0 specifies no wait. +      * \return a frame buffer, or null uptr if timeout occurs +      */ +    frame_buff::uptr get_data_buff(int32_t timeout_ms) +    { +        frame_buff::uptr buff = _send_if->get_send_buff(timeout_ms); +        if (!buff) { +            return frame_buff::uptr(); +        } +        uint32_t* data    = (uint32_t*)buff->data(); +        data[TYPE_OFFSET] = 0; +        return frame_buff::uptr(std::move(buff)); +    } + +    /*! +     * Release a frame buffer, allowing the driver to reuse it. +     * +     * \param buffer frame buffer to release for reuse by the link +     */ +    void release_data_buff(frame_buff::uptr& buff, size_t len) +    { +        if (len == 0) { +            _send_if->release_send_buff(std::move(buff)); +            return; +        } +        uint32_t* data   = (uint32_t*)buff->data(); +        data[LEN_OFFSET] = len; +        buff->set_packet_size((len + DATA_OFFSET) * sizeof(uint32_t)); +        _send_if->release_send_buff(std::move(buff)); +    } + +    /*! +     * Callback for sending the packet. Callback is responsible for calling +     * release_send_buff() if it wants to send the packet. This will require +     * moving the uptr's reference. If the packet will NOT be sent, the +     * callback must NOT release the uptr. +     * +     * Function should update any internal state needed. For example, flow +     * control state could be updated here, and the header could be filled out +     * as well, like the packet's sequence number and/or addresses. +     * +     * Callbacks execute on the I/O thread! Be careful about what state is +     * touched. In addition, this callback should NOT sleep. +     */ +    void send_buff(frame_buff::uptr& buff, send_link_if* send_link) +    { +        if (_seqno >= _ackno + _credits) { +            return; +        } +        uint32_t* data     = (uint32_t*)buff->data(); +        data[ADDR_OFFSET]  = _send_addr; +        data[SEQNO_OFFSET] = _seqno; +        send_link->release_send_buff(std::move(buff)); +        _seqno++; +    } + +    /*! +     * Callback for when packets are received (for processing). +     * Function should make a determination of whether the packet belongs to it +     * and return the bool. +     * +     * Function may consume and release the buffer internally (if packet was +     * destined for it). The recv_link_if may be used to release it, and the +     * provided frame_buff::uptr must relinquish ownership before returning. +     * If the buffer was not destined for the user of this function, buffer must +     * NOT be released, and the uptr must remain intact. +     * +     * Callbacks execute on the I/O thread! Be careful about what state is +     * touched. In addition, this callback should NOT sleep. +     * +     * \param frame_buff the buffer that was received +     * \param recv_link_if the link used to retrieve the buffer. Can be used to +     *  release the buffer back to the link, if buffer is consumed internally. +     * \return true if buffer matched this transport, false otherwise +     */ +    bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) +    { +        /* Check address and if no match, return false */ +        uint32_t* data = (uint32_t*)buff->data(); +        if (data[ADDR_OFFSET] != _recv_addr) { +            return false; +        } +        if (data[TYPE_OFFSET] == 1) { /* Flow control message */ +            _ackno = data[SEQNO_OFFSET]; +        } +        if (data[TYPE_OFFSET] != 0) { /* Only data packets go up to user */ +            recv_link->release_recv_buff(std::move(buff)); +        } else { /* mock_send_transport does not receive data packets */ +            return false; +        } +        return true; +    } + +    std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff) +    { +        uint32_t* data  = (uint32_t*)buff->data(); +        size_t data_len = buff->packet_size() - DATA_OFFSET * sizeof(uint32_t); +        return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len); +    } + +private: +    uint32_t _send_addr; +    uint32_t _recv_addr; +    uint32_t _credits; +    send_io_if::sptr _msg_if; +    send_io_if::sptr _send_if; +    uint32_t _seqno = 0; +    uint32_t _ackno = 0; +}; + +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, seqno, data_len, data...] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_recv_transport +{ +public: +    using sptr = std::shared_ptr<mock_recv_transport>; + +    mock_recv_transport(io_service::sptr io_srv, +        recv_link_if::sptr recv_link, +        send_link_if::sptr send_link, +        uint16_t dst_addr, +        uint16_t src_addr, +        uint32_t credits) +        : _credits(credits) +    { +        _send_addr = (src_addr << 16) | (dst_addr << 0); +        _recv_addr = (dst_addr << 16) | (src_addr << 0); + +        /* Make client for sending streaming data */ +        recv_io_if::fc_callback_t send_cb = [this](frame_buff::uptr buff, +                                                recv_link_if* recv_link, +                                                send_link_if* send_link) { +            this->handle_flow_ctrl(std::move(buff), recv_link, send_link); +        }; +        recv_callback_t recv_cb = [this](frame_buff::uptr& buff, +                                      recv_link_if* link, +                                      send_link_if* /*send_link*/) { +            return this->recv_buff(buff, link); +        }; +        /* Pretend get 1 flow control message per sent packet */ +        _recv_if = io_srv->make_recv_client( +            recv_link, credits, recv_cb, send_link, credits, send_cb); +    } + +    ~mock_recv_transport() {} + +    /*! +     * Get a buffer for creating a non-flow-controlled message +     */ +    bool get_msg(uint32_t& msg) +    { +        if (_msg_queue.read_available()) { +            msg = _msg_queue.front(); +            _msg_queue.pop(); +            return true; +        } +        return false; +    } + +    /*! +      * Get an empty frame buffer in which to write packet contents. +      * +      * \param timeout_ms a positive timeout value specifies the maximum number +                          of ms to wait, a negative value specifies to block +                          until successful, and a value of 0 specifies no wait. +      * \return a frame buffer, or null uptr if timeout occurs +      */ +    frame_buff::uptr get_data_buff(int32_t timeout_ms) +    { +        return _recv_if->get_recv_buff(timeout_ms); +    } + +    /*! +     * Release a frame buffer, allowing the driver to reuse it. +     * +     * \param buffer frame buffer to release for reuse by the link +     */ +    void release_data_buff(frame_buff::uptr buff) +    { +        _recv_if->release_recv_buff(std::move(buff)); +    } + +    /*! +     * Callback for producing a flow control response. +     * This callback is run whenever a frame_buff is scheduled to be released. +     * +     * The callback must release the buffer, but it can update internal state +     * as well. It can also send a response with the send_link_if, should it +     * desire to do so. +     * +     * Callbacks execute on the I/O thread! Be careful about what state is +     * touched. In addition, this callback should NOT sleep. +     */ +    void handle_flow_ctrl( +        frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* send_link) +    { +        uint32_t* data = (uint32_t*)buff->data(); +        if (data[TYPE_OFFSET] == 0) { +            frame_buff::uptr fc_buff = send_link->get_send_buff(0); +            UHD_ASSERT_THROW(fc_buff); +            uint32_t* fc_data     = (uint32_t*)fc_buff->data(); +            fc_data[SEQNO_OFFSET] = data[SEQNO_OFFSET]; +            recv_link->release_recv_buff(std::move(buff)); +            UHD_ASSERT_THROW(buff == nullptr); +            fc_data[TYPE_OFFSET] = 1; /* FC type */ +            fc_data[ADDR_OFFSET] = _send_addr; +            send_link->release_send_buff(std::move(fc_buff)); +        } else { +            recv_link->release_recv_buff(std::move(buff)); +        } +    } + +    /*! +     * Callback for when packets are received (for processing). +     * Function should make a determination of whether the packet belongs to it +     * and return the bool. +     * +     * Function may consume and release the buffer internally (if packet was +     * destined for it). The recv_link_if may be used to release it, and the +     * provided frame_buff::uptr must relinquish ownership before returning. +     * If the buffer was not destined for the user of this function, buffer must +     * NOT be released, and the uptr must remain intact. +     * +     * Callbacks execute on the I/O thread! Be careful about what state is +     * touched. In addition, this callback should NOT sleep. +     * +     * \param frame_buff the buffer that was received +     * \param recv_link_if the link used to retrieve the buffer. Can be used to +     *  release the buffer back to the link, if buffer is consumed internally. +     * \return true if buffer matched this transport, false otherwise +     */ +    bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) +    { +        /* Check address and if no match, return false */ +        uint32_t* data = (uint32_t*)buff->data(); +        if (data[ADDR_OFFSET] != _recv_addr) { +            return false; +        } +        if (data[TYPE_OFFSET] == 1) { /* No FC for mock_recv_transport */ +            return false; +        } +        if (data[TYPE_OFFSET] == 2) { /* Record message */ +            _msg_queue.push(data[DATA_OFFSET]); +            recv_link->release_recv_buff(std::move(buff)); +        } +        /* (Data packets will go up to user) */ +        return true; +    } + +    std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff) +    { +        uint32_t* data  = (uint32_t*)buff->data(); +        size_t data_len = data[LEN_OFFSET]; +        return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len); +    } + +private: +    uint32_t _send_addr; +    uint32_t _recv_addr; +    uint32_t _credits; +    recv_io_if::sptr _recv_if; +    boost::lockfree::spsc_queue<uint32_t, boost::lockfree::capacity<8>> _msg_queue; +    uint32_t _seqno = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP */ diff --git a/host/tests/transport_test.cpp b/host/tests/transport_test.cpp new file mode 100644 index 000000000..3e86da2d8 --- /dev/null +++ b/host/tests/transport_test.cpp @@ -0,0 +1,188 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "common/mock_link.hpp" +#include "common/mock_transport.hpp" +#include <uhdlib/transport/inline_io_service.hpp> +#include <boost/test/unit_test.hpp> + +using namespace uhd::transport; + +static mock_send_link::sptr make_send_link(size_t num_frames) +{ +    const mock_send_link::link_params params = {1000, num_frames}; +    return std::make_shared<mock_send_link>(params); +} + +static mock_recv_link::sptr make_recv_link(size_t num_frames) +{ +    const mock_recv_link::link_params params = {1000, num_frames}; +    return std::make_shared<mock_recv_link>(params); +} + +static mock_send_transport::sptr make_send_xport(io_service::sptr io_srv, +    send_link_if::sptr send_link, +    recv_link_if::sptr recv_link, +    uint16_t dst_addr, +    uint16_t src_addr, +    uint32_t credits) +{ +    return std::make_shared<mock_send_transport>( +        io_srv, send_link, recv_link, dst_addr, src_addr, credits); +} + +static mock_recv_transport::sptr make_recv_xport(io_service::sptr io_srv, +    recv_link_if::sptr recv_link, +    send_link_if::sptr send_link, +    uint16_t dst_addr, +    uint16_t src_addr, +    uint32_t credits) +{ +    return std::make_shared<mock_recv_transport>( +        io_srv, recv_link, send_link, dst_addr, src_addr, credits); +} + +BOOST_AUTO_TEST_CASE(test_construction) +{ +    auto io_srv    = inline_io_service::make(); +    auto send_link = make_send_link(40); +    io_srv->attach_send_link(send_link); +    auto recv_link = make_recv_link(40); +    io_srv->attach_recv_link(recv_link); +    auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + +    auto send_buff = send_xport->get_data_buff(0); +    send_buff->set_packet_size(0); +    send_xport->release_data_buff(send_buff, 0); +    send_xport.reset(); + +    auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); +    uint32_t msg; +    UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} + +BOOST_AUTO_TEST_CASE(test_io) +{ +    auto io_srv     = inline_io_service::make(); +    auto send_link0 = make_send_link(40); +    io_srv->attach_send_link(send_link0); +    auto recv_link0 = make_recv_link(40); +    io_srv->attach_recv_link(recv_link0); +    auto send_xport = make_send_xport(io_srv, send_link0, recv_link0, 1, 2, 32); + +    auto send_link1 = make_send_link(40); +    io_srv->attach_send_link(send_link1); +    auto recv_link1 = make_recv_link(40); +    io_srv->attach_recv_link(recv_link1); +    auto recv_xport = make_recv_xport(io_srv, recv_link1, send_link1, 1, 2, 32); + +    /* FIXME: Testing async messages requires the dummy read -- To not have it, needs recv +     * mux + separate recv queue */ +    send_xport->put_msg(0xa5d3b33f, 0); +    auto packet = send_link0->pop_send_packet(); +    recv_link1->push_back_recv_packet(packet.first, packet.second); +    auto recv_buff = recv_xport->get_data_buff(0); +    if (recv_buff) { +        recv_xport->release_data_buff(std::move(recv_buff)); +    } +    uint32_t msg; +    UHD_ASSERT_THROW(recv_xport->get_msg(msg)); +    UHD_ASSERT_THROW(msg == 0xa5d3b33f); + +    auto send_buff = send_xport->get_data_buff(0); +    UHD_ASSERT_THROW(send_buff); +    auto buff_data = send_xport->buff_to_data(send_buff.get()); +    UHD_ASSERT_THROW(buff_data.second >= 16); +    uint32_t* data = buff_data.first; +    for (size_t i = 0; i < 16; i++) { +        data[i] = (uint32_t)i; +    } + +    send_xport->release_data_buff(send_buff, 16); +    packet = send_link0->pop_send_packet(); +    recv_link1->push_back_recv_packet(packet.first, packet.second); + +    recv_buff = recv_xport->get_data_buff(0); +    UHD_ASSERT_THROW(recv_buff); +    auto recv_data = recv_xport->buff_to_data(recv_buff.get()); +    UHD_ASSERT_THROW(recv_data.second == 16); +    data = recv_data.first; +    for (size_t i = 0; i < 16; i++) { +        UHD_ASSERT_THROW(data[i] == (uint32_t)i); +    } +    recv_xport->release_data_buff(std::move(recv_buff)); +} + +BOOST_AUTO_TEST_CASE(test_muxed_io) +{ +    auto io_srv    = inline_io_service::make(); +    auto send_link = make_send_link(80); +    io_srv->attach_send_link(send_link); +    auto recv_link = make_recv_link(80); +    io_srv->attach_recv_link(recv_link); +    auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); +    auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + +    /* Send a sideband message */ +    send_xport->put_msg(0xa5d3b33f, 0); + +    /* Send some normal data */ +    auto send_buff = send_xport->get_data_buff(0); +    UHD_ASSERT_THROW(send_buff); +    auto buff_data = send_xport->buff_to_data(send_buff.get()); +    UHD_ASSERT_THROW(buff_data.second >= 16); +    uint32_t* data = buff_data.first; +    for (size_t i = 0; i < 16; i++) { +        data[i] = (uint32_t)i; +    } +    send_xport->release_data_buff(send_buff, 16); + +    /* Move the two packets over */ +    auto packet = send_link->pop_send_packet(); +    recv_link->push_back_recv_packet(packet.first, packet.second); +    packet = send_link->pop_send_packet(); +    recv_link->push_back_recv_packet(packet.first, packet.second); + +    /* Try to receive the data +     * (message won't arrive unless we try to get the data first) +     * However, the message should be processed and enqueued here +     */ +    auto recv_buff = recv_xport->get_data_buff(0); +    UHD_ASSERT_THROW(recv_buff); +    auto recv_data = recv_xport->buff_to_data(recv_buff.get()); +    UHD_ASSERT_THROW(recv_data.second == 16); +    data = recv_data.first; +    for (size_t i = 0; i < 16; i++) { +        UHD_ASSERT_THROW(data[i] == (uint32_t)i); +    } +    recv_xport->release_data_buff(std::move(recv_buff)); + +    /* Now can get the message */ +    uint32_t msg; +    UHD_ASSERT_THROW(recv_xport->get_msg(msg)); +    UHD_ASSERT_THROW(msg == 0xa5d3b33f); +} + +/* +BOOST_AUTO_TEST_CASE(test_oversubscribed) +{ +    auto io_srv = inline_io_service::make(); +    auto send_link = make_send_link(32); +    io_srv->attach_send_link(send_link); +    auto recv_link = make_recv_link(32); +    io_srv->attach_recv_link(recv_link); +    auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + +    auto send_buff = send_xport->get_data_buff(0); +    send_buff->set_packet_size(0); +    send_xport->release_data_buff(send_buff, 0); +    send_xport.reset(); + +    auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); +    uint32_t msg; +    UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} +*/  | 
