diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/include/uhd/utils/tasks.hpp | 10 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/io_impl.cpp | 13 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 5 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.hpp | 3 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 9 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.cpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 2 | ||||
| -rw-r--r-- | host/lib/utils/tasks.cpp | 38 | ||||
| -rw-r--r-- | host/tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/tests/tasks_test.cpp | 38 | 
10 files changed, 82 insertions, 41 deletions
diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp index a1f682a83..9b17ae08a 100644 --- a/host/include/uhd/utils/tasks.hpp +++ b/host/include/uhd/utils/tasks.hpp @@ -1,5 +1,6 @@  //  // Copyright 2011-2012 Ettus Research LLC +// Copyright 2017 Ettus Research (National Instruments Corp.)  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -33,18 +34,15 @@ namespace uhd{          /*!           * Create a new task object with function callback.           * The task function callback will be run in a loop. -         * until the thread is interrupted by the deconstructor. +         * until the thread is interrupted by the destructor.           * -         * A task should return in a reasonable amount of time -         * or may block forever under the following conditions: -         *  - The blocking call is interruptible. -         *  - The task polls the interrupt condition. +         * A task should return in a reasonable amount of time. +         * It may not block, or the destructor will also block.           *           * \param task_fcn the task callback function           * \return a new task object           */          static sptr make(const task_fcn_type &task_fcn); -      };  } //namespace uhd diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 7ed1d8671..7cb38548f 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -32,6 +32,7 @@  #include <boost/bind.hpp>  #include <boost/format.hpp>  #include <boost/make_shared.hpp> +#include <atomic>  #define bmFR_RX_FORMAT_SHIFT_SHIFT 0  #define bmFR_RX_FORMAT_WIDTH_SHIFT 4 @@ -147,12 +148,14 @@ struct usrp1_impl::io_impl{      io_impl(zero_copy_if::sptr data_transport):          data_transport(data_transport),          curr_buff(offset_send_buffer(data_transport->get_send_buff())), -        omsb(boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, _1, _2, _3)) +        omsb(boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, _1, _2, _3)), +        vandal_loop_exit(false)      {          /* NOP */      }      ~io_impl(void){ +        vandal_loop_exit = true;          UHD_SAFE_CALL(flush_send_buff();)      } @@ -175,6 +178,7 @@ struct usrp1_impl::io_impl{          return omsb.get_new(curr_buff, next_buff);      } +    std::atomic<bool> vandal_loop_exit;      task::sptr vandal_task;      boost::system_time last_send_time;  }; @@ -247,7 +251,7 @@ void usrp1_impl::io_init(void){      //create a new vandal thread to poll xerflow conditions      _io_impl->vandal_task = task::make(boost::bind( -        &usrp1_impl::vandal_conquest_loop, this +        &usrp1_impl::vandal_conquest_loop, this, std::ref(_io_impl->vandal_loop_exit)      ));  } @@ -271,7 +275,7 @@ void usrp1_impl::tx_stream_on_off(bool enb){   * On an overflow, interleave an inline message into recv and print.   * This procedure creates "soft" inline and async user messages.   */ -void usrp1_impl::vandal_conquest_loop(void){ +void usrp1_impl::vandal_conquest_loop(std::atomic<bool> &exit_loop){      //initialize the async metadata      async_metadata_t async_metadata; @@ -285,7 +289,7 @@ void usrp1_impl::vandal_conquest_loop(void){      inline_metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;      //start the polling loop... -    try{ while (not boost::this_thread::interruption_requested()){ +    try{ while (not exit_loop){          uint8_t underflow = 0, overflow = 0;          //shutoff transmit if it has been too long since send() was called @@ -315,7 +319,6 @@ void usrp1_impl::vandal_conquest_loop(void){          boost::this_thread::sleep(boost::posix_time::milliseconds(50));      }} -    catch(const boost::thread_interrupted &){} //normal exit condition      catch(const std::exception &e){          UHD_LOGGER_ERROR("USRP1") << "The vandal caught an unexpected exception " << e.what() ;      } diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index bb8b3a704..9cef99a60 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -199,8 +199,9 @@ public:      void recv_cmd_task(void){ //task is looped          boost::shared_ptr<stream_cmd_t> cmd; -        _cmd_queue.pop_with_wait(cmd); -        recv_cmd_handle_cmd(*cmd); +        if (_cmd_queue.pop_with_timed_wait(cmd, 0.25)) { +            recv_cmd_handle_cmd(*cmd); +        }      }      bounded_buffer<async_metadata_t> &get_async_queue(void){ diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 1aa255f8d..b45d138d1 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -33,6 +33,7 @@  #include <uhd/transport/usb_zero_copy.hpp>  #include <boost/weak_ptr.hpp>  #include <complex> +#include <atomic>  #ifndef INCLUDED_USRP1_IMPL_HPP  #define INCLUDED_USRP1_IMPL_HPP @@ -144,7 +145,7 @@ private:      bool has_rx_halfband(void);      bool has_tx_halfband(void); -    void vandal_conquest_loop(void); +    void vandal_conquest_loop(std::atomic<bool> &);      void set_reg(const std::pair<uint8_t, uint32_t> ®); diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 992d70835..8f6a67453 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -165,7 +165,7 @@ struct usrp2_impl::io_impl{      std::vector<flow_control_monitor::sptr> fc_mons;      //methods and variables for the pirate crew -    void recv_pirate_loop(zero_copy_if::sptr, size_t); +    void recv_pirate_loop(zero_copy_if::sptr, size_t, const std::atomic<bool> &);      std::list<task::sptr> pirate_tasks;      bounded_buffer<async_metadata_t> async_msg_fifo;      double tick_rate; @@ -178,14 +178,14 @@ struct usrp2_impl::io_impl{   * - put async message packets into queue   **********************************************************************/  void usrp2_impl::io_impl::recv_pirate_loop( -    zero_copy_if::sptr err_xport, size_t index +    zero_copy_if::sptr err_xport, size_t index, const std::atomic<bool> &exit_loop  ){      set_thread_priority_safe();      //store a reference to the flow control monitor (offset by max dsps)      flow_control_monitor &fc_mon = *(this->fc_mons[index]); -    while (not boost::this_thread::interruption_requested()){ +    while (not exit_loop){          managed_recv_buffer::sptr buff = err_xport->get_recv_buff();          if (not buff.get()) continue; //ignore timeout/error buffers @@ -252,7 +252,8 @@ void usrp2_impl::io_init(void){          //spawn a new pirate to plunder the recv booty          _io_impl->pirate_tasks.push_back(task::make(boost::bind(              &usrp2_impl::io_impl::recv_pirate_loop, _io_impl.get(), -            _mbc[mb].tx_dsp_xport, index++ +            _mbc[mb].tx_dsp_xport, index++, +            boost::ref(_pirate_task_exit)          )));      }  } diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 78a9acb72..9ee13d289 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -316,7 +316,8 @@ static zero_copy_if::sptr make_xport(   * Structors   **********************************************************************/  usrp2_impl::usrp2_impl(const device_addr_t &_device_addr) : -    device_addr(_device_addr) +    device_addr(_device_addr), +    _pirate_task_exit(false)  {      UHD_LOGGER_INFO("USRP2") << "Opening a USRP2/N-Series device..."; @@ -786,6 +787,7 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr) :  }  usrp2_impl::~usrp2_impl(void){UHD_SAFE_CALL( +    _pirate_task_exit = true;      for(const std::string &mb:  _mbc.keys()){          _mbc[mb].tx_dsp->set_updates(0, 0);      } diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 790daa749..087a4f8e9 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -46,6 +46,7 @@  #include <uhd/usrp/dboard_manager.hpp>  #include <uhd/usrp/subdev_spec.hpp>  #include <boost/weak_ptr.hpp> +#include <atomic>  static const double USRP2_LINK_RATE_BPS = 1000e6/8;  static const double mimo_clock_delay_usrp2_rev4 = 4.18e-9; @@ -120,6 +121,7 @@ private:      //io impl methods and members      uhd::device_addr_t device_addr;      UHD_PIMPL_DECL(io_impl) _io_impl; +    std::atomic<bool> _pirate_task_exit;      void io_init(void);      void update_tick_rate(const double rate);      void update_rx_samp_rate(const std::string &, const size_t, const double rate); diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp index 5dac729c8..38d19502e 100644 --- a/host/lib/utils/tasks.cpp +++ b/host/lib/utils/tasks.cpp @@ -18,11 +18,14 @@  #include <uhd/utils/tasks.hpp>  #include <uhd/utils/msg_task.hpp>  #include <uhd/utils/log.hpp> +#include <uhd/exception.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/barrier.hpp>  #include <exception>  #include <iostream>  #include <vector> +#include <thread> +#include <atomic>  using namespace uhd; @@ -30,53 +33,44 @@ class task_impl : public task{  public:      task_impl(const task_fcn_type &task_fcn): -        _spawn_barrier(2) +        _exit(false)      { -        (void)_thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn)); -        _spawn_barrier.wait(); +        _task = std::thread([this, task_fcn](){ this->task_loop(task_fcn); });      }      ~task_impl(void){ -        _running = false; -        _thread_group.interrupt_all(); -        _thread_group.join_all(); +        _exit = true; +        if (_task.joinable()) { +            _task.join(); +        }      }  private:      void task_loop(const task_fcn_type &task_fcn){ -        _running = true; -        _spawn_barrier.wait(); -          try{ -            while (_running){ +            while (!_exit){                  task_fcn();              }          } -        catch(const boost::thread_interrupted &){ -            //this is an ok way to exit the task loop -        }          catch(const std::exception &e){              do_error_msg(e.what());          }          catch(...){ -            //FIXME -            //Unfortunately, this is also an ok way to end a task, -            //because on some systems boost throws uncatchables. +            UHD_THROW_INVALID_CODE_PATH();          }      }      void do_error_msg(const std::string &msg){          UHD_LOGGER_ERROR("UHD") -            << "An unexpected exception was caught in a task loop."  -            << "The task loop will now exit, things may not work."  -            << msg  +            << "An unexpected exception was caught in a task loop." +            << "The task loop will now exit, things may not work." +            << msg          ;      } -    boost::thread_group _thread_group; -    boost::barrier _spawn_barrier; -    bool _running; +    std::atomic<bool> _exit; +    std::thread _task;  };  task::sptr task::make(const task_fcn_type &task_fcn){ diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index ebda2cf70..bfbf57b41 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -45,6 +45,7 @@ SET(test_sources      sph_send_test.cpp      subdev_spec_test.cpp      time_spec_test.cpp +    tasks_test.cpp      vrt_test.cpp      expert_test.cpp      fe_conn_test.cpp diff --git a/host/tests/tasks_test.cpp b/host/tests/tasks_test.cpp new file mode 100644 index 000000000..225582591 --- /dev/null +++ b/host/tests/tasks_test.cpp @@ -0,0 +1,38 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include <uhd/utils/tasks.hpp> +#include <thread> +#include <chrono> +#include <vector> +#include <iostream> + +void test_tasks_sleep(size_t usecs) +{ +    std::this_thread::sleep_for(std::chrono::milliseconds(usecs)); +} + +BOOST_AUTO_TEST_CASE(tasks_test) { + +    static const size_t N_TASKS = 100; +    std::vector<uhd::task::sptr> test_vec; + +    for (size_t i = 0; i < N_TASKS; i++) { +        test_vec.push_back(uhd::task::make([i](){ test_tasks_sleep(i); })); +    } +}  | 
