diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-23 20:30:57 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-23 20:30:57 +0200 | 
| commit | 3edd67dc81cd637e06ea22221f3aebfa0111d989 (patch) | |
| tree | 27ff4d2749a6081b2439e6fa8baec8270a5a2d7b | |
| parent | 09e514732788d821189c59ddc58e70355ba1a3cb (diff) | |
| download | ODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.tar.gz ODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.tar.bz2 ODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.zip | |
Add output code from ODR-AudioEnc with EDI output
| -rw-r--r-- | Makefile.am | 3 | ||||
| -rw-r--r-- | src/Outputs.cpp | 258 | ||||
| -rw-r--r-- | src/Outputs.h | 161 | ||||
| -rw-r--r-- | src/common.h | 32 | ||||
| -rw-r--r-- | src/odr-sourcecompanion.cpp | 132 | 
5 files changed, 529 insertions, 57 deletions
| diff --git a/Makefile.am b/Makefile.am index 4884dab..6f925f6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,11 +9,12 @@ endif  odr_sourcecompanion_LDADD       = -lzmq  odr_sourcecompanion_CFLAGS      = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -odr_sourcecompanion_CXXFLAGS    = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec -Ilib +odr_sourcecompanion_CXXFLAGS    = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc -Ilib  odr_sourcecompanion_SOURCES     = src/odr-sourcecompanion.cpp \  								  src/AACDecoder.h src/AACDecoder.cpp \  								  src/AVTInput.h src/AVTInput.cpp \  								  src/OrderedQueue.h src/OrderedQueue.cpp \ +								  src/Outputs.h src/Outputs.cpp \  								  src/StatsPublish.h src/StatsPublish.cpp \  								  src/encryption.h src/encryption.c \  								  src/utils.h src/utils.c \ diff --git a/src/Outputs.cpp b/src/Outputs.cpp new file mode 100644 index 0000000..d0d3ca4 --- /dev/null +++ b/src/Outputs.cpp @@ -0,0 +1,258 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include "Outputs.h" +#include <string> +#include <stdexcept> +#include <cstring> +#include <cerrno> +#include <cassert> + +namespace Output { + +using namespace std; + +void Base::update_audio_levels(int16_t audiolevel_left, int16_t audiolevel_right) +{ +    m_audio_left = audiolevel_left; +    m_audio_right = audiolevel_right; +} + +File::File(const char *filename) +{ +    m_fd = fopen(filename, "wb"); +    if (m_fd == nullptr) { +        throw runtime_error(string("Error opening output file: ") + strerror(errno)); +    } +} + +File::File(FILE *fd) : m_fd(fd) { } + +File::~File() { +    if (m_fd) { +        fclose(m_fd); +        m_fd = nullptr; +    } +} + +bool File::write_frame(const uint8_t *buf, size_t len) +{ +    if (m_fd == nullptr) { +        throw logic_error("Invalid usage of closed File output"); +    } + +    return fwrite(buf, len, 1, m_fd) == 1; +} + +ZMQ::ZMQ() : +    m_ctx(), +    m_sock(m_ctx, ZMQ_PUB) +{ +    // Do not wait at teardown to send all data out +    int linger = 0; +    m_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +} + +ZMQ::~ZMQ() {} + +void ZMQ::connect(const char *uri, const char *keyfile) +{ +    if (keyfile) { +        fprintf(stderr, "Enabling encryption\n"); + +        int rc = readkey(keyfile, m_secretkey); +        if (rc) { +            throw runtime_error("Error reading secret key"); +        } + +        const int yes = 1; +        m_sock.setsockopt(ZMQ_CURVE_SERVER, +                &yes, sizeof(yes)); + +        m_sock.setsockopt(ZMQ_CURVE_SECRETKEY, +                m_secretkey, CURVE_KEYLEN); +    } +    m_sock.connect(uri); +} + +void ZMQ::set_encoder_type(encoder_selection_t& enc, int bitrate) +{ +    m_encoder = enc; +    m_bitrate = bitrate; +} + +bool ZMQ::write_frame(const uint8_t *buf, size_t len) +{ +    if (m_framebuf.size() != ZMQ_HEADER_SIZE + len) { +        m_framebuf.resize(ZMQ_HEADER_SIZE + len); +    } + +    zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)m_framebuf.data(); + +    try { +        switch (m_encoder) { +            case encoder_selection_t::fdk_dabplus: +                zmq_frame_header->encoder = ZMQ_ENCODER_FDK; +                break; +            case encoder_selection_t::toolame_dab: +                zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; +                break; +        } + +        zmq_frame_header->version = 1; +        zmq_frame_header->datasize = len; +        zmq_frame_header->audiolevel_left = m_audio_left; +        zmq_frame_header->audiolevel_right = m_audio_right; + +        assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= m_framebuf.size()); + +        memcpy(ZMQ_FRAME_DATA(zmq_frame_header), buf, len); + +        m_sock.send(m_framebuf.data(), ZMQ_FRAME_SIZE(zmq_frame_header), +                ZMQ_DONTWAIT); +    } +    catch (zmq::error_t& e) { +        fprintf(stderr, "ZeroMQ send error !\n"); +        return false; +    } + +    return true; +} + +EDI::EDI() : +    m_clock_tai({}) +{ } + +EDI::~EDI() { } + +void EDI::add_udp_destination(const std::string& host, unsigned int port) +{ +    auto dest = make_shared<edi::udp_destination_t>(); +    dest->dest_addr = host; +    m_edi_conf.dest_port = port; +    m_edi_conf.destinations.push_back(dest); + +    // We cannot carry AF packets over UDP, because they would be too large. +    m_edi_conf.enable_pft = true; + +    // TODO make FEC configurable +} + +void EDI::add_tcp_destination(const std::string& host, unsigned int port) +{ +    auto dest = make_shared<edi::tcp_client_t>(); +    dest->dest_addr = host; +    if (dest->dest_port != 0 and dest->dest_port != port) { +        throw runtime_error("All EDI UDP outputs must be to the same destination port"); +    } +    dest->dest_port = port; +    m_edi_conf.destinations.push_back(dest); + +    m_edi_conf.dump = true; +} + +bool EDI::enabled() const +{ +    return not m_edi_conf.destinations.empty(); +} + +void EDI::set_tist(bool enable, uint32_t delay_ms) +{ +    m_tist = enable; +    m_delay_ms = delay_ms; +} + +bool EDI::write_frame(const uint8_t *buf, size_t len) +{ +    if (not m_edi_sender) { +        m_edi_sender = make_shared<edi::Sender>(m_edi_conf); +    } + +    if (m_edi_time == 0) { +        using Sec = chrono::seconds; +        const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now()); +        m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000); +        m_send_version_at_time = m_edi_time; + +        /* TODO we still have to see if 24ms granularity is achievable, given that +         * one DAB+ super frame is carried over more than 1 ETI frame. +         */ +        for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) { +            m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 +        } +    } + +    edi::TagStarPTR edi_tagStarPtr("DSTI"); + +    m_edi_tagDSTI.stihf = false; +    m_edi_tagDSTI.atstf = m_tist; + +    m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 +    if (m_timestamp > 0xf9FFff) { +        m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second +        m_edi_time += 1; + +        m_num_seconds_sent++; +    } + +    m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset()); +    m_edi_tagDSTI.tsta = m_timestamp & 0xffffff; + +    m_edi_tagDSTI.rfadf = false; +    // DFCT is handled inside the TagDSTI + +    edi::TagSSm edi_tagPayload; +    // TODO make edi_tagPayload.stid configurable +    edi_tagPayload.istd_data = buf; +    edi_tagPayload.istd_length = len; + +    edi::TagODRAudioLevels edi_tagAudioLevels(m_audio_left, m_audio_right); + +    stringstream ss; +    ss << PACKAGE_NAME << " " << +#if defined(GITVERSION) +        GITVERSION; +#else +    PACKAGE_VERSION; +#endif +    edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent); + + +    // The above Tag Items will be assembled into a TAG Packet +    edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment); + +    // put tags *ptr, DETI and all subchannels into one TagPacket +    edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); +    edi_tagpacket.tag_items.push_back(&m_edi_tagDSTI); +    edi_tagpacket.tag_items.push_back(&edi_tagPayload); +    edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels); + +    // Send version information only every 10 seconds to save bandwidth +    if (m_send_version_at_time < m_edi_time) { +        m_send_version_at_time += 10; +        edi_tagpacket.tag_items.push_back(&edi_tagVersion); +    } + +    m_edi_sender->write(edi_tagpacket); + +    // TODO Handle TCP disconnect +    return true; +} + +} diff --git a/src/Outputs.h b/src/Outputs.h new file mode 100644 index 0000000..0f1f34f --- /dev/null +++ b/src/Outputs.h @@ -0,0 +1,161 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include <vector> +#include <chrono> +#include <deque> +#include <cstdint> +#include <cstddef> +#include <cstdio> +#include "common.h" +#include "zmq.hpp" +#include "ClockTAI.h" +#include "edi/TagItems.h" +#include "edi/TagPacket.h" +#include "edi/AFPacket.h" +#include "edi/Transport.h" +extern "C" { +#include "encryption.h" +} + +namespace Output { + +/*! \file Outputs.h + * + * Declaration of all outputs + */ + +class Base { +    public: +        virtual ~Base() {}; + +        /*! Write a buffer of encoded data to the output */ +        virtual bool write_frame(const uint8_t *buf, size_t len) = 0; + +        /*! Update peak audio level information */ +        virtual void update_audio_levels( +                int16_t audiolevel_left, int16_t audiolevel_right); + +    protected: +        int16_t m_audio_left = 0; +        int16_t m_audio_right = 0; +}; + +class File : public Base { +    public: +        File(const char *filename); +        File(FILE *file); +        File(const File&) = delete; +        File& operator=(const File&) = delete; +        virtual ~File() override; + +        virtual bool write_frame(const uint8_t *buf, size_t len) override; + +    private: +        FILE *m_fd = nullptr; +}; + +/*! This defines the on-wire representation of a ZMQ message header. + * It must be compatible with the definition in ODR-DabMux. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ +    uint16_t version; // we support version=1 now +    uint16_t encoder; // see ZMQ_ENCODER_XYZ + +    /* length of the 'data' field */ +    uint32_t datasize; + +    /* Audio level, peak, linear PCM */ +    int16_t audiolevel_left; +    int16_t audiolevel_right; + +    /* Data follows this header */ +} __attribute__ ((packed)); + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) + + +class ZMQ: public Base { +    public: +        ZMQ(); +        ZMQ(const ZMQ&) = delete; +        ZMQ& operator=(const ZMQ&) = delete; +        virtual ~ZMQ() override; + +        void connect(const char *uri, const char *keyfile); +        void set_encoder_type(encoder_selection_t& enc, int bitrate); + +        virtual bool write_frame(const uint8_t *buf, size_t len) override; + +    private: +        zmq::context_t m_ctx; +        zmq::socket_t m_sock; + +        int m_bitrate = 0; +        char m_secretkey[CURVE_KEYLEN+1]; +        encoder_selection_t m_encoder = encoder_selection_t::fdk_dabplus; +        using vec_u8 = std::vector<uint8_t>; +        vec_u8 m_framebuf; +}; + + +class EDI: public Base { +    public: +        EDI(); +        EDI(const EDI&) = delete; +        EDI& operator=(const EDI&) = delete; +        virtual ~EDI() override; + +        void add_udp_destination(const std::string& host, unsigned int port); +        void add_tcp_destination(const std::string& host, unsigned int port); + +        void set_tist(bool enable, uint32_t delay_ms); + +        bool enabled() const; + +        virtual bool write_frame(const uint8_t *buf, size_t len) override; + +    private: +        edi::configuration_t m_edi_conf; +        std::shared_ptr<edi::Sender> m_edi_sender; + +        uint32_t m_timestamp = 0; +        uint32_t m_num_seconds_sent = 0; +        std::time_t m_edi_time = 0; +        std::time_t m_send_version_at_time = 0; + +        edi::TagDSTI m_edi_tagDSTI; + +        ClockTAI m_clock_tai; +        bool m_tist = false; +        uint32_t m_delay_ms = 0; +}; + +} diff --git a/src/common.h b/src/common.h new file mode 100644 index 0000000..774a4a0 --- /dev/null +++ b/src/common.h @@ -0,0 +1,32 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2016 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once + +// 16 bits per sample is fine for now +#define BYTES_PER_SAMPLE 2 + +// How many samples we insert into the queue each call +#define NUM_SAMPLES_PER_CALL 10 // 10 samples @ 32kHz = 3.125ms + +//! Enumeration of encoders we can use +enum class encoder_selection_t { +    fdk_dabplus, +    toolame_dab +}; + diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp index a69f705..8b5b701 100644 --- a/src/odr-sourcecompanion.cpp +++ b/src/odr-sourcecompanion.cpp @@ -27,6 +27,7 @@  #include "zmq.hpp"  #include "AVTInput.h" +#include "Outputs.h"  #include "AACDecoder.h"  #include "StatsPublish.h"  #include <sys/time.h> @@ -89,10 +90,10 @@ void usage(const char* name) {      "         --ps                             Force the usage of PS\n"      "   Output and pad parameters:\n"      "     -o, --output=URI                     Output ZMQ uri. (e.g. 'tcp://localhost:9000')\n" -    "                                     -or- Output file uri. (e.g. 'file.dabp')\n" -    "                                     -or- a single dash '-' to denote stdout\n"      "                                          If more than one ZMQ output is given, the socket\n"      "                                          will be connected to all listed endpoints.\n" +    "     -e, --edi=URI                        EDI output uri, (e.g. 'tcp://localhost:7000')\n" +    "     -T, --timestamp-delay=DELAY_MS       Enabled timestamps in EDI (requires TAI clock bulletin download) and\n"      "     -k, --secret-key=FILE                Enable ZMQ encryption with the given secret key.\n"      "     -p, --pad=BYTES                      Set PAD size in bytes.\n"      "     -P, --pad-fifo=FILENAME              Set PAD data input fifo name" @@ -137,12 +138,13 @@ int main(int argc, char *argv[])      string send_stats_to = "";      /* Data for ZMQ CURVE authentication */ -    char* keyfile = nullptr; -    char secretkey[CURVE_KEYLEN+1]; +    char *keyfile = nullptr;      const struct option longopts[] = {          {"bitrate",                required_argument,  0, 'b'},          {"channels",               required_argument,  0, 'c'}, +        {"edi",                    required_argument,  0, 'e'}, +        {"timestamp-delay",        required_argument,  0, 'T'},          {"output",                 required_argument,  0, 'o'},          {"pad",                    required_argument,  0, 'p'},          {"pad-fifo",               required_argument,  0, 'P'}, @@ -183,13 +185,17 @@ int main(int argc, char *argv[])      bool allowSBR = false;      bool allowPS  = false; +    vector<string> edi_output_uris; +    bool tist_enabled = false; +    uint32_t tist_delay_ms = 0; +      int bitrate = 0;      int channels = 2;      int sample_rate = 48000;      char ch = 0;      int index;      while(ch != -1) { -        ch = getopt_long(argc, argv, "hlb:c:k:o:r:p:P:I:", longopts, &index); +        ch = getopt_long(argc, argv, "hlb:c:e:T:k:o:r:p:P:I:", longopts, &index);          switch (ch) {          case 0: // AAC-LC              allowPS = false; @@ -204,10 +210,17 @@ int main(int argc, char *argv[])              allowSBR = true;              break;          case 'b': -            bitrate = atoi(optarg); +            bitrate = stoi(optarg);              break;          case 'c': -            channels = atoi(optarg); +            channels = stoi(optarg); +            break; +        case 'e': +            edi_output_uris.push_back(optarg); +            break; +        case 'T': +            tist_enabled = true; +            tist_delay_ms = std::stoi(optarg);              break;          case 'k':              keyfile = optarg; @@ -219,13 +232,13 @@ int main(int argc, char *argv[])              output_uris.push_back(optarg);              break;          case 'p': -            padlen = atoi(optarg); +            padlen = stoi(optarg);              break;          case 'P':              pad_fifo = optarg;              break;          case 'r': -            sample_rate = atoi(optarg); +            sample_rate = stoi(optarg);              break;          case 'S':              send_stats_to = optarg; @@ -238,16 +251,16 @@ int main(int argc, char *argv[])              avt_output_uri = optarg;              break;          case 7: -            avt_timeout = atoi(optarg); +            avt_timeout = stoi(optarg);              if (avt_timeout < 0) {                  avt_timeout = 2000;              }              break;          case 8: -            avt_pad_port = atoi(optarg); +            avt_pad_port = stoi(optarg);              break;          case 9: -            avt_jitterBufferSize = atoi(optarg); +            avt_jitterBufferSize = stoi(optarg);              break;          case '?':          case 'h': @@ -266,33 +279,52 @@ int main(int argc, char *argv[])          return 1;      } -    zmq::context_t zmq_ctx; -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); +    shared_ptr<Output::ZMQ> zmq_output; +    Output::EDI edi_output; -    if (not output_uris.empty()) { -        for (auto uri : output_uris) { -            if (keyfile) { -                fprintf(stderr, "Enabling encryption\n"); +    if (output_uris.empty() and edi_output_uris.empty()) { +        fprintf(stderr, "No output URIs defined\n"); +        return 1; +    } -                int rc = readkey(keyfile, secretkey); -                if (rc) { -                    fprintf(stderr, "Error reading secret key\n"); -                    return 2; -                } +    for (const auto& uri : output_uris) { +        if (not zmq_output) { +            zmq_output = make_shared<Output::ZMQ>(); +        } -                const int yes = 1; -                zmq_sock.setsockopt(ZMQ_CURVE_SERVER, -                        &yes, sizeof(yes)); +        zmq_output->connect(uri.c_str(), keyfile); +    } -                zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, -                        secretkey, CURVE_KEYLEN); +    for (const auto& uri : edi_output_uris) { +        if (uri.compare(0, 6, "tcp://") == 0 or +            uri.compare(0, 6, "udp://") == 0) { +            auto host_port_sep_ix = uri.find(':', 6); +            if (host_port_sep_ix != string::npos) { +                auto host = uri.substr(6, host_port_sep_ix - 6); +                auto port = std::stoi(uri.substr(host_port_sep_ix + 1)); + +                auto proto = uri.substr(0, 3); +                if (proto == "tcp") { +                    edi_output.add_tcp_destination(host, port); +                } +                else if (proto == "udp") { +                    edi_output.add_udp_destination(host, port); +                } +                else { +                    throw logic_error("unhandled proto"); +                }              } -            zmq_sock.connect(uri.c_str()); +            else { +                fprintf(stderr, "Invalid EDI URL host!\n"); +            } +        } +        else { +            fprintf(stderr, "Invalid EDI protocol!\n");          }      } -    else { -        fprintf(stderr, "No output URI defined\n"); -        return 1; + +    if (not edi_output_uris.empty()) { +        edi_output.set_tist(tist_enabled, tist_delay_ms);      }      if (padlen != 0) { @@ -350,19 +382,15 @@ int main(int argc, char *argv[])      }      int outbuf_size; -    std::vector<uint8_t> zmqframebuf;      std::vector<uint8_t> outbuf;      outbuf_size = bitrate/8*120;      outbuf.resize(24*120); -    zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120);      if (outbuf_size % 5 != 0) {          fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5);      } -    zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0]; -      unsigned char pad_buf[padlen + 1];      fprintf(stderr, "Starting encoding\n"); @@ -449,28 +477,22 @@ int main(int argc, char *argv[])          read_bytes = numOutBytes;          if (numOutBytes != 0) { -            // ------------ ZeroMQ transmit -            try { -                zmq_frame_header->encoder = ZMQ_ENCODER_FDK; -                zmq_frame_header->version = 1; -                zmq_frame_header->datasize = numOutBytes; -                zmq_frame_header->audiolevel_left = peak_left; -                zmq_frame_header->audiolevel_right = peak_right; - -                assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size()); - -                memcpy(ZMQ_FRAME_DATA(zmq_frame_header), -                        &outbuf[0], numOutBytes); -                zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), -                        ZMQ_DONTWAIT); +            bool success = false; +            if (zmq_output) { +                zmq_output->update_audio_levels(peak_left, peak_right); +                success = zmq_output->write_frame(outbuf.data(), outbuf.size());              } -            catch (zmq::error_t& e) { -                fprintf(stderr, "ZeroMQ send error !\n"); -                send_error_count ++; +            else if (edi_output.enabled()) { +                edi_output.update_audio_levels(peak_left, peak_right); +                success = edi_output.write_frame(outbuf.data(), outbuf.size()); +            } + +            if (not success) { +                send_error_count++;              }              if (send_error_count > 10) { -                fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n"); +                fprintf(stderr, "Send failed ten times, aborting!\n");                  retval = 4;                  break;              } @@ -500,7 +522,5 @@ int main(int argc, char *argv[])      fprintf(stderr, "\n"); -    zmq_sock.close(); -      return retval;  } | 
