diff options
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 211 | 
1 files changed, 170 insertions, 41 deletions
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 0acc8df4b..431cbf216 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,10 +29,13 @@  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/utils/safe_call.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/thread_time.hpp>  #include <boost/foreach.hpp>  #include <boost/function.hpp> +#include <boost/atomic.hpp> +#include <boost/make_shared.hpp>  #include <iostream>  #include <vector> @@ -49,6 +52,9 @@ namespace uhd {  namespace transport {  namespace sph { +static const size_t MAX_INTERLEAVE = 4; +static const double GET_BUFF_TIMEOUT = 0.1; +  /***********************************************************************   * Super send packet handler   * @@ -68,19 +74,39 @@ public:       * \param size the number of transport channels       */      send_packet_handler(const size_t size = 1): -        _next_packet_seq(0), _cached_metadata(false) +       _next_packet_seq(0), _cached_metadata(false)      {          this->set_enable_trailer(true);          this->resize(size);      }      ~send_packet_handler(void){ -        /* NOP */ +        UHD_SAFE_CALL( +            for (size_t i = 0; i < _worker_data.size(); i++) +            { +                _worker_data[i]->stop = true; +            } +            _worker_thread_group.join_all(); +        );      }      //! Resize the number of transport channels      void resize(const size_t size){          if (this->size() == size) return; + +        // Stop all worker threads +        for (size_t i = 0; i < _worker_data.size(); i++) +        { +            _worker_data[i]->stop = true; +        } +        _worker_thread_group.join_all(); +        _worker_threads.resize(size); +        _worker_data.resize(size); +        for (size_t i = 0; i < size; i++) +        { +            _worker_data[i] = boost::make_shared<worker_thread_data_t>(); +        } +          _props.resize(size);          static const uint64_t zero = 0;          _zero_buffs.resize(size, &zero); @@ -145,7 +171,15 @@ public:       * \param get_buff the getter function       */      void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ +        if (_worker_threads[xport_chan]) +        { +            _worker_thread_group.remove_thread(_worker_threads[xport_chan]); +            _worker_data[xport_chan]->stop = true; +            _worker_threads[xport_chan]->join(); +            _worker_data[xport_chan]->stop = false; +        }          _props.at(xport_chan).get_buff = get_buff; +        _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan));      }      //! Set the conversion routine for all channels @@ -381,63 +415,147 @@ private:          if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t);          if_packet_info.packet_count = _next_packet_seq; -        //get a buffer for each channel or timeout -        BOOST_FOREACH(xport_chan_props_type &props, _props){ -            if (not props.buff) props.buff = props.get_buff(timeout); -            if (not props.buff) return 0; //timeout +        // wait for all worker threads to be ready or timeout +        boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); +        for (size_t i = 0; i < this->size(); i++) +        { +            while (not _worker_data[i]->ready) +            { +                if (boost::get_system_time() > expiration) +                { +                    return 0; +                } +            } +            _worker_data[i]->ready = false;          } -        //setup the data to share with converter threads +        //setup the data to share with worker threads          _convert_nsamps = nsamps_per_buff;          _convert_buffs = &buffs;          _convert_buffer_offset_bytes = buffer_offset_bytes;          _convert_if_packet_info = &if_packet_info; -        //perform N channels of conversion -        for (size_t i = 0; i < this->size(); i++) { -            convert_to_in_buff(i); +        //start N channels of conversion +        for (size_t i = 0; i < this->size(); i++) +        { +            _worker_data[i]->go = true; +        } + +        //make sure any sleeping worker threads are woken up +        for (size_t i = 0; i < this->size(); i++) +        { +            // Acquiring the lock used by the condition variable +            // takes too long, so do a spin wait.  If the go flag +            // is not cleared by this point, it will be cleared +            // immediately by the worker thread when it wakes up. +            while (_worker_data[i]->go) +            { +                _worker_data[i]->data_ready.notify_one(); +            } +        } + +        //wait for all worker threads to be done +        for (size_t i = 0; i < this->size(); i++) +        { +            //TODO: Implement a better wait strategy +            //busy loop give fastest response, but these are just wasted cycles +            while (not _worker_data[i]->done) {} +            _worker_data[i]->done = false;          }          _next_packet_seq++; //increment sequence after commits          return nsamps_per_buff;      } -    /*! Run the conversion from the internal buffers to the user's input -     *  buffer. +    /*! Worker thread routine.       * +     * - Gets an internal data buffer       * - Calls the converter       * - Releases internal data buffers -     * - Updates read/write pointers       */ -    UHD_INLINE void convert_to_in_buff(const size_t index) +    void worker(const size_t index)      { -        //shortcut references to local data structures -        managed_send_buffer::sptr &buff = _props[index].buff; -        vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; -        const tx_streamer::buffs_type &buffs = *_convert_buffs; - -        //fill IO buffs with pointers into the output buffer -        const void *io_buffs[4/*max interleave*/]; -        for (size_t i = 0; i < _num_inputs; i++){ -            const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); -            io_buffs[i] = b + _convert_buffer_offset_bytes; +        //maximum number of cycles to spin before waiting on condition variable +        //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop +        //the assumption is that anything held up for 15ms can wait +        static const size_t MAX_SPIN_CYCLES = 30000000; + +        //maximum amount of time to wait before checking the stop flag +        static const double MAX_WAIT = 0.1; + +        managed_send_buffer::sptr buff; +        vrt::if_packet_info_t if_packet_info; +        std::vector<const void *> in_buffs(MAX_INTERLEAVE); +        boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index]; +        boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock); +        size_t spins = 0; + +        while (not worker_data->stop) +        { +            if (not buff) +            { +                buff = _props[index].get_buff(MAX_WAIT); +                if (not buff) +                { +                    continue; +                } +                worker_data->ready = true; +            } + +            //make sure done flag is cleared by controlling thread before waiting on go signal +            if (worker_data->done) +            { +                continue; +            } + +            //partial spin lock before wait +            while (not worker_data->go and spins < MAX_SPIN_CYCLES) +            { +                spins++; +            } +            if (not worker_data->go and +                not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) +            { +                continue; +            } +            // Clear the go flag immediately to let the +            // controlling thread know we are not sleeping. +            worker_data->go = false; + +            //reset the spin count +            spins = 0; + +            //pack metadata into a vrt header +            uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; +            if_packet_info = *_convert_if_packet_info; +            if_packet_info.has_sid = _props[index].has_sid; +            if_packet_info.sid = _props[index].sid; +            _vrt_packer(otw_mem, if_packet_info); +            otw_mem += if_packet_info.num_header_words32; + +            //prepare the input buffers +            for (size_t i = 0; i < _num_inputs; i++) +            { +                in_buffs[i] = +                    (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i])) +                    + _convert_buffer_offset_bytes; +            } + +            //perform the conversion operation +            _converter->conv(in_buffs, otw_mem, _convert_nsamps); + +            //let the master know that new data can be prepared +            _worker_data[index]->done = true; + +            //commit the samples to the zero-copy interface +            buff->commit( +                (_header_offset_words32 + if_packet_info.num_packet_words32) +                * sizeof(uint32_t) +            ); + +            //release the buffer +            buff.reset();          } -        const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); - -        //pack metadata into a vrt header -        uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; -        if_packet_info.has_sid = _props[index].has_sid; -        if_packet_info.sid = _props[index].sid; -        _vrt_packer(otw_mem, if_packet_info); -        otw_mem += if_packet_info.num_header_words32; - -        //perform the conversion operation -        _converter->conv(in_buffs, otw_mem, _convert_nsamps); - -        //commit the samples to the zero-copy interface -        const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; -        buff->commit(num_vita_words32*sizeof(uint32_t)); -        buff.reset(); //effectively a release      }      //! Shared variables for the worker threads @@ -445,7 +563,18 @@ private:      const tx_streamer::buffs_type *_convert_buffs;      size_t _convert_buffer_offset_bytes;      vrt::if_packet_info_t *_convert_if_packet_info; - +    struct worker_thread_data_t { +        worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} +        boost::atomic_bool ready; +        boost::atomic_bool go; +        boost::atomic_bool done; +        boost::atomic_bool stop; +        boost::mutex data_ready_lock; +        boost::condition_variable data_ready; +    }; +    std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data; +    boost::thread_group _worker_thread_group; +    std::vector<boost::thread *> _worker_threads;  };  class send_packet_streamer : public send_packet_handler, public tx_streamer{  | 
