From b74b895dd44f5a76d581b8dec65dbf76dd5cece9 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 6 May 2019 11:45:08 +0200 Subject: Move TCPDataDispatcher into TcpSocket --- src/TcpSocket.h | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 4 deletions(-) (limited to 'src/TcpSocket.h') diff --git a/src/TcpSocket.h b/src/TcpSocket.h index 9ff09e5..ec7afd3 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -32,6 +32,7 @@ #endif #include "InetAddress.h" +#include "ThreadsafeQueue.h" #include #include #include @@ -45,8 +46,11 @@ #include #include - +#include #include +#include +#include +#include /** * This class represents a TCP socket. @@ -77,8 +81,8 @@ class TcpSocket /** Send data over the TCP connection. * @param data The buffer that will be sent. * @param size Number of bytes to send. - * @param timeout_ms number of milliseconds before timeout - * return number of bytes sent or -1 if error + * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout + * return number of bytes sent, 0 on timeout, or throws runtime_error. */ ssize_t send(const void* data, size_t size, int timeout_ms=0); @@ -112,4 +116,49 @@ class TcpSocket SOCKET m_sock; }; +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ + public: + TCPConnection(TcpSocket&& sock); + TCPConnection(const TCPConnection&) = delete; + TCPConnection& operator=(const TCPConnection&) = delete; + ~TCPConnection(); + + ThreadsafeQueue > queue; + + private: + std::atomic m_running; + std::thread m_sender_thread; + TcpSocket m_sock; + + void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ + public: + TCPDataDispatcher(size_t max_queue_size); + ~TCPDataDispatcher(); + TCPDataDispatcher(const TCPDataDispatcher&) = delete; + TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + + void start(int port, const std::string& address); + void write(const std::vector& data); + + private: + void process(void); + + size_t m_max_queue_size; + + std::atomic m_running; + std::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list m_connections; +}; + #endif // _TCPSOCKET -- cgit v1.2.3