diff options
| -rw-r--r-- | lib/Socket.cpp | 142 | ||||
| -rw-r--r-- | lib/Socket.h | 42 | 
2 files changed, 107 insertions, 77 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 6a20429..c876f32 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port      }  } +string InetAddress::to_string() const +{ +    char received_from_str[64] = {}; +    sockaddr *addr = reinterpret_cast<sockaddr*>(&addr); +    const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63); + +    if (ret == nullptr) { +        throw invalid_argument(string("Error converting InetAddress") + strerror(errno)); +    } +    return ret; +} +  UDPPacket::UDPPacket() { }  UDPPacket::UDPPacket(size_t initSize) : @@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) :  { } -UDPSocket::UDPSocket() : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket()  {      reinit(0, "");  } -UDPSocket::UDPSocket(int port) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port)  {      reinit(port, "");  } -UDPSocket::UDPSocket(int port, const std::string& name) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port, const std::string& name)  {      reinit(port, name);  } +UDPSocket::UDPSocket(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +} + +const UDPSocket& UDPSocket::operator=(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +    return *this; +}  void UDPSocket::setBlocking(bool block)  { @@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name)          ::close(m_sock);      } +    m_port = port; +      if (port == 0) {          // No need to bind to a given port, creating the          // socket is enough @@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl)      }  } -UDPReceiver::UDPReceiver() { } - -UDPReceiver::~UDPReceiver() { -    m_stop = true; -    m_sock.close(); -    if (m_thread.joinable()) { -        m_thread.join(); -    } +SOCKET UDPSocket::getNativeSocket() const +{ +    return m_sock;  } -void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { -    m_port = port; -    m_bindto = bindto; -    m_mcastaddr = mcastaddr; -    m_max_packets_queued = max_packets_queued; -    m_thread = std::thread(&UDPReceiver::m_run, this); +int UDPSocket::getPort() const +{ +    return m_port;  } -std::vector<uint8_t> UDPReceiver::get_packet_buffer() -{ -    if (m_stop) { -        throw runtime_error("UDP Receiver not running"); -    } +void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) { +    UDPSocket sock; -    UDPPacket p; -    m_packets.wait_and_pop(p); +    if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { +        sock.reinit(port, mcastaddr); +        sock.setMulticastSource(bindto.c_str()); +        sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); +    } +    else { +        sock.reinit(port, bindto); +    } -    return p.buffer; +    m_sockets.push_back(move(sock));  } -void UDPReceiver::m_run() +vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)  { -    // Ensure that stop is set to true in case of exception or return -    struct SetStopOnDestruct { -        SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} -        ~SetStopOnDestruct() { m_stop = true; } -        private: atomic<bool>& m_stop; -    } autoSetStop(m_stop); - -    if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { -        m_sock.reinit(m_port, m_mcastaddr); -        m_sock.setMulticastSource(m_bindto.c_str()); -        m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); +    constexpr size_t MAX_FDS = 64; +    struct pollfd fds[MAX_FDS]; +    if (m_sockets.size() > MAX_FDS) { +        throw std::runtime_error("UDPReceiver only supports up to 64 ports");      } -    else { -        m_sock.reinit(m_port, m_bindto); + +    for (size_t i = 0; i < m_sockets.size(); i++) { +        fds[i].fd = m_sockets[i].getNativeSocket(); +        fds[i].events = POLLIN;      } -    while (not m_stop) { -        constexpr size_t packsize = 8192; -        try { -            auto packet = m_sock.receive(packsize); -            if (packet.buffer.size() == packsize) { -                // TODO replace fprintf -                fprintf(stderr, "Warning, possible UDP truncation\n"); -            } +    int retval = poll(fds, m_sockets.size(), timeout_ms); -            // If this blocks, the UDP socket will lose incoming packets -            m_packets.push_wait_if_full(packet, m_max_packets_queued); -        } -        catch (const std::runtime_error& e) { -            // TODO replace fprintf -            // TODO handle intr -            fprintf(stderr, "Socket error: %s\n", e.what()); -            m_stop = true; +    if (retval == -1 and errno == EINTR) { +        throw Interrupted(); +    } +    else if (retval == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("UDP receive with poll() error: " + errstr); +    } +    else if (retval > 0) { +        vector<ReceivedPacket> received; + +        for (size_t i = 0; i < m_sockets.size(); i++) { +            if (fds[i].revents & POLLIN) { +                auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU +                ReceivedPacket rp; +                rp.packetdata = move(p.buffer); +                rp.received_from = move(p.address); +                rp.port_received_on = m_sockets[i].getPort(); +                received.push_back(move(rp)); +            }          } + +        return received; +    } +    else { +        throw Timeout();      }  } diff --git a/lib/Socket.h b/lib/Socket.h index 2291dd5..33cdc05 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -56,6 +56,8 @@ struct InetAddress {      struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };      void resolveUdpDestination(const std::string& destination, int port); + +    std::string to_string() const;  };  /** This class represents a UDP packet. @@ -103,6 +105,8 @@ class UDPSocket          ~UDPSocket();          UDPSocket(const UDPSocket& other) = delete;          const UDPSocket& operator=(const UDPSocket& other) = delete; +        UDPSocket(UDPSocket&& other); +        const UDPSocket& operator=(UDPSocket&& other);          /** Close the already open socket, and create a new one. Throws a runtime_error on error.  */          void reinit(int port); @@ -121,36 +125,36 @@ class UDPSocket           */          void setBlocking(bool block); +        SOCKET getNativeSocket() const; +        int getPort() const; +      protected: -        SOCKET m_sock; +        SOCKET m_sock = INVALID_SOCKET; +        int m_port = 0;  }; -/* Threaded UDP receiver */ +/* UDP packet receiver supporting receiving from several ports at once */  class UDPReceiver {      public: -        UDPReceiver(); -        ~UDPReceiver(); -        UDPReceiver(const UDPReceiver&) = delete; -        UDPReceiver operator=(const UDPReceiver&) = delete; +        void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr); -        // Start the receiver in a separate thread -        void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); +        struct ReceivedPacket { +            std::vector<uint8_t> packetdata; +            InetAddress received_from; +            int port_received_on; +        }; -        // Get the data contained in a UDP packet, blocks if none available -        // In case of error, throws a runtime_error -        std::vector<uint8_t> get_packet_buffer(void); +        class Interrupted {}; +        class Timeout {}; +        /* Returns one or several packets, +         * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error +         * on error. */ +        std::vector<ReceivedPacket> receive(int timeout_ms);      private:          void m_run(void); -        int m_port = 0; -        std::string m_bindto; -        std::string m_mcastaddr; -        size_t m_max_packets_queued = 1; -        std::thread m_thread; -        std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false); -        ThreadsafeQueue<UDPPacket> m_packets; -        UDPSocket m_sock; +        std::vector<UDPSocket> m_sockets;  };  class TCPSocket { | 
