diff options
Diffstat (limited to 'host/lib/rfnoc/async_msg_handler.cpp')
-rw-r--r-- | host/lib/rfnoc/async_msg_handler.cpp | 110 |
1 files changed, 47 insertions, 63 deletions
diff --git a/host/lib/rfnoc/async_msg_handler.cpp b/host/lib/rfnoc/async_msg_handler.cpp index b412eec9d..6b7d7d057 100644 --- a/host/lib/rfnoc/async_msg_handler.cpp +++ b/host/lib/rfnoc/async_msg_handler.cpp @@ -5,12 +5,12 @@ // #include <uhd/exception.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhd/utils/byteswap.hpp> -#include <uhd/utils/log.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/chdr.hpp> #include <uhd/transport/zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/tasks.hpp> #include <uhdlib/rfnoc/async_msg_handler.hpp> #include <boost/make_shared.hpp> #include <mutex> @@ -30,19 +30,13 @@ public: /************************************************************************ * Structors ***********************************************************************/ - async_msg_handler_impl( - uhd::transport::zero_copy_if::sptr recv, - uhd::transport::zero_copy_if::sptr send, - uhd::sid_t sid - ) : _rx_xport(recv), - _tx_xport(send), - _sid(sid) + async_msg_handler_impl(uhd::transport::zero_copy_if::sptr recv, + uhd::transport::zero_copy_if::sptr send, + uhd::sid_t sid) + : _rx_xport(recv), _tx_xport(send), _sid(sid) { // Launch receive thread - _recv_msg_task = task::make([=](){ - this->handle_async_msgs(); - } - ); + _recv_msg_task = task::make([=]() { this->handle_async_msgs(); }); } ~async_msg_handler_impl() {} @@ -51,23 +45,21 @@ public: * API calls ***********************************************************************/ int register_event_handler( - const async_msg_t::event_code_t event_code, - async_handler_type handler - ) { - _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(event_code, handler)); + const async_msg_t::event_code_t event_code, async_handler_type handler) + { + _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>( + event_code, handler)); return _event_handlers.count(event_code); } - void post_async_msg( - const async_msg_t &metadata - ) { + void post_async_msg(const async_msg_t& metadata) + { std::lock_guard<std::mutex> lock(_mutex); for (auto const event_handler : _event_handlers) { // If the event code in the message matches the event code used at // registration time, call the event handler - if ((metadata.event_code & event_handler.first) - == event_handler.first) { + if ((metadata.event_code & event_handler.first) == event_handler.first) { event_handler.second(metadata); } } @@ -75,15 +67,13 @@ public: // Print if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) { UHD_LOG_FASTPATH("U") - } else if (metadata.event_code & - ( async_msg_t::EVENT_CODE_SEQ_ERROR - | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST) - ) { + } else if (metadata.event_code + & (async_msg_t::EVENT_CODE_SEQ_ERROR + | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST)) { UHD_LOG_FASTPATH("S") - } else if (metadata.event_code & - ( async_msg_t::EVENT_CODE_LATE_CMD_ERROR - | async_msg_t::EVENT_CODE_LATE_DATA_ERROR) - ) { + } else if (metadata.event_code + & (async_msg_t::EVENT_CODE_LATE_CMD_ERROR + | async_msg_t::EVENT_CODE_LATE_DATA_ERROR)) { UHD_LOG_FASTPATH("L") } else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) { UHD_LOG_FASTPATH("O") @@ -96,7 +86,7 @@ private: // methods ***********************************************************************/ /*! Packet receiver thread call. */ - void handle_async_msgs( ) + void handle_async_msgs() { using namespace uhd::transport; managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff(); @@ -105,10 +95,10 @@ private: // methods // Get packet info vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast<const uint32_t *>(); + if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); + const uint32_t* packet_buff = buff->cast<const uint32_t*>(); - //unpacking can fail + // unpacking can fail uint32_t (*endian_conv)(uint32_t) = uhd::ntohx; try { if (_endianness == ENDIANNESS_BIG) { @@ -118,31 +108,30 @@ private: // methods vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info); endian_conv = uhd::wtohx; } - } - catch (const uhd::value_error &ex) { - UHD_LOGGER_ERROR("RFNOC") << "[async message handler] Error parsing async message packet: " << ex.what() << std::endl; + } catch (const uhd::value_error& ex) { + UHD_LOGGER_ERROR("RFNOC") + << "[async message handler] Error parsing async message packet: " + << ex.what() << std::endl; return; } // We discard anything that's not actually a command or response packet. - if (not (if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD) - or if_packet_info.num_packet_words32 == 0) { + if (not(if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD) + or if_packet_info.num_packet_words32 == 0) { return; } - const uint32_t *payload = packet_buff + if_packet_info.num_header_words32; + const uint32_t* payload = packet_buff + if_packet_info.num_header_words32; async_msg_t metadata(if_packet_info.num_payload_words32 - 1); metadata.has_time_spec = if_packet_info.has_tsf; // FIXME: not hardcoding tick rate - metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1); - metadata.event_code = async_msg_t::event_code_t( - endian_conv(payload[0]) & 0xFFFF - ); - metadata.sid = if_packet_info.sid; + metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1); + metadata.event_code = async_msg_t::event_code_t(endian_conv(payload[0]) & 0xFFFF); + metadata.sid = if_packet_info.sid; - //load user payload + // load user payload for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) { - metadata.payload[i-1] = endian_conv(payload[i]); + metadata.payload[i - 1] = endian_conv(payload[i]); } this->post_async_msg(metadata); @@ -152,16 +141,15 @@ private: // methods { return _sid.get_src(); } - -private: // members +private: // members std::mutex _mutex; //! Store event handlers std::multimap<async_msg_t::event_code_t, async_handler_type> _event_handlers; //! port that receive messge uhd::transport::zero_copy_if::sptr _rx_xport; - //!port that send out respond + //! port that send out respond uhd::transport::zero_copy_if::sptr _tx_xport; //! The source part of \p _sid is the address of this async message handler. @@ -171,20 +159,16 @@ private: // members task::sptr _recv_msg_task; }; -async_msg_handler::sptr async_msg_handler::make( - uhd::transport::zero_copy_if::sptr recv, +async_msg_handler::sptr async_msg_handler::make(uhd::transport::zero_copy_if::sptr recv, uhd::transport::zero_copy_if::sptr send, uhd::sid_t sid, - endianness_t endianness -) { + endianness_t endianness) +{ if (endianness == ENDIANNESS_BIG) { - return boost::make_shared< async_msg_handler_impl<ENDIANNESS_BIG> >( - recv, send, sid - ); + return boost::make_shared<async_msg_handler_impl<ENDIANNESS_BIG>>( + recv, send, sid); } else { - return boost::make_shared< async_msg_handler_impl<ENDIANNESS_LITTLE> >( - recv, send, sid - ); + return boost::make_shared<async_msg_handler_impl<ENDIANNESS_LITTLE>>( + recv, send, sid); } } - |