diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/Socket.cpp | 70 | ||||
| -rw-r--r-- | lib/Socket.h | 28 | ||||
| -rw-r--r-- | lib/ThreadsafeQueue.h | 23 | ||||
| -rw-r--r-- | lib/edi/Transport.cpp | 41 | ||||
| -rw-r--r-- | lib/edi/Transport.h | 2 | 
5 files changed, 135 insertions, 29 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bfbef93..159de7e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)          // This suppresses the -Wlogical-op warning  #if EAGAIN == EWOULDBLOCK -        if (errno == EAGAIN) { +        if (errno == EAGAIN)  #else -        if (errno == EAGAIN or errno == EWOULDBLOCK) { +        if (errno == EAGAIN or errno == EWOULDBLOCK)  #endif +        {              return 0;          }          throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()      m_running = false;      vector<uint8_t> termination_marker;      queue.push(termination_marker); -    m_sender_thread.join(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    }  }  void TCPConnection::process() @@ -905,4 +908,65 @@ void TCPReceiveServer::process()      }  } +TCPSendClient::TCPSendClient(const std::string& hostname, int port) : +    m_hostname(hostname), +    m_port(port), +    m_running(true) +{ +    m_sender_thread = std::thread(&TCPSendClient::process, this); +} + +TCPSendClient::~TCPSendClient() +{ +    m_running = false; +    m_queue.trigger_wakeup(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    } +} + +void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +{ +    if (not m_running) { +        throw runtime_error(m_exception_data); +    } + +    m_queue.push(buffer); +} + +void TCPSendClient::process() +{ +    try { +        while (m_running) { +            if (m_is_connected) { +                try { +                    vector<uint8_t> incoming; +                    m_queue.wait_and_pop(incoming); +                    if (m_sock.sendall(incoming.data(), incoming.size()) == -1) { +                        m_is_connected = false; +                        m_sock = TCPSocket(); +                    } +                } +                catch (const ThreadsafeQueueWakeup&) { +                    break; +                } +            } +            else { +                try { +                    m_sock.connect(m_hostname, m_port); +                    m_is_connected = true; +                } +                catch (const runtime_error& e) { +                    m_is_connected = false; +                    this_thread::sleep_for(chrono::seconds(1)); +                } +            } +        } +    } +    catch (const runtime_error& e) { +        m_exception_data = e.what(); +        m_running = false; +    } +} +  } diff --git a/lib/Socket.h b/lib/Socket.h index b9f6317..84def40 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -291,4 +291,32 @@ class TCPReceiveServer {          TCPSocket m_listener_socket;  }; +/* A TCP client that abstracts the handling of connects and disconnects. + */ +class TCPSendClient { +    public: +        TCPSendClient(const std::string& hostname, int port); +        ~TCPSendClient(); + +        /* Throws a runtime_error on error +         */ +        void sendall(const std::vector<uint8_t>& buffer); + +    private: +        void process(); + +        std::string m_hostname; +        int m_port; + +        bool m_is_connected = false; + +        TCPSocket m_sock; +        static constexpr size_t MAX_QUEUE_SIZE = 1024; +        ThreadsafeQueue<std::vector<uint8_t> > m_queue; +        std::atomic<bool> m_running; +        std::string m_exception_data; +        std::thread m_sender_thread; +        TCPSocket m_listener_socket; +}; +  } diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 62f4c96..815dfe0 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -52,12 +52,21 @@ public:      /* Push one element into the queue, and notify another thread that       * might be waiting.       * +     * if max_size > 0 and the queue already contains at least max_size elements, +     * the element gets discarded. +     *       * returns the new queue size.       */ -    size_t push(T const& val) +    size_t push(T const& val, size_t max_size = 0)      {          std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.push(val); +        size_t queue_size_before = the_queue.size(); +        if (max_size == 0) { +            the_queue.push(val); +        } +        else if (queue_size_before < max_size) { +            the_queue.push(val); +        }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -66,10 +75,16 @@ public:          return queue_size;      } -    size_t push(T&& val) +    size_t push(T&& val, size_t max_size = 0)      {          std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.emplace(std::move(val)); +        size_t queue_size_before = the_queue.size(); +        if (max_size == 0) { +            the_queue.emplace(std::move(val)); +        } +        else if (queue_size_before < max_size) { +            the_queue.emplace(std::move(val)); +        }          size_t queue_size = the_queue.size();          lock.unlock(); diff --git a/lib/edi/Transport.cpp b/lib/edi/Transport.cpp index 0d5c237..4c91483 100644 --- a/lib/edi/Transport.cpp +++ b/lib/edi/Transport.cpp @@ -87,9 +87,8 @@ Sender::Sender(const configuration_t& conf) :              tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { -            auto tcp_socket = make_shared<Socket::TCPSocket>(); -            tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); -            tcp_senders.emplace(tcp_dest.get(), tcp_socket); +            auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); +            tcp_senders.emplace(tcp_dest.get(), tcp_send_client);          }          else {              throw logic_error("EDI destination not implemented"); @@ -127,8 +126,18 @@ void Sender::write(const TagPacket& tagpacket)              edi_fragments = edi_interleaver.Interleave(edi_fragments);          } +        if (m_conf.verbose) { +            fprintf(stderr, "EDI number of PFT fragments %zu\n", +                    edi_fragments.size()); +        } +          // Send over ethernet -        for (const auto& edi_frag : edi_fragments) { +        for (auto& edi_frag : edi_fragments) { +            if (m_conf.dump) { +                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +            } +              for (auto& dest : m_conf.destinations) {                  if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {                      Socket::InetAddress addr; @@ -140,26 +149,21 @@ void Sender::write(const TagPacket& tagpacket)                      tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);                  }                  else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { -                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); +                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag);                  }                  else {                      throw logic_error("EDI destination not implemented");                  }              } - -            if (m_conf.dump) { -                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -            } -        } - -        if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragments %zu\n", -                    edi_fragments.size());          }      }      else {          // Send over ethernet +        if (m_conf.dump) { +            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +            copy(af_packet.begin(), af_packet.end(), debug_iterator); +        } +          for (auto& dest : m_conf.destinations) {              if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {                  Socket::InetAddress addr; @@ -171,17 +175,12 @@ void Sender::write(const TagPacket& tagpacket)                  tcp_dispatchers.at(tcp_dest.get())->write(af_packet);              }              else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { -                tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); +                tcp_senders.at(tcp_dest.get())->sendall(af_packet);              }              else {                  throw logic_error("EDI destination not implemented");              }          } - -        if (m_conf.dump) { -            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -            copy(af_packet.begin(), af_packet.end(), debug_iterator); -        }      }  } diff --git a/lib/edi/Transport.h b/lib/edi/Transport.h index df6fe56..73b2ab6 100644 --- a/lib/edi/Transport.h +++ b/lib/edi/Transport.h @@ -64,7 +64,7 @@ class Sender {          std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;          std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; -        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; +        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;  };  } | 
