From c93a726d8fffdca2b90aa22eed13ce58a0e8009f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 24 Jun 2016 19:51:18 +0200 Subject: Fix UHD buffering leading to out of memory Since commit d9ef93e, UHD does not backpressure the modulator anymore. If a pipe input is used, the ODR-DabMux before also doesn't get any back-pressure, the modulation thread and the mux run a very high rate. This high rate fills the buffer between OutputUHD and its worker thread, until the out-of-memory killer kills ODR-DabMod. Less impact on ZMQ input, because that is throttled at the mux, but we still have a buffer that can grow in an uncontrolled way --- src/ThreadsafeQueue.h | 41 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) (limited to 'src/ThreadsafeQueue.h') diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e5e83ef..e27c100 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -58,14 +58,37 @@ public: size_t queue_size = the_queue.size(); lock.unlock(); - notify(); + the_rx_notification.notify_one(); return queue_size; } - void notify() + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Send a notification for the receiver thread */ + void notify(void) { - the_condition_variable.notify_one(); + the_rx_notification.notify_one(); } bool empty() const @@ -89,6 +112,10 @@ public: popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + return true; } @@ -96,17 +123,21 @@ public: { boost::mutex::scoped_lock lock(the_mutex); while (the_queue.size() < prebuffering) { - the_condition_variable.wait(lock); + the_rx_notification.wait(lock); } popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); } private: std::queue the_queue; mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; + boost::condition_variable the_rx_notification; + boost::condition_variable the_tx_notification; }; #endif -- cgit v1.2.3