From cdeded18deb9ef2a12bf5206757f235e3925c848 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 24 Feb 2021 10:23:55 +0100 Subject: Common fc2902b and 4ad00b8: Update EDI output interleaver and spreading --- lib/edioutput/Transport.cpp | 128 ++++++++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 47 deletions(-) (limited to 'lib/edioutput/Transport.cpp') diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index f8e5dc7..136c71c 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -27,6 +27,7 @@ #include "Transport.h" #include #include +#include using namespace std; @@ -57,9 +58,6 @@ void configuration_t::print() const throw logic_error("EDI destination not implemented"); } } - if (interleaver_enabled()) { - etiLog.level(info) << " interleave " << latency_frames * 24 << " ms"; - } } @@ -96,19 +94,33 @@ Sender::Sender(const configuration_t& conf) : } } - if (m_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(m_conf.latency_frames); - } - if (m_conf.dump) { edi_debug_file.open("./edi.debug"); } + if (m_conf.enable_pft) { + unique_lock lock(m_mutex); + m_running = true; + m_thread = thread(&Sender::run, this); + } + if (m_conf.verbose) { etiLog.log(info, "EDI output set up"); } } +Sender::~Sender() +{ + { + unique_lock lock(m_mutex); + m_running = false; + } + + if (m_thread.joinable()) { + m_thread.join(); + } +} + void Sender::write(const TagPacket& tagpacket) { // Assemble into one AF Packet @@ -118,58 +130,35 @@ void Sender::write(const TagPacket& tagpacket) // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose) { - fprintf(stderr, "EDI Output: Number of PFT fragment before interleaver %zu\n", - edi_fragments.size()); - } - - if (m_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - if (m_conf.verbose) { fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); } /* Spread out the transmission of all fragments over 25% of the 24ms AF packet duration - * to reduce the risk of losing a burst of fragments because of congestion. - * - * 25% was chosen so that other outputs still have time to do their thing. */ - auto inter_fragment_wait_time = std::chrono::microseconds(0); + * to reduce the risk of losing a burst of fragments because of congestion. */ + using namespace std::chrono; + auto inter_fragment_wait_time = microseconds(0); if (edi_fragments.size() > 1) { - inter_fragment_wait_time = std::chrono::microseconds(llrint(0.25 * 24000.0 / edi_fragments.size())); + inter_fragment_wait_time = microseconds( + llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) + ); } - // Send over ethernet - for (auto& edi_frag : edi_fragments) { - if (m_conf.dump) { - ostream_iterator debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + /* Separate insertion into map and transmission so as to make spreading possible */ + const auto now = steady_clock::now(); + { + auto tp = now; + unique_lock lock(m_mutex); + for (auto& edi_frag : edi_fragments) { + m_pending_frames[tp] = move(edi_frag); + tp += inter_fragment_wait_time; } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - - std::this_thread::sleep_for(inter_fragment_wait_time); } + + // Transmission done in run() function } - else { + else /* PFT disabled */ { // Send over ethernet if (m_conf.dump) { ostream_iterator debug_iterator(edi_debug_file); @@ -202,4 +191,49 @@ void Sender::write(const TagPacket& tagpacket) } } +void Sender::run() +{ + while (m_running) { + unique_lock lock(m_mutex); + const auto now = chrono::steady_clock::now(); + + // Send over ethernet + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; + + if (it->first <= now) { + if (m_conf.dump) { + ostream_iterator debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + } + + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast(dest)) { + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); + + udp_sockets.at(udp_dest.get())->send(edi_frag, addr); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag); + } + else { + throw logic_error("EDI destination not implemented"); + } + } + it = m_pending_frames.erase(it); + } + else { + ++it; + } + } + + lock.unlock(); + this_thread::sleep_for(chrono::microseconds(500)); + } +} + } -- cgit v1.2.3