diff options
Diffstat (limited to 'host/lib/include/uhdlib')
3 files changed, 339 insertions, 0 deletions
| diff --git a/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp new file mode 100644 index 000000000..f0dd853a4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp @@ -0,0 +1,110 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP +#define INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <unordered_map> + +namespace uhd { namespace transport { + +/*! + * Helper class to keep track of the number of frames reserved from a pair of links + */ +class frame_reservation_mgr +{ +public: +    struct frame_reservation_t +    { +        recv_link_if::sptr recv_link; +        size_t num_recv_frames = 0; +        send_link_if::sptr send_link; +        size_t num_send_frames = 0; +    }; + +    void register_link(const recv_link_if::sptr& recv_link) +    { +        if (_recv_tbl[recv_link.get()] != 0) { +            throw uhd::runtime_error("Recv link already attached to I/O service"); +        } +        _recv_tbl[recv_link.get()] = 0; +    } + +    void register_link(const send_link_if::sptr& send_link) +    { +        if (_send_tbl[send_link.get()] != 0) { +            throw uhd::runtime_error("Send link already attached to I/O service"); +        } +        _send_tbl[send_link.get()] = 0; +    } + +    void unregister_link(const recv_link_if::sptr& recv_link) +    { +        auto link_ptr = recv_link.get(); +        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0); +        _recv_tbl.erase(link_ptr); +    } + +    void unregister_link(const send_link_if::sptr& send_link) +    { +        auto link_ptr = send_link.get(); +        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0); +        _send_tbl.erase(link_ptr); +    } + +    void reserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            const size_t capacity    = reservation.recv_link->get_num_recv_frames(); +            if (rsvd_frames + reservation.num_recv_frames > capacity) { +                throw uhd::runtime_error( +                    "Number of frames requested exceeds link recv frame capacity"); +            } + +            recv_link_if* link_ptr = reservation.recv_link.get(); +            _recv_tbl[link_ptr]    = rsvd_frames + reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            const size_t capacity    = reservation.send_link->get_num_send_frames(); +            if (rsvd_frames + reservation.num_send_frames > capacity) { +                throw uhd::runtime_error( +                    "Number of frames requested exceeds link send frame capacity"); +            } + +            send_link_if* link_ptr = reservation.send_link.get(); +            _send_tbl[link_ptr]    = rsvd_frames + reservation.num_send_frames; +        } +    } + +    void unreserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            recv_link_if* link_ptr   = reservation.recv_link.get(); +            _recv_tbl[link_ptr]      = rsvd_frames - reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            send_link_if* link_ptr   = reservation.send_link.get(); +            _send_tbl[link_ptr]      = rsvd_frames - reservation.num_send_frames; +        } +    } + +private: +    std::unordered_map<recv_link_if*, size_t> _recv_tbl; +    std::unordered_map<send_link_if*, size_t> _send_tbl; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP */ diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp new file mode 100644 index 000000000..620e796ef --- /dev/null +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -0,0 +1,158 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP +#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP + +#include <uhd/transport/frame_buff.hpp> +#include <chrono> +#include <thread> + +namespace uhd { namespace transport { + +namespace detail { + +// Implementation of get_send_buff used below by send and recv clients +template <typename pop_func_t> +static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) +{ +    using namespace std::chrono; + +    if (timeout_ms == 0) { +        return frame_buff::uptr(pop()); +    } + +    const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + +    bool last_check = false; + +    while (true) { +        if (frame_buff* buff = pop()) { +            return frame_buff::uptr(buff); +        } + +        if (timeout_ms > 0 && steady_clock::now() > end_time) { +            if (last_check) { +                return nullptr; +            } else { +                last_check = true; +            } +        } +        std::this_thread::yield(); +    } +} + +} // namespace detail + +/*! + * Recv I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_recv_io : public recv_io_if +{ +public: +    offload_recv_io(typename io_service_t::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        typename io_service_t::client_port_t::sptr& port) +        : _io_srv(io_srv), _port(port) +    { +        _num_recv_frames = num_recv_frames; +        _num_send_frames = num_send_frames; +    } + +    ~offload_recv_io() +    { +        assert(_num_frames_in_use == 0); + +        if (_io_srv) { +            _port->client_disconnect(); +        } +    } + +    frame_buff::uptr get_recv_buff(int32_t timeout_ms) +    { +        return detail::client_get_buff( +            [this]() { +                frame_buff* buff = _port->client_pop(); +                _num_frames_in_use += buff ? 1 : 0; +                return buff; +            }, +            timeout_ms); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        assert(buff); +        _port->client_push(buff.release()); +        _num_frames_in_use--; +    } + +private: +    offload_recv_io()                       = delete; +    offload_recv_io(const offload_recv_io&) = delete; + +    typename io_service_t::sptr _io_srv; +    typename io_service_t::client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +/*! + * Send I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_send_io : public send_io_if +{ +public: +    offload_send_io(typename io_service_t::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        typename io_service_t::client_port_t::sptr& port) +        : _io_srv(io_srv), _port(port) +    { +        _num_recv_frames = num_recv_frames; +        _num_send_frames = num_send_frames; +    } + +    ~offload_send_io() +    { +        assert(_num_frames_in_use == 0); + +        if (_io_srv) { +            _port->client_disconnect(); +        } +    } + +    frame_buff::uptr get_send_buff(int32_t timeout_ms) +    { +        return detail::client_get_buff( +            [this]() { +                frame_buff* buff = _port->client_pop(); +                _num_frames_in_use += buff ? 1 : 0; +                return buff; +            }, +            timeout_ms); +    } + +    void release_send_buff(frame_buff::uptr buff) +    { +        assert(buff); +        _port->client_push(buff.release()); +        _num_frames_in_use--; +    } + +private: +    offload_send_io()                       = delete; +    offload_send_io(const offload_send_io&) = delete; + +    typename io_service_t::sptr _io_srv; +    typename io_service_t::client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP */ diff --git a/host/lib/include/uhdlib/utils/semaphore.hpp b/host/lib/include/uhdlib/utils/semaphore.hpp new file mode 100644 index 000000000..ae77ed102 --- /dev/null +++ b/host/lib/include/uhdlib/utils/semaphore.hpp @@ -0,0 +1,71 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <condition_variable> +#include <chrono> +#include <mutex> + +#ifndef INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP +#define INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP + +namespace uhd { + +/*! + * A sempahore built using std::condition_variable + */ +class semaphore +{ +public: +    void notify() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _count++; +        _cv.notify_one(); +    } + +    void wait() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _cv.wait(lock, [this]() { return this->_count != 0; }); +        _count--; +    } + +    bool try_wait() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_count != 0) { +            _count--; +            return true; +        } +        return false; +    } + +    bool wait_for(size_t timeout_ms) +    { +        std::chrono::milliseconds timeout(timeout_ms); +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { +            _count--; +            return true; +        } +        return false; +    } + +    size_t count() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        return _count; +    } + +private: +    std::condition_variable _cv; +    std::mutex _cv_mutex; +    size_t _count = 0; +}; + +} // namespace uhd + +#endif /* INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP */ | 
