diff options
Diffstat (limited to 'host/lib/usrp/usrp1/soft_time_ctrl.cpp')
-rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 90 |
1 files changed, 56 insertions, 34 deletions
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index 1bab34e7b..78481c3ff 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -16,10 +16,8 @@ // #include "soft_time_ctrl.hpp" -#include <uhd/transport/bounded_buffer.hpp> -#include <boost/any.hpp> -#include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp> +#include <uhd/utils/tasks.hpp> +#include <boost/make_shared.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <iostream> @@ -41,24 +39,17 @@ public: _nsamps_remaining(0), _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), _cmd_queue(2), + _async_msg_queue(100), + _inline_msg_queue(100), _stream_on_off(stream_on_off) { //synchronously spawn a new thread - boost::barrier spawn_barrier(2); - _thread_group.create_thread(boost::bind( - &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier)) - ); - spawn_barrier.wait(); + _recv_cmd_task = task::make(boost::bind(&soft_time_ctrl_impl::recv_cmd_task, this)); //initialize the time to something this->set_time(time_spec_t(0.0)); } - ~soft_time_ctrl_impl(void){ - _thread_group.interrupt_all(); - _thread_group.join_all(); - } - /******************************************************************* * Time control ******************************************************************/ @@ -89,32 +80,47 @@ public: /******************************************************************* * Receive control ******************************************************************/ - void recv_post(rx_metadata_t &md, size_t &nsamps){ + size_t recv_post(rx_metadata_t &md, const size_t nsamps){ boost::mutex::scoped_lock lock(_update_mutex); + //Since it timed out on the receive, check for inline messages... + //Must do a post check because recv() will not wake up for a message. + if (md.error_code == rx_metadata_t::ERROR_CODE_TIMEOUT){ + if (_inline_msg_queue.pop_with_haste(md)) return 0; + } + //load the metadata with the expected time md.has_time_spec = true; md.time_spec = time_now(); //none of the stuff below matters in continuous streaming mode - if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return; + if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return nsamps; //When to stop streaming: //The samples have been received and the stream mode is non-continuous. //Rewrite the sample count to clip to the requested number of samples. - if (_nsamps_remaining <= nsamps){ - nsamps = _nsamps_remaining; //set nsamps, then stop + if (_nsamps_remaining <= nsamps) switch(_stream_mode){ + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE:{ + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN; + _inline_msg_queue.push_with_pop_on_full(metadata); + } //continue to next case... + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE: md.end_of_burst = true; - stream_on_off(false); - return; + this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + return _nsamps_remaining; + default: break; } //update the consumed samples _nsamps_remaining -= nsamps; + return nsamps; } void issue_stream_cmd(const stream_cmd_t &cmd){ - _cmd_queue.push_with_wait(cmd); + _cmd_queue.push_with_wait(boost::make_shared<stream_cmd_t>(cmd)); } void stream_on_off(bool enb){ @@ -134,7 +140,12 @@ public: //handle late packets if (time_at < time_now()){ - //TODO post async message + async_metadata_t metadata; + metadata.channel = 0; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR; + _async_msg_queue.push_with_pop_on_full(metadata); return true; } @@ -153,7 +164,13 @@ public: if (not cmd.stream_now){ time_spec_t time_at(cmd.time_spec - TWIDDLE); if (time_at < time_now()){ - //TODO inject late cmd inline error + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND; + _inline_msg_queue.push_with_pop_on_full(metadata); + this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + return; } else{ sleep_until_time(lock, time_at); @@ -177,15 +194,18 @@ public: _stream_mode = cmd.stream_mode; } - void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); - try{ - boost::any cmd; - while (true){ - _cmd_queue.pop_with_wait(cmd); - recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd)); - } - } catch(const boost::thread_interrupted &){} + void recv_cmd_task(void){ //task is looped + boost::shared_ptr<stream_cmd_t> cmd; + _cmd_queue.pop_with_wait(cmd); + recv_cmd_handle_cmd(*cmd); + } + + bounded_buffer<async_metadata_t> &get_async_queue(void){ + return _async_msg_queue; + } + + bounded_buffer<rx_metadata_t> &get_inline_queue(void){ + return _inline_msg_queue; } private: @@ -193,9 +213,11 @@ private: size_t _nsamps_remaining; stream_cmd_t::stream_mode_t _stream_mode; time_spec_t _time_offset; - bounded_buffer<boost::any> _cmd_queue; + bounded_buffer<boost::shared_ptr<stream_cmd_t> > _cmd_queue; + bounded_buffer<async_metadata_t> _async_msg_queue; + bounded_buffer<rx_metadata_t> _inline_msg_queue; const cb_fcn_type _stream_on_off; - boost::thread_group _thread_group; + task::sptr _recv_cmd_task; }; /*********************************************************************** |