diff options
| -rw-r--r-- | host/include/uhd/transport/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 134 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 146 | ||||
| -rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 19 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 88 | ||||
| -rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 27 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 77 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.cpp | 3 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 5 | ||||
| -rw-r--r-- | host/test/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/test/buffer_test.cpp | 115 | 
11 files changed, 559 insertions, 58 deletions
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt index 4cefffa24..23a4aae94 100644 --- a/host/include/uhd/transport/CMakeLists.txt +++ b/host/include/uhd/transport/CMakeLists.txt @@ -17,6 +17,8 @@  INSTALL(FILES +    alignment_buffer.hpp +    bounded_buffer.hpp      convert_types.hpp      if_addrs.hpp      udp_simple.hpp diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp new file mode 100644 index 000000000..dc6ccc3ed --- /dev/null +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -0,0 +1,134 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP +#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/shared_ptr.hpp> +#include <utility> +#include <vector> + +namespace uhd{ namespace transport{ + +    /*! +     * Imlement a templated alignment buffer: +     * Used for aligning asynchronously pushed elements with matching ids. +     */ +    template <typename elem_type, typename seq_type> class alignment_buffer{ +    public: +        typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr; + +        /*! +         * Make a new alignment buffer object. +         * \param capacity the maximum elements per index +         * \param width the number of elements to align +         */ +        static sptr make(size_t capacity, size_t width){ +            return sptr(new alignment_buffer(capacity, width)); +        } + +        /*! +         * Push an element with sequence id into the buffer at index. +         * \param elem the element to push +         * \param seq the sequence identifier +         * \param index the buffer index +         * \return true if the element fit without popping for space +         */ +        UHD_INLINE bool push_with_pop_on_full( +            const elem_type &elem, +            const seq_type &seq, +            size_t index +        ){ +            return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq)); +        } + +        /*! +         * Pop an aligned set of elements from this alignment buffer. +         * \param elems a collection to store the aligned elements +         * \param time the timeout time +         * \return false when the operation times out +         */ +        template <typename elems_type, typename time_type> +        bool pop_elems_with_timed_wait(elems_type &elems, const time_type &time){ +            buff_contents_type buff_contents_tmp; +            std::list<size_t> indexes_to_do(_all_indexes); + +            //do an initial pop to load an initial sequence id +            size_t index = indexes_to_do.front(); +            if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; +            elems[index] = buff_contents_tmp.first; +            seq_type expected_seq_id = buff_contents_tmp.second; +            indexes_to_do.pop_front(); + +            //get an aligned set of elements from the buffers: +            while(indexes_to_do.size() != 0){ +                //pop an element off for this index +                index = indexes_to_do.front(); +                if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; + +                //if the sequence id matches: +                //  store the popped element into the output, +                //  remove this index from the list and continue +                if (buff_contents_tmp.second == expected_seq_id){ +                    elems[index] = buff_contents_tmp.first; +                    indexes_to_do.pop_front(); +                    continue; +                } + +                //if the sequence id is older: +                //  continue with the same index to try again +                if (buff_contents_tmp.second < expected_seq_id){ +                    continue; +                } + +                //if the sequence id is newer: +                //  store the popped element into the output, +                //  add all other indexes back into the list +                if (buff_contents_tmp.second > expected_seq_id){ +                    elems[index] = buff_contents_tmp.first; +                    expected_seq_id = buff_contents_tmp.second; +                    indexes_to_do = _all_indexes; +                    indexes_to_do.remove(index); +                    continue; +                } +            } +            return true; +        } + +    private: +        //a vector of bounded buffers for each index +        typedef std::pair<elem_type, seq_type> buff_contents_type; +        typedef bounded_buffer<buff_contents_type> bounded_buffer_type; +        typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr; +        std::vector<bounded_buffer_sptr> _buffs; +        std::list<size_t> _all_indexes; + +        //private constructor +        alignment_buffer(size_t capacity, size_t width){ +            for (size_t i = 0; i < width; i++){ +                _buffs.push_back(bounded_buffer_type::make(capacity)); +                _all_indexes.push_back(i); +            } +        } +    }; + +}} //namespace + +#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */ diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp new file mode 100644 index 000000000..baecd6382 --- /dev/null +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -0,0 +1,146 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP +#define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP + +#include <uhd/config.hpp> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/circular_buffer.hpp> +#include <boost/thread/condition.hpp> + +namespace uhd{ namespace transport{ + +    /*! +     * Imlement a templated bounded buffer: +     * Used for passing elements between threads in a producer-consumer model. +     * The bounded buffer implemented waits and timed waits with condition variables. +     * The pop operation blocks on the bounded_buffer to become non empty. +     * The push operation blocks on the bounded_buffer to become non full. +     */ +    template <typename elem_type> class bounded_buffer{ +    public: +        typedef boost::shared_ptr<bounded_buffer<elem_type> > sptr; + +        /*! +         * Make a new bounded buffer object. +         * \param capacity the bounded_buffer capacity +         */ +        static sptr make(size_t capacity){ +            return sptr(new bounded_buffer(capacity)); +        } + +        /*! +         * Push a new element into the bounded buffer. +         * If the buffer is full prior to the push, +         * make room by poping the oldest element. +         * \param elem the new element to push +         * \return true if the element fit without popping for space +         */ +        UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if(_buffer.full()){ +                _buffer.pop_back(); +                _buffer.push_front(elem); +                lock.unlock(); +                _empty_cond.notify_one(); +                return false; +            } +            else{ +                _buffer.push_front(elem); +                lock.unlock(); +                _empty_cond.notify_one(); +                return true; +            } +        } + +        /*! +         * Push a new element into the bounded_buffer. +         * Wait until the bounded_buffer becomes non-full. +         * \param elem the new element to push +         */ +        UHD_INLINE void push_with_wait(const elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_full, this)); +            _buffer.push_front(elem); +            lock.unlock(); +            _empty_cond.notify_one(); +        } + +        /*! +         * Push a new element into the bounded_buffer. +         * Wait until the bounded_buffer becomes non-full or timeout. +         * \param elem the new element to push +         * \param time the timeout time +         * \return false when the operation times out +         */ +        template<typename time_type> UHD_INLINE +        bool push_with_timed_wait(const elem_type &elem, const time_type &time){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_full, this))) return false; +            _buffer.push_front(elem); +            lock.unlock(); +            _empty_cond.notify_one(); +            return true; +        } + +        /*! +         * Pop an element from the bounded_buffer. +         * Wait until the bounded_buffer becomes non-empty. +         * \param elem the element reference pop to +         */ +        UHD_INLINE void pop_with_wait(elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_empty, this)); +            elem = _buffer.back(); _buffer.pop_back(); +            lock.unlock(); +            _full_cond.notify_one(); +        } + +        /*! +         * Pop an element from the bounded_buffer. +         * Wait until the bounded_buffer becomes non-empty or timeout. +         * \param elem the element reference pop to +         * \param time the timeout time +         * \return false when the operation times out +         */ +        template<typename time_type> UHD_INLINE +        bool pop_with_timed_wait(elem_type &elem, const time_type &time){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_empty, this))) return false; +            elem = _buffer.back(); _buffer.pop_back(); +            lock.unlock(); +            _full_cond.notify_one(); +            return true; +        } + +    private: +        boost::mutex _mutex; +        boost::condition _empty_cond, _full_cond; +        boost::circular_buffer<elem_type> _buffer; + +        bool not_full(void) const{return not _buffer.full();} +        bool not_empty(void) const{return not _buffer.empty();} + +        //private constructor +        bounded_buffer(size_t capacity) : _buffer(capacity){} +    }; + +}} //namespace + +#endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP */ diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 2efabaccf..d6eb89a91 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -124,9 +124,28 @@ namespace uhd{ namespace transport{          virtual managed_recv_buffer::sptr get_recv_buff(void) = 0;          /*! +         * Get the maximum number of receive frames: +         *   The maximum number of valid managed recv buffers, +         *   or the maximum number of frames in the ring buffer, +         *   depending upon the underlying implementation. +         * \return number of frames +         */ +        virtual size_t get_num_recv_frames(void) const = 0; + +        /*!           * Get a new send buffer from this transport object.           */          virtual managed_send_buffer::sptr get_send_buff(void) = 0; + +        /*! +         * Get the maximum number of send frames: +         *   The maximum number of valid managed send buffers, +         *   or the maximum number of frames in the ring buffer, +         *   depending upon the underlying implementation. +         * \return number of frames +         */ +        virtual size_t get_num_send_frames(void) const = 0; +      };      /*! diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 0c604811a..ffd1a4d65 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -20,6 +20,7 @@  #include <boost/cstdint.hpp>  #include <boost/asio.hpp>  #include <boost/format.hpp> +#include <boost/thread.hpp>  #include <iostream>  using namespace uhd::transport; @@ -28,8 +29,7 @@ using namespace uhd::transport;   * Constants   **********************************************************************/  static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3); -static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv -static const double RECV_TIMEOUT = 0.1; // 100 ms +static const size_t MAX_DGRAM_SIZE = 1500; //assume max size on send and recv  /***********************************************************************   * Zero Copy UDP implementation with ASIO: @@ -46,12 +46,32 @@ class udp_zero_copy_impl:  public:      typedef boost::shared_ptr<udp_zero_copy_impl> sptr; -    //structors -    udp_zero_copy_impl(const std::string &addr, const std::string &port); -    ~udp_zero_copy_impl(void); +    udp_zero_copy_impl( +        const std::string &addr, +        const std::string &port +    ): +        phony_zero_copy_recv_if(MAX_DGRAM_SIZE), +        phony_zero_copy_send_if(MAX_DGRAM_SIZE) +    { +        //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + +        // resolve the address +        boost::asio::ip::udp::resolver resolver(_io_service); +        boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); +        boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + +        // create, open, and connect the socket +        _socket = new boost::asio::ip::udp::socket(_io_service); +        _socket->open(boost::asio::ip::udp::v4()); +        _socket->connect(receiver_endpoint); +    } + +    ~udp_zero_copy_impl(void){ +        delete _socket; +    }      //get size for internal socket buffer -    template <typename Opt> size_t get_buff_size(void){ +    template <typename Opt> size_t get_buff_size(void) const{          Opt option;          _socket->get_option(option);          return option.value(); @@ -64,12 +84,33 @@ public:          return get_buff_size<Opt>();      } + +    //The number of frames is approximately the buffer size divided by the max datagram size. +    //In reality, this is a phony zero-copy interface and the number of frames is infinite. +    //However, its sensible to advertise a frame count that is approximate to buffer size. +    //This way, the transport caller will have an idea about how much buffering to create. + +    size_t get_num_recv_frames(void) const{ +        return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/MAX_DGRAM_SIZE; +    } + +    size_t get_num_send_frames(void) const{ +        return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/MAX_DGRAM_SIZE; +    } +  private:      boost::asio::ip::udp::socket   *_socket;      boost::asio::io_service        _io_service;      size_t recv(const boost::asio::mutable_buffer &buff){ -        return _socket->receive(boost::asio::buffer(buff)); +        boost::asio::deadline_timer timer(_socket->get_io_service()); +        timer.expires_from_now(boost::posix_time::milliseconds(100)); +        while (not (_socket->available() or timer.expires_from_now().is_negative())){ +            boost::this_thread::sleep(boost::posix_time::milliseconds(1)); +        } + +        if (_socket->available()) return _socket->receive(boost::asio::buffer(buff)); +        return 0; //no bytes available, timeout...      }      size_t send(const boost::asio::const_buffer &buff){ @@ -77,41 +118,10 @@ private:      }  }; -udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port): -    phony_zero_copy_recv_if(MAX_DGRAM_SIZE), -    phony_zero_copy_send_if(MAX_DGRAM_SIZE) -{ -    //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - -    // resolve the address -    boost::asio::ip::udp::resolver resolver(_io_service); -    boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); -    boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - -    // create, open, and connect the socket -    _socket = new boost::asio::ip::udp::socket(_io_service); -    _socket->open(boost::asio::ip::udp::v4()); -    _socket->connect(receiver_endpoint); - -    // set recv timeout -    timeval tv; -    tv.tv_sec = 0; -    tv.tv_usec = size_t(RECV_TIMEOUT*1e6); -    UHD_ASSERT_THROW(setsockopt( -        _socket->native(), -        SOL_SOCKET, SO_RCVTIMEO, -        (const char *)&tv, sizeof(timeval) -    ) == 0); -} - -udp_zero_copy_impl::~udp_zero_copy_impl(void){ -    delete _socket; -} -  /***********************************************************************   * UDP zero copy make function   **********************************************************************/ -template<typename Opt> static inline void resize_buff_helper( +template<typename Opt> static void resize_buff_helper(      udp_zero_copy_impl::sptr udp_trans,      size_t target_size,      const std::string &name diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index e64e3383d..d6b863040 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -54,6 +54,8 @@ namespace vrt_packet_handler{          }      }; +    typedef boost::function<uhd::transport::managed_recv_buffer::sptr(void)> get_recv_buff_t; +      typedef boost::function<void(uhd::transport::managed_recv_buffer::sptr)> recv_cb_t;      static UHD_INLINE void recv_cb_nop(uhd::transport::managed_recv_buffer::sptr){ @@ -112,15 +114,16 @@ namespace vrt_packet_handler{          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          double tick_rate, -        uhd::transport::zero_copy_if::sptr zc_iface, +        const get_recv_buff_t &get_recv_buff,          //use these two params to handle a layer above vrt          size_t vrt_header_offset_words32, -        const recv_cb_t& recv_cb +        const recv_cb_t &recv_cb      ){          //perform a receive if no rx data is waiting to be copied          if (boost::asio::buffer_size(state.copy_buff) == 0){              state.fragment_offset_in_samps = 0; -            state.managed_buff = zc_iface->get_recv_buff(); +            state.managed_buff = get_recv_buff(); +            if (state.managed_buff.get() == NULL) return 0;              recv_cb(state.managed_buff); //callback before vrt unpack              try{                  _recv1_helper( @@ -169,7 +172,7 @@ namespace vrt_packet_handler{          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          double tick_rate, -        uhd::transport::zero_copy_if::sptr zc_iface, +        const get_recv_buff_t &get_recv_buff,          //use these two params to handle a layer above vrt          size_t vrt_header_offset_words32 = 0,          const recv_cb_t& recv_cb = &recv_cb_nop @@ -189,7 +192,7 @@ namespace vrt_packet_handler{                  metadata,                  io_type, otw_type,                  tick_rate, -                zc_iface, +                get_recv_buff,                  vrt_header_offset_words32,                  recv_cb              ); @@ -208,7 +211,7 @@ namespace vrt_packet_handler{                      (accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept                      io_type, otw_type,                      tick_rate, -                    zc_iface, +                    get_recv_buff,                      vrt_header_offset_words32,                      recv_cb                  ); @@ -234,6 +237,8 @@ namespace vrt_packet_handler{          }      }; +    typedef boost::function<uhd::transport::managed_send_buffer::sptr(void)> get_send_buff_t; +      typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t;      static UHD_INLINE void send_cb_nop(uhd::transport::managed_send_buffer::sptr){ @@ -252,12 +257,12 @@ namespace vrt_packet_handler{          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          double tick_rate, -        uhd::transport::zero_copy_if::sptr zc_iface, +        const get_send_buff_t &get_send_buff,          size_t vrt_header_offset_words32,          const send_cb_t& send_cb      ){          //get a new managed send buffer -        uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff(); +        uhd::transport::managed_send_buffer::sptr send_buff = get_send_buff();          boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32;          size_t num_header_words32, num_packet_words32; @@ -298,7 +303,7 @@ namespace vrt_packet_handler{          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          double tick_rate, -        uhd::transport::zero_copy_if::sptr zc_iface, +        const get_send_buff_t &get_send_buff,          size_t max_samples_per_packet,          //use these two params to handle a layer above vrt          size_t vrt_header_offset_words32 = 0, @@ -319,7 +324,7 @@ namespace vrt_packet_handler{                  metadata,                  io_type, otw_type,                  tick_rate, -                zc_iface, +                get_send_buff,                  vrt_header_offset_words32,                  send_cb              ); @@ -353,7 +358,7 @@ namespace vrt_packet_handler{                      md,                      io_type, otw_type,                      tick_rate, -                    zc_iface, +                    get_send_buff,                      vrt_header_offset_words32,                      send_cb                  ); diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 79b18fb63..efd64d4ab 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -15,11 +15,15 @@  // along with this program.  If not, see <http://www.gnu.org/licenses/>.  // +#include "../../transport/vrt_packet_handler.hpp"  #include "usrp2_impl.hpp"  #include "usrp2_regs.hpp"  #include <uhd/transport/convert_types.hpp> +#include <uhd/transport/bounded_buffer.hpp>  #include <boost/format.hpp>  #include <boost/asio.hpp> //htonl and ntohl +#include <boost/bind.hpp> +#include <boost/thread.hpp>  #include <iostream>  using namespace uhd; @@ -28,6 +32,60 @@ using namespace uhd::transport;  namespace asio = boost::asio;  /*********************************************************************** + * io impl details (internal to this file) + **********************************************************************/ +struct usrp2_impl::io_impl{ + +    io_impl(zero_copy_if::sptr zc_if); +    ~io_impl(void); + +    managed_recv_buffer::sptr get_recv_buff(void); + +    //state management for the vrt packet handler code +    vrt_packet_handler::recv_state packet_handler_recv_state; +    vrt_packet_handler::send_state packet_handler_send_state; + +    //methods and variables for the recv pirate +    void recv_pirate_loop(zero_copy_if::sptr zc_if); +    boost::thread *recv_pirate_thread; bool recv_pirate_running; +    bounded_buffer<managed_recv_buffer::sptr>::sptr recv_pirate_booty; +}; + +usrp2_impl::io_impl::io_impl(zero_copy_if::sptr zc_if){ +    //create a large enough booty +    size_t num_frames = zc_if->get_num_recv_frames(); +    std::cout << "Recv pirate num frames: " << num_frames << std::endl; +    recv_pirate_booty = bounded_buffer<managed_recv_buffer::sptr>::make(num_frames); + +    //create a new pirate thread (yarr!!) +    recv_pirate_thread = new boost::thread( +        boost::bind(&usrp2_impl::io_impl::recv_pirate_loop, this, zc_if) +    ); +} + +usrp2_impl::io_impl::~io_impl(void){ +    recv_pirate_running = false; +    recv_pirate_thread->interrupt(); +    recv_pirate_thread->join(); +    delete recv_pirate_thread; +} + +managed_recv_buffer::sptr usrp2_impl::io_impl::get_recv_buff(void){ +    managed_recv_buffer::sptr buff; +    recv_pirate_booty->pop_with_timed_wait(buff, boost::posix_time::milliseconds(100)); +    //timeout means a null sptr... +    return buff; +} + +void usrp2_impl::io_impl::recv_pirate_loop(zero_copy_if::sptr zc_if){ +    recv_pirate_running = true; +    while(recv_pirate_running){ +        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); +        if (buff->size()) recv_pirate_booty->push_with_pop_on_full(buff); +    } +} + +/***********************************************************************   * Helper Functions   **********************************************************************/  void usrp2_impl::io_init(void){ @@ -60,11 +118,22 @@ void usrp2_impl::io_init(void){      );      _iface->poke32(FR_RX_CTRL_VRT_STREAM_ID, 0);      _iface->poke32(FR_RX_CTRL_VRT_TRAILER, 0); + +    //create new io impl +    _io_impl = new io_impl(_data_transport); +} + +void usrp2_impl::io_done(void){ +    delete _io_impl;  }  /***********************************************************************   * Send Data   **********************************************************************/ +static inline managed_send_buffer::sptr get_send_buff(zero_copy_if::sptr zc_if){ +    return zc_if->get_send_buff(); +} +  size_t usrp2_impl::send(      const asio::const_buffer &buff,      const tx_metadata_t &metadata, @@ -72,11 +141,11 @@ size_t usrp2_impl::send(      send_mode_t send_mode  ){      return vrt_packet_handler::send( -        _packet_handler_send_state, //last state of the send handler +        _io_impl->packet_handler_send_state, //last state of the send handler          buff, metadata, send_mode,  //buffer to empty and samples metadata          io_type, _tx_otw_type,      //input and output types to convert          get_master_clock_freq(),    //master clock tick rate -        _data_transport,            //zero copy interface +        boost::bind(get_send_buff, _data_transport),          get_max_send_samps_per_packet()      );  } @@ -91,10 +160,10 @@ size_t usrp2_impl::recv(      recv_mode_t recv_mode  ){      return vrt_packet_handler::recv( -        _packet_handler_recv_state, //last state of the recv handler +        _io_impl->packet_handler_recv_state, //last state of the recv handler          buff, metadata, recv_mode,  //buffer to fill and samples metadata          io_type, _rx_otw_type,      //input and output types to convert          get_master_clock_freq(),    //master clock tick rate -        _data_transport             //zero copy interface +        boost::bind(&usrp2_impl::io_impl::get_recv_buff, _io_impl)      );  } diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index af3ec216a..7f79c483b 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -185,7 +185,8 @@ usrp2_impl::usrp2_impl(  }  usrp2_impl::~usrp2_impl(void){ -    /* NOP */ +    //cleanup the send and recv io +    io_done();  }  /*********************************************************************** diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 7948a2069..4b6805217 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -33,7 +33,6 @@  #include <uhd/transport/vrt.hpp>  #include <uhd/transport/udp_zero_copy.hpp>  #include <uhd/usrp/dboard_manager.hpp> -#include "../../transport/vrt_packet_handler.hpp"  /*!   * Make a usrp2 dboard interface. @@ -153,10 +152,10 @@ private:          uhd::transport::vrt::max_header_words32*sizeof(boost::uint32_t)      ; -    vrt_packet_handler::recv_state _packet_handler_recv_state; -    vrt_packet_handler::send_state _packet_handler_send_state;      uhd::otw_type_t _rx_otw_type, _tx_otw_type; +    struct io_impl; io_impl *_io_impl;      void io_init(void); +    void io_done(void);      //udp transports for control and data      uhd::transport::udp_zero_copy::sptr _data_transport; diff --git a/host/test/CMakeLists.txt b/host/test/CMakeLists.txt index 61b0b503d..24778d13e 100644 --- a/host/test/CMakeLists.txt +++ b/host/test/CMakeLists.txt @@ -21,6 +21,7 @@  ADD_EXECUTABLE(main_test      main_test.cpp      addr_test.cpp +    buffer_test.cpp      dict_test.cpp      error_test.cpp      gain_handler_test.cpp diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp new file mode 100644 index 000000000..aadb3f951 --- /dev/null +++ b/host/test/buffer_test.cpp @@ -0,0 +1,115 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/alignment_buffer.hpp> +#include <boost/assign/list_of.hpp> + +using namespace boost::assign; +using namespace uhd::transport; + +static const boost::posix_time::milliseconds timeout(10); + +BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){ +    bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); + +    //push elements, check for timeout +    BOOST_CHECK(bb->push_with_timed_wait(0, timeout)); +    BOOST_CHECK(bb->push_with_timed_wait(1, timeout)); +    BOOST_CHECK(bb->push_with_timed_wait(2, timeout)); +    BOOST_CHECK(not bb->push_with_timed_wait(3, timeout)); + +    int val; +    //pop elements, check for timeout and check values +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 0); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 1); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 2); +    BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout)); +} + +BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){ +    bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); + +    //push elements, check for timeout +    BOOST_CHECK(bb->push_with_pop_on_full(0)); +    BOOST_CHECK(bb->push_with_pop_on_full(1)); +    BOOST_CHECK(bb->push_with_pop_on_full(2)); +    BOOST_CHECK(not bb->push_with_pop_on_full(3)); + +    int val; +    //pop elements, check for timeout and check values +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 1); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 2); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 3); +} + +BOOST_AUTO_TEST_CASE(test_alignment_buffer){ +    alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3)); +    //load index 0 with all good seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0)); + +    //load index 1 with some skipped seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1)); + +    //load index 2 with all good seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2)); + +    //readback aligned values +    std::vector<int> aligned_elems(3); + +    static const std::vector<int> expected_elems0 = list_of(0)(10)(20); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems0.begin(), expected_elems0.end() +    ); + +    static const std::vector<int> expected_elems1 = list_of(1)(11)(21); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems1.begin(), expected_elems1.end() +    ); + +    //there was a skip now find 4 + +    static const std::vector<int> expected_elems4 = list_of(4)(14)(24); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems4.begin(), expected_elems4.end() +    ); +}  | 
