diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/rfnoc/nocscript/expression.hpp | 2 | ||||
| -rw-r--r-- | host/lib/transport/nirio_zero_copy.cpp | 39 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 211 | ||||
| -rw-r--r-- | host/lib/utils/tasks.cpp | 4 | 
4 files changed, 83 insertions, 173 deletions
| diff --git a/host/lib/rfnoc/nocscript/expression.hpp b/host/lib/rfnoc/nocscript/expression.hpp index 83fc5bcbc..1acd02009 100644 --- a/host/lib/rfnoc/nocscript/expression.hpp +++ b/host/lib/rfnoc/nocscript/expression.hpp @@ -215,6 +215,7 @@ class expression_container : public expression      //! Create an empty container      expression_container() : _combiner(COMBINE_NOTSET) {}; +    virtual ~expression_container(){};      /*! Type-deduction rules for containers are:       * - If the combination type is COMBINE_ALL or COMBINE_AND, @@ -299,6 +300,7 @@ class expression_function : public expression_container              const std::string &name,              const boost::shared_ptr<function_table> func_table      ); +    ~expression_function(){};      //! Add an argument expression      virtual void add(expression::sptr new_expr); diff --git a/host/lib/transport/nirio_zero_copy.cpp b/host/lib/transport/nirio_zero_copy.cpp index 14118d393..0635e01cf 100644 --- a/host/lib/transport/nirio_zero_copy.cpp +++ b/host/lib/transport/nirio_zero_copy.cpp @@ -32,6 +32,23 @@  //@TODO: Move the register defs required by the class to a common location  #include "../usrp/x300/x300_regs.hpp" +#if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) +#include <windows.h> +static UHD_INLINE size_t get_page_size() +{ +    SYSTEM_INFO si; +    GetSystemInfo(&si); +    return si.dwPageSize; +} +#else +#include <unistd.h> +static UHD_INLINE size_t get_page_size() +{ +    return size_t(sysconf(_SC_PAGESIZE)); +} +#endif +static const size_t page_size = get_page_size(); +  using namespace uhd;  using namespace uhd::transport;  using namespace uhd::niusrprio; @@ -351,7 +368,6 @@ nirio_zero_copy::sptr nirio_zero_copy::make(  ){      //Initialize xport_params      zero_copy_xport_params xport_params = default_buff_args; -    size_t page_size = boost::interprocess::mapped_region::get_page_size();      //The kernel buffer for this transport must be (num_frames * frame_size) big. Unlike ethernet,      //where the kernel buffer size is independent of the circular buffer size for the transport, @@ -366,6 +382,22 @@ nirio_zero_copy::sptr nirio_zero_copy::make(      size_t usr_recv_buff_size = static_cast<size_t>(          hints.cast<double>("recv_buff_size", default_buff_args.num_recv_frames)); +    if (hints.has_key("recv_buff_size")) +    { +        if (usr_recv_buff_size % page_size != 0) +        { +            throw uhd::value_error((boost::format("recv_buff_size must be multiple of %d") % page_size).str()); +        } +    } + +    if (hints.has_key("recv_frame_size") and hints.has_key("num_recv_frames")) +    { +        if (usr_num_recv_frames * xport_params.recv_frame_size % page_size != 0) +        { +            throw uhd::value_error((boost::format("num_recv_frames * recv_frame_size must be an even multiple of %d") % page_size).str()); +        } +    } +      if (hints.has_key("num_recv_frames") and hints.has_key("recv_buff_size")) {          if (usr_recv_buff_size < xport_params.recv_frame_size)              throw uhd::value_error("recv_buff_size must be equal to or greater than (num_recv_frames * recv_frame_size)"); @@ -380,6 +412,11 @@ nirio_zero_copy::sptr nirio_zero_copy::make(          xport_params.num_recv_frames = usr_num_recv_frames;      } +    if (xport_params.num_recv_frames * xport_params.recv_frame_size % page_size != 0) +    { +        throw uhd::value_error((boost::format("num_recv_frames * recv_frame_size must be an even multiple of %d") % page_size).str()); +    } +      //TX      xport_params.send_frame_size = size_t(hints.cast<double>("send_frame_size", default_buff_args.send_frame_size)); diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 003689a78..12c053efb 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,12 +29,9 @@  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/zero_copy.hpp> -#include <uhd/utils/safe_call.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/thread_time.hpp>  #include <boost/function.hpp> -#include <boost/atomic.hpp> -#include <boost/make_shared.hpp>  #include <iostream>  #include <vector> @@ -51,9 +48,6 @@ namespace uhd {  namespace transport {  namespace sph { -static const size_t MAX_INTERLEAVE = 4; -static const double GET_BUFF_TIMEOUT = 0.1; -  /***********************************************************************   * Super send packet handler   * @@ -73,39 +67,19 @@ public:       * \param size the number of transport channels       */      send_packet_handler(const size_t size = 1): -       _next_packet_seq(0), _cached_metadata(false) +        _next_packet_seq(0), _cached_metadata(false)      {          this->set_enable_trailer(true);          this->resize(size);      }      ~send_packet_handler(void){ -        UHD_SAFE_CALL( -            for (size_t i = 0; i < _worker_data.size(); i++) -            { -                _worker_data[i]->stop = true; -            } -            _worker_thread_group.join_all(); -        ); +        /* NOP */      }      //! Resize the number of transport channels      void resize(const size_t size){          if (this->size() == size) return; - -        // Stop all worker threads -        for (size_t i = 0; i < _worker_data.size(); i++) -        { -            _worker_data[i]->stop = true; -        } -        _worker_thread_group.join_all(); -        _worker_threads.resize(size); -        _worker_data.resize(size); -        for (size_t i = 0; i < size; i++) -        { -            _worker_data[i] = boost::make_shared<worker_thread_data_t>(); -        } -          _props.resize(size);          static const uint64_t zero = 0;          _zero_buffs.resize(size, &zero); @@ -170,15 +144,7 @@ public:       * \param get_buff the getter function       */      void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ -        if (_worker_threads[xport_chan]) -        { -            _worker_thread_group.remove_thread(_worker_threads[xport_chan]); -            _worker_data[xport_chan]->stop = true; -            _worker_threads[xport_chan]->join(); -            _worker_data[xport_chan]->stop = false; -        }          _props.at(xport_chan).get_buff = get_buff; -        _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan));      }      //! Set the conversion routine for all channels @@ -414,147 +380,63 @@ private:          if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t);          if_packet_info.packet_count = _next_packet_seq; -        // wait for all worker threads to be ready or timeout -        boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); -        for (size_t i = 0; i < this->size(); i++) -        { -            while (not _worker_data[i]->ready) -            { -                if (boost::get_system_time() > expiration) -                { -                    return 0; -                } -            } -            _worker_data[i]->ready = false; +        //get a buffer for each channel or timeout +        BOOST_FOREACH(xport_chan_props_type &props, _props){ +            if (not props.buff) props.buff = props.get_buff(timeout); +            if (not props.buff) return 0; //timeout          } -        //setup the data to share with worker threads +        //setup the data to share with converter threads          _convert_nsamps = nsamps_per_buff;          _convert_buffs = &buffs;          _convert_buffer_offset_bytes = buffer_offset_bytes;          _convert_if_packet_info = &if_packet_info; -        //start N channels of conversion -        for (size_t i = 0; i < this->size(); i++) -        { -            _worker_data[i]->go = true; -        } - -        //make sure any sleeping worker threads are woken up -        for (size_t i = 0; i < this->size(); i++) -        { -            // Acquiring the lock used by the condition variable -            // takes too long, so do a spin wait.  If the go flag -            // is not cleared by this point, it will be cleared -            // immediately by the worker thread when it wakes up. -            while (_worker_data[i]->go) -            { -                _worker_data[i]->data_ready.notify_one(); -            } -        } - -        //wait for all worker threads to be done -        for (size_t i = 0; i < this->size(); i++) -        { -            //TODO: Implement a better wait strategy -            //busy loop give fastest response, but these are just wasted cycles -            while (not _worker_data[i]->done) {} -            _worker_data[i]->done = false; +        //perform N channels of conversion +        for (size_t i = 0; i < this->size(); i++) { +            convert_to_in_buff(i);          }          _next_packet_seq++; //increment sequence after commits          return nsamps_per_buff;      } -    /*! Worker thread routine. +    /*! Run the conversion from the internal buffers to the user's input +     *  buffer.       * -     * - Gets an internal data buffer       * - Calls the converter       * - Releases internal data buffers +     * - Updates read/write pointers       */ -    void worker(const size_t index) +    UHD_INLINE void convert_to_in_buff(const size_t index)      { -        //maximum number of cycles to spin before waiting on condition variable -        //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop -        //the assumption is that anything held up for 15ms can wait -        static const size_t MAX_SPIN_CYCLES = 30000000; - -        //maximum amount of time to wait before checking the stop flag -        static const double MAX_WAIT = 0.1; - -        managed_send_buffer::sptr buff; -        vrt::if_packet_info_t if_packet_info; -        std::vector<const void *> in_buffs(MAX_INTERLEAVE); -        boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index]; -        boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock); -        size_t spins = 0; - -        while (not worker_data->stop) -        { -            if (not buff) -            { -                buff = _props[index].get_buff(MAX_WAIT); -                if (not buff) -                { -                    continue; -                } -                worker_data->ready = true; -            } - -            //make sure done flag is cleared by controlling thread before waiting on go signal -            if (worker_data->done) -            { -                continue; -            } - -            //partial spin lock before wait -            while (not worker_data->go and spins < MAX_SPIN_CYCLES) -            { -                spins++; -            } -            if (not worker_data->go and -                not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) -            { -                continue; -            } -            // Clear the go flag immediately to let the -            // controlling thread know we are not sleeping. -            worker_data->go = false; - -            //reset the spin count -            spins = 0; - -            //pack metadata into a vrt header -            uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; -            if_packet_info = *_convert_if_packet_info; -            if_packet_info.has_sid = _props[index].has_sid; -            if_packet_info.sid = _props[index].sid; -            _vrt_packer(otw_mem, if_packet_info); -            otw_mem += if_packet_info.num_header_words32; - -            //prepare the input buffers -            for (size_t i = 0; i < _num_inputs; i++) -            { -                in_buffs[i] = -                    (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i])) -                    + _convert_buffer_offset_bytes; -            } - -            //perform the conversion operation -            _converter->conv(in_buffs, otw_mem, _convert_nsamps); - -            //let the master know that new data can be prepared -            _worker_data[index]->done = true; - -            //commit the samples to the zero-copy interface -            buff->commit( -                (_header_offset_words32 + if_packet_info.num_packet_words32) -                * sizeof(uint32_t) -            ); - -            //release the buffer -            buff.reset(); +        //shortcut references to local data structures +        managed_send_buffer::sptr &buff = _props[index].buff; +        vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; +        const tx_streamer::buffs_type &buffs = *_convert_buffs; + +        //fill IO buffs with pointers into the output buffer +        const void *io_buffs[4/*max interleave*/]; +        for (size_t i = 0; i < _num_inputs; i++){ +            const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); +            io_buffs[i] = b + _convert_buffer_offset_bytes;          } +        const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); + +        //pack metadata into a vrt header +        uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; +        if_packet_info.has_sid = _props[index].has_sid; +        if_packet_info.sid = _props[index].sid; +        _vrt_packer(otw_mem, if_packet_info); +        otw_mem += if_packet_info.num_header_words32; + +        //perform the conversion operation +        _converter->conv(in_buffs, otw_mem, _convert_nsamps); + +        //commit the samples to the zero-copy interface +        const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; +        buff->commit(num_vita_words32*sizeof(uint32_t)); +        buff.reset(); //effectively a release      }      //! Shared variables for the worker threads @@ -562,18 +444,7 @@ private:      const tx_streamer::buffs_type *_convert_buffs;      size_t _convert_buffer_offset_bytes;      vrt::if_packet_info_t *_convert_if_packet_info; -    struct worker_thread_data_t { -        worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} -        boost::atomic_bool ready; -        boost::atomic_bool go; -        boost::atomic_bool done; -        boost::atomic_bool stop; -        boost::mutex data_ready_lock; -        boost::condition_variable data_ready; -    }; -    std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data; -    boost::thread_group _worker_thread_group; -    std::vector<boost::thread *> _worker_threads; +  };  class send_packet_streamer : public send_packet_handler, public tx_streamer{ diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp index 4cc28e48b..5dac729c8 100644 --- a/host/lib/utils/tasks.cpp +++ b/host/lib/utils/tasks.cpp @@ -32,7 +32,7 @@ public:      task_impl(const task_fcn_type &task_fcn):          _spawn_barrier(2)      { -        _thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn)); +        (void)_thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn));          _spawn_barrier.wait();      } @@ -99,7 +99,7 @@ public:      msg_task_impl(const task_fcn_type &task_fcn):          _spawn_barrier(2)      { -        _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn)); +        (void)_thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn));          _spawn_barrier.wait();      } | 
