diff options
author | Martin Braun <martin.braun@ettus.com> | 2019-07-17 15:25:51 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:32 -0800 |
commit | ab87597e9e76854237b5d78f7d35959b14e9737b (patch) | |
tree | ffb471238391c8a42a9feadd54d7819c45d9a989 /host/lib/rfnoc/rfnoc_rx_streamer.cpp | |
parent | 83abb81e61beec213f9f83843d6fab637a578f4a (diff) | |
download | uhd-ab87597e9e76854237b5d78f7d35959b14e9737b.tar.gz uhd-ab87597e9e76854237b5d78f7d35959b14e9737b.tar.bz2 uhd-ab87597e9e76854237b5d78f7d35959b14e9737b.zip |
rfnoc: Implement overrun handling using action API
In order to enable overrun handling through the action API, a few new
features are implemented:
- The RX streamer can now accept stream command actions. The streamer
will interpret stream command actions as a request to send stream
commands upstream to all producers.
- A new action type is defined ('restart request') which is understood
by the radio and streamer, and is a handshake between producers and
consumers. In this case, it will ask the radio to send a stream
command itself.
When an RX streamer receives an overrun, it will now run the following
algorithm:
1. Stop all upstream producers (this was already in the code before this
commit).
2. If no restart is required, Wait for the radios to have space in the
downstream blocks.
The radio, if it was in continuous streaming mode before the overrun,
includes a flag in its initial action whether or not to restart the
streaming. Also, it will wait for the stop stream command from the
streamer. When it receives that, it will initiate a restart request
handshake.
3. The streamer submits a restart request action upstream. This action
will be received by the radio.
The radio will then check the current time, and send a stream command
action back downstream.
4. The RX streamer receives the stream command action, and uses it to
send another stream command to all upstream producers. This way, all
upstream producers receive a start command for the same time.
Diffstat (limited to 'host/lib/rfnoc/rfnoc_rx_streamer.cpp')
-rw-r--r-- | host/lib/rfnoc/rfnoc_rx_streamer.cpp | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp index 4340faff0..cfa88b292 100644 --- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -8,7 +8,10 @@ #include <uhdlib/rfnoc/node_accessor.hpp> #include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> #include <atomic> +#include <thread> +using namespace std::chrono_literals; +; using namespace uhd; using namespace uhd::rfnoc; @@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + // + register_action_handler(ACTION_KEY_RX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + rx_event_action_info::sptr rx_event_action = + std::dynamic_pointer_cast<rx_event_action_info>(action); + if (!rx_event_action) { + RFNOC_LOG_WARNING("Received invalid RX event action!"); + return; + } + _handle_rx_event_action(src, rx_event_action); + }); + register_action_handler(ACTION_KEY_RX_RESTART_REQ, + [this](const res_source_info& src, action_info::sptr action) { + _handle_restart_request(src, action); + }); + register_action_handler(ACTION_KEY_STREAM_CMD, + [this](const res_source_info& src, action_info::sptr action) { + stream_cmd_action_info::sptr stream_cmd_action = + std::dynamic_pointer_cast<stream_cmd_action_info>(action); + if (!stream_cmd_action) { + RFNOC_LOG_WARNING("Received invalid stream command action!"); + return; + } + _handle_stream_cmd_action(src, stream_cmd_action); + }); // Initialize properties _scaling_in.reserve(num_chans); @@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan, } }); } + +void rfnoc_rx_streamer::_handle_rx_event_action( + const res_source_info& src, rx_event_action_info::sptr rx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); + if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) { + RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance); + if (_overrun_handling_mode.exchange(true)) { + RFNOC_LOG_TRACE("Ignoring duplicate overrun message."); + return; + } + RFNOC_LOG_TRACE( + "Switching to overrun-handling mode: Stopping all upstream producers..."); + auto stop_action = + stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + // Reminder: Delivery of all of these actions is deferred until this + // action handler is complete. + for (size_t i = 0; i < get_num_input_ports(); ++i) { + post_action({res_source_info::INPUT_EDGE, i}, stop_action); + } + if (!rx_event_action->args.cast<bool>("cont_mode", false)) { + // FIXME wait until it's safe to restart radios + // If we don't need to restart, that's all we need to do + _overrun_handling_mode = false; + } + } +} + +void rfnoc_rx_streamer::_handle_restart_request( + const res_source_info& src, action_info::sptr) +{ + // FIXME: Now we need to wait until it's safe to restart the radios. + // A flush would achieve this, albeit at the cost of possibly losing + // samples. + // The earliest we can restart is when the FIFOs in upstream producers + // are empty. + RFNOC_LOG_TRACE("Waiting for FIFOs to clear"); + + std::this_thread::sleep_for(100ms); + + // Once it's safe to restart the radios, we ask a radio to send us a + // stream command with its current time. + RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); + post_action({res_source_info::INPUT_EDGE, src.instance}, + action_info::make(ACTION_KEY_RX_RESTART_REQ)); +} + +void rfnoc_rx_streamer::_handle_stream_cmd_action( + const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action) +{ + RFNOC_LOG_TRACE("Received stream command on " << src.to_string()); + UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); + auto start_action = + stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode); + start_action->stream_cmd = stream_cmd_action->stream_cmd; + for (size_t i = 0; i < get_num_input_ports(); ++i) { + post_action({res_source_info::INPUT_EDGE, i}, start_action); + } + if (_overrun_handling_mode.exchange(false)) { + RFNOC_LOG_TRACE("Leaving overrun handling mode."); + } +} |