diff options
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp | 191 | ||||
| -rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/lib/rfnoc/mgmt_portal.cpp | 997 | 
3 files changed, 1189 insertions, 1 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp new file mode 100644 index 000000000..305f8fb85 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp @@ -0,0 +1,191 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_MGMT_PORTAL_HPP +#define INCLUDED_LIBUHD_MGMT_PORTAL_HPP + +#include <uhdlib/rfnoc/chdr/chdr_types.hpp> +#include <uhdlib/rfnoc/xports.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { namespace mgmt { + +//! A portal to perform low-level management operations from an endpoint +// +// This object provides an interface to send management commands from a software stream +// endpoint. There must one instance of this object per software stream endpoint. +// The management portal is capable of discovering all endpoints reachable from the +// transport associated with it. It can then setup routes and configure stream endpoints +// downstream. +class mgmt_portal +{ +public: +    using uptr           = std::unique_ptr<mgmt_portal>; +    using xport_cfg_fn_t = std::function<void( +        device_id_t devid, uint16_t inst, uint8_t subtype, chdr::mgmt_hop_t& hop)>; + +    //! Flow control buffer configuration parameters +    struct buff_params_t +    { +        uint64_t bytes; +        uint32_t packets; +    }; + +    //! Information about a stream endpoint +    struct sep_info_t +    { +        //! Does the endpoint support control traffic? +        bool has_ctrl; +        //! Does the endpoint support data traffic? +        bool has_data; +        //! Number of input data ports +        size_t num_input_ports; +        //! Number of output data ports +        size_t num_output_ports; +        //! Does the endpoint send a stream status packet in case of errors +        bool reports_strm_errs; +        //! Address of the endpoint +        sep_addr_t addr; +    }; + +    //! The data type of the buffer used to capture/generate data +    enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; + +    virtual ~mgmt_portal() = 0; + +    //! Get addresses for all stream endpoints reachable from this SW mgmt portal +    //  Note that the endpoints that are not physically connected/reachable from +    //  the underlying transport will not be discovered. +    // +    virtual const std::vector<sep_addr_t>& get_reachable_endpoints() const = 0; + +    //! Initialize a stream endpoint and assign an endpoint ID to it +    // +    // \param addr The physical address of the stream endpoint +    // \param epid The endpoint ID to assign to this endpoint +    // +    virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) = 0; + +    //! Get information about a discovered (reachable) stream endpoint +    // +    // \param epid The endpoint ID of the endpoint to lookup +    // +    virtual bool is_endpoint_initialized(const sep_id_t& epid) const = 0; + +    //! Get information about a discovered (reachable) stream endpoint +    // +    // \param epid The endpoint ID of the endpoint to lookup +    // +    virtual sep_info_t get_endpoint_info(const sep_id_t& epid) const = 0; + +    //! Setup a route from this SW mgmt portal to the specified destination endpoint +    // +    //  After a route is established, it should be possible to send packets to the +    //  destination simply by setting the DstEPID in the CHDR header to the specified +    //  dst_epid +    // +    // \param dst_epid The endpoint ID of the destination +    // +    virtual void setup_local_route(const sep_id_t& dst_epid) = 0; + +    //! Setup a route from between the source and destination endpoints +    // +    //  After a route is established, it should be possible for the source to send packets +    //  to the destination simply by setting the DstEPID in the CHDR header to the +    //  specified dst_epid +    // +    // \param dst_epid The endpoint ID of the destination +    // \param src_epid The endpoint ID of the source +    // +    virtual void setup_remote_route( +        const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0; + +    //! Start configuring a flow controlled receive data stream from the endpoint with the +    //  specified ID to this SW mgmt portal. +    // +    //  RX stream setup is a two-step process. After this function is called, the flow +    //  control handler needs to acknoweledge the setup transaction then call the commit +    //  function below. +    // +    // \param epid The endpoint ID of the data source +    // \param lossy_xport Is the transport lossy? (e.g. UDP, not liberio) +    // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +    // \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +    // \param fc_freq Flow control response frequency parameters +    // \param fc_freq Flow control headroom parameters +    // \param reset Reset ingress stream endpoint state +    // +    virtual void config_local_rx_stream_start(const sep_id_t& epid, +        const bool lossy_xport, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const buff_params_t& fc_freq, +        const buff_params_t& fc_headroom, +        const bool reset = false) = 0; + +    //! Finish configuring a flow controlled receive data stream from the endpoint with +    //  the specified ID to this SW mgmt portal. +    // +    // \param epid The endpoint ID of the data source +    // +    virtual buff_params_t config_local_rx_stream_commit( +        const sep_id_t& epid, const double timeout = 0.2) = 0; + +    //! Configure a flow controlled transmit data stream from this SW mgmt portal to the +    //  endpoint with the specified ID. +    // +    // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +    // \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +    // \param reset Reset ingress stream endpoint state +    // +    virtual void config_local_tx_stream(const sep_id_t& epid, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const bool reset = false) = 0; + +    //! Configure a flow controlled data stream from the endpoint with ID src_epid to the +    //  endpoint with ID dst_epid +    // +    // \param dst_epid The endpoint ID of the destination +    // \param src_epid The endpoint ID of the source +    // \param lossy_xport Is the transport lossy? +    // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload +    // \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata +    // \param fc_freq Flow control response frequency parameters +    // \param fc_freq Flow control headroom parameters +    // +    virtual buff_params_t config_remote_stream(const sep_id_t& dst_epid, +        const sep_id_t& src_epid, +        const bool lossy_xport, +        const buff_params_t& fc_freq, +        const buff_params_t& fc_headroom, +        const bool reset     = false, +        const double timeout = 0.2) = 0; + +    //! Define custom configuration functions for custom transports +    // +    // \param xport_type The type of the custom transport +    // \param init_hop_cfg_fn The function to call when initializing the custom xport +    // \param rtcfg_hop_cfg_fn The function to call when configuring routing for the +    // custom xport +    // +    virtual void register_xport_hop_cfg_fns(uint8_t xport_subtype, +        xport_cfg_fn_t init_hop_cfg_fn, +        xport_cfg_fn_t rtcfg_hop_cfg_fn) = 0; + +    //! Create an endpoint manager object +    // +    static uptr make(const both_xports_t& xports, +        const chdr::chdr_packet_factory& pkt_factory, +        uint16_t protover, +        chdr_w_t chdr_w, +        sep_id_t epid, +        device_id_t device_id); +}; + +}}} // namespace uhd::rfnoc::mgmt + +#endif /* INCLUDED_LIBUHD_MGMT_PORTAL_HPP */ diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index fe8e8a564..e4715a644 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -32,7 +32,7 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/sink_node_ctrl.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/source_block_ctrl_base.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/source_node_ctrl.cpp -#    ${CMAKE_CURRENT_SOURCE_DIR}/mgmt_portal.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/mgmt_portal.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/stream_sig.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/tick_node_ctrl.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/tx_stream_terminator.cpp diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp new file mode 100644 index 000000000..fcb19997e --- /dev/null +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -0,0 +1,997 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/chdr/chdr_packet.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <unordered_set> +#include <boost/format.hpp> +#include <cmath> +#include <mutex> +#include <queue> + +namespace uhd { namespace rfnoc { namespace mgmt { + +using namespace chdr; +using namespace transport; + +constexpr bool ALLOW_DAISY_CHAINING = false; + +constexpr uint16_t REG_EPID_SELF               = 0x00; // RW +constexpr uint16_t REG_RESET_AND_FLUSH         = 0x04; // W +constexpr uint16_t REG_OSTRM_CTRL_STATUS       = 0x08; // RW +constexpr uint16_t REG_OSTRM_DST_EPID          = 0x0C; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_LO  = 0x10; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_HI  = 0x14; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_PKTS      = 0x18; // W +constexpr uint16_t REG_OSTRM_FC_HEADROOM       = 0x1C; // W +constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20; // R +constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24; // R +constexpr uint16_t REG_OSTRM_BUFF_CAP_PKTS     = 0x28; // R +constexpr uint16_t REG_OSTRM_SEQ_ERR_CNT       = 0x2C; // R +constexpr uint16_t REG_OSTRM_DATA_ERR_CNT      = 0x30; // R +constexpr uint16_t REG_OSTRM_ROUTE_ERR_CNT     = 0x34; // R +constexpr uint16_t REG_ISTRM_CTRL_STATUS       = 0x38; // RW + +constexpr uint32_t RESET_AND_FLUSH_OSTRM = (1 << 0); +constexpr uint32_t RESET_AND_FLUSH_ISTRM = (1 << 1); +constexpr uint32_t RESET_AND_FLUSH_CTRL  = (1 << 2); +constexpr uint32_t RESET_AND_FLUSH_ALL   = 0x7; + +constexpr uint32_t BUILD_CTRL_STATUS_WORD(bool cfg_start, +    bool xport_lossy, +    mgmt_portal::sw_buff_t pyld_buff_fmt, +    mgmt_portal::sw_buff_t mdata_buff_fmt) +{ +    return (cfg_start ? 1 : 0) | (xport_lossy ? 2 : 0) +           | (static_cast<uint32_t>(pyld_buff_fmt) << 2) +           | (static_cast<uint32_t>(mdata_buff_fmt) << 4); +} + +constexpr uint32_t STRM_STATUS_FC_ENABLED    = 0x80000000; +constexpr uint32_t STRM_STATUS_SETUP_ERR     = 0x40000000; +constexpr uint32_t STRM_STATUS_SETUP_PENDING = 0x20000000; + +//! The type of a node in the data-flow graph +enum node_type { +    //! Invalid type. The FPGA will never have a node with type = 0 +    NODE_TYPE_INVALID = 0, +    //! CHDR Crossbar +    NODE_TYPE_XBAR = 1, +    //! Stream Endpoint +    NODE_TYPE_STRM_EP = 2, +    //! Transport +    NODE_TYPE_XPORT = 3 +}; + +//! A unique identifier for a node +struct node_id_t +{ +    //! A unique ID for device that houses this node +    device_id_t device_id; +    //! The type of this node +    node_type type; +    //! The instance number of this node in the device +    sep_inst_t inst; +    //! Extended info about node (not used for comparisons) +    uint32_t extended_info; + +    // ctors and operators +    node_id_t()                     = delete; +    node_id_t(const node_id_t& rhs) = default; +    node_id_t(device_id_t device_id_, node_type type_, sep_inst_t inst_) +        : device_id(device_id_), type(type_), inst(inst_), extended_info(0) +    { +    } +    node_id_t(device_id_t device_id_, +        node_type type_, +        sep_inst_t inst_, +        uint32_t extended_info_) +        : device_id(device_id_), type(type_), inst(inst_), extended_info(extended_info_) +    { +    } +    node_id_t(const sep_addr_t& sep_addr) +        : device_id(sep_addr.first) +        , type(NODE_TYPE_STRM_EP) +        , inst(sep_addr.second) +        , extended_info(0) +    { +    } + +    inline uint64_t unique_id() const +    { +        return (static_cast<uint64_t>(inst) + (static_cast<uint64_t>(device_id) << 16) +                + (static_cast<uint64_t>(type) << 32)); +    } +    inline std::string to_string() const +    { +        static const std::map<node_type, std::string> NODE_STR = { +            {NODE_TYPE_INVALID, "unknown"}, +            {NODE_TYPE_XBAR, "xbar"}, +            {NODE_TYPE_STRM_EP, "sep"}, +            {NODE_TYPE_XPORT, "xport"}}; +        return str( +            boost::format("device:%d/%s:%d") % device_id % NODE_STR.at(type) % inst); +    } + +    inline friend bool operator<(const node_id_t& lhs, const node_id_t& rhs) +    { +        return (lhs.unique_id() < rhs.unique_id()); +    } +    inline friend bool operator==(const node_id_t& lhs, const node_id_t& rhs) +    { +        return (lhs.unique_id() == rhs.unique_id()); +    } +    inline friend bool operator!=(const node_id_t& lhs, const node_id_t& rhs) +    { +        return (lhs.unique_id() != rhs.unique_id()); +    } +    inline node_id_t& operator=(const node_id_t&) = default; +}; + +//! The local destination to take at the current node to reach the next node +//  - If negative, then no specific action necessary +//  - If non-negative, then route (select destination) to the value +using next_dest_t = int32_t; + +//! An address that allows locating a node in a data-flow network starting from +//  a specific stream endpoint. The address is a collection (vector) of nodes and +//  the respective routing decisions to get to the final node. +using node_addr_t = std::vector<std::pair<node_id_t, next_dest_t>>; + +std::string to_string(const node_addr_t& node_addr) +{ +    if (!node_addr.empty()) { +        std::string str(""); +        for (const auto& hop : node_addr) { +            str += hop.first.to_string() + std::string(",") + std::to_string(hop.second) + std::string("->"); +        } +        return str; +    } else { +        return std::string("<empty>"); +    } +} + +// Empty dtor for stream_manager +mgmt_portal::~mgmt_portal() {} + +//--------------------------------------------------------------- +// Stream Manager Implementation +//--------------------------------------------------------------- +class mgmt_portal_impl : public mgmt_portal +{ +public: +    mgmt_portal_impl(const both_xports_t& xports, +        const chdr::chdr_packet_factory& pkt_factory, +        uint16_t protover, +        chdr_w_t chdr_w, +        sep_id_t my_epid, +        device_id_t my_device_id) +        : _my_epid(my_epid) +        , _protover(protover) +        , _chdr_w(chdr_w) +        , _my_node_id(my_device_id, NODE_TYPE_STRM_EP, my_epid) +        , _recv_xport(xports.recv) +        , _send_xport(xports.send) +        , _send_seqnum(0) +        , _send_pkt(std::move(pkt_factory.make_mgmt())) +        , _recv_pkt(std::move(pkt_factory.make_mgmt())) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); +        _discover_topology(); +    } + +    virtual ~mgmt_portal_impl() {} + +    virtual const std::vector<sep_addr_t>& get_reachable_endpoints() const +    { +        return _discovered_ep_vtr; +    } + +    virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // Create a node ID from lookup info +        node_id_t lookup_node(addr.first, NODE_TYPE_STRM_EP, addr.second); +        if (_node_addr_map.count(lookup_node) == 0) { +            throw uhd::lookup_error( +                "initialize_endpoint(): Cannot reach node with specified address."); +        } +        const node_addr_t& node_addr = _node_addr_map.at(lookup_node); + +        // Build a management transaction to first get to the node +        mgmt_payload cfg_xact; +        cfg_xact.set_header(_my_epid, _protover, _chdr_w); +        _traverse_to_node(cfg_xact, node_addr); + +        mgmt_hop_t cfg_hop; +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ALL))); +        cfg_hop.add_op(mgmt_op_t( +            mgmt_op_t::MGMT_OP_CFG_WR_REQ, mgmt_op_t::cfg_payload(REG_EPID_SELF, epid))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +        cfg_xact.add_hop(cfg_hop); + +        // Send the transaction and receive a response. +        // We don't care about the contents of the response. +        _send_recv_mgmt_transaction(cfg_xact); + +        // Add/update the entry in the stream endpoint ID map +        _epid_addr_map[epid] = addr; +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Bound stream endpoint with Addr=(%d,%d) to EPID=%d") +                % addr.first % addr.second % epid)); +        UHD_LOG_TRACE("RFNOC::MGMT", +            (boost::format( +                 "Stream endpoint with EPID=%d can be reached by taking the path: %s") +                % epid % to_string(node_addr))); +    } + +    virtual bool is_endpoint_initialized(const sep_id_t& epid) const +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); +        return (_epid_addr_map.count(epid) > 0); +    } + +    virtual sep_info_t get_endpoint_info(const sep_id_t& epid) const +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // Lookup the destination node address using the endpoint ID +        if (_epid_addr_map.count(epid) == 0) { +            throw uhd::lookup_error( +                "get_endpoint_info(): Could not find a stream with specified ID."); +        } +        node_id_t lookup_node(_epid_addr_map.at(epid)); +        // If a node is in _epid_addr_map then it must be in _node_addr_map +        UHD_ASSERT_THROW(_node_addr_map.count(lookup_node) > 0); +        // Why is key_node different from lookup_node? +        // Because it has additional extended info (look at operator< def) +        const node_id_t& key_node = _node_addr_map.find(lookup_node)->first; + +        // Build a return val +        sep_info_t retval; +        retval.has_ctrl        = (key_node.extended_info >> 0) & 0x1; +        retval.has_data        = (key_node.extended_info >> 1) & 0x1; +        retval.num_input_ports = retval.has_data ? ((key_node.extended_info >> 2) & 0x3F) +                                                 : 0; +        retval.num_output_ports = retval.has_data ? ((key_node.extended_info >> 8) & 0x3F) +                                                  : 0; +        retval.reports_strm_errs = (key_node.extended_info >> 14) & 0x1; +        retval.addr              = _epid_addr_map.at(epid); +        return retval; +    } + +    virtual void setup_local_route(const sep_id_t& dst_epid) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // Lookup the physical stream endpoint address using the endpoint ID +        const node_addr_t& node_addr = _lookup_sep_node_addr(dst_epid); + +        // Build a management transaction to configure all the nodes in the path going to +        // dst_epid +        mgmt_payload cfg_xact; +        cfg_xact.set_header(_my_epid, _protover, _chdr_w); +        for (const auto& addr_pair : node_addr) { +            const node_id_t& curr_node   = addr_pair.first; +            const next_dest_t& curr_dest = addr_pair.second; +            mgmt_hop_t curr_cfg_hop; +            switch (curr_node.type) { +                case NODE_TYPE_XBAR: { +                    // Configure the routing table to route all packets going to dst_epid +                    // to the port with index next_dest_t +                    curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                        mgmt_op_t::cfg_payload(dst_epid, curr_dest))); +                    curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST, +                        mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(curr_dest)))); +                } break; +                case NODE_TYPE_XPORT: { +                    uint8_t node_subtype = +                        static_cast<uint8_t>(curr_node.extended_info & 0xFF); +                    // Run a hop configuration function for custom transports +                    if (_rtcfg_cfg_fns.count(node_subtype)) { +                        _rtcfg_cfg_fns.at(node_subtype)(curr_node.device_id, +                            curr_node.inst, +                            node_subtype, +                            curr_cfg_hop); +                    } else { +                        curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); +                    } +                } break; +                case NODE_TYPE_STRM_EP: { +                    // Stream are not involved in routing, so do nothing +                } break; +                default: { +                    UHD_THROW_INVALID_CODE_PATH(); +                } break; +            } +            // Add this hop to the trancation only if it's not empty +            if (curr_cfg_hop.get_num_ops() > 0) { +                cfg_xact.add_hop(curr_cfg_hop); +            } +        } + +        // If we follow this route then we should end up at a stream endpoint +        // so add an extra hop and return the packet back with the node info we will +        // sanity check it later +        mgmt_hop_t discover_hop; +        discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ)); +        discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +        cfg_xact.add_hop(discover_hop); + +        // Send the transaction and validate that we saw a stream endpoint +        const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact); +        const node_id_t sep_node         = _pop_node_discovery_hop(sep_info_xact); +        if (sep_node.type != NODE_TYPE_STRM_EP) { +            throw uhd::routing_error( +                "Route setup failed. Could not confirm terminal stream endpoint"); +        } + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Established a route from EPID=%d (SW) to EPID=%d") % _my_epid +                % dst_epid)); +        UHD_LOG_TRACE("RFNOC::MGMT", +            (boost::format("The destination for EPID=%d has been added to all routers in " +                           "the path: %s") +                % dst_epid % to_string(node_addr))); +    } + +    virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // Lookup the src and dst node address using the endpoint ID +        const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); +        const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); + +        // Find a common parent (could be faster than n^2 but meh, this is easier) +        bool found_common_parent = false; +        for (const auto& dnode : dst_node_addr) { +            for (const auto& snode : src_node_addr) { +                found_common_parent = +                    ((dnode.first == snode.first) && dnode.first.type == NODE_TYPE_XBAR); +                if (found_common_parent) +                    break; +            } +            if (found_common_parent) +                break; +        } +        if (!found_common_parent) { +            throw uhd::routing_error("setup_remote_route: Route setup failed. The " +                                     "endpoints don't share a common crossbar parent."); +        } + +        // If we setup local routes from this host to both the source and destination +        // endpoints then the routing algorithm will guarantee that packet between src and +        // dst will have a path between them as long as they share a common parent +        // (crossbar). The assumption is verified above. It is also guaranteed that the +        // path between them will be the shortest one. It is possible that we are +        // configuring more crossbars than necessary but we do this for simplicity. If +        // there is a need to optimize for routing table fullness, we can do a software +        // graph traversal here, find the closest common parent (crossbar) for the two +        // nodes and only configure the nodes downstream of that. +        setup_local_route(dst_epid); +        setup_local_route(src_epid); + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format( +                 "The two routes above now enable a route from EPID=%d to EPID=%s") +                % src_epid % dst_epid)); +    } + +    virtual void config_local_rx_stream_start(const sep_id_t& epid, +        const bool lossy_xport, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const buff_params_t& fc_freq, +        const buff_params_t& fc_headroom, +        const bool reset = false) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // The discovery process has already setup a route from the +        // destination to us. No additional action is necessary. + +        const node_addr_t& node_addr = _lookup_sep_node_addr(epid); + +        // Build a management transaction to first get to the node +        mgmt_payload cfg_xact; +        cfg_xact.set_header(_my_epid, _protover, _chdr_w); +        _traverse_to_node(cfg_xact, node_addr); + +        mgmt_hop_t cfg_hop; +        // Assert reset if requested +        if (reset) { +            cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_OSTRM))); +        } +        // Set destination of the stream to us (this endpoint) +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, _my_epid))); +        // Configure flow control parameters +        _push_ostrm_flow_control_config( +            lossy_xport, pyld_buff_fmt, mdata_buff_fmt, fc_freq, fc_headroom, cfg_hop); +        // Return the packet back to us +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + +        // Send the transaction and receive a response. +        // We don't care about the contents of the response. +        cfg_xact.add_hop(cfg_hop); +        _send_recv_mgmt_transaction(cfg_xact); + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Initiated RX stream setup for EPID=%d") % epid)); +    } + +    virtual buff_params_t config_local_rx_stream_commit( +        const sep_id_t& epid, const double timeout = 0.2) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // Wait for stream configuration to finish on the HW side +        const node_addr_t& node_addr = _lookup_sep_node_addr(epid); +        _validate_stream_setup(node_addr, timeout); + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Finished RX stream setup for EPID=%d") % epid)); + +        // Return discovered buffer parameters +        return std::get<1>(_get_ostrm_status(node_addr)); +    } + +    virtual void config_local_tx_stream(const sep_id_t& epid, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const bool reset = false) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // First setup a route between to the endpoint +        setup_local_route(epid); + +        const node_addr_t& node_addr = _lookup_sep_node_addr(epid); + +        // Build a management transaction to first get to the node +        mgmt_payload cfg_xact; +        cfg_xact.set_header(_my_epid, _protover, _chdr_w); +        _traverse_to_node(cfg_xact, node_addr); + +        mgmt_hop_t cfg_hop; +        // Assert reset if requested +        if (reset) { +            cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ISTRM))); +        } +        // Configure buffer types +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_ISTRM_CTRL_STATUS, +                BUILD_CTRL_STATUS_WORD(false, false, pyld_buff_fmt, mdata_buff_fmt)))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +        cfg_xact.add_hop(cfg_hop); + +        // Send the transaction and receive a response. +        // We don't care about the contents of the response. +        _send_recv_mgmt_transaction(cfg_xact); + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Finished TX stream setup for EPID=%d") % epid)); +    } + +    virtual buff_params_t config_remote_stream(const sep_id_t& dst_epid, +        const sep_id_t& src_epid, +        const bool lossy_xport, +        const buff_params_t& fc_freq, +        const buff_params_t& fc_headroom, +        const bool reset     = false, +        const double timeout = 0.2) +    { +        std::lock_guard<std::recursive_mutex> lock(_mutex); + +        // First setup a route between the two endpoints +        setup_remote_route(dst_epid, src_epid); + +        const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); +        const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); + +        // If requested, send transactions to reset and flush endpoints +        if (reset) { +            // Reset source and destination (in that order) +            for (size_t i = 0; i < 2; i++) { +                mgmt_payload rst_xact; +                rst_xact.set_header(_my_epid, _protover, _chdr_w); +                _traverse_to_node(rst_xact, (i == 0) ? src_node_addr : dst_node_addr); +                mgmt_hop_t rst_hop; +                rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                    mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, +                        (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM))); +                rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +                rst_xact.add_hop(rst_hop); +                _send_recv_mgmt_transaction(rst_xact); +            } +        } + +        // Build a management transaction to configure the source node +        { +            mgmt_payload cfg_xact; +            cfg_xact.set_header(_my_epid, _protover, _chdr_w); +            _traverse_to_node(cfg_xact, src_node_addr); +            mgmt_hop_t cfg_hop; +            // Set destination of the stream to dst_epid +            cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, dst_epid))); +            // Configure flow control parameters +            _push_ostrm_flow_control_config( +                lossy_xport, BUFF_U64, BUFF_U64, fc_freq, fc_headroom, cfg_hop); +            // Return the packet back to us +            cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + +            // Send the transaction and receive a response. +            // We don't care about the contents of the response. +            cfg_xact.add_hop(cfg_hop); +            _send_recv_mgmt_transaction(cfg_xact); +        } + +        // Wait for stream configuration to finish on the HW side +        _validate_stream_setup(src_node_addr, timeout); + +        UHD_LOG_DEBUG("RFNOC::MGMT", +            (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid +                % dst_epid)); + +        // Return discovered buffer parameters +        return std::get<1>(_get_ostrm_status(src_node_addr)); +    } + + +    virtual void register_xport_hop_cfg_fns(uint8_t xport_subtype, +        xport_cfg_fn_t init_hop_cfg_fn, +        xport_cfg_fn_t rtcfg_hop_cfg_fn) +    { +        _init_cfg_fns[xport_subtype]  = init_hop_cfg_fn; +        _rtcfg_cfg_fns[xport_subtype] = rtcfg_hop_cfg_fn; +    } + + +private: // Functions +    // Discover all nodes that are reachable from this software stream endpoint +    void _discover_topology() +    { +        // Initialize a queue of pending paths. We will use this for a breadth-first +        // traversal of the dataflow graph. The queue consists of a previously discovered +        // node and the next destination to take from that node. +        std::queue<std::pair<node_id_t, next_dest_t>> pending_paths; + +        // Add ourselves to the the pending queue to kick off the search +        UHD_LOG_DEBUG("RFNOC::MGMT", +            "Starting topology discovery from " << _my_node_id.to_string()); +        bool is_first_path = true; +        pending_paths.push(std::make_pair(_my_node_id, next_dest_t(-1))); + +        while (not pending_paths.empty()) { +            // Pop the next path to discover from the pending queue +            const auto& next_path = pending_paths.front(); +            pending_paths.pop(); + +            // We need to build a node_addr_t to allow us to get to next_path +            // To do so we first lookup how to get to next_path.first. This location has +            // already been discovered so we should just be able to look it up in +            // _node_addr_map. The only exception for that is when we are just starting +            // out, in which case our previous node is "us". +            node_addr_t next_addr = is_first_path ? node_addr_t() +                                                  : _node_addr_map.at(next_path.first); +            // Once we know how to get to the base node, then add the next destination +            next_addr.push_back(next_path); +            is_first_path = false; + +            // Build a management transaction to first get to our destination so that we +            // can ask it to identify itself +            mgmt_payload route_xact; +            route_xact.set_header(_my_epid, _protover, _chdr_w); +            _traverse_to_node(route_xact, next_addr); + +            // Discover downstream node (we ask the node to identify itself) +            mgmt_payload disc_req_xact(route_xact); +            // Push a node discovery hop +            mgmt_hop_t disc_hop; +            disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ)); +            disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +            disc_req_xact.add_hop(disc_hop); +            const mgmt_payload disc_resp_xact = +                _send_recv_mgmt_transaction(disc_req_xact); +            const node_id_t new_node = _pop_node_discovery_hop(disc_resp_xact); + +            // We found a node! +            // First check if we have already seen this node in the past. If not, we have +            // to add it to our internal data structures. If we have already seen it then +            // we just skip it. It is OK to skip the node because we are doing a BFS, +            // which means that the first time a node is discovered during the traversal, +            // the distance from this EP to that node will be the shortest path. The core +            // design philosophy for RFNoC is that the data will always take the shortest +            // path, because we make the assumption that a shorter path *always* has +            // better QoS compared to a longer one. If this assumption is not true, we +            // have to handle ordering by QoS for which we need to modify this search a +            // bit and provide QoS preferences in the API. That may be a future feature. +            if (_node_addr_map.count(new_node) > 0) { +                UHD_LOG_DEBUG("RFNOC::MGMT", +                    "Re-discovered node " << new_node.to_string() << ". Skipping it"); +            } else { +                UHD_LOG_DEBUG("RFNOC::MGMT", "Discovered node " << new_node.to_string()); +                _node_addr_map[new_node] = next_addr; + +                // Initialize the node (first time config) +                mgmt_payload init_req_xact(route_xact); +                _push_node_init_hop(init_req_xact, new_node); +                const mgmt_payload init_resp_xact = +                    _send_recv_mgmt_transaction(init_req_xact); +                UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string()); + +                // If the new node is a stream endpoint then we are done traversing this +                // path. If not, then check all ports downstream of the new node and add +                // them to pending_paths for further traversal +                switch (new_node.type) { +                    case NODE_TYPE_XBAR: { +                        // Total ports on this crossbar +                        size_t nports = +                            static_cast<size_t>(new_node.extended_info & 0xFF); +                        // Total transport ports on this crossbar (the first nports_xport +                        // ports are transport ports) +                        size_t nports_xport = +                            static_cast<size_t>((new_node.extended_info >> 8) & 0xFF); +                        // When we allow daisy chaining, we need to recursively check +                        // other transports +                        size_t start_port = ALLOW_DAISY_CHAINING ? 0 : nports_xport; +                        for (size_t i = start_port; i < nports; i++) { +                            // Skip the current port because it's the input +                            if (i != static_cast<size_t>(new_node.inst)) { +                                // If there is a single downstream port then do nothing +                                pending_paths.push(std::make_pair( +                                    new_node, static_cast<next_dest_t>(i))); +                            } +                        } +                        UHD_LOG_TRACE("RFNOC::MGMT", +                            "Discovered crossbar has " +                                << nports << " ports, " << nports_xport +                                << " transports and we are hooked up on port " +                                << new_node.inst); +                    } break; +                    case NODE_TYPE_STRM_EP: { +                        // Stop searching when we find a stream endpoint +                        // Add the endpoint to the discovered endpoint vector +                        _discovered_ep_vtr.push_back( +                            std::make_pair(new_node.device_id, new_node.inst)); +                    } break; +                    case NODE_TYPE_XPORT: { +                        // A transport has only one output. We don't need to take +                        // any action to reach +                        pending_paths.push(std::make_pair(new_node, -1)); +                    } break; +                    default: { +                        UHD_THROW_INVALID_CODE_PATH(); +                        break; +                    } +                } +            } +        } +    } + +    // Add hops to the management transaction to reach the specified node +    void _traverse_to_node(mgmt_payload& transaction, const node_addr_t& node_addr) +    { +        for (const auto& addr_pair : node_addr) { +            const node_id_t& curr_node   = addr_pair.first; +            const next_dest_t& curr_dest = addr_pair.second; +            if (curr_node.type != NODE_TYPE_STRM_EP) { +                // If a node is a crossbar, then it have have a non-negative destination +                UHD_ASSERT_THROW((curr_node.type != NODE_TYPE_XBAR || curr_dest >= 0)); +                _push_advance_hop(transaction, curr_dest); +            } else { +                // This is a stream endpoint. Nothing needs to be done to advance +                // here. The behavior of this operation is identical whether or +                // not the stream endpoint is in software or not. +            } +        } +    } + +    // Add a hop to the transaction simply to get to the next node +    void _push_advance_hop(mgmt_payload& transaction, const next_dest_t& next_dst) +    { +        if (next_dst >= 0) { +            mgmt_hop_t sel_dest_hop; +            sel_dest_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST, +                mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(next_dst)))); +            transaction.add_hop(sel_dest_hop); +        } else { +            mgmt_hop_t nop_hop; +            nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); +            transaction.add_hop(nop_hop); +        } +    } + +    // Add operations to a hop to configure flow control for an output stream +    void _push_ostrm_flow_control_config(const bool lossy_xport, +        const sw_buff_t pyld_buff_fmt, +        const sw_buff_t mdata_buff_fmt, +        const buff_params_t& fc_freq, +        const buff_params_t& fc_headroom, +        mgmt_hop_t& hop) +    { +        // Validate flow control parameters +        if (fc_freq.bytes >= (uint64_t(1) << 40) +            || fc_freq.packets >= (uint64_t(1) << 24)) { +            throw uhd::value_error("Flow control frequency parameters out of bounds"); +        } +        if (fc_headroom.bytes >= (uint64_t(1) << 16) +            || fc_headroom.packets >= (uint64_t(1) << 8)) { +            throw uhd::value_error("Flow control headroom parameters out of bounds"); +        } + +        // Add flow control parameters to hop +        hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_FC_FREQ_BYTES_LO, +                static_cast<uint32_t>(fc_freq.bytes & uint64_t(0xFFFFFFFF))))); +        hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload( +                REG_OSTRM_FC_FREQ_BYTES_HI, static_cast<uint32_t>(fc_freq.bytes >> 32)))); +        hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload( +                REG_OSTRM_FC_FREQ_PKTS, static_cast<uint32_t>(fc_freq.packets)))); +        const uint32_t headroom_reg = +            (static_cast<uint32_t>(fc_headroom.bytes) & 0xFFFF) +            | ((static_cast<uint32_t>(fc_headroom.packets) & 0xFF) << 16); +        hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_FC_HEADROOM, headroom_reg))); +        // Configure buffer types and lossy_xport, then start configuration +        hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS, +                BUILD_CTRL_STATUS_WORD( +                    true, lossy_xport, pyld_buff_fmt, mdata_buff_fmt)))); +    } + +    // Send/recv a management transaction that will get the output stream status +    std::tuple<uint32_t, buff_params_t> _get_ostrm_status(const node_addr_t& node_addr) +    { +        // Build a management transaction to first get to the node +        mgmt_payload status_xact; +        status_xact.set_header(_my_epid, _protover, _chdr_w); +        _traverse_to_node(status_xact, node_addr); + +        // Read all the status registers +        mgmt_hop_t cfg_hop; +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_LO))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_HI))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, +            mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_PKTS))); +        cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +        status_xact.add_hop(cfg_hop); + +        // Send the transaction, receive a response and validate it +        const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact); +        if (resp_xact.get_num_hops() != 1) { +            throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); +        } +        const mgmt_hop_t& rhop = resp_xact.get_hop(0); +        if (rhop.get_num_ops() <= 1 +            || rhop.get_op(0).get_op_code() != mgmt_op_t::MGMT_OP_NOP) { +            throw uhd::op_failed( +                "Management operation failed. Incorrect format (operations)."); +        } +        for (size_t i = 1; i < rhop.get_num_ops(); i++) { +            if (rhop.get_op(i).get_op_code() != mgmt_op_t::MGMT_OP_CFG_RD_RESP) { +                throw uhd::op_failed( +                    "Management operation failed. Incorrect format (operations)."); +            } +        } + +        // Extract peek data from transaction +        mgmt_op_t::cfg_payload status_pl    = rhop.get_op(1).get_op_payload(); +        mgmt_op_t::cfg_payload cap_bytes_lo = rhop.get_op(2).get_op_payload(); +        mgmt_op_t::cfg_payload cap_bytes_hi = rhop.get_op(3).get_op_payload(); +        mgmt_op_t::cfg_payload cap_pkts     = rhop.get_op(4).get_op_payload(); + +        buff_params_t buff_params; +        buff_params.bytes = static_cast<uint64_t>(cap_bytes_lo.data) +                            | (static_cast<uint64_t>(cap_bytes_hi.data) << 32); +        buff_params.packets = static_cast<uint32_t>(cap_pkts.data); +        return std::make_tuple(status_pl.data, buff_params); +    } + +    // Make sure that stream setup is complete and successful, else throw exception +    void _validate_stream_setup(const node_addr_t& node_addr, const double timeout) +    { +        // Get the status of the output stream +        uint32_t ostrm_status = 0; +        double sleep_s        = 0.05; +        for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) { +            ostrm_status = std::get<0>(_get_ostrm_status(node_addr)); +            if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { +                // Wait and retry +                std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000)); +            } else { +                // Configuration is done +                break; +            } +        } + +        if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { +            throw uhd::op_timeout("config_stream: Operation timed out"); +        } +        if ((ostrm_status & STRM_STATUS_SETUP_ERR) != 0) { +            throw uhd::op_failed("config_stream: Setup failure"); +        } +        if ((ostrm_status & STRM_STATUS_FC_ENABLED) == 0) { +            throw uhd::op_failed("config_stream: Flow control negotiation failed"); +        } +    } + + +    // Pop a node discovery response from a transaction and parse it +    const node_id_t _pop_node_discovery_hop(const mgmt_payload& transaction) +    { +        if (transaction.get_num_hops() != 1) { +            throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); +        } +        const mgmt_hop_t& rhop     = transaction.get_hop(0); +        const mgmt_op_t& nop_resp  = rhop.get_op(0); +        const mgmt_op_t& info_resp = rhop.get_op(1); +        if (rhop.get_num_ops() <= 1 || nop_resp.get_op_code() != mgmt_op_t::MGMT_OP_NOP +            || info_resp.get_op_code() != mgmt_op_t::MGMT_OP_INFO_RESP) { +            throw uhd::op_failed( +                "Management operation failed. Incorrect format (operations)."); +        } +        mgmt_op_t::node_info_payload resp_pl(info_resp.get_op_payload()); +        return std::move(node_id_t(resp_pl.device_id, +            static_cast<node_type>(resp_pl.node_type), +            resp_pl.node_inst, +            resp_pl.ext_info)); +    } + +    // Push a hop onto a transaction to initialize the current node +    void _push_node_init_hop(mgmt_payload& transaction, const node_id_t& node) +    { +        mgmt_hop_t init_hop; +        switch (node.type) { +            case NODE_TYPE_XBAR: { +                // Configure the routing table to route all packets going to _my_epid back +                // to the port where the packet is entering +                // The address for the transaction is the EPID and the data is the port # +                init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, +                    mgmt_op_t::cfg_payload(_my_epid, node.inst))); +            } break; +            case NODE_TYPE_STRM_EP: { +                // Do nothing +                init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); +            } break; +            case NODE_TYPE_XPORT: { +                uint8_t node_subtype = static_cast<uint8_t>(node.extended_info & 0xFF); +                // Run a hop configuration function for custom transports +                if (_rtcfg_cfg_fns.count(node_subtype)) { +                    _rtcfg_cfg_fns.at(node_subtype)( +                        node.device_id, node.inst, node_subtype, init_hop); +                } else { +                    // For a generic transport, just advertise the transaction to the +                    // outside world. The generic xport adapter will do the rest +                    init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_ADVERTISE)); +                } +            } break; +            default: { +                UHD_THROW_INVALID_CODE_PATH(); +            } break; +        } +        init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); +        transaction.add_hop(init_hop); +    } + +    // Lookup the full address of a stream endpoint node given the EPID +    const node_addr_t& _lookup_sep_node_addr(const sep_id_t& epid) +    { +        // Lookup the destination node address using the endpoint ID +        if (_epid_addr_map.count(epid) == 0) { +            throw uhd::lookup_error( +                "Could not find a stream endpoint with the requested ID."); +        } +        node_id_t sep_node(_epid_addr_map.at(epid)); +        // If a node is in _epid_addr_map then it must be in _node_addr_map +        UHD_ASSERT_THROW(_node_addr_map.count(sep_node) > 0); +        return _node_addr_map.at(sep_node); +    } + +    // Send the specified management transaction to the device +    void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1) +    { +        chdr_header header; +        header.set_pkt_type(PKT_TYPE_MGMT); +        header.set_num_mdata(0); +        header.set_seq_num(_send_seqnum++); +        header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8)); +        header.set_dst_epid(0); + +        managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout); +        if (not send_buff) { +            UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); +            throw uhd::io_error("Timed out getting send buff for management transaction"); +        } +        _send_pkt->refresh(send_buff->cast<void*>(), header, payload); +        send_buff->commit(header.get_length()); +    } + +    // Send the specified management transaction to the device and receive a response +    const mgmt_payload _send_recv_mgmt_transaction( +        const mgmt_payload& transaction, double timeout = 0.1) +    { +        mgmt_payload send(transaction); +        send.set_header(_my_epid, _protover, _chdr_w); +        // If we are expecting to receive a response then we have to add an additional +        // NO-OP hop for the receive endpoint. All responses will be appended to this hop. +        mgmt_hop_t nop_hop; +        nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); +        send.add_hop(nop_hop); +        // Send the transaction over the wire +        _send_mgmt_transaction(send); + +        managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout); +        if (not recv_buff) { +            throw uhd::io_error("Timed out getting recv buff for management transaction"); +        } +        _recv_pkt->refresh(recv_buff->cast<void*>()); +        mgmt_payload recv; +        recv.set_header(_my_epid, _protover, _chdr_w); +        _recv_pkt->fill_payload(recv); +        return std::move(recv); +    } + +private: // Members +    // The endpoint ID of this software endpoint +    const sep_id_t _my_epid; +    // The software RFNoC protocol version +    const uint16_t _protover; +    // CHDR Width for this design/application +    const chdr_w_t _chdr_w; +    // The node ID for this software endpoint +    const node_id_t _my_node_id; +    // A table that maps a node_id_t to a node_addr_t. This map allows looking up the +    // address of a node given the node ID. There may be multiple ways to get to the +    // node but we only store the shortest path here. +    std::map<node_id_t, node_addr_t> _node_addr_map; +    // A list of all discovered endpoints +    std::vector<sep_addr_t> _discovered_ep_vtr; +    // A table that maps a stream endpoint ID to the physical address of the stream +    // endpoint +    std::map<sep_id_t, sep_addr_t> _epid_addr_map; +    // Send/recv transports +    uhd::transport::zero_copy_if::sptr _recv_xport; +    uhd::transport::zero_copy_if::sptr _send_xport; +    size_t _send_seqnum; +    // Management packet containers +    chdr_mgmt_packet::uptr _send_pkt; +    chdr_mgmt_packet::cuptr _recv_pkt; +    // Hop configuration function maps +    std::map<uint8_t, xport_cfg_fn_t> _init_cfg_fns; +    std::map<uint8_t, xport_cfg_fn_t> _rtcfg_cfg_fns; +    // Mutex that protects all state in this class +    mutable std::recursive_mutex _mutex; +}; // namespace mgmt + + +mgmt_portal::uptr mgmt_portal::make(const both_xports_t& xports, +    const chdr::chdr_packet_factory& pkt_factory, +    uint16_t protover, +    chdr_w_t chdr_w, +    sep_id_t epid, +    device_id_t device_id) +{ +    return std::make_unique<mgmt_portal_impl>( +        xports, pkt_factory, protover, chdr_w, epid, device_id); +} + +}}} // namespace uhd::rfnoc::mgmt  | 
