diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp | 115 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/udp_common.hpp | 200 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/lib/transport/tcp_zero_copy.cpp | 5 | ||||
| -rw-r--r-- | host/lib/transport/udp_boost_asio_link.cpp | 126 | ||||
| -rw-r--r-- | host/lib/transport/udp_common.hpp | 71 | ||||
| -rw-r--r-- | host/lib/transport/udp_simple.cpp | 6 | ||||
| -rw-r--r-- | host/lib/transport/udp_wsa_zero_copy.cpp | 2 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 159 | 
9 files changed, 491 insertions, 195 deletions
diff --git a/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp new file mode 100644 index 000000000..2e6f731c9 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp @@ -0,0 +1,115 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP +#define INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhdlib/transport/link_base.hpp> +#include <uhdlib/transport/links.hpp> +#include <uhdlib/transport/udp_common.hpp> +#include <boost/asio.hpp> +#include <memory> +#include <vector> + +namespace uhd { namespace transport { + +class udp_boost_asio_frame_buff : public frame_buff +{ +public: +    udp_boost_asio_frame_buff(void* mem) +    { +        _data = mem; +    } +}; + +class udp_boost_asio_link : public recv_link_base<udp_boost_asio_link>, +                            public send_link_base<udp_boost_asio_link> +{ +public: +    using sptr = std::shared_ptr<udp_boost_asio_link>; + +    /*! +     * Make a new udp link. +     * +     * \param addr a string representing the destination address +     * \param port a string representing the destination port +     * \param params Values for frame sizes, num frames, and buffer sizes +     * \param[out] recv_socket_buff_size Returns the recv socket buffer size +     * \param[out] send_socket_buff_size Returns the send socket buffer size +     */ +    static sptr make(const std::string& addr, +        const std::string& port, +        const link_params_t& params, +        size_t& recv_socket_buff_size, +        size_t& send_socket_buff_size); + +    /*! Return the local port of the UDP connection. Port is in host byte order. +     * +     * \returns Port number or 0 if port number couldn't be identified. +     */ +    uint16_t get_local_port() const; + +    /*! Return the local IP address of the UDP connection as a dotted string. +     * +     * \returns IP address as a string or empty string if the IP address could +     *          not be identified. +     */ +    std::string get_local_addr() const; + +private: +    using recv_link_base_t = recv_link_base<udp_boost_asio_link>; +    using send_link_base_t = send_link_base<udp_boost_asio_link>; + +    // Friend declarations to allow base classes to call private methods +    friend recv_link_base_t; +    friend send_link_base_t; + +    udp_boost_asio_link( +        const std::string& addr, const std::string& port, const link_params_t& params); + +    size_t resize_recv_socket_buffer(size_t num_bytes); +    size_t resize_send_socket_buffer(size_t num_bytes); + +    // Methods called by recv_link_base +    UHD_FORCE_INLINE size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms) +    { +        return recv_udp_packet(_sock_fd, buff.data(), get_recv_frame_size(), timeout_ms); +    } + +    UHD_FORCE_INLINE void release_recv_buff_derived(frame_buff& /*buff*/) +    { +        // No-op +    } + +    // Methods called by send_link_base +    UHD_FORCE_INLINE bool get_send_buff_derived( +        frame_buff& /*buff*/, int32_t /*timeout_ms*/) +    { +        return true; +    } + +    UHD_FORCE_INLINE void release_send_buff_derived(frame_buff& buff) +    { +        send_udp_packet(_sock_fd, buff.data(), buff.packet_size()); +    } + +    buffer_pool::sptr _recv_memory_pool; +    buffer_pool::sptr _send_memory_pool; + +    std::vector<udp_boost_asio_frame_buff> _recv_buffs; +    std::vector<udp_boost_asio_frame_buff> _send_buffs; + +    boost::asio::io_service _io_service; +    std::shared_ptr<boost::asio::ip::udp::socket> _socket; +    int _sock_fd; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP */ diff --git a/host/lib/include/uhdlib/transport/udp_common.hpp b/host/lib/include/uhdlib/transport/udp_common.hpp new file mode 100644 index 000000000..5f5a18c27 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_common.hpp @@ -0,0 +1,200 @@ +// +// Copyright 2011 Ettus Research LLC +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_TRANSPORT_UDP_COMMON_HPP +#define INCLUDED_TRANSPORT_UDP_COMMON_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/asio.hpp> +#include <boost/format.hpp> +#include <thread> + +namespace uhd { namespace transport { + +// Jumbo frames can be up to 9600 bytes; +constexpr size_t MAX_ETHERNET_MTU = 9600; + +constexpr size_t UDP_DEFAULT_NUM_FRAMES = 1; + +// Based on common 1500 byte MTU for 1GbE +constexpr size_t UDP_DEFAULT_FRAME_SIZE = 1472; + +// 20ms of data for 1GbE link (in bytes) +constexpr size_t UDP_DEFAULT_BUFF_SIZE = 2500000; + + +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) +// MacOS limits socket buffer size to 1 Mib +static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib +#endif + +typedef std::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; + +/*! + * Wait for the socket to become ready for a receive operation. + * \param sock_fd the open socket file descriptor + * \param timeout_ms the timeout duration in milliseconds + * \return true when the socket is ready for receive + */ +UHD_INLINE bool wait_for_recv_ready(int sock_fd, int32_t timeout_ms) +{ +#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately +    // setup timeval for timeout +    timeval tv; +    // If the tv_usec > 1 second on some platforms, select will +    // error EINVAL: An invalid timeout interval was specified. +    tv.tv_sec  = int(timeout_ms / 1000); +    tv.tv_usec = int(timeout_ms * 1000) % 1000000; + +    // setup rset for timeout +    fd_set rset; +    FD_ZERO(&rset); +    FD_SET(sock_fd, &rset); + +// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html +// This macro is provided with gcc to properly deal with EINTR. +// If not provided, define an empty macro, assume that is OK +#    ifndef TEMP_FAILURE_RETRY +#        define TEMP_FAILURE_RETRY(x) (x) +#    endif + +    // call select with timeout on receive socket +    return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0; +#else +    pollfd pfd_read; +    pfd_read.fd     = sock_fd; +    pfd_read.events = POLLIN; + +    // call poll with timeout on receive socket +    return ::poll(&pfd_read, 1, (int)timeout_ms) > 0; +#endif +} + +UHD_INLINE socket_sptr open_udp_socket( +    const std::string& addr, const std::string& port, boost::asio::io_service& io_service) +{ +    using udp = boost::asio::ip::udp; + +    // resolve the address +    udp::resolver resolver(io_service); +    udp::resolver::query query(udp::v4(), addr, port); +    udp::endpoint receiver_endpoint = *resolver.resolve(query); + +    // create, open, and connect the socket +    socket_sptr socket = socket_sptr(new udp::socket(io_service)); +    socket->open(udp::v4()); +    socket->connect(receiver_endpoint); + +    return socket; +} + +UHD_INLINE size_t recv_udp_packet( +    int sock_fd, void* mem, size_t frame_size, int32_t timeout_ms) +{ +    ssize_t len; + +#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported +    len = ::recv(sock_fd, (char*)mem, frame_size, MSG_DONTWAIT); +    if (len > 0) { +        return len; +    } +#endif + +    if (wait_for_recv_ready(sock_fd, timeout_ms)) { +        len = ::recv(sock_fd, (char*)mem, frame_size, 0); +        if (len == 0) { +            throw uhd::io_error("socket closed"); +        } +        if (len < 0) { +            throw uhd::io_error( +                str(boost::format("recv error on socket: %s") % strerror(errno))); +        } +        return len; +    } + +    return 0; // timeout +} + +UHD_INLINE void send_udp_packet(int sock_fd, void* mem, size_t len) +{ +    // Retry logic because send may fail with ENOBUFS. +    // This is known to occur at least on some OSX systems. +    // But it should be safe to always check for the error. +    while (true) { +        const ssize_t ret = ::send(sock_fd, (const char*)mem, len, 0); +        if (ret == ssize_t(len)) +            break; +        if (ret == -1 and errno == ENOBUFS) { +            std::this_thread::sleep_for(std::chrono::microseconds(1)); +            continue; // try to send again +        } +        if (ret == -1) { +            throw uhd::io_error( +                str(boost::format("send error on socket: %s") % strerror(errno))); +        } +        UHD_ASSERT_THROW(ret == ssize_t(len)); +    } +} + +template <typename Opt> +size_t get_udp_socket_buffer_size(socket_sptr socket) +{ +    Opt option; +    socket->get_option(option); +    return option.value(); +} + +template <typename Opt> +size_t resize_udp_socket_buffer(socket_sptr socket, size_t num_bytes) +{ +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) +    // limit buffer resize on macos or it will error +    num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); +#endif +    Opt option(num_bytes); +    socket->set_option(option); +    return get_udp_socket_buffer_size<Opt>(socket); +} + +UHD_INLINE size_t resize_udp_socket_buffer_with_warning( +    std::function<size_t(size_t)> resize_fn, +    const size_t target_size, +    const std::string& name) +{ +    size_t actual_size = 0; +    std::string help_message; +#if defined(UHD_PLATFORM_LINUX) +    help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") +                       % ((name == "recv") ? "r" : "w") % target_size); +#endif /*defined(UHD_PLATFORM_LINUX)*/ + +    // resize the buffer if size was provided +    if (target_size > 0) { +        actual_size = resize_fn(target_size); + +        UHD_LOGGER_TRACE("UDP") +            << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name +                   % target_size % actual_size; +        if (actual_size < target_size) +            UHD_LOGGER_WARNING("UDP") +                << boost::format( +                       "The %s buffer could not be resized sufficiently.\n" +                       "Target sock buff size: %d bytes.\n" +                       "Actual sock buff size: %d bytes.\n" +                       "See the transport application notes on buffer resizing.\n%s") +                       % name % target_size % actual_size % help_message; +    } + +    return actual_size; +} + + +}} // namespace uhd::transport + +#endif /* INCLUDED_TRANSPORT_UDP_COMMON_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index a9663c89a..003beeee4 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -81,8 +81,10 @@ set_source_files_properties(  ########################################################################  if(WIN32)      LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp) +    LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp)  else()      LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp) +    LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp)  endif()  #On windows, the boost asio implementation uses the winsock2 library. diff --git a/host/lib/transport/tcp_zero_copy.cpp b/host/lib/transport/tcp_zero_copy.cpp index 5cb713427..01bca900f 100644 --- a/host/lib/transport/tcp_zero_copy.cpp +++ b/host/lib/transport/tcp_zero_copy.cpp @@ -5,11 +5,11 @@  // SPDX-License-Identifier: GPL-3.0-or-later  // -#include "udp_common.hpp"  #include <uhd/transport/buffer_pool.hpp>  #include <uhd/transport/tcp_zero_copy.hpp>  #include <uhd/utils/log.hpp>  #include <uhdlib/utils/atomic.hpp> +#include <uhdlib/transport/udp_common.hpp>  #include <boost/format.hpp>  #include <boost/make_shared.hpp>  #include <chrono> @@ -52,8 +52,9 @@ public:              return make(this, _mem, size_t(_len));          }  #endif +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); -        if (wait_for_recv_ready(_sock_fd, timeout)) { +        if (wait_for_recv_ready(_sock_fd, timeout_ms)) {              _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0);              index++; // advances the caller's buffer              return make(this, _mem, size_t(_len)); diff --git a/host/lib/transport/udp_boost_asio_link.cpp b/host/lib/transport/udp_boost_asio_link.cpp new file mode 100644 index 000000000..95d68ba91 --- /dev/null +++ b/host/lib/transport/udp_boost_asio_link.cpp @@ -0,0 +1,126 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_boost_asio_link.hpp> +#include <boost/format.hpp> + +using namespace uhd::transport; + +namespace asio = boost::asio; + +udp_boost_asio_link::udp_boost_asio_link( +    const std::string& addr, const std::string& port, const link_params_t& params) +    : recv_link_base_t(params.num_recv_frames, params.recv_frame_size) +    , send_link_base_t(params.num_send_frames, params.send_frame_size) +    , _recv_memory_pool(buffer_pool::make(params.num_recv_frames, params.recv_frame_size)) +    , _send_memory_pool(buffer_pool::make(params.num_send_frames, params.send_frame_size)) +{ +    for (size_t i = 0; i < params.num_recv_frames; i++) { +        _recv_buffs.push_back(udp_boost_asio_frame_buff(_recv_memory_pool->at(i))); +    } + +    for (size_t i = 0; i < params.num_send_frames; i++) { +        _send_buffs.push_back(udp_boost_asio_frame_buff(_send_memory_pool->at(i))); +    } + +    for (auto& buff : _recv_buffs) { +        recv_link_base_t::preload_free_buff(&buff); +    } + +    for (auto& buff : _send_buffs) { +        send_link_base_t::preload_free_buff(&buff); +    } + +    // create, open, and connect the socket +    _socket  = open_udp_socket(addr, port, _io_service); +    _sock_fd = _socket->native_handle(); + +    UHD_LOGGER_TRACE("UDP") +        << boost::format("Created UDP link to %s:%s") % addr % port; +    UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") +        % get_local_addr() % get_local_port(); +} + +uint16_t udp_boost_asio_link::get_local_port() const +{ +    return _socket->local_endpoint().port(); +} + +std::string udp_boost_asio_link::get_local_addr() const +{ +    return _socket->local_endpoint().address().to_string(); +} + +size_t udp_boost_asio_link::resize_recv_socket_buffer(size_t num_bytes) +{ +    return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>( +        _socket, num_bytes); +} + +size_t udp_boost_asio_link::resize_send_socket_buffer(size_t num_bytes) +{ +    return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>( +        _socket, num_bytes); +} + +udp_boost_asio_link::sptr udp_boost_asio_link::make(const std::string& addr, +    const std::string& port, +    const link_params_t& params, +    size_t& recv_socket_buff_size, +    size_t& send_socket_buff_size) +{ +    UHD_ASSERT_THROW(params.num_recv_frames != 0); +    UHD_ASSERT_THROW(params.num_send_frames != 0); +    UHD_ASSERT_THROW(params.recv_frame_size != 0); +    UHD_ASSERT_THROW(params.send_frame_size != 0); +    UHD_ASSERT_THROW(params.recv_buff_size != 0); +    UHD_ASSERT_THROW(params.send_buff_size != 0); + +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) +    // limit buffer size on macos to avoid the warning issued by +    // resize_buff_helper +    if (params.recv_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { +        params.recv_buff_size = MAX_BUFF_SIZE_ETH_MACOS; +    } +    if (params.send_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { +        params.send_buff_size = MAX_BUFF_SIZE_ETH_MACOS; +    } +#endif + +    udp_boost_asio_link::sptr link( +        new udp_boost_asio_link(addr, port, params)); + +    // call the helper to resize send and recv buffers + +    recv_socket_buff_size = resize_udp_socket_buffer_with_warning( +        [link](size_t size) { return link->resize_recv_socket_buffer(size); }, +        params.recv_buff_size, +        "recv"); +    send_socket_buff_size = resize_udp_socket_buffer_with_warning( +        [link](size_t size) { return link->resize_send_socket_buffer(size); }, +        params.send_buff_size, +        "send"); + +    if (recv_socket_buff_size < params.num_recv_frames * MAX_ETHERNET_MTU) { +        UHD_LOG_WARNING("UDP", +            "The current recv_buff_size of " +                << params.recv_buff_size +                << " is less than the minimum recommended size of " +                << params.num_recv_frames * MAX_ETHERNET_MTU +                << " and may result in dropped packets on some NICs"); +    } +    if (send_socket_buff_size < params.num_send_frames * MAX_ETHERNET_MTU) { +        UHD_LOG_WARNING("UDP", +            "The current send_buff_size of " +                << params.send_buff_size +                << " is less than the minimum recommended size of " +                << params.num_send_frames * MAX_ETHERNET_MTU +                << " and may result in dropped packets on some NICs"); +    } + +    return link; +} diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp deleted file mode 100644 index f320e3d85..000000000 --- a/host/lib/transport/udp_common.hpp +++ /dev/null @@ -1,71 +0,0 @@ -// -// Copyright 2011 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP -#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP - -#include <uhd/config.hpp> -#include <boost/asio.hpp> - -namespace uhd { namespace transport { - -// Jumbo frames can be up to 9600 bytes; -static const size_t MAX_ETHERNET_MTU = 9600; - -#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) -// MacOS limits socket buffer size to 1 Mib -static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib -#endif - -typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; - -/*! - * Wait for the socket to become ready for a receive operation. - * \param sock_fd the open socket file descriptor - * \param timeout the timeout duration in seconds - * \return true when the socket is ready for receive - */ -UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout) -{ -#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately -    // setup timeval for timeout -    timeval tv; -    // If the tv_usec > 1 second on some platforms, select will -    // error EINVAL: An invalid timeout interval was specified. -    tv.tv_sec  = int(timeout); -    tv.tv_usec = int(timeout * 1000000) % 1000000; - -    // setup rset for timeout -    fd_set rset; -    FD_ZERO(&rset); -    FD_SET(sock_fd, &rset); - -// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html -// This macro is provided with gcc to properly deal with EINTR. -// If not provided, define an empty macro, assume that is OK -#    ifndef TEMP_FAILURE_RETRY -#        define TEMP_FAILURE_RETRY(x) (x) -#    endif - -    // call select with timeout on receive socket -    return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0; -#else -    // calculate the total timeout in milliseconds (from seconds) -    int total_timeout = int(timeout * 1000); - -    pollfd pfd_read; -    pfd_read.fd     = sock_fd; -    pfd_read.events = POLLIN; - -    // call poll with timeout on receive socket -    return ::poll(&pfd_read, 1, total_timeout) > 0; -#endif -} - -}} // namespace uhd::transport - -#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 48d7b500e..e10dff7f8 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -5,9 +5,9 @@  // SPDX-License-Identifier: GPL-3.0-or-later  // -#include "udp_common.hpp"  #include <uhd/transport/udp_simple.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp>  #include <boost/format.hpp>  using namespace uhd::transport; @@ -53,7 +53,9 @@ public:      size_t recv(const asio::mutable_buffer& buff, double timeout)      { -        if (not wait_for_recv_ready(_socket->native_handle(), timeout)) +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + +        if (not wait_for_recv_ready(_socket->native_handle(), timeout_ms))              return 0;          return _socket->receive_from(asio::buffer(buff), _recv_endpoint);      } diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp index 8f83ea5ef..36837cda4 100644 --- a/host/lib/transport/udp_wsa_zero_copy.cpp +++ b/host/lib/transport/udp_wsa_zero_copy.cpp @@ -5,11 +5,11 @@  // SPDX-License-Identifier: GPL-3.0-or-later  // -#include "udp_common.hpp"  #include <uhd/transport/buffer_pool.hpp>  #include <uhd/transport/udp_simple.hpp> //mtu  #include <uhd/transport/udp_zero_copy.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp>  #include <boost/format.hpp>  #include <vector> diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index e3df80da6..44df9526d 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -5,11 +5,10 @@  // SPDX-License-Identifier: GPL-3.0-or-later  // -#include "udp_common.hpp"  #include <uhd/transport/buffer_pool.hpp> -#include <uhd/transport/udp_simple.hpp> //mtu  #include <uhd/transport/udp_zero_copy.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp>  #include <uhdlib/utils/atomic.hpp>  #include <boost/format.hpp>  #include <boost/make_shared.hpp> @@ -19,12 +18,8 @@  using namespace uhd;  using namespace uhd::transport; -namespace asio                                    = boost::asio; -constexpr size_t UDP_ZERO_COPY_DEFAULT_NUM_FRAMES = 1; -constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE = -    1472; // Based on common 1500 byte MTU for 1GbE. -constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE = -    2500000; // 20ms of data for 1GbE link (in bytes) +namespace asio = boost::asio; +  /***********************************************************************   * Check registry for correct fast-path setting (windows only)   **********************************************************************/ @@ -80,22 +75,11 @@ public:          if (not _claimer.claim_with_wait(timeout))              return sptr(); -#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported -        _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT); -        if (_len > 0) { -            index++; // advances the caller's buffer -            return make(this, _mem, size_t(_len)); -        } -#endif +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); +        _len = recv_udp_packet(_sock_fd, _mem, _frame_size, timeout_ms); -        if (wait_for_recv_ready(_sock_fd, timeout)) { -            _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); -            if (_len == 0) -                throw uhd::io_error("socket closed"); -            if (_len < 0) -                throw uhd::io_error( -                    str(boost::format("recv error on socket: %s") % strerror(errno))); -            index++; // advances the caller's buffer +        if (_len > 0) { +            index++;              return make(this, _mem, size_t(_len));          } @@ -125,23 +109,7 @@ public:      void release(void)      { -        // Retry logic because send may fail with ENOBUFS. -        // This is known to occur at least on some OSX systems. -        // But it should be safe to always check for the error. -        while (true) { -            const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0); -            if (ret == ssize_t(size())) -                break; -            if (ret == -1 and errno == ENOBUFS) { -                std::this_thread::sleep_for(std::chrono::microseconds(1)); -                continue; // try to send again -            } -            if (ret == -1) { -                throw uhd::io_error( -                    str(boost::format("send error on socket: %s") % strerror(errno))); -            } -            UHD_ASSERT_THROW(ret == ssize_t(size())); -        } +        send_udp_packet(_sock_fd, _mem, size());          _claimer.release();      } @@ -193,15 +161,7 @@ public:          check_registry_for_fast_send_threshold(this->get_send_frame_size());  #endif /*CHECK_REG_SEND_THRESH*/ -        // resolve the address -        asio::ip::udp::resolver resolver(_io_service); -        asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); -        asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - -        // create, open, and connect the socket -        _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); -        _socket->open(asio::ip::udp::v4()); -        _socket->connect(receiver_endpoint); +        _socket  = open_udp_socket(addr, port, _io_service);          _sock_fd = _socket->native_handle();          UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") @@ -220,24 +180,16 @@ public:          }      } -    // get size for internal socket buffer -    template <typename Opt> size_t get_buff_size(void) const +    size_t resize_send_socket_buffer(size_t num_bytes)      { -        Opt option; -        _socket->get_option(option); -        return option.value(); +        return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>( +            _socket, num_bytes);      } -    // set size for internal socket buffer -    template <typename Opt> size_t resize_buff(size_t num_bytes) +    size_t resize_recv_socket_buffer(size_t num_bytes)      { -#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) -        // limit buffer resize on macos or it will error -        num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); -#endif -        Opt option(num_bytes); -        _socket->set_option(option); -        return get_buff_size<Opt>(); +        return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>( +            _socket, num_bytes);      }      /******************************************************************* @@ -308,37 +260,6 @@ private:  /***********************************************************************   * UDP zero copy make function   **********************************************************************/ -template <typename Opt> -static size_t resize_buff_helper(udp_zero_copy_asio_impl::sptr udp_trans, -    const size_t target_size, -    const std::string& name) -{ -    size_t actual_size = 0; -    std::string help_message; -#if defined(UHD_PLATFORM_LINUX) -    help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") -                       % ((name == "recv") ? "r" : "w") % target_size); -#endif /*defined(UHD_PLATFORM_LINUX)*/ - -    // resize the buffer if size was provided -    if (target_size > 0) { -        actual_size = udp_trans->resize_buff<Opt>(target_size); -        UHD_LOGGER_TRACE("UDP") -            << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name -                   % target_size % actual_size; -        if (actual_size < target_size) -            UHD_LOGGER_WARNING("UDP") -                << boost::format( -                       "The %s buffer could not be resized sufficiently.\n" -                       "Target sock buff size: %d bytes.\n" -                       "Actual sock buff size: %d bytes.\n" -                       "See the transport application notes on buffer resizing.\n%s") -                       % name % target_size % actual_size % help_message; -    } - -    return actual_size; -} -  udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,      const std::string& port,      const zero_copy_xport_params& default_buff_args, @@ -362,26 +283,24 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,          size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size));      if (xport_params.num_recv_frames == 0) { -        UHD_LOG_TRACE("UDP", -            "Default value for num_recv_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); -        xport_params.num_recv_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; +        UHD_LOG_TRACE( +            "UDP", "Default value for num_recv_frames: " << UDP_DEFAULT_NUM_FRAMES); +        xport_params.num_recv_frames = UDP_DEFAULT_NUM_FRAMES;      }      if (xport_params.num_send_frames == 0) { -        UHD_LOG_TRACE("UDP", -            "Default value for no num_send_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); -        xport_params.num_send_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; +        UHD_LOG_TRACE( +            "UDP", "Default value for no num_send_frames: " << UDP_DEFAULT_NUM_FRAMES); +        xport_params.num_send_frames = UDP_DEFAULT_NUM_FRAMES;      }      if (xport_params.recv_frame_size == 0) {          UHD_LOG_TRACE("UDP", -            "Using default value for  recv_frame_size: " -                << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); -        xport_params.recv_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; +            "Using default value for  recv_frame_size: " << UDP_DEFAULT_FRAME_SIZE); +        xport_params.recv_frame_size = UDP_DEFAULT_FRAME_SIZE;      }      if (xport_params.send_frame_size == 0) { -        UHD_LOG_TRACE("UDP", -            "Using default value for send_frame_size, " -                << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); -        xport_params.send_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; +        UHD_LOG_TRACE( +            "UDP", "Using default value for send_frame_size, " << UDP_DEFAULT_FRAME_SIZE); +        xport_params.send_frame_size = UDP_DEFAULT_FRAME_SIZE;      }      UHD_LOG_TRACE("UDP", @@ -391,15 +310,15 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,      if (xport_params.recv_buff_size == 0) {          UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size"); -        xport_params.recv_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, -            xport_params.num_recv_frames * MAX_ETHERNET_MTU); +        xport_params.recv_buff_size = std::max( +            UDP_DEFAULT_BUFF_SIZE, xport_params.num_recv_frames * MAX_ETHERNET_MTU);          UHD_LOG_TRACE("UDP",              "Using default value for recv_buff_size" << xport_params.recv_buff_size);      }      if (xport_params.send_buff_size == 0) {          UHD_LOG_TRACE("UDP", "default_buff_args has no send_buff_size"); -        xport_params.send_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, -            xport_params.num_send_frames * MAX_ETHERNET_MTU); +        xport_params.send_buff_size = std::max( +            UDP_DEFAULT_BUFF_SIZE, xport_params.num_send_frames * MAX_ETHERNET_MTU);      }  #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) @@ -419,18 +338,20 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,          new udp_zero_copy_asio_impl(addr, port, xport_params));      // call the helper to resize send and recv buffers -    buff_params_out.recv_buff_size = -        resize_buff_helper<asio::socket_base::receive_buffer_size>( -            udp_trans, xport_params.recv_buff_size, "recv"); -    buff_params_out.send_buff_size = -        resize_buff_helper<asio::socket_base::send_buffer_size>( -            udp_trans, xport_params.send_buff_size, "send"); +    buff_params_out.recv_buff_size = resize_udp_socket_buffer_with_warning( +        [udp_trans](size_t size) { return udp_trans->resize_recv_socket_buffer(size); }, +        xport_params.recv_buff_size, +        "recv"); +    buff_params_out.send_buff_size = resize_udp_socket_buffer_with_warning( +        [udp_trans](size_t size) { return udp_trans->resize_send_socket_buffer(size); }, +        xport_params.send_buff_size, +        "send");      if (buff_params_out.recv_buff_size          < xport_params.num_recv_frames * MAX_ETHERNET_MTU) {          UHD_LOG_WARNING("UDP",              "The current recv_buff_size of " -                << buff_params_out.recv_buff_size +                << xport_params.recv_buff_size                  << " is less than the minimum recommended size of "                  << xport_params.num_recv_frames * MAX_ETHERNET_MTU                  << " and may result in dropped packets on some NICs"); @@ -439,7 +360,7 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,          < xport_params.num_send_frames * MAX_ETHERNET_MTU) {          UHD_LOG_WARNING("UDP",              "The current send_buff_size of " -                << buff_params_out.send_buff_size +                << xport_params.send_buff_size                  << " is less than the minimum recommended size of "                  << xport_params.num_send_frames * MAX_ETHERNET_MTU                  << " and may result in dropped packets on some NICs");  | 
