aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/udp_zero_copy_asio.cpp
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2010-05-18 17:46:34 -0700
committerJosh Blum <josh@joshknows.com>2010-05-18 17:46:34 -0700
commit21e9cf4bece884ced5b7294e012be971e941c511 (patch)
tree634f4b8b27d9dc8a6ebfc2ab731bddf62d0bbe22 /host/lib/transport/udp_zero_copy_asio.cpp
parent7c98884eda041b676584059ca3a63b11f1bbd505 (diff)
parent4ea8822a50073b01ede9e0b0f7c8c713767ea1b8 (diff)
downloaduhd-21e9cf4bece884ced5b7294e012be971e941c511.tar.gz
uhd-21e9cf4bece884ced5b7294e012be971e941c511.tar.bz2
uhd-21e9cf4bece884ced5b7294e012be971e941c511.zip
Merge branch 'work'
Diffstat (limited to 'host/lib/transport/udp_zero_copy_asio.cpp')
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp136
1 files changed, 72 insertions, 64 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index ee44803f4..56ba391d3 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -16,9 +16,9 @@
//
#include <uhd/transport/udp_zero_copy.hpp>
+#include <uhd/utils/assert.hpp>
#include <boost/cstdint.hpp>
#include <boost/asio.hpp>
-#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <iostream>
@@ -26,65 +26,56 @@ using namespace uhd::transport;
/***********************************************************************
* Managed receive buffer implementation for udp zero-copy asio:
- * Frees the memory held by the const buffer on done.
**********************************************************************/
class managed_recv_buffer_impl : public managed_recv_buffer{
public:
managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
- _done = false;
+ /* NOP */
}
~managed_recv_buffer_impl(void){
- if (not _done) this->done();
+ /* NOP */
}
void done(void){
- _done = true;
- delete [] boost::asio::buffer_cast<const boost::uint32_t *>(_buff);
+ /* NOP */
}
private:
- const boost::asio::const_buffer &get(void){
+ const boost::asio::const_buffer &get(void) const{
return _buff;
}
const boost::asio::const_buffer _buff;
- bool _done;
};
/***********************************************************************
* Managed send buffer implementation for udp zero-copy asio:
- * Sends and frees the memory held by the mutable buffer on done.
**********************************************************************/
class managed_send_buffer_impl : public managed_send_buffer{
public:
managed_send_buffer_impl(
const boost::asio::mutable_buffer &buff,
boost::asio::ip::udp::socket *socket
- ) : _buff(buff){
- _done = false;
- _socket = socket;
+ ) : _buff(buff), _socket(socket){
+ /* NOP */
}
~managed_send_buffer_impl(void){
- if (not _done) this->done(0);
+ /* NOP */
}
void done(size_t num_bytes){
- _done = true;
- boost::uint32_t *mem = boost::asio::buffer_cast<boost::uint32_t *>(_buff);
- _socket->send(boost::asio::buffer(mem, num_bytes));
- delete [] mem;
+ _socket->send(boost::asio::buffer(_buff, num_bytes));
}
private:
- const boost::asio::mutable_buffer &get(void){
+ const boost::asio::mutable_buffer &get(void) const{
return _buff;
}
const boost::asio::mutable_buffer _buff;
boost::asio::ip::udp::socket *_socket;
- bool _done;
};
/***********************************************************************
@@ -94,8 +85,12 @@ private:
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
+static const size_t max_buff_size = 2000; //assume max size on send and recv
+
class udp_zero_copy_impl : public udp_zero_copy{
public:
+ typedef boost::shared_ptr<udp_zero_copy_impl> sptr;
+
//structors
udp_zero_copy_impl(const std::string &addr, const std::string &port);
~udp_zero_copy_impl(void);
@@ -104,23 +99,27 @@ public:
managed_recv_buffer::sptr get_recv_buff(void);
managed_send_buffer::sptr get_send_buff(void);
- //resize
- size_t resize_recv_buff(size_t num_bytes){
- boost::asio::socket_base::receive_buffer_size option(num_bytes);
- _socket->set_option(option);
+ //manage buffer
+ template <typename Opt> size_t get_buff_size(void){
+ Opt option;
_socket->get_option(option);
return option.value();
}
- size_t resize_send_buff(size_t num_bytes){
- boost::asio::socket_base::send_buffer_size option(num_bytes);
+
+ template <typename Opt> size_t resize_buff(size_t num_bytes){
+ Opt option(num_bytes);
_socket->set_option(option);
- _socket->get_option(option);
- return option.value();
+ return get_buff_size<Opt>();
}
private:
boost::asio::ip::udp::socket *_socket;
boost::asio::io_service _io_service;
+
+ //send and recv buffer memory (allocated once)
+ boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size];
+
+ managed_send_buffer::sptr _send_buff;
};
udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){
@@ -131,10 +130,25 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
- // Create, open, and connect the socket
+ // create, open, and connect the socket
_socket = new boost::asio::ip::udp::socket(_io_service);
_socket->open(boost::asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
+
+ // create the managed send buff (just once)
+ _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl(
+ boost::asio::buffer(_send_mem, max_buff_size), _socket
+ ));
+
+ // set recv timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100*1000; //100 ms
+ UHD_ASSERT_THROW(setsockopt(
+ _socket->native(),
+ SOL_SOCKET, SO_RCVTIMEO,
+ (const char *)&tv, sizeof(timeval)
+ ) == 0);
}
udp_zero_copy_impl::~udp_zero_copy_impl(void){
@@ -142,61 +156,55 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){
}
managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){
- //implement timeout through polling and sleeping
- size_t available = 0;
- boost::asio::deadline_timer timer(_socket->get_io_service());
- timer.expires_from_now(boost::posix_time::milliseconds(100));
- while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){
- boost::this_thread::sleep(boost::posix_time::milliseconds(1));
- }
-
- //receive only if data is available
- boost::uint32_t *buff_mem = new boost::uint32_t[available/sizeof(boost::uint32_t)];
- if (available){
- available = _socket->receive(boost::asio::buffer(buff_mem, available));
- }
+ //call recv() with timeout option
+ size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size));
//create a new managed buffer to house the data
return managed_recv_buffer::sptr(
- new managed_recv_buffer_impl(boost::asio::buffer(buff_mem, available))
+ new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes))
);
}
managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){
- boost::uint32_t *buff_mem = new boost::uint32_t[2000/sizeof(boost::uint32_t)];
- return managed_send_buffer::sptr(
- new managed_send_buffer_impl(boost::asio::buffer(buff_mem, 2000), _socket)
- );
+ return _send_buff;
}
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
+template<typename Opt> static inline void resize_buff_helper(
+ udp_zero_copy_impl::sptr udp_trans,
+ size_t target_size,
+ const std::string &name
+){
+ static const size_t min_buff_size = size_t(100e3);
+
+ //resize the buffer if size was provided
+ if (target_size > 0){
+ size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
+ if (target_size != actual_size) std::cout << boost::format(
+ "Target %s buffer size: %d\n"
+ "Actual %s byffer size: %d"
+ ) % name % target_size % name % actual_size << std::endl;
+ }
+
+ //otherwise, ensure that the buffer is at least the minimum size
+ else if (udp_trans->get_buff_size<Opt>() < min_buff_size){
+ resize_buff_helper<Opt>(udp_trans, min_buff_size, name);
+ }
+}
+
udp_zero_copy::sptr udp_zero_copy::make(
const std::string &addr,
const std::string &port,
size_t recv_buff_size,
size_t send_buff_size
){
- boost::shared_ptr<udp_zero_copy_impl> udp_trans(new udp_zero_copy_impl(addr, port));
-
- //resize the recv buffer if size was provided
- if (recv_buff_size > 0){
- size_t actual_bytes = udp_trans->resize_recv_buff(recv_buff_size);
- if (recv_buff_size != actual_bytes) std::cout << boost::format(
- "Target recv buffer size: %d\n"
- "Actual recv byffer size: %d"
- ) % recv_buff_size % actual_bytes << std::endl;
- }
+ udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port));
- //resize the send buffer if size was provided
- if (send_buff_size > 0){
- size_t actual_bytes = udp_trans->resize_send_buff(send_buff_size);
- if (send_buff_size != actual_bytes) std::cout << boost::format(
- "Target send buffer size: %d\n"
- "Actual send byffer size: %d"
- ) % send_buff_size % actual_bytes << std::endl;
- }
+ //call the helper to resize send and recv buffers
+ resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
+ resize_buff_helper<boost::asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
return udp_trans;
}