From dc8bcfde805228ed5d00334ce44c6c0684dcfe2c Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sat, 25 Sep 2010 21:07:15 -0700 Subject: usb: work on libusb code to use a single context across all calls libusb allocation stuff had been moved inside of smart pointer classes to handle automatic cleanup the public device handle implementation now holds an actual libusb device inside of it needs testing - all platforms --- host/lib/transport/libusb1_zero_copy.cpp | 83 +++++++++++--------------------- 1 file changed, 29 insertions(+), 54 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index b3a160462..41cd4b3fc 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -24,7 +24,6 @@ using namespace uhd::transport; -const int libusb_debug_level = 0; const int libusb_timeout = 0; /*********************************************************************** @@ -57,8 +56,8 @@ void pp_transfer(libusb_transfer *lut) **********************************************************************/ class usb_endpoint { private: - libusb_device_handle *_dev_handle; - libusb_context *_ctx; + libusb::device_handle::sptr _handle; + libusb::session::sptr _session; int _endpoint; bool _input; @@ -90,9 +89,8 @@ private: void print_transfer_status(libusb_transfer *lut); public: - usb_endpoint(libusb_device_handle *dev_handle, - libusb_context *ctx, int endpoint, bool input, - size_t transfer_size, size_t num_transfers); + usb_endpoint(libusb::device_handle::sptr handle, libusb::session::sptr sess, + int endpoint, bool input, size_t transfer_size, size_t num_transfers); ~usb_endpoint(); @@ -143,11 +141,11 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) * submit the transfers so that they're ready to return when * data is available. */ -usb_endpoint::usb_endpoint(libusb_device_handle *dev_handle, - libusb_context *ctx, int endpoint, bool input, - size_t transfer_size, size_t num_transfers) - : _dev_handle(dev_handle), - _ctx(ctx), _endpoint(endpoint), _input(input), +usb_endpoint::usb_endpoint( + libusb::device_handle::sptr handle, libusb::session::sptr sess, + int endpoint, bool input, size_t transfer_size, size_t num_transfers) + : _handle(handle), _session(sess), + _endpoint(endpoint), _input(input), _transfer_size(transfer_size), _num_transfers(num_transfers) { unsigned int i; @@ -203,7 +201,7 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); libusb_fill_bulk_transfer(lut, // transfer - _dev_handle, // dev_handle + _handle->get(), // dev_handle endpoint, // endpoint buff, // buffer buff_len, // length @@ -356,7 +354,7 @@ bool usb_endpoint::reap_pending_list() { int retval; - if ((retval = libusb_handle_events(_ctx)) < 0) { + if ((retval = libusb_handle_events(_session->get_context())) < 0) { std::cerr << "error: libusb_handle_events: " << retval << std::endl; return false; } @@ -383,7 +381,7 @@ bool usb_endpoint::reap_pending_list_timeout() size_t pending_list_size = _pending_list.size(); - if ((retval = libusb_handle_events_timeout(_ctx, &tv)) < 0) { + if ((retval = libusb_handle_events_timeout(_session->get_context(), &tv)) < 0) { std::cerr << "error: libusb_handle_events: " << retval << std::endl; return false; } @@ -613,11 +611,8 @@ private: usb_endpoint *_rx_ep; usb_endpoint *_tx_ep; - // Maintain libusb values - libusb_context *_rx_ctx; - libusb_context *_tx_ctx; - libusb_device_handle *_rx_dev_handle; - libusb_device_handle *_tx_dev_handle; + libusb::device_handle::sptr _handle; + libusb::session::sptr _session; size_t _recv_buff_size; size_t _send_buff_size; @@ -626,7 +621,7 @@ private: public: typedef boost::shared_ptr sptr; - libusb_zero_copy_impl(usb_device_handle::sptr handle, + libusb_zero_copy_impl(libusb::device_handle::sptr handle, unsigned int rx_endpoint, unsigned int tx_endpoint, size_t recv_buff_size, @@ -646,44 +641,27 @@ public: * Initializes libusb, opens devices, and sets up interfaces for I/O. * Finally, creates endpoints for asynchronous I/O. */ -libusb_zero_copy_impl::libusb_zero_copy_impl(usb_device_handle::sptr handle, +libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle, unsigned int rx_endpoint, unsigned int tx_endpoint, size_t buff_size, size_t block_size) - : _rx_ctx(NULL), _tx_ctx(NULL), _rx_dev_handle(NULL), _tx_dev_handle(NULL), + : _handle(handle), _session(libusb::session::get_global_session()), _recv_buff_size(block_size), _send_buff_size(block_size), _num_frames(buff_size / block_size) { - // Initialize libusb with separate contexts to allow - // thread safe operation of transmit and receive - libusb::init(&_rx_ctx, libusb_debug_level); - libusb::init(&_tx_ctx, libusb_debug_level); - - UHD_ASSERT_THROW((_rx_ctx != NULL) && (_tx_ctx != NULL)); - - // Find and open the libusb_device corresponding to the - // given handle and return the libusb_device_handle - // that can be used for I/O purposes. - _rx_dev_handle = libusb::open_device(_rx_ctx, handle); - _tx_dev_handle = libusb::open_device(_tx_ctx, handle); - - // Open USB interfaces for tx/rx using magic values. - // IN interface: 2 - // OUT interface: 1 - // Control interface: 0 - libusb::open_interface(_rx_dev_handle, 2); - libusb::open_interface(_tx_dev_handle, 1); - - _rx_ep = new usb_endpoint(_rx_dev_handle, // libusb device_handle - _rx_ctx, // libusb context + _handle->claim_interface(2 /*in interface*/); + _handle->claim_interface(1 /*out interface*/); + + _rx_ep = new usb_endpoint(_handle, // libusb device_handle + _session, // libusb session w/ context rx_endpoint, // USB endpoint number true, // IN endpoint _recv_buff_size, // buffer size per transfer _num_frames); // number of libusb transfers - _tx_ep = new usb_endpoint(_tx_dev_handle, // libusb device_handle - _tx_ctx, // libusb context + _tx_ep = new usb_endpoint(_handle, // libusb device_handle + _session, // libusb session w/ context tx_endpoint, // USB endpoint number false, // OUT endpoint _send_buff_size, // buffer size per transfer @@ -694,13 +672,7 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(usb_device_handle::sptr handle, libusb_zero_copy_impl::~libusb_zero_copy_impl() { delete _rx_ep; - delete _tx_ep; - - libusb_close(_rx_dev_handle); - libusb_close(_tx_dev_handle); - - libusb_exit(_rx_ctx); - libusb_exit(_tx_ctx); + delete _tx_ep; } @@ -755,7 +727,10 @@ usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle, size_t block_size) { - return sptr(new libusb_zero_copy_impl(handle, + libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( + boost::static_pointer_cast(handle)->get_device() + )); + return sptr(new libusb_zero_copy_impl(dev_handle, rx_endpoint, tx_endpoint, buff_size, -- cgit v1.2.3 From 0816925695aa69a1ae344863fef47d239b3ec8af Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 26 Sep 2010 20:23:23 -0700 Subject: usb: zero copy work, multiple endpoints with single context, async io Heavy work on the zero copy interface and endpoint wrappers to properly use the async io. The global libusb session starts a thread to run the event handler, the async callbacks push completed transfers onto a thread-safe bounded buffer. The managed buffer creation routines use the bounded buffer to efficiently pop off completed transfers. works on linux, throws a weird exception on cleanup --- host/lib/transport/libusb1_base.cpp | 16 ++ host/lib/transport/libusb1_zero_copy.cpp | 474 ++++++++----------------------- 2 files changed, 134 insertions(+), 356 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 5ff996642..26e864459 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include using namespace uhd::transport; @@ -32,9 +33,12 @@ public: libusb_session_impl(void){ UHD_ASSERT_THROW(libusb_init(&_context) == 0); libusb_set_debug(_context, debug_level); + _thread_group.create_thread(boost::bind(&libusb_session_impl::run_event_loop, this)); } ~libusb_session_impl(void){ + _running = false; + _thread_group.join_all(); libusb_exit(_context); } @@ -44,6 +48,18 @@ public: private: libusb_context *_context; + boost::thread_group _thread_group; + bool _running; + + void run_event_loop(void){ + _running = true; + timeval tv; + while(_running){ + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + libusb_handle_events_timeout(this->get_context(), &tv); + } + } }; libusb::session::sptr libusb::session::get_global_session(void){ diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 41cd4b3fc..e84793e88 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,8 +17,11 @@ #include "libusb1_base.hpp" #include +#include #include -#include +#include +#include +#include #include #include @@ -55,52 +58,57 @@ void pp_transfer(libusb_transfer *lut) * interface provided by the kernel. **********************************************************************/ class usb_endpoint { +public: + typedef boost::shared_ptr sptr; + + usb_endpoint( + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers + ); + + ~usb_endpoint(void); + + // Exposed interface for submitting / retrieving transfer buffers + + //! Submit a new transfer that was presumably just filled or emptied. + void submit(libusb_transfer *lut); + + /*! + * Get an available transfer: + * For inputs, this is a just filled transfer. + * For outputs, this is a just emptied transfer. + * \param timeout_ms the timeout to wait for a lut + * \return the transfer pointer or NULL if timeout + */ + libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); + + //Callback use only + void callback_handle_transfer(libusb_transfer *lut); + private: libusb::device_handle::sptr _handle; - libusb::session::sptr _session; int _endpoint; bool _input; size_t _transfer_size; size_t _num_transfers; - // Transfer state lists (transfers are free, pending, or completed) - std::list _free_list; - std::list _pending_list; - std::list _completed_list; + //! hold a bounded buffer of completed transfers + typedef bounded_buffer lut_buff_type; + lut_buff_type::sptr _completed_list; + + //! a list of all transfer structs we allocated + std::vector _all_luts; + + //! a list of shared arrays for the transfer buffers + std::vector > _buffers; // Calls for processing asynchronous I/O libusb_transfer *allocate_transfer(int buff_len); - bool cancel(libusb_transfer *lut); - bool cancel_all(); - bool reap_pending_list(); - bool reap_pending_list_timeout(); - bool reap_completed_list(); - - // Transfer state manipulators - void free_list_add(libusb_transfer *lut); - void pending_list_add(libusb_transfer *lut); - void completed_list_add(libusb_transfer *lut); - libusb_transfer *free_list_get(); - libusb_transfer *completed_list_get(); - bool pending_list_remove(libusb_transfer *lut); - - // Debug use void print_transfer_status(libusb_transfer *lut); - -public: - usb_endpoint(libusb::device_handle::sptr handle, libusb::session::sptr sess, - int endpoint, bool input, size_t transfer_size, size_t num_transfers); - - ~usb_endpoint(); - - // Exposed interface for submitting / retrieving transfer buffers - bool submit(libusb_transfer *lut); - libusb_transfer *get_completed_transfer(); - libusb_transfer *get_free_transfer(); - - //Callback use only - void callback_handle_transfer(libusb_transfer *lut); }; @@ -113,9 +121,8 @@ public: * it from the pending to completed status list. * \param lut pointer to libusb_transfer */ -static void callback(libusb_transfer *lut) -{ - usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; +static void callback(libusb_transfer *lut){ + usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; endpoint->callback_handle_transfer(lut); } @@ -124,14 +131,8 @@ static void callback(libusb_transfer *lut) * Accessor call to allow list access from callback space * \param pointer to libusb_transfer */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) -{ - if (!pending_list_remove(lut)) { - std::cerr << "USB: pending remove failed" << std::endl; - return; - } - - completed_list_add(lut); +void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ + _completed_list->push_with_wait(lut); } @@ -142,18 +143,27 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) * data is available. */ usb_endpoint::usb_endpoint( - libusb::device_handle::sptr handle, libusb::session::sptr sess, - int endpoint, bool input, size_t transfer_size, size_t num_transfers) - : _handle(handle), _session(sess), - _endpoint(endpoint), _input(input), - _transfer_size(transfer_size), _num_transfers(num_transfers) + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers +): + _handle(handle), + _endpoint(endpoint), + _input(input), + _transfer_size(transfer_size), + _num_transfers(num_transfers) { - unsigned int i; - for (i = 0; i < _num_transfers; i++) { - free_list_add(allocate_transfer(_transfer_size)); + _completed_list = lut_buff_type::make(num_transfers); - if (_input) - submit(free_list_get()); + for (size_t i = 0; i < _num_transfers; i++){ + _all_luts.push_back(allocate_transfer(_transfer_size)); + + //input luts are immediately submitted to be filled + //output luts go into the completed list as free buffers + if (_input) this->submit(_all_luts.back()); + else _completed_list->push_with_wait(_all_luts.back()); } } @@ -165,22 +175,21 @@ usb_endpoint::usb_endpoint( * the transfers. Libusb will deallocate the data buffer held by * each transfer. */ -usb_endpoint::~usb_endpoint() -{ - cancel_all(); - - while (!_pending_list.empty()) { - if (!reap_pending_list()) - std::cerr << "error: destructor failed to reap" << std::endl; +usb_endpoint::~usb_endpoint(void){ + //cancel all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_cancel_transfer(lut); } - while (!_completed_list.empty()) { - if (!reap_completed_list()) - std::cerr << "error: destructor failed to reap" << std::endl; - } + //collect canceled transfers (drain the queue) + libusb_transfer *lut; + while(_completed_list->pop_with_timed_wait( + lut, boost::posix_time::milliseconds(100) + )); - while (!_free_list.empty()) { - libusb_free_transfer(free_list_get()); + //free all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_free_transfer(lut); } } @@ -192,20 +201,21 @@ usb_endpoint::~usb_endpoint() * \param buff_len size of the individual buffer held by each transfer * \return pointer to an allocated libusb_transfer */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) -{ +libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ libusb_transfer *lut = libusb_alloc_transfer(0); - unsigned char *buff = new unsigned char[buff_len]; + boost::shared_array buff(new boost::uint8_t[buff_len]); + _buffers.push_back(buff); //store a reference to this shared array unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); libusb_fill_bulk_transfer(lut, // transfer _handle->get(), // dev_handle endpoint, // endpoint - buff, // buffer + buff.get(), // buffer buff_len, // length - libusb_transfer_cb_fn(callback), // callback + lut_callback, // callback this, // user_data 0); // timeout return lut; @@ -218,95 +228,15 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) * \param lut pointer to libusb_transfer * \return true on success or false on error */ -bool usb_endpoint::submit(libusb_transfer *lut) -{ - int retval; - if ((retval = libusb_submit_transfer(lut)) < 0) { - std::cerr << "error: libusb_submit_transfer: " << retval << std::endl; - return false; - } - - pending_list_add(lut); - return true; -} - - -/* - * Cancel a pending transfer - * Search the pending list for the transfer and cancel if found. - * \param lut pointer to libusb_transfer to cancel - * \return true on success or false if transfer is not found - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel(libusb_transfer *lut) -{ - std::list::iterator iter; - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (*iter == lut) { - libusb_cancel_transfer(lut); - return true; - } - } - return false; -} - - -/* - * Cancel all pending transfers - * \return bool true if cancelation request is submitted - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel_all() -{ - std::list::iterator iter; - - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (libusb_cancel_transfer(*iter) < 0) { - std::cerr << "error: libusb_cancal_transfer() failed" << std::endl; - return false; - } - } - - return true; -} - - -/* - * Reap completed transfers - * return true if at least one transfer was reaped, false otherwise. - * Check completed transfers for errors and mark as free. This is a - * blocking call. - * \return bool true if a libusb transfer is reaped, false otherwise - */ -bool usb_endpoint::reap_completed_list() -{ - libusb_transfer *lut; - - if (_completed_list.empty()) { - if (!reap_pending_list_timeout()) - return false; - } - - while (!_completed_list.empty()) { - lut = completed_list_get(); - print_transfer_status(lut); - free_list_add(lut); - } - - return true; +void usb_endpoint::submit(libusb_transfer *lut){ + UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); } - /* * Print status errors of a completed transfer * \param lut pointer to an libusb_transfer */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut) -{ +void usb_endpoint::print_transfer_status(libusb_transfer *lut){ switch (lut->status) { case LIBUSB_TRANSFER_COMPLETED: if (lut->actual_length < lut->length) { @@ -342,166 +272,14 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut) } } - -/* - * Reap pending transfers without timeout - * This is a blocking call. Reaping submitted transfers is - * handled by libusb and the assigned callback function. - * Block until at least one transfer is reaped. - * \return true true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list() -{ - int retval; - - if ((retval = libusb_handle_events(_session->get_context())) < 0) { - std::cerr << "error: libusb_handle_events: " << retval << std::endl; - return false; - } - - return true; -} - - -/* - * Reap pending transfers with timeout - * This call blocks until a transfer is reaped or timeout. - * Reaping submitted transfers is handled by libusb and the - * assigned callback function. Block until at least one - * transfer is reaped or timeout occurs. - * \return true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list_timeout() -{ - int retval; - timeval tv; - - tv.tv_sec = 0; - tv.tv_usec = 100000; //100ms - - size_t pending_list_size = _pending_list.size(); - - if ((retval = libusb_handle_events_timeout(_session->get_context(), &tv)) < 0) { - std::cerr << "error: libusb_handle_events: " << retval << std::endl; - return false; - } - - if (_pending_list.size() < pending_list_size) { - return true; - } - else { - return false; - } -} - - -/* - * Get a free transfer - * The transfer has an empty data bufer for OUT requests - * \return pointer to a libusb_transfer - */ -libusb_transfer *usb_endpoint::get_free_transfer() -{ - if (_free_list.empty()) { - if (!reap_completed_list()) - return NULL; - } - - return free_list_get(); -} - - -/* - * Get a completed transfer - * The transfer has a full data buffer for IN requests - * \return pointer to libusb_transfer - */ -libusb_transfer *usb_endpoint::get_completed_transfer() -{ - if (_completed_list.empty()) { - if (!reap_pending_list_timeout()) - return NULL; - } - - return completed_list_get(); -} - -/* - * List operations - */ -void usb_endpoint::free_list_add(libusb_transfer *lut) -{ - _free_list.push_back(lut); -} - -void usb_endpoint::pending_list_add(libusb_transfer *lut) -{ - _pending_list.push_back(lut); -} - -void usb_endpoint::completed_list_add(libusb_transfer *lut) -{ - _completed_list.push_back(lut); -} - - -/* - * Free and completed lists don't have ordered content - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::free_list_get() -{ - libusb_transfer *lut; - - if (_free_list.size() == 0) { - return NULL; - } - else { - lut = _free_list.front(); - _free_list.pop_front(); - return lut; - } -} - - -/* - * Free and completed lists don't have ordered content - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::completed_list_get() -{ +libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ libusb_transfer *lut; - - if (_completed_list.empty()) { - return NULL; - } - else { - lut = _completed_list.front(); - _completed_list.pop_front(); - return lut; - } + if (_completed_list->pop_with_timed_wait( + lut, boost::posix_time::milliseconds(timeout_ms) + )) return lut; + return NULL; } - -/* - * Search and remove transfer from pending list - * Assuming that the callbacks occur in order, the front element - * should yield the correct transfer. If not, then something else - * is going on. If no transfers match, then something went wrong. - */ -bool usb_endpoint::pending_list_remove(libusb_transfer *lut) -{ - std::list::iterator iter; - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (*iter == lut) { - _pending_list.erase(iter); - return true; - } - } - return false; -} - - /*********************************************************************** * Managed buffers **********************************************************************/ @@ -515,17 +293,15 @@ bool usb_endpoint::pending_list_remove(libusb_transfer *lut) class libusb_managed_recv_buffer_impl : public managed_recv_buffer { public: libusb_managed_recv_buffer_impl(libusb_transfer *lut, - usb_endpoint *endpoint) + usb_endpoint::sptr endpoint) : _buff(lut->buffer, lut->length) { _lut = lut; _endpoint = endpoint; } - ~libusb_managed_recv_buffer_impl() - { - if (!_endpoint->submit(_lut)) - std::cerr << "USB: failed to submit IN transfer" << std::endl; + ~libusb_managed_recv_buffer_impl(void){ + _endpoint->submit(_lut); } private: @@ -535,7 +311,7 @@ private: } libusb_transfer *_lut; - usb_endpoint *_endpoint; + usb_endpoint::sptr _endpoint; const boost::asio::const_buffer _buff; }; @@ -551,7 +327,7 @@ private: class libusb_managed_send_buffer_impl : public managed_send_buffer { public: libusb_managed_send_buffer_impl(libusb_transfer *lut, - usb_endpoint *endpoint, + usb_endpoint::sptr endpoint, size_t buff_size) : _buff(lut->buffer, buff_size), _committed(false) { @@ -559,8 +335,7 @@ public: _endpoint = endpoint; } - ~libusb_managed_send_buffer_impl() - { + ~libusb_managed_send_buffer_impl(void){ if (!_committed) { _lut->length = 0; _lut->actual_length = 0; @@ -580,12 +355,14 @@ public: _lut->length = num_bytes; _lut->actual_length = 0; - if (_endpoint->submit(_lut)) { + try{ + _endpoint->submit(_lut); _committed = true; return num_bytes; } - else { - return 0; + catch(const std::exception &e){ + std::cerr << "Error in commit: " << e.what() << std::endl; + return -1; } } @@ -596,7 +373,7 @@ private: } libusb_transfer *_lut; - usb_endpoint *_endpoint; + usb_endpoint::sptr _endpoint; const boost::asio::mutable_buffer _buff; bool _committed; }; @@ -608,11 +385,9 @@ private: class libusb_zero_copy_impl : public usb_zero_copy { private: - usb_endpoint *_rx_ep; - usb_endpoint *_tx_ep; + usb_endpoint::sptr _rx_ep, _tx_ep; libusb::device_handle::sptr _handle; - libusb::session::sptr _session; size_t _recv_buff_size; size_t _send_buff_size; @@ -626,8 +401,6 @@ public: unsigned int tx_endpoint, size_t recv_buff_size, size_t send_buff_size); - - ~libusb_zero_copy_impl(); managed_recv_buffer::sptr get_recv_buff(void); managed_send_buffer::sptr get_send_buff(void); @@ -646,45 +419,38 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle, unsigned int tx_endpoint, size_t buff_size, size_t block_size) - : _handle(handle), _session(libusb::session::get_global_session()), + : _handle(handle), _recv_buff_size(block_size), _send_buff_size(block_size), _num_frames(buff_size / block_size) { _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); - _rx_ep = new usb_endpoint(_handle, // libusb device_handle - _session, // libusb session w/ context + _rx_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle rx_endpoint, // USB endpoint number true, // IN endpoint _recv_buff_size, // buffer size per transfer - _num_frames); // number of libusb transfers + _num_frames // number of libusb transfers + )); - _tx_ep = new usb_endpoint(_handle, // libusb device_handle - _session, // libusb session w/ context + _tx_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle tx_endpoint, // USB endpoint number false, // OUT endpoint _send_buff_size, // buffer size per transfer - _num_frames); // number of libusb transfers -} - - -libusb_zero_copy_impl::~libusb_zero_copy_impl() -{ - delete _rx_ep; - delete _tx_ep; + _num_frames // number of libusb transfers + )); } - /* * Construct a managed receive buffer from a completed libusb transfer * (happy with buffer full of data) obtained from the receive endpoint. * Return empty pointer if no transfer is available (timeout or error). * \return pointer to a managed receive buffer */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() -{ - libusb_transfer *lut = _rx_ep->get_completed_transfer(); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ + libusb_transfer *lut = _rx_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_recv_buffer::sptr(); } @@ -702,9 +468,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() * (timeout or error). * \return pointer to a managed send buffer */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff() -{ - libusb_transfer *lut = _tx_ep->get_free_transfer(); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ + libusb_transfer *lut = _tx_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_send_buffer::sptr(); } @@ -736,6 +501,3 @@ usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle, buff_size, block_size)); } - - - -- cgit v1.2.3 From 86213f8de2cc4ca671f9afbcf475c744fe7aebcb Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 26 Sep 2010 20:54:58 -0700 Subject: usb: disable thread interruption on wait calls --- host/lib/transport/libusb1_zero_copy.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index e84793e88..d3888240b 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -132,6 +133,7 @@ static void callback(libusb_transfer *lut){ * \param pointer to libusb_transfer */ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ + boost::this_thread::disable_interruption di; //disable because the wait can throw _completed_list->push_with_wait(lut); } @@ -182,10 +184,7 @@ usb_endpoint::~usb_endpoint(void){ } //collect canceled transfers (drain the queue) - libusb_transfer *lut; - while(_completed_list->pop_with_timed_wait( - lut, boost::posix_time::milliseconds(100) - )); + while (this->get_lut_with_wait() != NULL); //free all transfers BOOST_FOREACH(libusb_transfer *lut, _all_luts){ @@ -273,6 +272,7 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ } libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ + boost::this_thread::disable_interruption di; //disable because the wait can throw libusb_transfer *lut; if (_completed_list->pop_with_timed_wait( lut, boost::posix_time::milliseconds(timeout_ms) -- cgit v1.2.3 From 2f62c39b7135728ed06faa06db46004323b1c70f Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 27 Sep 2010 11:35:28 -0700 Subject: uhd: fix warning by adding brackets for while(cond){}; --- host/examples/test_async_messages.cpp | 2 +- host/lib/transport/libusb1_zero_copy.cpp | 2 +- host/lib/usrp/usrp2/io_impl.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/examples/test_async_messages.cpp b/host/examples/test_async_messages.cpp index 4c9d18121..0a5c076ea 100644 --- a/host/examples/test_async_messages.cpp +++ b/host/examples/test_async_messages.cpp @@ -58,7 +58,7 @@ void test_no_async_message(uhd::usrp::single_usrp::sptr sdev){ " Got unexpected event code 0x%x.\n" ) % async_md.event_code << std::endl; //clear the async messages - while (dev->recv_async_msg(async_md, 0)); + while (dev->recv_async_msg(async_md, 0)){}; } else{ std::cout << boost::format( diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index d3888240b..b089d11e0 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -184,7 +184,7 @@ usb_endpoint::~usb_endpoint(void){ } //collect canceled transfers (drain the queue) - while (this->get_lut_with_wait() != NULL); + while (this->get_lut_with_wait() != NULL){}; //free all transfers BOOST_FOREACH(libusb_transfer *lut, _all_luts){ diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 91a1b2344..65411801d 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -150,7 +150,7 @@ void usrp2_impl::io_init(void){ std::memcpy(send_buff->cast(), &data, sizeof(data)); send_buff->commit(sizeof(data)); //drain the recv buffers (may have junk) - while (data_transport->get_recv_buff().get()); + while (data_transport->get_recv_buff().get()){}; } //the number of recv frames is the number for the first transport -- cgit v1.2.3 From 543a63648f11d0e502e897f3cd98667005580c9e Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 29 Sep 2010 15:35:54 -0700 Subject: usrp: transfer resize options and documentation --- host/docs/usrp1.rst | 27 ++++++ host/include/uhd/transport/usb_zero_copy.hpp | 24 +++-- host/lib/transport/libusb1_zero_copy.cpp | 140 ++++++++++++++------------- host/lib/usrp/usrp1/usrp1_impl.cpp | 26 +++-- 4 files changed, 137 insertions(+), 80 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/docs/usrp1.rst b/host/docs/usrp1.rst index 3c1431d30..7cf447719 100644 --- a/host/docs/usrp1.rst +++ b/host/docs/usrp1.rst @@ -95,6 +95,27 @@ Notice that the subdevice name is always specified in the 3 possible cases. B:B + +------------------------------------------------------------------------ +Change USB transfer parameters +------------------------------------------------------------------------ +The advanced user may manipulate parameters of the usb bulk transfers +for various reasons, such as lowering latency or increasing buffer size. +By default, the UHD will use values for these parameters +that are known to perform well on a variety of systems. + +The following device address can be used to manipulate USB bulk transfers: + +* **recv_xfer_size:** the size of each receive bulk transfer in bytes +* **recv_num_xfers:** the number of simultaneous receive bulk transfers +* **send_xfer_size:** the size of each send bulk transfer in bytes +* **send_num_xfers:** the number of simultaneous send bulk transfers + +Example, set the args string to the following: +:: + + serial=12345678, recv_num_xfers=16 + ------------------------------------------------------------------------ OS Specific Notes ------------------------------------------------------------------------ @@ -113,3 +134,9 @@ so that non-root users may access the device: sudo mv tmpfile /etc/udev/rules.d/10-usrp.rules sudo udevadm control --reload-rules +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Install libusb driver on Windows +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +On Windows, a driver must be installed the first time the USRP1 is attached to the host computer. +A download link for this driver can be found on the Ettus Research UHD wiki page. +Download and unpack the driver, and direct the Windows driver install wizard to the *.inf file. diff --git a/host/include/uhd/transport/usb_zero_copy.hpp b/host/include/uhd/transport/usb_zero_copy.hpp index 75232c22a..61bf380ba 100644 --- a/host/include/uhd/transport/usb_zero_copy.hpp +++ b/host/include/uhd/transport/usb_zero_copy.hpp @@ -45,16 +45,22 @@ public: * The underlying implementation may be platform specific. * * \param handle a device handle that uniquely identifying the device - * \param rx_endpoint an integer specifiying an IN endpoint number - * \param tx_endpoint an integer specifiying an OUT endpoint number - * \param buff_size total number of bytes of buffer space to allocate - * \param block_size number of bytes allocated for each I/O transaction + * \param recv_endpoint an integer specifiying an IN endpoint number + * \param send_endpoint an integer specifiying an OUT endpoint number + * \param recv_xfer_size the number of bytes for each receive transfer + * \param recv_num_xfers the number of simultaneous receive transfers + * \param send_xfer_size the number of bytes for each send transfer + * \param send_num_xfers the number of simultaneous send transfers */ - static sptr make(usb_device_handle::sptr handle, - unsigned int rx_endpoint, - unsigned int tx_endpoint, - size_t buff_size = 0, - size_t block_size = 0); + static sptr make( + usb_device_handle::sptr handle, + unsigned int recv_endpoint, + unsigned int send_endpoint, + size_t recv_xfer_size = 0, + size_t recv_num_xfers = 0, + size_t send_xfer_size = 0, + size_t send_num_xfers = 0 + ); }; }} //namespace diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index b089d11e0..f2dcff6b5 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -30,6 +30,9 @@ using namespace uhd::transport; const int libusb_timeout = 0; +static const size_t DEFAULT_NUM_XFERS = 16; //num xfers +static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes + /*********************************************************************** * Helper functions ***********************************************************************/ @@ -56,7 +59,7 @@ void pp_transfer(libusb_transfer *lut) * create a bidirectional interface. It is a zero copy implementation * with respect to libusb, however, each send and recv requires a copy * operation from kernel to userspace; this is due to the usbfs - * interface provided by the kernel. + * interface provided by the kernel. **********************************************************************/ class usb_endpoint { public: @@ -107,7 +110,7 @@ private: //! a list of shared arrays for the transfer buffers std::vector > _buffers; - // Calls for processing asynchronous I/O + // Calls for processing asynchronous I/O libusb_transfer *allocate_transfer(int buff_len); void print_transfer_status(libusb_transfer *lut); }; @@ -142,7 +145,7 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ * Constructor * Allocate libusb transfers and mark as free. For IN endpoints, * submit the transfers so that they're ready to return when - * data is available. + * data is available. */ usb_endpoint::usb_endpoint( libusb::device_handle::sptr handle, @@ -194,7 +197,7 @@ usb_endpoint::~usb_endpoint(void){ /* - * Allocate a libusb transfer + * Allocate a libusb transfer * The allocated transfer - and buffer it contains - is repeatedly * submitted, reaped, and reused and should not be freed until shutdown. * \param buff_len size of the individual buffer held by each transfer @@ -225,7 +228,7 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ * Asynchonous transfer submission * Submit a libusb transfer to libusb add pending status * \param lut pointer to libusb_transfer - * \return true on success or false on error + * \return true on success or false on error */ void usb_endpoint::submit(libusb_transfer *lut){ UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); @@ -281,14 +284,14 @@ libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ } /*********************************************************************** - * Managed buffers + * Managed buffers **********************************************************************/ /* * Libusb managed receive buffer * Construct a recv buffer from a libusb transfer. The memory held by * the libusb transfer is exposed through the managed buffer interface. * Upon destruction, the transfer and buffer are resubmitted to the - * endpoint for further use. + * endpoint for further use. */ class libusb_managed_recv_buffer_impl : public managed_recv_buffer { public: @@ -307,7 +310,7 @@ public: private: const boost::asio::const_buffer &get() const { - return _buff; + return _buff; } libusb_transfer *_lut; @@ -327,9 +330,8 @@ private: class libusb_managed_send_buffer_impl : public managed_send_buffer { public: libusb_managed_send_buffer_impl(libusb_transfer *lut, - usb_endpoint::sptr endpoint, - size_t buff_size) - : _buff(lut->buffer, buff_size), _committed(false) + usb_endpoint::sptr endpoint) + : _buff(lut->buffer, lut->length), _committed(false) { _lut = lut; _endpoint = endpoint; @@ -349,7 +351,7 @@ public: std::cerr << "UHD: send buffer already committed" << std::endl; return 0; } - + UHD_ASSERT_THROW(num_bytes <= boost::asio::buffer_size(_buff)); _lut->length = num_bytes; @@ -369,7 +371,7 @@ public: private: const boost::asio::mutable_buffer &get() const { - return _buff; + return _buff; } libusb_transfer *_lut; @@ -385,61 +387,71 @@ private: class libusb_zero_copy_impl : public usb_zero_copy { private: - usb_endpoint::sptr _rx_ep, _tx_ep; - libusb::device_handle::sptr _handle; - - size_t _recv_buff_size; - size_t _send_buff_size; - size_t _num_frames; + size_t _recv_num_frames, _send_num_frames; + usb_endpoint::sptr _recv_ep, _send_ep; public: typedef boost::shared_ptr sptr; - libusb_zero_copy_impl(libusb::device_handle::sptr handle, - unsigned int rx_endpoint, - unsigned int tx_endpoint, - size_t recv_buff_size, - size_t send_buff_size); + libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + unsigned int recv_endpoint, unsigned int send_endpoint, + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers + ); managed_recv_buffer::sptr get_recv_buff(void); managed_send_buffer::sptr get_send_buff(void); - size_t get_num_recv_frames(void) const { return _num_frames; } - size_t get_num_send_frames(void) const { return _num_frames; } + size_t get_num_recv_frames(void) const { return _recv_num_frames; } + size_t get_num_send_frames(void) const { return _send_num_frames; } }; /* * Constructor * Initializes libusb, opens devices, and sets up interfaces for I/O. - * Finally, creates endpoints for asynchronous I/O. + * Finally, creates endpoints for asynchronous I/O. */ -libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle, - unsigned int rx_endpoint, - unsigned int tx_endpoint, - size_t buff_size, - size_t block_size) - : _handle(handle), - _recv_buff_size(block_size), _send_buff_size(block_size), - _num_frames(buff_size / block_size) -{ - _handle->claim_interface(2 /*in interface*/); +libusb_zero_copy_impl::libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + unsigned int recv_endpoint, unsigned int send_endpoint, + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers +){ + _handle = handle; + + //if the sizes are left at 0 (automatic) -> use the defaults + if (recv_xfer_size == 0) recv_xfer_size = DEFAULT_XFER_SIZE; + if (recv_num_xfers == 0) recv_num_xfers = DEFAULT_NUM_XFERS; + if (send_xfer_size == 0) send_xfer_size = DEFAULT_XFER_SIZE; + if (send_num_xfers == 0) send_num_xfers = DEFAULT_NUM_XFERS; + + //sanity check the transfer sizes + UHD_ASSERT_THROW(recv_xfer_size % 512 == 0); + UHD_ASSERT_THROW(send_xfer_size % 512 == 0); + + //store the num xfers for the num frames count + _recv_num_frames = recv_num_xfers; + _send_num_frames = send_num_xfers; + + _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); - _rx_ep = usb_endpoint::sptr(new usb_endpoint( + _recv_ep = usb_endpoint::sptr(new usb_endpoint( _handle, // libusb device_handle - rx_endpoint, // USB endpoint number + recv_endpoint, // USB endpoint number true, // IN endpoint - _recv_buff_size, // buffer size per transfer - _num_frames // number of libusb transfers + recv_xfer_size, // buffer size per transfer + recv_num_xfers // number of libusb transfers )); - _tx_ep = usb_endpoint::sptr(new usb_endpoint( + _send_ep = usb_endpoint::sptr(new usb_endpoint( _handle, // libusb device_handle - tx_endpoint, // USB endpoint number + send_endpoint, // USB endpoint number false, // OUT endpoint - _send_buff_size, // buffer size per transfer - _num_frames // number of libusb transfers + send_xfer_size, // buffer size per transfer + send_num_xfers // number of libusb transfers )); } @@ -447,17 +459,17 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle, * Construct a managed receive buffer from a completed libusb transfer * (happy with buffer full of data) obtained from the receive endpoint. * Return empty pointer if no transfer is available (timeout or error). - * \return pointer to a managed receive buffer + * \return pointer to a managed receive buffer */ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ - libusb_transfer *lut = _rx_ep->get_lut_with_wait(/* TODO timeout API */); + libusb_transfer *lut = _recv_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_recv_buffer::sptr(); } else { return managed_recv_buffer::sptr( new libusb_managed_recv_buffer_impl(lut, - _rx_ep)); + _recv_ep)); } } @@ -466,38 +478,36 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ * Construct a managed send buffer from a free libusb transfer (with * empty buffer). Return empty pointer of no transfer is available * (timeout or error). - * \return pointer to a managed send buffer + * \return pointer to a managed send buffer */ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ - libusb_transfer *lut = _tx_ep->get_lut_with_wait(/* TODO timeout API */); + libusb_transfer *lut = _send_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_send_buffer::sptr(); } else { return managed_send_buffer::sptr( new libusb_managed_send_buffer_impl(lut, - _tx_ep, - _send_buff_size)); + _send_ep)); } } - /*********************************************************************** * USB zero_copy make functions **********************************************************************/ -usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle, - unsigned int rx_endpoint, - unsigned int tx_endpoint, - size_t buff_size, - size_t block_size) - -{ +usb_zero_copy::sptr usb_zero_copy::make( + usb_device_handle::sptr handle, + unsigned int recv_endpoint, unsigned int send_endpoint, + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers +){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( boost::static_pointer_cast(handle)->get_device() )); - return sptr(new libusb_zero_copy_impl(dev_handle, - rx_endpoint, - tx_endpoint, - buff_size, - block_size)); + return sptr(new libusb_zero_copy_impl( + dev_handle, + recv_endpoint, send_endpoint, + recv_xfer_size, recv_num_xfers, + send_xfer_size, send_num_xfers + )); } diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index b4c23bf12..793e3027d 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include using namespace uhd; @@ -107,6 +108,15 @@ static device_addrs_t usrp1_find(const device_addr_t &hint) /*********************************************************************** * Make **********************************************************************/ +template static output_type cast_from_dev_addr( + const device_addr_t &device_addr, + const std::string &key, + output_type def_val +){ + return (device_addr.has_key(key))? + boost::lexical_cast(device_addr[key]) : def_val; +} + static device::sptr usrp1_make(const device_addr_t &device_addr) { //extract the FPGA path for the USRP1 @@ -130,11 +140,15 @@ static device::sptr usrp1_make(const device_addr_t &device_addr) usrp_ctrl = usrp_ctrl::make(ctrl_transport); usrp_ctrl->usrp_load_fpga(usrp1_fpga_image); - data_transport = usb_zero_copy::make(handle, // identifier - 6, // IN endpoint - 2, // OUT endpoint - 2 * (1 << 20), // buffer size - 16384); // transfer size + data_transport = usb_zero_copy::make( + handle, // identifier + 6, // IN endpoint + 2, // OUT endpoint + size_t(cast_from_dev_addr(device_addr, "recv_xfer_size", 0)), + size_t(cast_from_dev_addr(device_addr, "recv_num_xfers", 0)), + size_t(cast_from_dev_addr(device_addr, "send_xfer_size", 0)), + size_t(cast_from_dev_addr(device_addr, "send_num_xfers", 0)) + ); break; } } @@ -173,7 +187,7 @@ usrp1_impl::usrp1_impl(uhd::transport::usb_zero_copy::sptr data_transport, //initialize the mboard mboard_init(); - //initialize the dboards + //initialize the dboards dboard_init(); //initialize the dsps -- cgit v1.2.3 From 2c8a7c7debf19d92065661cc1d258f97bd38e224 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 30 Sep 2010 14:36:24 -0700 Subject: uhd: implemented recv timeout for zero copy interface --- host/include/uhd/transport/zero_copy.hpp | 11 +++++--- host/lib/transport/libusb1_zero_copy.cpp | 42 +++++++++++++++---------------- host/lib/transport/udp_zero_copy_asio.cpp | 5 ++-- host/lib/transport/zero_copy.cpp | 4 +-- host/lib/usrp/usrp1/io_impl.cpp | 8 +++--- host/lib/usrp/usrp2/io_impl.cpp | 5 ++-- 6 files changed, 39 insertions(+), 36 deletions(-) (limited to 'host/lib/transport/libusb1_zero_copy.cpp') diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 513291b63..8ecafd3fb 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -122,9 +122,10 @@ namespace uhd{ namespace transport{ /*! * Get a new receive buffer from this transport object. + * \param timeout_ms the timeout to get the buffer in ms * \return a managed buffer, or null sptr on timeout/error */ - virtual managed_recv_buffer::sptr get_recv_buff(void) = 0; + virtual managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms) = 0; /*! * Get the maximum number of receive frames: @@ -171,16 +172,19 @@ namespace uhd{ namespace transport{ /*! * Get a new receive buffer from this transport object. + * \param timeout_ms the timeout to get the buffer in ms + * \return a managed buffer, or null sptr on timeout/error */ - managed_recv_buffer::sptr get_recv_buff(void); + managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); private: /*! * Perform a private copying recv. * \param buff the buffer to write data into + * \param timeout_ms the timeout to get the buffer in ms * \return the number of bytes written to buff, 0 for timeout, negative for error */ - virtual ssize_t recv(const boost::asio::mutable_buffer &buff) = 0; + virtual ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms) = 0; UHD_PIMPL_DECL(impl) _impl; }; @@ -204,6 +208,7 @@ namespace uhd{ namespace transport{ /*! * Get a new send buffer from this transport object. + * \return a managed buffer, or null sptr on timeout/error */ managed_send_buffer::sptr get_send_buff(void); diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f2dcff6b5..f9beb0b4c 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -308,8 +308,7 @@ public: } private: - const boost::asio::const_buffer &get() const - { + const boost::asio::const_buffer &get(void) const{ return _buff; } @@ -369,8 +368,7 @@ public: } private: - const boost::asio::mutable_buffer &get() const - { + const boost::asio::mutable_buffer &get(void) const{ return _buff; } @@ -395,13 +393,13 @@ public: typedef boost::shared_ptr sptr; libusb_zero_copy_impl( - libusb::device_handle::sptr handle, - unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers - ); + libusb::device_handle::sptr handle, + unsigned int recv_endpoint, unsigned int send_endpoint, + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers + ); - managed_recv_buffer::sptr get_recv_buff(void); + managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); managed_send_buffer::sptr get_send_buff(void); size_t get_num_recv_frames(void) const { return _recv_num_frames; } @@ -419,23 +417,23 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( size_t recv_xfer_size, size_t recv_num_xfers, size_t send_xfer_size, size_t send_num_xfers ){ - _handle = handle; + _handle = handle; - //if the sizes are left at 0 (automatic) -> use the defaults - if (recv_xfer_size == 0) recv_xfer_size = DEFAULT_XFER_SIZE; - if (recv_num_xfers == 0) recv_num_xfers = DEFAULT_NUM_XFERS; - if (send_xfer_size == 0) send_xfer_size = DEFAULT_XFER_SIZE; - if (send_num_xfers == 0) send_num_xfers = DEFAULT_NUM_XFERS; + //if the sizes are left at 0 (automatic) -> use the defaults + if (recv_xfer_size == 0) recv_xfer_size = DEFAULT_XFER_SIZE; + if (recv_num_xfers == 0) recv_num_xfers = DEFAULT_NUM_XFERS; + if (send_xfer_size == 0) send_xfer_size = DEFAULT_XFER_SIZE; + if (send_num_xfers == 0) send_num_xfers = DEFAULT_NUM_XFERS; //sanity check the transfer sizes UHD_ASSERT_THROW(recv_xfer_size % 512 == 0); UHD_ASSERT_THROW(send_xfer_size % 512 == 0); - //store the num xfers for the num frames count - _recv_num_frames = recv_num_xfers; - _send_num_frames = send_num_xfers; + //store the num xfers for the num frames count + _recv_num_frames = recv_num_xfers; + _send_num_frames = send_num_xfers; - _handle->claim_interface(2 /*in interface*/); + _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); _recv_ep = usb_endpoint::sptr(new usb_endpoint( @@ -461,8 +459,8 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( * Return empty pointer if no transfer is available (timeout or error). * \return pointer to a managed receive buffer */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ - libusb_transfer *lut = _recv_ep->get_lut_with_wait(/* TODO timeout API */); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms){ + libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout_ms); if (lut == NULL) { return managed_recv_buffer::sptr(); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ee989ee2b..0a6c9f2af 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -35,7 +35,6 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 2 //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); -static const double RECV_TIMEOUT = 0.1; //100 ms /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -110,11 +109,11 @@ private: boost::asio::io_service _io_service; int _sock_fd; - ssize_t recv(const boost::asio::mutable_buffer &buff){ + ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ //setup timeval for timeout timeval tv; tv.tv_sec = 0; - tv.tv_usec = int(RECV_TIMEOUT*1e6); + tv.tv_usec = timeout_ms*1000; //setup rset for timeout fd_set rset; diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index 8a1cde694..1fcf846a0 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -68,12 +68,12 @@ phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){ /* NOP */ } -managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(void){ +managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(size_t timeout_ms){ //allocate memory boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size]; //call recv() with timeout option - ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size)); + ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout_ms); if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 73974f2d6..aee760a83 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -272,18 +272,18 @@ static void usrp1_bs_vrt_unpacker( } static bool get_recv_buffs( - zero_copy_if::sptr zc_if, + zero_copy_if::sptr zc_if, size_t timeout_ms, vrt_packet_handler::managed_recv_buffs_t &buffs ){ UHD_ASSERT_THROW(buffs.size() == 1); - buffs[0] = zc_if->get_recv_buff(); + buffs[0] = zc_if->get_recv_buff(timeout_ms); return buffs[0].get() != NULL; } size_t usrp1_impl::recv( const std::vector &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, size_t /*timeout_ms TODO*/ + recv_mode_t recv_mode, size_t timeout_ms ){ size_t num_samps_recvd = vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler @@ -292,7 +292,7 @@ size_t usrp1_impl::recv( io_type, _rx_otw_type, //input and output types to convert _clock_ctrl->get_master_clock_freq(), //master clock tick rate &usrp1_bs_vrt_unpacker, - boost::bind(&get_recv_buffs, _data_transport, _1), + boost::bind(&get_recv_buffs, _data_transport, timeout_ms, _1), &vrt_packet_handler::handle_overflow_nop, 0, //vrt header offset _rx_subdev_spec.size() //num channels diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 65411801d..3395f94e2 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -33,6 +33,7 @@ using namespace uhd::transport; namespace asio = boost::asio; static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; +static const double RECV_TIMEOUT_MS = 100; /*********************************************************************** * io impl details (internal to this file) @@ -90,7 +91,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( size_t next_packet_seq = 0; while(recv_pirate_crew_raiding){ - managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); + managed_recv_buffer::sptr buff = zc_if->get_recv_buff(RECV_TIMEOUT_MS); if (not buff.get()) continue; //ignore timeout/error buffers try{ @@ -150,7 +151,7 @@ void usrp2_impl::io_init(void){ std::memcpy(send_buff->cast(), &data, sizeof(data)); send_buff->commit(sizeof(data)); //drain the recv buffers (may have junk) - while (data_transport->get_recv_buff().get()){}; + while (data_transport->get_recv_buff(RECV_TIMEOUT_MS).get()){}; } //the number of recv frames is the number for the first transport -- cgit v1.2.3