diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/transport/muxed_zero_copy_if.cpp | 68 | 
1 files changed, 59 insertions, 9 deletions
diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp index 996db3c98..7a2b76165 100644 --- a/host/lib/transport/muxed_zero_copy_if.cpp +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -74,7 +74,12 @@ public:          if (_streams.size() >= _max_num_streams) {              throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams.");          } -        stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num); +        // Only allocate a portion of the base transport's frames to each stream +        // to prevent all streams from attempting to use all the frames. +        stream_impl::sptr stream = boost::make_shared<stream_impl>( +            this->shared_from_this(), stream_num, +            _base_xport->get_num_send_frames() / _max_num_streams, +            _base_xport->get_num_recv_frames() / _max_num_streams);          _streams[stream_num] = stream;          return stream;      } @@ -91,16 +96,55 @@ public:      }  private: +    /* +     * @class stream_mrb is used to copy the data and release the original +     * managed receive buffer back to the base transport. +     */ +    class stream_mrb : public managed_recv_buffer +    { +    public: +        stream_mrb(size_t size) : _buff(new char[size]) {} + +        ~stream_mrb() { +            delete _buff; +        } + +        void release() {} + +        UHD_INLINE sptr get_new(char *buff, size_t len) +        { +            memcpy(_buff, buff, len); +            return make(this, _buff, len); +        } + +    private: +        char		*_buff; +    }; +      class stream_impl : public zero_copy_if      {      public:          typedef boost::shared_ptr<stream_impl> sptr;          typedef boost::weak_ptr<stream_impl> wptr; -        stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num): +        stream_impl( +            muxed_zero_copy_if_impl::sptr muxed_xport, +            const uint32_t stream_num, +            const size_t num_send_frames, +            const size_t num_recv_frames +            ) :              _stream_num(stream_num), _muxed_xport(muxed_xport), -            _buff_queue(muxed_xport->base_xport()->get_num_recv_frames()) +            _num_send_frames(num_send_frames), +            _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()), +            _num_recv_frames(num_recv_frames), +            _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()), +            _buff_queue(num_recv_frames), +            _buffers(num_recv_frames), +            _buffer_index(0)          { +            for (size_t i = 0; i < num_recv_frames; i++) { +                _buffers[i] = boost::make_shared<stream_mrb>(_recv_frame_size); +            }          }          ~stream_impl(void) @@ -116,11 +160,11 @@ private:          }          size_t get_num_recv_frames(void) const { -            return _muxed_xport->base_xport()->get_num_recv_frames(); +            return _num_recv_frames;          }          size_t get_recv_frame_size(void) const { -            return _muxed_xport->base_xport()->get_recv_frame_size(); +            return _recv_frame_size;          }          managed_recv_buffer::sptr get_recv_buff(double timeout) { @@ -130,19 +174,19 @@ private:              } else {                  return managed_recv_buffer::sptr();              } -          }          void push_recv_buff(managed_recv_buffer::sptr buff) { -            _buff_queue.push_with_wait(buff); +            _buff_queue.push_with_wait(_buffers.at(_buffer_index++)->get_new(buff->cast<char*>(), buff->size())); +            _buffer_index %= _buffers.size();          }          size_t get_num_send_frames(void) const { -            return _muxed_xport->base_xport()->get_num_send_frames(); +            return _num_send_frames;          }          size_t get_send_frame_size(void) const { -            return _muxed_xport->base_xport()->get_send_frame_size(); +            return _send_frame_size;          }          managed_send_buffer::sptr get_send_buff(double timeout) @@ -153,7 +197,13 @@ private:      private:          const uint32_t                              _stream_num;          muxed_zero_copy_if_impl::sptr               _muxed_xport; +        const size_t                                _num_send_frames; +        const size_t                                _send_frame_size; +        const size_t                                _num_recv_frames; +        const size_t                                _recv_frame_size;          bounded_buffer<managed_recv_buffer::sptr>   _buff_queue; +        std::vector< boost::shared_ptr<stream_mrb> >    _buffers; +        size_t                                      _buffer_index;      };      inline zero_copy_if::sptr& base_xport() { return _base_xport; }  | 
