diff options
| author | Paul David <paul.david@ettus.com> | 2016-04-11 14:21:59 -0700 | 
|---|---|---|
| committer | Ashish Chaudhari <ashish@ettus.com> | 2016-04-18 12:23:45 -0700 | 
| commit | d4bb4266b58dd5ef33040afb6c6f101f2c9895b1 (patch) | |
| tree | 42f13484220067c2ac964ed434ec0d50e4b3cfbd | |
| parent | bad77039c71d9e8099292b0b0b1a8f8471754b84 (diff) | |
| download | uhd-d4bb4266b58dd5ef33040afb6c6f101f2c9895b1.tar.gz uhd-d4bb4266b58dd5ef33040afb6c6f101f2c9895b1.tar.bz2 uhd-d4bb4266b58dd5ef33040afb6c6f101f2c9895b1.zip  | |
transport optimize: Added a thread transport offload to share the workload
| -rw-r--r-- | host/include/uhd/transport/zero_copy_recv_offload.hpp | 50 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/transport/zero_copy_recv_offload.cpp | 158 | 
3 files changed, 209 insertions, 0 deletions
diff --git a/host/include/uhd/transport/zero_copy_recv_offload.hpp b/host/include/uhd/transport/zero_copy_recv_offload.hpp new file mode 100644 index 000000000..793753276 --- /dev/null +++ b/host/include/uhd/transport/zero_copy_recv_offload.hpp @@ -0,0 +1,50 @@ +// +// Copyright 2016 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_ZERO_COPY_RECV_OFFLOAD_HPP +#define INCLUDED_UHD_ZERO_COPY_RECV_OFFLOAD_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/shared_ptr.hpp> + +namespace uhd{ namespace transport{ + +/*! + * A threaded transport offload that is meant to relieve the main thread of + * the responsibility of making receive calls. + */ +class UHD_API zero_copy_recv_offload : public virtual zero_copy_if { +public: +    typedef boost::shared_ptr<zero_copy_recv_offload> sptr; + +    /*! +     * This transport offload adds a receive thread in order to +     * communicate with the underlying transport. It is meant to be +     * used in cases where the main thread needs to be relieved of the burden +     * of the underlying transport receive calls. +     * +     * \param transport a shared pointer to the transport interface +     * \param timeout a general timeout for pushing and pulling on the bounded buffer +     */ +    static sptr make(zero_copy_if::sptr transport, +                     const double timeout); +}; + +}} //namespace + +#endif /* INCLUDED_ZERO_COPY_OFFLOAD_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 79c8a90b7..db21b9f8e 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -122,6 +122,7 @@ LIBUHD_PYTHON_GEN_SOURCE(  )  LIBUHD_APPEND_SOURCES( +    ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp new file mode 100644 index 000000000..e8b013abc --- /dev/null +++ b/host/lib/transport/zero_copy_recv_offload.cpp @@ -0,0 +1,158 @@ +// +// Copyright 2016 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy_recv_offload.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/format.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t; + +/*********************************************************************** + * Zero copy offload transport: + * An intermediate transport that utilizes threading to free + * the main thread from any receive work. + **********************************************************************/ +class zero_copy_recv_offload_impl : public zero_copy_recv_offload { +public: +    typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr; + +    zero_copy_recv_offload_impl(zero_copy_if::sptr transport, +                          const double timeout) : +        _transport(transport), _timeout(timeout), +        _inbox(transport->get_num_recv_frames()), +        _recv_done(false) +    { +        UHD_LOG << "Created threaded transport" << std::endl; + +        // Create the receive and send threads to offload +        // the system calls onto other threads +        _recv_thread = boost::thread( +            boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this) +        ); +    } + +    // Receive thread flags +    void set_recv_done() +    { +        boost::lock_guard<boost::mutex> guard(_recv_mutex); +        _recv_done = true; +    } + +    bool is_recv_done() +    { +        boost::lock_guard<boost::mutex> guard(_recv_mutex); +        return _recv_done; +    } + +    ~zero_copy_recv_offload_impl() +    { +        // Signal the threads we're finished +        set_recv_done(); + +        // Wait for them to join +        UHD_SAFE_CALL( +            _recv_thread.join(); +        ) +    } + +    // The receive thread function is responsible for +    // pulling pointers to managed receiver buffers quickly +    void enqueue_recv() +    { +        while (not is_recv_done()) { +            managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout); +            if (not buff) continue; +            _inbox.push_with_timed_wait(buff, _timeout); +        } +    } + +    /******************************************************************* +     * Receive implementation: +     * Pop the receive buffer pointer from the underlying transport +     ******************************************************************/ +    managed_recv_buffer::sptr get_recv_buff(double timeout) +    { +        managed_recv_buffer::sptr ptr; +        _inbox.pop_with_timed_wait(ptr, timeout); +        return ptr; +    } + +    size_t get_num_recv_frames() const +    { +        return _transport->get_num_recv_frames(); +    } + +    size_t get_recv_frame_size() const +    { +        return _transport->get_recv_frame_size(); +    } + +    /******************************************************************* +     * Send implementation: +     * Pass the send buffer pointer from the underlying transport +     ******************************************************************/ +    managed_send_buffer::sptr get_send_buff(double timeout) +    { +        return _transport->get_send_buff(timeout); +    } + +    size_t get_num_send_frames() const +    { +        return _transport->get_num_send_frames(); +    } + +    size_t get_send_frame_size() const +    { +        return _transport->get_send_frame_size(); +    } + +private: +    // The linked transport +    zero_copy_if::sptr _transport; + +    const double _timeout; + +    // Shared buffers +    bounded_buffer_t _inbox; + +    // Threading +    bool _recv_done; +    boost::thread _recv_thread; +    boost::mutex  _recv_mutex; +}; + +zero_copy_recv_offload::sptr zero_copy_recv_offload::make( +        zero_copy_if::sptr transport, +        const double timeout) +{ +    zero_copy_recv_offload_impl::sptr zero_copy_recv_offload( +        new zero_copy_recv_offload_impl(transport, timeout) +    ); + +    return zero_copy_recv_offload; +}  | 
