diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-04-21 15:51:31 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-04-21 15:51:31 +0200 | 
| commit | 43d5fa8e85dd013391a8eafc16350cc13b008389 (patch) | |
| tree | 63b93aefd0428a6e4a3604d73f1ab076a601f3ad /lib/Socket.cpp | |
| parent | 0aa8e58e6763bda1d20246155e61b7cd54cdfc65 (diff) | |
| download | ODR-SourceCompanion-43d5fa8e85dd013391a8eafc16350cc13b008389.tar.gz ODR-SourceCompanion-43d5fa8e85dd013391a8eafc16350cc13b008389.tar.bz2 ODR-SourceCompanion-43d5fa8e85dd013391a8eafc16350cc13b008389.zip | |
Common 33a8362: EDI TCP output: handle disconnects
Diffstat (limited to 'lib/Socket.cpp')
| -rw-r--r-- | lib/Socket.cpp | 70 | 
1 files changed, 67 insertions, 3 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bfbef93..159de7e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)          // This suppresses the -Wlogical-op warning  #if EAGAIN == EWOULDBLOCK -        if (errno == EAGAIN) { +        if (errno == EAGAIN)  #else -        if (errno == EAGAIN or errno == EWOULDBLOCK) { +        if (errno == EAGAIN or errno == EWOULDBLOCK)  #endif +        {              return 0;          }          throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()      m_running = false;      vector<uint8_t> termination_marker;      queue.push(termination_marker); -    m_sender_thread.join(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    }  }  void TCPConnection::process() @@ -905,4 +908,65 @@ void TCPReceiveServer::process()      }  } +TCPSendClient::TCPSendClient(const std::string& hostname, int port) : +    m_hostname(hostname), +    m_port(port), +    m_running(true) +{ +    m_sender_thread = std::thread(&TCPSendClient::process, this); +} + +TCPSendClient::~TCPSendClient() +{ +    m_running = false; +    m_queue.trigger_wakeup(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    } +} + +void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +{ +    if (not m_running) { +        throw runtime_error(m_exception_data); +    } + +    m_queue.push(buffer); +} + +void TCPSendClient::process() +{ +    try { +        while (m_running) { +            if (m_is_connected) { +                try { +                    vector<uint8_t> incoming; +                    m_queue.wait_and_pop(incoming); +                    if (m_sock.sendall(incoming.data(), incoming.size()) == -1) { +                        m_is_connected = false; +                        m_sock = TCPSocket(); +                    } +                } +                catch (const ThreadsafeQueueWakeup&) { +                    break; +                } +            } +            else { +                try { +                    m_sock.connect(m_hostname, m_port); +                    m_is_connected = true; +                } +                catch (const runtime_error& e) { +                    m_is_connected = false; +                    this_thread::sleep_for(chrono::seconds(1)); +                } +            } +        } +    } +    catch (const runtime_error& e) { +        m_exception_data = e.what(); +        m_running = false; +    } +} +  } | 
