diff options
| -rw-r--r-- | src/AVTInput.cpp | 18 | ||||
| -rw-r--r-- | src/AVTInput.h | 6 | ||||
| -rw-r--r-- | src/OrderedQueue.cpp | 19 | ||||
| -rw-r--r-- | src/OrderedQueue.h | 15 | ||||
| -rw-r--r-- | src/Outputs.cpp | 51 | ||||
| -rw-r--r-- | src/Outputs.h | 6 | ||||
| -rw-r--r-- | src/odr-sourcecompanion.cpp | 17 | 
7 files changed, 76 insertions, 56 deletions
| diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 11dd4cc..d39f2ef 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -368,6 +368,7 @@ bool AVTInput::_readFrame()      size_t dataSize = 0;      auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE); +    const timestamp_t ts = std::chrono::system_clock::now();      const size_t readBytes = packet.buffer.size();      if (readBytes > 0) { @@ -380,7 +381,7 @@ bool AVTInput::_readFrame()          if (dataPtr) {              if (dataSize == _dab24msFrameSize) { -                _ordered.push(frameNumber, dataPtr, dataSize); +                _ordered.push(frameNumber, dataPtr, dataSize, ts);              }              else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize);          } @@ -392,10 +393,8 @@ bool AVTInput::_readFrame()      return readBytes > 0;  } -ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) +size_t AVTInput::getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts)  { -    ssize_t nbBytes = 0; -      //printf("A: _padFrameQueue size=%zu\n", _padFrameQueue.size());      // Read all messages from encoder (in priority) @@ -409,16 +408,19 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)      int32_t returnedIndex = -1;      while (_nbFrames < 5) { -        auto part = _ordered.pop(&returnedIndex); +        const auto queue_data = _ordered.pop(&returnedIndex); +        const auto& part = queue_data.buf;          if (part.empty()) {              break;          }          while (_checkMessage()) {}; +          if (not _frameAligned) {              if (returnedIndex % 5 == 0) {                  _frameAligned = true; +                _frameZeroTimestamp = queue_data.capture_timestamp;                  memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size());                  _currentFrameSize += part.size(); @@ -430,6 +432,10 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)                  memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size());                  _currentFrameSize += part.size();                  _nbFrames++; + +                // UDP packets arrive with jitter, we intentionally only consider +                // their timestamp after a discontinuity. +                _frameZeroTimestamp += std::chrono::milliseconds(24);              }              else {                  _nbFrames = 0; @@ -441,11 +447,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)          }      } +    size_t nbBytes = 0;      if (_nbFrames == 5 && _currentFrameSize <= buf.size()) {          memcpy(&buf[0], _currentFrame.data(), _currentFrameSize);          nbBytes = _currentFrameSize;          _currentFrameSize = 0;          _nbFrames = 0; +        ts = _frameZeroTimestamp;      }      //printf("C: _padFrameQueue size=%zu\n", _padFrameQueue.size()); diff --git a/src/AVTInput.h b/src/AVTInput.h index e925a80..fd6cf02 100644 --- a/src/AVTInput.h +++ b/src/AVTInput.h @@ -37,6 +37,7 @@  #include <string>  #include <queue>  #include <vector> +#include <chrono>  #define DEF_BR  64 @@ -79,12 +80,12 @@ class AVTInput           */          int setDabPlusParameters(int bitrate, int channels, int sample_rate, bool sbr, bool ps); -        /*! Read incomming frames from the encoder, reorder and reassemble then into DAB+ superframes +        /*! Read incoming frames from the encoder, reorder and reassemble then into DAB+ superframes           *! Give the next reassembled audio frame (120ms for DAB+)           *           * \return the size of the frame or 0 if none are available yet           */ -        ssize_t getNextFrame(std::vector<uint8_t> &buf); +        size_t getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts);          /*! Store a new PAD frame.           *! Frames are sent to the encoder on request @@ -118,6 +119,7 @@ class AVTInput          bool _frameAligned = false;          std::vector<uint8_t> _currentFrame;          int32_t _nbFrames = 0; +        std::chrono::system_clock::time_point _frameZeroTimestamp;          size_t _currentFrameSize = 0;          bool _parseURI(const char* uri, std::string& address, long& port); diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp index eb2cf97..707f0f9 100644 --- a/src/OrderedQueue.cpp +++ b/src/OrderedQueue.cpp @@ -22,6 +22,8 @@  #include <cstdio>  #include <stdint.h> +using namespace std; +  #define DEBUG(fmt, A...)   fprintf(stderr, "OrderedQueue: " fmt, ##A)  //#define DEBUG(x...)  #define ERROR(fmt, A...)   fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A) @@ -32,7 +34,7 @@ OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) :  {  } -void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size) +void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts)  {      // DEBUG("OrderedQueue::push index=%d\n", index);      index = (index + _maxIndex) % _maxIndex; @@ -52,8 +54,11 @@ void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size)              DEBUG("Duplicated index=%d\n", index);          } -        OrderedQueueData oqd(size); -        copy(buf, buf + size, oqd.begin()); +        OrderedQueueData oqd; +        oqd.buf.resize(size); +        oqd.capture_timestamp = ts; + +        copy(buf, buf + size, oqd.buf.begin());          _stock[index] = move(oqd);      }      else { @@ -73,9 +78,9 @@ bool OrderedQueue::availableData() const      return _stock.size() > 0;  } -std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex) +OrderedQueueData OrderedQueue::pop(int32_t *returnedIndex)  { -    OrderedQueueData buf; +    OrderedQueueData oqd;      uint32_t gap = 0;      if (_stock.size() > 0) { @@ -83,7 +88,7 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex)          bool found = false;          while (not found) {              try { -                buf = move(_stock.at(nextIndex)); +                oqd = move(_stock.at(nextIndex));                  _stock.erase(nextIndex);                  _lastIndexPop = nextIndex;                  if (returnedIndex) *returnedIndex = _lastIndexPop; @@ -108,6 +113,6 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex)          DEBUG("index jump of %d\n", gap);      } -    return buf; +    return oqd;  } diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h index 7cc59ee..654ae61 100644 --- a/src/OrderedQueue.h +++ b/src/OrderedQueue.h @@ -22,9 +22,18 @@  #include <string>  #include <map>  #include <vector> +#include <chrono>  #include <cstdint>  #include <cstdio> +using timestamp_t = std::chrono::system_clock::time_point; +using vec_u8 = std::vector<uint8_t>; + +struct OrderedQueueData { +    vec_u8 buf; +    timestamp_t capture_timestamp; +}; +  /* An queue that receives indexed frames, potentially out-of-order,   * which returns the frames in-order.   */ @@ -36,13 +45,11 @@ class OrderedQueue           */          OrderedQueue(int32_t maxIndex, size_t capacity); -        void push(int32_t index, const uint8_t* buf, size_t size); +        void push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts);          bool availableData() const;          /* Return the next buffer, or an empty buffer if none available */ -        std::vector<uint8_t> pop(int32_t *returnedIndex=nullptr); - -        using OrderedQueueData = std::vector<uint8_t>; +        OrderedQueueData pop(int32_t *returnedIndex=nullptr);      private:          int32_t     _maxIndex; diff --git a/src/Outputs.cpp b/src/Outputs.cpp index d0d3ca4..3b4de65 100644 --- a/src/Outputs.cpp +++ b/src/Outputs.cpp @@ -136,12 +136,13 @@ bool ZMQ::write_frame(const uint8_t *buf, size_t len)  }  EDI::EDI() : +    m_time_last_version_sent(chrono::steady_clock::now()),      m_clock_tai({})  { }  EDI::~EDI() { } -void EDI::add_udp_destination(const std::string& host, unsigned int port) +void EDI::add_udp_destination(const string& host, unsigned int port)  {      auto dest = make_shared<edi::udp_destination_t>();      dest->dest_addr = host; @@ -154,7 +155,7 @@ void EDI::add_udp_destination(const std::string& host, unsigned int port)      // TODO make FEC configurable  } -void EDI::add_tcp_destination(const std::string& host, unsigned int port) +void EDI::add_tcp_destination(const string& host, unsigned int port)  {      auto dest = make_shared<edi::tcp_client_t>();      dest->dest_addr = host; @@ -172,10 +173,22 @@ bool EDI::enabled() const      return not m_edi_conf.destinations.empty();  } -void EDI::set_tist(bool enable, uint32_t delay_ms) +void EDI::set_tist(bool enable, uint32_t delay_ms, const chrono::system_clock::time_point& ts)  {      m_tist = enable;      m_delay_ms = delay_ms; + +    const auto ts_with_delay = ts + chrono::milliseconds(m_delay_ms); + +    const auto ts_s = chrono::time_point_cast<chrono::seconds>(ts_with_delay); +    const auto remainder = ts_with_delay - ts_s; +    if (remainder < chrono::milliseconds(0)) { +        throw logic_error("EDI::set_tist remainder duration negative!"); +    } +    const uint32_t remainder_ms = chrono::duration_cast<chrono::milliseconds>(remainder).count(); + +    m_edi_time = chrono::system_clock::to_time_t(ts_s); +    m_timestamp += remainder_ms << 14; // Shift ms by 14 to Timestamp level 2  }  bool EDI::write_frame(const uint8_t *buf, size_t len) @@ -184,33 +197,11 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)          m_edi_sender = make_shared<edi::Sender>(m_edi_conf);      } -    if (m_edi_time == 0) { -        using Sec = chrono::seconds; -        const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now()); -        m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000); -        m_send_version_at_time = m_edi_time; - -        /* TODO we still have to see if 24ms granularity is achievable, given that -         * one DAB+ super frame is carried over more than 1 ETI frame. -         */ -        for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) { -            m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 -        } -    } -      edi::TagStarPTR edi_tagStarPtr("DSTI");      m_edi_tagDSTI.stihf = false;      m_edi_tagDSTI.atstf = m_tist; -    m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 -    if (m_timestamp > 0xf9FFff) { -        m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second -        m_edi_time += 1; - -        m_num_seconds_sent++; -    } -      m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset());      m_edi_tagDSTI.tsta = m_timestamp & 0xffffff; @@ -231,8 +222,10 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)  #else      PACKAGE_VERSION;  #endif -    edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent); +    // We always send in 24ms interval +    const size_t num_seconds_sent = m_num_frames_sent * 1000 / 24; +    edi::TagODRVersion edi_tagVersion(ss.str(), num_seconds_sent);      // The above Tag Items will be assembled into a TAG Packet      edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment); @@ -244,13 +237,15 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)      edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels);      // Send version information only every 10 seconds to save bandwidth -    if (m_send_version_at_time < m_edi_time) { -        m_send_version_at_time += 10; +    if (m_time_last_version_sent + chrono::seconds(10) < chrono::steady_clock::now()) { +        m_time_last_version_sent += chrono::seconds(10);          edi_tagpacket.tag_items.push_back(&edi_tagVersion);      }      m_edi_sender->write(edi_tagpacket); +    m_num_frames_sent++; +      // TODO Handle TCP disconnect      return true;  } diff --git a/src/Outputs.h b/src/Outputs.h index 0f1f34f..1f17491 100644 --- a/src/Outputs.h +++ b/src/Outputs.h @@ -136,7 +136,7 @@ class EDI: public Base {          void add_udp_destination(const std::string& host, unsigned int port);          void add_tcp_destination(const std::string& host, unsigned int port); -        void set_tist(bool enable, uint32_t delay_ms); +        void set_tist(bool enable, uint32_t delay_ms, const std::chrono::system_clock::time_point& ts);          bool enabled() const; @@ -147,9 +147,9 @@ class EDI: public Base {          std::shared_ptr<edi::Sender> m_edi_sender;          uint32_t m_timestamp = 0; -        uint32_t m_num_seconds_sent = 0; +        uint32_t m_num_frames_sent = 0;          std::time_t m_edi_time = 0; -        std::time_t m_send_version_at_time = 0; +        std::chrono::steady_clock::time_point m_time_last_version_sent;          edi::TagDSTI m_edi_tagDSTI; diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp index 02475eb..b8c5547 100644 --- a/src/odr-sourcecompanion.cpp +++ b/src/odr-sourcecompanion.cpp @@ -323,10 +323,6 @@ int main(int argc, char *argv[])          }      } -    if (not edi_output_uris.empty()) { -        edi_output.set_tist(tist_enabled, tist_delay_ms); -    } -      if (padlen != 0) {          int flags;          if (mkfifo(pad_fifo, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH) != 0) { @@ -436,14 +432,21 @@ int main(int argc, char *argv[])                  }              } -            numOutBytes = avtinput.getNextFrame(outbuf); -            if (numOutBytes == 0) { +            chrono::system_clock::time_point ts; +            numOutBytes = avtinput.getNextFrame(outbuf, ts); +            if (numOutBytes > 0) { +                if (not edi_output_uris.empty()) { +                    edi_output.set_tist(tist_enabled, tist_delay_ms, ts); +                } +            } +            else {                  const auto curTime = std::chrono::steady_clock::now();                  const auto diff = curTime - timeout_start;                  if (diff > timeout_duration) {                      fprintf(stderr, "timeout reached\n");                      timedout = true; -                } else { +                } +                else {                      const int wait_ms = 1;                      usleep(wait_ms * 1000);                  } | 
