diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-10-26 18:40:01 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:33 -0800 |
commit | 10b9d2688b5bcb150eec786a9ef7473f1c1c28ac (patch) | |
tree | 17f45cc4c8d56bba84a6a54a0f6568703fe03e1e /host/lib/transport/inline_io_service.cpp | |
parent | 98a510d68e12751917dba29522c69de9964decf6 (diff) | |
download | uhd-10b9d2688b5bcb150eec786a9ef7473f1c1c28ac.tar.gz uhd-10b9d2688b5bcb150eec786a9ef7473f1c1c28ac.tar.bz2 uhd-10b9d2688b5bcb150eec786a9ef7473f1c1c28ac.zip |
rfnoc: Make I/O services relinquish CPU while waiting
Diffstat (limited to 'host/lib/transport/inline_io_service.cpp')
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 81 |
1 files changed, 77 insertions, 4 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 93967e09a..6449bbda8 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,6 +144,41 @@ public: } } + void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (rcvr == cb) { + assert(!buff); + return; + } else if (buff) { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } else { + /* Continue looping if buffer was consumed and + receiver is not the requested one */ + break; + } + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } + } + private: recv_link_if* _link; std::list<inline_recv_cb*> _callbacks; @@ -248,11 +283,18 @@ public: void release_send_buff(frame_buff::uptr buff) { - while (buff) { /* TODO: Possibly don't loop indefinitely here */ + while (buff) { + // Try to send a packet + _send_cb(buff, _send_link.get()); if (_recv_link) { - _io_srv->recv(this, _recv_link.get(), 0); + // If the buffer was not released, use a timeout to receive + // the flow control packet, to avoid wasting CPU. + if (!buff) { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); + } else { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); + } } - _send_cb(buff, _send_link.get()); } _num_frames_in_use--; } @@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv( return frame_buff::uptr(); } } - return frame_buff::uptr(); +} + +void inline_io_service::recv_flow_ctrl( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + std::tie(mux, rcvr) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv_flow_ctrl() if present */ + mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); + return; + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + assert(!buff); + return; + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } } }} // namespace uhd::transport |