diff options
| -rw-r--r-- | lib/Socket.cpp | 58 | ||||
| -rw-r--r-- | lib/Socket.h | 14 | ||||
| -rw-r--r-- | lib/edioutput/EDIConfig.h | 3 | ||||
| -rw-r--r-- | lib/edioutput/Transport.cpp | 13 | ||||
| -rw-r--r-- | lib/edioutput/Transport.h | 4 | 
5 files changed, 79 insertions, 13 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1ff6418..10ec1ca 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -2,7 +2,7 @@     Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2020 +   Copyright (C) 2022     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -30,6 +30,7 @@  #include <cerrno>  #include <fcntl.h>  #include <poll.h> +#include <netinet/tcp.h>  namespace Socket { @@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)      }  } +void TCPSocket::enable_keepalive(int time, int intvl, int probes) +{ +    if (m_sock == INVALID_SOCKET) { +        throw std::logic_error("You may not call enable_keepalive on invalid socket"); +    } +    int optval = 1; +    auto optlen = sizeof(optval); +    if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr); +    } + +    optval = time; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr); +    } + +    optval = intvl; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr); +    } + +    optval = probes; +    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr); +    } +} +  void TCPSocket::listen(int port, const string& name)  {      if (m_sock != INVALID_SOCKET) { @@ -938,8 +970,9 @@ void TCPConnection::process()  } -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : -    m_max_queue_size(max_queue_size) +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : +    m_max_queue_size(max_queue_size), +    m_buffers_to_preroll(buffers_to_preroll)  {  } @@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)          throw runtime_error(m_exception_data);      } +    auto lock = unique_lock<mutex>(m_mutex); + +    if (m_buffers_to_preroll > 0) { +        m_preroll_queue.push_back(data); +        if (m_preroll_queue.size() > m_buffers_to_preroll) { +            m_preroll_queue.pop_front(); +        } +    } +      for (auto& connection : m_connections) {          connection.queue.push(data);      } -    m_connections.remove_if( -            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +    m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });  }  void TCPDataDispatcher::process() @@ -984,7 +1025,14 @@ void TCPDataDispatcher::process()              // Add a new TCPConnection to the list, constructing it from the client socket              auto sock = m_listener_socket.accept(timeout_ms);              if (sock.valid()) { +                auto lock = unique_lock<mutex>(m_mutex);                  m_connections.emplace(m_connections.begin(), move(sock)); + +                if (m_buffers_to_preroll > 0) { +                    for (const auto& buf : m_preroll_queue) { +                        m_connections.front().queue.push(buf); +                    } +                }              }          }      } diff --git a/lib/Socket.h b/lib/Socket.h index f5143a0..d8242e2 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@     Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2020 +   Copyright (C) 2022     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -173,6 +173,11 @@ class TCPSocket {          void listen(int port, const std::string& name);          void close(void); +        /* Enable TCP keepalive. See +         * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html +         */ +        void enable_keepalive(int time, int intvl, int probes); +          /* throws a runtime_error on failure, an invalid socket on timeout */          TCPSocket accept(int timeout_ms); @@ -254,7 +259,7 @@ class TCPConnection  class TCPDataDispatcher  {      public: -        TCPDataDispatcher(size_t max_queue_size); +        TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);          ~TCPDataDispatcher();          TCPDataDispatcher(const TCPDataDispatcher&) = delete;          TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; @@ -266,11 +271,16 @@ class TCPDataDispatcher          void process();          size_t m_max_queue_size; +        size_t m_buffers_to_preroll; +          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          std::string m_exception_data;          std::thread m_listener_thread;          TCPSocket m_listener_socket; + +        std::mutex m_mutex; +        std::deque<std::vector<uint8_t> > m_preroll_queue;          std::list<TCPConnection> m_connections;  }; diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index d57e9ce..a7225a7 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -74,6 +74,9 @@ struct configuration_t {      // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)      // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +    // TCP Server output can preroll a fixed number of previous buffers each time a new client connects. +    size_t tcp_server_preroll_buffers = 0; +      bool enabled() const { return destinations.size() > 0; }      void print() const; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 5d34814..a870aa0 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@  /* -   Copyright (C) 2020 +   Copyright (C) 2022     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -66,7 +66,7 @@ Sender::Sender(const configuration_t& conf) :      edi_pft(m_conf)  {      if (m_conf.verbose) { -        etiLog.log(info, "Setup EDI Output"); +        etiLog.level(info) << "Setup EDI Output, TCP output preroll " << m_conf.tcp_server_preroll_buffers;      }      for (const auto& edi_dest : m_conf.destinations) { @@ -81,7 +81,9 @@ Sender::Sender(const configuration_t& conf) :              udp_sockets.emplace(udp_dest.get(), udp_socket);          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { -            auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); +            auto dispatcher = make_shared<Socket::TCPDataDispatcher>( +                    tcp_dest->max_frames_queued, m_conf.tcp_server_preroll_buffers); +              dispatcher->start(tcp_dest->listen_port, "0.0.0.0");              tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);          } @@ -135,9 +137,10 @@ void Sender::write(const AFPacket& af_packet)          // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)          vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); -        if (m_conf.verbose) { -            fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n", +        if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { +            etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",                      edi_fragments.size()); +            m_last_num_pft_fragments = edi_fragments.size();          }          /* Spread out the transmission of all fragments over part of the 24ms AF packet duration diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index be93297..6a3f229 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@  /* -   Copyright (C) 2020 +   Copyright (C) 2022     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -90,6 +90,8 @@ class Sender {          std::mutex m_mutex;          bool m_running = false;          std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + +        size_t m_last_num_pft_fragments = 0;  };  } | 
