diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2022-08-19 17:19:16 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2022-08-19 17:19:16 +0200 | 
| commit | 8007dffae5fee4fcc81fcb5f888e0ee138db6e1e (patch) | |
| tree | f0ef8f06c046ce7242736078367000769b1fff4a /lib/Socket.cpp | |
| parent | fe2c5e875e6410376f4dc2e9257b3beaf52d33e0 (diff) | |
| download | ODR-SourceCompanion-8007dffae5fee4fcc81fcb5f888e0ee138db6e1e.tar.gz ODR-SourceCompanion-8007dffae5fee4fcc81fcb5f888e0ee138db6e1e.tar.bz2 ODR-SourceCompanion-8007dffae5fee4fcc81fcb5f888e0ee138db6e1e.zip | |
Common c23bfcb, fe2a905, 036201c with socket changes
Diffstat (limited to 'lib/Socket.cpp')
| -rw-r--r-- | lib/Socket.cpp | 58 | 
1 files changed, 53 insertions, 5 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1ff6418..10ec1ca 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -2,7 +2,7 @@     Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2020 +   Copyright (C) 2022     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -30,6 +30,7 @@  #include <cerrno>  #include <fcntl.h>  #include <poll.h> +#include <netinet/tcp.h>  namespace Socket { @@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)      }  } +void TCPSocket::enable_keepalive(int time, int intvl, int probes) +{ +    if (m_sock == INVALID_SOCKET) { +        throw std::logic_error("You may not call enable_keepalive on invalid socket"); +    } +    int optval = 1; +    auto optlen = sizeof(optval); +    if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr); +    } + +    optval = time; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr); +    } + +    optval = intvl; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr); +    } + +    optval = probes; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr); +    } +} +  void TCPSocket::listen(int port, const string& name)  {      if (m_sock != INVALID_SOCKET) { @@ -938,8 +970,9 @@ void TCPConnection::process()  } -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : -    m_max_queue_size(max_queue_size) +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : +    m_max_queue_size(max_queue_size), +    m_buffers_to_preroll(buffers_to_preroll)  {  } @@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)          throw runtime_error(m_exception_data);      } +    auto lock = unique_lock<mutex>(m_mutex); + +    if (m_buffers_to_preroll > 0) { +        m_preroll_queue.push_back(data); +        if (m_preroll_queue.size() > m_buffers_to_preroll) { +            m_preroll_queue.pop_front(); +        } +    } +      for (auto& connection : m_connections) {          connection.queue.push(data);      } -    m_connections.remove_if( -            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +    m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });  }  void TCPDataDispatcher::process() @@ -984,7 +1025,14 @@ void TCPDataDispatcher::process()              // Add a new TCPConnection to the list, constructing it from the client socket              auto sock = m_listener_socket.accept(timeout_ms);              if (sock.valid()) { +                auto lock = unique_lock<mutex>(m_mutex);                  m_connections.emplace(m_connections.begin(), move(sock)); + +                if (m_buffers_to_preroll > 0) { +                    for (const auto& buf : m_preroll_queue) { +                        m_connections.front().queue.push(buf); +                    } +                }              }          }      } | 
