diff options
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp | 1 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp | 8 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_impl.hpp | 13 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp | 137 | ||||
| -rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 22 | ||||
| -rw-r--r-- | host/lib/rfnoc/rfnoc_rx_streamer.cpp | 49 | 
6 files changed, 158 insertions, 72 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp index 5440c1e37..5327105c8 100644 --- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp @@ -324,7 +324,6 @@ private:      std::unordered_map<size_t, double> _rx_bandwidth;      std::vector<uhd::stream_cmd_t> _last_stream_cmd; -    std::vector<bool> _restart_cont;  };  }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp index 1d1d0805d..d39d88f43 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -10,8 +10,8 @@  #include <uhd/rfnoc/node.hpp>  #include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>  #include <uhdlib/transport/rx_streamer_impl.hpp> -#include <string>  #include <atomic> +#include <string>  namespace uhd { namespace rfnoc { @@ -75,16 +75,17 @@ public:       */      bool check_topology(const std::vector<size_t>& connected_inputs,          const std::vector<size_t>& connected_outputs); +  private:      void _register_props(const size_t chan, const std::string& otw_format);      void _handle_rx_event_action(          const res_source_info& src, rx_event_action_info::sptr rx_event_action); -    void _handle_restart_request( -        const res_source_info& src, action_info::sptr rx_event_action);      void _handle_stream_cmd_action(          const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action); +    void _handle_overrun(); +      // Properties      std::vector<property_t<double>> _scaling_in;      std::vector<property_t<double>> _samp_rate_in; @@ -98,6 +99,7 @@ private:      const uhd::stream_args_t _stream_args;      std::atomic<bool> _overrun_handling_mode{false}; +    size_t _overrun_channel = 0;  };  }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index d3fe97c7f..cc989e8f2 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -196,6 +196,19 @@ protected:          _zero_copy_streamer.set_tick_rate(rate);      } +    //! Notifies the streamer that an overrun has occured +    void set_stopped_due_to_overrun() +    { +        _zero_copy_streamer.set_stopped_due_to_overrun(); +    } + +    //! Provides a callback to handle overruns +    void set_overrun_handler( +        typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler) +    { +        _zero_copy_streamer.set_overrun_handler(handler); +    } +  private:      //! Converter and associated item sizes      struct convert_info diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 36f568f2d..1f7320330 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -14,6 +14,7 @@  #include <uhdlib/transport/get_aligned_buffs.hpp>  #include <boost/format.hpp>  #include <vector> +#include <atomic>  namespace uhd { namespace transport { @@ -26,6 +27,8 @@ template <typename transport_t>  class rx_streamer_zero_copy  {  public: +    using overrun_handler_t = std::function<void()>; +      //! Constructor      rx_streamer_zero_copy(const size_t num_ports)          : _xports(num_ports) @@ -84,6 +87,18 @@ public:          _bytes_per_item = bpi;      } +    //! Notifies the streamer that an overrun has occured +    void set_stopped_due_to_overrun() +    { +        _stopped_due_to_overrun = true; +    } + +    //! Provides a callback to handle overruns +    void set_overrun_handler(overrun_handler_t handler) +    { +        _overrun_handler = handler; +    } +      /*!       * Gets a set of time-aligned buffers, one per channel.       * @@ -96,35 +111,62 @@ public:          rx_metadata_t& metadata,          const int32_t timeout_ms)      { -        metadata.reset(); +        // Function to set metadata based on alignment error +        auto set_metadata_for_error = +            [this](typename get_aligned_buffs_t::alignment_result_t error, +                rx_metadata_t& metadata) { +                switch (error) { +                    case get_aligned_buffs_t::BAD_PACKET: +                        metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; +                        break; + +                    case get_aligned_buffs_t::TIMEOUT: +                        metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; +                        break; + +                    case get_aligned_buffs_t::ALIGNMENT_FAILURE: +                        metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; +                        break; + +                    case get_aligned_buffs_t::SEQUENCE_ERROR: +                        std::tie(metadata.has_time_spec, metadata.time_spec) = +                            _last_read_time_info.get_next_packet_time(_samp_rate); +                        metadata.out_of_sequence = true; +                        metadata.error_code      = rx_metadata_t::ERROR_CODE_OVERFLOW; +                        break; + +                    default: +                        UHD_THROW_INVALID_CODE_PATH(); +                } +            }; -        switch (_get_aligned_buffs(timeout_ms)) { -            case get_aligned_buffs_t::SUCCESS: -                break; - -            case get_aligned_buffs_t::BAD_PACKET: -                metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; -                return 0; - -            case get_aligned_buffs_t::TIMEOUT: -                metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; -                return 0; - -            case get_aligned_buffs_t::ALIGNMENT_FAILURE: -                metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; -                return 0; +        metadata.reset(); -            case get_aligned_buffs_t::SEQUENCE_ERROR: -                metadata.has_time_spec = _last_read_time_info.has_time_spec; -                metadata.time_spec = -                    _last_read_time_info.time_spec -                    + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); -                metadata.out_of_sequence = true; -                metadata.error_code      = rx_metadata_t::ERROR_CODE_OVERFLOW; +        // Try to get buffs with a 0 timeout first. This avoids needing to check +        // if radios are stopped due to overrun when packets are available. +        auto result = _get_aligned_buffs(0); + +        if (result == get_aligned_buffs_t::TIMEOUT) { +            if (_stopped_due_to_overrun) { +                // An overrun occurred and the user has read all the packets +                // that were buffered prior to the overrun. Call the overrun +                // handler and return overrun error. +                _handle_overrun(); +                std::tie(metadata.has_time_spec, metadata.time_spec) = +                    _last_read_time_info.get_next_packet_time(_samp_rate); +                metadata.error_code     = rx_metadata_t::ERROR_CODE_OVERFLOW; +                _stopped_due_to_overrun = false;                  return 0; +            } else { +                // Packets were not available with zero timeout, wait for them +                // to arrive using the specified timeout. +                result = _get_aligned_buffs(timeout_ms); +            } +        } -            default: -                UHD_THROW_INVALID_CODE_PATH(); +        if (result != get_aligned_buffs_t::SUCCESS) { +            set_metadata_for_error(result, metadata); +            return 0;          }          // Get payload pointers for each buffer and aggregate eob. We set eob to @@ -167,6 +209,34 @@ public:  private:      using get_aligned_buffs_t = get_aligned_buffs<transport_t>; +    void _handle_overrun() +    { +        // Flush any remaining packets. This method is called after any channel +        // times out, so here we ensure all channels are flushed prior to +        // calling the overrun handler to potentially restart the radios. +        for (size_t chan = 0; chan < _xports.size(); chan++) { +            if (_frame_buffs[chan]) { +                _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); +                _frame_buffs[chan] = nullptr; +            } + +            frame_buff::uptr buff; +            while (true) { +                std::tie(buff, std::ignore, std::ignore) = +                    _xports[chan]->get_recv_buff(0); +                if (!buff) { +                    break; +                } +                _xports[chan]->release_recv_buff(std::move(buff)); +            } +        } + +        // Now call the overrun handler +        if (_overrun_handler) { +            _overrun_handler(); +        } +    } +      // Information recorded by streamer about the last data packet processed,      // used to create the metadata when there is a sequence error.      struct last_read_time_info_t @@ -174,6 +244,16 @@ private:          size_t num_samps   = 0;          bool has_time_spec = false;          time_spec_t time_spec; + +        std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate) +        { +            if (has_time_spec) { +                return std::make_tuple( +                    true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate)); +            } else { +                return std::make_tuple(false, time_spec_t()); +            } +        }      };      // Transports for each channel @@ -200,6 +280,13 @@ private:      // Information about the last data packet processed      last_read_time_info_t _last_read_time_info; + +    // Flag that indicates an overrun occurred. The streamer will return an +    // overrun error when no more packets are available. +    std::atomic<bool> _stopped_due_to_overrun{false}; + +    // Callback for overrun +    overrun_handler_t _overrun_handler;  };  }} // namespace uhd::transport diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 9d7257108..e400033b3 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -6,6 +6,7 @@  #include <uhd/exception.hpp>  #include <uhd/utils/log.hpp> +#include <uhd/rfnoc/mb_controller.hpp>  #include <uhdlib/rfnoc/radio_control_impl.hpp>  #include <uhdlib/utils/compat_check.hpp>  #include <map> @@ -63,6 +64,8 @@ const uint32_t radio_control_impl::regmap::RX_CMD_TIMED_POS;  const uhd::fs_path radio_control_impl::DB_PATH("dboard");  const uhd::fs_path radio_control_impl::FE_PATH("frontends"); +static constexpr double OVERRUN_RESTART_DELAY = 0.05; +  /****************************************************************************   * Structors   ***************************************************************************/ @@ -74,7 +77,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)      , _spc(_radio_width & 0xFFFF)      , _last_stream_cmd(            get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) -    , _restart_cont(get_num_output_ports(), false)  {      uhd::assert_fpga_compat(MAJOR_COMPAT,          MINOR_COMPAT, @@ -110,16 +112,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)                  return;              }              issue_stream_cmd(stream_cmd_action->stream_cmd, port); -            if (stream_cmd_action->stream_cmd.stream_mode -                    == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS -                && _restart_cont.at(port)) { -                RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now " -                                "request restart."); -                _restart_cont[port] = false; -                auto restart_request_action = -                    action_info::make(ACTION_KEY_RX_RESTART_REQ); -                post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action); -            }          });      register_action_handler(ACTION_KEY_RX_RESTART_REQ,          [this](const res_source_info& src, action_info::sptr /*action*/) { @@ -131,9 +123,10 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)              }              auto stream_cmd_action = stream_cmd_action_info::make(                  uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS); -            // FIXME -            // stream_cmd_action->stream_cmd.stream_now = false; -            // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA; +            stream_cmd_action->stream_cmd.stream_now = false; +            stream_cmd_action->stream_cmd.time_spec = +                get_mb_controller()->get_timekeeper(0)->get_time_now() + +                uhd::time_spec_t(OVERRUN_RESTART_DELAY);              const size_t port = src.instance;              if (port > get_num_output_ports()) {                  RFNOC_LOG_WARNING("Received stream command to invalid output port!"); @@ -962,7 +955,6 @@ void radio_control_impl::async_message_handler(                      rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW;                      const bool cont_mode        = _last_stream_cmd.at(chan).stream_mode                                             == stream_cmd_t::STREAM_MODE_START_CONTINUOUS; -                    _restart_cont[chan]                = cont_mode;                      rx_event_action->args["cont_mode"] = std::to_string(cont_mode);                      RFNOC_LOG_TRACE("Posting overrun event action message.");                      post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan}, diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp index cfa88b292..b50e2fe15 100644 --- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -18,16 +18,18 @@ using namespace uhd::rfnoc;  const std::string STREAMER_ID = "RxStreamer";  static std::atomic<uint64_t> streamer_inst_ctr; -rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, -    const uhd::stream_args_t stream_args) +rfnoc_rx_streamer::rfnoc_rx_streamer( +    const size_t num_chans, const uhd::stream_args_t stream_args)      : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args)      , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))      , _stream_args(stream_args)  { +    set_overrun_handler([this]() { this->_handle_overrun(); }); +      // No block to which to forward properties or actions      set_prop_forwarding_policy(forwarding_policy_t::DROP);      set_action_forwarding_policy(forwarding_policy_t::DROP); -    // +      register_action_handler(ACTION_KEY_RX_EVENT,          [this](const res_source_info& src, action_info::sptr action) {              rx_event_action_info::sptr rx_event_action = @@ -38,10 +40,6 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,              }              _handle_rx_event_action(src, rx_event_action);          }); -    register_action_handler(ACTION_KEY_RX_RESTART_REQ, -        [this](const res_source_info& src, action_info::sptr action) { -            _handle_restart_request(src, action); -        });      register_action_handler(ACTION_KEY_STREAM_CMD,          [this](const res_source_info& src, action_info::sptr action) {              stream_cmd_action_info::sptr stream_cmd_action = @@ -117,6 +115,15 @@ bool rfnoc_rx_streamer::check_topology(      return node_t::check_topology(connected_inputs, connected_outputs);  } +void rfnoc_rx_streamer::_handle_overrun() +{ +    if (_overrun_handling_mode) { +        RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); +        post_action({res_source_info::INPUT_EDGE, _overrun_channel}, +            action_info::make(ACTION_KEY_RX_RESTART_REQ)); +    } +} +  void rfnoc_rx_streamer::_register_props(const size_t chan,      const std::string& otw_format)  { @@ -178,6 +185,7 @@ void rfnoc_rx_streamer::_handle_rx_event_action(              RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");              return;          } +        _overrun_channel = src.instance;          RFNOC_LOG_TRACE(              "Switching to overrun-handling mode: Stopping all upstream producers...");          auto stop_action = @@ -188,32 +196,17 @@ void rfnoc_rx_streamer::_handle_rx_event_action(              post_action({res_source_info::INPUT_EDGE, i}, stop_action);          }          if (!rx_event_action->args.cast<bool>("cont_mode", false)) { -            // FIXME wait until it's safe to restart radios -            // If we don't need to restart, that's all we need to do +            // If we don't need to restart, that's all we need to do. Clear this +            // flag before setting the stopped due to overrun status below to +            // avoid a potential race condition with the overrun handler.              _overrun_handling_mode = false;          } +        // Tell the streamer to flag an overrun to the user after the data that +        // was buffered prior to the overrun is read. +        set_stopped_due_to_overrun();      }  } -void rfnoc_rx_streamer::_handle_restart_request( -    const res_source_info& src, action_info::sptr) -{ -    // FIXME: Now we need to wait until it's safe to restart the radios. -    // A flush would achieve this, albeit at the cost of possibly losing -    // samples. -    // The earliest we can restart is when the FIFOs in upstream producers -    // are empty. -    RFNOC_LOG_TRACE("Waiting for FIFOs to clear"); - -    std::this_thread::sleep_for(100ms); - -    // Once it's safe to restart the radios, we ask a radio to send us a -    // stream command with its current time. -    RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); -    post_action({res_source_info::INPUT_EDGE, src.instance}, -        action_info::make(ACTION_KEY_RX_RESTART_REQ)); -} -  void rfnoc_rx_streamer::_handle_stream_cmd_action(      const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)  {  | 
