aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/async_msg_handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/async_msg_handler.cpp')
-rw-r--r--host/lib/rfnoc/async_msg_handler.cpp110
1 files changed, 47 insertions, 63 deletions
diff --git a/host/lib/rfnoc/async_msg_handler.cpp b/host/lib/rfnoc/async_msg_handler.cpp
index b412eec9d..6b7d7d057 100644
--- a/host/lib/rfnoc/async_msg_handler.cpp
+++ b/host/lib/rfnoc/async_msg_handler.cpp
@@ -5,12 +5,12 @@
//
#include <uhd/exception.hpp>
-#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/byteswap.hpp>
-#include <uhd/utils/log.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/chdr.hpp>
#include <uhd/transport/zero_copy.hpp>
-#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhdlib/rfnoc/async_msg_handler.hpp>
#include <boost/make_shared.hpp>
#include <mutex>
@@ -30,19 +30,13 @@ public:
/************************************************************************
* Structors
***********************************************************************/
- async_msg_handler_impl(
- uhd::transport::zero_copy_if::sptr recv,
- uhd::transport::zero_copy_if::sptr send,
- uhd::sid_t sid
- ) : _rx_xport(recv),
- _tx_xport(send),
- _sid(sid)
+ async_msg_handler_impl(uhd::transport::zero_copy_if::sptr recv,
+ uhd::transport::zero_copy_if::sptr send,
+ uhd::sid_t sid)
+ : _rx_xport(recv), _tx_xport(send), _sid(sid)
{
// Launch receive thread
- _recv_msg_task = task::make([=](){
- this->handle_async_msgs();
- }
- );
+ _recv_msg_task = task::make([=]() { this->handle_async_msgs(); });
}
~async_msg_handler_impl() {}
@@ -51,23 +45,21 @@ public:
* API calls
***********************************************************************/
int register_event_handler(
- const async_msg_t::event_code_t event_code,
- async_handler_type handler
- ) {
- _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(event_code, handler));
+ const async_msg_t::event_code_t event_code, async_handler_type handler)
+ {
+ _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(
+ event_code, handler));
return _event_handlers.count(event_code);
}
- void post_async_msg(
- const async_msg_t &metadata
- ) {
+ void post_async_msg(const async_msg_t& metadata)
+ {
std::lock_guard<std::mutex> lock(_mutex);
for (auto const event_handler : _event_handlers) {
// If the event code in the message matches the event code used at
// registration time, call the event handler
- if ((metadata.event_code & event_handler.first)
- == event_handler.first) {
+ if ((metadata.event_code & event_handler.first) == event_handler.first) {
event_handler.second(metadata);
}
}
@@ -75,15 +67,13 @@ public:
// Print
if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) {
UHD_LOG_FASTPATH("U")
- } else if (metadata.event_code &
- ( async_msg_t::EVENT_CODE_SEQ_ERROR
- | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST)
- ) {
+ } else if (metadata.event_code
+ & (async_msg_t::EVENT_CODE_SEQ_ERROR
+ | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST)) {
UHD_LOG_FASTPATH("S")
- } else if (metadata.event_code &
- ( async_msg_t::EVENT_CODE_LATE_CMD_ERROR
- | async_msg_t::EVENT_CODE_LATE_DATA_ERROR)
- ) {
+ } else if (metadata.event_code
+ & (async_msg_t::EVENT_CODE_LATE_CMD_ERROR
+ | async_msg_t::EVENT_CODE_LATE_DATA_ERROR)) {
UHD_LOG_FASTPATH("L")
} else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) {
UHD_LOG_FASTPATH("O")
@@ -96,7 +86,7 @@ private: // methods
***********************************************************************/
/*! Packet receiver thread call.
*/
- void handle_async_msgs( )
+ void handle_async_msgs()
{
using namespace uhd::transport;
managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff();
@@ -105,10 +95,10 @@ private: // methods
// Get packet info
vrt::if_packet_info_t if_packet_info;
- if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
- const uint32_t *packet_buff = buff->cast<const uint32_t *>();
+ if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t);
+ const uint32_t* packet_buff = buff->cast<const uint32_t*>();
- //unpacking can fail
+ // unpacking can fail
uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
try {
if (_endianness == ENDIANNESS_BIG) {
@@ -118,31 +108,30 @@ private: // methods
vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info);
endian_conv = uhd::wtohx;
}
- }
- catch (const uhd::value_error &ex) {
- UHD_LOGGER_ERROR("RFNOC") << "[async message handler] Error parsing async message packet: " << ex.what() << std::endl;
+ } catch (const uhd::value_error& ex) {
+ UHD_LOGGER_ERROR("RFNOC")
+ << "[async message handler] Error parsing async message packet: "
+ << ex.what() << std::endl;
return;
}
// We discard anything that's not actually a command or response packet.
- if (not (if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD)
- or if_packet_info.num_packet_words32 == 0) {
+ if (not(if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD)
+ or if_packet_info.num_packet_words32 == 0) {
return;
}
- const uint32_t *payload = packet_buff + if_packet_info.num_header_words32;
+ const uint32_t* payload = packet_buff + if_packet_info.num_header_words32;
async_msg_t metadata(if_packet_info.num_payload_words32 - 1);
metadata.has_time_spec = if_packet_info.has_tsf;
// FIXME: not hardcoding tick rate
- metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1);
- metadata.event_code = async_msg_t::event_code_t(
- endian_conv(payload[0]) & 0xFFFF
- );
- metadata.sid = if_packet_info.sid;
+ metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1);
+ metadata.event_code = async_msg_t::event_code_t(endian_conv(payload[0]) & 0xFFFF);
+ metadata.sid = if_packet_info.sid;
- //load user payload
+ // load user payload
for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) {
- metadata.payload[i-1] = endian_conv(payload[i]);
+ metadata.payload[i - 1] = endian_conv(payload[i]);
}
this->post_async_msg(metadata);
@@ -152,16 +141,15 @@ private: // methods
{
return _sid.get_src();
}
-
-private: // members
+private: // members
std::mutex _mutex;
//! Store event handlers
std::multimap<async_msg_t::event_code_t, async_handler_type> _event_handlers;
//! port that receive messge
uhd::transport::zero_copy_if::sptr _rx_xport;
- //!port that send out respond
+ //! port that send out respond
uhd::transport::zero_copy_if::sptr _tx_xport;
//! The source part of \p _sid is the address of this async message handler.
@@ -171,20 +159,16 @@ private: // members
task::sptr _recv_msg_task;
};
-async_msg_handler::sptr async_msg_handler::make(
- uhd::transport::zero_copy_if::sptr recv,
+async_msg_handler::sptr async_msg_handler::make(uhd::transport::zero_copy_if::sptr recv,
uhd::transport::zero_copy_if::sptr send,
uhd::sid_t sid,
- endianness_t endianness
-) {
+ endianness_t endianness)
+{
if (endianness == ENDIANNESS_BIG) {
- return boost::make_shared< async_msg_handler_impl<ENDIANNESS_BIG> >(
- recv, send, sid
- );
+ return boost::make_shared<async_msg_handler_impl<ENDIANNESS_BIG>>(
+ recv, send, sid);
} else {
- return boost::make_shared< async_msg_handler_impl<ENDIANNESS_LITTLE> >(
- recv, send, sid
- );
+ return boost::make_shared<async_msg_handler_impl<ENDIANNESS_LITTLE>>(
+ recv, send, sid);
}
}
-