diff options
Diffstat (limited to 'lib/edioutput')
| -rw-r--r-- | lib/edioutput/EDIConfig.h | 5 | ||||
| -rw-r--r-- | lib/edioutput/Transport.cpp | 128 | ||||
| -rw-r--r-- | lib/edioutput/Transport.h | 20 | 
3 files changed, 100 insertions, 53 deletions
| diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 647d77e..be6c9c4 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -71,10 +71,11 @@ struct configuration_t {      bool enable_pft    = false;      // Enable protection and fragmentation      unsigned int tagpacket_alignment = 0;      std::vector<std::shared_ptr<destination_t> > destinations; -    unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms +    double fragment_spreading_factor = 0.95; +    // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) +    // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.      bool enabled() const { return destinations.size() > 0; } -    bool interleaver_enabled() const { return latency_frames > 0; }      void print() const;  }; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index f8e5dc7..136c71c 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -27,6 +27,7 @@  #include "Transport.h"  #include <iterator>  #include <cmath> +#include <thread>  using namespace std; @@ -57,9 +58,6 @@ void configuration_t::print() const              throw logic_error("EDI destination not implemented");          }      } -    if (interleaver_enabled()) { -        etiLog.level(info) << " interleave     " << latency_frames * 24 << " ms"; -    }  } @@ -96,19 +94,33 @@ Sender::Sender(const configuration_t& conf) :          }      } -    if (m_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(m_conf.latency_frames); -    } -      if (m_conf.dump) {          edi_debug_file.open("./edi.debug");      } +    if (m_conf.enable_pft) { +        unique_lock<mutex> lock(m_mutex); +        m_running = true; +        m_thread = thread(&Sender::run, this); +    } +      if (m_conf.verbose) {          etiLog.log(info, "EDI output set up");      }  } +Sender::~Sender() +{ +    { +        unique_lock<mutex> lock(m_mutex); +        m_running = false; +    } + +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} +  void Sender::write(const TagPacket& tagpacket)  {      // Assemble into one AF Packet @@ -119,57 +131,34 @@ void Sender::write(const TagPacket& tagpacket)          vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);          if (m_conf.verbose) { -            fprintf(stderr, "EDI Output: Number of PFT fragment before interleaver %zu\n", -                    edi_fragments.size()); -        } - -        if (m_conf.interleaver_enabled()) { -            edi_fragments = edi_interleaver.Interleave(edi_fragments); -        } - -        if (m_conf.verbose) {              fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n",                      edi_fragments.size());          }          /* Spread out the transmission of all fragments over 25% of the 24ms AF packet duration -         * to reduce the risk of losing a burst of fragments because of congestion. -         * -         * 25% was chosen so that other outputs still have time to do their thing. */ -        auto inter_fragment_wait_time = std::chrono::microseconds(0); +         * to reduce the risk of losing a burst of fragments because of congestion. */ +        using namespace std::chrono; +        auto inter_fragment_wait_time = microseconds(0);          if (edi_fragments.size() > 1) { -            inter_fragment_wait_time = std::chrono::microseconds(llrint(0.25 * 24000.0 / edi_fragments.size())); +            inter_fragment_wait_time = microseconds( +                    llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) +                    );          } -        // Send over ethernet -        for (auto& edi_frag : edi_fragments) { -            if (m_conf.dump) { -                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +        /* Separate insertion into map and transmission so as to make spreading possible */ +        const auto now = steady_clock::now(); +        { +            auto tp = now; +            unique_lock<mutex> lock(m_mutex); +            for (auto& edi_frag : edi_fragments) { +                m_pending_frames[tp] = move(edi_frag); +                tp += inter_fragment_wait_time;              } - -            for (auto& dest : m_conf.destinations) { -                if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                    Socket::InetAddress addr; -                    addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - -                    udp_sockets.at(udp_dest.get())->send(edi_frag, addr); -                } -                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { -                    tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); -                } -                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { -                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag); -                } -                else { -                    throw logic_error("EDI destination not implemented"); -                } -            } - -            std::this_thread::sleep_for(inter_fragment_wait_time);          } + +        // Transmission done in run() function      } -    else { +    else /* PFT disabled */ {          // Send over ethernet          if (m_conf.dump) {              ostream_iterator<uint8_t> debug_iterator(edi_debug_file); @@ -202,4 +191,49 @@ void Sender::write(const TagPacket& tagpacket)      }  } +void Sender::run() +{ +    while (m_running) { +        unique_lock<mutex> lock(m_mutex); +        const auto now = chrono::steady_clock::now(); + +        // Send over ethernet +        for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { +            const auto& edi_frag = it->second; + +            if (it->first <= now) { +                if (m_conf.dump) { +                    ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                    copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +                } + +                for (auto& dest : m_conf.destinations) { +                    if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { +                        Socket::InetAddress addr; +                        addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); + +                        udp_sockets.at(udp_dest.get())->send(edi_frag, addr); +                    } +                    else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { +                        tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); +                    } +                    else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                        tcp_senders.at(tcp_dest.get())->sendall(edi_frag); +                    } +                    else { +                        throw logic_error("EDI destination not implemented"); +                    } +                } +                it = m_pending_frames.erase(it); +            } +            else { +                ++it; +            } +        } + +        lock.unlock(); +        this_thread::sleep_for(chrono::microseconds(500)); +    } +} +  } diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 56ded3b..3bcc2f4 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -31,13 +31,16 @@  #include "EDIConfig.h"  #include "AFPacket.h"  #include "PFT.h" -#include "Interleaver.h"  #include "Socket.h"  #include <vector> +#include <chrono> +#include <map>  #include <unordered_map>  #include <stdexcept>  #include <fstream>  #include <cstdint> +#include <thread> +#include <mutex>  namespace edi { @@ -46,10 +49,15 @@ namespace edi {  class Sender {      public:          Sender(const configuration_t& conf); +        Sender(const Sender&) = delete; +        Sender operator=(const Sender&) = delete; +        ~Sender();          void write(const TagPacket& tagpacket);      private: +        void run(); +          bool m_udp_fragmentation_warning_printed = false;          configuration_t m_conf; @@ -61,12 +69,16 @@ class Sender {          // The AF Packet will be protected with reed-solomon and split in fragments          edi::PFT edi_pft; -        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order -        edi::Interleaver edi_interleaver; -          std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;          std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;          std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; + +        // PFT spreading requires sending UDP packets at specific time, independently of +        // time when write() gets called +        std::thread m_thread; +        std::mutex m_mutex; +        bool m_running = false; +        std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;  };  } | 
