diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-03-25 13:11:17 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:20 -0800 |
commit | 1e65500d791461be9aa7a2d2646d463f536f49e3 (patch) | |
tree | a603d8b56d41012e39f56531f017d0ebb5dedebd /host/lib/transport/udp_zero_copy.cpp | |
parent | 6f3c201a802079a3d565c5f14e1222743097b459 (diff) | |
download | uhd-1e65500d791461be9aa7a2d2646d463f536f49e3.tar.gz uhd-1e65500d791461be9aa7a2d2646d463f536f49e3.tar.bz2 uhd-1e65500d791461be9aa7a2d2646d463f536f49e3.zip |
uhd: add udp boost asio implementation of transport interface
Diffstat (limited to 'host/lib/transport/udp_zero_copy.cpp')
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 159 |
1 files changed, 40 insertions, 119 deletions
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index e3df80da6..44df9526d 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -5,11 +5,10 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include "udp_common.hpp" #include <uhd/transport/buffer_pool.hpp> -#include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp> #include <uhdlib/utils/atomic.hpp> #include <boost/format.hpp> #include <boost/make_shared.hpp> @@ -19,12 +18,8 @@ using namespace uhd; using namespace uhd::transport; -namespace asio = boost::asio; -constexpr size_t UDP_ZERO_COPY_DEFAULT_NUM_FRAMES = 1; -constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE = - 1472; // Based on common 1500 byte MTU for 1GbE. -constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE = - 2500000; // 20ms of data for 1GbE link (in bytes) +namespace asio = boost::asio; + /*********************************************************************** * Check registry for correct fast-path setting (windows only) **********************************************************************/ @@ -80,22 +75,11 @@ public: if (not _claimer.claim_with_wait(timeout)) return sptr(); -#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported - _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT); - if (_len > 0) { - index++; // advances the caller's buffer - return make(this, _mem, size_t(_len)); - } -#endif + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + _len = recv_udp_packet(_sock_fd, _mem, _frame_size, timeout_ms); - if (wait_for_recv_ready(_sock_fd, timeout)) { - _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); - if (_len == 0) - throw uhd::io_error("socket closed"); - if (_len < 0) - throw uhd::io_error( - str(boost::format("recv error on socket: %s") % strerror(errno))); - index++; // advances the caller's buffer + if (_len > 0) { + index++; return make(this, _mem, size_t(_len)); } @@ -125,23 +109,7 @@ public: void release(void) { - // Retry logic because send may fail with ENOBUFS. - // This is known to occur at least on some OSX systems. - // But it should be safe to always check for the error. - while (true) { - const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0); - if (ret == ssize_t(size())) - break; - if (ret == -1 and errno == ENOBUFS) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); - continue; // try to send again - } - if (ret == -1) { - throw uhd::io_error( - str(boost::format("send error on socket: %s") % strerror(errno))); - } - UHD_ASSERT_THROW(ret == ssize_t(size())); - } + send_udp_packet(_sock_fd, _mem, size()); _claimer.release(); } @@ -193,15 +161,7 @@ public: check_registry_for_fast_send_threshold(this->get_send_frame_size()); #endif /*CHECK_REG_SEND_THRESH*/ - // resolve the address - asio::ip::udp::resolver resolver(_io_service); - asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); - asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - - // create, open, and connect the socket - _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); - _socket->open(asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); + _socket = open_udp_socket(addr, port, _io_service); _sock_fd = _socket->native_handle(); UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") @@ -220,24 +180,16 @@ public: } } - // get size for internal socket buffer - template <typename Opt> size_t get_buff_size(void) const + size_t resize_send_socket_buffer(size_t num_bytes) { - Opt option; - _socket->get_option(option); - return option.value(); + return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>( + _socket, num_bytes); } - // set size for internal socket buffer - template <typename Opt> size_t resize_buff(size_t num_bytes) + size_t resize_recv_socket_buffer(size_t num_bytes) { -#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - // limit buffer resize on macos or it will error - num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); -#endif - Opt option(num_bytes); - _socket->set_option(option); - return get_buff_size<Opt>(); + return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>( + _socket, num_bytes); } /******************************************************************* @@ -308,37 +260,6 @@ private: /*********************************************************************** * UDP zero copy make function **********************************************************************/ -template <typename Opt> -static size_t resize_buff_helper(udp_zero_copy_asio_impl::sptr udp_trans, - const size_t target_size, - const std::string& name) -{ - size_t actual_size = 0; - std::string help_message; -#if defined(UHD_PLATFORM_LINUX) - help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") - % ((name == "recv") ? "r" : "w") % target_size); -#endif /*defined(UHD_PLATFORM_LINUX)*/ - - // resize the buffer if size was provided - if (target_size > 0) { - actual_size = udp_trans->resize_buff<Opt>(target_size); - UHD_LOGGER_TRACE("UDP") - << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name - % target_size % actual_size; - if (actual_size < target_size) - UHD_LOGGER_WARNING("UDP") - << boost::format( - "The %s buffer could not be resized sufficiently.\n" - "Target sock buff size: %d bytes.\n" - "Actual sock buff size: %d bytes.\n" - "See the transport application notes on buffer resizing.\n%s") - % name % target_size % actual_size % help_message; - } - - return actual_size; -} - udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, const std::string& port, const zero_copy_xport_params& default_buff_args, @@ -362,26 +283,24 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size)); if (xport_params.num_recv_frames == 0) { - UHD_LOG_TRACE("UDP", - "Default value for num_recv_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); - xport_params.num_recv_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; + UHD_LOG_TRACE( + "UDP", "Default value for num_recv_frames: " << UDP_DEFAULT_NUM_FRAMES); + xport_params.num_recv_frames = UDP_DEFAULT_NUM_FRAMES; } if (xport_params.num_send_frames == 0) { - UHD_LOG_TRACE("UDP", - "Default value for no num_send_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); - xport_params.num_send_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; + UHD_LOG_TRACE( + "UDP", "Default value for no num_send_frames: " << UDP_DEFAULT_NUM_FRAMES); + xport_params.num_send_frames = UDP_DEFAULT_NUM_FRAMES; } if (xport_params.recv_frame_size == 0) { UHD_LOG_TRACE("UDP", - "Using default value for recv_frame_size: " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); - xport_params.recv_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; + "Using default value for recv_frame_size: " << UDP_DEFAULT_FRAME_SIZE); + xport_params.recv_frame_size = UDP_DEFAULT_FRAME_SIZE; } if (xport_params.send_frame_size == 0) { - UHD_LOG_TRACE("UDP", - "Using default value for send_frame_size, " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); - xport_params.send_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; + UHD_LOG_TRACE( + "UDP", "Using default value for send_frame_size, " << UDP_DEFAULT_FRAME_SIZE); + xport_params.send_frame_size = UDP_DEFAULT_FRAME_SIZE; } UHD_LOG_TRACE("UDP", @@ -391,15 +310,15 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, if (xport_params.recv_buff_size == 0) { UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size"); - xport_params.recv_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_recv_frames * MAX_ETHERNET_MTU); + xport_params.recv_buff_size = std::max( + UDP_DEFAULT_BUFF_SIZE, xport_params.num_recv_frames * MAX_ETHERNET_MTU); UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size" << xport_params.recv_buff_size); } if (xport_params.send_buff_size == 0) { UHD_LOG_TRACE("UDP", "default_buff_args has no send_buff_size"); - xport_params.send_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_send_frames * MAX_ETHERNET_MTU); + xport_params.send_buff_size = std::max( + UDP_DEFAULT_BUFF_SIZE, xport_params.num_send_frames * MAX_ETHERNET_MTU); } #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) @@ -419,18 +338,20 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, new udp_zero_copy_asio_impl(addr, port, xport_params)); // call the helper to resize send and recv buffers - buff_params_out.recv_buff_size = - resize_buff_helper<asio::socket_base::receive_buffer_size>( - udp_trans, xport_params.recv_buff_size, "recv"); - buff_params_out.send_buff_size = - resize_buff_helper<asio::socket_base::send_buffer_size>( - udp_trans, xport_params.send_buff_size, "send"); + buff_params_out.recv_buff_size = resize_udp_socket_buffer_with_warning( + [udp_trans](size_t size) { return udp_trans->resize_recv_socket_buffer(size); }, + xport_params.recv_buff_size, + "recv"); + buff_params_out.send_buff_size = resize_udp_socket_buffer_with_warning( + [udp_trans](size_t size) { return udp_trans->resize_send_socket_buffer(size); }, + xport_params.send_buff_size, + "send"); if (buff_params_out.recv_buff_size < xport_params.num_recv_frames * MAX_ETHERNET_MTU) { UHD_LOG_WARNING("UDP", "The current recv_buff_size of " - << buff_params_out.recv_buff_size + << xport_params.recv_buff_size << " is less than the minimum recommended size of " << xport_params.num_recv_frames * MAX_ETHERNET_MTU << " and may result in dropped packets on some NICs"); @@ -439,7 +360,7 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, < xport_params.num_send_frames * MAX_ETHERNET_MTU) { UHD_LOG_WARNING("UDP", "The current send_buff_size of " - << buff_params_out.send_buff_size + << xport_params.send_buff_size << " is less than the minimum recommended size of " << xport_params.num_send_frames * MAX_ETHERNET_MTU << " and may result in dropped packets on some NICs"); |