aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/inline_io_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/inline_io_service.cpp')
-rw-r--r--host/lib/transport/inline_io_service.cpp63
1 files changed, 37 insertions, 26 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 6449bbda8..1438699b7 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -144,7 +144,7 @@ public:
}
}
- void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ bool recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
{
while (true) {
frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
@@ -156,7 +156,7 @@ public:
rcvr_found = true;
if (rcvr == cb) {
assert(!buff);
- return;
+ return true;
} else if (buff) {
/* NOTE: Should not overflow, by construction
* Every queue can hold link->get_num_recv_frames()
@@ -174,7 +174,7 @@ public:
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}
@@ -250,12 +250,14 @@ public:
send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t fc_cb)
- : inline_recv_cb(fc_cb, send_link.get())
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
+ : inline_recv_cb(recv_cb, send_link.get())
, _io_srv(io_srv)
, _send_link(send_link)
, _send_cb(send_cb)
, _recv_link(recv_link)
+ , _fc_cb(fc_cb)
{
_num_recv_frames = num_recv_frames;
_num_send_frames = num_send_frames;
@@ -269,9 +271,27 @@ public:
}
}
+ bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms)
+ {
+ if (!_recv_link) {
+ // If there is no flow control link, then the destination must
+ // always be ready for more data.
+ return true;
+ }
+
+ while (!_fc_cb(num_bytes)) {
+ const bool updated =
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), timeout_ms);
+
+ if (!updated) {
+ return false;
+ }
+ }
+ return true;
+ }
+
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
- /* Check initial flow control result */
frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms);
if (buff) {
_num_frames_in_use++;
@@ -283,19 +303,8 @@ public:
void release_send_buff(frame_buff::uptr buff)
{
- while (buff) {
- // Try to send a packet
- _send_cb(buff, _send_link.get());
- if (_recv_link) {
- // If the buffer was not released, use a timeout to receive
- // the flow control packet, to avoid wasting CPU.
- if (!buff) {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0);
- } else {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100);
- }
- }
- }
+ // Send the packet using callback
+ _send_cb(std::move(buff), _send_link.get());
_num_frames_in_use--;
}
@@ -305,6 +314,7 @@ private:
send_callback_t _send_cb;
recv_link_if::sptr _recv_link;
recv_callback_t _recv_cb;
+ fc_callback_t _fc_cb;
size_t _num_frames_in_use = 0;
};
@@ -369,7 +379,8 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(send_link);
UHD_ASSERT_THROW(num_send_frames > 0);
@@ -377,9 +388,10 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
connect_sender(send_link.get(), num_send_frames);
sptr io_srv = shared_from_this();
auto send_io = std::make_shared<inline_send_io>(
- io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
if (recv_link) {
UHD_ASSERT_THROW(recv_cb);
+ UHD_ASSERT_THROW(fc_cb);
UHD_ASSERT_THROW(num_recv_frames > 0);
connect_receiver(recv_link.get(), send_io.get(), num_recv_frames);
}
@@ -474,7 +486,7 @@ frame_buff::uptr inline_io_service::recv(
}
}
-void inline_io_service::recv_flow_ctrl(
+bool inline_io_service::recv_flow_ctrl(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
{
inline_recv_mux* mux;
@@ -483,8 +495,7 @@ void inline_io_service::recv_flow_ctrl(
if (mux) {
/* Defer to mux's recv_flow_ctrl() if present */
- mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
- return;
+ return mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
} else {
assert(recv_io_cb == rcvr);
}
@@ -495,13 +506,13 @@ void inline_io_service::recv_flow_ctrl(
if (buff) {
if (rcvr->callback(buff, recv_link)) {
assert(!buff);
- return;
+ return true;
} else {
UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}