diff options
Diffstat (limited to 'lib/Socket.cpp')
| -rw-r--r-- | lib/Socket.cpp | 142 | 
1 files changed, 84 insertions, 58 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 6a20429..c876f32 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port      }  } +string InetAddress::to_string() const +{ +    char received_from_str[64] = {}; +    sockaddr *addr = reinterpret_cast<sockaddr*>(&addr); +    const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63); + +    if (ret == nullptr) { +        throw invalid_argument(string("Error converting InetAddress") + strerror(errno)); +    } +    return ret; +} +  UDPPacket::UDPPacket() { }  UDPPacket::UDPPacket(size_t initSize) : @@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) :  { } -UDPSocket::UDPSocket() : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket()  {      reinit(0, "");  } -UDPSocket::UDPSocket(int port) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port)  {      reinit(port, "");  } -UDPSocket::UDPSocket(int port, const std::string& name) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port, const std::string& name)  {      reinit(port, name);  } +UDPSocket::UDPSocket(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +} + +const UDPSocket& UDPSocket::operator=(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +    return *this; +}  void UDPSocket::setBlocking(bool block)  { @@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name)          ::close(m_sock);      } +    m_port = port; +      if (port == 0) {          // No need to bind to a given port, creating the          // socket is enough @@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl)      }  } -UDPReceiver::UDPReceiver() { } - -UDPReceiver::~UDPReceiver() { -    m_stop = true; -    m_sock.close(); -    if (m_thread.joinable()) { -        m_thread.join(); -    } +SOCKET UDPSocket::getNativeSocket() const +{ +    return m_sock;  } -void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { -    m_port = port; -    m_bindto = bindto; -    m_mcastaddr = mcastaddr; -    m_max_packets_queued = max_packets_queued; -    m_thread = std::thread(&UDPReceiver::m_run, this); +int UDPSocket::getPort() const +{ +    return m_port;  } -std::vector<uint8_t> UDPReceiver::get_packet_buffer() -{ -    if (m_stop) { -        throw runtime_error("UDP Receiver not running"); -    } +void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) { +    UDPSocket sock; -    UDPPacket p; -    m_packets.wait_and_pop(p); +    if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { +        sock.reinit(port, mcastaddr); +        sock.setMulticastSource(bindto.c_str()); +        sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); +    } +    else { +        sock.reinit(port, bindto); +    } -    return p.buffer; +    m_sockets.push_back(move(sock));  } -void UDPReceiver::m_run() +vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)  { -    // Ensure that stop is set to true in case of exception or return -    struct SetStopOnDestruct { -        SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} -        ~SetStopOnDestruct() { m_stop = true; } -        private: atomic<bool>& m_stop; -    } autoSetStop(m_stop); - -    if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { -        m_sock.reinit(m_port, m_mcastaddr); -        m_sock.setMulticastSource(m_bindto.c_str()); -        m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); +    constexpr size_t MAX_FDS = 64; +    struct pollfd fds[MAX_FDS]; +    if (m_sockets.size() > MAX_FDS) { +        throw std::runtime_error("UDPReceiver only supports up to 64 ports");      } -    else { -        m_sock.reinit(m_port, m_bindto); + +    for (size_t i = 0; i < m_sockets.size(); i++) { +        fds[i].fd = m_sockets[i].getNativeSocket(); +        fds[i].events = POLLIN;      } -    while (not m_stop) { -        constexpr size_t packsize = 8192; -        try { -            auto packet = m_sock.receive(packsize); -            if (packet.buffer.size() == packsize) { -                // TODO replace fprintf -                fprintf(stderr, "Warning, possible UDP truncation\n"); -            } +    int retval = poll(fds, m_sockets.size(), timeout_ms); -            // If this blocks, the UDP socket will lose incoming packets -            m_packets.push_wait_if_full(packet, m_max_packets_queued); -        } -        catch (const std::runtime_error& e) { -            // TODO replace fprintf -            // TODO handle intr -            fprintf(stderr, "Socket error: %s\n", e.what()); -            m_stop = true; +    if (retval == -1 and errno == EINTR) { +        throw Interrupted(); +    } +    else if (retval == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("UDP receive with poll() error: " + errstr); +    } +    else if (retval > 0) { +        vector<ReceivedPacket> received; + +        for (size_t i = 0; i < m_sockets.size(); i++) { +            if (fds[i].revents & POLLIN) { +                auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU +                ReceivedPacket rp; +                rp.packetdata = move(p.buffer); +                rp.received_from = move(p.address); +                rp.port_received_on = m_sockets[i].getPort(); +                received.push_back(move(rp)); +            }          } + +        return received; +    } +    else { +        throw Timeout();      }  } | 
