diff options
| author | Josh Blum <josh@joshknows.com> | 2010-10-01 18:22:41 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2010-10-01 18:22:41 -0700 | 
| commit | 00cd6018405b57a0982b0ce103ff858c646ee18c (patch) | |
| tree | d930efd38829ed3d9f9b79c7ceec503202387288 | |
| parent | a772f4536e46227df3301c637927c3fbfb69a08d (diff) | |
| download | uhd-00cd6018405b57a0982b0ce103ff858c646ee18c.tar.gz uhd-00cd6018405b57a0982b0ce103ff858c646ee18c.tar.bz2 uhd-00cd6018405b57a0982b0ce103ff858c646ee18c.zip | |
uhd: implemented a double timeout (in seconds) for send and recv chains
converted all size_t timeout_ms to double timeout
bounded and alignment buffer now take double timeout
added timeout to device::send and zero_copy_if::get_send_buff
| -rw-r--r-- | host/include/uhd/device.hpp | 27 | ||||
| -rw-r--r-- | host/include/uhd/device.ipp | 9 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 9 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.ipp | 15 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 12 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 15 | ||||
| -rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 18 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 42 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 4 | ||||
| -rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 49 | ||||
| -rw-r--r-- | host/lib/transport/zero_copy.cpp | 6 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/io_impl.cpp | 20 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.cpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.hpp | 7 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 32 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 6 | ||||
| -rw-r--r-- | host/test/buffer_test.cpp | 2 | 
17 files changed, 140 insertions, 137 deletions
| diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index 2077cae62..992276928 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -41,9 +41,6 @@ public:      typedef boost::function<device_addrs_t(const device_addr_t &)> find_t;      typedef boost::function<sptr(const device_addr_t &)> make_t; -    //! A reasonable default timeout for receive -    static const size_t default_recv_timeout_ms = 100; -      /*!       * Register a device into the discovery and factory system.       * @@ -112,12 +109,15 @@ public:       *       * This is a blocking call and will not return until the number       * of samples returned have been read out of each buffer. +     * Under a timeout condition, the number of samples returned +     * may be less than the number of samples specified.       *       * \param buffs a vector of read-only memory containing IF data       * \param nsamps_per_buff the number of samples to send, per buffer       * \param metadata data describing the buffer's contents       * \param io_type the type of data loaded in the buffer       * \param send_mode tells send how to unload the buffer +     * \param timeout the timeout in seconds to wait on a packet       * \return the number of samples sent       */      virtual size_t send( @@ -125,7 +125,8 @@ public:          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout = 0.1      ) = 0;      /*! @@ -136,7 +137,8 @@ public:          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout = 0.1      );      /*! @@ -154,7 +156,9 @@ public:       * See the rx metadata fragment flags and offset fields for details.       *       * This is a blocking call and will not return until the number -     * of samples returned have been written into each buffer or timeout. +     * of samples returned have been written into each buffer. +     * Under a timeout condition, the number of samples returned +     * may be less than the number of samples specified.       *       * When using the full buffer recv mode, the metadata only applies       * to the first packet received and written into the recv buffers. @@ -165,7 +169,7 @@ public:       * \param metadata data to fill describing the buffer       * \param io_type the type of data to fill into the buffer       * \param recv_mode tells recv how to load the buffer -     * \param timeout_ms the timeout in milliseconds to wait for a packet +     * \param timeout the timeout in seconds to wait for a packet       * \return the number of samples received or 0 on error       */      virtual size_t recv( @@ -174,7 +178,7 @@ public:          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms = default_recv_timeout_ms +        double timeout = 0.1      ) = 0;      /*! @@ -186,7 +190,7 @@ public:          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms = default_recv_timeout_ms +        double timeout = 0.1      );      /*! @@ -204,12 +208,11 @@ public:      /*!       * Receive and asynchronous message from the device.       * \param async_metadata the metadata to be filled in -     * \param timeout_ms the timeout in milliseconds to wait for a message +     * \param timeout the timeout in seconds to wait for a message       * \return true when the async_metadata is valid, false for timeout       */      virtual bool recv_async_msg( -        async_metadata_t &async_metadata, -        size_t timeout_ms = default_recv_timeout_ms +        async_metadata_t &async_metadata, double timeout = 0.1      ) = 0;  }; diff --git a/host/include/uhd/device.ipp b/host/include/uhd/device.ipp index 60a3f535d..e2e51ecd0 100644 --- a/host/include/uhd/device.ipp +++ b/host/include/uhd/device.ipp @@ -25,12 +25,13 @@ namespace uhd{          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout      ){          return this->send(              std::vector<const void *>(1, buff),              nsamps_per_buff, metadata, -            io_type, send_mode +            io_type, send_mode, timeout          );      } @@ -40,12 +41,12 @@ namespace uhd{          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms +        double timeout      ){          return this->recv(              std::vector<void *>(1, buff),              nsamps_per_buff, metadata, -            io_type, recv_mode, timeout_ms +            io_type, recv_mode, timeout          );      } diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index 29ba74efc..f44a037f8 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -48,20 +48,17 @@ namespace uhd{ namespace transport{           * \return true if the element fit without popping for space           */          virtual bool push_with_pop_on_full( -            const elem_type &elem, -            const seq_type &seq, -            size_t index +            const elem_type &elem, const seq_type &seq, size_t index          ) = 0;          /*!           * Pop an aligned set of elements from this alignment buffer.           * \param elems a collection to store the aligned elements -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */          virtual bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, -            const time_duration_t &time +            std::vector<elem_type> &elems, double timeout          ) = 0;      }; diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp index 61b3b60f5..5f09de0d9 100644 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ b/host/include/uhd/transport/alignment_buffer.ipp @@ -41,9 +41,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool push_with_pop_on_full( -            const elem_type &elem, -            const seq_type &seq, -            size_t index +            const elem_type &elem, const seq_type &seq, size_t index          ){              //clear the buffer for this index if the seqs are mis-ordered              if (seq < _last_seqs[index]){ @@ -54,17 +52,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, -            const time_duration_t &time +            std::vector<elem_type> &elems, double timeout          ){ -            boost::system_time exit_time = boost::get_system_time() + time; +            boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6));              buff_contents_type buff_contents_tmp;              std::list<size_t> indexes_to_do(_all_indexes);              //do an initial pop to load an initial sequence id              size_t index = indexes_to_do.front();              if (not _buffs[index]->pop_with_timed_wait( -                buff_contents_tmp, exit_time - boost::get_system_time() +                buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()              )) return false;              elems[index] = buff_contents_tmp.first;              seq_type expected_seq_id = buff_contents_tmp.second; @@ -79,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                      indexes_to_do = _all_indexes;                      index = indexes_to_do.front();                      if (not _buffs[index]->pop_with_timed_wait( -                        buff_contents_tmp, exit_time - boost::get_system_time() +                        buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()                      )) return false;                      elems[index] = buff_contents_tmp.first;                      expected_seq_id = buff_contents_tmp.second; @@ -89,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                  //pop an element off for this index                  index = indexes_to_do.front();                  if (not _buffs[index]->pop_with_timed_wait( -                    buff_contents_tmp, exit_time - boost::get_system_time() +                    buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()                  )) return false;                  //if the sequence id matches: diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index d1deece96..aca93b071 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -20,13 +20,9 @@  #include <uhd/config.hpp>  #include <boost/shared_ptr.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ -    //! typedef for the time duration type for wait operations -    typedef boost::posix_time::time_duration time_duration_t; -      /*!       * Implement a templated bounded buffer:       * Used for passing elements between threads in a producer-consumer model. @@ -64,10 +60,10 @@ namespace uhd{ namespace transport{           * Push a new element into the bounded_buffer.           * Wait until the bounded_buffer becomes non-full or timeout.           * \param elem the new element to push -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time) = 0; +        virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0;          /*!           * Pop an element from the bounded_buffer. @@ -80,10 +76,10 @@ namespace uhd{ namespace transport{           * Pop an element from the bounded_buffer.           * Wait until the bounded_buffer becomes non-empty or timeout.           * \param elem the element reference pop to -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time) = 0; +        virtual bool pop_with_timed_wait(elem_type &elem, double timeout) = 0;          /*!           * Clear all elements from the bounded_buffer. diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index e106e229e..71143741e 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -21,6 +21,7 @@  #include <boost/bind.hpp>  #include <boost/circular_buffer.hpp>  #include <boost/thread/condition.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ namespace{ /*anon*/ @@ -57,9 +58,12 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _empty_cond.notify_one();          } -        bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time){ +        bool push_with_timed_wait(const elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl<elem_type>::not_full, this))) return false; +            if (not _full_cond.timed_wait( +                lock, boost::posix_time::microseconds(long(timeout*1e6)), +                boost::bind(&bounded_buffer_impl<elem_type>::not_full, this) +            )) return false;              _buffer.push_front(elem);              lock.unlock();              _empty_cond.notify_one(); @@ -74,9 +78,12 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _full_cond.notify_one();          } -        bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time){ +        bool pop_with_timed_wait(elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this))) return false; +            if (not _empty_cond.timed_wait( +                lock, boost::posix_time::microseconds(long(timeout*1e6)), +                boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this) +            )) return false;              elem = _buffer.back(); _buffer.pop_back();              lock.unlock();              _full_cond.notify_one(); diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 8ecafd3fb..ba19b193c 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -122,10 +122,10 @@ namespace uhd{ namespace transport{          /*!           * Get a new receive buffer from this transport object. -         * \param timeout_ms the timeout to get the buffer in ms +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        virtual managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms) = 0; +        virtual managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1) = 0;          /*!           * Get the maximum number of receive frames: @@ -138,9 +138,10 @@ namespace uhd{ namespace transport{          /*!           * Get a new send buffer from this transport object. +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        virtual managed_send_buffer::sptr get_send_buff(void) = 0; +        virtual managed_send_buffer::sptr get_send_buff(double timeout = 0.1) = 0;          /*!           * Get the maximum number of send frames: @@ -172,19 +173,19 @@ namespace uhd{ namespace transport{          /*!           * Get a new receive buffer from this transport object. -         * \param timeout_ms the timeout to get the buffer in ms +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); +        managed_recv_buffer::sptr get_recv_buff(double timeout);      private:          /*!           * Perform a private copying recv.           * \param buff the buffer to write data into -         * \param timeout_ms the timeout to get the buffer in ms +         * \param timeout the timeout to get the buffer in seconds           * \return the number of bytes written to buff, 0 for timeout, negative for error           */ -        virtual ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms) = 0; +        virtual ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout) = 0;          UHD_PIMPL_DECL(impl) _impl;      }; @@ -208,9 +209,10 @@ namespace uhd{ namespace transport{          /*!           * Get a new send buffer from this transport object. +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        managed_send_buffer::sptr get_send_buff(void); +        managed_send_buffer::sptr get_send_buff(double timeout);      private:          /*! diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f9beb0b4c..7f2bc3468 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -24,12 +24,10 @@  #include <boost/thread.hpp>  #include <vector>  #include <iostream> -#include <iomanip>  using namespace uhd::transport; -const int libusb_timeout = 0; - +static const double CLEANUP_TIMEOUT   = 0.2;    //seconds  static const size_t DEFAULT_NUM_XFERS = 16;     //num xfers  static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes @@ -84,10 +82,10 @@ public:       * Get an available transfer:       * For inputs, this is a just filled transfer.       * For outputs, this is a just emptied transfer. -     * \param timeout_ms the timeout to wait for a lut +     * \param timeout the timeout to wait for a lut       * \return the transfer pointer or NULL if timeout       */ -    libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); +    libusb_transfer *get_lut_with_wait(double timeout);      //Callback use only      void callback_handle_transfer(libusb_transfer *lut); @@ -187,7 +185,7 @@ usb_endpoint::~usb_endpoint(void){      }      //collect canceled transfers (drain the queue) -    while (this->get_lut_with_wait() != NULL){}; +    while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){};      //free all transfers      BOOST_FOREACH(libusb_transfer *lut, _all_luts){ @@ -274,12 +272,10 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){      }  } -libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){      boost::this_thread::disable_interruption di; //disable because the wait can throw      libusb_transfer *lut; -    if (_completed_list->pop_with_timed_wait( -        lut, boost::posix_time::milliseconds(timeout_ms) -    )) return lut; +    if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut;      return NULL;  } @@ -399,8 +395,8 @@ public:          size_t send_xfer_size, size_t send_num_xfers      ); -    managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); -    managed_send_buffer::sptr get_send_buff(void); +    managed_recv_buffer::sptr get_recv_buff(double); +    managed_send_buffer::sptr get_send_buff(double);      size_t get_num_recv_frames(void) const { return _recv_num_frames; }      size_t get_num_send_frames(void) const { return _send_num_frames; } @@ -459,8 +455,8 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(   * Return empty pointer if no transfer is available (timeout or error).   * \return pointer to a managed receive buffer   */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms){ -    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout_ms); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ +    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_recv_buffer::sptr();      } @@ -478,8 +474,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms   * (timeout or error).   * \return pointer to a managed send buffer   */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ -    libusb_transfer *lut = _send_ep->get_lut_with_wait(/* TODO timeout API */); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ +    libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_send_buffer::sptr();      } @@ -494,18 +490,18 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){   * USB zero_copy make functions   **********************************************************************/  usb_zero_copy::sptr usb_zero_copy::make( -	usb_device_handle::sptr handle, +    usb_device_handle::sptr handle,      unsigned int recv_endpoint, unsigned int send_endpoint, -	size_t recv_xfer_size, size_t recv_num_xfers, -	size_t send_xfer_size, size_t send_num_xfers +    size_t recv_xfer_size, size_t recv_num_xfers, +    size_t send_xfer_size, size_t send_num_xfers  ){      libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(          boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()      ));      return sptr(new libusb_zero_copy_impl( -		dev_handle, -		recv_endpoint,  send_endpoint, -		recv_xfer_size, recv_num_xfers, -		send_xfer_size, send_num_xfers +        dev_handle, +        recv_endpoint,  send_endpoint, +        recv_xfer_size, recv_num_xfers, +        send_xfer_size, send_num_xfers      ));  } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 0a6c9f2af..3130830a5 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -109,11 +109,11 @@ private:      boost::asio::io_service        _io_service;      int                            _sock_fd; -    ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ +    ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout){          //setup timeval for timeout          timeval tv;          tv.tv_sec = 0; -        tv.tv_usec = timeout_ms*1000; +        tv.tv_usec = long(timeout*1e6);          //setup rset for timeout          fd_set rset; diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index b603f1371..e11afff30 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -303,18 +303,18 @@ template <typename T> UHD_INLINE T get_context_code(       * Pack a vrt header, copy-convert the data, and send it.       *  - helper function for vrt_packet_handler::send       ******************************************************************/ -    static UHD_INLINE void _send1( +    static UHD_INLINE size_t _send1(          send_state &state,          const std::vector<const void *> &buffs, -        size_t offset_bytes, -        size_t num_samps, +        const size_t offset_bytes, +        const size_t num_samps,          uhd::transport::vrt::if_packet_info_t &if_packet_info,          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          const vrt_packer_t &vrt_packer,          const get_send_buffs_t &get_send_buffs, -        size_t vrt_header_offset_words32, -        size_t chans_per_otw_buff +        const size_t vrt_header_offset_words32, +        const size_t chans_per_otw_buff      ){          //load the rest of the if_packet_info in here          if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t); @@ -322,7 +322,7 @@ template <typename T> UHD_INLINE T get_context_code(          //get send buffers for each channel          managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); -        UHD_ASSERT_THROW(get_send_buffs(send_buffs)); +        if (not get_send_buffs(send_buffs)) return 0;          std::vector<const void *> io_buffs(chans_per_otw_buff);          for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ @@ -347,6 +347,7 @@ template <typename T> UHD_INLINE T get_context_code(                  std::cerr << "commit to send buffer returned less than commit size" << std::endl;              }          } +        return num_samps;      }      /******************************************************************* @@ -381,7 +382,6 @@ template <typename T> UHD_INLINE T get_context_code(          ////////////////////////////////////////////////////////////////          case uhd::device::SEND_MODE_ONE_PACKET:{          //////////////////////////////////////////////////////////////// -            size_t num_samps = std::min(total_num_samps, max_samples_per_packet);              //fill in parts of the packet info overwrote in full buff mode              if_packet_info.has_tsi = metadata.has_time_spec; @@ -389,10 +389,10 @@ template <typename T> UHD_INLINE T get_context_code(              if_packet_info.sob = metadata.start_of_burst;              if_packet_info.eob = metadata.end_of_burst; -            _send1( +            return _send1(                  state,                  buffs, 0, -                num_samps, +                std::min(total_num_samps, max_samples_per_packet),                  if_packet_info,                  io_type, otw_type,                  vrt_packer, @@ -400,31 +400,32 @@ template <typename T> UHD_INLINE T get_context_code(                  vrt_header_offset_words32,                  chans_per_otw_buff              ); -            return num_samps;          }          ////////////////////////////////////////////////////////////////          case uhd::device::SEND_MODE_FULL_BUFF:{          //////////////////////////////////////////////////////////////// -            //calculate constants for fragmentation -            const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; -            static const size_t first_fragment_index = 0; -            const size_t final_fragment_index = num_fragments-1; +            size_t total_num_samps_sent = 0;              //loop through the following fragment indexes -            for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ +            while(total_num_samps_sent < total_num_samps){ + +                //calculate per-loop-iteration variables +                const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; +                const bool first_fragment = (total_num_samps_sent == 0); +                const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet);                  //calculate new flags for the fragments -                if_packet_info.has_tsi = metadata.has_time_spec  and (n == first_fragment_index); -                if_packet_info.has_tsf = metadata.has_time_spec  and (n == first_fragment_index); -                if_packet_info.sob     = metadata.start_of_burst and (n == first_fragment_index); -                if_packet_info.eob     = metadata.end_of_burst   and (n == final_fragment_index); +                if_packet_info.has_tsi = metadata.has_time_spec  and first_fragment; +                if_packet_info.has_tsf = if_packet_info.has_tsi; +                if_packet_info.sob     = metadata.start_of_burst and first_fragment; +                if_packet_info.eob     = metadata.end_of_burst   and final_fragment;                  //send the fragment with the helper function -                _send1( +                const size_t num_samps_sent = _send1(                      state, -                    buffs, n*max_samples_per_packet*io_type.size, -                    (n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet, +                    buffs, total_num_samps_sent*io_type.size, +                    std::min(total_num_samps_unsent, max_samples_per_packet),                      if_packet_info,                      io_type, otw_type,                      vrt_packer, @@ -432,8 +433,10 @@ template <typename T> UHD_INLINE T get_context_code(                      vrt_header_offset_words32,                      chans_per_otw_buff                  ); +                total_num_samps_sent += num_samps_sent; +                if (num_samps_sent == 0) return total_num_samps_sent;              } -            return total_num_samps; +            return total_num_samps_sent;          }          default: throw std::runtime_error("unknown send mode"); diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index 1fcf846a0..dfb65951f 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -68,12 +68,12 @@ phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){      /* NOP */  } -managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(size_t timeout_ms){ +managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(double timeout){      //allocate memory      boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size];      //call recv() with timeout option -    ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout_ms); +    ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout);      if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr @@ -138,6 +138,6 @@ phony_zero_copy_send_if::~phony_zero_copy_send_if(void){      delete [] _impl->send_mem;  } -managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){ +managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(double){      return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these  } diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index aee760a83..8d9c68961 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -94,12 +94,13 @@ struct usrp1_impl::io_impl{      //all of this to ensure only full buffers are committed      managed_send_buffer::sptr send_buff;      size_t num_bytes_committed; +    double send_timeout;      boost::uint8_t pseudo_buff[BYTES_PER_PACKET];      ssize_t phony_commit_pseudo_buff(size_t num_bytes);      ssize_t phony_commit_send_buff(size_t num_bytes);      ssize_t commit_send_buff(void);      void flush_send_buff(void); -    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &); +    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &, double);      //helpers to get at the send buffer + offset      inline void *get_send_mem_ptr(void){ @@ -159,14 +160,15 @@ void usrp1_impl::io_impl::flush_send_buff(void){   */  ssize_t usrp1_impl::io_impl::commit_send_buff(void){      ssize_t ret = send_buff->commit(num_bytes_committed); -    send_buff = data_transport->get_send_buff(); +    send_buff = data_transport->get_send_buff(send_timeout);      num_bytes_committed = 0;      return ret;  }  bool usrp1_impl::io_impl::get_send_buffs( -    vrt_packet_handler::managed_send_buffs_t &buffs +    vrt_packet_handler::managed_send_buffs_t &buffs, double timeout  ){ +    send_timeout = timeout;      UHD_ASSERT_THROW(buffs.size() == 1);      //not enough bytes free -> use the pseudo buffer @@ -216,7 +218,7 @@ static void usrp1_bs_vrt_packer(  size_t usrp1_impl::send(      const std::vector<const void *> &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type, -    send_mode_t send_mode +    send_mode_t send_mode, double timeout  ){      size_t num_samps_sent = vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler @@ -225,7 +227,7 @@ size_t usrp1_impl::send(          io_type, _tx_otw_type,                     //input and output types to convert          _clock_ctrl->get_master_clock_freq(),      //master clock tick rate          &usrp1_bs_vrt_packer, -        boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1), +        boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1, timeout),          get_max_send_samps_per_packet(),          0,                                         //vrt header offset          _tx_subdev_spec.size()                     //num channels @@ -272,18 +274,18 @@ static void usrp1_bs_vrt_unpacker(  }  static bool get_recv_buffs( -    zero_copy_if::sptr zc_if, size_t timeout_ms, +    zero_copy_if::sptr zc_if, double timeout,      vrt_packet_handler::managed_recv_buffs_t &buffs  ){      UHD_ASSERT_THROW(buffs.size() == 1); -    buffs[0] = zc_if->get_recv_buff(timeout_ms); +    buffs[0] = zc_if->get_recv_buff(timeout);      return buffs[0].get() != NULL;  }  size_t usrp1_impl::recv(      const std::vector<void *> &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type, -    recv_mode_t recv_mode, size_t timeout_ms +    recv_mode_t recv_mode, double timeout  ){      size_t num_samps_recvd = vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler @@ -292,7 +294,7 @@ size_t usrp1_impl::recv(          io_type, _rx_otw_type,                     //input and output types to convert          _clock_ctrl->get_master_clock_freq(),      //master clock tick rate          &usrp1_bs_vrt_unpacker, -        boost::bind(&get_recv_buffs, _data_transport, timeout_ms, _1), +        boost::bind(&get_recv_buffs, _data_transport, timeout, _1),          &vrt_packet_handler::handle_overflow_nop,          0,                                         //vrt header offset          _rx_subdev_spec.size()                     //num channels diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index 156fc119f..6ebd6bb09 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -209,9 +209,9 @@ usrp1_impl::~usrp1_impl(void){      /* NOP */  } -bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, size_t timeout_ms){ +bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){      //dummy fill-in for the recv_async_msg -    boost::this_thread::sleep(boost::posix_time::milliseconds(timeout_ms)); +    boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));      return false;  } diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 20ae3c02a..f2c464610 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -83,13 +83,12 @@ public:                  size_t,                  const uhd::tx_metadata_t &,                  const uhd::io_type_t &, -                send_mode_t); +                send_mode_t, double);      size_t recv(const std::vector<void *> &,                  size_t, uhd::rx_metadata_t &,                  const uhd::io_type_t &, -                recv_mode_t, -                size_t timeout); +                recv_mode_t, double);      static const size_t BYTES_PER_PACKET = 512*4; //under the transfer size @@ -101,7 +100,7 @@ public:          return BYTES_PER_PACKET/_rx_otw_type.get_sample_size()/_rx_subdev_spec.size();      } -    bool recv_async_msg(uhd::async_metadata_t &, size_t); +    bool recv_async_msg(uhd::async_metadata_t &, double);  private:      /*! diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 3395f94e2..c0d8ab029 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -33,7 +33,6 @@ using namespace uhd::transport;  namespace asio = boost::asio;  static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; -static const double RECV_TIMEOUT_MS = 100;  /***********************************************************************   * io impl details (internal to this file) @@ -59,9 +58,9 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){ +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms)); +        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout);      }      //state management for the vrt packet handler code @@ -91,7 +90,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(      size_t next_packet_seq = 0;      while(recv_pirate_crew_raiding){ -        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(RECV_TIMEOUT_MS); +        managed_recv_buffer::sptr buff = zc_if->get_recv_buff();          if (not buff.get()) continue; //ignore timeout/error buffers          try{ @@ -151,7 +150,7 @@ void usrp2_impl::io_init(void){          std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));          send_buff->commit(sizeof(data));          //drain the recv buffers (may have junk) -        while (data_transport->get_recv_buff(RECV_TIMEOUT_MS).get()){}; +        while (data_transport->get_recv_buff().get()){};      }      //the number of recv frames is the number for the first transport @@ -179,12 +178,10 @@ void usrp2_impl::io_init(void){   * Async Data   **********************************************************************/  bool usrp2_impl::recv_async_msg( -    async_metadata_t &async_metadata, size_t timeout_ms +    async_metadata_t &async_metadata, double timeout  ){      boost::this_thread::disable_interruption di; //disable because the wait can throw -    return _io_impl->async_msg_fifo->pop_with_timed_wait( -        async_metadata, boost::posix_time::milliseconds(timeout_ms) -    ); +    return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout);  }  /*********************************************************************** @@ -192,19 +189,22 @@ bool usrp2_impl::recv_async_msg(   **********************************************************************/  static bool get_send_buffs(      const std::vector<udp_zero_copy::sptr> &trans, -    vrt_packet_handler::managed_send_buffs_t &buffs +    vrt_packet_handler::managed_send_buffs_t &buffs, +    double timeout  ){      UHD_ASSERT_THROW(trans.size() == buffs.size()); +    bool good = true;      for (size_t i = 0; i < buffs.size(); i++){ -        buffs[i] = trans[i]->get_send_buff(); +        buffs[i] = trans[i]->get_send_buff(timeout); +        good = good and (buffs[i].get() != NULL);      } -    return true; +    return good;  }  size_t usrp2_impl::send(      const std::vector<const void *> &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type, -    send_mode_t send_mode +    send_mode_t send_mode, double timeout  ){      return vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler @@ -213,7 +213,7 @@ size_t usrp2_impl::send(          io_type, _io_helper.get_tx_otw_type(),     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_pack_be, -        boost::bind(&get_send_buffs, _data_transports, _1), +        boost::bind(&get_send_buffs, _data_transports, _1, timeout),          get_max_send_samps_per_packet()      );  } @@ -224,7 +224,7 @@ size_t usrp2_impl::send(  size_t usrp2_impl::recv(      const std::vector<void *> &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type, -    recv_mode_t recv_mode, size_t timeout_ms +    recv_mode_t recv_mode, double timeout  ){      return vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler @@ -233,6 +233,6 @@ size_t usrp2_impl::recv(          io_type, _io_helper.get_rx_otw_type(),     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_be, -        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms) +        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout)      );  } diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 157d17057..e8763b284 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -234,7 +234,7 @@ public:      size_t send(          const std::vector<const void *> &, size_t,          const uhd::tx_metadata_t &, const uhd::io_type_t &, -        uhd::device::send_mode_t +        uhd::device::send_mode_t, double      );      size_t get_max_recv_samps_per_packet(void) const{          return _io_helper.get_max_recv_samps_per_packet(); @@ -242,9 +242,9 @@ public:      size_t recv(          const std::vector<void *> &, size_t,          uhd::rx_metadata_t &, const uhd::io_type_t &, -        uhd::device::recv_mode_t, size_t +        uhd::device::recv_mode_t, double      ); -    bool recv_async_msg(uhd::async_metadata_t &, size_t); +    bool recv_async_msg(uhd::async_metadata_t &, double);  private:      //device properties interface diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp index aadb3f951..8445412e7 100644 --- a/host/test/buffer_test.cpp +++ b/host/test/buffer_test.cpp @@ -23,7 +23,7 @@  using namespace boost::assign;  using namespace uhd::transport; -static const boost::posix_time::milliseconds timeout(10); +static const double timeout = 0.01/*secs*/;  BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){      bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); | 
