diff options
31 files changed, 4540 insertions, 334 deletions
| diff --git a/host/include/uhd/rfnoc_graph.hpp b/host/include/uhd/rfnoc_graph.hpp index c13939ac9..08d5fc095 100644 --- a/host/include/uhd/rfnoc_graph.hpp +++ b/host/include/uhd/rfnoc_graph.hpp @@ -159,16 +159,17 @@ public:      /*! Connect a RFNOC block with block ID \p src_block to another with block ID \p       * dst_block.       * +     * Note you need to also call this on statically connected blocks if you +     * desire to use them. +     *       * \param src_blk The block ID of the source block to connect.       * \param src_port The port of the source block to connect.       * \param dst_blk The block ID of the destination block to connect to.       * \param dst_port The port of the destination block to connect to.       * \param skip_property_propagation Skip property propagation for this edge       * -     * \throws connect_disallowed_on_src -     *     if the source port is statically connected to a *different* block -     * \throws connect_disallowed_on_dst -     *     if the destination port is statically connected to a *different* block +     * \throws uhd::routing_error if the source or destination ports are +     *                            statically connected to a *different* block       */      virtual void connect(const block_id_t& src_blk,          size_t src_port, @@ -186,7 +187,7 @@ public:       * \throws connect_disallowed_on_dst       *     if the destination port is statically connected to a *different* block       */ -    virtual void connect(uhd::tx_streamer& streamer, +    virtual void connect(uhd::tx_streamer::sptr streamer,          size_t strm_port,          const block_id_t& dst_blk,          size_t dst_port) = 0; @@ -203,7 +204,7 @@ public:       */      virtual void connect(const block_id_t& src_blk,          size_t src_port, -        uhd::rx_streamer& streamer, +        uhd::rx_streamer::sptr streamer,          size_t strm_port) = 0;      /*! Enumerate all the possible static connections in the graph @@ -244,10 +245,12 @@ public:       *  start using it. If a different streamer is already connected       *  to the intended source then that call may fail.       * +     * \param num_ports Number of ports that will be connected to the streamer       * \param args Arguments to aid the construction of the streamer       * \return a shared pointer to a new streamer       */ -    //virtual rx_streamer::sptr create_rx_streamer(const stream_args_t& args) = 0; +    virtual rx_streamer::sptr create_rx_streamer(const size_t num_ports, +        const stream_args_t& args) = 0;      /*! Create a new transmit streamer from the streamer arguments       *  The created streamer is still not connected to anything yet. @@ -255,10 +258,12 @@ public:       *  start using it. If a different streamer is already connected       *  to the intended sink then that call may fail.       * +     * \param num_ports Number of ports that will be connected to the streamer       * \param args Arguments to aid the construction of the streamer       * \return a shared pointer to a new streamer       */ -    //virtual tx_streamer::sptr create_tx_streamer(const stream_args_t& args) = 0; +    virtual tx_streamer::sptr create_tx_streamer(const size_t num_ports, +        const stream_args_t& args) = 0;      /**************************************************************************       * Hardware Control diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp index 770c6cf6f..cc729de6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp @@ -114,6 +114,15 @@ public:       */      virtual void* get_payload_ptr() = 0; +    /*! Return the payload offset in bytes for a given type and num_mdata +     * +     * \param pkt_type The packet type for calculation +     * \param num_mdata The number of metadata words for calculation +     * \return The offset of the payload in a packet with the given params +     */ +    virtual size_t calculate_payload_offset(const packet_type_t pkt_type, +        const uint8_t num_mdata = 0) const = 0; +      //! Shortcut to return the const metadata pointer cast as a specific type      template <typename data_t>      inline const data_t* get_mdata_const_ptr_as() const diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp new file mode 100644 index 000000000..69dceebe1 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -0,0 +1,481 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP + +#include <uhd/config.hpp> +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send rx flow control responses + */ +class rx_flow_ctrl_sender +{ +public: +    //! Constructor +    rx_flow_ctrl_sender( +        const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) +        : _dst_epid(sep_ids.first) +    { +        _fc_packet             = pkt_factory.make_strs(); +        _fc_strs_pyld.src_epid = sep_ids.second; +    } + +    /*! Configure buffer capacity +     * \param recv_capacity The buffer capacity of the receive link +     */ +    void set_capacity(const stream_buff_params_t& recv_capacity) +    { +        _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; +        _fc_strs_pyld.capacity_pkts  = recv_capacity.packets; +    } + +    /*! Send a flow control response packet +     * +     * \param send_link the link to use to send the packet +     * \counts transfer counts for packet contents +     */ +    void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts) +    { +        auto buff = send_link->get_send_buff(0); +        if (!buff) { +            throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); +        } + +        chdr::chdr_header header; +        header.set_seq_num(_fc_seq_num++); +        header.set_dst_epid(_dst_epid); + +        chdr::strs_payload fc_payload(_fc_strs_pyld); +        fc_payload.xfer_count_bytes = counts.bytes; +        fc_payload.xfer_count_pkts  = counts.packets; + +        _fc_packet->refresh(buff->data(), header, fc_payload); +        const size_t size = header.get_length(); + +        buff->set_packet_size(size); +        send_link->release_send_buff(std::move(buff)); +    } + +private: +    // Endpoint ID for recipient of flow control response +    const sep_id_t _dst_epid; + +    // Packet for writing flow control info +    chdr::chdr_strs_packet::uptr _fc_packet; + +    // Pre-configured strs payload to hold values that don't change +    chdr::strs_payload _fc_strs_pyld; + +    // Sequence number for flow control packets +    uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for RX chdr data + * + * This transport provides the streamer an interface to read RX data packets. + * The transport implements flow control and sequence number checking. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For an rx transport, the device sends data packets, and the host sends strs + * packets letting the device know that buffer space in the host has been freed. + * For lossy links, the device also sends strc packets to resynchronize the + * transfer counts between host and device, to correct for any dropped packets + * in the link. + */ +class chdr_rx_data_xport +{ +public: +    using uptr   = std::unique_ptr<chdr_rx_data_xport>; +    using buff_t = transport::frame_buff; + +    //! Values extracted from received RX data packets +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +        const void* payload  = nullptr; +    }; + +    /*! Constructor +     * +     * \param io_srv The service that will schedule the xport I/O +     * \param recv_link The recv link, already attached to the I/O service +     * \param send_link The send link, already attached to the I/O service +     * \param pkt_factory Factory to create packets with the desired chdr_w and endianness +     * \param addrs Source and destination addresses +     * \param epids Source and destination endpoint IDs +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param num_recv_frames Num frames to reserve from the recv link +     * \param recv_capacity Total capacity of the recv link +     * \param fc_freq Frequency of flow control status messages +     * \param fc_headroom Headroom for flow control status messages +     * \param lossy_xport Whether the xport is lossy, for flow control configuration +     */ +    chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_pair_t& addrs, +        const uhd::rfnoc::sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt, +        const size_t num_recv_frames, +        const stream_buff_params_t& recv_capacity, +        const stream_buff_params_t& fc_freq, +        const stream_buff_params_t& fc_headroom, +        const bool lossy_xport) +        : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) +    { +        const sep_addr_t remote_sep_addr = addrs.first; +        const sep_addr_t local_sep_addr  = addrs.second; +        const sep_id_t remote_epid       = epids.first; +        const sep_id_t local_epid        = epids.second; + +        UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +            "Creating rx xport with local epid=" << local_epid +                                                 << ", remote epid=" << remote_epid); + +        _recv_packet = pkt_factory.make_generic(); +        _fc_sender.set_capacity(recv_capacity); + +        // Calculate max payload size +        const size_t pyld_offset = +            _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); +        _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + +        // Make data transport +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* recv_link, +                           transport::send_link_if* send_link) { +            return this->_recv_callback(buff, recv_link, send_link); +        }; + +        auto fc_cb = [this](buff_t::uptr buff, +                         transport::recv_link_if* recv_link, +                         transport::send_link_if* send_link) { +            this->_fc_callback(std::move(buff), recv_link, send_link); +        }; + +        // Needs just a single send frame for responses +        _recv_io = io_srv->make_recv_client(recv_link, +            num_recv_frames, +            recv_cb, +            send_link, +            /* num_send_frames*/ 1, +            fc_cb); + +        // Create a control transport with the rx data links to send mgmt packets +        // needed to setup the stream +        // Piggyback on frames from the recv_io_if +        auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, +            send_link, +            recv_link, +            local_epid, +            0, // num_send_frames +            0); // num_recv_frames + +        // Create new temporary management portal with the transports used for this stream +        // TODO: This is a bit excessive. Maybe we can pare down the functionality of the +        // portal just for route setup purposes. Whatever we do, we *must* use xport in it +        // though otherwise the transport will not behave correctly. +        auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( +            *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + +        // Setup a route to the EPID +        // Note that this may be gratuitous--The endpoint may already have been set up +        data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); +        data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + +        // Initialize flow control - management portal sends a stream command +        // containing its requested flow control frequency, the rx transport +        // responds with a stream status containing its buffer capacity. +        data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport, +            remote_epid, +            lossy_xport, +            pyld_buff_fmt, +            mdata_buff_fmt, +            fc_freq, +            fc_headroom); + +        data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid); + +        UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +            "Stream endpoint was configured with:" +                << std::endl +                << "capacity bytes=" << recv_capacity.bytes +                << ", packets=" << recv_capacity.packets << std::endl +                << "fc headroom bytes=" << fc_headroom.bytes +                << ", packets=" << fc_headroom.packets << std::endl +                << "fc frequency bytes=" << fc_freq.bytes +                << ", packets=" << fc_freq.packets); + +        // We no longer need the control xport and mgmt_portal, release them so +        // the control xport is no longer connected to the I/O service. +        data_mgmt_portal.reset(); +        ctrl_xport.reset(); +    } + +    /*! Returns maximum number payload bytes +     * +     * \return maximum payload bytes per packet +     */ +    size_t get_max_payload_size() const +    { +        return _max_payload_size; +    } + +    /*! +     * Gets an RX frame buffer containing a recv packet +     * +     * \param timeout_ms timeout in milliseconds +     * \return returns a tuple containing: +     * - a frame_buff, or null if timeout occurs +     * - info struct corresponding to the packet +     * - whether the packet was out of sequence +     */ +    std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff( +        const int32_t timeout_ms) +    { +        buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms); + +        if (!buff) { +            return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false); +        } + +        auto info      = _read_data_packet_info(buff); +        bool seq_error = _is_out_of_sequence(std::get<1>(info)); + +        return std::make_tuple(std::move(buff), std::get<0>(info), seq_error); +    } + +    /*! +     * Releases an RX frame buffer +     * +     * \param buff the frame buffer to release +     */ +    void release_recv_buff(typename buff_t::uptr buff) +    { +        _recv_io->release_recv_buff(std::move(buff)); +    } + +private: +    /*! +     * Recv callback for I/O service +     * +     * The I/O service invokes this callback when it reads a packet from the +     * recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link from which buff was read +     * \param send_link the send link for flow control messages +     */ +    bool _recv_callback(buff_t::uptr& buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* send_link) +    { +        _recv_packet->refresh(buff->data()); +        const auto header      = _recv_packet->get_chdr_header(); +        const auto type        = header.get_pkt_type(); +        const auto dst_epid    = header.get_dst_epid(); +        const auto packet_size = buff->packet_size(); + +        if (dst_epid != _epid) { +            return false; +        } + +        if (type == chdr::PKT_TYPE_STRC) { +            chdr::strc_payload strc; +            strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), +                _recv_packet->get_payload_size() / sizeof(uint64_t), +                _recv_packet->conv_to_host<uint64_t>()); + +            const stream_buff_params_t strc_counts = { +                strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + +            if (strc.op_code == chdr::STRC_RESYNC) { +                // Resynchronize before updating fc_state, the strc payload +                // contains counts before the strc packet itself +                _fc_state.resynchronize(strc_counts); + +                // Update state that we received a packet +                _fc_state.data_received(packet_size); + +                recv_link->release_recv_buff(std::move(buff)); +                buff = buff_t::uptr(); +                _fc_state.xfer_done(packet_size); +                _send_fc_response(send_link); +            } else if (strc.op_code == chdr::STRC_INIT) { +                _fc_state.initialize( +                    {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}); + +                UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +                    "Received strc init with fc freq" +                        << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + +                // Make sure flow control was initialized +                assert(_fc_state.get_fc_freq().bytes > 0); +                assert(_fc_state.get_fc_freq().packets > 0); + +                // Send a strs response to configure flow control on the sender +                _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + +                // Reset counts, since mgmt_portal will do it to FPGA +                _fc_state.reset_counts(); + +                recv_link->release_recv_buff(std::move(buff)); +                buff = buff_t::uptr(); +            } else { +                throw uhd::value_error("Unexpected opcode value in STRC packet."); +            } + +            // For stream commands, we return true (packet was destined to this +            // client) but release the buffer. The I/O service won't queue this +            // packet in the recv_io_if. +            return true; + +        } else if (type == chdr::PKT_TYPE_DATA_NO_TS +                   || type == chdr::PKT_TYPE_DATA_WITH_TS) { +            // Update state that we received a packet +            _fc_state.data_received(packet_size); + +            // If this is a data packet, just claim it by returning true. The +            // I/O service will queue this packet in the recv_io_if. +            return true; + +        } else { +            return false; +        } +    } + +    /*! +     * Flow control callback for I/O service +     * +     * The I/O service invokes this callback when a packet needs to be released +     * to the recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link to which to release the buffer +     * \param send_link the send link for flow control messages +     */ +    void _fc_callback(buff_t::uptr buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* send_link) +    { +        const size_t packet_size = buff->packet_size(); +        recv_link->release_recv_buff(std::move(buff)); +        _fc_state.xfer_done(packet_size); +        _send_fc_response(send_link); +    } + +    /*! +     * Sends a flow control response packet if necessary. +     * +     * \param send_link the send link for flow control messages +     */ +    void _send_fc_response(transport::send_link_if* send_link) +    { +        if (_fc_state.fc_resp_due()) { +            _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); +            _fc_state.fc_resp_sent(); +        } +    } + +    /*! +     * Checks if the sequence number is out of sequence, prints 'D' if it is +     * and returns result of check. +     * +     * \return true if a sequence error occurred +     */ +    UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num) +    { +        const uint16_t expected_packet_count = _data_seq_num; +        _data_seq_num                        = seq_num + 1; + +        if (expected_packet_count != seq_num) { +            UHD_LOG_FASTPATH("D"); +            return true; +        } +        return false; +    } + +    /*! +     * Reads packet header and returns information in a struct. +     * +     * \return a tuple containing the packet info and packet sequence number +     */ +    std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff) +    { +        const void* data = buff->data(); +        _recv_packet->refresh(data); +        const auto header        = _recv_packet->get_chdr_header(); +        const auto optional_time = _recv_packet->get_timestamp(); + +        packet_info_t info; +        info.eob           = header.get_eob(); +        info.has_tsf       = optional_time.is_initialized(); +        info.tsf           = optional_time ? *optional_time : 0; +        info.payload_bytes = _recv_packet->get_payload_size(); +        info.payload       = _recv_packet->get_payload_const_ptr(); + +        const uint8_t* pkt_end = +            reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); +        const size_t pyld_pkt_len = +            pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + +        if (pyld_pkt_len < info.payload_bytes) { +            _recv_io->release_recv_buff(std::move(buff)); +            throw uhd::value_error("Bad CHDR header or invalid packet length."); +        } + +        return std::make_tuple(info, header.get_seq_num()); +    } + +    // Interface to the I/O service +    transport::recv_io_if::sptr _recv_io; + +    // Flow control state +    rx_flow_ctrl_state _fc_state; + +    // Maximum data payload in bytes +    size_t _max_payload_size = 0; + +    // Sequence number for data packets +    uint16_t _data_seq_num = 0; + +    // Packet for received data +    chdr::chdr_packet::uptr _recv_packet; + +    // Handles sending of strs flow control response packets +    detail::rx_flow_ctrl_sender _fc_sender; + +    // Local / Sink EPID +    sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp new file mode 100644 index 000000000..62b811bf5 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -0,0 +1,550 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP + +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send tx flow control messages + */ +class tx_flow_ctrl_sender +{ +public: +    //! Constructor +    tx_flow_ctrl_sender( +        const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) +        : _dst_epid(sep_ids.second) +    { +        _fc_packet             = pkt_factory.make_strc(); +        _fc_strc_pyld.src_epid = sep_ids.first; +        _fc_strc_pyld.op_code  = chdr::STRC_RESYNC; +    } + +    /*! +     * Sends a flow control resync packet +     * +     * Sends a strc packet with the resync opcode to make the device transfer +     * counts match those of the host, to correct for dropped packets. +     * +     * \param send_link the link to use to send the packet +     * \counts transfer counts for packet contents +     */ +    size_t send_strc_resync( +        transport::send_link_if* send_link, const stream_buff_params_t& counts) +    { +        auto buff = send_link->get_send_buff(0); +        if (!buff) { +            throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer"); +        } + +        chdr::chdr_header header; +        header.set_seq_num(_fc_seq_num++); +        header.set_dst_epid(_dst_epid); + +        chdr::strc_payload fc_payload(_fc_strc_pyld); +        fc_payload.num_bytes = counts.bytes; +        fc_payload.num_pkts  = counts.packets; + +        _fc_packet->refresh(buff->data(), header, fc_payload); +        const size_t size = header.get_length(); + +        buff->set_packet_size(size); +        send_link->release_send_buff(std::move(buff)); +        return size; +    } + +private: +    // Endpoint ID for recipient of flow control response +    const sep_id_t _dst_epid; + +    // Packet for writing flow control info +    chdr::chdr_strc_packet::uptr _fc_packet; + +    // Pre-configured strc payload to hold values that don't change +    chdr::strc_payload _fc_strc_pyld; + +    // Sequence number for flow control packets +    uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for TX chdr data + * + * This transport provides the streamer an interface to send TX data packets. + * The transport implements flow control and keeps track of sequence numbers. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For a tx transport, the host sends data packets, and the device sends strs + * packets letting the host know that buffer space in the device stream endpoint + * has been freed. For lossy links, the host also sends strc packets to + * resynchronize the transfer counts between host and device, to correct for + * any dropped packets in the link. + */ +class chdr_tx_data_xport +{ +public: +    using uptr   = std::unique_ptr<chdr_tx_data_xport>; +    using buff_t = transport::frame_buff; + +    //! Information about data packet +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +    }; + +    /*! Constructor +     * +     * \param io_srv The service that will schedule the xport I/O +     * \param recv_link The recv link, already attached to the I/O service +     * \param send_link The send link, already attached to the I/O service +     * \param pkt_factory Factory to create packets with the desired chdr_w and endianness +     * \param addrs Source and destination addresses +     * \param epids Source and destination endpoint IDs +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param num_send_frames Num frames to reserve from the send link +     * \param fc_freq_ratio Ratio to use to configure the device fc frequency +     * \param fc_headroom_ratio Ratio to use to configure the device fc headroom +     */ +    chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_pair_t& addrs, +        const uhd::rfnoc::sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt, +        const size_t num_send_frames, +        const double fc_freq_ratio, +        const double fc_headroom_ratio) +        : _fc_sender(pkt_factory, epids), _epid(epids.first) +    { +        const sep_addr_t remote_sep_addr = addrs.second; +        const sep_addr_t local_sep_addr  = addrs.first; +        const sep_id_t remote_epid       = epids.second; +        const sep_id_t local_epid        = epids.first; + +        UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", +            "Creating tx xport with local epid=" << local_epid +                                                 << ", remote epid=" << remote_epid); + +        _send_header.set_dst_epid(epids.second); +        _send_packet = pkt_factory.make_generic(); +        _recv_packet = pkt_factory.make_generic(); + +        // Calculate max payload size +        const size_t pyld_offset = +            _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); +        _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + +        _configure_sep(io_srv, +            recv_link, +            send_link, +            pkt_factory, +            local_sep_addr, +            local_epid, +            remote_sep_addr, +            remote_epid, +            pyld_buff_fmt, +            mdata_buff_fmt); + +        _initialize_flow_ctrl(io_srv, +            recv_link, +            send_link, +            pkt_factory, +            epids, +            fc_freq_ratio, +            fc_headroom_ratio); + +        // Now create the send I/O we will use for data +        auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { +            this->_send_callback(buff, send_link); +        }; + +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* recv_link, +                           transport::send_link_if* send_link) { +            return this->_recv_callback(buff, recv_link, send_link); +        }; + +        // Needs just a single recv frame for strs packets +        _send_io = io_srv->make_send_client(send_link, +            num_send_frames, +            send_cb, +            recv_link, +            /* num_recv_frames */ 1, +            recv_cb); +    } + +    /*! Returns maximum number of payload bytes +     * +     * \return maximum number of payload bytes +     */ +    size_t get_max_payload_size() const +    { +        return _max_payload_size; +    } + +    /*! +     * Gets a TX frame buffer +     * +     * \param timeout_ms timeout in milliseconds +     * \return the frame buffer, or nullptr if timeout occurs +     */ +    buff_t::uptr get_send_buff(const int32_t timeout_ms) +    { +        return _send_io->get_send_buff(timeout_ms); +    } + +    /*! +     * Sends a TX data packet +     * +     * \param buff the frame buffer containing the packet to send +     */ +    void release_send_buff(buff_t::uptr buff) +    { +        _send_io->release_send_buff(std::move(buff)); +    } + +    /*! +     * Writes header into frame buffer and returns payload pointer +     * +     * \param buff Frame buffer to write header into +     * \param info Information to include in the header +     * \return A pointer to the payload data area and the packet size in bytes +     */ +    std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff, +        const packet_info_t& info) +    { +        uint64_t tsf = 0; + +        if (info.has_tsf) { +            _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS); +            tsf = info.tsf; +        } else { +            _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS); +        } + +        _send_header.set_eob(info.eob); +        _send_header.set_seq_num(_data_seq_num++); + +        _send_packet->refresh(buff->data(), _send_header, tsf); +        _send_packet->update_payload_size(info.payload_bytes); + +        return std::make_pair( +            _send_packet->get_payload_ptr(), +            _send_packet->get_chdr_header().get_length()); +    } + +private: +    /*! +     * Recv callback for I/O service +     * +     * The I/O service invokes this callback when it reads a packet from the +     * recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link from which buff was read +     * \param send_link the send link for flow control messages +     */ +    bool _recv_callback(buff_t::uptr& buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* /*send_link*/) +    { +        _recv_packet->refresh(buff->data()); +        const auto header   = _recv_packet->get_chdr_header(); +        const auto type     = header.get_pkt_type(); +        const auto dst_epid = header.get_dst_epid(); + +        if (dst_epid != _epid) { +            return false; +        } + +        if (type == chdr::PKT_TYPE_STRS) { +            chdr::strs_payload strs; +            strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), +                _recv_packet->get_payload_size() / sizeof(uint64_t), +                _recv_packet->conv_to_host<uint64_t>()); + +            _fc_state.update_dest_recv_count({strs.xfer_count_bytes, +                static_cast<uint32_t>(strs.xfer_count_pkts)}); + +            // TODO: check strs status here and push into async msg queue + +            // Packet belongs to this transport, release buff and return true +            recv_link->release_recv_buff(std::move(buff)); +            buff = nullptr; +            return true; +        } else { +            UHD_THROW_INVALID_CODE_PATH(); +        } +    } + +    /*! +     * Send callback for I/O service +     * +     * The I/O service invokes this callback when it is requested to release +     * a send buffer to the send link. +     * +     * \param buff the frame buffer to release +     * \param send_link the send link for flow control messages +     */ +    void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) +    { +        const size_t packet_size = buff->packet_size(); + +        if (_fc_state.dest_has_space(packet_size)) { +            send_link->release_send_buff(std::move(buff)); +            buff = nullptr; + +            _fc_state.data_sent(packet_size); + +            if (_fc_state.get_fc_resync_req_pending() +                && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) { +                const auto& xfer_counts = _fc_state.get_xfer_counts(); +                const size_t strc_size = +                    _fc_sender.send_strc_resync(send_link, xfer_counts); +                _fc_state.clear_fc_resync_req_pending(); +                _fc_state.data_sent(strc_size); +            } +        } +    } + +    /*! +     * Configures the stream endpoint using mgmt_portal +     */ +    void _configure_sep(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_t& local_sep_addr, +        const uhd::rfnoc::sep_id_t& local_epid, +        const uhd::rfnoc::sep_addr_t& remote_sep_addr, +        const uhd::rfnoc::sep_id_t& remote_epid, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt) +    { +        // Create a control transport with the tx data links to send mgmt packets +        // needed to setup the stream. Only need one frame for this. +        auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, +            send_link, +            recv_link, +            local_epid, +            1, // num_send_frames +            1); // num_recv_frames + +        // Create new temporary management portal with the transports used for this stream +        // TODO: This is a bit excessive. Maybe we can pare down the functionality of the +        // portal just for route setup purposes. Whatever we do, we *must* use xport in it +        // though otherwise the transport will not behave correctly. +        auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( +            *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + +        // Setup a route to the EPID +        data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); +        data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + +        data_mgmt_portal->config_local_tx_stream( +            *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + +        // We no longer need the control xport and mgmt_portal, release them so +        // the control xport is no longer connected to the I/O service. +        data_mgmt_portal.reset(); +        ctrl_xport.reset(); +    } + +    /*! +     * Initializes flow control +     * +     * To initialize flow control, we need to send an init strc packet, then +     * receive a strs containing the stream endpoint ingress buffer size. We +     * then repeat this (now that we know the buffer size) to configure the flow +     * control frequency. To avoid having this logic within the data packet +     * processing flow, we use temporary send and recv I/O instances with +     * simple callbacks here. +     */ +    void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const sep_id_pair_t sep_ids, +        const double fc_freq_ratio, +        const double fc_headroom_ratio) +    { +        // No flow control at initialization, just release all send buffs +        auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { +            send_link->release_send_buff(std::move(buff)); +            buff = nullptr; +        }; + +        // For recv, just queue strs packets for recv_io to read +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* /*recv_link*/, +                           transport::send_link_if* /*send_link*/) { +            _recv_packet->refresh(buff->data()); +            const auto header   = _recv_packet->get_chdr_header(); +            const auto type     = header.get_pkt_type(); +            const auto dst_epid = header.get_dst_epid(); + +            return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); +        }; + +        // No flow control at initialization, just release all recv buffs +        auto fc_cb = [this](buff_t::uptr buff, +                         transport::recv_link_if* recv_link, +                         transport::send_link_if* /*send_link*/) { +            recv_link->release_recv_buff(std::move(buff)); +        }; + +        auto send_io = io_srv->make_send_client(send_link, +            1, // num_send_frames +            send_cb, +            nullptr, +            0, // num_recv_frames +            nullptr); + +        auto recv_io = io_srv->make_recv_client(recv_link, +            1, // num_recv_frames +            recv_cb, +            nullptr, +            0, // num_send_frames +            fc_cb); + +        chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); +        chdr::chdr_packet::uptr& recv_packet     = _recv_packet; + +        // Function to send a strc init +        auto send_strc_init = [&send_io, sep_ids, &strc_packet]( +                                  const stream_buff_params_t fc_freq = {0, 0}) { +            transport::frame_buff::uptr buff = send_io->get_send_buff(0); + +            if (!buff) { +                throw uhd::runtime_error( +                    "tx xport timed out getting a send buffer for strc init"); +            } + +            chdr::chdr_header header; +            header.set_seq_num(0); +            header.set_dst_epid(sep_ids.second); + +            chdr::strc_payload strc_pyld; +            strc_pyld.src_epid  = sep_ids.first; +            strc_pyld.op_code   = chdr::STRC_INIT; +            strc_pyld.num_bytes = fc_freq.bytes; +            strc_pyld.num_pkts  = fc_freq.packets; +            strc_packet->refresh(buff->data(), header, strc_pyld); + +            const size_t size = header.get_length(); +            buff->set_packet_size(size); +            send_io->release_send_buff(std::move(buff)); +        }; + +        // Function to receive a strs, returns buffer capacity +        auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { +            transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); + +            if (!buff) { +                throw uhd::runtime_error( +                    "tx xport timed out wating for a strs packet during initialization"); +            } + +            recv_packet->refresh(buff->data()); +            UHD_ASSERT_THROW( +                recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); +            chdr::strs_payload strs; +            strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), +                recv_packet->get_payload_size() / sizeof(uint64_t), +                recv_packet->conv_to_host<uint64_t>()); + +            recv_io->release_recv_buff(std::move(buff)); + +            return {strs.capacity_bytes, +                static_cast<uint32_t>(strs.capacity_pkts)}; +        }; + +        // Send a strc init to get the buffer size +        send_strc_init(); +        stream_buff_params_t capacity = recv_strs(); +        _fc_state.set_dest_capacity(capacity); + +        UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", +            "Received strs initializing buffer capacity to " +            << capacity.bytes << " bytes"); + +        // Calculate the requested fc_freq parameters +        uhd::rfnoc::stream_buff_params_t fc_freq = { +            static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), +            static_cast<uint32_t>( +                std::ceil(double(capacity.packets) * fc_freq_ratio))}; + +        const size_t headroom_bytes = +            static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); +        const size_t headroom_packets = static_cast<uint32_t>( +            std::ceil(double(capacity.packets) * fc_headroom_ratio)); + +        fc_freq.bytes -= headroom_bytes; +        fc_freq.packets -= headroom_packets; + +        // Send a strc init to configure fc freq +        send_strc_init(fc_freq); +        recv_strs(); + +        // Release temporary I/O service interfaces to disconnect from it +        send_io.reset(); +        recv_io.reset(); +    } + +    // Interface to the I/O service +    transport::send_io_if::sptr _send_io; + +    // Flow control state +    tx_flow_ctrl_state _fc_state; + +    // Maximum data payload in bytes +    size_t _max_payload_size = 0; + +    // Sequence number for data packets +    uint16_t _data_seq_num = 0; + +    // Header to write into send packets +    chdr::chdr_header _send_header; + +    // Packet for send data +    chdr::chdr_packet::uptr _send_packet; + +    // Packet to receive strs messages +    chdr::chdr_packet::uptr _recv_packet; + +    // Handles sending of strc flow control ack packets +    detail::tx_flow_ctrl_sender _fc_sender; + +    // Local / Source EPID +    sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp index 62b24ab61..1f14ea7d0 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp @@ -482,6 +482,8 @@ public: // Members      uint64_t num_pkts = 0;      //! Number of bytes to use for operation (64 bits)      uint64_t num_bytes = 0; +    //! Size of a strc packet (including header) +    static constexpr size_t PACKET_SIZE = 24;  public: // Functions      strc_payload()                        = default; diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp index 120b0e0f8..28fa8ec7c 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp @@ -7,11 +7,14 @@  #ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP  #define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP +#include <uhd/stream.hpp>  #include <uhdlib/rfnoc/chdr_packet.hpp>  #include <uhdlib/rfnoc/client_zero.hpp>  #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>  #include <uhdlib/rfnoc/epid_allocator.hpp>  #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>  #include <functional>  #include <memory>  #include <set> @@ -84,6 +87,7 @@ public:      virtual detail::client_zero::sptr get_client_zero(          sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0; +      /*! Configure a flow controlled data stream from the endpoint with ID src_epid to the       *  endpoint with ID dst_epid       * @@ -102,7 +106,33 @@ public:          const double fc_headroom_ratio,          const bool reset = false) = 0; -    // TODO: Implement functions to get graph-wide streamers +    /*! \brief Create a data stream going from the device to the host +     * +     * \param dst_addr The address of the destination stream endpoint +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param xport_args The transport arguments +     * \return An transport instance +     */ +    virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( +        sep_addr_t dst_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) = 0; + +    /*! \brief Create a data stream going from the host to the device +     * +     * \param dst_addr The address of the destination stream endpoint +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param xport_args The transport arguments +     * \return An transport instance +     */ +    virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( +        sep_addr_t dst_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) = 0;      /*!       * \brief Create a graph_stream_manager and return a unique_ptr to it diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 79121a498..72f1cf1c7 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -11,6 +11,7 @@  #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>  #include <uhdlib/rfnoc/epid_allocator.hpp>  #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>  #include <functional>  #include <memory>  #include <set> @@ -112,41 +113,29 @@ public:      /*! \brief Create a data stream going from the host to the device       *       * \param dst_addr The address of the destination stream endpoint -     * \param lossy_xport Is the transport lossy?       * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload       * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata -     * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params -     * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params       * \param xport_args The transport arguments       * \return An transport instance       */      virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(          const sep_addr_t dst_addr, -        const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args) = 0;      /*! \brief Create a data stream going from the device to the host       *       * \param dst_addr The address of the destination stream endpoint -     * \param lossy_xport Is the transport lossy?       * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload       * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata -     * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params -     * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params       * \param xport_args The transport arguments       * \return An transport instance       */      virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(          const sep_addr_t src_addr, -        const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args) = 0;      static uptr make(const chdr::chdr_packet_factory& pkt_factory, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index 0a2790a34..33a0e3df0 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -8,21 +8,14 @@  #define INCLUDED_LIBUHD_MB_IFACE_HPP  #include <uhdlib/rfnoc/chdr_ctrl_xport.hpp> -#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>  #include <uhdlib/rfnoc/clock_iface.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp>  #include <memory>  namespace uhd { namespace rfnoc { -// FIXME: Update this -class chdr_rx_data_xport -{ -public: -    using uptr = std::unique_ptr<chdr_rx_data_xport>; -}; - -using chdr_tx_data_xport = chdr_rx_data_xport; -  /*! Motherboard (backchannel) interface   *   * In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to @@ -70,7 +63,8 @@ public:      /*! Return a reference to a clock iface       */ -    virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0; +    virtual std::shared_ptr<clock_iface> get_clock_iface( +        const std::string& clock_name) = 0;      /*! Create a control transport       * @@ -89,30 +83,34 @@ public:       *       * This is typically called once per streaming channel.       * -     * \param local_device_id ID for the host transport adapter to use -     * \param local_epid Host (sink) streaming endpoint ID -     * \param remote_epid Remote device (source) streaming endpoint ID +     * \param addrs Address of the device and host stream endpoints +     * \param epids Endpoint IDs of the device and host stream endpoints +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata       * \param xport_args Transport configuration args       * \return A CHDR RX data transport       */ -    virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, -        const sep_id_t& local_epid, -        const sep_id_t& remote_epid, +    virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs, +        const sep_id_pair_t& epids, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt,          const device_addr_t& xport_args) = 0;      /*! Create an TX data transport       *       * This is typically called once per streaming channel.       * -     * \param local_device_id ID for the host transport adapter to use -     * \param local_epid Host (source) streaming endpoint ID -     * \param remote_epid Remote device (sink) streaming endpoint ID +     * \param addrs Address of the host and device stream endpoints +     * \param epids Endpoint IDs of the host and device stream endpoints +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata       * \param xport_args Transport configuration args       * \return A CHDR TX data transport       */ -    virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, -        const sep_id_t& local_epid, -        const sep_id_t& remote_epid, +    virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs, +        const sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt,          const device_addr_t& xport_args) = 0;  }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index 7ec1b7bb2..bc56fd311 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -41,6 +41,8 @@ using device_id_t = uint16_t;  using sep_inst_t = uint16_t;  //! Stream Endpoint Physical Address Type  using sep_addr_t = std::pair<device_id_t, sep_inst_t>; +//! Stream Endpoint Physical Address Type (first = source, second = destination) +using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;  //! Stream Endpoint ID Type  using sep_id_t = uint16_t;  //! Stream Endpoint pair Type (first = source, second = destination) @@ -65,6 +67,19 @@ struct stream_buff_params_t  //! The data type of the buffer used to capture/generate data  enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; +//! Conversion from number of bits to sw_buff +constexpr sw_buff_t bits_to_sw_buff(size_t bits) +{ +    if (bits <= 8) { +        return BUFF_U8; +    } else if (bits <= 16) { +        return BUFF_U16; +    } else if (bits <= 32) { +        return BUFF_U32; +    } else { +        return BUFF_U64; +    } +}  //----------------------------------------------  // Constants @@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };  constexpr uint16_t RFNOC_PROTO_VER = 0x0100; +constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_CAPACITY_PKTS  = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_FREQ_BYTES     = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_FREQ_PKTS      = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1; +constexpr uint32_t MAX_FC_HEADROOM_PKTS  = (uint32_t(1) << 8) - 1;  }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp new file mode 100644 index 000000000..6ced60d19 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -0,0 +1,95 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + *  Extends the streamer_impl to be an rfnoc node so it can connect to the + *  graph. Configures the streamer conversion and rate parameters with values + *  received through property propagation. + */ +class rfnoc_rx_streamer : public node_t, +                          public transport::rx_streamer_impl<chdr_rx_data_xport> +{ +public: +    /*! Constructor +     * +     * \param num_ports The number of ports +     * \param stream_args Arguments to aid the construction of the streamer +     */ +    rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args); + +    /*! Returns a unique identifier string for this node. In every RFNoC graph, +     * no two nodes cannot have the same ID. Returns a string in the form of +     * "RxStreamer#0". +     * +     * \returns The unique ID as a string +     */ +    std::string get_unique_id() const; + +    /*! Returns the number of input ports for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_input_ports() const; + +    /*! Returns the number of output ports for this block. +     * +     * Always returns 0 for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_output_ports() const; + +    /*! Implementation of rx_streamer API method +     * +     * \param stream_cmd the stream command to issue +     */ +    void issue_stream_cmd(const stream_cmd_t& stream_cmd); + +    /*! Returns stream args provided at creation +     * +     * \return stream args provided when streamer is created +     */ +    const uhd::stream_args_t& get_stream_args() const; + +    /*! Check that all streamer ports are connected to blocks +     * +     * Overrides node_t to ensure there are no unconnected ports. +     * +     * \param connected_inputs A list of input ports that are connected +     * \param connected_outputs A list of output ports that are connected +     * \returns true if the block can deal with this configuration +     */ +    bool check_topology(const std::vector<size_t>& connected_inputs, +        const std::vector<size_t>& connected_outputs); +private: +    void _register_props(const size_t chan, const std::string& otw_format); + +    // Properties +    std::vector<property_t<double>> _scaling_in; +    std::vector<property_t<double>> _samp_rate_in; +    std::vector<property_t<double>> _tick_rate_in; +    std::vector<property_t<std::string>> _type_in; + +    // Streamer unique ID +    const std::string _unique_id; + +    // Stream args provided at construction +    const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp new file mode 100644 index 000000000..4acee45cc --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -0,0 +1,90 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + *  Extends the streamer_impl to be an rfnoc node so it can connect to the + *  graph. Configures the streamer conversion and rate parameters with values + *  received through property propagation. + */ +class rfnoc_tx_streamer : public node_t, +                          public transport::tx_streamer_impl<chdr_tx_data_xport> +{ +public: +    /*! Constructor +     * +     * \param num_ports The number of ports +     * \param stream_args Arguments to aid the construction of the streamer +     */ +    rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args); + +    /*! Returns a unique identifier string for this node. In every RFNoC graph, +     * no two nodes cannot have the same ID. Returns a string in the form of +     * "TxStreamer#0". +     * +     * \returns The unique ID as a string +     */ +    std::string get_unique_id() const; + +    /*! Returns the number of input ports for this block. +     * +     * Always returns 0 for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_input_ports() const; + +    /*! Returns the number of output ports for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_output_ports() const; + +    /*! Returns stream args provided at creation +     * +     * \return stream args provided when streamer is created +     */ +    const uhd::stream_args_t& get_stream_args() const; + +    /*! Check that all streamer ports are connected to blocks +     * +     * Overrides node_t to ensure there are no unconnected ports. +     * +     * \param connected_inputs A list of input ports that are connected +     * \param connected_outputs A list of output ports that are connected +     * \returns true if the block can deal with this configuration +     */ +    bool check_topology(const std::vector<size_t>& connected_inputs, +        const std::vector<size_t>& connected_outputs); + +private: +    void _register_props(const size_t chan, const std::string& otw_format); + +    // Properties +    std::vector<property_t<double>> _scaling_out; +    std::vector<property_t<double>> _samp_rate_out; +    std::vector<property_t<double>> _tick_rate_out; +    std::vector<property_t<std::string>> _type_out; + +    // Streamer unique ID +    const std::string _unique_id; + +    // Stream args provided at construction +    const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp new file mode 100644 index 000000000..937baf982 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP + +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage rx flow control state +class rx_flow_ctrl_state +{ +public: +    //! Constructor +    rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} + +    //! Initialize frequency parameters +    void initialize(const stream_buff_params_t fc_freq) +    { +        _fc_freq = fc_freq; +    } + +    //! Resynchronize with transfer counts from the sender +    void resynchronize(const stream_buff_params_t counts) +    { +        if (_recv_counts.bytes != counts.bytes +            || _recv_counts.packets != counts.packets) { +            // If there is a discrepancy between the amount of data sent by +            // the device and received by the transport, adjust the counts +            // of data received and transferred to include the dropped data. +            auto bytes_dropped = counts.bytes - _recv_counts.bytes; +            auto pkts_dropped  = counts.packets - _recv_counts.packets; +            _xfer_counts.bytes += bytes_dropped; +            _xfer_counts.packets += pkts_dropped; + +            UHD_LOGGER_DEBUG("rx_flow_ctrl_state") +                << "oh noes: bytes_sent=" << counts.bytes +                << "  bytes_received=" << _recv_counts.bytes +                << "  pkts_sent=" << counts.packets +                << "  pkts_received=" << _recv_counts.packets +                << " src_epid=" << _epids.first << " dst_epid=" << _epids.second +                << std::endl; + +            _recv_counts = counts; +        } +    } + +    //! Reset the transfer counts (happens during init) +    void reset_counts() +    { +        UHD_LOGGER_TRACE("rx_flow_ctrl_state") +            << "Resetting transfer counts" << std::endl; +        _recv_counts = {0, 0}; +        _xfer_counts = {0, 0}; +    } + +    //! Update state when data is received +    void data_received(const size_t bytes) +    { +        _recv_counts.bytes += bytes; +        _recv_counts.packets++; +    } + +    //! Update state when transfer is complete (buffer space freed) +    void xfer_done(const size_t bytes) +    { +        _xfer_counts.bytes += bytes; +        _xfer_counts.packets++; +    } + +    //! Returns whether a flow control response is needed +    bool fc_resp_due() const +    { +        stream_buff_params_t accum_counts = { +            _xfer_counts.bytes - _last_fc_resp_counts.bytes, +            _xfer_counts.packets - _last_fc_resp_counts.packets}; + +        return accum_counts.bytes >= _fc_freq.bytes +               || accum_counts.packets >= _fc_freq.packets; +    } + +    //! Update state after flow control response was sent +    void fc_resp_sent() +    { +        _last_fc_resp_counts = _xfer_counts; +    } + +    //! Returns counts for completed transfers +    stream_buff_params_t get_xfer_counts() const +    { +        return _xfer_counts; +    } + +    //! Returns counts for completed transfers +    stream_buff_params_t get_recv_counts() const +    { +        return _recv_counts; +    } + +    //! Returns configured flow control frequency +    stream_buff_params_t get_fc_freq() const +    { +        return _fc_freq; +    } + +private: +    // Counts for data received, including any data still in use +    stream_buff_params_t _recv_counts{0, 0}; + +    // Counts for data read and whose buffer space is ok to reuse +    stream_buff_params_t _xfer_counts{0, 0}; + +    // Counts sent in last flow control response +    stream_buff_params_t _last_fc_resp_counts{0, 0}; + +    // Frequency of flow control responses +    stream_buff_params_t _fc_freq{0, 0}; + +    // Endpoint ID for log messages +    const sep_id_pair_t _epids; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp new file mode 100644 index 000000000..65fc1b093 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -0,0 +1,99 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP + +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage tx flow control state +class tx_flow_ctrl_state +{ +public: +    //! Updates destination capacity +    void set_dest_capacity(const stream_buff_params_t& capacity) +    { +        _dest_capacity = capacity; +    } + +    //! Updates destination received count +    void update_dest_recv_count(const stream_buff_params_t& recv_count) +    { +        _recv_counts = recv_count; +    } + +    /*! Returns whether the destination has buffer space for the requested +     *  packet size +     */ +    bool dest_has_space(const size_t packet_size) +    { +        // The stream endpoint only cares about bytes, the packet count is not +        // important to determine the space available. +        const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes; +        const auto space_available = _dest_capacity.bytes - buffer_fullness; +        return space_available >= packet_size; +    } + +    //! Increments transfer count with amount of data sent +    void data_sent(const size_t packet_size) +    { +        _xfer_counts.bytes += packet_size; +        _xfer_counts.packets++; + +        // Request an fc resync after we have transferred a number of bytes >= +        // to the destination capacity. There is no strict requirement on how +        // often we need to send this, as it is only needed to correct for +        // dropped packets. One buffer's worth of bytes is probably a good +        // cadence. +        if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) { +            _fc_resync_req = true; +        } +    } + +    /*! Returns whether an fc resync request is pending. The policy we use +     * here is to send an fc resync every time we send a number of bytes +     * equal to the destination buffer capacity. +     */ +    bool get_fc_resync_req_pending() const +    { +        return _fc_resync_req; +    } + +    //! Clears fc resync request pending status +    void clear_fc_resync_req_pending() +    { +        _fc_resync_req = false; +        _last_fc_resync_bytes = _xfer_counts.bytes; +    } + +    //! Returns counts for packets sent +    stream_buff_params_t get_xfer_counts() const +    { +        return _xfer_counts; +    } + +private: +    // Counts for data sent +    stream_buff_params_t _xfer_counts{0, 0}; + +    // Counts for data received by the destination +    stream_buff_params_t _recv_counts{0, 0}; + +    // Buffer size at the destination +    stream_buff_params_t _dest_capacity{0, 0}; + +    // Counts sent in last flow control resync +    size_t _last_fc_resync_bytes = 0; + +    // Track when to send ack packets +    bool _fc_resync_req = false; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: +    enum alignment_result_t { +        SUCCESS, +        TIMEOUT, +        SEQUENCE_ERROR, +        ALIGNMENT_FAILURE, +        BAD_PACKET +    }; + +    get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, +        std::vector<typename transport_t::buff_t::uptr>& frame_buffs, +        std::vector<typename transport_t::packet_info_t>& infos) +        : _xports(xports) +        , _frame_buffs(frame_buffs) +        , _infos(infos) +        , _prev_tsf(_xports.size(), 0) +        , _channels_to_align(_xports.size()) +    { +    } + +    alignment_result_t operator()(const int32_t timeout_ms) +    { +        // Clear state +        _channels_to_align.set(); +        bool time_valid   = false; +        uint64_t tsf      = 0; +        size_t iterations = 0; + +        while (_channels_to_align.any()) { +            const size_t chan = _channels_to_align.find_first(); +            auto& xport       = _xports[chan]; +            auto& info        = _infos[chan]; +            auto& frame_buff  = _frame_buffs[chan]; +            bool seq_error    = false; + +            // Receive a data packet for the channel if we don't have one. A +            // packet may already be there if the previous call was interrupted +            // by an error. +            if (!frame_buff) { +                try { +                    std::tie(frame_buff, info, seq_error) = +                        xport->get_recv_buff(timeout_ms); +                } catch (const uhd::value_error& e) { +                    // Bad packet +                    UHD_LOGGER_ERROR("STREAMER") +                        << boost::format( +                               "The receive transport caught a value exception.\n%s") +                               % e.what(); +                    return BAD_PACKET; +                } +            } + +            if (!frame_buff) { +                return TIMEOUT; +            } + +            if (info.has_tsf) { +                const bool time_out_of_order = _prev_tsf[chan] > info.tsf; +                _prev_tsf[chan]              = info.tsf; + +                // If the user changes the device time while streaming, we can +                // receive a packet that comes before the previous packet in +                // time. This would cause the alignment logic to discard future +                // received packets. Therefore, when this occurs, we reset the +                // info to restart the alignment. +                if (time_out_of_order) { +                    time_valid = false; +                } + +                // Check if the time is larger than packets received for other +                // channels, and if so, use this time to align all channels +                if (!time_valid || info.tsf > tsf) { +                    // If we haven't found a set of aligned packets after many +                    // iterations, return an alignment failure +                    if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { +                        UHD_LOGGER_ERROR("STREAMER") +                            << "The rx streamer failed to time-align packets."; +                        return ALIGNMENT_FAILURE; +                    } + +                    // Release buffers for channels aligned previously. Keep +                    // buffers that don't have a tsf since we don't need to +                    // align those. +                    for (size_t i = 0; i < _xports.size(); i++) { +                        if (!_channels_to_align.test(i) && _infos[i].has_tsf) { +                            _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); +                            _frame_buffs[i] = nullptr; +                        } +                    } + +                    // Mark only this channel as aligned and save its tsf +                    _channels_to_align.set(); +                    _channels_to_align.reset(chan); +                    time_valid = true; +                    tsf        = info.tsf; +                } + +                // Check if the time matches that of other aligned channels +                else if (info.tsf == tsf) { +                    _channels_to_align.reset(chan); +                } + +                // Otherwise, time is smaller than other channels, release the buffer +                else { +                    _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); +                    _frame_buffs[chan] = nullptr; +                } +            } else { +                // Packet doesn't have a tsf, just mark it as aligned +                _channels_to_align.reset(chan); +            } + +            // If this packet had a sequence error, stop to return the error. +            // Keep the packet for the next call to get_aligned_buffs. +            if (seq_error) { +                return SEQUENCE_ERROR; +            } +        } + +        // All channels aligned +        return SUCCESS; +    } + +private: +    // Transports for each channel +    std::vector<typename transport_t::uptr>& _xports; + +    // Storage for buffers resulting from alignment +    std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + +    // Packet info corresponding to aligned buffers +    std::vector<typename transport_t::packet_info_t>& _infos; + +    // Time of previous packet for each channel +    std::vector<uint64_t> _prev_tsf; + +    // Keeps track of channels that are aligned +    boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp new file mode 100644 index 000000000..d66e867bc --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -0,0 +1,341 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/exception.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/endianness.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/rx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for error handling + * + * If a recv call reads data from multiple packets, and an error occurs in the + * second or later packets, recv stops short of the num samps requested and + * returns no error. The error is cached for the next call to recv. + * + * Timeout errors are an exception. Timeouts that occur in the second or later + * packets of a recv call stop the recv method but the error is not returned in + * the next call. The user can check for this condition since fewer samples are + * returned than the number requested. + */ +class rx_metadata_cache +{ +public: +    //! Stores metadata in the cache, ignoring timeout errors +    UHD_FORCE_INLINE void store(const rx_metadata_t& metadata) +    { +        if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) { +            _metadata_cache  = metadata; +            _cached_metadata = true; +        } +    } + +    //! Checks for cached metadata +    UHD_FORCE_INLINE bool check(rx_metadata_t& metadata) +    { +        if (_cached_metadata) { +            metadata         = _metadata_cache; +            _cached_metadata = false; +            return true; +        } +        return false; +    } + +private: +    // Whether there is a cached metadata object +    bool _cached_metadata = false; + +    // Cached metadata value +    uhd::rx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of rx streamer API + */ +template <typename transport_t> +class rx_streamer_impl : public rx_streamer +{ +public: +    //! Constructor +    rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args) +        : _zero_copy_streamer(num_ports) +        , _in_buffs(num_ports) +    { +        if (stream_args.cpu_format.empty()) { +            throw uhd::value_error("[rx_stream] Must provide a cpu_format!"); +        } +        if (stream_args.otw_format.empty()) { +            throw uhd::value_error("[rx_stream] Must provide a otw_format!"); +        } +        _setup_converters(num_ports, stream_args); +        _zero_copy_streamer.set_samp_rate(_samp_rate); +        _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); +    } + +    //! Connect a new channel to the streamer +    void connect_channel(const size_t channel, typename transport_t::uptr xport) +    { +        const size_t max_pyld_size = xport->get_max_payload_size(); +        _zero_copy_streamer.connect_channel(channel, std::move(xport)); + +        // Set spp based on the transport frame size +        const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; +        _spp = std::min(_spp, xport_spp); +    } + +    //! Implementation of rx_streamer API method +    size_t get_num_channels() const +    { +        return _zero_copy_streamer.get_num_channels(); +    } + +    //! Implementation of rx_streamer API method +    size_t get_max_num_samps() const +    { +        return _spp; +    } + +    /*! Get width of each over-the-wire item component. For complex items, +     *  returns the width of one component only (real or imaginary). +     */ +    size_t get_otw_item_comp_bit_width() const +    { +        return _convert_info.otw_item_bit_width; +    } + +    //! Implementation of rx_streamer API method +    UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs, +        const size_t nsamps_per_buff, +        uhd::rx_metadata_t& metadata, +        const double timeout, +        const bool one_packet) +    { +        if (_error_metadata_cache.check(metadata)) { +            return 0; +        } + +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + +        size_t total_samps_recv = +            _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + +        if (one_packet or metadata.end_of_burst) { +            return total_samps_recv; +        } + +        // First set of packets recv had an error, return immediately +        if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { +            return total_samps_recv; +        } + +        // Loop until buffer is filled or error code. This method returns the +        // metadata from the first packet received, with the exception of +        // end-of-burst. +        uhd::rx_metadata_t loop_metadata; + +        while (total_samps_recv < nsamps_per_buff) { +            size_t num_samps = _recv_one_packet(buffs, +                nsamps_per_buff - total_samps_recv, +                loop_metadata, +                timeout_ms, +                total_samps_recv * _convert_info.bytes_per_cpu_item); + +            // If metadata had an error code set, store for next call and return +            if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { +                _error_metadata_cache.store(loop_metadata); +                break; +            } + +            total_samps_recv += num_samps; + +            // Return immediately if end of burst +            if (loop_metadata.end_of_burst) { +                metadata.end_of_burst = true; +                break; +            } +        } + +        return total_samps_recv; +    } + +protected: +    //! Configures scaling factor for conversion +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        _converters[chan]->set_scalar(scale_factor); +    } + +    //! Configures sample rate for conversion of timestamp +    void set_samp_rate(const double rate) +    { +        _samp_rate = rate; +        _zero_copy_streamer.set_samp_rate(rate); +    } + +    //! Configures tick rate for conversion of timestamp +    void set_tick_rate(const double rate) +    { +        _zero_copy_streamer.set_tick_rate(rate); +    } + +private: +    //! Converter and associated item sizes +    struct convert_info +    { +        size_t bytes_per_otw_item; +        size_t bytes_per_cpu_item; +        size_t otw_item_bit_width; +    }; + +    //! Receive a single packet +    UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, +        const size_t nsamps_per_buff, +        uhd::rx_metadata_t& metadata, +        const int32_t timeout_ms, +        const size_t buffer_offset_bytes = 0) +    { +        if (_buff_samps_remaining == 0) { +            // Current set of buffers has expired, get the next one +            _buff_samps_remaining = +                _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); +            _fragment_offset_in_samps = 0; +        } else { +            // There are samples still left in the current set of buffers +            metadata = _last_fragment_metadata; +            metadata.time_spec += time_spec_t::from_ticks( +                _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate); +        } + +        if (_buff_samps_remaining != 0) { +            const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining); + +            // Convert samples to the streamer's output format +            for (size_t i = 0; i < get_num_channels(); i++) { +                char* b = reinterpret_cast<char*>(buffs[i]); +                const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes); +                _convert_to_out_buff(out_buffs, i, num_samps); +            } + +            _buff_samps_remaining -= num_samps; + +            // Write the fragment flags and offset +            metadata.more_fragments  = _buff_samps_remaining != 0; +            metadata.fragment_offset = _fragment_offset_in_samps; + +            if (metadata.more_fragments) { +                _fragment_offset_in_samps += num_samps; +                _last_fragment_metadata = metadata; +            } + +            return num_samps; +        } else { +            return 0; +        } +    } + +    //! Convert samples for one channel into its buffer +    UHD_FORCE_INLINE void _convert_to_out_buff( +        const uhd::rx_streamer::buffs_type& out_buffs, +        const size_t chan, +        const size_t num_samps) +    { +        const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]); + +        _converters[chan]->conv(buffer_ptr, out_buffs, num_samps); + +        // Advance the pointer for the source buffer +        _in_buffs[chan] = +            buffer_ptr + num_samps * _convert_info.bytes_per_otw_item; + +        if (_buff_samps_remaining == num_samps) { +            _zero_copy_streamer.release_recv_buff(chan); +        } +    } + +    //! Create converters and initialize _convert_info +    void _setup_converters(const size_t num_ports, +        const uhd::stream_args_t stream_args) +    { +        // Note to code archaeologists: In the past, we had to also specify the +        // endianness here, but that is no longer necessary because we can make +        // the wire endianness match the host endianness. +        convert::id_type id; +        id.input_format  = stream_args.otw_format + "_chdr"; +        id.num_inputs    = 1; +        id.output_format = stream_args.cpu_format; +        id.num_outputs   = 1; + +        auto starts_with = [](const std::string& s, const std::string v) { +            return s.find(v) == 0; +        }; + +        const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") +                                    || starts_with(stream_args.otw_format, "sc"); + +        convert_info info; +        info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format); +        info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format); + +        if (otw_is_complex) { +            info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; +        } else { +            info.otw_item_bit_width = info.bytes_per_otw_item * 8; +        } + +        _convert_info = info; + +        for (size_t i = 0; i < num_ports; i++) { +            _converters.push_back(convert::get_converter(id)()); +            _converters.back()->set_scalar(1 / 32767.0); +        } +    } + +    // Converter and item sizes +    convert_info _convert_info; + +    // Converters +    std::vector<uhd::convert::converter::sptr> _converters; + +    // Implementation of frame buffer management and packet info +    rx_streamer_zero_copy<transport_t> _zero_copy_streamer; + +    // Container for buffer pointers used in recv method +    std::vector<const void*> _in_buffs; + +    // Sample rate used to calculate metadata time_spec_t +    double _samp_rate = 1.0; + +    // Maximum number of samples per packet +    size_t _spp = std::numeric_limits<std::size_t>::max(); + +    // Num samps remaining in buffer currently held by zero copy streamer +    size_t _buff_samps_remaining = 0; + +    // Metadata cache for error handling +    detail::rx_metadata_cache _error_metadata_cache; + +    // Fragment (partially read packet) information +    size_t _fragment_offset_in_samps = 0; +    rx_metadata_t _last_fragment_metadata; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp new file mode 100644 index 000000000..36f568f2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -0,0 +1,207 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/get_aligned_buffs.hpp> +#include <boost/format.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of rx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class rx_streamer_zero_copy +{ +public: +    //! Constructor +    rx_streamer_zero_copy(const size_t num_ports) +        : _xports(num_ports) +        , _frame_buffs(num_ports) +        , _infos(num_ports) +        , _get_aligned_buffs(_xports, _frame_buffs, _infos) +    { +    } + +    ~rx_streamer_zero_copy() +    { +        for (size_t i = 0; i < _frame_buffs.size(); i++) { +            if (_frame_buffs[i]) { +                _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); +            } +        } +    } + +    //! Connect a new channel to the streamer +    void connect_channel(const size_t port, typename transport_t::uptr xport) +    { +        if (port >= get_num_channels()) { +            throw uhd::index_error( +                "Port number indexes beyond the number of streamer ports"); +        } + +        if (_xports[port]) { +            throw uhd::runtime_error( +                "Streamer port number is already connected to a port"); +        } + +        _xports[port] = std::move(xport); +    } + +    //! Returns number of channels handled by this streamer +    size_t get_num_channels() const +    { +        return _xports.size(); +    } + +    //! Configures tick rate for conversion of timestamp +    void set_tick_rate(const double rate) +    { +        _tick_rate = rate; +    } + +    //! Configures sample rate for conversion of timestamp +    void set_samp_rate(const double rate) +    { +        _samp_rate = rate; +    } + +    //! Configures the size of each sample +    void set_bytes_per_item(const size_t bpi) +    { +        _bytes_per_item = bpi; +    } + +    /*! +     * Gets a set of time-aligned buffers, one per channel. +     * +     * \param buffs returns a pointer to the buffer data +     * \param metadata returns the metadata corresponding to the buffer +     * \param timeout_ms timeout in milliseconds +     * \return the size in samples of each packet, or 0 if timeout +     */ +    size_t get_recv_buffs(std::vector<const void*>& buffs, +        rx_metadata_t& metadata, +        const int32_t timeout_ms) +    { +        metadata.reset(); + +        switch (_get_aligned_buffs(timeout_ms)) { +            case get_aligned_buffs_t::SUCCESS: +                break; + +            case get_aligned_buffs_t::BAD_PACKET: +                metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; +                return 0; + +            case get_aligned_buffs_t::TIMEOUT: +                metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; +                return 0; + +            case get_aligned_buffs_t::ALIGNMENT_FAILURE: +                metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; +                return 0; + +            case get_aligned_buffs_t::SEQUENCE_ERROR: +                metadata.has_time_spec = _last_read_time_info.has_time_spec; +                metadata.time_spec = +                    _last_read_time_info.time_spec +                    + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); +                metadata.out_of_sequence = true; +                metadata.error_code      = rx_metadata_t::ERROR_CODE_OVERFLOW; +                return 0; + +            default: +                UHD_THROW_INVALID_CODE_PATH(); +        } + +        // Get payload pointers for each buffer and aggregate eob. We set eob to +        // true if any channel has it set, since no more data will be received for +        // that channel. In most cases, all channels should have the same value. +        bool eob = false; +        for (size_t i = 0; i < buffs.size(); i++) { +            buffs[i] = _infos[i].payload; +            eob |= _infos[i].eob; +        } + +        // Set the metadata from the buffer information at index zero +        const auto& info_0 = _infos[0]; + +        metadata.has_time_spec  = info_0.has_tsf; +        metadata.time_spec      = time_spec_t::from_ticks(info_0.tsf, _tick_rate); +        metadata.start_of_burst = false; +        metadata.end_of_burst   = eob; +        metadata.error_code     = rx_metadata_t::ERROR_CODE_NONE; + +        // Done with these packets, save timestamp info for next call +        _last_read_time_info.has_time_spec = metadata.has_time_spec; +        _last_read_time_info.time_spec     = metadata.time_spec; +        _last_read_time_info.num_samps     = info_0.payload_bytes / _bytes_per_item; + +        return _last_read_time_info.num_samps; +    } + +    /*! +     * Release the packet for the specified channel +     * +     * \param channel the channel for which to release the packet +     */ +    void release_recv_buff(const size_t channel) +    { +        _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel])); +        _frame_buffs[channel] = typename transport_t::buff_t::uptr(); +    } + +private: +    using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + +    // Information recorded by streamer about the last data packet processed, +    // used to create the metadata when there is a sequence error. +    struct last_read_time_info_t +    { +        size_t num_samps   = 0; +        bool has_time_spec = false; +        time_spec_t time_spec; +    }; + +    // Transports for each channel +    std::vector<typename transport_t::uptr> _xports; + +    // Storage for buffers for each channel while they are in flight (between +    // calls to get_recv_buff and release_recv_buff). +    std::vector<typename transport_t::buff_t::uptr> _frame_buffs; + +    // Packet info corresponding to the packets in flight +    std::vector<typename transport_t::packet_info_t> _infos; + +    // Rate used in conversion of timestamp to time_spec_t +    double _tick_rate = 1.0; + +    // Rate used in conversion of timestamp to time_spec_t +    double _samp_rate = 1.0; + +    // Size of a sample on the device +    size_t _bytes_per_item = 0; + +    // Implementation of packet time alignment +    get_aligned_buffs_t _get_aligned_buffs; + +    // Information about the last data packet processed +    last_read_time_info_t _last_read_time_info; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhdlib/transport/tx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: +    //! Stores metadata in the cache +    UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) +    { +        _metadata_cache  = metadata; +        _cached_metadata = true; +    } + +    //! Checks for cached metadata +    UHD_FORCE_INLINE void check(tx_metadata_t& metadata) +    { +        if (_cached_metadata) { +            // Only use cached time_spec if metadata does not have one +            if (!metadata.has_time_spec) { +                metadata.has_time_spec = _metadata_cache.has_time_spec; +                metadata.time_spec     = _metadata_cache.time_spec; +            } +            metadata.start_of_burst = _metadata_cache.start_of_burst; +            metadata.end_of_burst   = _metadata_cache.end_of_burst; +            _cached_metadata        = false; +        } +    } + +private: +    // Whether there is a cached metadata object +    bool _cached_metadata = false; + +    // Cached metadata value +    uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template <typename transport_t> +class tx_streamer_impl : public tx_streamer +{ +public: +    tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) +        : _zero_copy_streamer(num_chans) +        , _zero_buffs(num_chans, &_zero) +        , _out_buffs(num_chans) +    { +        _setup_converters(num_chans, stream_args); +        _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); +        _spp = stream_args.args.cast<size_t>("spp", _spp); +    } + +    void connect_channel(const size_t channel, typename transport_t::uptr xport) +    { +        const size_t max_pyld_size = xport->get_max_payload_size(); +        _zero_copy_streamer.connect_channel(channel, std::move(xport)); + +        // Set spp based on the transport frame size +        const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; +        _spp                   = std::min(_spp, xport_spp); +    } + +    size_t get_num_channels() const +    { +        return _zero_copy_streamer.get_num_channels(); +    } + +    size_t get_max_num_samps() const +    { +        return _spp; +    } + + +    /*! Get width of each over-the-wire item component. For complex items, +     *  returns the width of one component only (real or imaginary). +     */ +    size_t get_otw_item_comp_bit_width() const +    { +        return _convert_info.otw_item_bit_width; +    } + +    size_t send(const uhd::tx_streamer::buffs_type& buffs, +        const size_t nsamps_per_buff, +        const uhd::tx_metadata_t& metadata_, +        const double timeout) +    { +        uhd::tx_metadata_t metadata(metadata_); + +        if (nsamps_per_buff == 0 && metadata.start_of_burst) { +            _metadata_cache.store(metadata); +            return 0; +        } + +        _metadata_cache.check(metadata); + +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + +        if (nsamps_per_buff == 0) { +            // Send requests with no samples are handled here, such as end of +            // burst. Send packets need to have at least one sample based on the +            // chdr specification, so we use _zero_buffs here. +            _send_one_packet(_zero_buffs.data(), +                0, // buffer offset +                1, // num samples +                metadata, +                timeout_ms); + +            return 0; +        } else if (nsamps_per_buff <= _spp) { +            return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + +        } else { +            size_t total_num_samps_sent = 0; +            const bool eob              = metadata.end_of_burst; +            metadata.end_of_burst       = false; + +            const size_t num_fragments = (nsamps_per_buff - 1) / _spp; +            const size_t final_length  = ((nsamps_per_buff - 1) % _spp) + 1; + +            for (size_t i = 0; i < num_fragments; i++) { +                const size_t num_samps_sent = _send_one_packet( +                    buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + +                total_num_samps_sent += num_samps_sent; + +                if (num_samps_sent == 0) { +                    return total_num_samps_sent; +                } + +                // Setup timespec for the next fragment +                if (metadata.has_time_spec) { +                    metadata.time_spec = +                        metadata.time_spec +                        + time_spec_t::from_ticks(num_samps_sent, _samp_rate); +                } + +                metadata.start_of_burst = false; +            } + +            // Send the final fragment +            metadata.end_of_burst = eob; + +            size_t nsamps_sent = +                total_num_samps_sent +                + _send_one_packet( +                      buffs, total_num_samps_sent, final_length, metadata, timeout); + +            return nsamps_sent; +        } +    } + +    //! Implementation of rx_streamer API method +    bool recv_async_msg( +        uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) +    { +        // TODO: implement me +        return false; +    } + +protected: +    //! Configures scaling factor for conversion +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        _converters[chan]->set_scalar(scale_factor); +    } + +    //! Configures sample rate for conversion of timestamp +    void set_samp_rate(const double rate) +    { +        _samp_rate = rate; +    } + +    //! Configures tick rate for conversion of timestamp +    void set_tick_rate(const double rate) +    { +        _zero_copy_streamer.set_tick_rate(rate); +    } + +private: +    //! Converter and associated item sizes +    struct convert_info +    { +        size_t bytes_per_otw_item; +        size_t bytes_per_cpu_item; +        size_t otw_item_bit_width; +    }; + +    //! Convert samples for one channel and sends a packet +    size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, +        const size_t buffer_offset_in_samps, +        const size_t num_samples, +        const tx_metadata_t& metadata, +        const int32_t timeout_ms) +    { +        if (!_zero_copy_streamer.get_send_buffs( +                _out_buffs, num_samples, metadata, timeout_ms)) { +            return 0; +        } + +        size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + +        for (size_t i = 0; i < get_num_channels(); i++) { +            const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset; +            _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + +            _zero_copy_streamer.release_send_buff(i); +        } + +        return num_samples; +    } + +    //! Create converters and initialize _bytes_per_cpu_item +    void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) +    { +        // Note to code archaeologists: In the past, we had to also specify the +        // endianness here, but that is no longer necessary because we can make +        // the wire endianness match the host endianness. +        convert::id_type id; +        id.input_format  = stream_args.cpu_format; +        id.num_inputs    = 1; +        id.output_format = stream_args.otw_format + "_chdr"; +        id.num_outputs   = 1; + +        auto starts_with = [](const std::string& s, const std::string v) { +            return s.find(v) == 0; +        }; + +        const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") +                                    || starts_with(stream_args.otw_format, "sc"); + +        convert_info info; +        info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); +        info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + +        if (otw_is_complex) { +            info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; +        } else { +            info.otw_item_bit_width = info.bytes_per_otw_item * 8; +        } + +        _convert_info = info; + +        for (size_t i = 0; i < num_chans; i++) { +            _converters.push_back(convert::get_converter(id)()); +            _converters.back()->set_scalar(32767.0); +        } +    } + +    // Converter item sizes +    convert_info _convert_info; + +    // Converters +    std::vector<uhd::convert::converter::sptr> _converters; + +    // Manages frame buffers and packet info +    tx_streamer_zero_copy<transport_t> _zero_copy_streamer; + +    // Buffer used to handle send calls with no data +    std::vector<const void*> _zero_buffs; + +    const uint64_t _zero = 0; + +    // Container for buffer pointers used in send method +    std::vector<void*> _out_buffs; + +    // Sample rate used to calculate metadata time_spec_t +    double _samp_rate = 1.0; + +    // Maximum number of samples per packet +    size_t _spp = std::numeric_limits<std::size_t>::max(); + +    // Metadata cache for send calls with no data +    detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp new file mode 100644 index 000000000..1b6f55238 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -0,0 +1,147 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of tx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class tx_streamer_zero_copy +{ +public: +    //! Constructor +    tx_streamer_zero_copy(const size_t num_chans) +        : _xports(num_chans), _frame_buffs(num_chans) +    { +    } + +    //! Connect a new channel to the streamer +    void connect_channel(const size_t port, typename transport_t::uptr xport) +    { +        if (port >= get_num_channels()) { +            throw uhd::index_error( +                "Port number indexes beyond the number of streamer ports"); +        } + +        if (_xports[port]) { +            throw uhd::runtime_error( +                "Streamer port number is already connected to a port"); +        } + +        _xports[port] = std::move(xport); +    } + +    //! Returns number of channels handled by this streamer +    size_t get_num_channels() const +    { +        return _xports.size(); +    } + +    //! Configures tick rate for conversion of timestamp +    void set_tick_rate(const double rate) +    { +        _tick_rate = rate; +    } + +    //! Configures the size of each sample +    void set_bytes_per_item(const size_t bpi) +    { +        _bytes_per_item = bpi; +    } + +    /*! +     * Gets a set of frame buffers, one per channel. +     * +     * \param buffs returns a pointer to the buffer data +     * \param nsamps_per_buff the number of samples that will be written to each buffer +     * \param metadata the metadata to write to the packet header +     * \param timeout_ms timeout in milliseconds +     * \return true if the operation was sucessful, false if timeout occurs +     */ +    UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs, +        const size_t nsamps_per_buff, +        const tx_metadata_t& metadata, +        const int32_t timeout_ms) +    { +        // Try to get a buffer per channel +        for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) { +            _frame_buffs[_next_buff_to_get].first = +                _xports[_next_buff_to_get]->get_send_buff(timeout_ms); + +            if (!_frame_buffs[_next_buff_to_get].first) { +                return false; +            } +        } + +        // Got all the buffers, start from index 0 next call +        _next_buff_to_get = 0; + +        // Store portions of metadata we care about +        typename transport_t::packet_info_t info; +        info.has_tsf = metadata.has_time_spec; + +        if (metadata.has_time_spec) { +            info.tsf = metadata.time_spec.to_ticks(_tick_rate); +        } + +        info.payload_bytes = nsamps_per_buff * _bytes_per_item; +        info.eob           = metadata.end_of_burst; + +        // Write packet header +        for (size_t i = 0; i < buffs.size(); i++) { +            std::tie(buffs[i], _frame_buffs[i].second) = +                _xports[i]->write_packet_header(_frame_buffs[i].first, info); +        } + +        return true; +    } + +    /*! +     * Send the packet for the specified channel +     * +     * \param channel the channel for which to release the packet +     */ +    UHD_FORCE_INLINE void release_send_buff(const size_t channel) +    { +        _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second); +        _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first)); + +        _frame_buffs[channel].first  = nullptr; +        _frame_buffs[channel].second = 0; +    } + +private: +    // Transports for each channel +    std::vector<typename transport_t::uptr> _xports; + +    // Container to hold frame buffers for each channel and their packet sizes +    std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs; + +    // Rate used in conversion of timestamp to time_spec_t +    double _tick_rate = 1.0; + +    // Size of a sample on the device +    size_t _bytes_per_item = 0; + +    // Next channel from which to get a buffer, stored as a member to +    // allow the streamer to continue where it stopped due to timeouts. +    size_t _next_buff_to_get = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index dfef4f90f..963458fe6 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -53,6 +53,8 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/tick_node_ctrl.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/tx_stream_terminator.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp      # Default block control classes:      ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/chdr_packet.cpp b/host/lib/rfnoc/chdr_packet.cpp index 653181c04..d4b7494e2 100644 --- a/host/lib/rfnoc/chdr_packet.cpp +++ b/host/lib/rfnoc/chdr_packet.cpp @@ -31,7 +31,7 @@ public:      {          assert(pkt_buff);          _pkt_buff = const_cast<uint64_t*>(reinterpret_cast<const uint64_t*>(pkt_buff)); -        _compute_mdata_offset(); +        _mdata_offset = _compute_mdata_offset(get_chdr_header());      }      virtual void refresh(void* pkt_buff, chdr_header& header, uint64_t timestamp = 0) @@ -42,7 +42,7 @@ public:          if (_has_timestamp(header)) {              _pkt_buff[1] = u64_from_host(timestamp);          } -        _compute_mdata_offset(); +        _mdata_offset = _compute_mdata_offset(get_chdr_header());      }      virtual void update_payload_size(size_t payload_size_bytes) @@ -115,19 +115,27 @@ public:              + (chdr_w_stride * (_mdata_offset + get_chdr_header().get_num_mdata())));      } +    virtual size_t calculate_payload_offset(const packet_type_t pkt_type, +        const uint8_t num_mdata = 0) const +    { +        chdr_header header; +        header.set_pkt_type(pkt_type); +        return (_compute_mdata_offset(header) + num_mdata) * chdr_w_bytes; +    } +  private:      inline bool _has_timestamp(const chdr_header& header) const      {          return (header.get_pkt_type() == PKT_TYPE_DATA_WITH_TS);      } -    inline void _compute_mdata_offset() const +    inline size_t _compute_mdata_offset(const chdr_header& header) const      {          // The metadata offset depends on the chdr_w and whether we have a timestamp          if (chdr_w == 64) { -            _mdata_offset = _has_timestamp(get_chdr_header()) ? 2 : 1; +            return _has_timestamp(header) ? 2 : 1;          } else { -            _mdata_offset = 1; +            return 1;          }      } diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp index f2024786a..2db68db04 100644 --- a/host/lib/rfnoc/graph_stream_manager.cpp +++ b/host/lib/rfnoc/graph_stream_manager.cpp @@ -8,7 +8,9 @@  #include <uhd/utils/log.hpp>  #include <uhdlib/rfnoc/graph_stream_manager.hpp>  #include <uhdlib/rfnoc/link_stream_manager.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>  #include <boost/format.hpp> +#include <boost/make_shared.hpp>  #include <map>  #include <set> @@ -175,6 +177,38 @@ public:                                   "specified source endpoint");      } +    chdr_rx_data_xport::uptr create_device_to_host_data_stream( +        const sep_addr_t src_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) +    { +        // TODO: choose a route +        const device_id_t via_device = NULL_DEVICE_ID; + +        return _link_mgrs.at(_check_dst_and_find_src(src_addr, via_device)) +            ->create_device_to_host_data_stream(src_addr, +                pyld_buff_fmt, +                mdata_buff_fmt, +                xport_args); +    } + +    virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( +        sep_addr_t dst_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) +    { +        // TODO: choose a route +        const device_id_t via_device = NULL_DEVICE_ID; + +        return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device)) +            ->create_host_to_device_data_stream(dst_addr, +                pyld_buff_fmt, +                mdata_buff_fmt, +                xport_args); +    } +  private:      device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device) const      { diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp index 4fe183529..6855162de 100644 --- a/host/lib/rfnoc/link_stream_manager.cpp +++ b/host/lib/rfnoc/link_stream_manager.cpp @@ -203,86 +203,71 @@ public:                  false,                  STREAM_SETUP_TIMEOUT); -        // Compute FC frequency and headroom based on buff parameters -        stream_buff_params_t fc_freq{ -            static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * fc_freq_ratio)), -            static_cast<uint32_t>( -                std::ceil(double(buff_params.packets) * fc_freq_ratio))}; -        stream_buff_params_t fc_headroom{ -            static_cast<uint64_t>( -                std::ceil(double(buff_params.bytes) * fc_headroom_ratio)), -            static_cast<uint32_t>( -                std::ceil(double(buff_params.packets) * fc_headroom_ratio))}; -          // Reconfigure flow control using the new frequency and headroom          return _mgmt_portal->config_remote_stream(*_ctrl_xport,              dst_epid,              src_epid,              lossy_xport, -            fc_freq, -            fc_headroom, +            _get_buff_params_ratio(buff_params, fc_freq_ratio), +            _get_buff_params_ratio(buff_params, fc_headroom_ratio),              reset,              STREAM_SETUP_TIMEOUT);      }      virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(          const sep_addr_t dst_addr, -        const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args)      { -        // Create a new source endpoint and EPID -        sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); -        sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr); -        _allocated_epids.insert(src_epid);          _ensure_ep_is_reachable(dst_addr); -        // Generate a new destination EPID instance +        // Generate a new destination (device) EPID instance          sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr); +        _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); -        // Create the data transport that we will return to the client -        chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport( -            _my_device_id, src_epid, dst_epid, xport_args); - -        chdr_ctrl_xport::sptr mgmt_xport = -            _mb_iface.make_ctrl_transport(_my_device_id, src_epid); - -        // Create new temporary management portal with the transports used for this stream -        // TODO: This is a bit excessive. Maybe we can pair down the functionality of the -        // portal just for route setup purposes. Whatever we do, we *must* use xport in it -        // though otherwise the transport will not behave correctly. -        mgmt_portal::uptr data_mgmt_portal = -            mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid); - -        // Setup a route to the EPID -        data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid); -        data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);          if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {              throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");          } -        // TODO: Implement data transport setup logic here - +        // Create a new destination (host) endpoint and EPID +        sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); +        sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr); +        _allocated_epids.insert(src_epid); -        // Delete the portal when done -        data_mgmt_portal.reset(); -        return xport; +        return _mb_iface.make_tx_data_transport({sw_epid_addr, dst_addr}, +            {src_epid, dst_epid}, +            pyld_buff_fmt, +            mdata_buff_fmt, +            xport_args);      } -    virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream( -        const sep_addr_t src_addr, -        const bool lossy_xport, +    virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( +        sep_addr_t src_addr,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args)      { -        // TODO: Implement me -        return chdr_tx_data_xport::uptr(); +        _ensure_ep_is_reachable(src_addr); + +        // Generate a new source (device) EPID instance +        sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr); +        _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid); + +        if (!_mgmt_portal->get_endpoint_info(src_epid).has_data) { +            throw uhd::rfnoc_error("Downstream endpoint does not support data traffic"); +        } + +        // Create a new destination (host) endpoint and EPID +        sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); +        sep_id_t dst_epid = _epid_alloc->allocate_epid(sw_epid_addr); +        _allocated_epids.insert(dst_epid); + +        return _mb_iface.make_rx_data_transport({src_addr, sw_epid_addr}, +            {src_epid, dst_epid}, +            pyld_buff_fmt, +            mdata_buff_fmt, +            xport_args);      }  private: @@ -295,7 +280,14 @@ private:          throw uhd::routing_error("Specified endpoint is not reachable");      } -    // A reference to the packet factor +    stream_buff_params_t _get_buff_params_ratio( +        const stream_buff_params_t& buff_params, const double ratio) +    { +        return {static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * ratio)), +            static_cast<uint32_t>(std::ceil(double(buff_params.packets) * ratio))}; +    } + +    // A reference to the packet factory      const chdr::chdr_packet_factory& _pkt_factory;      // The device address of this software endpoint      const device_id_t _my_device_id; diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index b490e0baf..a8b72cbdf 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -794,12 +794,11 @@ private: // Functions          mgmt_hop_t& hop)      {          // Validate flow control parameters -        if (fc_freq.bytes >= (uint64_t(1) << 40) -            || fc_freq.packets >= (uint64_t(1) << 24)) { +        if (fc_freq.bytes > MAX_FC_FREQ_BYTES || fc_freq.packets > MAX_FC_FREQ_PKTS) {              throw uhd::value_error("Flow control frequency parameters out of bounds");          } -        if (fc_headroom.bytes >= (uint64_t(1) << 16) -            || fc_headroom.packets >= (uint64_t(1) << 8)) { +        if (fc_headroom.bytes > MAX_FC_HEADROOM_BYTES +            || fc_headroom.packets > MAX_FC_HEADROOM_PKTS) {              throw uhd::value_error("Flow control headroom parameters out of bounds");          } @@ -992,7 +991,8 @@ private: // Functions          auto send_buff = xport.get_send_buff(timeout * 1000);          if (not send_buff) { -            UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); +            UHD_LOG_ERROR( +                "RFNOC::MGMT", "Timed out getting send buff for management transaction");              throw uhd::io_error("Timed out getting send buff for management transaction");          }          _send_pkt->refresh(send_buff->data(), header, payload); diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp index dd3dd7b90..4bf35cff1 100644 --- a/host/lib/rfnoc/rfnoc_graph.cpp +++ b/host/lib/rfnoc/rfnoc_graph.cpp @@ -15,12 +15,18 @@  #include <uhdlib/rfnoc/graph.hpp>  #include <uhdlib/rfnoc/graph_stream_manager.hpp>  #include <uhdlib/rfnoc/rfnoc_device.hpp> +#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> +#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>  #include <uhdlib/utils/narrow.hpp> +#include <boost/make_shared.hpp>  #include <boost/shared_ptr.hpp> // FIXME remove when rfnoc_device is ready  #include <memory>  using namespace uhd::rfnoc; +namespace { +const std::string LOG_ID("RFNOC::GRAPH"); +}  class rfnoc_graph_impl : public rfnoc_graph  { @@ -33,6 +39,7 @@ public:          , _graph(std::make_unique<uhd::rfnoc::detail::graph_t>())      {          setup_graph(dev_addr); +        _init_sep_map();          _init_static_connections();      } @@ -76,31 +83,136 @@ public:          }          if (!has_block(dst_blk)) {              throw uhd::lookup_error( -                std::string("Cannot connect blocks, source block not found: ") -                + src_blk.to_string()); +                std::string("Cannot connect blocks, destination block not found: ") +                + dst_blk.to_string());          } +        auto edge_type = _physical_connect(src_blk, src_port, dst_blk, dst_port);          _connect(get_block(src_blk),              src_port,              get_block(dst_blk),              dst_port, +            edge_type,              skip_property_propagation); -        _physical_connect(src_blk, src_port, dst_blk, dst_port);      } -    void connect(uhd::tx_streamer& /*streamer*/, -        size_t /*strm_port*/, -        const block_id_t& /*dst_blk*/, -        size_t /*dst_port*/) +    void connect(uhd::tx_streamer::sptr streamer, +        size_t strm_port, +        const block_id_t& dst_blk, +        size_t dst_port)      { -        throw uhd::not_implemented_error(""); +        // Verify the streamer was created by us +        auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_tx_streamer>(streamer); +        if (!rfnoc_streamer) { +            throw uhd::type_error("Streamer is not rfnoc capable"); +        } + +        // Verify src_blk even exists in this graph +        if (!has_block(dst_blk)) { +            throw uhd::lookup_error( +                std::string("Cannot connect block to streamer, source block not found: ") +                + dst_blk.to_string()); +        } + +        // Verify src_blk has an SEP upstream +        graph_edge_t dst_static_edge = _assert_edge( +            _get_static_edge( +                [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { +                    return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; +                }), +            dst_blk.to_string()); +        if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) { +            const std::string err_msg = +                dst_blk.to_string() + ":" + std::to_string(dst_port) +                + " is not connected to an SEP! Routing impossible."; +            UHD_LOG_ERROR(LOG_ID, err_msg); +            throw uhd::routing_error(err_msg); +        } + +        // Now get the name and address of the SEP +        const std::string sep_block_id = dst_static_edge.src_blockid; +        const sep_addr_t sep_addr      = _sep_map.at(sep_block_id); + +        const sw_buff_t pyld_fmt = +            bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width()); +        const sw_buff_t mdata_fmt = BUFF_U64; + +        auto xport = _gsm->create_host_to_device_data_stream(sep_addr, +            pyld_fmt, +            mdata_fmt, +            rfnoc_streamer->get_stream_args().args); + +        rfnoc_streamer->connect_channel(strm_port, std::move(xport)); + +        //// If this worked, then also connect the streamer in the BGL graph +        auto dst = get_block(dst_blk); +        graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true); +        _graph->connect(rfnoc_streamer.get(), dst.get(), edge_info);      } -    void connect(const block_id_t& /*src_blk*/, -        size_t /*src_port*/, -        uhd::rx_streamer& /*streamer*/, -        size_t /*strm_port*/) +    void connect(const block_id_t& src_blk, +        size_t src_port, +        uhd::rx_streamer::sptr streamer, +        size_t strm_port)      { -        throw uhd::not_implemented_error(""); +        // Verify the streamer was created by us +        auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_rx_streamer>(streamer); +        if (!rfnoc_streamer) { +            throw uhd::type_error("Streamer is not rfnoc capable"); +        } + +        // Verify src_blk even exists in this graph +        if (!has_block(src_blk)) { +            throw uhd::lookup_error( +                std::string("Cannot connect block to streamer, source block not found: ") +                + src_blk.to_string()); +        } + +        // Verify src_blk has an SEP downstream +        graph_edge_t src_static_edge = _assert_edge( +            _get_static_edge( +                [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) { +                    return edge.src_blockid == src_blk_id && edge.src_port == src_port; +                }), +            src_blk.to_string()); +        if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) { +            const std::string err_msg = +                src_blk.to_string() + ":" + std::to_string(src_port) +                + " is not connected to an SEP! Routing impossible."; +            UHD_LOG_ERROR(LOG_ID, err_msg); +            throw uhd::routing_error(err_msg); +        } + +        // Now get the name and address of the SEP +        const std::string sep_block_id = src_static_edge.dst_blockid; +        const sep_addr_t sep_addr      = _sep_map.at(sep_block_id); + +        const sw_buff_t pyld_fmt = +            bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width()); +        const sw_buff_t mdata_fmt = BUFF_U64; + +        auto xport = _gsm->create_device_to_host_data_stream(sep_addr, +            pyld_fmt, +            mdata_fmt, +            rfnoc_streamer->get_stream_args().args); + +        rfnoc_streamer->connect_channel(strm_port, std::move(xport)); + +        // If this worked, then also connect the streamer in the BGL graph +        auto src = get_block(src_blk); +        graph_edge_t edge_info(src_port, strm_port, graph_edge_t::RX_STREAM, true); +        _graph->connect(src.get(), rfnoc_streamer.get(), edge_info); +    } + +    uhd::rx_streamer::sptr create_rx_streamer( +        const size_t num_chans, const uhd::stream_args_t& args) +    { +        return boost::make_shared<rfnoc_rx_streamer>(num_chans, args); +    } + +    uhd::tx_streamer::sptr create_tx_streamer( +        const size_t num_chans, const uhd::stream_args_t& args) +    { +        return boost::make_shared<rfnoc_tx_streamer>(num_chans, args);      }      std::shared_ptr<mb_controller> get_mb_controller(const size_t mb_index = 0) @@ -152,7 +264,7 @@ private:              throw uhd::key_error(std::string("Found no RFNoC devices for ----->\n")                                   + dev_addr.to_pp_string());          } -        _tree = _device->get_tree(); +        _tree        = _device->get_tree();          _num_mboards = _tree->list("/mboards").size();          for (size_t i = 0; i < _num_mboards; ++i) {              _mb_controllers.emplace(i, _device->get_mb_controller(i)); @@ -170,7 +282,8 @@ private:          try {              _gsm = graph_stream_manager::make(pkt_factory, epid_alloc, links);          } catch (uhd::io_error& ex) { -            UHD_LOG_ERROR("RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what()); +            UHD_LOG_ERROR( +                "RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what());              throw;          } @@ -187,6 +300,9 @@ private:              _gsm->connect_host_to_device(ctrl_sep_addr);              // Grab and stash the Client Zero for this mboard              detail::client_zero::sptr mb_cz = _gsm->get_client_zero(ctrl_sep_addr); +            // Client zero port numbers are based on the control xbar numbers, +            // which have the client 0 interface first, followed by stream +            // endpoints, and then the blocks.              _client_zeros.emplace(mb_idx, mb_cz);              const size_t num_blocks       = mb_cz->get_num_blocks(); @@ -204,7 +320,7 @@ private:              // Iterate through and register each of the blocks in this mboard              for (size_t portno = 0; portno < num_blocks; ++portno) { -                auto noc_id = mb_cz->get_noc_id(portno + first_block_port); +                auto noc_id             = mb_cz->get_noc_id(portno + first_block_port);                  auto block_factory_info = factory::get_block_factory(noc_id);                  auto block_info = mb_cz->get_block_info(portno + first_block_port);                  block_id_t block_id(mb_idx, @@ -222,24 +338,25 @@ private:                  //   iface object through the mb_iface                  auto ctrlport_clk_iface =                      mb.get_clock_iface(block_factory_info.ctrlport_clk); -                auto tb_clk_iface = (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) ? -                    std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) : -                    mb.get_clock_iface(block_factory_info.timebase_clk); +                auto tb_clk_iface = +                    (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) +                        ? std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) +                        : mb.get_clock_iface(block_factory_info.timebase_clk);                  // A "graph" clock is always "running"                  if (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) {                      tb_clk_iface->set_running(true);                  } -                auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr, +                auto block_reg_iface   = _gsm->get_block_register_iface(ctrl_sep_addr,                      portno,                      *ctrlport_clk_iface.get(),                      *tb_clk_iface.get()); -                auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>(); +                auto make_args_uptr    = std::make_unique<noc_block_base::make_args_t>();                  make_args_uptr->noc_id = noc_id; -                make_args_uptr->block_id = block_id; -                make_args_uptr->num_input_ports = block_info.num_inputs; -                make_args_uptr->num_output_ports = block_info.num_outputs; -                make_args_uptr->reg_iface = block_reg_iface; -                make_args_uptr->tb_clk_iface = tb_clk_iface; +                make_args_uptr->block_id           = block_id; +                make_args_uptr->num_input_ports    = block_info.num_inputs; +                make_args_uptr->num_output_ports   = block_info.num_outputs; +                make_args_uptr->reg_iface          = block_reg_iface; +                make_args_uptr->tb_clk_iface       = tb_clk_iface;                  make_args_uptr->ctrlport_clk_iface = ctrlport_clk_iface;                  make_args_uptr->mb_control = (factory::has_requested_mb_access(noc_id)                                                    ? _mb_controllers.at(mb_idx) @@ -262,40 +379,43 @@ private:          _block_registry->init_props();      } +    void _init_sep_map() +    { +        for (size_t mb_idx = 0; mb_idx < get_num_mboards(); ++mb_idx) { +            auto remote_device_id = _device->get_mb_iface(mb_idx).get_remote_device_id(); +            auto& cz              = _client_zeros.at(mb_idx); +            for (size_t sep_idx = 0; sep_idx < cz->get_num_stream_endpoints(); +                 ++sep_idx) { +                // Register ID in _port_block_map +                block_id_t id(mb_idx, NODE_ID_SEP, sep_idx); +                _port_block_map.insert({{mb_idx, sep_idx + 1}, id}); +                _sep_map.insert({id.to_string(), sep_addr_t(remote_device_id, sep_idx)}); +            } +        } +    } +      void _init_static_connections()      {          UHD_LOG_TRACE("RFNOC::GRAPH", "Identifying static connections...");          for (auto& kv_cz : _client_zeros) { -            auto& adjacency_list          = kv_cz.second->get_adjacency_list(); -            const size_t first_block_port = 1 + kv_cz.second->get_num_stream_endpoints(); - +            auto& adjacency_list = kv_cz.second->get_adjacency_list();              for (auto& edge : adjacency_list) {                  // Assemble edge                  auto graph_edge = graph_edge_t(); -                if (edge.src_blk_index < first_block_port) { -                    block_id_t id(kv_cz.first, NODE_ID_SEP, edge.src_blk_index - 1); -                    _port_block_map.insert({{kv_cz.first, edge.src_blk_index}, id}); -                    graph_edge.src_blockid = id.to_string(); -                } else { -                    graph_edge.src_blockid = -                        _port_block_map.at({kv_cz.first, edge.src_blk_index}); -                } -                if (edge.dst_blk_index < first_block_port) { -                    block_id_t id(kv_cz.first, NODE_ID_SEP, edge.dst_blk_index - 1); -                    _port_block_map.insert({{kv_cz.first, edge.dst_blk_index}, id}); -                    graph_edge.dst_blockid = id.to_string(); -                } else { -                    graph_edge.dst_blockid = -                        _port_block_map.at({kv_cz.first, edge.dst_blk_index}); -                } +                UHD_ASSERT_THROW( +                    _port_block_map.count({kv_cz.first, edge.src_blk_index})); +                graph_edge.src_blockid = +                    _port_block_map.at({kv_cz.first, edge.src_blk_index}); +                UHD_ASSERT_THROW( +                    _port_block_map.count({kv_cz.first, edge.dst_blk_index})); +                graph_edge.dst_blockid = +                    _port_block_map.at({kv_cz.first, edge.dst_blk_index});                  graph_edge.src_port = edge.src_blk_port;                  graph_edge.dst_port = edge.dst_blk_port;                  graph_edge.edge     = graph_edge_t::edge_t::STATIC;                  _static_edges.push_back(graph_edge); -                UHD_LOG_TRACE("RFNOC::GRAPH", -                    "Static connection: " -                        << graph_edge.src_blockid << ":" << graph_edge.src_port << " -> " -                        << graph_edge.dst_blockid << ":" << graph_edge.dst_port); +                UHD_LOG_TRACE( +                    "RFNOC::GRAPH", "Static connection: " << graph_edge.to_string());              }          }      } @@ -312,214 +432,98 @@ private:          size_t src_port,          std::shared_ptr<node_t> dst_blk,          size_t dst_port, +        graph_edge_t::edge_t edge_type,          bool skip_property_propagation)      {          graph_edge_t edge_info( -            src_port, dst_port, graph_edge_t::DYNAMIC, not skip_property_propagation); +            src_port, dst_port, edge_type, not skip_property_propagation);          edge_info.src_blockid = src_blk->get_unique_id();          edge_info.dst_blockid = dst_blk->get_unique_id();          _graph->connect(src_blk.get(), dst_blk.get(), edge_info);      } -    /*! Helper method to find a stream endpoint connected to a block -     * -     * \param blk_id the block connected to the stream endpoint -     * \param port the port connected to the stream endpoint -     * \param blk_is_src true if the block is a data source, false if it is a -     *                   destination -     * \return the address of the stream endpoint, or boost::none if it is not -     *         directly connected to a stream endpoint -     */ -    boost::optional<sep_addr_t> _get_adjacent_sep( -        const block_id_t& blk_id, const size_t port, const bool blk_is_src) const -    { -        const std::string block_id_str = get_block(blk_id)->get_block_id().to_string(); -        UHD_LOG_TRACE("RFNOC::GRAPH", -            "Finding SEP for " << (blk_is_src ? "source" : "dst") << " block " -                               << block_id_str << ":" << port); -        // TODO: This is an attempt to simplify the algo, but it turns out to be -        // as many lines as before: -        //auto edge_predicate = [blk_is_src, block_id_str](const graph_edge_t edge) { -            //if (blk_is_src) { -                //return edge.src_blockid == block_id_str; -            //} -            //return edge.dst_blockid == block_id_str; -        //}; - -        //auto blk_edge_it = -            //std::find_if(_static_edges.cbegin(), _static_edges.cend(), edge_predicate); -        //if (blk_edge_it == _static_edges.cend()) { -            //return boost::none; -        //} - -        //const std::string sep_block_id = blk_is_src ? -            //blk_edge_it->dst_blockid : blk_edge_it->src_blockid; -        //UHD_LOG_TRACE("RFNOC::GRAPH", -            //"Found SEP: " << sep_block_id); - -        //auto port_map_result = std::find_if(_port_block_map.cbegin(), -            //_port_block_map.cend, -            //[sep_block_id](std::pair<std::pair<size_t, size_t>, block_id_t> port_block) { -                //return port_block.second == sep_block_id; -            //}); -        //if (port_map_result == _port_block_map.cend()) { -            //throw uhd::lookup_error( -                //std::string("SEP `") + sep_block_id + "' not found in port/block map!"); -        //} -        //const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id(); -        //const sep_inst_t sep_inst = blk_is_src ? -            //edge.dst_blk_index - 1 : edge.src_blk_index - 1; -        //return sep_addr_t(dev, sep_inst); - -        const auto& info  = _xbar_block_config.at(block_id_str); -        const auto mb_idx = blk_id.get_device_no(); -        const auto cz     = _client_zeros.at(mb_idx); - -        const size_t first_block_port = 1 + cz->get_num_stream_endpoints(); - -        for (const auto& edge : cz->get_adjacency_list()) { -            const auto edge_blk_idx = blk_is_src ? edge.src_blk_index -                                                 : edge.dst_blk_index; -            const auto edge_blk_port = blk_is_src ? edge.src_blk_port : edge.dst_blk_port; - -            if ((edge_blk_idx == info.xbar_port + first_block_port) -                and (edge_blk_port == port)) { -                UHD_LOGGER_DEBUG("RFNOC::GRAPH") -                    << boost::format("Block found in adjacency list. %d:%d->%d:%d") -                           % edge.src_blk_index -                           % static_cast<unsigned int>(edge.src_blk_port) -                           % edge.dst_blk_index -                           % static_cast<unsigned int>(edge.dst_blk_port); - -                // Check that the block is connected to a stream endpoint. The -                // minus one here is because index zero is client 0. -                const sep_inst_t sep_inst = blk_is_src ? -                    edge.dst_blk_index - 1 : edge.src_blk_index - 1; - -                if (sep_inst < cz->get_num_stream_endpoints()) { -                    const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id(); -                    return sep_addr_t(dev, sep_inst); -                } else { -                    // Block is connected to another block -                    return boost::none; -                } -            } -        } -        return boost::none; -    } -      /*! Internal physical connection helper       *       * Make the connections in the physical device       * -     * \throws connect_disallowed_on_src -     *     if the source port is statically connected to a *different* block -     * \throws connect_disallowed_on_dst -     *     if the destination port is statically connected to a *different* block +     * \throws uhd::routing_error +     *     if the blocks are statically connected to something else       */ -    void _physical_connect(const block_id_t& src_blk, +    graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk,          size_t src_port,          const block_id_t& dst_blk,          size_t dst_port)      { -        auto src_blk_ctrl = get_block(src_blk); -        auto dst_blk_ctrl = get_block(dst_blk); - -        /* -         * Start by determining if the connection can be made -         * Get the adjacency list and check if the connection is in it already -         */ -        // Read the adjacency list for the source and destination blocks -        auto src_mb_idx = src_blk.get_device_no(); -        auto src_cz     = _gsm->get_client_zero( -            sep_addr_t(_device->get_mb_iface(src_mb_idx).get_remote_device_id(), 0)); -        std::vector<detail::client_zero::edge_def_t>& adj_list = -            src_cz->get_adjacency_list(); -        // Check the src_blk -        auto src_blk_xbar_info = -            _xbar_block_config.at(src_blk_ctrl->get_block_id().to_string()); -        // This "xbar_port" starts at the first block, so we need to add the client zero -        // and stream endpoint ports -        const auto src_xbar_port = -            src_blk_xbar_info.xbar_port + src_cz->get_num_stream_endpoints() + 1; -        // We can also find out which stream endpoint the src block is connected to -        sep_inst_t src_sep; -        for (detail::client_zero::edge_def_t edge : adj_list) { -            if ((edge.src_blk_index == src_xbar_port) -                and (edge.src_blk_port == src_port)) { -                UHD_LOGGER_DEBUG("RFNOC::GRAPH") -                    << boost::format("Block found in adjacency list. %d:%d->%d:%d") -                           % edge.src_blk_index -                           % static_cast<unsigned int>(edge.src_blk_port) -                           % edge.dst_blk_index -                           % static_cast<unsigned int>(edge.dst_blk_port); -                if (edge.dst_blk_index <= src_cz->get_num_stream_endpoints()) { -                    src_sep = -                        edge.dst_blk_index - 1 /* minus 1 because port 0 is client zero*/; -                } else { -                    // TODO connect_disallowed_on_src? -                    // TODO put more info in exception -                    throw uhd::routing_error( -                        "Unable to connect to statically connected source port"); -                } -            } +        const std::string src_blk_info = +            src_blk.to_string() + ":" + std::to_string(src_port); +        const std::string dst_blk_info = +            dst_blk.to_string() + ":" + std::to_string(dst_port); + +        // Find the static edge for src_blk:src_port +        graph_edge_t src_static_edge = _assert_edge( +            _get_static_edge( +                [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) { +                    return edge.src_blockid == src_blk_id && edge.src_port == src_port; +                }), +            src_blk_info); + +        // Now see if it's already connected to the destination +        if (src_static_edge.dst_blockid == dst_blk.to_string() +            && src_static_edge.dst_port == dst_port) { +            UHD_LOG_TRACE(LOG_ID, +                "Blocks " << src_blk_info << " and " << dst_blk_info +                          << " are already statically connected, no physical connection " +                             "required."); +            return graph_edge_t::STATIC;          } -        // Read the dst adjacency list if its different -        // TODO they may be on the same mboard, which would make this redundant -        auto dst_mb_idx = dst_blk.get_device_no(); -        auto dst_cz     = _gsm->get_client_zero( -            sep_addr_t(_device->get_mb_iface(dst_mb_idx).get_remote_device_id(), 0)); -        adj_list = dst_cz->get_adjacency_list(); -        // Check the dst blk -        auto dst_blk_xbar_info = -            _xbar_block_config.at(dst_blk_ctrl->get_block_id().to_string()); -        // This "xbar_port" starts at the first block, so we need to add the client zero -        // and stream endpoint ports -        const auto dst_xbar_port = -            dst_blk_xbar_info.xbar_port + dst_cz->get_num_stream_endpoints() + 1; -        // We can also find out which stream endpoint the dst block is connected to -        sep_inst_t dst_sep; -        for (detail::client_zero::edge_def_t edge : adj_list) { -            if ((edge.dst_blk_index == dst_xbar_port) -                and (edge.dst_blk_port == dst_port)) { -                UHD_LOGGER_DEBUG("RFNOC::GRAPH") -                    << boost::format("Block found in adjacency list. %d:%d->%d:%d") -                           % edge.src_blk_index -                           % static_cast<unsigned int>(edge.src_blk_port) -                           % edge.dst_blk_index -                           % static_cast<unsigned int>(edge.dst_blk_port); -                if (edge.src_blk_index <= dst_cz->get_num_stream_endpoints()) { -                    dst_sep = -                        edge.src_blk_index - 1 /* minus 1 because port 0 is client zero*/; -                } else { -                    // TODO connect_disallowed_on_dst? -                    // TODO put more info in exception -                    throw uhd::routing_error( -                        "Unable to connect to statically connected destination port"); -                } -            } +        // If they're not statically connected, the source *must* be connected +        // to an SEP, or this route is impossible +        if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) { +            const std::string err_msg = +                src_blk_info + " is neither statically connected to " + dst_blk_info +                + " nor to an SEP! Routing impossible."; +            UHD_LOG_ERROR(LOG_ID, err_msg); +            throw uhd::routing_error(err_msg);          } -        /* TODO: we checked if either port is used in a static connection (and its not if -         * we've made it this far). We also need to check something else, but I can't -         * remember what... -         */ - -        // At this point, we know the attempted connection is valid, so let's go ahead and -        // make it -        sep_addr_t src_sep_addr( -            _device->get_mb_iface(src_mb_idx).get_remote_device_id(), src_sep); -        sep_addr_t dst_sep_addr( -            _device->get_mb_iface(dst_mb_idx).get_remote_device_id(), dst_sep); +        // OK, now we know which source SEP we have +        const std::string src_sep_info = src_static_edge.dst_blockid; +        const sep_addr_t src_sep_addr  = _sep_map.at(src_sep_info); + +        // Now find the static edge for the destination SEP +        auto dst_static_edge = _assert_edge( +            _get_static_edge( +                [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { +                    return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; +                }), +            dst_blk_info); + +        // If they're not statically connected, the source *must* be connected +        // to an SEP, or this route is impossible +        if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) { +            const std::string err_msg = +                dst_blk_info + " is neither statically connected to " + src_blk_info +                + " nor to an SEP! Routing impossible."; +            UHD_LOG_ERROR(LOG_ID, err_msg); +            throw uhd::routing_error(err_msg); +        } + +        // OK, now we know which destination SEP we have +        const std::string dst_sep_info = dst_static_edge.src_blockid; +        const sep_addr_t dst_sep_addr  = _sep_map.at(dst_sep_info); + +        // Now all we need to do is dynamically connect those SEPs          auto strm_info = _gsm->create_device_to_device_data_stream(              dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false); -        UHD_LOGGER_INFO("RFNOC::GRAPH") +        UHD_LOGGER_DEBUG(LOG_ID)              << boost::format("Data stream between EPID %d and EPID %d established "                               "where downstream buffer can hold %lu bytes and %u packets")                     % std::get<0>(strm_info).first % std::get<0>(strm_info).second                     % std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets; + +        return graph_edge_t::DYNAMIC;      }      //! Flush and reset each connected port on the mboard @@ -541,6 +545,35 @@ private:              mb_cz->reset_ctrl(block_portno);          }      } + +    /*! Find the static edge that matches \p pred +     * +     * \throws uhd::assertion_error if the edge can't be found. So be careful! +     */ +    template <typename UnaryPredicate> +    boost::optional<graph_edge_t> _get_static_edge(UnaryPredicate&& pred) +    { +        auto edge_it = std::find_if(_static_edges.cbegin(), _static_edges.cend(), pred); +        if (edge_it == _static_edges.cend()) { +            return boost::none; +        } +        return *edge_it; +    } + +    /*! Make sure an optional edge info is valid, or throw. +     */ +    graph_edge_t _assert_edge( +        boost::optional<graph_edge_t> edge_o, const std::string& blk_info) +    { +        if (!bool(edge_o)) { +            const std::string err_msg = std::string("Cannot connect block ") + blk_info +                                        + ", port is unconnected in the FPGA!"; +            UHD_LOG_ERROR("RFNOC::GRAPH", err_msg); +            throw uhd::routing_error(err_msg); +        } +        return edge_o.get(); +    } +      /**************************************************************************       * Attributes       *************************************************************************/ @@ -580,6 +613,9 @@ private:      // or SEP      std::map<std::pair<size_t, size_t>, block_id_t> _port_block_map; +    //! Map SEP block ID (e.g. 0/SEP#0) onto a sep_addr_t +    std::unordered_map<std::string, sep_addr_t> _sep_map; +      //! List of statically connected edges. Includes SEPs too!      std::vector<graph_edge_t> _static_edges; diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp new file mode 100644 index 000000000..4340faff0 --- /dev/null +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -0,0 +1,141 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/defaults.hpp> +#include <uhdlib/rfnoc/node_accessor.hpp> +#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> +#include <atomic> + +using namespace uhd; +using namespace uhd::rfnoc; + +const std::string STREAMER_ID = "RxStreamer"; +static std::atomic<uint64_t> streamer_inst_ctr; + +rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, +    const uhd::stream_args_t stream_args) +    : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args) +    , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) +    , _stream_args(stream_args) +{ +    // No block to which to forward properties or actions +    set_prop_forwarding_policy(forwarding_policy_t::DROP); +    set_action_forwarding_policy(forwarding_policy_t::DROP); + +    // Initialize properties +    _scaling_in.reserve(num_chans); +    _samp_rate_in.reserve(num_chans); +    _tick_rate_in.reserve(num_chans); +    _type_in.reserve(num_chans); + +    for (size_t i = 0; i < num_chans; i++) { +        _register_props(i, stream_args.otw_format); +    } +    node_accessor_t node_accessor{}; +    node_accessor.init_props(this); +} + +std::string rfnoc_rx_streamer::get_unique_id() const +{ +    return _unique_id; +} + +size_t rfnoc_rx_streamer::get_num_input_ports() const +{ +    return get_num_channels(); +} + +size_t rfnoc_rx_streamer::get_num_output_ports() const +{ +    return 0; +} + +void rfnoc_rx_streamer::issue_stream_cmd(const stream_cmd_t& stream_cmd) +{ +    if (get_num_channels() > 1 and stream_cmd.stream_now +        and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) { +        throw uhd::runtime_error( +            "Invalid recv stream command - stream now on multiple channels in a " +            "single streamer will fail to time align."); +    } + +    auto cmd = stream_cmd_action_info::make(stream_cmd.stream_mode); +    cmd->stream_cmd = stream_cmd; + +    for (size_t i = 0; i < get_num_channels(); i++) { +        const res_source_info info(res_source_info::INPUT_EDGE, i); +        post_action(info, cmd); +    } +} + +const uhd::stream_args_t& rfnoc_rx_streamer::get_stream_args() const +{ +    return _stream_args; +} + +bool rfnoc_rx_streamer::check_topology( +    const std::vector<size_t>& connected_inputs, +    const std::vector<size_t>& connected_outputs) +{ +    // Check that all channels are connected +    if (connected_inputs.size() != get_num_input_ports()) { +        return false; +    } + +    // Call base class to check that connections are valid +    return node_t::check_topology(connected_inputs, connected_outputs); +} + +void rfnoc_rx_streamer::_register_props(const size_t chan, +    const std::string& otw_format) +{ +    // Create actual properties and store them +    _scaling_in.push_back(property_t<double>( +        PROP_KEY_SCALING, {res_source_info::INPUT_EDGE, chan})); +    _samp_rate_in.push_back( +        property_t<double>(PROP_KEY_SAMP_RATE, {res_source_info::INPUT_EDGE, chan})); +    _tick_rate_in.push_back(property_t<double>( +        PROP_KEY_TICK_RATE, {res_source_info::INPUT_EDGE, chan})); +    _type_in.emplace_back(property_t<std::string>( +        PROP_KEY_TYPE, otw_format, {res_source_info::INPUT_EDGE, chan})); + +    // Give us some shorthands for the rest of this function +    property_t<double>* scaling_in   = &_scaling_in.back(); +    property_t<double>* samp_rate_in = &_samp_rate_in.back(); +    property_t<double>* tick_rate_in = &_tick_rate_in.back(); +    property_t<std::string>* type_in = &_type_in.back(); + +    // Register them +    register_property(scaling_in); +    register_property(samp_rate_in); +    register_property(tick_rate_in); +    register_property(type_in); + +    // Add resolvers +    add_property_resolver({scaling_in}, {}, +        [&scaling_in = *scaling_in, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `scaling_in'@" << chan); +            if (scaling_in.is_valid()) { +                this->set_scale_factor(chan, scaling_in.get() / 32767.0); +            } +        }); + +    add_property_resolver({samp_rate_in}, {}, +        [&samp_rate_in = *samp_rate_in, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `samp_rate_in'@" << chan); +            if (samp_rate_in.is_valid()) { +                this->set_samp_rate(samp_rate_in.get()); +            } +        }); + +    add_property_resolver({tick_rate_in}, {}, +        [&tick_rate_in = *tick_rate_in, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `tick_rate_in'@" << chan); +            if (tick_rate_in.is_valid()) { +                this->set_tick_rate(tick_rate_in.get()); +            } +        }); +} diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp new file mode 100644 index 000000000..82feeaf1f --- /dev/null +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -0,0 +1,124 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/defaults.hpp> +#include <uhdlib/rfnoc/node_accessor.hpp> +#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp> +#include <atomic> + +using namespace uhd; +using namespace uhd::rfnoc; + +const std::string STREAMER_ID = "TxStreamer"; +static std::atomic<uint64_t> streamer_inst_ctr; + +rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, +    const uhd::stream_args_t stream_args) +    : tx_streamer_impl<chdr_tx_data_xport>(num_chans, stream_args) +    , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) +    , _stream_args(stream_args) +{ +    // No block to which to forward properties or actions +    set_prop_forwarding_policy(forwarding_policy_t::DROP); +    set_action_forwarding_policy(forwarding_policy_t::DROP); + +    // Initialize properties +    _scaling_out.reserve(num_chans); +    _samp_rate_out.reserve(num_chans); +    _tick_rate_out.reserve(num_chans); +    _type_out.reserve(num_chans); + +    for (size_t i = 0; i < num_chans; i++) { +        _register_props(i, stream_args.otw_format); +    } + +    node_accessor_t node_accessor; +    node_accessor.init_props(this); +} + +std::string rfnoc_tx_streamer::get_unique_id() const +{ +    return _unique_id; +} + +size_t rfnoc_tx_streamer::get_num_input_ports() const +{ +    return 0; +} + +size_t rfnoc_tx_streamer::get_num_output_ports() const +{ +    return get_num_channels(); +} + +const uhd::stream_args_t& rfnoc_tx_streamer::get_stream_args() const +{ +    return _stream_args; +} + +bool rfnoc_tx_streamer::check_topology( +    const std::vector<size_t>& connected_inputs, +    const std::vector<size_t>& connected_outputs) +{ +    // Check that all channels are connected +    if (connected_outputs.size() != get_num_output_ports()) { +        return false; +    } + +    // Call base class to check that connections are valid +    return node_t::check_topology(connected_inputs, connected_outputs); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, +    const std::string& otw_format) +{ +    // Create actual properties and store them +    _scaling_out.push_back(property_t<double>( +        PROP_KEY_SCALING, {res_source_info::OUTPUT_EDGE, chan})); +    _samp_rate_out.push_back(property_t<double>( +        PROP_KEY_SAMP_RATE, {res_source_info::OUTPUT_EDGE, chan})); +    _tick_rate_out.push_back(property_t<double>( +        PROP_KEY_TICK_RATE, {res_source_info::OUTPUT_EDGE, chan})); +    _type_out.emplace_back(property_t<std::string>( +        PROP_KEY_TYPE, otw_format, {res_source_info::OUTPUT_EDGE, chan})); + +    // Give us some shorthands for the rest of this function +    property_t<double>* scaling_out   = &_scaling_out.back(); +    property_t<double>* samp_rate_out = &_samp_rate_out.back(); +    property_t<double>* tick_rate_out = &_tick_rate_out.back(); +    property_t<std::string>* type_out = &_type_out.back(); + +    // Register them +    register_property(scaling_out); +    register_property(samp_rate_out); +    register_property(tick_rate_out); +    register_property(type_out); + +    // Add resolvers +    add_property_resolver({scaling_out}, {}, +        [&scaling_out = *scaling_out, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); +            if (scaling_out.is_valid()) { +                this->set_scale_factor(chan, 32767.0 / scaling_out.get()); +            } +        }); + +    add_property_resolver({samp_rate_out}, {}, +        [&samp_rate_out = *samp_rate_out, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); +            if (samp_rate_out.is_valid()) { +                this->set_samp_rate(samp_rate_out.get()); +            } +        }); + +    add_property_resolver({tick_rate_out}, {}, +        [&tick_rate_out = *tick_rate_out, chan, this]() { +            RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); +            if (tick_rate_out.is_valid()) { +                this->set_tick_rate(tick_rate_out.get()); +            } +        }); +} diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index d6ad6d777..1cdb42b96 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -55,6 +55,8 @@ set(test_sources      fe_conn_test.cpp      rfnoc_node_test.cpp      link_test.cpp +    rx_streamer_test.cpp +    tx_streamer_test.cpp  )  set(benchmark_sources diff --git a/host/tests/common/mock_link.hpp b/host/tests/common/mock_link.hpp index 34ea15540..73a65916c 100644 --- a/host/tests/common/mock_link.hpp +++ b/host/tests/common/mock_link.hpp @@ -94,6 +94,14 @@ public:      }      /*! +     * Return the number of packets stored in the mock link. +     */ +    size_t get_num_packets() const +    { +        return _tx_mems.size(); +    } + +    /*!       * Retrieve the contents of a packet sent by the link. The link       * stores packets in a queue in the order they were sent.       */ diff --git a/host/tests/rfnoc_chdr_test.cpp b/host/tests/rfnoc_chdr_test.cpp index 417ed2f96..1c63d5976 100644 --- a/host/tests/rfnoc_chdr_test.cpp +++ b/host/tests/rfnoc_chdr_test.cpp @@ -222,3 +222,49 @@ BOOST_AUTO_TEST_CASE(chdr_strc_packet_no_swap_64)          std::cout << pyld.to_string();      }  } + +BOOST_AUTO_TEST_CASE(chdr_generic_packet_calculate_pyld_offset_64) +{ +    // Check calculation without timestamp +    auto test_pyld_offset = [](chdr_packet::uptr& pkt, +        const packet_type_t pkt_type, +        const size_t num_mdata) +    { +        uint64_t buff[MAX_BUF_SIZE_WORDS]; +        chdr_header header; +        header.set_pkt_type(pkt_type); +        header.set_num_mdata(num_mdata); + +        pkt->refresh(reinterpret_cast<void*>(buff), header, 0); + +        const size_t pyld_offset = pkt->calculate_payload_offset( +            pkt_type, num_mdata); + +        void* pyld_ptr = pkt->get_payload_ptr(); + +        const size_t non_pyld_bytes = static_cast<size_t>( +            reinterpret_cast<uint8_t*>(pyld_ptr) - +            reinterpret_cast<uint8_t*>(buff)); + +        BOOST_CHECK(pyld_offset == non_pyld_bytes); +    }; + +    { +        chdr_packet::uptr pkt = chdr64_be_factory.make_generic(); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); +    } +    { +        chdr_packet::uptr pkt = chdr256_be_factory.make_generic(); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); +    } +} diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp new file mode 100644 index 000000000..cd4daf569 --- /dev/null +++ b/host/tests/rx_streamer_test.cpp @@ -0,0 +1,744 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Contents of mock packet header + */ +struct mock_header_t +{ +    bool eob             = false; +    bool has_tsf         = false; +    uint64_t tsf         = 0; +    size_t payload_bytes = 0; +    bool ignore_seq      = true; +    size_t seq_num       = 0; +}; + +/*! + * Mock rx data xport which doesn't use I/O service, and just interacts with + * the link directly. + */ +class mock_rx_data_xport +{ +public: +    using uptr   = std::unique_ptr<mock_rx_data_xport>; +    using buff_t = uhd::transport::frame_buff; + +    //! Values extracted from received RX data packets +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +        const void* payload  = nullptr; +    }; + +    mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {} + +    std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff( +        const int32_t timeout_ms) +    { +        frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms); +        mock_header_t header  = *(reinterpret_cast<mock_header_t*>(buff->data())); + +        packet_info_t info; +        info.eob           = header.eob; +        info.has_tsf       = header.has_tsf; +        info.tsf           = header.tsf; +        info.payload_bytes = header.payload_bytes; +        info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t); + +        const uint8_t* pkt_end = +            reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); +        const size_t pyld_pkt_len = +            pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + +        if (pyld_pkt_len < info.payload_bytes) { +            _recv_link->release_recv_buff(std::move(buff)); +            throw uhd::value_error("Bad header or invalid packet length."); +        } + +        const bool seq_match = header.seq_num == _seq_num; +        const bool seq_error = !header.ignore_seq && !seq_match; +        _seq_num             = header.seq_num + 1; + +        return std::make_tuple(std::move(buff), info, seq_error); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        _recv_link->release_recv_buff(std::move(buff)); +    } + +    size_t get_max_payload_size() const +    { +        return _recv_link->get_recv_frame_size() - sizeof(packet_info_t); +    } + +private: +    mock_recv_link::sptr _recv_link; +    size_t _seq_num = 0; +}; + +/*! + * Mock rx streamer for testing + */ +class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport> +{ +public: +    mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) +        : rx_streamer_impl(num_chans, stream_args) +    { +    } + +    void issue_stream_cmd(const stream_cmd_t&) {} + +    void set_tick_rate(double rate) +    { +        rx_streamer_impl::set_tick_rate(rate); +    } + +    void set_samp_rate(double rate) +    { +        rx_streamer_impl::set_samp_rate(rate); +    } + +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        rx_streamer_impl::set_scale_factor(chan, scale_factor); +    } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using rx_streamer = rx_streamer_impl<mock_rx_data_xport>; + +static const double TICK_RATE    = 100e6; +static const double SAMP_RATE    = 10e6; +static const size_t FRAME_SIZE   = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_recv_link::sptr> make_links(const size_t num) +{ +    const mock_recv_link::link_params params = {FRAME_SIZE, 1}; + +    std::vector<mock_recv_link::sptr> links; + +    for (size_t i = 0; i < num; i++) { +        links.push_back(std::make_shared<mock_recv_link>(params)); +    } + +    return links; +} + +static boost::shared_ptr<mock_rx_streamer> make_rx_streamer( +    std::vector<mock_recv_link::sptr> recv_links, +    const std::string& host_format, +    const std::string& otw_format = "sc16") +{ +    const uhd::stream_args_t stream_args(host_format, otw_format); +    auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args); +    streamer->set_tick_rate(TICK_RATE); +    streamer->set_samp_rate(SAMP_RATE); + +    for (size_t i = 0; i < recv_links.size(); i++) { +        mock_rx_data_xport::uptr xport( +            std::make_unique<mock_rx_data_xport>(recv_links[i])); + +        streamer->set_scale_factor(i, SCALE_FACTOR); +        streamer->connect_channel(i, std::move(xport)); +    } + +    return streamer; +} + +static void push_back_recv_packet(mock_recv_link::sptr recv_link, +    mock_header_t header, +    size_t num_samps, +    uint16_t start_data = 0) +{ +    // Allocate buffer +    const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>); +    const size_t buff_len   = sizeof(header) + pyld_bytes; +    boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + +    // Write header to buffer +    header.payload_bytes                            = pyld_bytes; +    *(reinterpret_cast<mock_header_t*>(data.get())) = header; + +    // Write data to buffer +    auto data_ptr = +        reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header)); + +    for (size_t i = 0; i < num_samps; i++) { +        uint16_t val = (start_data + i) * 2; +        data_ptr[i]  = std::complex<uint16_t>(val, val + 1); +    } + +    // Push back buffer for link to recv +    recv_link->push_back_recv_packet(data, buff_len); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        const bool even_iteration = (i % 2 == 0); +        const bool odd_iteration  = (i % 2 != 0); +        mock_header_t header; +        header.eob     = even_iteration; +        header.has_tsf = odd_iteration; +        header.tsf     = i; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        for (size_t j = 0; j < num_samps; j++) { +            const auto value = +                std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, buff[j]); +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet) +{ +    const size_t NUM_BUFFS_TO_TEST = 5; +    const std::string format("fc64"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t spp       = streamer->get_max_num_samps(); +    const size_t num_samps = spp * 4; +    std::vector<std::complex<double>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = false; +        header.has_tsf = true; +        header.tsf     = i; + +        size_t samps_written = 0; +        while (samps_written < num_samps) { +            size_t samps_to_write = std::min(num_samps - samps_written, spp); +            push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written); +            samps_written += samps_to_write; +        } + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, false); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, true); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        for (size_t j = 0; j < num_samps; j++) { +            const auto value = +                std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, buff[j]); +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob) +{ +    // EOB should terminate a multi-packet recv, test that it does +    const std::string format("sc16"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_packets = 4; +    const size_t spp         = streamer->get_max_num_samps(); +    const size_t num_samps   = spp * num_packets; +    std::vector<std::complex<double>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    // Queue 4 packets, with eob set in every other packet +    for (size_t i = 0; i < num_packets; i++) { +        mock_header_t header; +        header.has_tsf = false; +        header.eob     = (i % 2) != 0; +        push_back_recv_packet(recv_links[0], header, spp); +    } + +    // Now call recv and check that eob terminates a recv call +    for (size_t i = 0; i < num_packets / 2; i++) { +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, spp * 2); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, false); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("sc16"); + +    const size_t num_chans = 2; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; + +    std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        const bool even_iteration = (i % 2 == 0); +        const bool odd_iteration  = (i % 2 != 0); +        mock_header_t header; +        header.eob     = even_iteration; +        header.has_tsf = odd_iteration; +        header.tsf     = i; + +        size_t samps_pushed = 0; +        for (size_t ch = 0; ch < num_chans; ch++) { +            push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed); +            samps_pushed += num_samps; +        } + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        size_t samps_checked = 0; +        for (size_t ch = 0; ch < num_chans; ch++) { +            for (size_t samp = 0; samp < num_samps; samp++) { +                const size_t n   = samps_checked + samp; +                const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1)); +                BOOST_CHECK_EQUAL(value, buffer[ch][samp]); +            } +            samps_checked += num_samps; +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    // Push back five packets, then read them 1/4 of a packet at a time +    const size_t spp              = streamer->get_max_num_samps(); +    const size_t reads_per_packet = 4; +    const size_t num_samps        = spp / reads_per_packet; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = true; +        header.has_tsf = true; +        header.tsf     = 0; +        push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet); +    } + +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "receiving packet " << i << std::endl; + +        size_t total_samps_read = 0; +        for (size_t j = 0; j < reads_per_packet; j++) { +            size_t num_samps_ret = +                streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +            BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +            BOOST_CHECK_EQUAL(metadata.has_time_spec, true); +            BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +            BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1); +            BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read); + +            const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE); +            const size_t expected_ticks   = ticks_per_sample * total_samps_read; +            BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks); + +            for (size_t samp = 0; samp < num_samps; samp++) { +                const size_t pkt_idx = samp + total_samps_read; +                const auto value     = std::complex<float>( +                    (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR); +                BOOST_CHECK_EQUAL(value, buff[samp]); +            } + +            total_samps_read += num_samps_ret; +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_seq_error) +{ +    // Test that when we get a sequence error the error is returned in the +    // metadata with a time spec that corresponds to the time spec of the +    // last sample in the previous packet plus one sample clock. Test that +    // the packet that causes the sequence error is not discarded. +    const size_t NUM_PKTS_TO_TEST = 2; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; +    size_t seq_num = 0; +    size_t tsf     = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob        = false; +        header.has_tsf    = true; +        header.ignore_seq = false; + +        // Push back three packets but skip a seq_num after the second +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        tsf += num_samps; +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        seq_num++; // dropped packet +        tsf += num_samps; + +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        // First two reads should succeed +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        size_t prev_tsf     = metadata.time_spec.to_ticks(TICK_RATE); +        size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE); + +        // Third read should be a sequence error +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, 0); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +        BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); +        size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE); +        BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf); + +        // Next read should succeed +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK_EQUAL(metadata.out_of_sequence, false); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_bad_packet) +{ +    // Test that when we receive a packet with invalid chdr header or length +    // the streamer returns the correct error in meatadata. +    auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) { +        mock_header_t header; +        header.payload_bytes = 1000; + +        // Allocate a buffer that is too small for the payload +        const size_t buff_len = 100; +        boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + +        // Write header to buffer +        *(reinterpret_cast<mock_header_t*>(data.get())) = header; + +        // Push back buffer for link to recv +        recv_link->push_back_recv_packet(data, buff_len); +    }; + +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    mock_header_t header; + +    // Push back a regular packet +    push_back_recv_packet(recv_links[0], header, num_samps); + +    // Push back a bad packet +    push_back_bad_packet(recv_links[0]); + +    // Push back another regular packet +    push_back_recv_packet(recv_links[0], header, num_samps); + +    // First read should succeed +    size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + +    // Second read should be an error +    num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET); + +    // Third read should succeed +    num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf) +{ +    // Test that we can receive packets without tsf. Start by pushing +    // a packet with a tsf followed by a few packets without. +    const size_t NUM_PKTS_TO_TEST = 6; +    const std::string format("fc64"); + +    const size_t num_chans = 10; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 21; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = (i == NUM_PKTS_TO_TEST - 1); +        header.has_tsf = (i == 0); +        header.tsf     = 500; + +        for (size_t ch = 0; ch < num_chans; ch++) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error) +{ +    // Test that the streamer handles dropped packets correctly by injecting +    // a sequence error in one channel. The streamer should discard +    // corresponding packets from all other channels. +    const std::string format("fc64"); + +    const size_t num_chans = 100; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 99; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    for (size_t ch = 0; ch < num_chans; ch++) { +        mock_header_t header; +        header.eob        = false; +        header.has_tsf    = true; +        header.tsf        = 0; +        header.ignore_seq = false; +        header.seq_num    = 0; + +        // Drop a packet from an arbitrary channel right at the start +        if (ch != num_chans / 2) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        // Add a regular packet to check the streamer drops the first +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); + +        // Drop a packet from the first channel +        header.seq_num++; +        header.tsf++; +        if (ch != 0) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        // Add a regular packet +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); + +        // Drop a few packets from the last channel +        for (size_t j = 0; j < 10; j++) { +            header.seq_num++; +            header.tsf++; +            if (ch != num_chans - 1) { +                push_back_recv_packet(recv_links[ch], header, num_samps); +            } +        } + +        // Add a regular packet +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); +    } + +    uhd::rx_metadata_t metadata; + +    // First recv should result in error +    size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 1 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1); + +    // Next recv should result in error +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 3 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3); + +    // Next recv should result in error +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 14 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14); +} + +BOOST_AUTO_TEST_CASE(test_recv_alignment_error) +{ +    // Test that the alignment procedure returns an alignment error if it can't +    // time align packets. +    const std::string format("fc64"); + +    const size_t num_chans = 4; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 2; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    mock_header_t header; +    header.eob     = true; +    header.has_tsf = true; +    header.tsf     = 500; + +    for (size_t ch = 0; ch < num_chans; ch++) { +        push_back_recv_packet(recv_links[ch], header, num_samps); +    } + +    size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +    BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + +    for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) { +        header.tsf = header.tsf + num_samps; +        for (size_t ch = 0; ch < num_chans; ch++) { +            if (ch == num_chans - 1) { +                // Misalign this time stamp +                header.tsf += 1; +            } +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } +    } + +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT); +} diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp new file mode 100644 index 000000000..cb07cffad --- /dev/null +++ b/host/tests/tx_streamer_test.cpp @@ -0,0 +1,393 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Mock tx data xport which doesn't use I/O service, and just interacts with + * the link directly. Transport copies packet info directly into the frame + * buffer. + */ +class mock_tx_data_xport +{ +public: +    using uptr   = std::unique_ptr<mock_tx_data_xport>; +    using buff_t = uhd::transport::frame_buff; + +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +    }; + +    mock_tx_data_xport(mock_send_link::sptr send_link) : _send_link(send_link) {} + +    buff_t::uptr get_send_buff(const int32_t timeout_ms) +    { +        return _send_link->get_send_buff(timeout_ms); +    } + +    std::pair<void*, size_t> write_packet_header( +        buff_t::uptr& buff, const packet_info_t& info) +    { +        uint8_t* data                             = static_cast<uint8_t*>(buff->data()); +        *(reinterpret_cast<packet_info_t*>(data)) = info; +        return std::make_pair(data + sizeof(info), sizeof(info) + info.payload_bytes); +    } + +    void release_send_buff(buff_t::uptr buff) +    { +        _send_link->release_send_buff(std::move(buff)); +    } + +    size_t get_max_payload_size() const +    { +        return _send_link->get_send_frame_size() - sizeof(packet_info_t); +        ; +    } + +private: +    mock_send_link::sptr _send_link; +}; + +/*! + * Mock tx streamer for testing + */ +class mock_tx_streamer : public tx_streamer_impl<mock_tx_data_xport> +{ +public: +    mock_tx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) +        : tx_streamer_impl(num_chans, stream_args) +    { +    } + +    void set_tick_rate(double rate) +    { +        tx_streamer_impl::set_tick_rate(rate); +    } + +    void set_samp_rate(double rate) +    { +        tx_streamer_impl::set_samp_rate(rate); +    } + +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        tx_streamer_impl::set_scale_factor(chan, scale_factor); +    } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using tx_streamer = tx_streamer_impl<mock_tx_data_xport>; + +static const double TICK_RATE    = 100e6; +static const double SAMP_RATE    = 10e6; +static const size_t FRAME_SIZE   = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_send_link::sptr> make_links(const size_t num) +{ +    const mock_send_link::link_params params = {FRAME_SIZE, 1}; + +    std::vector<mock_send_link::sptr> links; + +    for (size_t i = 0; i < num; i++) { +        links.push_back(std::make_shared<mock_send_link>(params)); +    } + +    return links; +} + +static boost::shared_ptr<mock_tx_streamer> make_tx_streamer( +    std::vector<mock_send_link::sptr> send_links, const std::string& format) +{ +    const uhd::stream_args_t stream_args(format, "sc16"); +    auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +    streamer->set_tick_rate(TICK_RATE); +    streamer->set_samp_rate(SAMP_RATE); + +    for (size_t i = 0; i < send_links.size(); i++) { +        mock_tx_data_xport::uptr xport( +            std::make_unique<mock_tx_data_xport>(send_links[i])); + +        streamer->set_scale_factor(i, SCALE_FACTOR); +        streamer->connect_channel(i, std::move(xport)); +    } + +    return streamer; +} + +std::tuple<mock_tx_data_xport::packet_info_t, std::complex<uint16_t>*, size_t, boost::shared_array<uint8_t>> +pop_send_packet(mock_send_link::sptr send_link) +{ +    auto packet = send_link->pop_send_packet(); + +    const size_t packet_samps = +        (packet.second - sizeof(mock_tx_data_xport::packet_info_t)) +        / sizeof(std::complex<uint16_t>); + +    uint8_t* buff_ptr = packet.first.get(); +    auto info         = *(reinterpret_cast<mock_tx_data_xport::packet_info_t*>(buff_ptr)); + +    std::complex<uint16_t>* data = reinterpret_cast<std::complex<uint16_t>*>( +        buff_ptr + sizeof(mock_tx_data_xport::packet_info_t)); + +    return std::make_tuple(info, data, packet_samps, packet.first); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 30; +    const std::string format("fc32"); + +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<float>> buff(20); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<float>(i * 2, i * 2 + 1); +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        // Vary num_samps for each packet +        const size_t num_samps = 10 + i % 10; +        metadata.end_of_burst  = (i == NUM_PKTS_TO_TEST - 1); +        const size_t num_sent  = streamer->send(&buff.front(), num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        mock_tx_data_xport::packet_info_t info; +        std::complex<uint16_t>* data; +        size_t packet_samps; +        boost::shared_array<uint8_t> frame_buff; + +        std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); +        BOOST_CHECK_EQUAL(num_samps, packet_samps); + +        // Check data +        for (size_t j = 0; j < num_samps; j++) { +            const std::complex<uint16_t> value( +                (j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, data[j]); +        } + +        BOOST_CHECK_EQUAL(num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +        BOOST_CHECK(info.has_tsf); +        BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); +        BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); +        num_accum_samps += num_samps; +    } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet) +{ +    const size_t NUM_BUFFS_TO_TEST = 5; +    const std::string format("fc64"); + +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    const size_t spp       = streamer->get_max_num_samps(); +    const size_t num_samps = spp * 4; +    std::vector<std::complex<double>> buff(num_samps); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<double>(i * 2, i * 2 + 1); +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        metadata.end_of_burst = true; +        const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        size_t samps_checked = 0; + +        while (samps_checked < num_samps) { +            mock_tx_data_xport::packet_info_t info; +            std::complex<uint16_t>* data; +            size_t packet_samps; +            boost::shared_array<uint8_t> frame_buff; + +            std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + +            for (size_t j = 0; j < packet_samps; j++) { +                const size_t n = j + samps_checked; +                const std::complex<uint16_t> value( +                    (n * 2) * SCALE_FACTOR, (n * 2 + 1) * SCALE_FACTOR); +                BOOST_CHECK_EQUAL(value, data[j]); +            } + +            BOOST_CHECK_EQUAL( +                packet_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +            BOOST_CHECK(info.has_tsf); +            BOOST_CHECK_EQUAL( +                info.tsf, (num_accum_samps + samps_checked) * TICK_RATE / SAMP_RATE); +            samps_checked += packet_samps; + +            BOOST_CHECK_EQUAL(info.eob, samps_checked == num_samps); +        } + +        BOOST_CHECK_EQUAL(samps_checked, num_samps); +        num_accum_samps += samps_checked; +    } +} + +BOOST_AUTO_TEST_CASE(test_send_two_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 30; +    const std::string format("sc16"); + +    auto send_links = make_links(2); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<uint16_t>> buff(20); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<uint16_t>(i * 2, i * 2 + 1); +    } +    std::vector<void*> buffs; +    for (size_t ch = 0; ch < 2; ch++) { +        buffs.push_back(buff.data()); // same buffer for each channel +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        // Vary num_samps for each packet +        const size_t num_samps = 10 + i % 10; +        metadata.end_of_burst  = (i == NUM_PKTS_TO_TEST - 1); +        const size_t num_sent  = streamer->send(buffs, num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        for (size_t ch = 0; ch < 2; ch++) { +            mock_tx_data_xport::packet_info_t info; +            std::complex<uint16_t>* data; +            size_t packet_samps; +            boost::shared_array<uint8_t> frame_buff; + +            std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[ch]); +            BOOST_CHECK_EQUAL(num_samps, packet_samps); + +            // Check data +            for (size_t j = 0; j < num_samps; j++) { +                const std::complex<uint16_t> value((j * 2), (j * 2 + 1)); +                BOOST_CHECK_EQUAL(value, data[j]); +            } + +            BOOST_CHECK_EQUAL( +                num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +            BOOST_CHECK(info.has_tsf); +            BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); +            BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); +        } +        num_accum_samps += num_samps; +    } +} + +BOOST_AUTO_TEST_CASE(test_meta_data_cache) +{ +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, "fc32"); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.start_of_burst = true; +    metadata.end_of_burst   = true; +    metadata.has_time_spec  = true; +    metadata.time_spec      = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<float>> buff(20); + +    size_t num_sent = streamer->send(buff.data(), 0, metadata, 1.0); +    BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), 0); +    BOOST_CHECK_EQUAL(num_sent, 0); +    uhd::tx_metadata_t metadata2; +    num_sent = streamer->send(buff.data(), 10, metadata2, 1.0); + +    mock_tx_data_xport::packet_info_t info; +    size_t packet_samps; +    boost::shared_array<uint8_t> frame_buff; + +    std::tie(info, std::ignore, packet_samps, frame_buff) = pop_send_packet(send_links[0]); +    BOOST_CHECK_EQUAL(packet_samps, num_sent); +    BOOST_CHECK(info.has_tsf); +    BOOST_CHECK(info.eob); +} + +BOOST_AUTO_TEST_CASE(test_spp) +{ +    // Test the spp calculation when it is limited by the stream args +    { +        auto send_links = make_links(1); +        uhd::stream_args_t stream_args("fc64", "sc16"); +        stream_args.args["spp"] = std::to_string(10); +        auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +        mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); +        streamer->connect_channel(0, std::move(xport)); +        BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), 10); +    } + +    // Test the spp calculation when it is limited by the frame size +    { +        auto send_links = make_links(1); +        uhd::stream_args_t stream_args("fc64", "sc16"); +        stream_args.args["spp"] = std::to_string(10000); +        auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +        mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); +        const size_t max_pyld = xport->get_max_payload_size(); +        streamer->connect_channel(0, std::move(xport)); +        BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), max_pyld / sizeof(std::complex<uint16_t>)); +    } +} | 
