diff options
Diffstat (limited to 'host/lib/include/uhdlib/rfnoc')
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_packet.hpp | 9 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 481 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 550 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_types.hpp | 2 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp | 32 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp | 13 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/mb_iface.hpp | 44 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp | 21 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp | 95 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp | 90 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp | 130 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp | 99 | 
12 files changed, 1530 insertions, 36 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp index 770c6cf6f..cc729de6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp @@ -114,6 +114,15 @@ public:       */      virtual void* get_payload_ptr() = 0; +    /*! Return the payload offset in bytes for a given type and num_mdata +     * +     * \param pkt_type The packet type for calculation +     * \param num_mdata The number of metadata words for calculation +     * \return The offset of the payload in a packet with the given params +     */ +    virtual size_t calculate_payload_offset(const packet_type_t pkt_type, +        const uint8_t num_mdata = 0) const = 0; +      //! Shortcut to return the const metadata pointer cast as a specific type      template <typename data_t>      inline const data_t* get_mdata_const_ptr_as() const diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp new file mode 100644 index 000000000..69dceebe1 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -0,0 +1,481 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP + +#include <uhd/config.hpp> +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send rx flow control responses + */ +class rx_flow_ctrl_sender +{ +public: +    //! Constructor +    rx_flow_ctrl_sender( +        const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) +        : _dst_epid(sep_ids.first) +    { +        _fc_packet             = pkt_factory.make_strs(); +        _fc_strs_pyld.src_epid = sep_ids.second; +    } + +    /*! Configure buffer capacity +     * \param recv_capacity The buffer capacity of the receive link +     */ +    void set_capacity(const stream_buff_params_t& recv_capacity) +    { +        _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; +        _fc_strs_pyld.capacity_pkts  = recv_capacity.packets; +    } + +    /*! Send a flow control response packet +     * +     * \param send_link the link to use to send the packet +     * \counts transfer counts for packet contents +     */ +    void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts) +    { +        auto buff = send_link->get_send_buff(0); +        if (!buff) { +            throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); +        } + +        chdr::chdr_header header; +        header.set_seq_num(_fc_seq_num++); +        header.set_dst_epid(_dst_epid); + +        chdr::strs_payload fc_payload(_fc_strs_pyld); +        fc_payload.xfer_count_bytes = counts.bytes; +        fc_payload.xfer_count_pkts  = counts.packets; + +        _fc_packet->refresh(buff->data(), header, fc_payload); +        const size_t size = header.get_length(); + +        buff->set_packet_size(size); +        send_link->release_send_buff(std::move(buff)); +    } + +private: +    // Endpoint ID for recipient of flow control response +    const sep_id_t _dst_epid; + +    // Packet for writing flow control info +    chdr::chdr_strs_packet::uptr _fc_packet; + +    // Pre-configured strs payload to hold values that don't change +    chdr::strs_payload _fc_strs_pyld; + +    // Sequence number for flow control packets +    uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for RX chdr data + * + * This transport provides the streamer an interface to read RX data packets. + * The transport implements flow control and sequence number checking. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For an rx transport, the device sends data packets, and the host sends strs + * packets letting the device know that buffer space in the host has been freed. + * For lossy links, the device also sends strc packets to resynchronize the + * transfer counts between host and device, to correct for any dropped packets + * in the link. + */ +class chdr_rx_data_xport +{ +public: +    using uptr   = std::unique_ptr<chdr_rx_data_xport>; +    using buff_t = transport::frame_buff; + +    //! Values extracted from received RX data packets +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +        const void* payload  = nullptr; +    }; + +    /*! Constructor +     * +     * \param io_srv The service that will schedule the xport I/O +     * \param recv_link The recv link, already attached to the I/O service +     * \param send_link The send link, already attached to the I/O service +     * \param pkt_factory Factory to create packets with the desired chdr_w and endianness +     * \param addrs Source and destination addresses +     * \param epids Source and destination endpoint IDs +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param num_recv_frames Num frames to reserve from the recv link +     * \param recv_capacity Total capacity of the recv link +     * \param fc_freq Frequency of flow control status messages +     * \param fc_headroom Headroom for flow control status messages +     * \param lossy_xport Whether the xport is lossy, for flow control configuration +     */ +    chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_pair_t& addrs, +        const uhd::rfnoc::sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt, +        const size_t num_recv_frames, +        const stream_buff_params_t& recv_capacity, +        const stream_buff_params_t& fc_freq, +        const stream_buff_params_t& fc_headroom, +        const bool lossy_xport) +        : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) +    { +        const sep_addr_t remote_sep_addr = addrs.first; +        const sep_addr_t local_sep_addr  = addrs.second; +        const sep_id_t remote_epid       = epids.first; +        const sep_id_t local_epid        = epids.second; + +        UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +            "Creating rx xport with local epid=" << local_epid +                                                 << ", remote epid=" << remote_epid); + +        _recv_packet = pkt_factory.make_generic(); +        _fc_sender.set_capacity(recv_capacity); + +        // Calculate max payload size +        const size_t pyld_offset = +            _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); +        _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + +        // Make data transport +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* recv_link, +                           transport::send_link_if* send_link) { +            return this->_recv_callback(buff, recv_link, send_link); +        }; + +        auto fc_cb = [this](buff_t::uptr buff, +                         transport::recv_link_if* recv_link, +                         transport::send_link_if* send_link) { +            this->_fc_callback(std::move(buff), recv_link, send_link); +        }; + +        // Needs just a single send frame for responses +        _recv_io = io_srv->make_recv_client(recv_link, +            num_recv_frames, +            recv_cb, +            send_link, +            /* num_send_frames*/ 1, +            fc_cb); + +        // Create a control transport with the rx data links to send mgmt packets +        // needed to setup the stream +        // Piggyback on frames from the recv_io_if +        auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, +            send_link, +            recv_link, +            local_epid, +            0, // num_send_frames +            0); // num_recv_frames + +        // Create new temporary management portal with the transports used for this stream +        // TODO: This is a bit excessive. Maybe we can pare down the functionality of the +        // portal just for route setup purposes. Whatever we do, we *must* use xport in it +        // though otherwise the transport will not behave correctly. +        auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( +            *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + +        // Setup a route to the EPID +        // Note that this may be gratuitous--The endpoint may already have been set up +        data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); +        data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + +        // Initialize flow control - management portal sends a stream command +        // containing its requested flow control frequency, the rx transport +        // responds with a stream status containing its buffer capacity. +        data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport, +            remote_epid, +            lossy_xport, +            pyld_buff_fmt, +            mdata_buff_fmt, +            fc_freq, +            fc_headroom); + +        data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid); + +        UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +            "Stream endpoint was configured with:" +                << std::endl +                << "capacity bytes=" << recv_capacity.bytes +                << ", packets=" << recv_capacity.packets << std::endl +                << "fc headroom bytes=" << fc_headroom.bytes +                << ", packets=" << fc_headroom.packets << std::endl +                << "fc frequency bytes=" << fc_freq.bytes +                << ", packets=" << fc_freq.packets); + +        // We no longer need the control xport and mgmt_portal, release them so +        // the control xport is no longer connected to the I/O service. +        data_mgmt_portal.reset(); +        ctrl_xport.reset(); +    } + +    /*! Returns maximum number payload bytes +     * +     * \return maximum payload bytes per packet +     */ +    size_t get_max_payload_size() const +    { +        return _max_payload_size; +    } + +    /*! +     * Gets an RX frame buffer containing a recv packet +     * +     * \param timeout_ms timeout in milliseconds +     * \return returns a tuple containing: +     * - a frame_buff, or null if timeout occurs +     * - info struct corresponding to the packet +     * - whether the packet was out of sequence +     */ +    std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff( +        const int32_t timeout_ms) +    { +        buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms); + +        if (!buff) { +            return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false); +        } + +        auto info      = _read_data_packet_info(buff); +        bool seq_error = _is_out_of_sequence(std::get<1>(info)); + +        return std::make_tuple(std::move(buff), std::get<0>(info), seq_error); +    } + +    /*! +     * Releases an RX frame buffer +     * +     * \param buff the frame buffer to release +     */ +    void release_recv_buff(typename buff_t::uptr buff) +    { +        _recv_io->release_recv_buff(std::move(buff)); +    } + +private: +    /*! +     * Recv callback for I/O service +     * +     * The I/O service invokes this callback when it reads a packet from the +     * recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link from which buff was read +     * \param send_link the send link for flow control messages +     */ +    bool _recv_callback(buff_t::uptr& buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* send_link) +    { +        _recv_packet->refresh(buff->data()); +        const auto header      = _recv_packet->get_chdr_header(); +        const auto type        = header.get_pkt_type(); +        const auto dst_epid    = header.get_dst_epid(); +        const auto packet_size = buff->packet_size(); + +        if (dst_epid != _epid) { +            return false; +        } + +        if (type == chdr::PKT_TYPE_STRC) { +            chdr::strc_payload strc; +            strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), +                _recv_packet->get_payload_size() / sizeof(uint64_t), +                _recv_packet->conv_to_host<uint64_t>()); + +            const stream_buff_params_t strc_counts = { +                strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + +            if (strc.op_code == chdr::STRC_RESYNC) { +                // Resynchronize before updating fc_state, the strc payload +                // contains counts before the strc packet itself +                _fc_state.resynchronize(strc_counts); + +                // Update state that we received a packet +                _fc_state.data_received(packet_size); + +                recv_link->release_recv_buff(std::move(buff)); +                buff = buff_t::uptr(); +                _fc_state.xfer_done(packet_size); +                _send_fc_response(send_link); +            } else if (strc.op_code == chdr::STRC_INIT) { +                _fc_state.initialize( +                    {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}); + +                UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", +                    "Received strc init with fc freq" +                        << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + +                // Make sure flow control was initialized +                assert(_fc_state.get_fc_freq().bytes > 0); +                assert(_fc_state.get_fc_freq().packets > 0); + +                // Send a strs response to configure flow control on the sender +                _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + +                // Reset counts, since mgmt_portal will do it to FPGA +                _fc_state.reset_counts(); + +                recv_link->release_recv_buff(std::move(buff)); +                buff = buff_t::uptr(); +            } else { +                throw uhd::value_error("Unexpected opcode value in STRC packet."); +            } + +            // For stream commands, we return true (packet was destined to this +            // client) but release the buffer. The I/O service won't queue this +            // packet in the recv_io_if. +            return true; + +        } else if (type == chdr::PKT_TYPE_DATA_NO_TS +                   || type == chdr::PKT_TYPE_DATA_WITH_TS) { +            // Update state that we received a packet +            _fc_state.data_received(packet_size); + +            // If this is a data packet, just claim it by returning true. The +            // I/O service will queue this packet in the recv_io_if. +            return true; + +        } else { +            return false; +        } +    } + +    /*! +     * Flow control callback for I/O service +     * +     * The I/O service invokes this callback when a packet needs to be released +     * to the recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link to which to release the buffer +     * \param send_link the send link for flow control messages +     */ +    void _fc_callback(buff_t::uptr buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* send_link) +    { +        const size_t packet_size = buff->packet_size(); +        recv_link->release_recv_buff(std::move(buff)); +        _fc_state.xfer_done(packet_size); +        _send_fc_response(send_link); +    } + +    /*! +     * Sends a flow control response packet if necessary. +     * +     * \param send_link the send link for flow control messages +     */ +    void _send_fc_response(transport::send_link_if* send_link) +    { +        if (_fc_state.fc_resp_due()) { +            _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); +            _fc_state.fc_resp_sent(); +        } +    } + +    /*! +     * Checks if the sequence number is out of sequence, prints 'D' if it is +     * and returns result of check. +     * +     * \return true if a sequence error occurred +     */ +    UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num) +    { +        const uint16_t expected_packet_count = _data_seq_num; +        _data_seq_num                        = seq_num + 1; + +        if (expected_packet_count != seq_num) { +            UHD_LOG_FASTPATH("D"); +            return true; +        } +        return false; +    } + +    /*! +     * Reads packet header and returns information in a struct. +     * +     * \return a tuple containing the packet info and packet sequence number +     */ +    std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff) +    { +        const void* data = buff->data(); +        _recv_packet->refresh(data); +        const auto header        = _recv_packet->get_chdr_header(); +        const auto optional_time = _recv_packet->get_timestamp(); + +        packet_info_t info; +        info.eob           = header.get_eob(); +        info.has_tsf       = optional_time.is_initialized(); +        info.tsf           = optional_time ? *optional_time : 0; +        info.payload_bytes = _recv_packet->get_payload_size(); +        info.payload       = _recv_packet->get_payload_const_ptr(); + +        const uint8_t* pkt_end = +            reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); +        const size_t pyld_pkt_len = +            pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + +        if (pyld_pkt_len < info.payload_bytes) { +            _recv_io->release_recv_buff(std::move(buff)); +            throw uhd::value_error("Bad CHDR header or invalid packet length."); +        } + +        return std::make_tuple(info, header.get_seq_num()); +    } + +    // Interface to the I/O service +    transport::recv_io_if::sptr _recv_io; + +    // Flow control state +    rx_flow_ctrl_state _fc_state; + +    // Maximum data payload in bytes +    size_t _max_payload_size = 0; + +    // Sequence number for data packets +    uint16_t _data_seq_num = 0; + +    // Packet for received data +    chdr::chdr_packet::uptr _recv_packet; + +    // Handles sending of strs flow control response packets +    detail::rx_flow_ctrl_sender _fc_sender; + +    // Local / Sink EPID +    sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp new file mode 100644 index 000000000..62b811bf5 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -0,0 +1,550 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP + +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send tx flow control messages + */ +class tx_flow_ctrl_sender +{ +public: +    //! Constructor +    tx_flow_ctrl_sender( +        const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) +        : _dst_epid(sep_ids.second) +    { +        _fc_packet             = pkt_factory.make_strc(); +        _fc_strc_pyld.src_epid = sep_ids.first; +        _fc_strc_pyld.op_code  = chdr::STRC_RESYNC; +    } + +    /*! +     * Sends a flow control resync packet +     * +     * Sends a strc packet with the resync opcode to make the device transfer +     * counts match those of the host, to correct for dropped packets. +     * +     * \param send_link the link to use to send the packet +     * \counts transfer counts for packet contents +     */ +    size_t send_strc_resync( +        transport::send_link_if* send_link, const stream_buff_params_t& counts) +    { +        auto buff = send_link->get_send_buff(0); +        if (!buff) { +            throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer"); +        } + +        chdr::chdr_header header; +        header.set_seq_num(_fc_seq_num++); +        header.set_dst_epid(_dst_epid); + +        chdr::strc_payload fc_payload(_fc_strc_pyld); +        fc_payload.num_bytes = counts.bytes; +        fc_payload.num_pkts  = counts.packets; + +        _fc_packet->refresh(buff->data(), header, fc_payload); +        const size_t size = header.get_length(); + +        buff->set_packet_size(size); +        send_link->release_send_buff(std::move(buff)); +        return size; +    } + +private: +    // Endpoint ID for recipient of flow control response +    const sep_id_t _dst_epid; + +    // Packet for writing flow control info +    chdr::chdr_strc_packet::uptr _fc_packet; + +    // Pre-configured strc payload to hold values that don't change +    chdr::strc_payload _fc_strc_pyld; + +    // Sequence number for flow control packets +    uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for TX chdr data + * + * This transport provides the streamer an interface to send TX data packets. + * The transport implements flow control and keeps track of sequence numbers. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For a tx transport, the host sends data packets, and the device sends strs + * packets letting the host know that buffer space in the device stream endpoint + * has been freed. For lossy links, the host also sends strc packets to + * resynchronize the transfer counts between host and device, to correct for + * any dropped packets in the link. + */ +class chdr_tx_data_xport +{ +public: +    using uptr   = std::unique_ptr<chdr_tx_data_xport>; +    using buff_t = transport::frame_buff; + +    //! Information about data packet +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +    }; + +    /*! Constructor +     * +     * \param io_srv The service that will schedule the xport I/O +     * \param recv_link The recv link, already attached to the I/O service +     * \param send_link The send link, already attached to the I/O service +     * \param pkt_factory Factory to create packets with the desired chdr_w and endianness +     * \param addrs Source and destination addresses +     * \param epids Source and destination endpoint IDs +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param num_send_frames Num frames to reserve from the send link +     * \param fc_freq_ratio Ratio to use to configure the device fc frequency +     * \param fc_headroom_ratio Ratio to use to configure the device fc headroom +     */ +    chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_pair_t& addrs, +        const uhd::rfnoc::sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt, +        const size_t num_send_frames, +        const double fc_freq_ratio, +        const double fc_headroom_ratio) +        : _fc_sender(pkt_factory, epids), _epid(epids.first) +    { +        const sep_addr_t remote_sep_addr = addrs.second; +        const sep_addr_t local_sep_addr  = addrs.first; +        const sep_id_t remote_epid       = epids.second; +        const sep_id_t local_epid        = epids.first; + +        UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", +            "Creating tx xport with local epid=" << local_epid +                                                 << ", remote epid=" << remote_epid); + +        _send_header.set_dst_epid(epids.second); +        _send_packet = pkt_factory.make_generic(); +        _recv_packet = pkt_factory.make_generic(); + +        // Calculate max payload size +        const size_t pyld_offset = +            _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); +        _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + +        _configure_sep(io_srv, +            recv_link, +            send_link, +            pkt_factory, +            local_sep_addr, +            local_epid, +            remote_sep_addr, +            remote_epid, +            pyld_buff_fmt, +            mdata_buff_fmt); + +        _initialize_flow_ctrl(io_srv, +            recv_link, +            send_link, +            pkt_factory, +            epids, +            fc_freq_ratio, +            fc_headroom_ratio); + +        // Now create the send I/O we will use for data +        auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { +            this->_send_callback(buff, send_link); +        }; + +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* recv_link, +                           transport::send_link_if* send_link) { +            return this->_recv_callback(buff, recv_link, send_link); +        }; + +        // Needs just a single recv frame for strs packets +        _send_io = io_srv->make_send_client(send_link, +            num_send_frames, +            send_cb, +            recv_link, +            /* num_recv_frames */ 1, +            recv_cb); +    } + +    /*! Returns maximum number of payload bytes +     * +     * \return maximum number of payload bytes +     */ +    size_t get_max_payload_size() const +    { +        return _max_payload_size; +    } + +    /*! +     * Gets a TX frame buffer +     * +     * \param timeout_ms timeout in milliseconds +     * \return the frame buffer, or nullptr if timeout occurs +     */ +    buff_t::uptr get_send_buff(const int32_t timeout_ms) +    { +        return _send_io->get_send_buff(timeout_ms); +    } + +    /*! +     * Sends a TX data packet +     * +     * \param buff the frame buffer containing the packet to send +     */ +    void release_send_buff(buff_t::uptr buff) +    { +        _send_io->release_send_buff(std::move(buff)); +    } + +    /*! +     * Writes header into frame buffer and returns payload pointer +     * +     * \param buff Frame buffer to write header into +     * \param info Information to include in the header +     * \return A pointer to the payload data area and the packet size in bytes +     */ +    std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff, +        const packet_info_t& info) +    { +        uint64_t tsf = 0; + +        if (info.has_tsf) { +            _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS); +            tsf = info.tsf; +        } else { +            _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS); +        } + +        _send_header.set_eob(info.eob); +        _send_header.set_seq_num(_data_seq_num++); + +        _send_packet->refresh(buff->data(), _send_header, tsf); +        _send_packet->update_payload_size(info.payload_bytes); + +        return std::make_pair( +            _send_packet->get_payload_ptr(), +            _send_packet->get_chdr_header().get_length()); +    } + +private: +    /*! +     * Recv callback for I/O service +     * +     * The I/O service invokes this callback when it reads a packet from the +     * recv link. +     * +     * \param buff the frame buffer containing the packet data +     * \param recv_link the recv link from which buff was read +     * \param send_link the send link for flow control messages +     */ +    bool _recv_callback(buff_t::uptr& buff, +        transport::recv_link_if* recv_link, +        transport::send_link_if* /*send_link*/) +    { +        _recv_packet->refresh(buff->data()); +        const auto header   = _recv_packet->get_chdr_header(); +        const auto type     = header.get_pkt_type(); +        const auto dst_epid = header.get_dst_epid(); + +        if (dst_epid != _epid) { +            return false; +        } + +        if (type == chdr::PKT_TYPE_STRS) { +            chdr::strs_payload strs; +            strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), +                _recv_packet->get_payload_size() / sizeof(uint64_t), +                _recv_packet->conv_to_host<uint64_t>()); + +            _fc_state.update_dest_recv_count({strs.xfer_count_bytes, +                static_cast<uint32_t>(strs.xfer_count_pkts)}); + +            // TODO: check strs status here and push into async msg queue + +            // Packet belongs to this transport, release buff and return true +            recv_link->release_recv_buff(std::move(buff)); +            buff = nullptr; +            return true; +        } else { +            UHD_THROW_INVALID_CODE_PATH(); +        } +    } + +    /*! +     * Send callback for I/O service +     * +     * The I/O service invokes this callback when it is requested to release +     * a send buffer to the send link. +     * +     * \param buff the frame buffer to release +     * \param send_link the send link for flow control messages +     */ +    void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) +    { +        const size_t packet_size = buff->packet_size(); + +        if (_fc_state.dest_has_space(packet_size)) { +            send_link->release_send_buff(std::move(buff)); +            buff = nullptr; + +            _fc_state.data_sent(packet_size); + +            if (_fc_state.get_fc_resync_req_pending() +                && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) { +                const auto& xfer_counts = _fc_state.get_xfer_counts(); +                const size_t strc_size = +                    _fc_sender.send_strc_resync(send_link, xfer_counts); +                _fc_state.clear_fc_resync_req_pending(); +                _fc_state.data_sent(strc_size); +            } +        } +    } + +    /*! +     * Configures the stream endpoint using mgmt_portal +     */ +    void _configure_sep(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const uhd::rfnoc::sep_addr_t& local_sep_addr, +        const uhd::rfnoc::sep_id_t& local_epid, +        const uhd::rfnoc::sep_addr_t& remote_sep_addr, +        const uhd::rfnoc::sep_id_t& remote_epid, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt) +    { +        // Create a control transport with the tx data links to send mgmt packets +        // needed to setup the stream. Only need one frame for this. +        auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, +            send_link, +            recv_link, +            local_epid, +            1, // num_send_frames +            1); // num_recv_frames + +        // Create new temporary management portal with the transports used for this stream +        // TODO: This is a bit excessive. Maybe we can pare down the functionality of the +        // portal just for route setup purposes. Whatever we do, we *must* use xport in it +        // though otherwise the transport will not behave correctly. +        auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( +            *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + +        // Setup a route to the EPID +        data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); +        data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + +        data_mgmt_portal->config_local_tx_stream( +            *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + +        // We no longer need the control xport and mgmt_portal, release them so +        // the control xport is no longer connected to the I/O service. +        data_mgmt_portal.reset(); +        ctrl_xport.reset(); +    } + +    /*! +     * Initializes flow control +     * +     * To initialize flow control, we need to send an init strc packet, then +     * receive a strs containing the stream endpoint ingress buffer size. We +     * then repeat this (now that we know the buffer size) to configure the flow +     * control frequency. To avoid having this logic within the data packet +     * processing flow, we use temporary send and recv I/O instances with +     * simple callbacks here. +     */ +    void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, +        uhd::transport::recv_link_if::sptr recv_link, +        uhd::transport::send_link_if::sptr send_link, +        const chdr::chdr_packet_factory& pkt_factory, +        const sep_id_pair_t sep_ids, +        const double fc_freq_ratio, +        const double fc_headroom_ratio) +    { +        // No flow control at initialization, just release all send buffs +        auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { +            send_link->release_send_buff(std::move(buff)); +            buff = nullptr; +        }; + +        // For recv, just queue strs packets for recv_io to read +        auto recv_cb = [this](buff_t::uptr& buff, +                           transport::recv_link_if* /*recv_link*/, +                           transport::send_link_if* /*send_link*/) { +            _recv_packet->refresh(buff->data()); +            const auto header   = _recv_packet->get_chdr_header(); +            const auto type     = header.get_pkt_type(); +            const auto dst_epid = header.get_dst_epid(); + +            return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); +        }; + +        // No flow control at initialization, just release all recv buffs +        auto fc_cb = [this](buff_t::uptr buff, +                         transport::recv_link_if* recv_link, +                         transport::send_link_if* /*send_link*/) { +            recv_link->release_recv_buff(std::move(buff)); +        }; + +        auto send_io = io_srv->make_send_client(send_link, +            1, // num_send_frames +            send_cb, +            nullptr, +            0, // num_recv_frames +            nullptr); + +        auto recv_io = io_srv->make_recv_client(recv_link, +            1, // num_recv_frames +            recv_cb, +            nullptr, +            0, // num_send_frames +            fc_cb); + +        chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); +        chdr::chdr_packet::uptr& recv_packet     = _recv_packet; + +        // Function to send a strc init +        auto send_strc_init = [&send_io, sep_ids, &strc_packet]( +                                  const stream_buff_params_t fc_freq = {0, 0}) { +            transport::frame_buff::uptr buff = send_io->get_send_buff(0); + +            if (!buff) { +                throw uhd::runtime_error( +                    "tx xport timed out getting a send buffer for strc init"); +            } + +            chdr::chdr_header header; +            header.set_seq_num(0); +            header.set_dst_epid(sep_ids.second); + +            chdr::strc_payload strc_pyld; +            strc_pyld.src_epid  = sep_ids.first; +            strc_pyld.op_code   = chdr::STRC_INIT; +            strc_pyld.num_bytes = fc_freq.bytes; +            strc_pyld.num_pkts  = fc_freq.packets; +            strc_packet->refresh(buff->data(), header, strc_pyld); + +            const size_t size = header.get_length(); +            buff->set_packet_size(size); +            send_io->release_send_buff(std::move(buff)); +        }; + +        // Function to receive a strs, returns buffer capacity +        auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { +            transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); + +            if (!buff) { +                throw uhd::runtime_error( +                    "tx xport timed out wating for a strs packet during initialization"); +            } + +            recv_packet->refresh(buff->data()); +            UHD_ASSERT_THROW( +                recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); +            chdr::strs_payload strs; +            strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), +                recv_packet->get_payload_size() / sizeof(uint64_t), +                recv_packet->conv_to_host<uint64_t>()); + +            recv_io->release_recv_buff(std::move(buff)); + +            return {strs.capacity_bytes, +                static_cast<uint32_t>(strs.capacity_pkts)}; +        }; + +        // Send a strc init to get the buffer size +        send_strc_init(); +        stream_buff_params_t capacity = recv_strs(); +        _fc_state.set_dest_capacity(capacity); + +        UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", +            "Received strs initializing buffer capacity to " +            << capacity.bytes << " bytes"); + +        // Calculate the requested fc_freq parameters +        uhd::rfnoc::stream_buff_params_t fc_freq = { +            static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), +            static_cast<uint32_t>( +                std::ceil(double(capacity.packets) * fc_freq_ratio))}; + +        const size_t headroom_bytes = +            static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); +        const size_t headroom_packets = static_cast<uint32_t>( +            std::ceil(double(capacity.packets) * fc_headroom_ratio)); + +        fc_freq.bytes -= headroom_bytes; +        fc_freq.packets -= headroom_packets; + +        // Send a strc init to configure fc freq +        send_strc_init(fc_freq); +        recv_strs(); + +        // Release temporary I/O service interfaces to disconnect from it +        send_io.reset(); +        recv_io.reset(); +    } + +    // Interface to the I/O service +    transport::send_io_if::sptr _send_io; + +    // Flow control state +    tx_flow_ctrl_state _fc_state; + +    // Maximum data payload in bytes +    size_t _max_payload_size = 0; + +    // Sequence number for data packets +    uint16_t _data_seq_num = 0; + +    // Header to write into send packets +    chdr::chdr_header _send_header; + +    // Packet for send data +    chdr::chdr_packet::uptr _send_packet; + +    // Packet to receive strs messages +    chdr::chdr_packet::uptr _recv_packet; + +    // Handles sending of strc flow control ack packets +    detail::tx_flow_ctrl_sender _fc_sender; + +    // Local / Source EPID +    sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp index 62b24ab61..1f14ea7d0 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp @@ -482,6 +482,8 @@ public: // Members      uint64_t num_pkts = 0;      //! Number of bytes to use for operation (64 bits)      uint64_t num_bytes = 0; +    //! Size of a strc packet (including header) +    static constexpr size_t PACKET_SIZE = 24;  public: // Functions      strc_payload()                        = default; diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp index 120b0e0f8..28fa8ec7c 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp @@ -7,11 +7,14 @@  #ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP  #define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP +#include <uhd/stream.hpp>  #include <uhdlib/rfnoc/chdr_packet.hpp>  #include <uhdlib/rfnoc/client_zero.hpp>  #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>  #include <uhdlib/rfnoc/epid_allocator.hpp>  #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>  #include <functional>  #include <memory>  #include <set> @@ -84,6 +87,7 @@ public:      virtual detail::client_zero::sptr get_client_zero(          sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0; +      /*! Configure a flow controlled data stream from the endpoint with ID src_epid to the       *  endpoint with ID dst_epid       * @@ -102,7 +106,33 @@ public:          const double fc_headroom_ratio,          const bool reset = false) = 0; -    // TODO: Implement functions to get graph-wide streamers +    /*! \brief Create a data stream going from the device to the host +     * +     * \param dst_addr The address of the destination stream endpoint +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param xport_args The transport arguments +     * \return An transport instance +     */ +    virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( +        sep_addr_t dst_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) = 0; + +    /*! \brief Create a data stream going from the host to the device +     * +     * \param dst_addr The address of the destination stream endpoint +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +     * \param xport_args The transport arguments +     * \return An transport instance +     */ +    virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( +        sep_addr_t dst_addr, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const device_addr_t& xport_args) = 0;      /*!       * \brief Create a graph_stream_manager and return a unique_ptr to it diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 79121a498..72f1cf1c7 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -11,6 +11,7 @@  #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>  #include <uhdlib/rfnoc/epid_allocator.hpp>  #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>  #include <functional>  #include <memory>  #include <set> @@ -112,41 +113,29 @@ public:      /*! \brief Create a data stream going from the host to the device       *       * \param dst_addr The address of the destination stream endpoint -     * \param lossy_xport Is the transport lossy?       * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload       * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata -     * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params -     * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params       * \param xport_args The transport arguments       * \return An transport instance       */      virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(          const sep_addr_t dst_addr, -        const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args) = 0;      /*! \brief Create a data stream going from the device to the host       *       * \param dst_addr The address of the destination stream endpoint -     * \param lossy_xport Is the transport lossy?       * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload       * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata -     * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params -     * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params       * \param xport_args The transport arguments       * \return An transport instance       */      virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(          const sep_addr_t src_addr, -        const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, -        const double fc_freq_ratio, -        const double fc_headroom_ratio,          const device_addr_t& xport_args) = 0;      static uptr make(const chdr::chdr_packet_factory& pkt_factory, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index 0a2790a34..33a0e3df0 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -8,21 +8,14 @@  #define INCLUDED_LIBUHD_MB_IFACE_HPP  #include <uhdlib/rfnoc/chdr_ctrl_xport.hpp> -#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>  #include <uhdlib/rfnoc/clock_iface.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp>  #include <memory>  namespace uhd { namespace rfnoc { -// FIXME: Update this -class chdr_rx_data_xport -{ -public: -    using uptr = std::unique_ptr<chdr_rx_data_xport>; -}; - -using chdr_tx_data_xport = chdr_rx_data_xport; -  /*! Motherboard (backchannel) interface   *   * In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to @@ -70,7 +63,8 @@ public:      /*! Return a reference to a clock iface       */ -    virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0; +    virtual std::shared_ptr<clock_iface> get_clock_iface( +        const std::string& clock_name) = 0;      /*! Create a control transport       * @@ -89,30 +83,34 @@ public:       *       * This is typically called once per streaming channel.       * -     * \param local_device_id ID for the host transport adapter to use -     * \param local_epid Host (sink) streaming endpoint ID -     * \param remote_epid Remote device (source) streaming endpoint ID +     * \param addrs Address of the device and host stream endpoints +     * \param epids Endpoint IDs of the device and host stream endpoints +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata       * \param xport_args Transport configuration args       * \return A CHDR RX data transport       */ -    virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, -        const sep_id_t& local_epid, -        const sep_id_t& remote_epid, +    virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs, +        const sep_id_pair_t& epids, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt,          const device_addr_t& xport_args) = 0;      /*! Create an TX data transport       *       * This is typically called once per streaming channel.       * -     * \param local_device_id ID for the host transport adapter to use -     * \param local_epid Host (source) streaming endpoint ID -     * \param remote_epid Remote device (sink) streaming endpoint ID +     * \param addrs Address of the host and device stream endpoints +     * \param epids Endpoint IDs of the host and device stream endpoints +     * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +     * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata       * \param xport_args Transport configuration args       * \return A CHDR TX data transport       */ -    virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, -        const sep_id_t& local_epid, -        const sep_id_t& remote_epid, +    virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs, +        const sep_id_pair_t& epids, +        const uhd::rfnoc::sw_buff_t pyld_buff_fmt, +        const uhd::rfnoc::sw_buff_t mdata_buff_fmt,          const device_addr_t& xport_args) = 0;  }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index 7ec1b7bb2..bc56fd311 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -41,6 +41,8 @@ using device_id_t = uint16_t;  using sep_inst_t = uint16_t;  //! Stream Endpoint Physical Address Type  using sep_addr_t = std::pair<device_id_t, sep_inst_t>; +//! Stream Endpoint Physical Address Type (first = source, second = destination) +using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;  //! Stream Endpoint ID Type  using sep_id_t = uint16_t;  //! Stream Endpoint pair Type (first = source, second = destination) @@ -65,6 +67,19 @@ struct stream_buff_params_t  //! The data type of the buffer used to capture/generate data  enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; +//! Conversion from number of bits to sw_buff +constexpr sw_buff_t bits_to_sw_buff(size_t bits) +{ +    if (bits <= 8) { +        return BUFF_U8; +    } else if (bits <= 16) { +        return BUFF_U16; +    } else if (bits <= 32) { +        return BUFF_U32; +    } else { +        return BUFF_U64; +    } +}  //----------------------------------------------  // Constants @@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };  constexpr uint16_t RFNOC_PROTO_VER = 0x0100; +constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_CAPACITY_PKTS  = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_FREQ_BYTES     = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_FREQ_PKTS      = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1; +constexpr uint32_t MAX_FC_HEADROOM_PKTS  = (uint32_t(1) << 8) - 1;  }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp new file mode 100644 index 000000000..6ced60d19 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -0,0 +1,95 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + *  Extends the streamer_impl to be an rfnoc node so it can connect to the + *  graph. Configures the streamer conversion and rate parameters with values + *  received through property propagation. + */ +class rfnoc_rx_streamer : public node_t, +                          public transport::rx_streamer_impl<chdr_rx_data_xport> +{ +public: +    /*! Constructor +     * +     * \param num_ports The number of ports +     * \param stream_args Arguments to aid the construction of the streamer +     */ +    rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args); + +    /*! Returns a unique identifier string for this node. In every RFNoC graph, +     * no two nodes cannot have the same ID. Returns a string in the form of +     * "RxStreamer#0". +     * +     * \returns The unique ID as a string +     */ +    std::string get_unique_id() const; + +    /*! Returns the number of input ports for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_input_ports() const; + +    /*! Returns the number of output ports for this block. +     * +     * Always returns 0 for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_output_ports() const; + +    /*! Implementation of rx_streamer API method +     * +     * \param stream_cmd the stream command to issue +     */ +    void issue_stream_cmd(const stream_cmd_t& stream_cmd); + +    /*! Returns stream args provided at creation +     * +     * \return stream args provided when streamer is created +     */ +    const uhd::stream_args_t& get_stream_args() const; + +    /*! Check that all streamer ports are connected to blocks +     * +     * Overrides node_t to ensure there are no unconnected ports. +     * +     * \param connected_inputs A list of input ports that are connected +     * \param connected_outputs A list of output ports that are connected +     * \returns true if the block can deal with this configuration +     */ +    bool check_topology(const std::vector<size_t>& connected_inputs, +        const std::vector<size_t>& connected_outputs); +private: +    void _register_props(const size_t chan, const std::string& otw_format); + +    // Properties +    std::vector<property_t<double>> _scaling_in; +    std::vector<property_t<double>> _samp_rate_in; +    std::vector<property_t<double>> _tick_rate_in; +    std::vector<property_t<std::string>> _type_in; + +    // Streamer unique ID +    const std::string _unique_id; + +    // Stream args provided at construction +    const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp new file mode 100644 index 000000000..4acee45cc --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -0,0 +1,90 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + *  Extends the streamer_impl to be an rfnoc node so it can connect to the + *  graph. Configures the streamer conversion and rate parameters with values + *  received through property propagation. + */ +class rfnoc_tx_streamer : public node_t, +                          public transport::tx_streamer_impl<chdr_tx_data_xport> +{ +public: +    /*! Constructor +     * +     * \param num_ports The number of ports +     * \param stream_args Arguments to aid the construction of the streamer +     */ +    rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args); + +    /*! Returns a unique identifier string for this node. In every RFNoC graph, +     * no two nodes cannot have the same ID. Returns a string in the form of +     * "TxStreamer#0". +     * +     * \returns The unique ID as a string +     */ +    std::string get_unique_id() const; + +    /*! Returns the number of input ports for this block. +     * +     * Always returns 0 for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_input_ports() const; + +    /*! Returns the number of output ports for this block. +     * +     * \return noc_id The number of ports +     */ +    size_t get_num_output_ports() const; + +    /*! Returns stream args provided at creation +     * +     * \return stream args provided when streamer is created +     */ +    const uhd::stream_args_t& get_stream_args() const; + +    /*! Check that all streamer ports are connected to blocks +     * +     * Overrides node_t to ensure there are no unconnected ports. +     * +     * \param connected_inputs A list of input ports that are connected +     * \param connected_outputs A list of output ports that are connected +     * \returns true if the block can deal with this configuration +     */ +    bool check_topology(const std::vector<size_t>& connected_inputs, +        const std::vector<size_t>& connected_outputs); + +private: +    void _register_props(const size_t chan, const std::string& otw_format); + +    // Properties +    std::vector<property_t<double>> _scaling_out; +    std::vector<property_t<double>> _samp_rate_out; +    std::vector<property_t<double>> _tick_rate_out; +    std::vector<property_t<std::string>> _type_out; + +    // Streamer unique ID +    const std::string _unique_id; + +    // Stream args provided at construction +    const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp new file mode 100644 index 000000000..937baf982 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP + +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage rx flow control state +class rx_flow_ctrl_state +{ +public: +    //! Constructor +    rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} + +    //! Initialize frequency parameters +    void initialize(const stream_buff_params_t fc_freq) +    { +        _fc_freq = fc_freq; +    } + +    //! Resynchronize with transfer counts from the sender +    void resynchronize(const stream_buff_params_t counts) +    { +        if (_recv_counts.bytes != counts.bytes +            || _recv_counts.packets != counts.packets) { +            // If there is a discrepancy between the amount of data sent by +            // the device and received by the transport, adjust the counts +            // of data received and transferred to include the dropped data. +            auto bytes_dropped = counts.bytes - _recv_counts.bytes; +            auto pkts_dropped  = counts.packets - _recv_counts.packets; +            _xfer_counts.bytes += bytes_dropped; +            _xfer_counts.packets += pkts_dropped; + +            UHD_LOGGER_DEBUG("rx_flow_ctrl_state") +                << "oh noes: bytes_sent=" << counts.bytes +                << "  bytes_received=" << _recv_counts.bytes +                << "  pkts_sent=" << counts.packets +                << "  pkts_received=" << _recv_counts.packets +                << " src_epid=" << _epids.first << " dst_epid=" << _epids.second +                << std::endl; + +            _recv_counts = counts; +        } +    } + +    //! Reset the transfer counts (happens during init) +    void reset_counts() +    { +        UHD_LOGGER_TRACE("rx_flow_ctrl_state") +            << "Resetting transfer counts" << std::endl; +        _recv_counts = {0, 0}; +        _xfer_counts = {0, 0}; +    } + +    //! Update state when data is received +    void data_received(const size_t bytes) +    { +        _recv_counts.bytes += bytes; +        _recv_counts.packets++; +    } + +    //! Update state when transfer is complete (buffer space freed) +    void xfer_done(const size_t bytes) +    { +        _xfer_counts.bytes += bytes; +        _xfer_counts.packets++; +    } + +    //! Returns whether a flow control response is needed +    bool fc_resp_due() const +    { +        stream_buff_params_t accum_counts = { +            _xfer_counts.bytes - _last_fc_resp_counts.bytes, +            _xfer_counts.packets - _last_fc_resp_counts.packets}; + +        return accum_counts.bytes >= _fc_freq.bytes +               || accum_counts.packets >= _fc_freq.packets; +    } + +    //! Update state after flow control response was sent +    void fc_resp_sent() +    { +        _last_fc_resp_counts = _xfer_counts; +    } + +    //! Returns counts for completed transfers +    stream_buff_params_t get_xfer_counts() const +    { +        return _xfer_counts; +    } + +    //! Returns counts for completed transfers +    stream_buff_params_t get_recv_counts() const +    { +        return _recv_counts; +    } + +    //! Returns configured flow control frequency +    stream_buff_params_t get_fc_freq() const +    { +        return _fc_freq; +    } + +private: +    // Counts for data received, including any data still in use +    stream_buff_params_t _recv_counts{0, 0}; + +    // Counts for data read and whose buffer space is ok to reuse +    stream_buff_params_t _xfer_counts{0, 0}; + +    // Counts sent in last flow control response +    stream_buff_params_t _last_fc_resp_counts{0, 0}; + +    // Frequency of flow control responses +    stream_buff_params_t _fc_freq{0, 0}; + +    // Endpoint ID for log messages +    const sep_id_pair_t _epids; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp new file mode 100644 index 000000000..65fc1b093 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -0,0 +1,99 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP + +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage tx flow control state +class tx_flow_ctrl_state +{ +public: +    //! Updates destination capacity +    void set_dest_capacity(const stream_buff_params_t& capacity) +    { +        _dest_capacity = capacity; +    } + +    //! Updates destination received count +    void update_dest_recv_count(const stream_buff_params_t& recv_count) +    { +        _recv_counts = recv_count; +    } + +    /*! Returns whether the destination has buffer space for the requested +     *  packet size +     */ +    bool dest_has_space(const size_t packet_size) +    { +        // The stream endpoint only cares about bytes, the packet count is not +        // important to determine the space available. +        const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes; +        const auto space_available = _dest_capacity.bytes - buffer_fullness; +        return space_available >= packet_size; +    } + +    //! Increments transfer count with amount of data sent +    void data_sent(const size_t packet_size) +    { +        _xfer_counts.bytes += packet_size; +        _xfer_counts.packets++; + +        // Request an fc resync after we have transferred a number of bytes >= +        // to the destination capacity. There is no strict requirement on how +        // often we need to send this, as it is only needed to correct for +        // dropped packets. One buffer's worth of bytes is probably a good +        // cadence. +        if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) { +            _fc_resync_req = true; +        } +    } + +    /*! Returns whether an fc resync request is pending. The policy we use +     * here is to send an fc resync every time we send a number of bytes +     * equal to the destination buffer capacity. +     */ +    bool get_fc_resync_req_pending() const +    { +        return _fc_resync_req; +    } + +    //! Clears fc resync request pending status +    void clear_fc_resync_req_pending() +    { +        _fc_resync_req = false; +        _last_fc_resync_bytes = _xfer_counts.bytes; +    } + +    //! Returns counts for packets sent +    stream_buff_params_t get_xfer_counts() const +    { +        return _xfer_counts; +    } + +private: +    // Counts for data sent +    stream_buff_params_t _xfer_counts{0, 0}; + +    // Counts for data received by the destination +    stream_buff_params_t _recv_counts{0, 0}; + +    // Buffer size at the destination +    stream_buff_params_t _dest_capacity{0, 0}; + +    // Counts sent in last flow control resync +    size_t _last_fc_resync_bytes = 0; + +    // Track when to send ack packets +    bool _fc_resync_req = false; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */  | 
