diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-23 23:05:14 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-23 23:05:14 +0100 | 
| commit | ef6ea5ab6b927fcf9e5152fbf44f72646848d2c9 (patch) | |
| tree | bc438ec3094b00eeee973766074e55e56f47cfd2 | |
| parent | d57e0e9635a18f226394b9f41feef1658a2e051c (diff) | |
| download | dabmux-ef6ea5ab6b927fcf9e5152fbf44f72646848d2c9.tar.gz dabmux-ef6ea5ab6b927fcf9e5152fbf44f72646848d2c9.tar.bz2 dabmux-ef6ea5ab6b927fcf9e5152fbf44f72646848d2c9.zip  | |
Common b23da85: make PFT per-output configurable
| -rw-r--r-- | doc/advanced.mux | 9 | ||||
| -rw-r--r-- | lib/edioutput/EDIConfig.h | 31 | ||||
| -rw-r--r-- | lib/edioutput/PFT.cpp | 12 | ||||
| -rw-r--r-- | lib/edioutput/PFT.h | 15 | ||||
| -rw-r--r-- | lib/edioutput/Transport.cpp | 288 | ||||
| -rw-r--r-- | lib/edioutput/Transport.h | 93 | ||||
| -rw-r--r-- | src/DabMux.cpp | 52 | 
7 files changed, 296 insertions, 204 deletions
diff --git a/doc/advanced.mux b/doc/advanced.mux index c07a2b2..0fc1b53 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -438,6 +438,10 @@ outputs {                  destination "192.168.23.23"                  port        12000 +                enable_pft  true +                fec         1 +                verbose     true +                  ; For compatibility: if port is not specified in the destination itself,                  ; it is taken from the parent 'destinations' block.              } @@ -452,6 +456,8 @@ outputs {                  ; The multicast TTL has to be adapted according to your network                  ttl 1 +                enable_pft  true +                fec         1              }              example_tcp {                  ; example for EDI TCP server. TCP is reliable, so it is counterproductive to @@ -469,7 +475,8 @@ outputs {              }          } -        ; The settings below apply to all destinations +        ; The settings below apply to all destinations, unless they are overridden +        ; inside a destination          ; Enable the PFT subsystem. If false, AFPackets are sent.          ; PFT is not necessary when using TCP. diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 1997210..7016e87 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -1,5 +1,5 @@  /* -   Copyright (C) 2019 +   Copyright (C) 2025     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -36,17 +36,31 @@ namespace edi {  /** Configuration for EDI output */ +struct pft_settings_t { +    // protection and fragmentation settings +    bool verbose       = false; +    bool enable_pft    = false; +    unsigned chunk_len = 207;        // RSk, data length of each chunk +    unsigned fec       = 0;          // number of fragments that can be recovered +    double fragment_spreading_factor = 0.95; +    // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) +    // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +}; +  struct destination_t {      virtual ~destination_t() {}; + +    pft_settings_t pft_settings = {};  }; +  // Can represent both unicast and multicast destinations  struct udp_destination_t : public destination_t {      std::string dest_addr; -    unsigned int dest_port = 0; +    uint16_t dest_port = 0;      std::string source_addr; -    unsigned int source_port = 0; -    unsigned int ttl = 10; +    uint16_t source_port = 0; +    uint8_t ttl = 10;  };  // TCP server that can accept multiple connections @@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t {  };  struct configuration_t { -    unsigned chunk_len = 207;        // RSk, data length of each chunk -    unsigned fec       = 0;          // number of fragments that can be recovered -    bool dump          = false;      // dump a file with the EDI packets -    bool verbose       = false; -    bool enable_pft    = false;      // Enable protection and fragmentation +    bool verbose = false;      unsigned int tagpacket_alignment = 0;      std::vector<std::shared_ptr<destination_t> > destinations; -    double fragment_spreading_factor = 0.95; -    // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) -    // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.      bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 7e0e8e9..f65fd67 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -1,5 +1,5 @@  /* -   Copyright (C) 2021 +   Copyright (C) 2025     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -31,7 +31,6 @@   */  #include <vector> -#include <list>  #include <cstdio>  #include <cstring>  #include <cstdint> @@ -41,6 +40,7 @@  #include "PFT.h"  #include "crc.h"  #include "ReedSolomon.h" +#include "Log.h"  namespace edi { @@ -51,11 +51,10 @@ using namespace std;  PFT::PFT() { } -PFT::PFT(const configuration_t &conf) : +PFT::PFT(const pft_settings_t& conf) : +    m_enabled(conf.enable_pft),      m_k(conf.chunk_len),      m_m(conf.fec), -    m_pseq(0), -    m_num_chunks(0),      m_verbose(conf.verbose)      {          if (m_k > 207) { @@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq)      m_pseq = pseq;  } -} - +} // namespace edi diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 42569a0..52e9f46 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -1,5 +1,5 @@  /* -   Copyright (C) 2021 +   Copyright (C) 2025     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -33,12 +33,8 @@  #pragma once  #include <vector> -#include <list> -#include <stdexcept>  #include <cstdint>  #include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h"  #include "EDIConfig.h"  namespace edi { @@ -52,21 +48,24 @@ class PFT          static constexpr int PARITYBYTES = 48;          PFT(); -        PFT(const configuration_t& conf); +        PFT(const pft_settings_t& conf); + +        bool is_enabled() const { return m_enabled and m_k > 0; }          // return a list of PFT fragments with the correct          // PFT headers -        std::vector< PFTFragment > Assemble(AFPacket af_packet); +        std::vector<PFTFragment> Assemble(AFPacket af_packet);          // Apply Reed-Solomon FEC to the AF Packet          RSBlock Protect(AFPacket af_packet);          // Cut a RSBlock into several fragments that can be transmitted -        std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); +        std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet);          void OverridePSeq(uint16_t pseq);      private: +        bool m_enabled = false;          unsigned int m_k = 207; // length of RS data word          unsigned int m_m = 3; // number of fragments that can be recovered if lost          uint16_t m_pseq = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index a5e0bc3..e9559b5 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -25,7 +25,7 @@     along with this program.  If not, see <http://www.gnu.org/licenses/>.   */  #include "Transport.h" -#include <iterator> +#include "Log.h"  #include <cmath>  #include <thread> @@ -57,13 +57,18 @@ void configuration_t::print() const          else {              throw logic_error("EDI destination not implemented");          } +        etiLog.level(info) << "  PFT=" << edi_dest->pft_settings.enable_pft; +        if (edi_dest->pft_settings.enable_pft) { +            etiLog.level(info) << "  FEC=" << edi_dest->pft_settings.fec; +            etiLog.level(info) << "  Chunk Len=" << edi_dest->pft_settings.chunk_len; +            etiLog.level(info) << "  Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor; +        }      }  }  Sender::Sender(const configuration_t& conf) : -    m_conf(conf), -    edi_pft(m_conf) +    m_conf(conf)  {      if (m_conf.verbose) {          etiLog.level(info) << "Setup EDI Output"; @@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) :      for (const auto& edi_dest : m_conf.destinations) {          if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); +            Socket::UDPSocket udp_socket(udp_dest->source_port);              if (not udp_dest->source_addr.empty()) { -                udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); -                udp_socket->setMulticastTTL(udp_dest->ttl); +                udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); +                udp_socket.setMulticastTTL(udp_dest->ttl);              } -            udp_sockets.emplace(udp_dest.get(), udp_socket); +            auto sender = make_shared<udp_sender_t>( +                    udp_dest->dest_addr, +                    udp_dest->dest_port, +                    std::move(udp_socket)); +            m_pft_spreaders.emplace_back( +                make_shared<PFTSpreader>(udp_dest->pft_settings, sender));          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { -            auto dispatcher = make_shared<Socket::TCPDataDispatcher>( -                    tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers); - -            dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); -            tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); +            auto sender = make_shared<tcp_dispatcher_t>( +                    tcp_dest->listen_port, +                    tcp_dest->max_frames_queued, +                    tcp_dest->tcp_server_preroll_buffers); +            m_pft_spreaders.emplace_back( +                    make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { -            auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); -            tcp_senders.emplace(tcp_dest.get(), tcp_send_client); +            auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); +            m_pft_spreaders.emplace_back( +                    make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));          }          else {              throw logic_error("EDI destination not implemented");          }      } -    if (m_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (m_conf.enable_pft) { -        unique_lock<mutex> lock(m_mutex); +    {          m_running = true;          m_thread = thread(&Sender::run, this);      } @@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) :      }  } +void Sender::write(const TagPacket& tagpacket) +{ +    // Assemble into one AF Packet +    edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket); + +    write(af_packet); +} + +void Sender::write(const AFPacket& af_packet) +{ +    for (auto& sender : m_pft_spreaders) { +        sender->send_af_packet(af_packet); +    } +} + +void Sender::override_af_sequence(uint16_t seq) +{ +    edi_af_packetiser.OverrideSeq(seq); +} + +void Sender::override_pft_sequence(uint16_t pseq) +{ +    for (auto& spreader : m_pft_spreaders) { +        spreader->edi_pft.OverridePSeq(pseq); +    } +} + +std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +{ +    std::vector<Sender::stats_t> stats; + +    for (auto& spreader : m_pft_spreaders) { +        if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) { +            Sender::stats_t s; +            s.listen_port = sender->listen_port; +            s.stats = sender->sock.get_stats(); +            stats.push_back(s); +        } +    } + +    return stats; +} +  Sender::~Sender()  {      { -        unique_lock<mutex> lock(m_mutex);          m_running = false;      } @@ -123,36 +172,89 @@ Sender::~Sender()      }  } -void Sender::write(const TagPacket& tagpacket) +void Sender::run()  { -    // Assemble into one AF Packet -    edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); +    while (m_running) { +        const auto now = chrono::steady_clock::now(); +        for (auto& spreader : m_pft_spreaders) { +            spreader->tick(now); +        } -    write(af_packet); +        this_thread::sleep_for(chrono::microseconds(500)); +    }  } -void Sender::write(const AFPacket& af_packet) + +void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame) +{ +    Socket::InetAddress addr; +    addr.resolveUdpDestination(dest_addr, dest_port); +    sock.send(frame, addr); +} + +void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame) +{ +    sock.write(frame); +} + +void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame) +{ +    sock.sendall(frame); +} + +Sender::udp_sender_t::udp_sender_t(std::string dest_addr, +                                   uint16_t dest_port, +                                   Socket::UDPSocket&& sock) : +    dest_addr(dest_addr), +    dest_port(dest_port), +    sock(std::move(sock)) +{ +} + +Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port, +                                           size_t max_frames_queued, +                                           size_t tcp_server_preroll_buffers) : +    listen_port(listen_port), +    sock(max_frames_queued, tcp_server_preroll_buffers)  { -    if (m_conf.enable_pft) { +    sock.start(listen_port, "0.0.0.0"); +} + +Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, +                                             uint16_t dest_port) : +    sock(dest_addr, dest_port) +{ +} + +Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) : +    sender(sender), +    edi_pft(conf) +{ +} + +void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet) +{ +    using namespace std::chrono; +    if (edi_pft.is_enabled()) {          // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)          vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); -        if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { +        if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {              etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",                      edi_fragments.size()); -            m_last_num_pft_fragments = edi_fragments.size(); +            last_num_pft_fragments = edi_fragments.size();          }          /* Spread out the transmission of all fragments over part of the 24ms AF packet duration           * to reduce the risk of losing a burst of fragments because of congestion. */ -        using namespace std::chrono;          auto inter_fragment_wait_time = microseconds(1);          if (edi_fragments.size() > 1) { -            if (m_conf.fragment_spreading_factor > 0) { +            if (settings.fragment_spreading_factor > 0) {                  inter_fragment_wait_time = -                    microseconds( -                            llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) -                            ); +                    microseconds(llrint( +                                settings.fragment_spreading_factor * 24000.0 / +                                edi_fragments.size() +                            ));              }          } @@ -162,121 +264,35 @@ void Sender::write(const AFPacket& af_packet)              auto tp = now;              unique_lock<mutex> lock(m_mutex);              for (auto& edi_frag : edi_fragments) { -                m_pending_frames[tp] = move(edi_frag); +                m_pending_frames[tp] = std::move(edi_frag);                  tp += inter_fragment_wait_time;              }          } - -        // Transmission done in run() function      }      else /* PFT disabled */ { -        // Send over ethernet -        if (m_conf.dump) { -            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -            copy(af_packet.begin(), af_packet.end(), debug_iterator); -        } - -        for (auto& dest : m_conf.destinations) { -            if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                Socket::InetAddress addr; -                addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - -                if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { -                    fprintf(stderr, "EDI Output: AF packet larger than 1400," -                            " consider using PFT to avoid UP fragmentation.\n"); -                    m_udp_fragmentation_warning_printed = true; -                } - -                udp_sockets.at(udp_dest.get())->send(af_packet, addr); -            } -            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { -                tcp_dispatchers.at(tcp_dest.get())->write(af_packet); -            } -            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { -                const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet); - -                if (m_conf.verbose and error_stats.has_seen_new_errors) { -                    fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n", -                            tcp_dest->dest_addr.c_str(), -                            tcp_dest->dest_port, -                            error_stats.num_reconnects, -                            error_stats.last_error.c_str()); -                } -            } -            else { -                throw logic_error("EDI destination not implemented"); -            } -        } +        const auto now = steady_clock::now(); +        unique_lock<mutex> lock(m_mutex); +        m_pending_frames[now] = std::move(af_packet);      } -} -void Sender::override_af_sequence(uint16_t seq) -{ -    edi_afPacketiser.OverrideSeq(seq); -} - -void Sender::override_pft_sequence(uint16_t pseq) -{ -    edi_pft.OverridePSeq(pseq); +    // Actual transmission done in tick() function  } -std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)  { -    std::vector<Sender::stats_t> stats; +    unique_lock<mutex> lock(m_mutex); -    for (auto& el : tcp_dispatchers) { -        Sender::stats_t s; -        s.listen_port = el.first->listen_port; -        s.stats = el.second->get_stats(); -        stats.push_back(s); -    } - -    return stats; -} +    for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { +        const auto& edi_frag = it->second; -void Sender::run() -{ -    while (m_running) { -        unique_lock<mutex> lock(m_mutex); -        const auto now = chrono::steady_clock::now(); - -        // Send over ethernet -        for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { -            const auto& edi_frag = it->second; - -            if (it->first <= now) { -                if (m_conf.dump) { -                    ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                    copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -                } - -                for (auto& dest : m_conf.destinations) { -                    if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                        Socket::InetAddress addr; -                        addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - -                        udp_sockets.at(udp_dest.get())->send(edi_frag, addr); -                    } -                    else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { -                        tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); -                    } -                    else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { -                        tcp_senders.at(tcp_dest.get())->sendall(edi_frag); -                    } -                    else { -                        throw logic_error("EDI destination not implemented"); -                    } -                } -                it = m_pending_frames.erase(it); -            } -            else { -                ++it; -            } +        if (it->first <= now) { +            sender->send_packet(edi_frag); +            it = m_pending_frames.erase(it); +        } +        else { +            ++it;          } - -        lock.unlock(); -        this_thread::sleep_for(chrono::microseconds(500));      }  } -} +} // namespace edi diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 2ca638e..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -33,8 +33,6 @@  #include "Socket.h"  #include <chrono>  #include <map> -#include <unordered_map> -#include <fstream>  #include <cstdint>  #include <thread>  #include <mutex> @@ -43,12 +41,13 @@  namespace edi {  /** ETI/STI sender for EDI output */ -  class Sender {      public:          Sender(const configuration_t& conf);          Sender(const Sender&) = delete; -        Sender operator=(const Sender&) = delete; +        Sender& operator=(const Sender&) = delete; +        Sender(Sender&&) = delete; +        Sender& operator=(Sender&&) = delete;          ~Sender();          // Assemble the tagpacket into an AF packet, and if needed, @@ -72,32 +71,78 @@ class Sender {          std::vector<stats_t> get_tcp_server_stats() const;      private: -        void run(); - -        bool m_udp_fragmentation_warning_printed = false; -          configuration_t m_conf; -        std::ofstream edi_debug_file;          // The TagPacket will then be placed into an AFPacket -        edi::AFPacketiser edi_afPacketiser; +        edi::AFPacketiser edi_af_packetiser; -        // The AF Packet will be protected with reed-solomon and split in fragments -        edi::PFT edi_pft; +        // PFT spreading requires sending UDP packets at specific time, +        // independently of time when write() gets called +        bool m_running = false; +        std::thread m_thread; +        virtual void run(); -        std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; -        std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; -        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; -        // PFT spreading requires sending UDP packets at specific time, independently of -        // time when write() gets called -        std::thread m_thread; -        std::mutex m_mutex; -        bool m_running = false; -        std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; -        size_t m_last_num_pft_fragments = 0; -}; -} +        struct i_sender { +            virtual void send_packet(const std::vector<uint8_t> &frame) = 0; +            virtual ~i_sender() { } +        }; + +        struct udp_sender_t : public i_sender { +            udp_sender_t( +                    std::string dest_addr, +                    uint16_t dest_port, +                    Socket::UDPSocket&& sock); + +            std::string dest_addr; +            uint16_t dest_port; +            Socket::UDPSocket sock; + +            virtual void send_packet(const std::vector<uint8_t> &frame) override; +        }; + +        struct tcp_dispatcher_t : public i_sender { +            tcp_dispatcher_t( +                    uint16_t listen_port, +                    size_t max_frames_queued, +                    size_t tcp_server_preroll_buffers); + +            uint16_t listen_port; +            Socket::TCPDataDispatcher sock; +            virtual void send_packet(const std::vector<uint8_t> &frame) override; +        }; + +        struct tcp_send_client_t : public i_sender { +            tcp_send_client_t( +                    const std::string& dest_addr, +                    uint16_t dest_port); + +            Socket::TCPSendClient sock; +            virtual void send_packet(const std::vector<uint8_t> &frame) override; +        }; + +        class PFTSpreader { +            public: +                using sender_sp = std::shared_ptr<i_sender>; +                PFTSpreader(const pft_settings_t &conf, sender_sp sender); +                sender_sp sender; +                edi::PFT edi_pft; + +                void send_af_packet(const AFPacket &af_packet); +                void tick(const std::chrono::steady_clock::time_point& now); + +            private: +                // send_af_packet() and tick() are called from different threads, both +                // are accessing m_pending_frames +                std::mutex m_mutex; +                std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; +                pft_settings_t settings; +                size_t last_num_pft_fragments = 0; +        }; + +        std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 1a367da..bf525c1 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -327,6 +327,38 @@ int main(int argc, char *argv[])              if (outputuid == "edi") {                  ptree pt_edi = pt_outputs.get_child("edi"); +                bool default_enable_pft = pt_edi.get<bool>("enable_pft", false); +                edi_conf.verbose = pt_edi.get<bool>("verbose", false); + +                unsigned int default_fec = pt_edi.get<unsigned int>("fec", 3); +                unsigned int default_chunk_len = pt_edi.get<unsigned int>("chunk_len", 207); + +                auto check_spreading_factor = [](int percent) { +                    if (percent < 0) { +                        throw std::runtime_error("EDI output: negative packet_spread value is invalid."); +                    } +                    double factor = (double)percent / 100.0; +                    if (factor > 30000) { +                        throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); +                    } +                    return factor; +                }; + +                double default_spreading_factor = check_spreading_factor(pt_edi.get<int>("packet_spread", 95)); + +                using pt_t = boost::property_tree::basic_ptree<std::basic_string<char>, std::basic_string<char>>; +                auto handle_overrides = [&](edi::pft_settings_t& pft_settings, pt_t pt) { +                    pft_settings.chunk_len = pt.get<unsigned int>("chunk_len", default_chunk_len); +                    pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft); +                    pft_settings.fec = pt.get<unsigned int>("fec", default_fec); +                    pft_settings.fragment_spreading_factor = default_spreading_factor; +                    auto override_spread_percent = pt.get_optional<int>("packet_spread"); +                    if (override_spread_percent) { +                        pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent); +                    } +                    pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose); +                }; +                  for (auto pt_edi_dest : pt_edi.get_child("destinations")) {                      const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");                      if (proto == "udp") { @@ -346,6 +378,8 @@ int main(int argc, char *argv[])                              dest->dest_port       = pt_edi.get<unsigned int>("port");                          } +                        handle_overrides(dest->pft_settings, pt_edi_dest.second); +                          edi_conf.destinations.push_back(dest);                      }                      else if (proto == "tcp") { @@ -355,6 +389,8 @@ int main(int argc, char *argv[])                          double preroll = pt_edi_dest.second.get<double>("preroll-burst", 0.0);                          dest->tcp_server_preroll_buffers = ceil(preroll / 24e-3); +                        handle_overrides(dest->pft_settings, pt_edi_dest.second); +                          edi_conf.destinations.push_back(dest);                      }                      else { @@ -362,22 +398,6 @@ int main(int argc, char *argv[])                      }                  } -                edi_conf.dump = pt_edi.get<bool>("dump", false); -                edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false); -                edi_conf.verbose = pt_edi.get<bool>("verbose", false); - -                edi_conf.fec = pt_edi.get<unsigned int>("fec", 3); -                edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207); - -                int spread_percent = pt_edi.get<int>("packet_spread", 95); -                if (spread_percent < 0) { -                    throw std::runtime_error("EDI output: negative packet_spread value is invalid."); -                } -                edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0; -                if (edi_conf.fragment_spreading_factor > 30000) { -                    throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); -                } -                  edi_conf.tagpacket_alignment = pt_edi.get<unsigned int>("tagpacket_alignment", 8);                  mux.set_edi_config(edi_conf);  | 
