diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-26 10:57:58 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-26 10:57:58 +0200 | 
| commit | 8e7a15754a3fef09cc5de372f207936740459c56 (patch) | |
| tree | f1c3cc289eebc9ed9b620696b24467680288cc39 /src | |
| parent | 899dcb83ec873cb35d38583d6f48922e1312e9be (diff) | |
| download | ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.gz ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.bz2 ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.zip | |
Refactor OrderedQueue a bit
Diffstat (limited to 'src')
| -rw-r--r-- | src/AVTInput.cpp | 11 | ||||
| -rw-r--r-- | src/AVTInput.h | 12 | ||||
| -rw-r--r-- | src/OrderedQueue.cpp | 59 | ||||
| -rw-r--r-- | src/OrderedQueue.h | 18 | 
4 files changed, 52 insertions, 48 deletions
| diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 973ed7b..0e5b669 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -69,9 +69,7 @@ AVTInput::AVTInput(const std::string& input_uri,      _input_pad_packet(2048),      _ordered(5000, _jitterBufferSize),      _lastInfoFrameType(_typeCantExtract) -{ - -} +{ }  int AVTInput::prepare(void)  { @@ -430,14 +428,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)      //printf("B: _padFrameQueue size=%zu\n", _padFrameQueue.size());      // Assemble next frame -    int32_t nb = 0;      std::vector<uint8_t> part; -    while (_nbFrames < 5 && (nb = _ordered.pop(part)) != 0) +    while (_nbFrames < 5 and not (part = _ordered.pop()).empty())      {          while (_checkMessage()); -        memcpy(_currentFrame.data() + _currentFrameSize, part.data(), nb); -        _currentFrameSize += nb; +        memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size()); +        _currentFrameSize += part.size();          _nbFrames ++;      } diff --git a/src/AVTInput.h b/src/AVTInput.h index 62b2248..0f58418 100644 --- a/src/AVTInput.h +++ b/src/AVTInput.h @@ -101,12 +101,12 @@ class AVTInput          uint32_t _pad_port;          size_t _jitterBufferSize; -        Socket::UDPSocket       _input_socket; -        Socket::UDPSocket       _output_socket; -        Socket::UDPPacket       _output_packet; -        Socket::UDPSocket       _input_pad_socket; -        Socket::UDPPacket       _input_pad_packet; -        OrderedQueue    _ordered; +        Socket::UDPSocket _input_socket; +        Socket::UDPSocket _output_socket; +        Socket::UDPPacket _output_packet; +        Socket::UDPSocket _input_pad_socket; +        Socket::UDPPacket _input_pad_packet; +        OrderedQueue _ordered;          std::queue<std::vector<uint8_t> > _padFrameQueue;          int32_t _subChannelIndex = DEF_BR/8; diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp index 8d545df..8a768e7 100644 --- a/src/OrderedQueue.cpp +++ b/src/OrderedQueue.cpp @@ -26,39 +26,40 @@  //#define DEBUG(x...)  #define ERROR(fmt, A...)   fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A) -OrderedQueue::OrderedQueue(int countModulo, size_t capacity) : -    _countModulo(countModulo), +OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) : +    _maxIndex(maxIndex),      _capacity(capacity)  {  } -void OrderedQueue::push(int32_t count, const uint8_t* buf, size_t size) +void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size)  { -//    DEBUG("OrderedQueue::push count=%d\n", count); -    count = (count+_countModulo) % _countModulo; +    // DEBUG("OrderedQueue::push index=%d\n", index); +    index = (index + _maxIndex) % _maxIndex; -    // First frame makes the count initialisation. -    if (_lastCount == -1) { -        _lastCount = (count+_countModulo-1) % _countModulo; +    // First frame makes the index initialisation. +    if (_lastIndexPop == -1) { +        // Equivalent to index - 1 in modulo arithmetic: +        _lastIndexPop = (index + _maxIndex-1) % _maxIndex;      }      if (_stock.size() < _capacity) { -        if (_stock.find(count) == _stock.end()) { -            // count already exists, duplicated frame +        if (_stock.find(index) == _stock.end()) { +            // index already exists, duplicated frame              // Replace the old one by the new one. -            // the old one could a an old frame from the previous count loop +            // the old one could a an old frame from the previous index loop              _duplicated++; -            DEBUG("Duplicated count=%d\n", count); +            DEBUG("Duplicated index=%d\n", index);          }          OrderedQueueData oqd(size);          copy(buf, buf + size, oqd.begin()); -        _stock[count] = move(oqd); +        _stock[index] = move(oqd);      }      else {          _overruns++;          if (_overruns < 100) { -            DEBUG("Overruns (size=%zu) count=%d not inserted\n", _stock.size(), count); +            DEBUG("Overruns (size=%zu) index=%d not inserted\n", _stock.size(), index);          }          else if (_overruns == 100) {              DEBUG("stop displaying Overruns\n"); @@ -72,43 +73,41 @@ bool OrderedQueue::availableData() const      return _stock.size() > 0;  } -size_t OrderedQueue::pop(std::vector<uint8_t>& buf, int32_t *retCount) +std::vector<uint8_t> OrderedQueue::pop(int32_t *retCount)  { -    size_t nbBytes = 0; +    OrderedQueueData buf;      uint32_t gap = 0;      if (_stock.size() > 0) { -        int32_t nextCount = (_lastCount+1) % _countModulo; +        int32_t nextIndex = (_lastIndexPop+1) % _maxIndex;          bool found = false;          while (not found) {              try { -                auto& oqd = _stock.at(nextCount); -                buf = move(oqd); -                _stock.erase(nextCount); -                _lastCount = nextCount; -                if (retCount) *retCount = _lastCount; +                buf = move(_stock.at(nextIndex)); +                _stock.erase(nextIndex); +                _lastIndexPop = nextIndex; +                if (retCount) *retCount = _lastIndexPop;                  found = true;              } -            catch (const std::out_of_range&) -            { +            catch (const std::out_of_range&) {                  if (_stock.size() < _capacity) { -                    found = true; +                    break;                  }                  else { -                    // Search for the new reference count, starting from the current one +                    // Search for the new index, starting from the current one                      // This could be optimised, but the modulo makes things                      // not easy.                      gap++; -                    nextCount = (nextCount+1) % _countModulo; +                    nextIndex = (nextIndex+1) % _maxIndex;                  }              }          }      }      if (gap > 0) { -        DEBUG("Count jump of %d\n", gap); +        DEBUG("index jump of %d\n", gap);      } -//    if (nbBytes > 0 && retCount) DEBUG("OrderedQueue::pop count=%d\n", *retCount); -    return nbBytes; + +    return buf;  } diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h index c8958cb..4652762 100644 --- a/src/OrderedQueue.h +++ b/src/OrderedQueue.h @@ -25,23 +25,31 @@  #include <cstdint>  #include <cstdio> +/* An queue that receives indexed frames, potentially out-of-order, + * which returns the frames in-order. + */  class OrderedQueue  {      public: -        OrderedQueue(int32_t countModulo, size_t capacity); +        /* Indexes of frames must be between 0 and maxIndex. +         * The queue will fill to capacity if there is a gap. +         */ +        OrderedQueue(int32_t maxIndex, size_t capacity); -        void push(int32_t count, const uint8_t* buf, size_t size); +        void push(int32_t index, const uint8_t* buf, size_t size);          bool availableData() const; -        size_t pop(std::vector<uint8_t>& buf, int32_t *retCount=nullptr); + +        /* Return the next buffer, or an empty buffer if none available */ +        std::vector<uint8_t> pop(int32_t *retCount=nullptr);          using OrderedQueueData = std::vector<uint8_t>;      private: -        int32_t     _countModulo; +        int32_t     _maxIndex;          size_t      _capacity;          uint64_t    _duplicated = 0;          uint64_t    _overruns = 0; -        int32_t     _lastCount = -1; +        int32_t     _lastIndexPop = -1;          std::map<int, OrderedQueueData> _stock;  }; | 
