diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 17:13:27 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 17:13:27 +0200 | 
| commit | 899dcb83ec873cb35d38583d6f48922e1312e9be (patch) | |
| tree | 42cfe4ddc96b52cb9f365f617c556f7e7153f5eb /lib/ThreadsafeQueue.h | |
| parent | 8bba5052cf7a3677e8be12315a03959fec33bf17 (diff) | |
| download | ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.tar.gz ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.tar.bz2 ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.zip | |
Replace socket library
Diffstat (limited to 'lib/ThreadsafeQueue.h')
| -rw-r--r-- | lib/ThreadsafeQueue.h | 176 | 
1 files changed, 176 insertions, 0 deletions
| diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* +   Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in +   Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   An implementation for a threadsafe queue, depends on C++11 + +   When creating a ThreadsafeQueue, one can specify the minimal number +   of elements it must contain before it is possible to take one +   element out. + */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <mutex> +#include <condition_variable> +#include <queue> +#include <utility> + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template<typename T> +class ThreadsafeQueue +{ +public: +    /* Push one element into the queue, and notify another thread that +     * might be waiting. +     * +     * returns the new queue size. +     */ +    size_t push(T const& val) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        the_queue.push(val); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } + +    size_t push(T&& val) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        the_queue.emplace(std::move(val)); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } + +    /* Push one element into the queue, but wait until the +     * queue size goes below the threshold. +     * +     * Notify waiting thread. +     * +     * returns the new queue size. +     */ +    size_t push_wait_if_full(T const& val, size_t threshold) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        while (the_queue.size() >= threshold) { +            the_tx_notification.wait(lock); +        } +        the_queue.push(val); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } + +    /* Trigger a wakeup event on a blocking consumer, which +     * will receive a ThreadsafeQueueWakeup exception. +     */ +    void trigger_wakeup(void) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        wakeup_requested = true; +        lock.unlock(); +        the_rx_notification.notify_one(); +    } + +    /* Send a notification for the receiver thread */ +    void notify(void) +    { +        the_rx_notification.notify_one(); +    } + +    bool empty() const +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        return the_queue.empty(); +    } + +    size_t size() const +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        return the_queue.size(); +    } + +    bool try_pop(T& popped_value) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        if (the_queue.empty()) { +            return false; +        } + +        popped_value = the_queue.front(); +        the_queue.pop(); + +        lock.unlock(); +        the_tx_notification.notify_one(); + +        return true; +    } + +    void wait_and_pop(T& popped_value, size_t prebuffering = 1) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        while (the_queue.size() < prebuffering and +                not wakeup_requested) { +            the_rx_notification.wait(lock); +        } + +        if (wakeup_requested) { +            wakeup_requested = false; +            throw ThreadsafeQueueWakeup(); +        } +        else { +            std::swap(popped_value, the_queue.front()); +            the_queue.pop(); + +            lock.unlock(); +            the_tx_notification.notify_one(); +        } +    } + +private: +    std::queue<T> the_queue; +    mutable std::mutex the_mutex; +    std::condition_variable the_rx_notification; +    std::condition_variable the_tx_notification; +    bool wakeup_requested = false; +}; + | 
