diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/include/uhd/rfnoc/actions.hpp | 22 | ||||
| -rw-r--r-- | host/include/uhd/rfnoc/defaults.hpp | 3 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 41 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp | 17 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp | 49 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 11 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp | 6 | ||||
| -rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/rfnoc/actions.cpp | 20 | ||||
| -rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 16 | ||||
| -rw-r--r-- | host/lib/rfnoc/rfnoc_tx_streamer.cpp | 69 | ||||
| -rw-r--r-- | host/lib/rfnoc/tx_async_msg_queue.cpp | 52 | ||||
| -rw-r--r-- | host/tests/tx_streamer_test.cpp | 6 | 
13 files changed, 291 insertions, 22 deletions
diff --git a/host/include/uhd/rfnoc/actions.hpp b/host/include/uhd/rfnoc/actions.hpp index b713bbed3..7326cfd6d 100644 --- a/host/include/uhd/rfnoc/actions.hpp +++ b/host/include/uhd/rfnoc/actions.hpp @@ -78,7 +78,27 @@ private:      rx_event_action_info();  }; +struct UHD_API tx_event_action_info : public action_info +{ +public: +    using sptr = std::shared_ptr<tx_event_action_info>; + +    //! The event code that describes the event +    uhd::async_metadata_t::event_code_t event_code; + +    //! Has time specification? +    bool has_tsf; + +    //! When the async event occurred +    uint64_t tsf; + +    //! Factory function +    static sptr make(uhd::async_metadata_t::event_code_t event_code); + +protected: +    tx_event_action_info(uhd::async_metadata_t::event_code_t event_code); +}; +  }} /* namespace uhd::rfnoc */  #endif /* INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP */ - diff --git a/host/include/uhd/rfnoc/defaults.hpp b/host/include/uhd/rfnoc/defaults.hpp index 008847091..efc774467 100644 --- a/host/include/uhd/rfnoc/defaults.hpp +++ b/host/include/uhd/rfnoc/defaults.hpp @@ -32,11 +32,12 @@ static const io_type_t IO_TYPE_SC16 = "sc16";  static const std::string ACTION_KEY_STREAM_CMD("stream_cmd");  static const std::string ACTION_KEY_RX_EVENT("rx_event");  static const std::string ACTION_KEY_RX_RESTART_REQ("restart_request"); +static const std::string ACTION_KEY_TX_EVENT("tx_event");  //! If the block name can't be automatically detected, this name is used  static const std::string DEFAULT_BLOCK_NAME = "Block";  //! This NOC-ID is used to look up the default block -static const uint32_t DEFAULT_NOC_ID = 0xFFFFFFFF; +static const uint32_t DEFAULT_NOC_ID  = 0xFFFFFFFF;  static const double DEFAULT_TICK_RATE = 1.0;  // Whenever we need a default spp value use this, unless there are some  // block/device-specific constraints. It will keep the frame size below 1500. diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 0fb0ab5d1..63c2b24cb 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,6 +7,7 @@  #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP  #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include <uhd/types/metadata.hpp>  #include <uhdlib/rfnoc/chdr_packet.hpp>  #include <uhdlib/rfnoc/chdr_types.hpp>  #include <uhdlib/rfnoc/mgmt_portal.hpp> @@ -103,8 +104,9 @@ private:  class chdr_tx_data_xport  {  public: -    using uptr   = std::unique_ptr<chdr_tx_data_xport>; -    using buff_t = transport::frame_buff; +    using uptr                   = std::unique_ptr<chdr_tx_data_xport>; +    using buff_t                 = transport::frame_buff; +    using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>;      //! Information about data packet      struct packet_info_t @@ -215,6 +217,16 @@ public:      }      /*! +     * Configure a function to call to enqueue async msgs +     * +     * \param fn Function to enqueue async messages +     */ +    void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn) +    { +        _enqueue_async_msg = fn; +    } + +    /*!       * Sends a TX data packet       *       * \param buff the frame buffer containing the packet to send @@ -286,7 +298,27 @@ private:              _fc_state.update_dest_recv_count(                  {strs.xfer_count_bytes, static_cast<uint32_t>(strs.xfer_count_pkts)}); -            // TODO: check strs status here and push into async msg queue +            if (strs.status != chdr::STRS_OKAY) { +                switch (strs.status) { +                case chdr::STRS_SEQERR: +                    UHD_LOG_FASTPATH("S"); +                    if (_enqueue_async_msg) { +                        _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); +                    } +                    break; +                case chdr::STRS_DATAERR: +                    UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); +                    break; +                case chdr::STRS_RTERR: +                    UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); +                    break; +                case chdr::STRS_CMDERR: +                    UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); +                    break; +                default: +                    break; +                } +            }              // Packet belongs to this transport, release buff and return true              recv_link->release_recv_buff(std::move(buff)); @@ -522,6 +554,9 @@ private:      // Handles sending of strc flow control ack packets      detail::tx_flow_ctrl_sender _fc_sender; +    // Function to enqueue an async msg +    enqueue_async_msg_fn_t _enqueue_async_msg; +      // Local / Source EPID      sep_id_t _epid;  }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp index 3bfc9d05a..3e006f7f9 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -9,6 +9,7 @@  #include <uhd/rfnoc/node.hpp>  #include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp>  #include <uhdlib/transport/tx_streamer_impl.hpp>  #include <string> @@ -78,9 +79,25 @@ public:       */      void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport); +    /*! Receive an asynchronous message from this tx stream +     * +     *  Implementation of tx_streamer API method. +     * +     * \param async_metadata the metadata to be filled in +     * \param timeout the timeout in seconds to wait for a message +     * \return true when the async_metadata is valid, false for timeout +     */ +    bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); +  private:      void _register_props(const size_t chan, const std::string& otw_format); +    void _handle_tx_event_action( +        const res_source_info& src, tx_event_action_info::sptr tx_event_action); + +    // Queue for async messages +    tx_async_msg_queue::sptr _async_msg_queue; +      // Properties      std::vector<property_t<double>> _scaling_out;      std::vector<property_t<double>> _samp_rate_out; diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp new file mode 100644 index 000000000..181a31754 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp @@ -0,0 +1,49 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP +#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP + +#include <uhd/types/metadata.hpp> +#include <boost/lockfree/queue.hpp> + +namespace uhd { namespace rfnoc { + +/*! + *  Implements queue of async messages originating from the tx data transport + *  and from the rfnoc graph. + */ +class tx_async_msg_queue +{ +public: +    using sptr = std::shared_ptr<tx_async_msg_queue>; + +    //! Constructor +    tx_async_msg_queue(size_t capacity); + +    /*! +     *  Retrieve async message from queue +     * +     * \param async_metadata the metadata to be filled in +     * \param timeout_ms the timeout in milliseconds to wait for a message +     * \return true when the async_metadata is valid, false for timeout +     */ +    bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms); + +    /*! +     *  Push an async message onto the queue +     * +     * \param async_metadata the metadata to be pushed +     */ +    void enqueue(const async_metadata_t& async_metadata); + +private: +    boost::lockfree::queue<async_metadata_t> _queue; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index 819ed5558..35a724fa9 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -99,7 +99,6 @@ public:          return _spp;      } -      /*! Get width of each over-the-wire item component. For complex items,       *  returns the width of one component only (real or imaginary).       */ @@ -178,15 +177,13 @@ public:          }      } -    //! Implementation of rx_streamer API method -    bool recv_async_msg( -        uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) +protected: +    //! Returns the tick rate for conversion of timestamp +    double get_tick_rate() const      { -        // TODO: implement me -        return false; +        return _zero_copy_streamer.get_tick_rate();      } -protected:      //! Returns the size in bytes of a sample in a packet      size_t get_mtu() const      { diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 1b6f55238..5ac7a1e8c 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -51,6 +51,12 @@ public:          return _xports.size();      } +    //! Returns the tick rate for conversion of timestamp +    double get_tick_rate() const +    { +        return _tick_rate; +    } +      //! Configures tick rate for conversion of timestamp      void set_tick_rate(const double rate)      { diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index a88507dcd..73de394e3 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -55,6 +55,7 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/tx_async_msg_queue.cpp      # Default block control classes:      ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp index d4ed4559f..4de05b304 100644 --- a/host/lib/rfnoc/actions.cpp +++ b/host/lib/rfnoc/actions.cpp @@ -63,3 +63,23 @@ rx_event_action_info::sptr rx_event_action_info::make()      };      return std::make_shared<rx_event_action_info_make_shared>();  } + +/*** TX Metadata Action Info *************************************************/ +tx_event_action_info::tx_event_action_info( +    uhd::async_metadata_t::event_code_t event_code_) +    : action_info(ACTION_KEY_TX_EVENT), event_code(event_code_) +{ +} + +tx_event_action_info::sptr tx_event_action_info::make( +    uhd::async_metadata_t::event_code_t event_code) +{ +    struct tx_event_action_info_make_shared : public tx_event_action_info +    { +        tx_event_action_info_make_shared(uhd::async_metadata_t::event_code_t event_code) +            : tx_event_action_info(event_code) +        { +        } +    }; +    return std::make_shared<tx_event_action_info_make_shared>(event_code); +} diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 2094f4096..9d7257108 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -928,12 +928,24 @@ void radio_control_impl::async_message_handler(                  return;              }              switch (code) { -                case err_codes::ERR_TX_UNDERRUN: +                case err_codes::ERR_TX_UNDERRUN: { +                    auto tx_event_action = tx_event_action_info::make( +                        uhd::async_metadata_t::EVENT_CODE_UNDERFLOW); +                    post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, +                        tx_event_action);                      UHD_LOG_FASTPATH("U"); +                    RFNOC_LOG_TRACE("Posting underrun event action message.");                      break; -                case err_codes::ERR_TX_LATE_DATA: +                } +                case err_codes::ERR_TX_LATE_DATA: { +                    auto tx_event_action = tx_event_action_info::make( +                        uhd::async_metadata_t::EVENT_CODE_TIME_ERROR); +                    post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, +                        tx_event_action);                      UHD_LOG_FASTPATH("L"); +                    RFNOC_LOG_TRACE("Posting late data event action message.");                      break; +                }              }              break;          } diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp index 61d714a85..4fc1a3ff8 100644 --- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -14,6 +14,7 @@ using namespace uhd::rfnoc;  const std::string STREAMER_ID = "TxStreamer";  static std::atomic<uint64_t> streamer_inst_ctr; +static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000;  rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,      const uhd::stream_args_t stream_args) @@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,      , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))      , _stream_args(stream_args)  { +    _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE); +      // No block to which to forward properties or actions      set_prop_forwarding_policy(forwarding_policy_t::DROP);      set_action_forwarding_policy(forwarding_policy_t::DROP); +    register_action_handler(ACTION_KEY_TX_EVENT, +        [this](const res_source_info& src, action_info::sptr action) { +            tx_event_action_info::sptr tx_event_action = +                std::dynamic_pointer_cast<tx_event_action_info>(action); +            if (!tx_event_action) { +                RFNOC_LOG_WARNING("Received invalid TX event action!"); +                return; +            } +            _handle_tx_event_action(src, tx_event_action); +        }); +      // Initialize properties      _scaling_out.reserve(num_chans);      _samp_rate_out.reserve(num_chans); @@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,          for (auto& mtu_prop : _mtu_out) {              mtu_resolver_out.insert(&mtu_prop);          } -        //property_t<size_t>* mtu_out = &_mtu_out.back();          add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out),              [&mtu_out = _mtu_out[i], i, this]() { @@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel(      const size_t mtu = xport->get_max_payload_size();      set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel}); +    xport->set_enqueue_async_msg_fn( +        [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) { +            async_metadata_t md; +            md.channel       = channel; +            md.event_code    = event_code; +            md.has_time_spec = has_tsf; + +            if (has_tsf) { +                md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate()); +            } + +            this->_async_msg_queue->enqueue(md); +        }); +      tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport));  } -void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format) +bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata, +    double timeout) +{ +    const auto timeout_ms = static_cast<uint64_t>(timeout * 1000); +    return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, +    const std::string& otw_format)  {      // Create actual properties and store them      _scaling_out.push_back(property_t<double>( @@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot      register_property(mtu_out);      // Add resolvers -    add_property_resolver({scaling_out}, {}, -        [&scaling_out = *scaling_out, chan, this]() { +    add_property_resolver( +        {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() {              RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);              if (scaling_out.is_valid()) {                  this->set_scale_factor(chan, 32767.0 / scaling_out.get());              }          }); -    add_property_resolver({samp_rate_out}, {}, -        [&samp_rate_out = *samp_rate_out, chan, this]() { +    add_property_resolver( +        {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() {              RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);              if (samp_rate_out.is_valid()) {                  this->set_samp_rate(samp_rate_out.get());              }          }); -    add_property_resolver({tick_rate_out}, {}, -        [&tick_rate_out = *tick_rate_out, chan, this]() { +    add_property_resolver( +        {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() {              RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);              if (tick_rate_out.is_valid()) {                  this->set_tick_rate(tick_rate_out.get());              }          });  } + +void rfnoc_tx_streamer::_handle_tx_event_action( +    const res_source_info& src, tx_event_action_info::sptr tx_event_action) +{ +    UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + +    uhd::async_metadata_t md; +    md.event_code    = tx_event_action->event_code; +    md.channel       = src.instance; +    md.has_time_spec = tx_event_action->has_tsf; + +    if (md.has_time_spec) { +        md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); +    } + +    RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel); +    _async_msg_queue->enqueue(md); +} diff --git a/host/lib/rfnoc/tx_async_msg_queue.cpp b/host/lib/rfnoc/tx_async_msg_queue.cpp new file mode 100644 index 000000000..71a05074f --- /dev/null +++ b/host/lib/rfnoc/tx_async_msg_queue.cpp @@ -0,0 +1,52 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> +#include <chrono> +#include <thread> + +using namespace uhd; +using namespace uhd::rfnoc; + +tx_async_msg_queue::tx_async_msg_queue(size_t capacity) +    : _queue(capacity) +{ +} + +bool tx_async_msg_queue::recv_async_msg(uhd::async_metadata_t& async_metadata, +    int32_t timeout_ms) +{ +    using namespace std::chrono; + +    if (timeout_ms == 0.0) { +        return _queue.pop(async_metadata); +    } + +    const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + +    bool last_check = false; + +    while (true) { +        if (_queue.pop(async_metadata)) { +            return true; +        } + +        if (steady_clock::now() > end_time) { +            if (last_check) { +                return false; +            } else { +                last_check = true; +            } +        } + +        std::this_thread::sleep_for(std::chrono::microseconds(100)); +    } +} + +void tx_async_msg_queue::enqueue(const async_metadata_t& async_metadata) +{ +    _queue.push(async_metadata); +} diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp index cb07cffad..686c1b761 100644 --- a/host/tests/tx_streamer_test.cpp +++ b/host/tests/tx_streamer_test.cpp @@ -86,6 +86,12 @@ public:      {          tx_streamer_impl::set_scale_factor(chan, scale_factor);      } + +    bool recv_async_msg(uhd::async_metadata_t& /*async_metadata*/, +        double /*timeout = 0.1*/) +    { +        return false; +    }  };  }} // namespace uhd::transport  | 
