diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/include/uhd/rfnoc/defaults.hpp | 1 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp | 1 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp | 10 | ||||
| -rw-r--r-- | host/lib/rfnoc/graph.cpp | 5 | ||||
| -rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 48 | ||||
| -rw-r--r-- | host/lib/rfnoc/rfnoc_rx_streamer.cpp | 90 | 
6 files changed, 152 insertions, 3 deletions
diff --git a/host/include/uhd/rfnoc/defaults.hpp b/host/include/uhd/rfnoc/defaults.hpp index 696d31f30..e1046ada2 100644 --- a/host/include/uhd/rfnoc/defaults.hpp +++ b/host/include/uhd/rfnoc/defaults.hpp @@ -29,6 +29,7 @@ static const io_type_t IO_TYPE_SC16 = "sc16";  static const std::string ACTION_KEY_STREAM_CMD("stream_cmd");  static const std::string ACTION_KEY_RX_EVENT("rx_event"); +static const std::string ACTION_KEY_RX_RESTART_REQ("restart_request");  //! If the block name can't be automatically detected, this name is used  static const std::string DEFAULT_BLOCK_NAME = "Block"; diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp index 0d10fd13b..6b50cc31c 100644 --- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp @@ -303,6 +303,7 @@ private:      std::unordered_map<size_t, double> _rx_bandwidth;      std::vector<uhd::stream_cmd_t> _last_stream_cmd; +    std::vector<bool> _restart_cont;  };  }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp index 6ced60d19..1d1d0805d 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -11,6 +11,7 @@  #include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>  #include <uhdlib/transport/rx_streamer_impl.hpp>  #include <string> +#include <atomic>  namespace uhd { namespace rfnoc { @@ -77,6 +78,13 @@ public:  private:      void _register_props(const size_t chan, const std::string& otw_format); +    void _handle_rx_event_action( +        const res_source_info& src, rx_event_action_info::sptr rx_event_action); +    void _handle_restart_request( +        const res_source_info& src, action_info::sptr rx_event_action); +    void _handle_stream_cmd_action( +        const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action); +      // Properties      std::vector<property_t<double>> _scaling_in;      std::vector<property_t<double>> _samp_rate_in; @@ -88,6 +96,8 @@ private:      // Stream args provided at construction      const uhd::stream_args_t _stream_args; + +    std::atomic<bool> _overrun_handling_mode{false};  };  }} // namespace uhd::rfnoc diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp index ff5fde1e9..1a25f84f9 100644 --- a/host/lib/rfnoc/graph.cpp +++ b/host/lib/rfnoc/graph.cpp @@ -417,8 +417,9 @@ void graph_t::enqueue_action(          // The following call can cause other nodes to add more actions to          // the end of _action_queue!          UHD_LOG_TRACE(LOG_ID, -            "Now delivering action " << next_action_sptr->key << "#" -                                     << next_action_sptr->id); +            "Now delivering action " +                << next_action_sptr->key << "#" << next_action_sptr->id << " to " +                << recipient_node->get_unique_id() << "@" << recipient_port.to_string());          node_accessor_t{}.send_action(recipient_node, recipient_port, next_action_sptr);      }      UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling."); diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 018f54d9e..4ed0c4b60 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -74,6 +74,7 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)      , _spc(_radio_width & 0xFFFF)      , _last_stream_cmd(            get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) +    , _restart_cont(get_num_output_ports(), false)  {      uhd::assert_fpga_compat(MAJOR_COMPAT,          MINOR_COMPAT, @@ -99,7 +100,8 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)                  "Received stream command: " << stream_cmd_action->stream_cmd.stream_mode                                              << " to " << src.to_string());              if (src.type != res_source_info::OUTPUT_EDGE) { -                RFNOC_LOG_WARNING("Received stream command, but not to output port! Ignoring."); +                RFNOC_LOG_WARNING( +                    "Received stream command, but not to output port! Ignoring.");                  return;              }              const size_t port = src.instance; @@ -108,6 +110,36 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)                  return;              }              issue_stream_cmd(stream_cmd_action->stream_cmd, port); +            if (stream_cmd_action->stream_cmd.stream_mode +                    == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +                && _restart_cont.at(port)) { +                RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now " +                                "request restart."); +                _restart_cont[port] = false; +                auto restart_request_action = +                    action_info::make(ACTION_KEY_RX_RESTART_REQ); +                post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action); +            } +        }); +    register_action_handler(ACTION_KEY_RX_RESTART_REQ, +        [this](const res_source_info& src, action_info::sptr /*action*/) { +            RFNOC_LOG_TRACE("Received restart request command to " << src.to_string()); +            if (src.type != res_source_info::OUTPUT_EDGE) { +                RFNOC_LOG_WARNING( +                    "Received stream command, but not to output port! Ignoring."); +                return; +            } +            auto stream_cmd_action = stream_cmd_action_info::make( +                uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS); +            // FIXME +            // stream_cmd_action->stream_cmd.stream_now = false; +            // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA; +            const size_t port = src.instance; +            if (port > get_num_output_ports()) { +                RFNOC_LOG_WARNING("Received stream command to invalid output port!"); +                return; +            } +            post_action({res_source_info::OUTPUT_EDGE, port}, stream_cmd_action);          });      // Register spp properties and resolvers      _spp_prop.reserve(get_num_output_ports()); @@ -827,6 +859,11 @@ void radio_control_impl::async_message_handler(      }      switch (addr_base + addr_offset) {          case regmap::SWREG_TX_ERR: { +            if (chan > get_num_input_ports()) { +                RFNOC_LOG_WARNING( +                    "Cannot process TX-related async message to invalid chan " << chan); +                return; +            }              switch (code) {                  case err_codes::ERR_TX_UNDERRUN:                      UHD_LOG_FASTPATH("U"); @@ -838,11 +875,20 @@ void radio_control_impl::async_message_handler(              break;          }          case regmap::SWREG_RX_ERR: { +            if (chan > get_num_input_ports()) { +                RFNOC_LOG_WARNING( +                    "Cannot process RX-related async message to invalid chan " << chan); +                return; +            }              switch (code) {                  case err_codes::ERR_RX_OVERRUN: {                      UHD_LOG_FASTPATH("O");                      auto rx_event_action        = rx_event_action_info::make();                      rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW; +                    const bool cont_mode        = _last_stream_cmd.at(chan).stream_mode +                                           == stream_cmd_t::STREAM_MODE_START_CONTINUOUS; +                    _restart_cont[chan]                = cont_mode; +                    rx_event_action->args["cont_mode"] = std::to_string(cont_mode);                      RFNOC_LOG_TRACE("Posting overrun event action message.");                      post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan},                          rx_event_action); diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp index 4340faff0..cfa88b292 100644 --- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -8,7 +8,10 @@  #include <uhdlib/rfnoc/node_accessor.hpp>  #include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>  #include <atomic> +#include <thread> +using namespace std::chrono_literals; +;  using namespace uhd;  using namespace uhd::rfnoc; @@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,      // No block to which to forward properties or actions      set_prop_forwarding_policy(forwarding_policy_t::DROP);      set_action_forwarding_policy(forwarding_policy_t::DROP); +    // +    register_action_handler(ACTION_KEY_RX_EVENT, +        [this](const res_source_info& src, action_info::sptr action) { +            rx_event_action_info::sptr rx_event_action = +                std::dynamic_pointer_cast<rx_event_action_info>(action); +            if (!rx_event_action) { +                RFNOC_LOG_WARNING("Received invalid RX event action!"); +                return; +            } +            _handle_rx_event_action(src, rx_event_action); +        }); +    register_action_handler(ACTION_KEY_RX_RESTART_REQ, +        [this](const res_source_info& src, action_info::sptr action) { +            _handle_restart_request(src, action); +        }); +    register_action_handler(ACTION_KEY_STREAM_CMD, +        [this](const res_source_info& src, action_info::sptr action) { +            stream_cmd_action_info::sptr stream_cmd_action = +                std::dynamic_pointer_cast<stream_cmd_action_info>(action); +            if (!stream_cmd_action) { +                RFNOC_LOG_WARNING("Received invalid stream command action!"); +                return; +            } +            _handle_stream_cmd_action(src, stream_cmd_action); +        });      // Initialize properties      _scaling_in.reserve(num_chans); @@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan,              }          });  } + +void rfnoc_rx_streamer::_handle_rx_event_action( +    const res_source_info& src, rx_event_action_info::sptr rx_event_action) +{ +    UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); +    if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) { +        RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance); +        if (_overrun_handling_mode.exchange(true)) { +            RFNOC_LOG_TRACE("Ignoring duplicate overrun message."); +            return; +        } +        RFNOC_LOG_TRACE( +            "Switching to overrun-handling mode: Stopping all upstream producers..."); +        auto stop_action = +            stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); +        // Reminder: Delivery of all of these actions is deferred until this +        // action handler is complete. +        for (size_t i = 0; i < get_num_input_ports(); ++i) { +            post_action({res_source_info::INPUT_EDGE, i}, stop_action); +        } +        if (!rx_event_action->args.cast<bool>("cont_mode", false)) { +            // FIXME wait until it's safe to restart radios +            // If we don't need to restart, that's all we need to do +            _overrun_handling_mode = false; +        } +    } +} + +void rfnoc_rx_streamer::_handle_restart_request( +    const res_source_info& src, action_info::sptr) +{ +    // FIXME: Now we need to wait until it's safe to restart the radios. +    // A flush would achieve this, albeit at the cost of possibly losing +    // samples. +    // The earliest we can restart is when the FIFOs in upstream producers +    // are empty. +    RFNOC_LOG_TRACE("Waiting for FIFOs to clear"); + +    std::this_thread::sleep_for(100ms); + +    // Once it's safe to restart the radios, we ask a radio to send us a +    // stream command with its current time. +    RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); +    post_action({res_source_info::INPUT_EDGE, src.instance}, +        action_info::make(ACTION_KEY_RX_RESTART_REQ)); +} + +void rfnoc_rx_streamer::_handle_stream_cmd_action( +    const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action) +{ +    RFNOC_LOG_TRACE("Received stream command on " << src.to_string()); +    UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); +    auto start_action = +        stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode); +    start_action->stream_cmd = stream_cmd_action->stream_cmd; +    for (size_t i = 0; i < get_num_input_ports(); ++i) { +        post_action({res_source_info::INPUT_EDGE, i}, start_action); +    } +    if (_overrun_handling_mode.exchange(false)) { +        RFNOC_LOG_TRACE("Leaving overrun handling mode."); +    } +}  | 
