diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/transport/libusb1_base.cpp | 19 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 98 | ||||
| -rw-r--r-- | host/lib/usrp/b200/b200_io_impl.cpp | 4 | 
3 files changed, 66 insertions, 55 deletions
| diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 0ef53db0a..4cf3ea17d 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -19,10 +19,12 @@  #include <uhd/exception.hpp>  #include <uhd/utils/msg.hpp>  #include <uhd/utils/log.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/types/dict.hpp>  #include <boost/weak_ptr.hpp>  #include <boost/thread/mutex.hpp>  #include <boost/foreach.hpp> +#include <boost/bind.hpp>  #include <cstdlib>  #include <iostream> @@ -37,9 +39,11 @@ public:      libusb_session_impl(void){          UHD_ASSERT_THROW(libusb_init(&_context) == 0);          libusb_set_debug(_context, debug_level); +        task_handler = task::make(boost::bind(&libusb_session_impl::libusb_event_handler_task, this, _context));      }      ~libusb_session_impl(void){ +        task_handler.reset();          libusb_exit(_context);      } @@ -49,6 +53,21 @@ public:  private:      libusb_context *_context; +    task::sptr task_handler; + +    /* +     * Task to handle libusb events.  There should only be one thread per libusb_context handling events. +     * Using more than one thread can result in excessive CPU usage in kernel space (presumably from locking/waiting). +     * The libusb documentation says it is safe, which it is, but it neglects to state the cost in CPU usage. +     * Just don't do it! +     */ +    UHD_INLINE void libusb_event_handler_task(libusb_context *context) +    { +        timeval tv; +        tv.tv_sec = 0; +        tv.tv_usec = 100000; +        libusb_handle_events_timeout(context, &tv); +    }  };  libusb::session::sptr libusb::session::get_global_session(void){ diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 197e257da..7b1de65c3 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -73,6 +73,14 @@ struct lut_result_t      int completed;      libusb_transfer_status status;      int actual_length; +    boost::mutex mut; +    boost::condition_variable wait_for_complete; +}; + +struct lut_result_completed { +    lut_result_t& _result; +    lut_result_completed(lut_result_t& result):_result(result) {} +    bool operator()() const {return (_result.completed ? true : false);}  };  /*! @@ -84,48 +92,11 @@ struct lut_result_t  static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)  {      lut_result_t *r = (lut_result_t *)lut->user_data; -    r->completed = 1; +    boost::lock_guard<boost::mutex> lock(r->mut);      r->status = lut->status;      r->actual_length = lut->actual_length; -} - -/*! - * Wait for a managed buffer to become complete. - * - * This routine processes async events until the transaction completes. - * We must call the libusb handle events in a loop because the handler - * may complete managed buffers other than the one we are waiting on. - * - * We cannot determine if handle events timed out or processed an event. - * Therefore, the timeout condition is handled by using boost system time. - * - * \param ctx the libusb context structure - * \param timeout the wait timeout in seconds - * \param completed a reference to the completed flag - * \return true for completion, false for timeout - */ -UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, int &completed) -{ -    //already completed by a previous call? -    if (completed) return true; - -    //perform a non-blocking event handle -    timeval tv; -    tv.tv_sec = 0; -    tv.tv_usec = 0; -    libusb_handle_events_timeout_completed(ctx, &tv, &completed); -    if (completed) return true; - -    //finish the rest with a timeout loop -    const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); -    while (not completed and (boost::get_system_time() < timeout_time)){ -        timeval tv; -        tv.tv_sec = 0; -        tv.tv_usec = 10000; /*10ms*/ -        libusb_handle_events_timeout_completed(ctx, &tv, &completed); -    } - -    return completed; +    r->completed = 1; +    r->wait_for_complete.notify_one();  }  /*********************************************************************** @@ -154,7 +125,7 @@ public:      template <typename buffer_type>      UHD_INLINE typename buffer_type::sptr get_new(const double timeout)      { -        if (wait_for_completion(_ctx, timeout, result.completed)) +        if (wait_for_completion(timeout))          {              if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format(                  "usb %s transfer status: %d") % _name % int(result.status))); @@ -164,9 +135,31 @@ public:          return typename buffer_type::sptr();      } +    UHD_INLINE bool flush(double timeout) +    { +        return wait_for_completion(timeout); +    } +      lut_result_t result;  private: +    /*! +     * Wait for a managed buffer to become complete. +     * +     * \param timeout the wait timeout in seconds +     * \return true for completion, false for timeout +     */ +    UHD_INLINE bool wait_for_completion(const double timeout) +    { +        boost::unique_lock<boost::mutex> lock(result.mut); +        if (!result.completed) { +            const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); +            result.wait_for_complete.timed_wait(lock, timeout_time, lut_result_completed(result)); +        } +        return result.completed; +    } + +      boost::function<void(libusb_zero_copy_mb *)> _release_cb;      const bool _is_recv;      const std::string _name; @@ -252,8 +245,6 @@ public:      ~libusb_zero_copy_single(void)      { -        libusb_context *ctx = libusb::session::get_global_session()->get_context(); -          //cancel all transfers          BOOST_FOREACH(libusb_transfer *lut, _all_luts)          { @@ -261,8 +252,10 @@ public:          }          //process all transfers until timeout occurs -        int completed = 0; -        wait_for_completion(ctx, 0.01, completed); +        BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued) +        { +            mb->flush(0.01); +        }          //free all transfers          BOOST_FOREACH(libusb_transfer *lut, _all_luts) @@ -276,19 +269,18 @@ public:      {          typename buffer_type::sptr buff;          libusb_zero_copy_mb *front = NULL; +        boost::mutex::scoped_lock lock(_mutex); +        if (_enqueued.empty())          { -            boost::mutex::scoped_lock l(_mutex); -            if (_enqueued.empty()) -            { -                _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6))); -            } -            if (_enqueued.empty()) return buff; -            front = _enqueued.front(); +            _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6)));          } +        if (_enqueued.empty()) return buff; +        front = _enqueued.front(); +        lock.unlock();          buff = front->get_new<buffer_type>(timeout); +        lock.lock(); -        boost::mutex::scoped_lock l(_mutex);          if (buff) _enqueued.pop_front();          this->submit_what_we_can();          return buff; diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp index d643ef855..f2d463a79 100644 --- a/host/lib/usrp/b200/b200_io_impl.cpp +++ b/host/lib/usrp/b200/b200_io_impl.cpp @@ -231,14 +231,14 @@ rx_streamer::sptr b200_impl::get_rx_stream(const uhd::stream_args_t &args_)          //calculate packet size          static const size_t hdr_size = 0              + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) -            + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer +            //+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer              - sizeof(vrt::if_packet_info_t().cid) //no class id ever used              - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used          ;          const size_t bpp = _data_transport->get_recv_frame_size() - hdr_size;          const size_t bpi = convert::get_bytes_per_item(args.otw_format);          size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi)); -        spp = std::min<size_t>(2000, spp); //magic maximum for framing at full rate +        spp = std::min<size_t>(4092, spp); //magic maximum for framing at full rate          //make the new streamer given the samples per packet          if (not my_streamer) my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); | 
