diff options
| -rw-r--r-- | src/SampleQueue.h | 52 | ||||
| -rw-r--r-- | src/odr-audioenc.cpp | 5 | 
2 files changed, 35 insertions, 22 deletions
| diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 646f3dd..aeeb8d4 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -1,5 +1,5 @@  /* - * Copyright (C) 2016 Matthias P. Braendli + * Copyright (C) 2018 Matthias P. Braendli   *   * Licensed under the Apache License, Version 2.0 (the "License");   * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@   * bytes_per_sample * number_of_channels   *   * The queue has a maximum size. If this size is reached, push() - * ignores new data. + * either blocks or ignores new data, depending on drift_compensation.   *   * If pop() is called but there is not enough data in the queue,   * the missing samples are replaced by zeros. pop() will always @@ -65,10 +65,12 @@ class SampleQueue  public:      SampleQueue(unsigned int bytes_per_sample,              unsigned int channels, -            size_t max_size) : +            size_t max_size, +            bool drift_compensation) :          m_bytes_per_sample(bytes_per_sample),          m_channels(channels),          m_max_size(max_size), +        m_push_block(not drift_compensation),          m_overruns(0) {} @@ -81,7 +83,7 @@ public:          size_t new_size = 0;          { -            std::lock_guard<std::mutex> lock(m_mutex); +            std::unique_lock<std::mutex> lock(m_mutex);              assert(len % (m_channels * m_bytes_per_sample) == 0); @@ -93,17 +95,32 @@ public:                      m_max_size / 4);  #endif -            if (m_queue.size() < m_max_size) { -                for (size_t i = 0; i < len; i++) { -                    m_queue.push_back(val[i]); +            if (m_push_block) { +                while (len) { +                    const size_t available = m_max_size - m_queue.size(); +                    const size_t copy_len = std::min(available, len); + +                    if (copy_len > 0) { +                        std::copy(val, val + copy_len, std::back_inserter(m_queue)); +                        len -= copy_len; +                        val += copy_len; +                    } +                    else { +                        const auto wait_timeout = std::chrono::milliseconds(100); +                        m_pop_notification.wait_for(lock, wait_timeout); +                    }                  } - -                new_size = m_queue.size();              }              else { -                m_overruns++; -                new_size = 0; +                if (m_queue.size() < m_max_size) { +                    std::copy(val, val + len, std::back_inserter(m_queue)); +                } +                else { +                    m_overruns++; +                }              } + +            new_size = m_queue.size();          }          m_push_notification.notify_all(); @@ -142,7 +159,6 @@ public:          auto time_start = std::chrono::steady_clock::now();          const auto timeout = std::chrono::milliseconds(timeout_ms); -#if 1          do {              const auto wait_timeout = std::chrono::milliseconds(10);              m_push_notification.wait_for(lock, wait_timeout); @@ -159,13 +175,6 @@ public:                  break;              }          } while (m_queue.size() < len); -#else -        while (m_queue.size() < len) { -            lock.unlock(); -            std::this_thread::sleep_for(std::chrono::milliseconds(1)); -            lock.lock(); -        } -#endif          size_t num_to_copy = (m_queue.size() < len) ?              m_queue.size() : len; @@ -182,6 +191,8 @@ public:  #if DEBUG_SAMPLE_QUEUE          fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy);  #endif + +        m_pop_notification.notify_all();          return num_to_copy;      } @@ -260,6 +271,7 @@ public:  #endif          } +        m_pop_notification.notify_all();          return ret;      } @@ -267,10 +279,12 @@ private:      std::deque<T> m_queue;      mutable std::mutex m_mutex;      std::condition_variable m_push_notification; +    std::condition_variable m_pop_notification;      unsigned int m_channels;      unsigned int m_bytes_per_sample;      size_t m_max_size; +    bool m_push_block;      /*! Counter to keep track of number of overruns between calls       * to pop() diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp index b5f7b9f..3b42dfd 100644 --- a/src/odr-audioenc.cpp +++ b/src/odr-audioenc.cpp @@ -931,7 +931,7 @@ int main(int argc, char *argv[])      /*! The SampleQueue \c queue is given to the inputs, so that they       * can fill it.       */ -    SampleQueue<uint8_t> queue(BYTES_PER_SAMPLE, settings.channels, max_size); +    SampleQueue<uint8_t> queue(BYTES_PER_SAMPLE, settings.channels, max_size, settings.drift_compensation);      /* symsize=8, gfpoly=0x11d, fcr=0, prim=1, nroots=10, pad=135 */      rs_handler = init_rs_char(8, 0x11d, 0, 1, 10, 135); @@ -1106,8 +1106,7 @@ int main(int argc, char *argv[])              size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms, &overruns); // returns bytes              if (overruns) { -                fprintf(stderr, "%zd overruns occured!\n", overruns); -                status |= STATUS_OVERRUN; +                throw logic_error("Queue overrun in non-drift compensation!");              }              if (bytes_from_queue < read_bytes) { | 
