diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/RemoteControl.cpp | 18 | ||||
| -rw-r--r-- | lib/Socket.cpp | 12 | ||||
| -rw-r--r-- | lib/Socket.h | 23 | ||||
| -rw-r--r-- | lib/edioutput/AFPacket.cpp (renamed from lib/edi/AFPacket.cpp) | 0 | ||||
| -rw-r--r-- | lib/edioutput/AFPacket.h (renamed from lib/edi/AFPacket.h) | 0 | ||||
| -rw-r--r-- | lib/edioutput/EDIConfig.h (renamed from lib/edi/EDIConfig.h) | 3 | ||||
| -rw-r--r-- | lib/edioutput/Interleaver.cpp (renamed from lib/edi/Interleaver.cpp) | 0 | ||||
| -rw-r--r-- | lib/edioutput/Interleaver.h (renamed from lib/edi/Interleaver.h) | 0 | ||||
| -rw-r--r-- | lib/edioutput/PFT.cpp (renamed from lib/edi/PFT.cpp) | 1 | ||||
| -rw-r--r-- | lib/edioutput/PFT.h (renamed from lib/edi/PFT.h) | 5 | ||||
| -rw-r--r-- | lib/edioutput/TagItems.cpp (renamed from lib/edi/TagItems.cpp) | 6 | ||||
| -rw-r--r-- | lib/edioutput/TagItems.h (renamed from lib/edi/TagItems.h) | 2 | ||||
| -rw-r--r-- | lib/edioutput/TagPacket.cpp (renamed from lib/edi/TagPacket.cpp) | 0 | ||||
| -rw-r--r-- | lib/edioutput/TagPacket.h (renamed from lib/edi/TagPacket.h) | 0 | ||||
| -rw-r--r-- | lib/edioutput/Transport.cpp (renamed from lib/edi/Transport.cpp) | 6 | ||||
| -rw-r--r-- | lib/edioutput/Transport.h (renamed from lib/edi/Transport.h) | 0 | 
16 files changed, 44 insertions, 32 deletions
| diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp index 4adb90c..9ca8d22 100644 --- a/lib/RemoteControl.cpp +++ b/lib/RemoteControl.cpp @@ -29,6 +29,7 @@  #include <algorithm>  #include "RemoteControl.h" +#include "zmq.hpp"  using namespace std; @@ -424,7 +425,7 @@ void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::stri      bool more = true;      do {          zmq::message_t msg; -        pSocket.recv(&msg); +        pSocket.recv(msg);          std::string incoming((char*)msg.data(), msg.size());          message.push_back(incoming);          more = msg.more(); @@ -436,7 +437,7 @@ void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)      zmq::message_t msg(2);      char repCode[2] = {'o', 'k'};      memcpy ((void*) msg.data(), repCode, 2); -    pSocket.send(msg, 0); +    pSocket.send(msg, zmq::send_flags::none);  }  void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) @@ -444,11 +445,11 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str      zmq::message_t msg1(4);      char repCode[4] = {'f', 'a', 'i', 'l'};      memcpy ((void*) msg1.data(), repCode, 4); -    pSocket.send(msg1, ZMQ_SNDMORE); +    pSocket.send(msg1, zmq::send_flags::sndmore);      zmq::message_t msg2(error.length());      memcpy ((void*) msg2.data(), error.c_str(), error.length()); -    pSocket.send(msg2, 0); +    pSocket.send(msg2, zmq::send_flags::none);  }  void RemoteControllerZmq::process() @@ -508,8 +509,7 @@ void RemoteControllerZmq::process()                          zmq::message_t zmsg(ss.str().size());                          memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); -                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; -                        repSocket.send(zmsg, flag); +                        repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none);                      }                  }                  else if (msg.size() == 2 && command == "show") { @@ -523,8 +523,7 @@ void RemoteControllerZmq::process()                              zmq::message_t zmsg(ss.str().size());                              memcpy(zmsg.data(), ss.str().data(), ss.str().size()); -                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; -                            repSocket.send(zmsg, flag); +                            repSocket.send(zmsg, (--r_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none);                          }                      }                      catch (const ParameterError &err) { @@ -539,7 +538,7 @@ void RemoteControllerZmq::process()                          std::string value = rcs.get_param(module, parameter);                          zmq::message_t zmsg(value.size());                          memcpy ((void*) zmsg.data(), value.data(), value.size()); -                        repSocket.send(zmsg, 0); +                        repSocket.send(zmsg, zmq::send_flags::none);                      }                      catch (const ParameterError &err) {                          send_fail_reply(repSocket, err.what()); @@ -576,4 +575,3 @@ void RemoteControllerZmq::process()  }  #endif - diff --git a/lib/Socket.cpp b/lib/Socket.cpp index d41ed1c..6a20429 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -862,9 +862,9 @@ TCPReceiveServer::~TCPReceiveServer()      }  } -vector<uint8_t> TCPReceiveServer::receive() +shared_ptr<TCPReceiveMessage> TCPReceiveServer::receive()  { -    vector<uint8_t> buffer; +    shared_ptr<TCPReceiveMessage> buffer = make_shared<TCPReceiveMessageEmpty>();      m_queue.try_pop(buffer);      // we can ignore try_pop()'s return value, because @@ -892,11 +892,12 @@ void TCPReceiveServer::process()                  }                  else if (r == 0) {                      sock.close(); +                    m_queue.push(make_shared<TCPReceiveMessageDisconnected>());                      break;                  }                  else {                      buf.resize(r); -                    m_queue.push(move(buf)); +                    m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));                  }              }              catch (const TCPSocket::Interrupted&) { @@ -905,6 +906,11 @@ void TCPReceiveServer::process()              catch (const TCPSocket::Timeout&) {                  num_timeouts++;              } +            catch (const runtime_error& e) { +                sock.close(); +                // TODO replace fprintf +                fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); +            }              if (num_timeouts > max_num_timeouts) {                  sock.close(); diff --git a/lib/Socket.h b/lib/Socket.h index 8881be3..2291dd5 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -30,11 +30,12 @@  #include "ThreadsafeQueue.h"  #include <cstdlib> -#include <iostream> -#include <vector>  #include <atomic> -#include <thread> +#include <iostream>  #include <list> +#include <memory> +#include <thread> +#include <vector>  #include <sys/socket.h>  #include <netinet/in.h> @@ -265,6 +266,14 @@ class TCPDataDispatcher          std::list<TCPConnection> m_connections;  }; +struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; }; +struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { }; +struct TCPReceiveMessageEmpty : public TCPReceiveMessage { }; +struct TCPReceiveMessageData : public TCPReceiveMessage { +    TCPReceiveMessageData(std::vector<uint8_t> d) : data(d) {}; +    std::vector<uint8_t> data; +}; +  /* A TCP Server to receive data, which abstracts the handling of connects and disconnects.   */  class TCPReceiveServer { @@ -276,15 +285,15 @@ class TCPReceiveServer {          void start(int listen_port, const std::string& address); -        // Return a vector that contains up to blocksize bytes of data, or -        // and empty vector if no data is available. -        std::vector<uint8_t> receive(); +        // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize +        // bytes of data, or TCPReceiveMessageEmpty if no data is available. +        std::shared_ptr<TCPReceiveMessage> receive();      private:          void process();          size_t m_blocksize = 0; -        ThreadsafeQueue<std::vector<uint8_t> > m_queue; +        ThreadsafeQueue<std::shared_ptr<TCPReceiveMessage> > m_queue;          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          std::string m_exception_data;          std::thread m_listener_thread; diff --git a/lib/edi/AFPacket.cpp b/lib/edioutput/AFPacket.cpp index b38c38b..b38c38b 100644 --- a/lib/edi/AFPacket.cpp +++ b/lib/edioutput/AFPacket.cpp diff --git a/lib/edi/AFPacket.h b/lib/edioutput/AFPacket.h index f2c4e35..f2c4e35 100644 --- a/lib/edi/AFPacket.h +++ b/lib/edioutput/AFPacket.h diff --git a/lib/edi/EDIConfig.h b/lib/edioutput/EDIConfig.h index 4f1df97..647d77e 100644 --- a/lib/edi/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -44,6 +44,7 @@ struct destination_t {  // Can represent both unicast and multicast destinations  struct udp_destination_t : public destination_t {      std::string dest_addr; +    unsigned int dest_port = 0;      std::string source_addr;      unsigned int source_port = 0;      unsigned int ttl = 10; @@ -68,10 +69,8 @@ struct configuration_t {      bool dump          = false;      // dump a file with the EDI packets      bool verbose       = false;      bool enable_pft    = false;      // Enable protection and fragmentation -    bool enable_transport_header = true; // Sets Addr, Source and Dest in PFT      unsigned int tagpacket_alignment = 0;      std::vector<std::shared_ptr<destination_t> > destinations; -    unsigned int dest_port = 0;      // common destination port, because it's encoded in the transport layer      unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms      bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edi/Interleaver.cpp b/lib/edioutput/Interleaver.cpp index f26a50e..f26a50e 100644 --- a/lib/edi/Interleaver.cpp +++ b/lib/edioutput/Interleaver.cpp diff --git a/lib/edi/Interleaver.h b/lib/edioutput/Interleaver.h index 3029d5d..3029d5d 100644 --- a/lib/edi/Interleaver.h +++ b/lib/edioutput/Interleaver.h diff --git a/lib/edi/PFT.cpp b/lib/edioutput/PFT.cpp index 1e3d4da..b2f07e0 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -55,7 +55,6 @@ PFT::PFT() { }  PFT::PFT(const configuration_t &conf) :      m_k(conf.chunk_len),      m_m(conf.fec), -    m_dest_port(conf.dest_port),      m_pseq(0),      m_num_chunks(0),      m_verbose(conf.verbose) diff --git a/lib/edi/PFT.h b/lib/edioutput/PFT.h index 1019915..4d138c5 100644 --- a/lib/edi/PFT.h +++ b/lib/edioutput/PFT.h @@ -68,13 +68,14 @@ class PFT      private:          unsigned int m_k = 207; // length of RS data word          unsigned int m_m = 3; // number of fragments that can be recovered if lost -        unsigned int m_dest_port = 12000; // Destination port for transport header          uint16_t m_pseq = 0;          size_t m_num_chunks = 0;          bool m_verbose = false; -        bool m_transport_header = false; +        // Transport header is always deactivated +        const bool m_transport_header = false;          const uint16_t m_addr_source = 0; +        const unsigned int m_dest_port = 0;  };  } diff --git a/lib/edi/TagItems.cpp b/lib/edioutput/TagItems.cpp index 9746469..739adfa 100644 --- a/lib/edi/TagItems.cpp +++ b/lib/edioutput/TagItems.cpp @@ -212,8 +212,8 @@ std::vector<uint8_t> TagDSTI::Assemble()      packet.push_back(0);      packet.push_back(0); -    uint8_t dfctl = dflc % 250; -    uint8_t dfcth = dflc / 250; +    uint8_t dfctl = dlfc % 250; +    uint8_t dfcth = dlfc / 250;      uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15); @@ -254,7 +254,7 @@ std::vector<uint8_t> TagDSTI::Assemble()      packet[6] = (taglength >> 8) & 0xFF;      packet[7] = taglength & 0xFF; -    dflc = (dflc+1) % 5000; +    dlfc = (dlfc+1) % 5000;      /*      std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl; diff --git a/lib/edi/TagItems.h b/lib/edioutput/TagItems.h index 5c81b01..f24dc44 100644 --- a/lib/edi/TagItems.h +++ b/lib/edioutput/TagItems.h @@ -147,7 +147,7 @@ class TagDSTI : public TagItem          bool stihf = false;          bool atstf = false; // presence of atst data          bool rfadf = false; -        uint16_t dflc = 0; // modulo 5000 frame counter +        uint16_t dlfc = 0; // modulo 5000 frame counter          // STI Header (optional)          uint8_t stat = 0; diff --git a/lib/edi/TagPacket.cpp b/lib/edioutput/TagPacket.cpp index ec52ad7..ec52ad7 100644 --- a/lib/edi/TagPacket.cpp +++ b/lib/edioutput/TagPacket.cpp diff --git a/lib/edi/TagPacket.h b/lib/edioutput/TagPacket.h index b53b718..b53b718 100644 --- a/lib/edi/TagPacket.h +++ b/lib/edioutput/TagPacket.h diff --git a/lib/edi/Transport.cpp b/lib/edioutput/Transport.cpp index fa7588a..cfed9ec 100644 --- a/lib/edi/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -38,7 +38,7 @@ void configuration_t::print() const      etiLog.level(info) << " verbose     " << verbose;      for (auto edi_dest : destinations) {          if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; +            etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;              if (not udp_dest->source_addr.empty()) {                  etiLog.level(info) << "  source      " << udp_dest->source_addr;                  etiLog.level(info) << "  ttl         " << udp_dest->ttl; @@ -148,7 +148,7 @@ void Sender::write(const TagPacket& tagpacket)              for (auto& dest : m_conf.destinations) {                  if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {                      Socket::InetAddress addr; -                    addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); +                    addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);                      udp_sockets.at(udp_dest.get())->send(edi_frag, addr);                  } @@ -176,7 +176,7 @@ void Sender::write(const TagPacket& tagpacket)          for (auto& dest : m_conf.destinations) {              if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {                  Socket::InetAddress addr; -                addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); +                addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);                  if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {                      fprintf(stderr, "EDI Output: AF packet larger than 1400," diff --git a/lib/edi/Transport.h b/lib/edioutput/Transport.h index 56ded3b..56ded3b 100644 --- a/lib/edi/Transport.h +++ b/lib/edioutput/Transport.h | 
