diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 93 | 
1 files changed, 71 insertions, 22 deletions
| diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 205c7a3a3..4b96199e2 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -23,6 +23,8 @@  #include <uhd/convert.hpp>  #include <uhd/stream.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/atomic.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp> @@ -31,6 +33,9 @@  #include <boost/foreach.hpp>  #include <boost/function.hpp>  #include <boost/format.hpp> +#include <boost/bind.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/barrier.hpp>  #include <iostream>  #include <vector> @@ -73,12 +78,23 @@ public:          set_alignment_failure_threshold(1000);      } +    ~recv_packet_handler(void){ +        _task_handlers.clear(); +    } +      //! Resize the number of transport channels      void resize(const size_t size){          if (this->size() == size) return; +        _task_handlers.clear();          _props.resize(size);          //re-initialize all buffers infos by re-creating the vector          _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); +        _task_barrier_entry.resize(size); +        _task_barrier_exit.resize(size); +        _task_handlers.resize(size); +        for (size_t i = 1/*skip 0*/; i < size; i++){ +            _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); +        };      }      //! Get the channel width of this handler @@ -125,7 +141,7 @@ public:      //! Set the conversion routine for all channels      void set_converter(const uhd::convert::id_type &id){ -        _io_buffs.resize(id.num_outputs); +        _num_outputs = id.num_outputs;          _converter = uhd::convert::get_converter(id)();          this->set_scale_factor(1/32767.); //update after setting converter          _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format); @@ -207,7 +223,7 @@ private:          handle_overflow_type handle_overflow;      };      std::vector<xport_chan_props_type> _props; -    std::vector<void *> _io_buffs; //used in conversion +    size_t _num_outputs;      size_t _bytes_per_otw_item; //used in conversion      size_t _bytes_per_cpu_item; //used in conversion      uhd::convert::converter::sptr _converter; //used in conversion @@ -512,24 +528,19 @@ private:          //extract the number of samples available to copy          const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item; -        const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); +        const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);          const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item; -        const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); +        const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs; -        size_t buff_index = 0; -        BOOST_FOREACH(per_buffer_info_type &buff_info, info){ +        //setup the data to share with converter threads +        _convert_nsamps = nsamps_to_copy_per_io_buff; +        _convert_buffs = &buffs; +        _convert_buffer_offset_bytes = buffer_offset_bytes; +        _convert_bytes_to_copy = bytes_to_copy; -            //fill a vector with pointers to the io buffers -            BOOST_FOREACH(void *&io_buff, _io_buffs){ -                io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; -            } +        //perform N channels of conversion +        converter_thread_task(0); -            //copy-convert the samples from the recv buffer -            _converter->conv(buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff); - -            //update the rx copy buffer to reflect the bytes copied -            buff_info.copy_buff += bytes_to_copy; -        }          //update the copy buffer's availability          info.data_bytes_to_copy -= bytes_to_copy; @@ -538,15 +549,53 @@ private:          metadata.fragment_offset = info.fragment_offset_in_samps;          info.fragment_offset_in_samps += nsamps_to_copy; //set for next call -        //done with buffers? this action releases buffers in-order -        if (not metadata.more_fragments){ -            BOOST_FOREACH(per_buffer_info_type &buff_info, info){ -                buff_info.buff.reset(); //effectively a release -            } +        return nsamps_to_copy_per_io_buff; +    } + +    /******************************************************************* +     * Perform one thread's work of the conversion task. +     * The entry and exit use a dual synchronization barrier, +     * to wait for data to become ready and block until completion. +     ******************************************************************/ +    UHD_INLINE void converter_thread_task(const size_t index) +    { +        _task_barrier_entry.wait(); + +        //shortcut references to local data structures +        buffers_info_type &buff_info = get_curr_buffer_info(); +        per_buffer_info_type &info = buff_info[index]; +        const rx_streamer::buffs_type &buffs = *_convert_buffs; + +        //fill IO buffs with pointers into the output buffer +        void *io_buffs[4/*max interleave*/]; +        for (size_t i = 0; i < _num_outputs; i++){ +            char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]); +            io_buffs[i] = b + _convert_buffer_offset_bytes;          } +        const ref_vector<void *> out_buffs(io_buffs, _num_outputs); -        return nsamps_to_copy_per_io_buff; +        //perform the conversion operation +        _converter->conv(info.copy_buff, out_buffs, _convert_nsamps); + +        //advance the pointer for the source buffer +        info.copy_buff += _convert_bytes_to_copy; + +        //release the buffer if fully consumed +        if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ +            info.buff.reset(); //effectively a release +        } + +        _task_barrier_exit.wait();      } + +    //! Shared variables for the worker threads +    reusable_barrier _task_barrier_entry, _task_barrier_exit; +    std::vector<task::sptr> _task_handlers; +    size_t _convert_nsamps; +    const rx_streamer::buffs_type *_convert_buffs; +    size_t _convert_buffer_offset_bytes; +    size_t _convert_bytes_to_copy; +  };  class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ | 
