diff options
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp | 5 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp | 10 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/mb_iface.hpp | 51 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp | 28 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp | 16 | ||||
| -rw-r--r-- | host/lib/rfnoc/chdr_ctrl_endpoint.cpp | 34 | ||||
| -rw-r--r-- | host/lib/rfnoc/link_stream_manager.cpp | 79 | ||||
| -rw-r--r-- | host/lib/rfnoc/mgmt_portal.cpp | 94 | 
8 files changed, 182 insertions, 135 deletions
| diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp index b3c3e0108..1281cc0ea 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp @@ -7,6 +7,7 @@  #ifndef INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP  #define INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP +#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>  #include <uhdlib/rfnoc/chdr_packet.hpp>  #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>  #include <functional> @@ -46,11 +47,11 @@ public:      //! Creates a control endpoint object      // -    // \param xports The transports used to send and recv packets +    // \param xport The transport used to send and recv packets      // \param pkt_factor An instance of the CHDR packet factory      // \param my_epid The endpoint ID of this software endpoint      // -    static uptr make(const chdr_ctrl_xport_t& xports, +    static uptr make(chdr_ctrl_xport::sptr xport,          const chdr::chdr_packet_factory& pkt_factory,          sep_id_t my_epid); diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 4ff69bb3e..79121a498 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -17,8 +17,8 @@  namespace uhd { namespace rfnoc { -/*! A class that is responsible managing all data endpoints, control endpoints and client - * zero instances accessible via a logical link between the host device and +/*! A class that is responsible for managing all data endpoints, control endpoints and + * client zero instances accessible via a logical link between the host device and   * motherboard.   *   * Note that each transport adapter on the host has its own set of streaming endpoints, @@ -120,7 +120,8 @@ public:       * \param xport_args The transport arguments       * \return An transport instance       */ -    virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr, +    virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( +        const sep_addr_t dst_addr,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, @@ -139,7 +140,8 @@ public:       * \param xport_args The transport arguments       * \return An transport instance       */ -    virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr, +    virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( +        const sep_addr_t src_addr,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index ce1106c4c..cca8dcab8 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -7,11 +7,20 @@  #ifndef INCLUDED_LIBUHD_MB_IFACE_HPP  #define INCLUDED_LIBUHD_MB_IFACE_HPP +#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>  #include <uhdlib/rfnoc/rfnoc_common.hpp>  #include <memory>  namespace uhd { namespace rfnoc { +// FIXME: Update this +class chdr_rx_data_xport +{ +public: +    using uptr = std::unique_ptr<chdr_rx_data_xport>; +}; + +using chdr_tx_data_xport = chdr_rx_data_xport;  /*! Motherboard (backchannel) interface   * @@ -59,14 +68,46 @@ public:      virtual void reset_network() = 0;      /*! Create a control transport +     * +     * This is usually called once per motherboard, since there is only one +     * control transport required to talk to all the blocks on the control +     * crossbar. +     * +     * \param local_device_id ID for the host transport adapter to use +     * \param local_epid Host streaming endpoint ID +     * \return A CHDR control transport       */ -    virtual chdr_ctrl_xport_t make_ctrl_transport( -        device_id_t local_device_id, const sep_id_t& src_epid) = 0; +    virtual chdr_ctrl_xport::sptr make_ctrl_transport( +        device_id_t local_device_id, const sep_id_t& local_epid) = 0; -    /*! Create a data transport +    /*! Create an RX data transport +     * +     * This is typically called once per streaming channel. +     * +     * \param local_device_id ID for the host transport adapter to use +     * \param local_epid Host (sink) streaming endpoint ID +     * \param remote_epid Remote device (source) streaming endpoint ID +     * \param xport_args Transport configuration args +     * \return A CHDR RX data transport +     */ +    virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, +        const sep_id_t& local_epid, +        const sep_id_t& remote_epid, +        const device_addr_t& xport_args) = 0; + +    /*! Create an TX data transport +     * +     * This is typically called once per streaming channel. +     * +     * \param local_device_id ID for the host transport adapter to use +     * \param local_epid Host (source) streaming endpoint ID +     * \param remote_epid Remote device (sink) streaming endpoint ID +     * \param xport_args Transport configuration args +     * \return A CHDR TX data transport       */ -    virtual chdr_data_xport_t make_data_transport(device_id_t local_device_id, -        const sep_id_t& src_epid, +    virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, +        const sep_id_t& local_epid, +        const sep_id_t& remote_epid,          const device_addr_t& xport_args) = 0;  }; diff --git a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp index 1412d0e3d..ac72931bf 100644 --- a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp +++ b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp @@ -7,6 +7,7 @@  #ifndef INCLUDED_LIBUHD_MGMT_PORTAL_HPP  #define INCLUDED_LIBUHD_MGMT_PORTAL_HPP +#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>  #include <uhdlib/rfnoc/chdr_types.hpp>  #include <memory>  #include <set> @@ -54,10 +55,12 @@ public:      //! Initialize a stream endpoint and assign an endpoint ID to it      // +    // \param xport The host stream endpoint's CTRL transport      // \param addr The physical address of the stream endpoint      // \param epid The endpoint ID to assign to this endpoint      // -    virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) = 0; +    virtual void initialize_endpoint( +        chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid) = 0;      //! Get information about a discovered (reachable) stream endpoint      // @@ -77,9 +80,10 @@ public:      //  destination simply by setting the DstEPID in the CHDR header to the specified      //  dst_epid      // +    // \param xport The host stream endpoint's CTRL transport      // \param dst_epid The endpoint ID of the destination      // -    virtual void setup_local_route(const sep_id_t& dst_epid) = 0; +    virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid) = 0;      //! Can a route from between the source and destination endpoints be established?      // @@ -95,11 +99,12 @@ public:      //  to the destination simply by setting the DstEPID in the CHDR header to the      //  specified dst_epid      // +    // \param xport The host stream endpoint's CTRL transport      // \param dst_epid The endpoint ID of the destination      // \param src_epid The endpoint ID of the source      //      virtual void setup_remote_route( -        const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0; +        chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0;      //! Start configuring a flow controlled receive data stream from the endpoint with the      //  specified ID to this SW mgmt portal. @@ -108,6 +113,7 @@ public:      //  control handler needs to acknoweledge the setup transaction then call the commit      //  function below.      // +    // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream)      // \param epid The endpoint ID of the data source      // \param lossy_xport Is the transport lossy? (e.g. UDP, not liberio)      // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload @@ -116,7 +122,8 @@ public:      // \param fc_freq Flow control headroom parameters      // \param reset Reset ingress stream endpoint state      // -    virtual void config_local_rx_stream_start(const sep_id_t& epid, +    virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport, +        const sep_id_t& epid,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, @@ -127,19 +134,22 @@ public:      //! Finish configuring a flow controlled receive data stream from the endpoint with      //  the specified ID to this SW mgmt portal.      // +    // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream)      // \param epid The endpoint ID of the data source      //      virtual stream_buff_params_t config_local_rx_stream_commit( -        const sep_id_t& epid, const double timeout = 0.2) = 0; +        chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2) = 0;      //! Configure a flow controlled transmit data stream from this SW mgmt portal to the      //  endpoint with the specified ID.      // +    // \param xport The host stream endpoint's CTRL transport (same EPID as TX stream)      // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload      // \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata      // \param reset Reset ingress stream endpoint state      // -    virtual void config_local_tx_stream(const sep_id_t& epid, +    virtual void config_local_tx_stream(chdr_ctrl_xport& xport, +        const sep_id_t& epid,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt,          const bool reset = false) = 0; @@ -147,6 +157,7 @@ public:      //! Configure a flow controlled data stream from the endpoint with ID src_epid to the      //  endpoint with ID dst_epid      // +    // \param xport The host stream endpoint's CTRL transport      // \param dst_epid The endpoint ID of the destination      // \param src_epid The endpoint ID of the source      // \param lossy_xport Is the transport lossy? @@ -155,7 +166,8 @@ public:      // \param fc_freq Flow control response frequency parameters      // \param fc_freq Flow control headroom parameters      // -    virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid, +    virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport, +        const sep_id_t& dst_epid,          const sep_id_t& src_epid,          const bool lossy_xport,          const stream_buff_params_t& fc_freq, @@ -176,7 +188,7 @@ public:      //! Create an endpoint manager object      // -    static uptr make(const chdr_ctrl_xport_t& xport, +    static uptr make(chdr_ctrl_xport& xport,          const chdr::chdr_packet_factory& pkt_factory,          sep_addr_t my_sep_addr,          sep_id_t my_epid); diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index c08c8d74a..7ec1b7bb2 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -7,7 +7,7 @@  #ifndef INCLUDED_RFNOC_RFNOC_COMMON_HPP  #define INCLUDED_RFNOC_RFNOC_COMMON_HPP -#include <uhd/transport/zero_copy.hpp> +#include <uhdlib/transport/link_if.hpp>  #include <memory>  namespace uhd { namespace rfnoc { @@ -66,20 +66,6 @@ struct stream_buff_params_t  //! The data type of the buffer used to capture/generate data  enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; -// TODO: Update these -struct chdr_ctrl_xport_t -{ -    chdr_ctrl_xport_t() = default; -    uhd::transport::zero_copy_if::sptr recv; -    uhd::transport::zero_copy_if::sptr send; -    stream_buff_params_t recv_buff_params{0, 0}; -    stream_buff_params_t send_buff_params{0, 0}; -    sep_id_t src_epid{0}; -    sep_id_t dst_epid{0}; -}; - -using chdr_data_xport_t = chdr_ctrl_xport_t; -  //----------------------------------------------  // Constants  //---------------------------------------------- diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp index d1d1dccca..d3c7cd58f 100644 --- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp +++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp @@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;  class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint  {  public: -    chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport, +    chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,          const chdr::chdr_packet_factory& pkt_factory,          sep_id_t my_epid)          : _my_epid(my_epid) @@ -57,7 +57,14 @@ public:              // there are no timed blocks on the underlying.              _recv_thread.join();              // Flush base transport -            while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/; +            while (true) { +                auto buff = _xport->get_recv_buff(100); +                if (buff) { +                    _xport->release_recv_buff(std::move(buff)); +                } else { +                    break; +                } +            }              // Release child endpoints              _endpoint_map.clear(););      } @@ -82,9 +89,10 @@ public:              header.set_dst_epid(dst_epid);              // Acquire send buffer and send the packet              std::lock_guard<std::mutex> lock(_send_mutex); -            auto send_buff = _xport.send->get_send_buff(timeout); -            _send_pkt->refresh(send_buff->cast<void*>(), header, payload); -            send_buff->commit(header.get_length()); +            auto send_buff = _xport->get_send_buff(timeout * 1000); +            _send_pkt->refresh(send_buff->data(), header, payload); +            send_buff->set_packet_size(header.get_length()); +            _xport->release_send_buff(std::move(send_buff));          };          if (_endpoint_map.find(key) == _endpoint_map.end()) { @@ -118,11 +126,14 @@ private:          // - Route them based on the dst_port          // - Pass them to the ctrlport_endpoint for additional processing          while (not _stop_recv_thread) { -            auto buff = _xport.recv->get_recv_buff(0.0); +            // FIXME Move lock back once have threaded_io_service +            std::unique_lock<std::mutex> lock(_mutex); +            auto buff = _xport->get_recv_buff(0);              if (buff) { -                std::lock_guard<std::mutex> lock(_mutex); +                // FIXME Move lock back to here once have threaded_io_service +                // std::lock_guard<std::mutex> lock(_mutex);                  try { -                    _recv_pkt->refresh(buff->cast<void*>()); +                    _recv_pkt->refresh(buff->data());                      const ctrl_payload payload = _recv_pkt->get_payload();                      ep_map_key_t key{payload.src_epid, payload.dst_port};                      if (_endpoint_map.find(key) != _endpoint_map.end()) { @@ -131,7 +142,10 @@ private:                  } catch (...) {                      // Ignore all errors                  } +                _xport->release_recv_buff(std::move(buff));              } else { +                // FIXME Move lock back to lock_guard once have threaded_io_service +                lock.unlock();                  // Be a good citizen and yield if no packet is processed                  static const size_t MIN_DUR = 1;                  boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); @@ -154,7 +168,7 @@ private:      // The endpoint ID of this software endpoint      const sep_id_t _my_epid;      // Send/recv transports -    const chdr_ctrl_xport_t _xport; +    chdr_ctrl_xport::sptr _xport;      // The curent sequence number for a send packet      size_t _send_seqnum = 0;      // The number of packets dropped @@ -173,7 +187,7 @@ private:      std::mutex _send_mutex;  }; -chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport, +chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,      const chdr::chdr_packet_factory& pkt_factory,      sep_id_t my_epid)  { diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp index bd330e313..4fe183529 100644 --- a/host/lib/rfnoc/link_stream_manager.cpp +++ b/host/lib/rfnoc/link_stream_manager.cpp @@ -63,31 +63,10 @@ public:          // chdr_ctrl_endpoint. We have to use the same base transport here to ensure that          // the route setup logic in the FPGA transport works correctly.          // TODO: This needs to be cleaned up. A muxed_zero_copy_if is excessive here -        chdr_ctrl_xport_t base_xport = -            _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid); -        UHD_ASSERT_THROW(base_xport.send.get() == base_xport.recv.get()) - -        auto classify_fn = [&pkt_factory](void* buff, size_t) -> uint32_t { -            if (buff) { -                chdr_packet::cuptr pkt = pkt_factory.make_generic(); -                pkt->refresh(buff); -                return (pkt->get_chdr_header().get_pkt_type() == PKT_TYPE_MGMT) ? 0 : 1; -            } else { -                throw uhd::assertion_error("null pointer"); -            } -        }; -        _muxed_xport = muxed_zero_copy_if::make(base_xport.send, classify_fn, 2); - -        // Create child transports -        chdr_ctrl_xport_t mgmt_xport = base_xport; -        mgmt_xport.send              = _muxed_xport->make_stream(0); -        mgmt_xport.recv              = mgmt_xport.send; -        _ctrl_xport                  = base_xport; -        _ctrl_xport.send             = _muxed_xport->make_stream(1); -        _ctrl_xport.recv             = _ctrl_xport.send; +        _ctrl_xport = _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid);          // Create management portal using one of the child transports -        _mgmt_portal = mgmt_portal::make(mgmt_xport, +        _mgmt_portal = mgmt_portal::make(*_ctrl_xport,              _pkt_factory,              sep_addr_t(_my_device_id, SEP_INST_MGMT_CTRL),              _my_mgmt_ctrl_epid); @@ -131,8 +110,8 @@ public:          }          // Setup a route to the EPID -        _mgmt_portal->initialize_endpoint(dst_addr, dst_epid); -        _mgmt_portal->setup_local_route(dst_epid); +        _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); +        _mgmt_portal->setup_local_route(*_ctrl_xport, dst_epid);          if (!_mgmt_portal->get_endpoint_info(dst_epid).has_ctrl) {              throw uhd::rfnoc_error(                  "Downstream endpoint does not support control traffic"); @@ -157,10 +136,10 @@ public:          sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr);          // Initialize endpoints -        _mgmt_portal->initialize_endpoint(dst_addr, dst_epid); -        _mgmt_portal->initialize_endpoint(src_addr, src_epid); +        _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); +        _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid); -        _mgmt_portal->setup_remote_route(dst_epid, src_epid); +        _mgmt_portal->setup_remote_route(*_ctrl_xport, dst_epid, src_epid);          return sep_id_pair_t(src_epid, dst_epid);      } @@ -214,13 +193,15 @@ public:          // EPIDs)          // Setup a stream -        stream_buff_params_t buff_params = _mgmt_portal->config_remote_stream(dst_epid, -            src_epid, -            lossy_xport, -            stream_buff_params_t{1, 1}, // Dummy frequency -            stream_buff_params_t{0, 0}, // Dummy headroom -            false, -            STREAM_SETUP_TIMEOUT); +        stream_buff_params_t buff_params = +            _mgmt_portal->config_remote_stream(*_ctrl_xport, +                dst_epid, +                src_epid, +                lossy_xport, +                stream_buff_params_t{1, 1}, // Dummy frequency +                stream_buff_params_t{0, 0}, // Dummy headroom +                false, +                STREAM_SETUP_TIMEOUT);          // Compute FC frequency and headroom based on buff parameters          stream_buff_params_t fc_freq{ @@ -234,7 +215,8 @@ public:                  std::ceil(double(buff_params.packets) * fc_headroom_ratio))};          // Reconfigure flow control using the new frequency and headroom -        return _mgmt_portal->config_remote_stream(dst_epid, +        return _mgmt_portal->config_remote_stream(*_ctrl_xport, +            dst_epid,              src_epid,              lossy_xport,              fc_freq, @@ -243,7 +225,8 @@ public:              STREAM_SETUP_TIMEOUT);      } -    virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr, +    virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( +        const sep_addr_t dst_addr,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, @@ -261,21 +244,22 @@ public:          sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr);          // Create the data transport that we will return to the client -        chdr_data_xport_t xport = -            _mb_iface.make_data_transport(_my_device_id, src_epid, xport_args); -        xport.src_epid = src_epid; -        xport.dst_epid = dst_epid; +        chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport( +            _my_device_id, src_epid, dst_epid, xport_args); + +        chdr_ctrl_xport::sptr mgmt_xport = +            _mb_iface.make_ctrl_transport(_my_device_id, src_epid);          // Create new temporary management portal with the transports used for this stream          // TODO: This is a bit excessive. Maybe we can pair down the functionality of the          // portal just for route setup purposes. Whatever we do, we *must* use xport in it          // though otherwise the transport will not behave correctly.          mgmt_portal::uptr data_mgmt_portal = -            mgmt_portal::make(xport, _pkt_factory, sw_epid_addr, src_epid); +            mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid);          // Setup a route to the EPID -        data_mgmt_portal->initialize_endpoint(dst_addr, dst_epid); -        data_mgmt_portal->setup_local_route(dst_epid); +        data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid); +        data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);          if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {              throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");          } @@ -288,7 +272,8 @@ public:          return xport;      } -    virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr, +    virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream( +        const sep_addr_t src_addr,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, @@ -297,6 +282,7 @@ public:          const device_addr_t& xport_args)      {          // TODO: Implement me +        return chdr_tx_data_xport::uptr();      }  private: @@ -323,8 +309,7 @@ private:      // The software EPID for all management and control traffic      sep_id_t _my_mgmt_ctrl_epid;      // Transports -    muxed_zero_copy_if::sptr _muxed_xport; -    chdr_ctrl_xport_t _ctrl_xport; +    chdr_ctrl_xport::sptr _ctrl_xport;      // Management portal for control endpoints      mgmt_portal::uptr _mgmt_portal;      // The CHDR control endpoint diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index 79e297407..b490e0baf 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -7,6 +7,7 @@  #include <uhd/exception.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>  #include <uhdlib/rfnoc/chdr_packet.hpp>  #include <uhdlib/rfnoc/mgmt_portal.hpp>  #include <unordered_set> @@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {}  class mgmt_portal_impl : public mgmt_portal  {  public: -    mgmt_portal_impl(const chdr_ctrl_xport_t& xport, +    mgmt_portal_impl(chdr_ctrl_xport& xport,          const chdr::chdr_packet_factory& pkt_factory,          sep_addr_t my_sep_addr,          sep_id_t my_epid) @@ -183,14 +184,12 @@ public:          , _chdr_w(pkt_factory.get_chdr_w())          , _endianness(pkt_factory.get_endianness())          , _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid) -        , _recv_xport(xport.recv) -        , _send_xport(xport.send)          , _send_seqnum(0)          , _send_pkt(std::move(pkt_factory.make_mgmt()))          , _recv_pkt(std::move(pkt_factory.make_mgmt()))      {          std::lock_guard<std::recursive_mutex> lock(_mutex); -        _discover_topology(); +        _discover_topology(xport);          UHD_LOG_DEBUG("RFNOC::MGMT",              "The following endpoints are reachable from " << _my_node_id.to_string());          for (const auto& ep : _discovered_ep_set) { @@ -205,7 +204,8 @@ public:          return _discovered_ep_set;      } -    virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) +    virtual void initialize_endpoint( +        chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid)      {          std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -232,7 +232,7 @@ public:          // Send the transaction and receive a response.          // We don't care about the contents of the response. -        _send_recv_mgmt_transaction(cfg_xact); +        _send_recv_mgmt_transaction(xport, cfg_xact);          // Add/update the entry in the stream endpoint ID map          _epid_addr_map[epid] = addr; @@ -280,7 +280,7 @@ public:          return retval;      } -    virtual void setup_local_route(const sep_id_t& dst_epid) +    virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid)      {          std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -339,7 +339,7 @@ public:          cfg_xact.add_hop(discover_hop);          // Send the transaction and validate that we saw a stream endpoint -        const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact); +        const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact);          const node_id_t sep_node         = _pop_node_discovery_hop(sep_info_xact);          if (sep_node.type != NODE_TYPE_STRM_EP) {              throw uhd::routing_error( @@ -384,7 +384,8 @@ public:          return false;      } -    virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid) +    virtual void setup_remote_route( +        chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid)      {          std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -412,8 +413,8 @@ public:          // there is a need to optimize for routing table fullness, we can do a software          // graph traversal here, find the closest common parent (crossbar) for the two          // nodes and only configure the nodes downstream of that. -        setup_local_route(dst_epid); -        setup_local_route(src_epid); +        setup_local_route(xport, dst_epid); +        setup_local_route(xport, src_epid);          UHD_LOG_DEBUG("RFNOC::MGMT",              (boost::format( @@ -421,7 +422,8 @@ public:                  % src_epid % dst_epid));      } -    virtual void config_local_rx_stream_start(const sep_id_t& epid, +    virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport, +        const sep_id_t& epid,          const bool lossy_xport,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt, @@ -464,29 +466,30 @@ public:          // Send the transaction and receive a response.          // We don't care about the contents of the response.          cfg_xact.add_hop(cfg_hop); -        _send_recv_mgmt_transaction(cfg_xact); +        _send_recv_mgmt_transaction(xport, cfg_xact);          UHD_LOG_DEBUG("RFNOC::MGMT",              (boost::format("Initiated RX stream setup for EPID=%d") % epid));      }      virtual stream_buff_params_t config_local_rx_stream_commit( -        const sep_id_t& epid, const double timeout = 0.2) +        chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2)      {          std::lock_guard<std::recursive_mutex> lock(_mutex);          // Wait for stream configuration to finish on the HW side          const node_addr_t& node_addr = _lookup_sep_node_addr(epid); -        _validate_stream_setup(node_addr, timeout); +        _validate_stream_setup(xport, node_addr, timeout);          UHD_LOG_DEBUG("RFNOC::MGMT",              (boost::format("Finished RX stream setup for EPID=%d") % epid));          // Return discovered buffer parameters -        return std::get<1>(_get_ostrm_status(node_addr)); +        return std::get<1>(_get_ostrm_status(xport, node_addr));      } -    virtual void config_local_tx_stream(const sep_id_t& epid, +    virtual void config_local_tx_stream(chdr_ctrl_xport& xport, +        const sep_id_t& epid,          const sw_buff_t pyld_buff_fmt,          const sw_buff_t mdata_buff_fmt,          const bool reset = false) @@ -494,7 +497,7 @@ public:          std::lock_guard<std::recursive_mutex> lock(_mutex);          // First setup a route between to the endpoint -        setup_local_route(epid); +        setup_local_route(xport, epid);          const node_addr_t& node_addr = _lookup_sep_node_addr(epid); @@ -522,13 +525,14 @@ public:          // Send the transaction and receive a response.          // We don't care about the contents of the response. -        _send_recv_mgmt_transaction(cfg_xact); +        _send_recv_mgmt_transaction(xport, cfg_xact);          UHD_LOG_DEBUG("RFNOC::MGMT",              (boost::format("Finished TX stream setup for EPID=%d") % epid));      } -    virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid, +    virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport, +        const sep_id_t& dst_epid,          const sep_id_t& src_epid,          const bool lossy_xport,          const stream_buff_params_t& fc_freq, @@ -539,7 +543,7 @@ public:          std::lock_guard<std::recursive_mutex> lock(_mutex);          // First setup a route between the two endpoints -        setup_remote_route(dst_epid, src_epid); +        setup_remote_route(xport, dst_epid, src_epid);          const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid);          const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); @@ -557,7 +561,7 @@ public:                          (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM)));                  rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));                  rst_xact.add_hop(rst_hop); -                _send_recv_mgmt_transaction(rst_xact); +                _send_recv_mgmt_transaction(xport, rst_xact);              }          } @@ -579,18 +583,18 @@ public:              // Send the transaction and receive a response.              // We don't care about the contents of the response.              cfg_xact.add_hop(cfg_hop); -            _send_recv_mgmt_transaction(cfg_xact); +            _send_recv_mgmt_transaction(xport, cfg_xact);          }          // Wait for stream configuration to finish on the HW side -        _validate_stream_setup(src_node_addr, timeout); +        _validate_stream_setup(xport, src_node_addr, timeout);          UHD_LOG_DEBUG("RFNOC::MGMT",              (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid                  % dst_epid));          // Return discovered buffer parameters -        return std::get<1>(_get_ostrm_status(src_node_addr)); +        return std::get<1>(_get_ostrm_status(xport, src_node_addr));      } @@ -605,7 +609,7 @@ public:  private: // Functions      // Discover all nodes that are reachable from this software stream endpoint -    void _discover_topology() +    void _discover_topology(chdr_ctrl_xport& xport)      {          // Initialize a queue of pending paths. We will use this for a breadth-first          // traversal of the dataflow graph. The queue consists of a previously discovered @@ -652,7 +656,7 @@ private: // Functions              try {                  // Send the discovery transaction                  const mgmt_payload disc_resp_xact = -                    _send_recv_mgmt_transaction(disc_req_xact); +                    _send_recv_mgmt_transaction(xport, disc_req_xact);                  new_node = _pop_node_discovery_hop(disc_resp_xact);              } catch (uhd::io_error& io_err) {                  // We received an IO error. This could happen if we have a legitimate @@ -695,7 +699,7 @@ private: // Functions                  mgmt_payload init_req_xact(route_xact);                  _push_node_init_hop(init_req_xact, new_node);                  const mgmt_payload init_resp_xact = -                    _send_recv_mgmt_transaction(init_req_xact); +                    _send_recv_mgmt_transaction(xport, init_req_xact);                  UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string());                  // If the new node is a stream endpoint then we are done traversing this @@ -823,7 +827,7 @@ private: // Functions      // Send/recv a management transaction that will get the output stream status      std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status( -        const node_addr_t& node_addr) +        chdr_ctrl_xport& xport, const node_addr_t& node_addr)      {          // Build a management transaction to first get to the node          mgmt_payload status_xact; @@ -844,7 +848,7 @@ private: // Functions          status_xact.add_hop(cfg_hop);          // Send the transaction, receive a response and validate it -        const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact); +        const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact);          if (resp_xact.get_num_hops() != 1) {              throw uhd::op_failed("Management operation failed. Incorrect format (hops).");          } @@ -875,13 +879,14 @@ private: // Functions      }      // Make sure that stream setup is complete and successful, else throw exception -    void _validate_stream_setup(const node_addr_t& node_addr, const double timeout) +    void _validate_stream_setup( +        chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout)      {          // Get the status of the output stream          uint32_t ostrm_status = 0;          double sleep_s        = 0.05;          for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) { -            ostrm_status = std::get<0>(_get_ostrm_status(node_addr)); +            ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr));              if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {                  // Wait and retry                  std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000)); @@ -975,7 +980,8 @@ private: // Functions      }      // Send the specified management transaction to the device -    void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1) +    void _send_mgmt_transaction( +        chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1)      {          chdr_header header;          header.set_pkt_type(PKT_TYPE_MGMT); @@ -984,18 +990,19 @@ private: // Functions          header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8));          header.set_dst_epid(0); -        managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout); +        auto send_buff = xport.get_send_buff(timeout * 1000);          if (not send_buff) {              UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");              throw uhd::io_error("Timed out getting send buff for management transaction");          } -        _send_pkt->refresh(send_buff->cast<void*>(), header, payload); -        send_buff->commit(header.get_length()); +        _send_pkt->refresh(send_buff->data(), header, payload); +        send_buff->set_packet_size(header.get_length()); +        xport.release_send_buff(std::move(send_buff));      }      // Send the specified management transaction to the device and receive a response      const mgmt_payload _send_recv_mgmt_transaction( -        const mgmt_payload& transaction, double timeout = 0.1) +        chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1)      {          mgmt_payload send(transaction);          send.set_header(_my_epid, _protover, _chdr_w); @@ -1005,17 +1012,18 @@ private: // Functions          nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));          send.add_hop(nop_hop);          // Send the transaction over the wire -        _send_mgmt_transaction(send); +        _send_mgmt_transaction(xport, send); -        managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout); +        auto recv_buff = xport.get_mgmt_buff(timeout * 1000);          if (not recv_buff) {              throw uhd::io_error("Timed out getting recv buff for management transaction");          } -        _recv_pkt->refresh(recv_buff->cast<void*>()); +        _recv_pkt->refresh(recv_buff->data());          mgmt_payload recv;          recv.set_header(_my_epid, _protover, _chdr_w);          _recv_pkt->fill_payload(recv); -        return std::move(recv); +        xport.release_recv_buff(std::move(recv_buff)); +        return recv;      }  private: // Members @@ -1039,8 +1047,6 @@ private: // Members      // endpoint      std::map<sep_id_t, sep_addr_t> _epid_addr_map;      // Send/recv transports -    uhd::transport::zero_copy_if::sptr _recv_xport; -    uhd::transport::zero_copy_if::sptr _send_xport;      size_t _send_seqnum;      // Management packet containers      chdr_mgmt_packet::uptr _send_pkt; @@ -1053,7 +1059,7 @@ private: // Members  }; // namespace mgmt -mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport, +mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport,      const chdr::chdr_packet_factory& pkt_factory,      sep_addr_t my_sep_addr,      sep_id_t my_epid) | 
