diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-11 16:48:39 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-11 16:48:39 +0100 | 
| commit | 228ec291a58869a19b368a536f3ea0fcd97a57b6 (patch) | |
| tree | 957d2be8f67f0fbd6ae70c49e3705061982c8fbf /lib/ThreadsafeQueue.h | |
| parent | b2a964b6338466b74d6ef217f7453a1c7c2f72c9 (diff) | |
| download | ODR-SourceCompanion-228ec291a58869a19b368a536f3ea0fcd97a57b6.tar.gz ODR-SourceCompanion-228ec291a58869a19b368a536f3ea0fcd97a57b6.tar.bz2 ODR-SourceCompanion-228ec291a58869a19b368a536f3ea0fcd97a57b6.zip | |
Update common 5959418
Diffstat (limited to 'lib/ThreadsafeQueue.h')
| -rw-r--r-- | lib/ThreadsafeQueue.h | 38 | 
1 files changed, 25 insertions, 13 deletions
| diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2023 +   Copyright (C) 2025     Matthias P. Braendli, matthias.braendli@mpb.li     An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@  #pragma once +#include <functional>  #include <mutex>  #include <condition_variable>  #include <queue> @@ -63,10 +64,10 @@ public:          std::unique_lock<std::mutex> lock(the_mutex);          size_t queue_size_before = the_queue.size();          if (max_size == 0) { -            the_queue.push(val); +            the_queue.push_back(val);          }          else if (queue_size_before < max_size) { -            the_queue.push(val); +            the_queue.push_back(val);          }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -80,10 +81,10 @@ public:          std::unique_lock<std::mutex> lock(the_mutex);          size_t queue_size_before = the_queue.size();          if (max_size == 0) { -            the_queue.emplace(std::move(val)); +            the_queue.emplace_back(std::move(val));          }          else if (queue_size_before < max_size) { -            the_queue.emplace(std::move(val)); +            the_queue.emplace_back(std::move(val));          }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -110,9 +111,9 @@ public:          bool overflow = false;          while (the_queue.size() >= max_size) {              overflow = true; -            the_queue.pop(); +            the_queue.pop_front();          } -        the_queue.push(val); +        the_queue.push_back(val);          const size_t queue_size = the_queue.size();          lock.unlock(); @@ -129,9 +130,9 @@ public:          bool overflow = false;          while (the_queue.size() >= max_size) {              overflow = true; -            the_queue.pop(); +            the_queue.pop_front();          } -        the_queue.emplace(std::move(val)); +        the_queue.emplace_back(std::move(val));          const size_t queue_size = the_queue.size();          lock.unlock(); @@ -152,7 +153,7 @@ public:          while (the_queue.size() >= threshold) {              the_tx_notification.wait(lock);          } -        the_queue.push(val); +        the_queue.push_back(val);          size_t queue_size = the_queue.size();          lock.unlock(); @@ -198,7 +199,7 @@ public:          }          popped_value = the_queue.front(); -        the_queue.pop(); +        the_queue.pop_front();          lock.unlock();          the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public:          }          else {              std::swap(popped_value, the_queue.front()); -            the_queue.pop(); +            the_queue.pop_front();              lock.unlock();              the_tx_notification.notify_one();          }      } +    template<typename R> +    std::vector<R> map(std::function<R(const T&)> func) const +    { +        std::vector<R> result; +        std::unique_lock<std::mutex> lock(the_mutex); +        for (const T& elem : the_queue) { +            result.push_back(func(elem)); +        } +        return result; +    } +  private: -    std::queue<T> the_queue; +    std::deque<T> the_queue;      mutable std::mutex the_mutex;      std::condition_variable the_rx_notification;      std::condition_variable the_tx_notification; | 
