From 7823e08855f1fe48fbc014c37f822a9f22de19ef Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 17 May 2010 10:12:16 -0700 Subject: simplification of udp asio socket stuff --- host/lib/transport/udp_zero_copy_asio.cpp | 75 +++++++++++++++---------------- 1 file changed, 37 insertions(+), 38 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ee44803f4..c26b39867 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -16,9 +16,9 @@ // #include +#include #include #include -#include #include #include @@ -26,65 +26,56 @@ using namespace uhd::transport; /*********************************************************************** * Managed receive buffer implementation for udp zero-copy asio: - * Frees the memory held by the const buffer on done. **********************************************************************/ class managed_recv_buffer_impl : public managed_recv_buffer{ public: managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ - _done = false; + /* NOP */ } ~managed_recv_buffer_impl(void){ - if (not _done) this->done(); + /* NOP */ } void done(void){ - _done = true; - delete [] boost::asio::buffer_cast(_buff); + /* NOP */ } private: - const boost::asio::const_buffer &get(void){ + const boost::asio::const_buffer &get(void) const{ return _buff; } const boost::asio::const_buffer _buff; - bool _done; }; /*********************************************************************** * Managed send buffer implementation for udp zero-copy asio: - * Sends and frees the memory held by the mutable buffer on done. **********************************************************************/ class managed_send_buffer_impl : public managed_send_buffer{ public: managed_send_buffer_impl( const boost::asio::mutable_buffer &buff, boost::asio::ip::udp::socket *socket - ) : _buff(buff){ - _done = false; - _socket = socket; + ) : _buff(buff), _socket(socket){ + /* NOP */ } ~managed_send_buffer_impl(void){ - if (not _done) this->done(0); + /* NOP */ } void done(size_t num_bytes){ - _done = true; - boost::uint32_t *mem = boost::asio::buffer_cast(_buff); - _socket->send(boost::asio::buffer(mem, num_bytes)); - delete [] mem; + _socket->send(boost::asio::buffer(_buff, num_bytes)); } private: - const boost::asio::mutable_buffer &get(void){ + const boost::asio::mutable_buffer &get(void) const{ return _buff; } const boost::asio::mutable_buffer _buff; boost::asio::ip::udp::socket *_socket; - bool _done; }; /*********************************************************************** @@ -94,6 +85,8 @@ private: * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ +static const size_t max_buff_size = 2000; //assume max size on send and recv + class udp_zero_copy_impl : public udp_zero_copy{ public: //structors @@ -121,6 +114,11 @@ public: private: boost::asio::ip::udp::socket *_socket; boost::asio::io_service _io_service; + + //send and recv buffer memory (allocated once) + boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size]; + + managed_send_buffer::sptr _send_buff; }; udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){ @@ -131,10 +129,25 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - // Create, open, and connect the socket + // create, open, and connect the socket _socket = new boost::asio::ip::udp::socket(_io_service); _socket->open(boost::asio::ip::udp::v4()); _socket->connect(receiver_endpoint); + + // create the managed send buff (just once) + _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( + boost::asio::buffer(_send_mem, max_buff_size), _socket + )); + + // set recv timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100*1000; //100 ms + UHD_ASSERT_THROW(setsockopt( + _socket->native(), + SOL_SOCKET, SO_RCVTIMEO, + (timeval *)&tv, sizeof(timeval) + ) == 0); } udp_zero_copy_impl::~udp_zero_copy_impl(void){ @@ -142,31 +155,17 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){ } managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ - //implement timeout through polling and sleeping - size_t available = 0; - boost::asio::deadline_timer timer(_socket->get_io_service()); - timer.expires_from_now(boost::posix_time::milliseconds(100)); - while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){ - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - - //receive only if data is available - boost::uint32_t *buff_mem = new boost::uint32_t[available/sizeof(boost::uint32_t)]; - if (available){ - available = _socket->receive(boost::asio::buffer(buff_mem, available)); - } + //call recv() with timeout option + size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size)); //create a new managed buffer to house the data return managed_recv_buffer::sptr( - new managed_recv_buffer_impl(boost::asio::buffer(buff_mem, available)) + new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes)) ); } managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ - boost::uint32_t *buff_mem = new boost::uint32_t[2000/sizeof(boost::uint32_t)]; - return managed_send_buffer::sptr( - new managed_send_buffer_impl(boost::asio::buffer(buff_mem, 2000), _socket) - ); + return _send_buff; } /*********************************************************************** -- cgit v1.2.3 From 7facfba666046f305086f754c9cebe1406ea54ad Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 17 May 2010 10:22:14 -0700 Subject: windows fix for setsockopt --- host/lib/transport/udp_zero_copy_asio.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index c26b39867..e97bbf31c 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -146,7 +146,7 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin UHD_ASSERT_THROW(setsockopt( _socket->native(), SOL_SOCKET, SO_RCVTIMEO, - (timeval *)&tv, sizeof(timeval) + (const char *)&tv, sizeof(timeval) ) == 0); } -- cgit v1.2.3 From a6f7b02ae69eb4b0755f2805922eeb06d977c1ee Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 17 May 2010 11:01:42 -0700 Subject: automatic resize for small udp buffers --- host/lib/transport/udp_zero_copy_asio.cpp | 61 ++++++++++++++++++------------- 1 file changed, 35 insertions(+), 26 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index e97bbf31c..56ba391d3 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -89,6 +89,8 @@ static const size_t max_buff_size = 2000; //assume max size on send and recv class udp_zero_copy_impl : public udp_zero_copy{ public: + typedef boost::shared_ptr sptr; + //structors udp_zero_copy_impl(const std::string &addr, const std::string &port); ~udp_zero_copy_impl(void); @@ -97,18 +99,17 @@ public: managed_recv_buffer::sptr get_recv_buff(void); managed_send_buffer::sptr get_send_buff(void); - //resize - size_t resize_recv_buff(size_t num_bytes){ - boost::asio::socket_base::receive_buffer_size option(num_bytes); - _socket->set_option(option); + //manage buffer + template size_t get_buff_size(void){ + Opt option; _socket->get_option(option); return option.value(); } - size_t resize_send_buff(size_t num_bytes){ - boost::asio::socket_base::send_buffer_size option(num_bytes); + + template size_t resize_buff(size_t num_bytes){ + Opt option(num_bytes); _socket->set_option(option); - _socket->get_option(option); - return option.value(); + return get_buff_size(); } private: @@ -171,31 +172,39 @@ managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ /*********************************************************************** * UDP zero copy make function **********************************************************************/ +template static inline void resize_buff_helper( + udp_zero_copy_impl::sptr udp_trans, + size_t target_size, + const std::string &name +){ + static const size_t min_buff_size = size_t(100e3); + + //resize the buffer if size was provided + if (target_size > 0){ + size_t actual_size = udp_trans->resize_buff(target_size); + if (target_size != actual_size) std::cout << boost::format( + "Target %s buffer size: %d\n" + "Actual %s byffer size: %d" + ) % name % target_size % name % actual_size << std::endl; + } + + //otherwise, ensure that the buffer is at least the minimum size + else if (udp_trans->get_buff_size() < min_buff_size){ + resize_buff_helper(udp_trans, min_buff_size, name); + } +} + udp_zero_copy::sptr udp_zero_copy::make( const std::string &addr, const std::string &port, size_t recv_buff_size, size_t send_buff_size ){ - boost::shared_ptr udp_trans(new udp_zero_copy_impl(addr, port)); - - //resize the recv buffer if size was provided - if (recv_buff_size > 0){ - size_t actual_bytes = udp_trans->resize_recv_buff(recv_buff_size); - if (recv_buff_size != actual_bytes) std::cout << boost::format( - "Target recv buffer size: %d\n" - "Actual recv byffer size: %d" - ) % recv_buff_size % actual_bytes << std::endl; - } + udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); - //resize the send buffer if size was provided - if (send_buff_size > 0){ - size_t actual_bytes = udp_trans->resize_send_buff(send_buff_size); - if (send_buff_size != actual_bytes) std::cout << boost::format( - "Target send buffer size: %d\n" - "Actual send byffer size: %d" - ) % send_buff_size % actual_bytes << std::endl; - } + //call the helper to resize send and recv buffers + resize_buff_helper(udp_trans, recv_buff_size, "recv"); + resize_buff_helper (udp_trans, send_buff_size, "send"); return udp_trans; } -- cgit v1.2.3 From 4eff47a4b66eff61feffe6498b9ecebef94dc6b9 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 27 May 2010 14:10:50 -0700 Subject: Tweak with the udp and zero-copy transport. Eventually, the caller will hang onto a ring of managed buffers. --- host/include/uhd/transport/zero_copy.hpp | 5 ++-- host/lib/transport/udp_zero_copy_asio.cpp | 38 ++++++++++++++++--------------- host/lib/transport/vrt_packet_handler.hpp | 2 +- host/lib/usrp/usrp2/io_impl.cpp | 2 +- 4 files changed, 25 insertions(+), 22 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index fdc5b141c..52c6d4143 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -35,11 +35,12 @@ public: typedef boost::shared_ptr sptr; /*! + * Managed recv buffer destructor: * Signal to the transport that we are done with the buffer. * This should be called to release the buffer to the transport. * After calling, the referenced memory should be considered invalid. */ - virtual void done(void) = 0; + virtual ~managed_recv_buffer(void){}; /*! * Get the size of the underlying buffer. @@ -81,7 +82,7 @@ public: * After calling, the referenced memory should be considered invalid. * \param num_bytes the number of bytes written into the buffer */ - virtual void done(size_t num_bytes) = 0; + virtual void commit(size_t num_bytes) = 0; /*! * Get the size of the underlying buffer. diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 56ba391d3..f8a222475 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -24,6 +24,13 @@ using namespace uhd::transport; +/*********************************************************************** + * Constants + **********************************************************************/ +static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3); +static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv +static const double RECV_TIMEOUT = 0.1; // 100 ms + /*********************************************************************** * Managed receive buffer implementation for udp zero-copy asio: **********************************************************************/ @@ -34,11 +41,7 @@ public: } ~managed_recv_buffer_impl(void){ - /* NOP */ - } - - void done(void){ - /* NOP */ + delete [] this->cast(); } private: @@ -65,7 +68,7 @@ public: /* NOP */ } - void done(size_t num_bytes){ + void commit(size_t num_bytes){ _socket->send(boost::asio::buffer(_buff, num_bytes)); } @@ -85,8 +88,6 @@ private: * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -static const size_t max_buff_size = 2000; //assume max size on send and recv - class udp_zero_copy_impl : public udp_zero_copy{ public: typedef boost::shared_ptr sptr; @@ -117,7 +118,7 @@ private: boost::asio::io_service _io_service; //send and recv buffer memory (allocated once) - boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size]; + boost::uint8_t _send_mem[MIN_SOCK_BUFF_SIZE]; managed_send_buffer::sptr _send_buff; }; @@ -137,13 +138,13 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin // create the managed send buff (just once) _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( - boost::asio::buffer(_send_mem, max_buff_size), _socket + boost::asio::buffer(_send_mem, MIN_SOCK_BUFF_SIZE), _socket )); // set recv timeout timeval tv; tv.tv_sec = 0; - tv.tv_usec = 100*1000; //100 ms + tv.tv_usec = size_t(RECV_TIMEOUT*1e6); UHD_ASSERT_THROW(setsockopt( _socket->native(), SOL_SOCKET, SO_RCVTIMEO, @@ -156,17 +157,20 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){ } managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ + //allocate memory + boost::uint8_t *recv_mem = new boost::uint8_t[MAX_DGRAM_SIZE]; + //call recv() with timeout option - size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size)); + size_t num_bytes = _socket->receive(boost::asio::buffer(recv_mem, MIN_SOCK_BUFF_SIZE)); //create a new managed buffer to house the data return managed_recv_buffer::sptr( - new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes)) + new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) ); } managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ - return _send_buff; + return _send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these } /*********************************************************************** @@ -177,8 +181,6 @@ template static inline void resize_buff_helper( size_t target_size, const std::string &name ){ - static const size_t min_buff_size = size_t(100e3); - //resize the buffer if size was provided if (target_size > 0){ size_t actual_size = udp_trans->resize_buff(target_size); @@ -189,8 +191,8 @@ template static inline void resize_buff_helper( } //otherwise, ensure that the buffer is at least the minimum size - else if (udp_trans->get_buff_size() < min_buff_size){ - resize_buff_helper(udp_trans, min_buff_size, name); + else if (udp_trans->get_buff_size() < MIN_SOCK_BUFF_SIZE){ + resize_buff_helper(udp_trans, MIN_SOCK_BUFF_SIZE, name); } } diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index 81420b39e..e64e3383d 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -284,7 +284,7 @@ namespace vrt_packet_handler{ send_cb(send_buff); //callback after memory filled //commit the samples to the zero-copy interface - send_buff->done(num_packet_words32*sizeof(boost::uint32_t)); + send_buff->commit(num_packet_words32*sizeof(boost::uint32_t)); } /******************************************************************* diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index b6ab919e7..79b18fb63 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -45,7 +45,7 @@ void usrp2_impl::io_init(void){ managed_send_buffer::sptr send_buff = _data_transport->get_send_buff(); boost::uint32_t data = htonl(USRP2_INVALID_VRT_HEADER); memcpy(send_buff->cast(), &data, sizeof(data)); - send_buff->done(sizeof(data)); + send_buff->commit(sizeof(data)); //setup RX DSP regs std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; -- cgit v1.2.3