diff options
| author | Josh Blum <josh@joshknows.com> | 2010-10-03 19:34:41 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2010-10-03 19:34:41 -0700 | 
| commit | c6f17f1e2f908747ae1547fa43b2c22c3c20ba50 (patch) | |
| tree | d7247a7467d51b6e035061f16fbac979ce225797 | |
| parent | b57c84b34bcdd6c66eb053695b83e6bd6c481774 (diff) | |
| download | uhd-c6f17f1e2f908747ae1547fa43b2c22c3c20ba50.tar.gz uhd-c6f17f1e2f908747ae1547fa43b2c22c3c20ba50.tar.bz2 uhd-c6f17f1e2f908747ae1547fa43b2c22c3c20ba50.zip | |
uhd: changed buffer allocations to be in a single chunk, udp: pass frame sizes into the impl constructor
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 27 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 85 | 
2 files changed, 59 insertions, 53 deletions
| diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index c819302b6..ab48e4fc4 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -100,13 +100,13 @@ private:      lut_buff_type::sptr _completed_list;      //! a list of all transfer structs we allocated -    std::vector<libusb_transfer *>  _all_luts; +    std::vector<libusb_transfer *> _all_luts; -    //! a list of shared arrays for the transfer buffers -    std::vector<boost::shared_array<boost::uint8_t> > _buffers; +    //! a block of memory for the transfer buffers +    boost::shared_array<char> _buffer;      // Calls for processing asynchronous I/O -    libusb_transfer *allocate_transfer(int buff_len); +    libusb_transfer *allocate_transfer(void *mem, size_t len);      void print_transfer_status(libusb_transfer *lut);  }; @@ -154,9 +154,9 @@ usb_endpoint::usb_endpoint(      _input(input)  {      _completed_list = lut_buff_type::make(num_transfers); - +    _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]);      for (size_t i = 0; i < num_transfers; i++){ -        _all_luts.push_back(allocate_transfer(transfer_size)); +        _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size));          //input luts are immediately submitted to be filled          //output luts go into the completed list as free buffers @@ -193,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){   * Allocate a libusb transfer   * The allocated transfer - and buffer it contains - is repeatedly   * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer   * \return pointer to an allocated libusb_transfer   */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){      libusb_transfer *lut = libusb_alloc_transfer(0); - -    boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); -    _buffers.push_back(buff); //store a reference to this shared array +    UHD_ASSERT_THROW(lut != NULL);      unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); +    unsigned char *buff = reinterpret_cast<unsigned char *>(mem);      libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);      libusb_fill_bulk_transfer(lut,                // transfer                                _handle->get(),     // dev_handle                                endpoint,           // endpoint -                              buff.get(),         // buffer -                              buff_len,           // length +                              buff,               // buffer +                              len,                // length                                lut_callback,       // callback                                this,               // user_data                                0);                 // timeout @@ -232,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){   * \param lut pointer to an libusb_transfer   */  void usb_endpoint::print_transfer_status(libusb_transfer *lut){ +    std::cout << "here " << lut->status << std::endl;      switch (lut->status) {      case LIBUSB_TRANSFER_COMPLETED:          if (lut->actual_length < lut->length) { diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 70e7514a1..a1eb516fc 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -18,6 +18,7 @@  #include <uhd/transport/udp_zero_copy.hpp>  #include <uhd/transport/udp_simple.hpp> //mtu  #include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <uhd/utils/warning.hpp>  #include <boost/shared_array.hpp> @@ -26,6 +27,7 @@  #include <boost/thread.hpp>  #include <iostream> +using namespace uhd;  using namespace uhd::transport;  namespace asio = boost::asio; @@ -34,11 +36,15 @@ namespace asio = boost::asio;   **********************************************************************/  //enough buffering for half a second of samples at full rate on usrp2  static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); +  //Large buffers cause more underflow at high rates.  //Perhaps this is due to the kernel scheduling,  //but may change with host-based flow control.  static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); +//the number of async frames to allocate for each send and recv +static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; +  /***********************************************************************   * Zero Copy UDP implementation with ASIO:   *   This is the portable zero copy implementation for systems @@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share  public:      typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; -    udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){ +    udp_zero_copy_asio_impl( +        const std::string &addr, const std::string &port, +        size_t recv_frame_size, size_t num_recv_frames, +        size_t send_frame_size, size_t num_send_frames +    ): +        _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), +        _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) +    {          //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; -        // resolve the address +        //resolve the address          asio::ip::udp::resolver resolver(_io_service);          asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);          asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); -        // create, open, and connect the socket +        //create, open, and connect the socket          _socket = new asio::ip::udp::socket(_io_service);          _socket->open(asio::ip::udp::v4());          _socket->connect(receiver_endpoint);      } -    ~udp_zero_copy_asio_impl(void){ -        _io_service.stop(); -        _thread_group.join_all(); -        delete _socket; -    } - -    /*! -     * Init, the second contructor: -     * Allocate memory and spwan service thread. -     */      void init(void){          //allocate all recv frames and release them to begin xfers -        _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames()); -        for (size_t i = 0; i < this->get_num_recv_frames(); i++){ -            boost::shared_array<char> buff(new char[udp_simple::mtu]); -            _buffers.push_back(buff); //store a reference to this shared array -            release(buff.get()); +        _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); +        _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]); +        for (size_t i = 0; i < _num_recv_frames; i++){ +            release(_recv_buffer.get() + i*_recv_frame_size);          }          //allocate all send frames and push them into the fifo -        _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames()); -        for (size_t i = 0; i < this->get_num_send_frames(); i++){ -            boost::shared_array<char> buff(new char[udp_simple::mtu]); -            _buffers.push_back(buff); //store a reference to this shared array -            handle_send(buff.get()); +        _pending_send_buffs = pending_buffs_type::make(_num_send_frames); +        _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]); +        for (size_t i = 0; i < _num_send_frames; i++){ +            handle_send(_send_buffer.get() + i*_send_frame_size);          }          //spawn the service thread that will run the io service          _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this));      } +    ~udp_zero_copy_asio_impl(void){ +        _io_service.stop(); +        _thread_group.join_all(); +        delete _socket; +    } +      //get size for internal socket buffer      template <typename Opt> size_t get_buff_size(void) const{          Opt option; @@ -109,19 +116,6 @@ public:          return get_buff_size<Opt>();      } -    //The number of frames is approximately the buffer size divided by the max datagram size. -    //In reality, this is a phony zero-copy interface and the number of frames is infinite. -    //However, its sensible to advertise a frame count that is approximate to buffer size. -    //This way, the transport caller will have an idea about how much buffering to create. - -    size_t get_num_recv_frames(void) const{ -        return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu; -    } - -    size_t get_num_send_frames(void) const{ -        return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu; -    } -      //! pop a filled recv buffer off of the fifo and bind with the release callback      managed_recv_buffer::sptr get_recv_buff(double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -138,6 +132,8 @@ public:          return managed_recv_buffer::sptr();      } +    size_t get_num_recv_frames(void) const {return _num_recv_frames;} +      //! pop an empty send buffer off of the fifo and bind with the commit callback      managed_send_buffer::sptr get_send_buff(double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -154,8 +150,11 @@ public:          return managed_send_buffer::sptr();      } +    size_t get_num_send_frames(void) const {return _num_send_frames;} +  private:      void service(void){ +        set_thread_priority_safe();          _io_service.run();      } @@ -172,7 +171,7 @@ private:      //! release a recv buffer -> start an async recv on the buffer      void release(void *mem){          _socket->async_receive( -            boost::asio::buffer(mem, udp_simple::mtu), +            boost::asio::buffer(mem, _recv_frame_size),              boost::bind(                  &udp_zero_copy_asio_impl::handle_recv,                  shared_from_this(), mem, @@ -184,7 +183,7 @@ private:      //! handle a send callback -> push the emptied memory into the fifo      void handle_send(void *mem){          boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu)); +        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));      }      //! commit a send buffer -> start an async send on the buffer @@ -204,9 +203,11 @@ private:      //memory management -> buffers and fifos      boost::thread_group _thread_group; -    std::vector<boost::shared_array<char> > _buffers; +    boost::shared_array<char> _send_buffer, _recv_buffer;      typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;      pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; +    const size_t _recv_frame_size, _num_recv_frames; +    const size_t _send_frame_size, _num_send_frames;  };  /*********************************************************************** @@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make(      size_t recv_buff_size,      size_t send_buff_size  ){ -    udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port)); +    udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl( +        addr, port, +        udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv +        udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES  //send +    ));      //call the helper to resize send and recv buffers      resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); | 
