aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/udp_zero_copy_asio.cpp
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-02-20 01:13:03 -0800
committerJosh Blum <josh@joshknows.com>2011-02-20 01:13:03 -0800
commit2d9838f467013d5397b6daf83afb5ccea92065a4 (patch)
treecc34b6106523307a2f18eff86c3311f3458d9eb7 /host/lib/transport/udp_zero_copy_asio.cpp
parent8ab02dbef4cb36b2419371c131ff8c8edae88ce8 (diff)
downloaduhd-2d9838f467013d5397b6daf83afb5ccea92065a4.tar.gz
uhd-2d9838f467013d5397b6daf83afb5ccea92065a4.tar.bz2
uhd-2d9838f467013d5397b6daf83afb5ccea92065a4.zip
udp: update docs for transport, create common header for wait implementation
Reimplemented simple udp transport with one impl class. Moved wait for ready/select implementation into common header. Important note on select, timeval should have usecs < 1 second or it may error on some platforms. Fixed in this implementation.
Diffstat (limited to 'host/lib/transport/udp_zero_copy_asio.cpp')
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp319
1 files changed, 0 insertions, 319 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
deleted file mode 100644
index 05352ffce..000000000
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ /dev/null
@@ -1,319 +0,0 @@
-//
-// Copyright 2010-2011 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#include <uhd/transport/udp_zero_copy.hpp>
-#include <uhd/transport/udp_simple.hpp> //mtu
-#include <uhd/transport/bounded_buffer.hpp>
-#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/assert.hpp>
-#include <uhd/utils/warning.hpp>
-#include <boost/asio.hpp>
-#include <boost/format.hpp>
-#include <iostream>
-#include <list>
-
-using namespace uhd;
-using namespace uhd::transport;
-namespace asio = boost::asio;
-
-//A reasonable number of frames for send/recv and async/sync
-static const size_t DEFAULT_NUM_FRAMES = 32;
-
-/***********************************************************************
- * Reusable managed receiver buffer:
- * - Initialize with memory and a release callback.
- * - Call get new with a length in bytes to re-use.
- **********************************************************************/
-class udp_zero_copy_asio_mrb : public managed_recv_buffer{
-public:
- typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;
-
- udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
- _mem(mem), _len(0), _release_cb(release_cb){/* NOP */}
-
- void release(void){
- if (_len == 0) return;
- this->_release_cb(this);
- _len = 0;
- }
-
- sptr get_new(size_t len){
- _len = len;
- return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
- }
-
- template <class T> T cast(void) const{return static_cast<T>(_mem);}
-
-private:
- static void fake_deleter(void *obj){
- static_cast<udp_zero_copy_asio_mrb *>(obj)->release();
- }
-
- const void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
-
- void *_mem;
- size_t _len;
- release_cb_type _release_cb;
-};
-
-/***********************************************************************
- * Reusable managed send buffer:
- * - Initialize with memory and a commit callback.
- * - Call get new with a length in bytes to re-use.
- **********************************************************************/
-class udp_zero_copy_asio_msb : public managed_send_buffer{
-public:
- typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;
-
- udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
- _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */}
-
- void commit(size_t len){
- if (_len == 0) return;
- this->_commit_cb(this, len);
- _len = 0;
- }
-
- sptr get_new(size_t len){
- _len = len;
- return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
- }
-
-private:
- static void fake_deleter(void *obj){
- static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0);
- }
-
- void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
-
- void *_mem;
- size_t _len;
- commit_cb_type _commit_cb;
-};
-
-/***********************************************************************
- * Zero Copy UDP implementation with ASIO:
- * This is the portable zero copy implementation for systems
- * where a faster, platform specific solution is not available.
- * However, it is not a true zero copy implementation as each
- * send and recv requires a copy operation to/from userspace.
- **********************************************************************/
-class udp_zero_copy_asio_impl : public udp_zero_copy{
-public:
- typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
-
- udp_zero_copy_asio_impl(
- const std::string &addr,
- const std::string &port,
- const device_addr_t &hints
- ):
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
- _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
- _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames),
- _pending_send_buffs(_num_send_frames)
- {
- //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
-
- //resolve the address
- asio::ip::udp::resolver resolver(_io_service);
- asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
- asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
-
- //create, open, and connect the socket
- _socket = new asio::ip::udp::socket(_io_service);
- _socket->open(asio::ip::udp::v4());
- _socket->connect(receiver_endpoint);
- _sock_fd = _socket->native();
-
- //allocate re-usable managed receive buffers
- for (size_t i = 0; i < get_num_recv_frames(); i++){
- _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
- boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
- );
- handle_recv(&_mrb_pool.back());
- }
-
- //allocate re-usable managed send buffers
- for (size_t i = 0; i < get_num_send_frames(); i++){
- _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
- boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
- );
- handle_send(&_msb_pool.back());
- }
- }
-
- ~udp_zero_copy_asio_impl(void){
- delete _socket;
- }
-
- //get size for internal socket buffer
- template <typename Opt> size_t get_buff_size(void) const{
- Opt option;
- _socket->get_option(option);
- return option.value();
- }
-
- //set size for internal socket buffer
- template <typename Opt> size_t resize_buff(size_t num_bytes){
- Opt option(num_bytes);
- _socket->set_option(option);
- return get_buff_size<Opt>();
- }
-
- /*******************************************************************
- * Receive implementation:
- *
- * Use select to perform a blocking receive with timeout.
- * Return the managed receive buffer with the new length.
- * When the caller is finished with the managed buffer,
- * the managed receive buffer is released back into the queue.
- ******************************************************************/
- UHD_INLINE bool is_recv_ready(double timeout){
- //setup timeval for timeout
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = long(timeout*1e6);
-
- //setup rset for timeout
- fd_set rset;
- FD_ZERO(&rset);
- FD_SET(_sock_fd, &rset);
-
- //call select with timeout on receive socket
- return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0;
- }
-
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- udp_zero_copy_asio_mrb *mrb = NULL;
- if (is_recv_ready(timeout) and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
- return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));
- }
- return managed_recv_buffer::sptr();
- }
-
- UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){
- _pending_recv_buffs.push_with_haste(mrb);
- }
-
- void release(udp_zero_copy_asio_mrb *mrb){
- handle_recv(mrb);
- }
-
- size_t get_num_recv_frames(void) const {return _num_recv_frames;}
- size_t get_recv_frame_size(void) const {return _recv_frame_size;}
-
- /*******************************************************************
- * Send implementation:
- *
- * Get a managed receive buffer immediately with max length set.
- * The caller will fill the buffer and commit it when finished.
- * The commit routine will perform a blocking send operation,
- * and push the managed send buffer back into the queue.
- ******************************************************************/
- managed_send_buffer::sptr get_send_buff(double timeout){
- udp_zero_copy_asio_msb *msb = NULL;
- if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
- return msb->get_new(_send_frame_size);
- }
- return managed_send_buffer::sptr();
- }
-
- UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){
- _pending_send_buffs.push_with_haste(msb);
- }
-
- void commit(udp_zero_copy_asio_msb *msb, size_t len){
- ::send(_sock_fd, msb->cast<const char *>(), len, 0);
- handle_send(msb);
- }
-
- size_t get_num_send_frames(void) const {return _num_send_frames;}
- size_t get_send_frame_size(void) const {return _send_frame_size;}
-
-private:
- //memory management -> buffers and fifos
- const size_t _recv_frame_size, _num_recv_frames;
- const size_t _send_frame_size, _num_send_frames;
- buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
- bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
- std::list<udp_zero_copy_asio_msb> _msb_pool;
- std::list<udp_zero_copy_asio_mrb> _mrb_pool;
-
- //asio guts -> socket and service
- asio::io_service _io_service;
- asio::ip::udp::socket *_socket;
- int _sock_fd;
-};
-
-/***********************************************************************
- * UDP zero copy make function
- **********************************************************************/
-template<typename Opt> static void resize_buff_helper(
- udp_zero_copy_asio_impl::sptr udp_trans,
- const size_t target_size,
- const std::string &name
-){
- std::string help_message;
- #if defined(UHD_PLATFORM_LINUX)
- help_message = str(boost::format(
- "Please run: sudo sysctl -w net.core.%smem_max=%d\n"
- ) % ((name == "recv")?"r":"w") % target_size);
- #endif /*defined(UHD_PLATFORM_LINUX)*/
-
- //resize the buffer if size was provided
- if (target_size > 0){
- size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
- if (target_size != actual_size) std::cout << boost::format(
- "Target %s sock buff size: %d bytes\n"
- "Actual %s sock buff size: %d bytes"
- ) % name % target_size % name % actual_size << std::endl;
- else std::cout << boost::format(
- "Current %s sock buff size: %d bytes"
- ) % name % actual_size << std::endl;
- if (actual_size < target_size) uhd::warning::post(str(boost::format(
- "The %s buffer could not be resized sufficiently.\n"
- "See the transport application notes on buffer resizing.\n%s"
- ) % name % help_message));
- }
-}
-
-udp_zero_copy::sptr udp_zero_copy::make(
- const std::string &addr,
- const std::string &port,
- const device_addr_t &hints
-){
- udp_zero_copy_asio_impl::sptr udp_trans(
- new udp_zero_copy_asio_impl(addr, port, hints)
- );
-
- //extract buffer size hints from the device addr
- size_t recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0));
- size_t send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0));
-
- //call the helper to resize send and recv buffers
- resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
- resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
-
- return udp_trans;
-}