diff options
| author | Josh Blum <josh@joshknows.com> | 2012-07-02 11:21:23 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2012-07-02 14:05:53 -0700 | 
| commit | 2684652511036a901e22fe936953abfa8327469e (patch) | |
| tree | 1750a6fdca08a8365c5ed3264a354a7cacd4794a | |
| parent | c01e7dff8f68417dedeffd813887024015c5a048 (diff) | |
| download | uhd-2684652511036a901e22fe936953abfa8327469e.tar.gz uhd-2684652511036a901e22fe936953abfa8327469e.tar.bz2 uhd-2684652511036a901e22fe936953abfa8327469e.zip  | |
transport: multi-threaded send_packet_handler
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 102 | 
1 files changed, 77 insertions, 25 deletions
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 02cfad80f..8f943effb 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -23,6 +23,8 @@  #include <uhd/convert.hpp>  #include <uhd/stream.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/atomic.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp> @@ -58,12 +60,23 @@ public:          this->resize(size);      } +    ~send_packet_handler(void){ +        _task_handlers.clear(); +    } +      //! Resize the number of transport channels      void resize(const size_t size){          if (this->size() == size) return; +        _task_handlers.clear();          _props.resize(size);          static const boost::uint64_t zero = 0;          _zero_buffs.resize(size, &zero); +        _task_barrier_entry.resize(size); +        _task_barrier_exit.resize(size); +        _task_handlers.resize(size); +        for (size_t i = 1/*skip 0*/; i < size; i++){ +            _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); +        };      }      //! Get the channel width of this handler @@ -104,7 +117,7 @@ public:      //! Set the conversion routine for all channels      void set_converter(const uhd::convert::id_type &id){ -        _io_buffs.resize(id.num_inputs); +        _num_inputs = id.num_inputs;          _converter = uhd::convert::get_converter(id)();          this->set_scale_factor(32767.); //update after setting converter          _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format); @@ -205,9 +218,10 @@ private:          get_buff_type get_buff;          bool has_sid;          boost::uint32_t sid; +        managed_send_buffer::sptr buff;      };      std::vector<xport_chan_props_type> _props; -    std::vector<const void *> _io_buffs; //used in conversion +    size_t _num_inputs;      size_t _bytes_per_otw_item; //used in conversion      size_t _bytes_per_cpu_item; //used in conversion      uhd::convert::converter::sptr _converter; //used in conversion @@ -226,39 +240,77 @@ private:          const size_t buffer_offset_bytes = 0      ){          //load the rest of the if_packet_info in here -        if_packet_info.num_payload_bytes = nsamps_per_buff*_io_buffs.size()*_bytes_per_otw_item; +        if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item;          if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t);          if_packet_info.packet_count = _next_packet_seq; -        size_t buff_index = 0; +        //get a buffer for each channel or timeout          BOOST_FOREACH(xport_chan_props_type &props, _props){ -            managed_send_buffer::sptr buff = props.get_buff(timeout); -            if (buff.get() == NULL) return 0; //timeout - -            //fill a vector with pointers to the io buffers -            BOOST_FOREACH(const void *&io_buff, _io_buffs){ -                io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; -            } -            boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; - -            //pack metadata into a vrt header -            if_packet_info.has_sid = props.has_sid; -            if_packet_info.sid = props.sid; -            _vrt_packer(otw_mem, if_packet_info); -            otw_mem += if_packet_info.num_header_words32; +            if (not props.buff) props.buff = props.get_buff(timeout); +            if (not props.buff) return 0; //timeout +        } -            //copy-convert the samples into the send buffer -            _converter->conv(_io_buffs, otw_mem, nsamps_per_buff); +        //setup the data to share with converter threads +        _convert_nsamps = nsamps_per_buff; +        _convert_buffs = &buffs; +        _convert_buffer_offset_bytes = buffer_offset_bytes; +        _convert_if_packet_info = &if_packet_info; -            //commit the samples to the zero-copy interface -            size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); -            buff->commit(num_bytes_total); -            buff.reset(); //effectively a release +        //perform N channels of conversion +        converter_thread_task(0); -        }          _next_packet_seq++; //increment sequence after commits          return nsamps_per_buff;      } + +    /******************************************************************* +     * Perform one thread's work of the conversion task. +     * The entry and exit use a dual synchronization barrier, +     * to wait for data to become ready and block until completion. +     ******************************************************************/ +    UHD_INLINE void converter_thread_task(const size_t index) +    { +        _task_barrier_entry.wait(); + +        //shortcut references to local data structures +        managed_send_buffer::sptr &buff = _props[index].buff; +        vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; +        const tx_streamer::buffs_type &buffs = *_convert_buffs; + +        //fill IO buffs with pointers into the output buffer +        const void *io_buffs[4/*max interleave*/]; +        for (size_t i = 0; i < _num_inputs; i++){ +            const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); +            io_buffs[i] = b + _convert_buffer_offset_bytes; +        } +        const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); + +        //pack metadata into a vrt header +        boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; +        if_packet_info.has_sid = _props[index].has_sid; +        if_packet_info.sid = _props[index].sid; +        _vrt_packer(otw_mem, if_packet_info); +        otw_mem += if_packet_info.num_header_words32; + +        //perform the conversion operation +        _converter->conv(in_buffs, otw_mem, _convert_nsamps); + +        //commit the samples to the zero-copy interface +        const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; +        buff->commit(num_vita_words32*sizeof(boost::uint32_t)); +        buff.reset(); //effectively a release + +        _task_barrier_exit.wait(); +    } + +    //! Shared variables for the worker threads +    reusable_barrier _task_barrier_entry, _task_barrier_exit; +    std::vector<task::sptr> _task_handlers; +    size_t _convert_nsamps; +    const tx_streamer::buffs_type *_convert_buffs; +    size_t _convert_buffer_offset_bytes; +    vrt::if_packet_info_t *_convert_if_packet_info; +  };  class send_packet_streamer : public send_packet_handler, public tx_streamer{  | 
