diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 89 | ||||
| -rw-r--r-- | host/include/uhd/transport/zero_copy_flow_ctrl.hpp | 58 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/transport/nirio_zero_copy.cpp | 23 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 211 | ||||
| -rw-r--r-- | host/lib/transport/zero_copy_flow_ctrl.cpp | 227 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 68 | ||||
| -rw-r--r-- | host/lib/usrp/x300/x300_fw_ctrl.cpp | 2 | ||||
| -rw-r--r-- | host/lib/usrp/x300/x300_impl.cpp | 8 | ||||
| -rw-r--r-- | host/lib/usrp/x300/x300_impl.hpp | 21 | 
11 files changed, 597 insertions, 115 deletions
diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index 8def96811..21706d580 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -358,8 +358,8 @@ UHD_INSTALL(FILES  #{{{IMG_SECTION  # This section is written automatically by /images/create_imgs_package.py  # Any manual changes in here will be overwritten. -SET(UHD_IMAGES_MD5SUM "53a1ea139d8344fec9914f05db79bdd0") -SET(UHD_IMAGES_DOWNLOAD_SRC "uhd-images_003.010.001.001-7-g63fcfb95.zip") +SET(UHD_IMAGES_MD5SUM "487beb2dd2477ad90f9b21399931979c") +SET(UHD_IMAGES_DOWNLOAD_SRC "uhd-images_003.010.001.001-27-g47672ede.zip")  #}}}  ######################################################################## diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 35ffb293b..daca3f04f 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -28,7 +28,8 @@  namespace uhd{ namespace transport{ -    template <typename elem_type> class bounded_buffer_detail : boost::noncopyable{ +    template <typename elem_type> class bounded_buffer_detail : boost::noncopyable +    {      public:          bounded_buffer_detail(size_t capacity): @@ -38,79 +39,97 @@ namespace uhd{ namespace transport{              _not_empty_fcn = boost::bind(&bounded_buffer_detail<elem_type>::not_empty, this);          } -        UHD_INLINE bool push_with_haste(const elem_type &elem){ +        UHD_INLINE bool push_with_haste(const elem_type &elem) +        {              boost::mutex::scoped_lock lock(_mutex); -            if (_buffer.full()) return false; +            if (_buffer.full()) +            { +                return false; +            }              _buffer.push_front(elem); -            lock.unlock();              _empty_cond.notify_one();              return true;          } -        UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ +        UHD_INLINE bool push_with_pop_on_full(const elem_type &elem) +        {              boost::mutex::scoped_lock lock(_mutex); -            if (_buffer.full()){ +            if (_buffer.full()) +            {                  _buffer.pop_back();                  _buffer.push_front(elem); -                lock.unlock();                  _empty_cond.notify_one();                  return false;              } -            else{ +            else {                  _buffer.push_front(elem); -                lock.unlock();                  _empty_cond.notify_one();                  return true;              }          } -        UHD_INLINE void push_with_wait(const elem_type &elem){ -            if (this->push_with_haste(elem)) return; +        UHD_INLINE void push_with_wait(const elem_type &elem) +        {              boost::mutex::scoped_lock lock(_mutex); -            _full_cond.wait(lock, _not_full_fcn); +            if (_buffer.full()) +            { +                _full_cond.wait(lock, _not_full_fcn); +            }              _buffer.push_front(elem); -            lock.unlock();              _empty_cond.notify_one();          } -        UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){ -            if (this->push_with_haste(elem)) return true; +        UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout) +        {              boost::mutex::scoped_lock lock(_mutex); -            if (not _full_cond.timed_wait( -                lock, to_time_dur(timeout), _not_full_fcn -            )) return false; +            if (_buffer.full()) +            { +                if (not _full_cond.timed_wait(lock, +                    to_time_dur(timeout), _not_full_fcn)) +                { +                    return false; +                } +            }              _buffer.push_front(elem); -            lock.unlock();              _empty_cond.notify_one();              return true;          } -        UHD_INLINE bool pop_with_haste(elem_type &elem){ +        UHD_INLINE bool pop_with_haste(elem_type &elem) +        {              boost::mutex::scoped_lock lock(_mutex); -            if (_buffer.empty()) return false; +            if (_buffer.empty())  +            { +                return false; +            }              this->pop_back(elem); -            lock.unlock();              _full_cond.notify_one();              return true;          } -        UHD_INLINE void pop_with_wait(elem_type &elem){ -            if (this->pop_with_haste(elem)) return; +        UHD_INLINE void pop_with_wait(elem_type &elem) +        {              boost::mutex::scoped_lock lock(_mutex); -            _empty_cond.wait(lock, _not_empty_fcn); +            if (_buffer.empty()) +            { +                _empty_cond.wait(lock, _not_empty_fcn); +            }              this->pop_back(elem); -            lock.unlock();              _full_cond.notify_one();          } -        UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){ -            if (this->pop_with_haste(elem)) return true; +        UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout) +        {              boost::mutex::scoped_lock lock(_mutex); -            if (not _empty_cond.timed_wait( -                lock, to_time_dur(timeout), _not_empty_fcn -            )) return false; +            if (_buffer.empty())  +            { +                if (not _empty_cond.timed_wait(lock, to_time_dur(timeout), +                    _not_empty_fcn)) +                { +                    return false; +                } +            }              this->pop_back(elem); -            lock.unlock();              _full_cond.notify_one();              return true;          } @@ -131,13 +150,15 @@ namespace uhd{ namespace transport{           * 2) assign the back element to empty           * 3) pop the back to move the counter           */ -        UHD_INLINE void pop_back(elem_type &elem){ +        UHD_INLINE void pop_back(elem_type &elem) +        {              elem = _buffer.back();              _buffer.back() = elem_type();              _buffer.pop_back();          } -        static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +        static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout) +        {              return boost::posix_time::microseconds(long(timeout*1e6));          } diff --git a/host/include/uhd/transport/zero_copy_flow_ctrl.hpp b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp new file mode 100644 index 000000000..8075c503d --- /dev/null +++ b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp @@ -0,0 +1,58 @@ +//
 +// Copyright 2017 Ettus Research
 +//
 +// This program is free software: you can redistribute it and/or modify
 +// it under the terms of the GNU General Public License as published by
 +// the Free Software Foundation, either version 3 of the License, or
 +// (at your option) any later version.
 +//
 +// This program is distributed in the hope that it will be useful,
 +// but WITHOUT ANY WARRANTY; without even the implied warranty of
 +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 +// GNU General Public License for more details.
 +//
 +// You should have received a copy of the GNU General Public License
 +// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 +//
 +
 +#ifndef INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
 +#define INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
 +
 +#include <uhd/config.hpp>
 +#include <uhd/transport/zero_copy.hpp>
 +#include <boost/function.hpp>
 +#include <boost/shared_ptr.hpp>
 +
 +namespace uhd{ namespace transport{
 +
 +/*!
 + * Flow control function.
 + * \param buff buffer to be sent or receive buffer being released
 + * \return true if OK, false if not
 + */
 +typedef boost::function<bool(managed_buffer::sptr buff)> flow_ctrl_func;
 +
 +/*!
 + * Adds flow control to any zero_copy_if transport.
 + */
 +class UHD_API zero_copy_flow_ctrl : public virtual zero_copy_if {
 +public:
 +    typedef boost::shared_ptr<zero_copy_flow_ctrl> sptr;
 +
 +    /*!
 +     * Make flow controlled transport.
 +     *
 +     * \param transport a shared pointer to the transport interface
 +     * \param send_flow_ctrl optional send flow control function called before buffer is sent
 +     * \param recv_flow_ctrl optional receive flow control function called after buffer released
 +     */
 +    static sptr make(
 +        zero_copy_if::sptr transport,
 +        flow_ctrl_func send_flow_ctrl,
 +        flow_ctrl_func recv_flow_ctrl
 +    );
 +};
 +
 +}} //namespace
 +
 +#endif /* INCLUDED_ZERO_COPY_FLOW_CTRL_HPP */
 diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 44c8d59af..a6d84cc4a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -129,6 +129,7 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp  )  IF(ENABLE_X300) diff --git a/host/lib/transport/nirio_zero_copy.cpp b/host/lib/transport/nirio_zero_copy.cpp index 3c52e5080..14118d393 100644 --- a/host/lib/transport/nirio_zero_copy.cpp +++ b/host/lib/transport/nirio_zero_copy.cpp @@ -26,6 +26,7 @@  #include <boost/make_shared.hpp>  #include <boost/date_time/posix_time/posix_time.hpp>  #include <boost/thread/thread.hpp> //sleep +#include <boost/interprocess/mapped_region.hpp>	//get_page_size()  #include <vector>  #include <algorithm>    // std::max  //@TODO: Move the register defs required by the class to a common location @@ -350,6 +351,7 @@ nirio_zero_copy::sptr nirio_zero_copy::make(  ){      //Initialize xport_params      zero_copy_xport_params xport_params = default_buff_args; +    size_t page_size = boost::interprocess::mapped_region::get_page_size();      //The kernel buffer for this transport must be (num_frames * frame_size) big. Unlike ethernet,      //where the kernel buffer size is independent of the circular buffer size for the transport, @@ -386,6 +388,22 @@ nirio_zero_copy::sptr nirio_zero_copy::make(      size_t usr_send_buff_size = static_cast<size_t>(          hints.cast<double>("send_buff_size", default_buff_args.num_send_frames)); +    if (hints.has_key("send_buff_size"))  +    { +        if (usr_send_buff_size % page_size != 0) +        { +            throw uhd::value_error((boost::format("send_buff_size must be multiple of %d") % page_size).str()); +        } +    } + +    if (hints.has_key("send_frame_size") and hints.has_key("num_send_frames")) +    { +        if (usr_num_send_frames * xport_params.send_frame_size % page_size != 0) +        { +            throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str()); +        } +    } +      if (hints.has_key("num_send_frames") and hints.has_key("send_buff_size")) {          if (usr_send_buff_size < xport_params.send_frame_size)              throw uhd::value_error("send_buff_size must be equal to or greater than (num_send_frames * send_frame_size)"); @@ -400,6 +418,11 @@ nirio_zero_copy::sptr nirio_zero_copy::make(          xport_params.num_send_frames = usr_num_send_frames;      } +    if (xport_params.num_send_frames * xport_params.send_frame_size % page_size != 0) +    { +        throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str()); +    } +      return nirio_zero_copy::sptr(new nirio_zero_copy_impl(fpga_session, instance, xport_params));  } diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 8ad76fa39..003689a78 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,9 +29,12 @@  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/utils/safe_call.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/thread_time.hpp>  #include <boost/function.hpp> +#include <boost/atomic.hpp> +#include <boost/make_shared.hpp>  #include <iostream>  #include <vector> @@ -48,6 +51,9 @@ namespace uhd {  namespace transport {  namespace sph { +static const size_t MAX_INTERLEAVE = 4; +static const double GET_BUFF_TIMEOUT = 0.1; +  /***********************************************************************   * Super send packet handler   * @@ -67,19 +73,39 @@ public:       * \param size the number of transport channels       */      send_packet_handler(const size_t size = 1): -        _next_packet_seq(0), _cached_metadata(false) +       _next_packet_seq(0), _cached_metadata(false)      {          this->set_enable_trailer(true);          this->resize(size);      }      ~send_packet_handler(void){ -        /* NOP */ +        UHD_SAFE_CALL( +            for (size_t i = 0; i < _worker_data.size(); i++) +            { +                _worker_data[i]->stop = true; +            } +            _worker_thread_group.join_all(); +        );      }      //! Resize the number of transport channels      void resize(const size_t size){          if (this->size() == size) return; + +        // Stop all worker threads +        for (size_t i = 0; i < _worker_data.size(); i++) +        { +            _worker_data[i]->stop = true; +        } +        _worker_thread_group.join_all(); +        _worker_threads.resize(size); +        _worker_data.resize(size); +        for (size_t i = 0; i < size; i++) +        { +            _worker_data[i] = boost::make_shared<worker_thread_data_t>(); +        } +          _props.resize(size);          static const uint64_t zero = 0;          _zero_buffs.resize(size, &zero); @@ -144,7 +170,15 @@ public:       * \param get_buff the getter function       */      void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ +        if (_worker_threads[xport_chan]) +        { +            _worker_thread_group.remove_thread(_worker_threads[xport_chan]); +            _worker_data[xport_chan]->stop = true; +            _worker_threads[xport_chan]->join(); +            _worker_data[xport_chan]->stop = false; +        }          _props.at(xport_chan).get_buff = get_buff; +        _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan));      }      //! Set the conversion routine for all channels @@ -380,63 +414,147 @@ private:          if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t);          if_packet_info.packet_count = _next_packet_seq; -        //get a buffer for each channel or timeout -        for(xport_chan_props_type &props:  _props){ -            if (not props.buff) props.buff = props.get_buff(timeout); -            if (not props.buff) return 0; //timeout +        // wait for all worker threads to be ready or timeout +        boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); +        for (size_t i = 0; i < this->size(); i++) +        { +            while (not _worker_data[i]->ready) +            { +                if (boost::get_system_time() > expiration) +                { +                    return 0; +                } +            } +            _worker_data[i]->ready = false;          } -        //setup the data to share with converter threads +        //setup the data to share with worker threads          _convert_nsamps = nsamps_per_buff;          _convert_buffs = &buffs;          _convert_buffer_offset_bytes = buffer_offset_bytes;          _convert_if_packet_info = &if_packet_info; -        //perform N channels of conversion -        for (size_t i = 0; i < this->size(); i++) { -            convert_to_in_buff(i); +        //start N channels of conversion +        for (size_t i = 0; i < this->size(); i++) +        { +            _worker_data[i]->go = true; +        } + +        //make sure any sleeping worker threads are woken up +        for (size_t i = 0; i < this->size(); i++) +        { +            // Acquiring the lock used by the condition variable +            // takes too long, so do a spin wait.  If the go flag +            // is not cleared by this point, it will be cleared +            // immediately by the worker thread when it wakes up. +            while (_worker_data[i]->go) +            { +                _worker_data[i]->data_ready.notify_one(); +            } +        } + +        //wait for all worker threads to be done +        for (size_t i = 0; i < this->size(); i++) +        { +            //TODO: Implement a better wait strategy +            //busy loop give fastest response, but these are just wasted cycles +            while (not _worker_data[i]->done) {} +            _worker_data[i]->done = false;          }          _next_packet_seq++; //increment sequence after commits          return nsamps_per_buff;      } -    /*! Run the conversion from the internal buffers to the user's input -     *  buffer. +    /*! Worker thread routine.       * +     * - Gets an internal data buffer       * - Calls the converter       * - Releases internal data buffers -     * - Updates read/write pointers       */ -    UHD_INLINE void convert_to_in_buff(const size_t index) +    void worker(const size_t index)      { -        //shortcut references to local data structures -        managed_send_buffer::sptr &buff = _props[index].buff; -        vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; -        const tx_streamer::buffs_type &buffs = *_convert_buffs; - -        //fill IO buffs with pointers into the output buffer -        const void *io_buffs[4/*max interleave*/]; -        for (size_t i = 0; i < _num_inputs; i++){ -            const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); -            io_buffs[i] = b + _convert_buffer_offset_bytes; +        //maximum number of cycles to spin before waiting on condition variable +        //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop +        //the assumption is that anything held up for 15ms can wait +        static const size_t MAX_SPIN_CYCLES = 30000000; + +        //maximum amount of time to wait before checking the stop flag +        static const double MAX_WAIT = 0.1; + +        managed_send_buffer::sptr buff; +        vrt::if_packet_info_t if_packet_info; +        std::vector<const void *> in_buffs(MAX_INTERLEAVE); +        boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index]; +        boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock); +        size_t spins = 0; + +        while (not worker_data->stop) +        { +            if (not buff) +            { +                buff = _props[index].get_buff(MAX_WAIT); +                if (not buff) +                { +                    continue; +                } +                worker_data->ready = true; +            } + +            //make sure done flag is cleared by controlling thread before waiting on go signal +            if (worker_data->done) +            { +                continue; +            } + +            //partial spin lock before wait +            while (not worker_data->go and spins < MAX_SPIN_CYCLES) +            { +                spins++; +            } +            if (not worker_data->go and +                not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) +            { +                continue; +            } +            // Clear the go flag immediately to let the +            // controlling thread know we are not sleeping. +            worker_data->go = false; + +            //reset the spin count +            spins = 0; + +            //pack metadata into a vrt header +            uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; +            if_packet_info = *_convert_if_packet_info; +            if_packet_info.has_sid = _props[index].has_sid; +            if_packet_info.sid = _props[index].sid; +            _vrt_packer(otw_mem, if_packet_info); +            otw_mem += if_packet_info.num_header_words32; + +            //prepare the input buffers +            for (size_t i = 0; i < _num_inputs; i++) +            { +                in_buffs[i] = +                    (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i])) +                    + _convert_buffer_offset_bytes; +            } + +            //perform the conversion operation +            _converter->conv(in_buffs, otw_mem, _convert_nsamps); + +            //let the master know that new data can be prepared +            _worker_data[index]->done = true; + +            //commit the samples to the zero-copy interface +            buff->commit( +                (_header_offset_words32 + if_packet_info.num_packet_words32) +                * sizeof(uint32_t) +            ); + +            //release the buffer +            buff.reset();          } -        const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); - -        //pack metadata into a vrt header -        uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; -        if_packet_info.has_sid = _props[index].has_sid; -        if_packet_info.sid = _props[index].sid; -        _vrt_packer(otw_mem, if_packet_info); -        otw_mem += if_packet_info.num_header_words32; - -        //perform the conversion operation -        _converter->conv(in_buffs, otw_mem, _convert_nsamps); - -        //commit the samples to the zero-copy interface -        const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; -        buff->commit(num_vita_words32*sizeof(uint32_t)); -        buff.reset(); //effectively a release      }      //! Shared variables for the worker threads @@ -444,7 +562,18 @@ private:      const tx_streamer::buffs_type *_convert_buffs;      size_t _convert_buffer_offset_bytes;      vrt::if_packet_info_t *_convert_if_packet_info; - +    struct worker_thread_data_t { +        worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} +        boost::atomic_bool ready; +        boost::atomic_bool go; +        boost::atomic_bool done; +        boost::atomic_bool stop; +        boost::mutex data_ready_lock; +        boost::condition_variable data_ready; +    }; +    std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data; +    boost::thread_group _worker_thread_group; +    std::vector<boost::thread *> _worker_threads;  };  class send_packet_streamer : public send_packet_handler, public tx_streamer{ diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp new file mode 100644 index 000000000..06d7934e2 --- /dev/null +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -0,0 +1,227 @@ +//
 +// Copyright 2017 Ettus Research
 +//
 +// This program is free software: you can redistribute it and/or modify
 +// it under the terms of the GNU General Public License as published by
 +// the Free Software Foundation, either version 3 of the License, or
 +// (at your option) any later version.
 +//
 +// This program is distributed in the hope that it will be useful,
 +// but WITHOUT ANY WARRANTY; without even the implied warranty of
 +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 +// GNU General Public License for more details.
 +//
 +// You should have received a copy of the GNU General Public License
 +// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 +//
 +
 +#include <uhd/transport/zero_copy_flow_ctrl.hpp>
 +#include <uhd/transport/bounded_buffer.hpp>
 +#include <uhd/transport/buffer_pool.hpp>
 +#include <uhd/utils/msg.hpp>
 +#include <uhd/utils/log.hpp>
 +#include <uhd/utils/safe_call.hpp>
 +#include <boost/format.hpp>
 +#include <boost/make_shared.hpp>
 +#include <boost/thread/mutex.hpp>
 +#include <boost/thread/thread.hpp>
 +#include <boost/bind.hpp>
 +
 +using namespace uhd;
 +using namespace uhd::transport;
 +
 +typedef bounded_buffer<managed_send_buffer::sptr> bounded_buffer_t;
 +
 +class zero_copy_flow_ctrl_msb : public managed_send_buffer
 +{
 +public:
 +    zero_copy_flow_ctrl_msb(
 +        flow_ctrl_func flow_ctrl
 +    ) :
 +        _mb(NULL),
 +        _flow_ctrl(flow_ctrl)
 +    {
 +        /* NOP */
 +    }
 +
 +    ~zero_copy_flow_ctrl_msb()
 +    {
 +        /* NOP */
 +    }
 +
 +    void release()
 +    {
 +        if (_mb)
 +        {
 +            _mb->commit(size());
 +            while (_flow_ctrl and not _flow_ctrl(_mb)) {}
 +            _mb.reset();
 +        }
 +    }
 +
 +    UHD_INLINE sptr get(sptr &mb)
 +    {
 +        _mb = mb;
 +        return make(this, _mb->cast<void *>(), _mb->size());
 +    }
 +
 +private:
 +    sptr _mb;
 +    flow_ctrl_func _flow_ctrl;
 +};
 +
 +class zero_copy_flow_ctrl_mrb : public managed_recv_buffer
 +{
 +public:
 +    zero_copy_flow_ctrl_mrb(
 +        flow_ctrl_func flow_ctrl
 +    ) :
 +        _mb(NULL),
 +        _flow_ctrl(flow_ctrl)
 +    {
 +        /* NOP */
 +    }
 +
 +    ~zero_copy_flow_ctrl_mrb()
 +    {
 +        /* NOP */
 +    }
 +
 +    void release()
 +    {
 +        if (_mb)
 +        {
 +            _mb->commit(size());
 +            while (_flow_ctrl and not _flow_ctrl(_mb)) {}
 +            _mb.reset();
 +        }
 +    }
 +
 +    UHD_INLINE sptr get(sptr &mb)
 +    {
 +        _mb = mb;
 +        return make(this, _mb->cast<void *>(), _mb->size());
 +    }
 +
 +private:
 +    sptr _mb;
 +    flow_ctrl_func _flow_ctrl;
 +};
 +
 +/***********************************************************************
 + * Zero copy offload transport:
 + * An intermediate transport that utilizes threading to free
 + * the main thread from any receive work.
 + **********************************************************************/
 +class zero_copy_flow_ctrl_impl : public zero_copy_flow_ctrl {
 +public:
 +    typedef boost::shared_ptr<zero_copy_flow_ctrl_impl> sptr;
 +
 +    zero_copy_flow_ctrl_impl(zero_copy_if::sptr transport,
 +        flow_ctrl_func send_flow_ctrl,
 +        flow_ctrl_func recv_flow_ctrl) :
 +        _transport(transport),
 +        _send_buffers(transport->get_num_send_frames()),
 +        _recv_buffers(transport->get_num_recv_frames()),
 +        _send_buff_index(0),
 +        _recv_buff_index(0),
 +        _send_flow_ctrl(send_flow_ctrl),
 +        _recv_flow_ctrl(recv_flow_ctrl)
 +    {
 +        UHD_LOG << "Created zero_copy_flow_ctrl" << std::endl;
 +
 +        for (size_t i = 0; i < transport->get_num_send_frames(); i++)
 +        {
 +            _send_buffers[i] = boost::make_shared<zero_copy_flow_ctrl_msb>(_send_flow_ctrl);
 +        }
 +        for (size_t i = 0; i < transport->get_num_recv_frames(); i++)
 +        {
 +            _recv_buffers[i] = boost::make_shared<zero_copy_flow_ctrl_mrb>(_recv_flow_ctrl);
 +        }
 +    }
 +
 +    ~zero_copy_flow_ctrl_impl()
 +    {
 +    }
 +
 +    /*******************************************************************
 +     * Receive implementation:
 +     * Pop the receive buffer pointer from the underlying transport
 +     ******************************************************************/
 +    UHD_INLINE managed_recv_buffer::sptr get_recv_buff(double timeout)
 +    {
 +        managed_recv_buffer::sptr ptr;
 +        managed_recv_buffer::sptr buff = _transport->get_recv_buff(timeout);
 +        if (buff)
 +        {
 +            boost::shared_ptr<zero_copy_flow_ctrl_mrb> mb = _recv_buffers[_recv_buff_index++];
 +            _recv_buff_index %= _recv_buffers.size();
 +            ptr = mb->get(buff);
 +        }
 +        return ptr;
 +    }
 +
 +    UHD_INLINE size_t get_num_recv_frames() const
 +    {
 +        return _transport->get_num_recv_frames();
 +    }
 +
 +    UHD_INLINE size_t get_recv_frame_size() const
 +    {
 +        return _transport->get_recv_frame_size();
 +    }
 +
 +    /*******************************************************************
 +     * Send implementation:
 +     * Pass the send buffer pointer from the underlying transport
 +     ******************************************************************/
 +    managed_send_buffer::sptr get_send_buff(double timeout)
 +    {
 +        managed_send_buffer::sptr ptr;
 +        managed_send_buffer::sptr buff = _transport->get_send_buff(timeout);
 +        if (buff)
 +        {
 +            boost::shared_ptr<zero_copy_flow_ctrl_msb> mb = _send_buffers[_send_buff_index++];
 +            _send_buff_index %= _send_buffers.size();
 +            ptr = mb->get(buff);
 +        }
 +        return ptr;
 +    }
 +
 +    UHD_INLINE size_t get_num_send_frames() const
 +    {
 +        return _transport->get_num_send_frames();
 +    }
 +
 +    UHD_INLINE size_t get_send_frame_size() const
 +    {
 +        return _transport->get_send_frame_size();
 +    }
 +
 +private:
 +    // The underlying transport
 +    zero_copy_if::sptr _transport;
 +
 +    // buffers
 +    std::vector< boost::shared_ptr<zero_copy_flow_ctrl_msb> > _send_buffers;
 +    std::vector< boost::shared_ptr<zero_copy_flow_ctrl_mrb> > _recv_buffers;
 +    size_t _send_buff_index;
 +    size_t _recv_buff_index;
 +
 +    // Flow control functions
 +    flow_ctrl_func _send_flow_ctrl;
 +    flow_ctrl_func _recv_flow_ctrl;
 +};
 +
 +zero_copy_flow_ctrl::sptr zero_copy_flow_ctrl::make(
 +        zero_copy_if::sptr transport,
 +        flow_ctrl_func send_flow_ctrl,
 +        flow_ctrl_func recv_flow_ctrl
 +)
 +{
 +    zero_copy_flow_ctrl_impl::sptr zero_copy_flow_ctrl(
 +        new zero_copy_flow_ctrl_impl(transport, send_flow_ctrl, recv_flow_ctrl)
 +    );
 +
 +    return zero_copy_flow_ctrl;
 +}
 diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 8a42fe148..46cc0ee5a 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -31,6 +31,8 @@  #include "../../rfnoc/tx_stream_terminator.hpp"  #include <uhd/rfnoc/rate_node_ctrl.hpp>  #include <uhd/rfnoc/radio_ctrl.hpp> +#include <uhd/transport/zero_copy_flow_ctrl.hpp> +#include <boost/atomic.hpp>  #define UHD_TX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER")  #define UHD_RX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER") @@ -305,12 +307,13 @@ struct tx_fc_cache_t          device_channel(0),          last_seq_out(0),          last_seq_ack(0), -        seq_queue(1){} +        last_seq_ack_cache(0) {} +      size_t stream_channel;      size_t device_channel;      size_t last_seq_out; -    size_t last_seq_ack; -    uhd::transport::bounded_buffer<size_t> seq_queue; +    boost::atomic_size_t last_seq_ack; +    size_t last_seq_ack_cache;      boost::shared_ptr<device3_impl::async_md_type> async_queue;      boost::shared_ptr<device3_impl::async_md_type> old_async_queue;  }; @@ -338,33 +341,43 @@ static size_t get_tx_flow_control_window(      return window_in_pkts;  } -static managed_send_buffer::sptr get_tx_buff_with_flowctrl( -    task::sptr /*holds ref*/, -    boost::shared_ptr<tx_fc_cache_t> fc_cache, +// TODO: Remove this function +// This function only exists to make sure the transport is not destroyed +// until it is no longer needed. +static managed_send_buffer::sptr get_tx_buff(      zero_copy_if::sptr xport, -    size_t fc_window,      const double timeout  ){ +    return xport->get_send_buff(timeout); +} + +static bool tx_flow_ctrl( +    task::sptr /*holds ref*/, +    boost::shared_ptr<tx_fc_cache_t> fc_cache, +    size_t fc_window, +    managed_buffer::sptr +) { +    // Busy loop waiting for flow control update.  This is necessary because +    // at this point there is data trying to be sent and it must be sent as +    // quickly as possible when the flow control update arrives to avoid +    // underruns at high rates.  This is also OK because it only occurs when +    // data needs to be sent and flow control is holding it back.      while (true)      {          // delta is the amount of FC credit we've used up -        const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK); +        const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - +            (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK);          // If we want to send another packet, we must have FC credit left          if ((delta & HW_SEQ_NUM_MASK) < fc_window) -            break; - -        // If credit is all used up, we check seq_queue for more. -        const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout); -        if (not ok) { -            return managed_send_buffer::sptr(); //timeout waiting for flow control +        { +            // Packet will be sent +            fc_cache->last_seq_out++; //update seq +            return true;          } +        // update the cached value from the atomic +        fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack;      } - -    managed_send_buffer::sptr buff = xport->get_send_buff(timeout); -    if (buff) { -        fc_cache->last_seq_out++; //update seq, this will actually be a send -    } -    return buff; +    return false;  }  #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 @@ -381,7 +394,9 @@ static void handle_tx_async_msgs(  ) {      managed_recv_buffer::sptr buff = xport->get_recv_buff();      if (not buff) +    {          return; +    }      //extract packet info      vrt::if_packet_info_t if_packet_info; @@ -430,8 +445,7 @@ static void handle_tx_async_msgs(      //The FC response and the burst ack are two indicators that the radio      //consumed packets. Use them to update the FC metadata      if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { -        const size_t seq = metadata.user_payload[0]; -        fc_cache->seq_queue.push_with_pop_on_full(seq); +        fc_cache->last_seq_ack = metadata.user_payload[0];      }      //FC responses don't propagate up to the user so filter them here @@ -842,13 +856,19 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)              }          } +        // Add flow control +        xport.send = zero_copy_flow_ctrl::make( +            xport.send, +            boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1), +            NULL); +          //Give the streamer a functor to get the send buffer -        //get_tx_buff_with_flowctrl is static so bind has no lifetime issues +        //get_tx_buff is static so bind has no lifetime issues          //xport.send (sptr) is required to add streamer->data-transport lifetime dependency          //task (sptr) is required to add  a streamer->async-handler lifetime dependency          my_streamer->set_xport_chan_get_buff(              stream_i, -            boost::bind(&get_tx_buff_with_flowctrl, task, fc_cache, xport.send, fc_window, _1) +            boost::bind(&get_tx_buff, xport.send, _1)          );          //Give the streamer a functor handled received async messages          my_streamer->set_async_receiver( diff --git a/host/lib/usrp/x300/x300_fw_ctrl.cpp b/host/lib/usrp/x300/x300_fw_ctrl.cpp index 1df0fa611..5ff40c966 100644 --- a/host/lib/usrp/x300/x300_fw_ctrl.cpp +++ b/host/lib/usrp/x300/x300_fw_ctrl.cpp @@ -292,7 +292,7 @@ protected:  private:      niriok_proxy::sptr _drv_proxy; -    static const uint32_t READ_TIMEOUT_IN_MS = 10; +    static const uint32_t READ_TIMEOUT_IN_MS = 100;      static const uint32_t INIT_TIMEOUT_IN_MS = 5000;  }; diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp index 785f7b4a3..ac08cf565 100644 --- a/host/lib/usrp/x300/x300_impl.cpp +++ b/host/lib/usrp/x300/x300_impl.cpp @@ -1128,14 +1128,14 @@ uhd::both_xports_t x300_impl::make_transport(                  ? X300_PCIE_RX_DATA_FRAME_SIZE                  : X300_PCIE_MSG_FRAME_SIZE; -            default_buff_args.num_send_frames = -                (xport_type == TX_DATA) -                ? X300_PCIE_DATA_NUM_FRAMES +			default_buff_args.num_send_frames = +				(xport_type == TX_DATA) +                ? X300_PCIE_TX_DATA_NUM_FRAMES                  : X300_PCIE_MSG_NUM_FRAMES;              default_buff_args.num_recv_frames =                  (xport_type == RX_DATA) -                ? X300_PCIE_DATA_NUM_FRAMES +                ? X300_PCIE_RX_DATA_NUM_FRAMES                  : X300_PCIE_MSG_NUM_FRAMES;              xports.recv = nirio_zero_copy::make( diff --git a/host/lib/usrp/x300/x300_impl.hpp b/host/lib/usrp/x300/x300_impl.hpp index 2de295bd9..27f3f130e 100644 --- a/host/lib/usrp/x300/x300_impl.hpp +++ b/host/lib/usrp/x300/x300_impl.hpp @@ -52,15 +52,18 @@ static const size_t X300_RX_SW_BUFF_SIZE_ETH        = 0x2000000;//32MiB    For a  static const size_t X300_RX_SW_BUFF_SIZE_ETH_MACOS  = 0x100000; //1Mib  //The FIFO closest to the DMA controller is 1023 elements deep for RX and 1029 elements deep for TX -//where an element is 8 bytes. For best throughput ensure that the data frame fits in these buffers. -//Also ensure that the kernel has enough frames to hold buffered TX and RX data -static const size_t X300_PCIE_RX_DATA_FRAME_SIZE    = 8184;     //bytes -static const size_t X300_PCIE_TX_DATA_FRAME_SIZE    = 8184;     //bytes -static const size_t X300_PCIE_DATA_NUM_FRAMES       = 2048; -static const size_t X300_PCIE_MSG_FRAME_SIZE        = 256;      //bytes -static const size_t X300_PCIE_MSG_NUM_FRAMES        = 64; -static const size_t X300_PCIE_MAX_CHANNELS          = 6; -static const size_t X300_PCIE_MAX_MUXED_XPORTS      = 32; +//where an element is 8 bytes. The buffers (number of frames * frame size) must be aligned to the +//memory page size.  For the control, we are getting lucky because 64 frames * 256 bytes each aligns +//with the typical page size of 4096 bytes.  Since most page sizes are 4096 bytes or some multiple of +//that, keep the number of frames * frame size aligned to it. +static const size_t X300_PCIE_RX_DATA_FRAME_SIZE        = 4096;     //bytes +static const size_t X300_PCIE_RX_DATA_NUM_FRAMES        = 4096; +static const size_t X300_PCIE_TX_DATA_FRAME_SIZE        = 4096;     //bytes +static const size_t X300_PCIE_TX_DATA_NUM_FRAMES	    = 4096; +static const size_t X300_PCIE_MSG_FRAME_SIZE            = 256;      //bytes +static const size_t X300_PCIE_MSG_NUM_FRAMES            = 64; +static const size_t X300_PCIE_MAX_CHANNELS              = 6; +static const size_t X300_PCIE_MAX_MUXED_XPORTS          = 32;  static const size_t X300_10GE_DATA_FRAME_MAX_SIZE   = 8000;     // CHDR packet size in bytes  static const size_t X300_1GE_DATA_FRAME_MAX_SIZE    = 1472;     // CHDR packet size in bytes  | 
