diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-08-12 15:35:30 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-08-12 15:35:56 +0200 | 
| commit | 3bdeec4cec73210583454e4a8333d68ad6f34f54 (patch) | |
| tree | 8f7b28e2933ce01e78e12502273477638ff81afc /lib/ThreadsafeQueue.h | |
| parent | 4f404eb1152fc4f8b121c22416583c57da4a86f8 (diff) | |
| download | ODR-SourceCompanion-3bdeec4cec73210583454e4a8333d68ad6f34f54.tar.gz ODR-SourceCompanion-3bdeec4cec73210583454e4a8333d68ad6f34f54.tar.bz2 ODR-SourceCompanion-3bdeec4cec73210583454e4a8333d68ad6f34f54.zip | |
Update common code
Diffstat (limited to 'lib/ThreadsafeQueue.h')
| -rw-r--r-- | lib/ThreadsafeQueue.h | 54 | 
1 files changed, 50 insertions, 4 deletions
| diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 815dfe0..8b385d6 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2023     Matthias P. Braendli, matthias.braendli@mpb.li     An implementation for a threadsafe queue, depends on C++11 @@ -32,6 +32,7 @@  #include <condition_variable>  #include <queue>  #include <utility> +#include <cassert>  /* This queue is meant to be used by two threads. One producer   * that pushes elements into the queue, and one consumer that @@ -69,7 +70,6 @@ public:          }          size_t queue_size = the_queue.size();          lock.unlock(); -          the_rx_notification.notify_one();          return queue_size; @@ -93,11 +93,57 @@ public:          return queue_size;      } +    struct push_overflow_result { bool overflowed; size_t new_size; }; + +    /* Push one element into the queue, and if queue is +     * full remove one element from the other end. +     * +     * max_size == 0 is not allowed. +     * +     * returns the new queue size and a flag if overflow occurred. +     */ +    push_overflow_result push_overflow(T const& val, size_t max_size) +    { +        assert(max_size > 0); +        std::unique_lock<std::mutex> lock(the_mutex); + +        bool overflow = false; +        while (the_queue.size() >= max_size) { +            overflow = true; +            the_queue.pop(); +        } +        the_queue.push(val); +        const size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return {overflow, queue_size}; +    } + +    push_overflow_result push_overflow(T&& val, size_t max_size) +    { +        assert(max_size > 0); +        std::unique_lock<std::mutex> lock(the_mutex); + +        bool overflow = false; +        while (the_queue.size() >= max_size) { +            overflow = true; +            the_queue.pop(); +        } +        the_queue.emplace(std::move(val)); +        const size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return {overflow, queue_size}; +    } + +      /* Push one element into the queue, but wait until the       * queue size goes below the threshold.       * -     * Notify waiting thread. -     *       * returns the new queue size.       */      size_t push_wait_if_full(T const& val, size_t threshold) | 
