diff options
| author | Martin Braun <martin.braun@ettus.com> | 2019-05-15 10:26:44 -0700 | 
|---|---|---|
| committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:14 -0800 | 
| commit | b8a6c64d6012ab1ec0b3b843fccec2d990d440a3 (patch) | |
| tree | 31a99d71af5a6aa2db2a7c9f2a7d19986a2d3856 | |
| parent | d6251df6347390e74784b2fbe116b0e64780547e (diff) | |
| download | uhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.tar.gz uhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.tar.bz2 uhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.zip  | |
rfnoc: Add action API
- Added action_info class
- Allow to send actions from node to node
- Allow to post actions into nodes
- Allow to set default forwarding policies
- Added unit tests
| -rw-r--r-- | host/include/uhd/rfnoc/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/include/uhd/rfnoc/actions.hpp | 50 | ||||
| -rw-r--r-- | host/include/uhd/rfnoc/defaults.hpp | 45 | ||||
| -rw-r--r-- | host/include/uhd/rfnoc/node.hpp | 108 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/graph.hpp | 49 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/node_accessor.hpp | 20 | ||||
| -rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/rfnoc/actions.cpp | 21 | ||||
| -rw-r--r-- | host/lib/rfnoc/graph.cpp | 78 | ||||
| -rw-r--r-- | host/lib/rfnoc/node.cpp | 87 | ||||
| -rw-r--r-- | host/tests/CMakeLists.txt | 13 | ||||
| -rw-r--r-- | host/tests/actions_test.cpp | 81 | ||||
| -rw-r--r-- | host/tests/rfnoc_graph_mock_nodes.hpp | 122 | 
13 files changed, 656 insertions, 20 deletions
diff --git a/host/include/uhd/rfnoc/CMakeLists.txt b/host/include/uhd/rfnoc/CMakeLists.txt index 894d4fda4..052d44090 100644 --- a/host/include/uhd/rfnoc/CMakeLists.txt +++ b/host/include/uhd/rfnoc/CMakeLists.txt @@ -13,6 +13,7 @@ if(ENABLE_RFNOC)          blockdef.hpp          block_id.hpp          constants.hpp +        defaults.hpp          dirtifier.hpp          graph.hpp          node_ctrl_base.hpp diff --git a/host/include/uhd/rfnoc/actions.hpp b/host/include/uhd/rfnoc/actions.hpp new file mode 100644 index 000000000..ac454827c --- /dev/null +++ b/host/include/uhd/rfnoc/actions.hpp @@ -0,0 +1,50 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP +#define INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP + +#include <uhd/config.hpp> +#include <string> +#include <vector> +#include <memory> + +namespace uhd { namespace rfnoc { + +/*! Container for an action + * + * In the RFNoC context, an action is comparable to a command. Nodes in the + * graph can send each other actions. action_info is the payload of such an + * action message. Nodes pass shared pointers to action_info objects between + * each other to avoid costly copies of large action_info objects. + */ +struct UHD_API action_info +{ +public: +    using sptr = std::shared_ptr<action_info>; +    //! A unique counter for this action +    const size_t id; +    //! A string identifier for this action +    std::string key; +    //! An arbitrary payload. It is up to consumers and producers to +    // (de-)serialize it. +    std::vector<uint8_t> payload; + +    //! Factory function +    static sptr make(const std::string& key="") +    { +        //return std::make_shared<action_info>(key); +        return sptr(new action_info(key)); +    } + +private: +    action_info(const std::string& key); +}; + +}} /* namespace uhd::rfnoc */ + +#endif /* INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP */ + diff --git a/host/include/uhd/rfnoc/defaults.hpp b/host/include/uhd/rfnoc/defaults.hpp new file mode 100644 index 000000000..3eb9e1d30 --- /dev/null +++ b/host/include/uhd/rfnoc/defaults.hpp @@ -0,0 +1,45 @@ +// +// Copyright 2014 Ettus Research LLC +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP +#define INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP + +#include <string> + +namespace uhd { namespace rfnoc { + +static const std::string CLOCK_KEY_GRAPH("__graph__"); + +static const std::string PROP_KEY_DECIM("decim"); +static const std::string PROP_KEY_SAMP_RATE("samp_rate"); +static const std::string PROP_KEY_SCALING("scaling"); +static const std::string PROP_KEY_TYPE("type"); +static const std::string PROP_KEY_FREQ("freq"); +static const std::string PROP_KEY_TICK_RATE("tick_rate"); +static const std::string PROP_KEY_SPP("spp"); + +static const std::string NODE_ID_SEP("SEP"); + +using io_type_t                     = std::string; +static const io_type_t IO_TYPE_SC16 = "sc16"; + +static const std::string ACTION_KEY_STREAM_CMD("stream_cmd"); +static const std::string ACTION_KEY_RX_EVENT("rx_event"); + +//! If the block name can't be automatically detected, this name is used +static const std::string DEFAULT_BLOCK_NAME = "Block"; +//! This NOC-ID is used to look up the default block +static const uint32_t DEFAULT_NOC_ID = 0xFFFFFFFF; +static const double DEFAULT_TICK_RATE = 1.0; +// Whenever we need a default spp value use this, unless there are some +// block/device-specific constraints. It will keep the frame size below 1500. +static const int DEFAULT_SPP = 1996; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP */ + diff --git a/host/include/uhd/rfnoc/node.hpp b/host/include/uhd/rfnoc/node.hpp index 1e634ecea..54c66c985 100644 --- a/host/include/uhd/rfnoc/node.hpp +++ b/host/include/uhd/rfnoc/node.hpp @@ -7,13 +7,14 @@  #ifndef INCLUDED_LIBUHD_RFNOC_NODE_HPP  #define INCLUDED_LIBUHD_RFNOC_NODE_HPP -#include <uhd/rfnoc/property.hpp> +#include <uhd/rfnoc/actions.hpp>  #include <uhd/rfnoc/dirtifier.hpp> -#include <uhd/utils/scope_exit.hpp> +#include <uhd/rfnoc/property.hpp>  #include <uhd/utils/log.hpp> -#include <boost/graph/adjacency_list.hpp> +#include <uhd/utils/scope_exit.hpp>  #include <unordered_map>  #include <unordered_set> +#include <boost/graph/adjacency_list.hpp>  #include <functional>  #include <memory>  #include <mutex> @@ -34,6 +35,8 @@ class UHD_API node_t  public:      using resolver_fn_t = std::function<void(void)>;      using resolve_callback_t = std::function<void(void)>; +    using action_handler_t = +        std::function<void(const res_source_info&, action_info::sptr)>;      //! Types of property/action forwarding for those not defined by the block itself      enum class forwarding_policy_t { @@ -134,11 +137,6 @@ public:      const prop_data_t& get_property(          const std::string& id, const size_t instance = 0) /* mutable */; -    /****************************************** -     * Action Specific -     ******************************************/ -    // TBW -  protected:      /******************************************       * Internal Registration Functions @@ -207,20 +205,58 @@ protected:      void set_prop_forwarding_policy(          forwarding_policy_t policy, const std::string& prop_id = ""); +    /****************************************** +     * Internal action forwarding +     ******************************************/      /*! Handle a request to perform an action. The default action handler       *  ignores user action and forwards port actions.       * -     * \param handler The function that is called to handle the action +     * \param id The action ID for which this action handler is valid. The first +     *           argument to the handler will be a uhd::rfnoc::action_info::sptr, +     *           and its `id` value will match this parameter (unless the same +     *           action handler is registered multiple times). +     *           If this function was previously called with the same `id` value, +     *           the previous action handler is overwritten. +     * \param handler The function that is called to handle the action. It needs +     *                to accept a uhd::rfnoc::res_source_info object, and a +     *                uhd::rfnoc::action_info::sptr.       */ -    // void register_action_handler(std::function< -    // void(const action_info& info, const res_source_info& src) -    //> handler); +    void register_action_handler(const std::string& id, action_handler_t&& handler); + +    /*! Set an action forwarding policy +     * +     * Whenever this node is asked to handle an action that is not registered, +     * this is how the node knows what to do with the action. For example, the +     * FIFO block controller will almost always want to pass on actions to +     * the next block. +     * +     * This method can be called more than once, and it will overwrite previous +     * policies. +     * Typically, this function should only ever be called from within the +     * constructor. +     * +     * \param policy The policy that is applied (see also forwarding_policy_t). +     * \param action_key The action key that this forwarding policy is applied +     *                   to. If \p action_key is not given, it will apply to all +     *                   properties, unless a different policy was given with a +     *                   matching key. +     */ +    void set_action_forwarding_policy( +        forwarding_policy_t policy, const std::string& action_key = ""); + +    /*! Post an action to an up- or downstream node in the graph. +     * +     * If the action is posted to an edge which is not connected, the action +     * is lost. +     * +     * \param edge_info The edge to which this action is posted. If +     *                  edge_info.type == INPUT_EDGE, the that means the action +     *                  will be posted to an upstream node, on port edge_info.instance. +     * \param action A reference to the action info object. +     * \throws uhd::runtime_error if edge_info is not either INPUT_EDGE or OUTPUT_EDGE +     */ +    void post_action(const res_source_info& edge_info, action_info::sptr action); -    /****************************************** -     * Internal action forwarding -     ******************************************/ -    // TBW -    //      //! A dirtifyer object, useful for properties that always need updating.      static dirtifier_t ALWAYS_DIRTY; @@ -369,6 +405,27 @@ private:          property_base_t* incoming_prop, const size_t incoming_port);      /************************************************************************** +     * Action-Related Methods +     *************************************************************************/ +    /*! Sets a callback that this node can call if it wants to post actions to +     * other nodes. +     */ +    void set_post_action_callback(action_handler_t&& post_handler) +    { +        _post_action_cb = std::move(post_handler); +    } + +    /*! This function gets called by the framework when there's a new action for +     * this node. It will then dispatch appropriate action handlers. +     * +     * \param src_info Tells us on which edge this came in. If +     *                 src_info.type == INPUT_EDGE, then we received this action +     *                 on an input edge. +     * \param action A reference to the action object +     */ +    void receive_action(const res_source_info& src_info, action_info::sptr action); + +    /**************************************************************************       * Private helpers       *************************************************************************/      //! Return true if this node has a port that matches \p port_info @@ -414,6 +471,23 @@ private:      std::unordered_map<std::string, forwarding_policy_t> _prop_fwd_policies{{          "", forwarding_policy_t::ONE_TO_ONE}}; +    /************************************************************************** +     * Action-related attributes +     *************************************************************************/ +    mutable std::mutex _action_mutex; + +    //! Storage for action handlers +    std::unordered_map<std::string, action_handler_t> _action_handlers; + +    //! Default action forwarding policies +    std::unordered_map<std::string, forwarding_policy_t> _action_fwd_policies{{ +        "", forwarding_policy_t::ONE_TO_ONE}}; + +    //! Callback which allows us to post actions to other nodes in the graph +    // +    // The default callback will simply drop actions +    action_handler_t _post_action_cb = [](const res_source_info&, +                                           action_info::sptr) { /* nop */ };  }; // class node_t  }} /* namespace uhd::rfnoc */ diff --git a/host/lib/include/uhdlib/rfnoc/graph.hpp b/host/lib/include/uhdlib/rfnoc/graph.hpp index f9fb7ac41..ec6309bd0 100644 --- a/host/lib/include/uhdlib/rfnoc/graph.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph.hpp @@ -7,10 +7,12 @@  #ifndef INCLUDED_LIBUHD_GRAPH_HPP  #define INCLUDED_LIBUHD_GRAPH_HPP +#include <uhd/rfnoc/actions.hpp>  #include <uhd/rfnoc/node.hpp>  #include <boost/graph/adjacency_list.hpp>  #include <tuple>  #include <memory> +#include <deque>  namespace uhd { namespace rfnoc { @@ -192,6 +194,39 @@ private:      void resolve_all_properties();      /************************************************************************** +     * Action API +     *************************************************************************/ +    /*! Entrypoint for action delivery +     * +     * When a node invokes its node_t::post_action() function, eventually that +     * call lands here. This function acts as a mailman, that is, it figures out +     * which edge on which node is supposed to receive this action, and delivers +     * it via the node_t::receive_action() method. +     * Note since this is private, nodes can't directly access this functions. +     * We provide a lambda to nodes for that purpose. +     * +     * When an action is posted, that may trigger further actions. In order not +     * to go into infinite recursion, this function is also responsible for +     * serializing the actions. Even so, it is possible that, due to +     * misconfiguration of nodes and their behaviour, a cascade of actions is +     * posted that never stops. Therefore, another responsibility of this +     * function is to track the number of follow-up messages sent, and terminate +     * an infinite cycle of messages. +     * +     * \param src_node Reference to the node where the post_action() call is +     *                 originating from +     * \param src_edge The edge on that node where the action is being posted to. +     *                 Note that its the edge from the node's point of view, so +     *                 if src_edge.type == OUTPUT_EDGE, then the node posted to +     *                 its output edge. +     * +     * \throws uhd::runtime_error if it has to terminate a infinite cascade of +     *         actions +     */ +    void enqueue_action( +        node_ref_t src_node, res_source_info src_edge, action_info::sptr action); + +    /**************************************************************************       * Private graph helpers       *************************************************************************/      template <typename VertexContainerType> @@ -225,7 +260,7 @@ private:      /*! Find the neighbouring node for \p origin based on \p port_info       *       * This function will check port_info to identify the port number and the -     * direction (input or output) from \p port_info. It will then return a +     * direction (input or output) from \p origin. It will then return a       * reference to the node that is attached to the node \p origin if such a       * node exists, and the edge info.       * @@ -263,6 +298,18 @@ private:      // descriptor without having to traverse the graph. The rfnoc_graph_t is not      // efficient for lookups of vertices.      node_map_t _node_map; + +    using action_tuple_t = std::tuple<node_ref_t, res_source_info, action_info::sptr>; + +    //! FIFO for incoming actions +    std::deque<action_tuple_t> _action_queue; + +    //! Flag to ensure serialized handling of actions +    std::atomic_flag _action_handling_ongoing; + +    //! Mutex for to avoid the user from sending one message before another +    // message is sent +    std::recursive_mutex _action_mutex;  }; diff --git a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp index 554cc8f4f..827c87dd2 100644 --- a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp +++ b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp @@ -7,7 +7,9 @@  #ifndef INCLUDED_LIBUHD_NODE_ACCESSOR_HPP  #define INCLUDED_LIBUHD_NODE_ACCESSOR_HPP +#include <uhd/rfnoc/actions.hpp>  #include <uhd/rfnoc/node.hpp> +#include <uhd/rfnoc/res_source_info.hpp>  #include <functional>  namespace uhd { namespace rfnoc { @@ -77,6 +79,24 @@ public:      {          dst_node->forward_edge_property(incoming_prop, dst_port);      } + +    /*! Set post action callback for the node +     * +     * See node_t::set_post_action_callback() for details. +     */ +    void set_post_action_callback(node_t* node, node_t::action_handler_t&& post_handler) +    { +        node->set_post_action_callback(std::move(post_handler)); +    } + +    /*! Send an action to \p node +     * +     * This will call node_t::receive_action() (see that for details). +     */ +    void send_action(node_t* node, const res_source_info& port_info, action_info::sptr action) +    { +        node->receive_action(port_info, action); +    }  }; diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index e4715a644..6aab0d499 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -11,6 +11,7 @@  LIBUHD_APPEND_SOURCES(      # Infrastructure: +    ${CMAKE_CURRENT_SOURCE_DIR}/actions.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/async_msg_handler.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/block_container.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/block_ctrl_base.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp new file mode 100644 index 000000000..1f5f0f2f7 --- /dev/null +++ b/host/lib/rfnoc/actions.cpp @@ -0,0 +1,21 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/actions.hpp> +#include <atomic> + +using namespace uhd::rfnoc; + +namespace { +    // A static counter, which we use to uniquely label actions +    std::atomic<size_t> action_counter{0}; +} + +action_info::action_info(const std::string& key_) : id(action_counter++), key(key_) +{ +    // nop +} + diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp index f90f70b43..d311a00bd 100644 --- a/host/lib/rfnoc/graph.cpp +++ b/host/lib/rfnoc/graph.cpp @@ -18,6 +18,7 @@ using namespace uhd::rfnoc::detail;  namespace {  const std::string LOG_ID = "RFNOC::GRAPH::DETAIL"; +constexpr unsigned MAX_ACTION_ITERATIONS = 200;  /*! Helper function to pretty-print edge info   */ @@ -120,6 +121,15 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg          src_node, [this]() { this->resolve_all_properties(); });      node_accessor.set_resolve_all_callback(          dst_node, [this]() { this->resolve_all_properties(); }); +    // Set post action callbacks: +    node_accessor.set_post_action_callback( +        src_node, [this, src_node](const res_source_info& src, action_info::sptr action) { +            this->enqueue_action(src_node, src, action); +        }); +    node_accessor.set_post_action_callback( +        dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) { +            this->enqueue_action(dst_node, src, action); +        });      // Add nodes to graph, if not already in there:      _add_node(src_node); @@ -310,6 +320,74 @@ void graph_t::resolve_all_properties()      }  } +void graph_t::enqueue_action( +    node_ref_t src_node, res_source_info src_edge, action_info::sptr action) +{ +    // First, make sure that once we start action handling, no other node from +    // a different thread can throw in their own actions +    std::lock_guard<std::recursive_mutex> l(_action_mutex); + +    // Check if we're already in the middle of handling actions. In that case, +    // we're already in the loop below, and then all we want to do is to enqueue +    // this action tuple. The first call to enqueue_action() within this thread +    // context will have handling_ongoing == false. +    const bool handling_ongoing = _action_handling_ongoing.test_and_set(); + +    _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action)); +    if (handling_ongoing) { +        UHD_LOG_TRACE(LOG_ID, +            "Action handling ongoing, deferring delivery of " << action->key << "#" +                                                              << action->id); +        return; +    } + +    unsigned iteration_count = 0; +    while (!_action_queue.empty()) { +        if (iteration_count++ == MAX_ACTION_ITERATIONS) { +            throw uhd::runtime_error("Terminating action handling: Reached " +                                     "recursion limit!"); +        } + +        // Unpack next action +        auto& next_action               = _action_queue.front(); +        node_ref_t action_src_node      = std::get<0>(next_action); +        res_source_info action_src_port = std::get<1>(next_action); +        action_info::sptr next_action_sptr   = std::get<2>(next_action); +        _action_queue.pop_front(); + +        // Find the node that is supposed to receive this action, and if we find +        // something, then send the action +        auto recipient_info = +            _find_neighbour(_node_map.at(action_src_node), action_src_port); +        if (recipient_info.first == nullptr) { +            UHD_LOG_WARNING(LOG_ID, +                "Cannot forward action " +                    << action->key << " from " << src_node->get_unique_id() +                    << ":" << src_edge.to_string() << ", no neighbour found!"); +        } else { +            node_ref_t recipient_node      = recipient_info.first; +            res_source_info recipient_port = { +                res_source_info::invert_edge(action_src_port.type), +                action_src_port.type == res_source_info::INPUT_EDGE +                    ? recipient_info.second.dst_port +                    : recipient_info.second.src_port}; +            // The following call can cause other nodes to add more actions to +            // the end of _action_queue! +            UHD_LOG_TRACE(LOG_ID, +                "Now delivering action " << next_action_sptr->key << "#" +                                         << next_action_sptr->id); +            node_accessor_t{}.send_action( +                recipient_node, recipient_port, next_action_sptr); +        } +    } +    UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling."); + +    // Release the action handling flag +    _action_handling_ongoing.clear(); +    // Now, the _action_mutex is released, and someone else can start sending +    // actions. +} +  /******************************************************************************   * Private methods   *****************************************************************************/ diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp index 0b724b889..d569cea4a 100644 --- a/host/lib/rfnoc/node.cpp +++ b/host/lib/rfnoc/node.cpp @@ -113,6 +113,27 @@ void node_t::set_prop_forwarding_policy(      _prop_fwd_policies[prop_id] = policy;  } +void node_t::register_action_handler(const std::string& id, action_handler_t&& handler) +{ +    if (_action_handlers.count(id)) { +        _action_handlers.erase(id); +    } +    _action_handlers.emplace(id, std::move(handler)); +} + +void node_t::set_action_forwarding_policy( +    node_t::forwarding_policy_t policy, const std::string& action_key) +{ +    _action_fwd_policies[action_key] = policy; +} + +void node_t::post_action( +    const res_source_info& edge_info, +    action_info::sptr action) +{ +    _post_action_cb(edge_info, action); +} +  /*** Private methods *********************************************************/  property_base_t* node_t::_find_property(      res_source_info src_info, const std::string& id) const @@ -403,6 +424,72 @@ void node_t::forward_edge_property(      prop_accessor.forward<false>(incoming_prop, local_prop);  } +void node_t::receive_action(const res_source_info& src_info, action_info::sptr action) +{ +    std::lock_guard<std::mutex> l(_action_mutex); +    // See if the user defined an action handler for us: +    if (_action_handlers.count(action->key)) { +        _action_handlers.at(action->key)(src_info, action); +        return; +    } + +    // Otherwise, we need to figure out the correct default action handling: +    const auto fwd_policy = [&](const std::string& id) { +        if (_action_fwd_policies.count(id)) { +            return _action_fwd_policies.at(id); +        } +        return _action_fwd_policies.at(""); +    }(action->key); + +    // Now implement custom forwarding for all forwarding policies: +    if (fwd_policy == forwarding_policy_t::DROP) { +        UHD_LOG_TRACE( +            get_unique_id(), "Dropping action " << action->key); +    } +    if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) { +        UHD_LOG_TRACE( +            get_unique_id(), "Forwarding action " << action->key << " to opposite port"); +        const res_source_info dst_info{ +            res_source_info::invert_edge(src_info.type), src_info.instance}; +        if (_has_port(dst_info)) { +            post_action(dst_info, action); +        } +    } +    if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) { +        UHD_LOG_TRACE(get_unique_id(), +            "Forwarding action " << action->key << " to all opposite ports"); +        const auto new_edge_type = res_source_info::invert_edge(src_info.type); +        const size_t num_ports   = new_edge_type == res_source_info::INPUT_EDGE +                                     ? get_num_input_ports() +                                     : get_num_output_ports(); +        for (size_t i = 0; i < num_ports; i++) { +            post_action({new_edge_type, i}, action); +        } +    } +    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL +        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) { +        UHD_LOG_TRACE(get_unique_id(), +            "Forwarding action " << action->key << " to all input ports"); +        for (size_t i = 0; i < get_num_input_ports(); i++) { +            if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) { +                continue; +            } +            post_action({res_source_info::INPUT_EDGE, i}, action); +        } +    } +    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL +        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) { +        UHD_LOG_TRACE(get_unique_id(), +            "Forwarding action " << action->key << " to all output ports"); +        for (size_t i = 0; i < get_num_output_ports(); i++) { +            if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) { +                continue; +            } +            post_action({res_source_info::OUTPUT_EDGE, i}, action); +        } +    } +} +  bool node_t::_has_port(const res_source_info& port_info) const  {      return (port_info.type == res_source_info::INPUT_EDGE diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index f31daed50..c308abdcb 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -246,6 +246,19 @@ UHD_INSTALL(TARGETS      COMPONENT tests  ) +add_executable(actions_test +    actions_test.cpp +    ${CMAKE_SOURCE_DIR}/lib/rfnoc/graph.cpp +) +target_link_libraries(actions_test uhd ${Boost_LIBRARIES}) +UHD_ADD_TEST(actions_test actions_test) +UHD_INSTALL(TARGETS +    actions_test +    RUNTIME +    DESTINATION ${PKG_LIB_DIR}/tests +    COMPONENT tests +) +  ########################################################################  # demo of a loadable module  ######################################################################## diff --git a/host/tests/actions_test.cpp b/host/tests/actions_test.cpp new file mode 100644 index 000000000..c0344eacf --- /dev/null +++ b/host/tests/actions_test.cpp @@ -0,0 +1,81 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/node.hpp> +#include <uhd/rfnoc/actions.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/node_accessor.hpp> +#include <uhdlib/rfnoc/prop_accessor.hpp> +#include <uhdlib/rfnoc/graph.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +#include "rfnoc_graph_mock_nodes.hpp" + + +const std::string STREAM_CMD_KEY = "stream_cmd"; + +BOOST_AUTO_TEST_CASE(test_actions_single_node) +{ +    node_accessor_t node_accessor{}; + +    // Define some mock nodes: +    mock_radio_node_t mock_radio(0); + +    auto stream_cmd         = action_info::make(STREAM_CMD_KEY); +    std::string cmd_payload = "START"; +    stream_cmd->payload = std::vector<uint8_t>(cmd_payload.begin(), cmd_payload.end()); + +    auto other_cmd = action_info::make("FOO"); + +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, stream_cmd); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); + +    mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ONE); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); +    mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_FAN); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); +    mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); +    mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL_IN); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); +    mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL_OUT); +    node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd); +} + +BOOST_AUTO_TEST_CASE(test_actions_simple_graph) +{ +    node_accessor_t node_accessor{}; +    uhd::rfnoc::detail::graph_t graph{}; + +    // Define some mock nodes: +    mock_radio_node_t mock_rx_radio{0}; +    mock_ddc_node_t mock_ddc{}; +    mock_fifo_t mock_fifo{1}; +    mock_streamer_t mock_streamer{1}; + +    // These init calls would normally be done by the framework +    node_accessor.init_props(&mock_rx_radio); +    node_accessor.init_props(&mock_ddc); +    node_accessor.init_props(&mock_fifo); +    node_accessor.init_props(&mock_streamer); + +    graph.connect(&mock_rx_radio, &mock_ddc, {0, 0, graph_edge_t::DYNAMIC, true}); +    graph.connect(&mock_ddc, &mock_fifo, {0, 0, graph_edge_t::DYNAMIC, true}); +    graph.connect(&mock_fifo, &mock_streamer, {0, 0, graph_edge_t::DYNAMIC, true}); +    graph.initialize(); + +    // Force the DDC to actually set a decimation rate != 1 +    mock_streamer.set_property<double>("samp_rate", 10e6, 0); + +    uhd::stream_cmd_t num_samps_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE); +    constexpr size_t NUM_SAMPS = 100; +    num_samps_cmd.num_samps = NUM_SAMPS; + +    mock_streamer.issue_stream_cmd(num_samps_cmd, 0); +    BOOST_CHECK_EQUAL(NUM_SAMPS * mock_ddc.get_property<int>("decim", 0), +            mock_rx_radio.last_num_samps); +} diff --git a/host/tests/rfnoc_graph_mock_nodes.hpp b/host/tests/rfnoc_graph_mock_nodes.hpp index 85e667ebd..a9d8d4e55 100644 --- a/host/tests/rfnoc_graph_mock_nodes.hpp +++ b/host/tests/rfnoc_graph_mock_nodes.hpp @@ -7,7 +7,9 @@  #ifndef INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP  #define INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP +#include <uhd/rfnoc/defaults.hpp>  #include <uhd/rfnoc/node.hpp> +#include <uhd/types/stream_cmd.hpp>  using namespace uhd::rfnoc; @@ -82,9 +84,37 @@ public:                  rssi_resolver_count++;                  _rssi = static_cast<double>(rssi_resolver_count);              }); + + +        set_action_forwarding_policy(forwarding_policy_t::DROP); + +        register_action_handler( +            "stream_cmd", [this](const res_source_info& src, action_info::sptr action) { +                UHD_ASSERT_THROW(action->key == "stream_cmd"); +                const std::string cmd(action->payload.begin(), action->payload.end()); +                UHD_LOG_INFO(get_unique_id(), +                    "Received stream command: " << cmd << " to " << src.to_string()); +                if (cmd == "START") { +                    UHD_LOG_INFO(get_unique_id(), "Starting Stream!"); +                } else if (cmd == "STOP") { +                    UHD_LOG_INFO(get_unique_id(), "Stopping Stream!"); +                } else { +                    this->last_num_samps = std::stoul(cmd); +                    UHD_LOG_INFO(get_unique_id(), +                        "Streaming num samps: " <<  this->last_num_samps); +                } +            });      } -    std::string get_unique_id() const { return "MOCK_RADIO" + std::to_string(_radio_idx); } +    void update_fwd_policy(forwarding_policy_t policy) +    { +        set_action_forwarding_policy(policy); +    } + +    std::string get_unique_id() const +    { +        return "MOCK_RADIO" + std::to_string(_radio_idx); +    }      size_t get_num_input_ports() const      { @@ -101,6 +131,8 @@ public:      bool disable_samp_out_resolver = false;      double force_samp_out_value = 23e6; +    size_t last_num_samps = 0; +  private:      const size_t _radio_idx; @@ -162,6 +194,31 @@ public:                  decim = coerce_decim(int(samp_rate_in.get() / samp_rate_out.get()));                  samp_rate_in = samp_rate_out.get() * decim.get();              }); + +        register_action_handler( +            "stream_cmd", [this](const res_source_info& src, action_info::sptr action) { +                res_source_info dst_edge{ +                    res_source_info::invert_edge(src.type), src.instance}; +                auto new_action = action_info::make(action->key); +                std::string cmd(action->payload.begin(), action->payload.end()); +                if (cmd == "START" || cmd == "STOP") { +                    new_action->payload = action->payload; +                } else { +                    unsigned long long num_samps = std::stoull(cmd); +                    if (src.type == res_source_info::OUTPUT_EDGE) { +                        num_samps *= _decim.get(); +                    } else { +                        num_samps /= _decim.get(); +                    } +                    std::string new_cmd = std::to_string(num_samps); +                    new_action->payload.insert( +                        new_action->payload.begin(), new_cmd.begin(), new_cmd.end()); +                } + +                UHD_LOG_INFO(get_unique_id(), +                    "Forwarding stream_cmd, decim is " << _decim.get()); +                post_action(dst_edge, new_action); +            });      }      std::string get_unique_id() const { return "MOCK_DDC"; } @@ -203,7 +260,7 @@ private:  /*! FIFO   * - * Not much here -- we use it to test dynamic prop forwarding. + * Not much here -- we use it to test dynamic prop and action forwarding.   */  class mock_fifo_t : public node_t  { @@ -211,6 +268,7 @@ public:      mock_fifo_t(const size_t num_ports) : _num_ports(num_ports)      {          set_prop_forwarding_policy(forwarding_policy_t::ONE_TO_ONE); +        set_action_forwarding_policy(forwarding_policy_t::ONE_TO_ONE);      }      std::string get_unique_id() const { return "MOCK_FIFO"; } @@ -230,4 +288,64 @@ private:      const size_t _num_ports;  }; +/*! Streamer + * + * Not much here -- we use it to test dynamic prop and action forwarding. + */ +class mock_streamer_t : public node_t +{ +public: +    mock_streamer_t(const size_t num_ports) : _num_ports(num_ports) +    { +        set_prop_forwarding_policy(forwarding_policy_t::DROP); +        set_action_forwarding_policy(forwarding_policy_t::DROP); +        register_property(&_samp_rate_user); +        register_property(&_samp_rate_in); +        add_property_resolver({&_samp_rate_user}, {&_samp_rate_in}, [this]() { +            UHD_LOG_INFO(get_unique_id(), "Calling resolver for `samp_rate_user'..."); +            _samp_rate_in = _samp_rate_user.get(); +        }); +        add_property_resolver({&_samp_rate_in}, {}, [this]() { +            UHD_LOG_INFO(get_unique_id(), "Calling resolver for `samp_rate_in'..."); +            // nop +        }); +    } + +    std::string get_unique_id() const +    { +        return "MOCK_STREAMER"; +    } + +    size_t get_num_input_ports() const +    { +        return _num_ports; +    } + +    size_t get_num_output_ports() const +    { +        return _num_ports; +    } + +    void issue_stream_cmd(uhd::stream_cmd_t stream_cmd, const size_t chan) +    { +        std::string cmd = +            stream_cmd.stream_mode == uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS +                ? "START" +                : stream_cmd.stream_mode == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +                      ? "STOP" +                      : std::to_string(stream_cmd.num_samps); +        auto scmd = action_info::make("stream_cmd"); +        scmd->payload.insert(scmd->payload.begin(), cmd.begin(), cmd.end()); + +        post_action({res_source_info::INPUT_EDGE, chan}, scmd); +    } + +private: +    property_t<double> _samp_rate_user{ +        "samp_rate", 1e6, {res_source_info::USER}}; +    property_t<double> _samp_rate_in{ +        "samp_rate", 1e6, {res_source_info::INPUT_EDGE}}; +    const size_t _num_ports; +}; +  #endif /* INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP */  | 
