aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/rfnoc_tx_streamer.cpp
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-08-08 10:25:20 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:36 -0800
commitbffef674fbbcd892967017e81515bb76e0b850b5 (patch)
tree8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib/rfnoc/rfnoc_tx_streamer.cpp
parent91e01c484475600fcd659bb433ab86efa5146426 (diff)
downloaduhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz
uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2
uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources. Errors can come from the strs packets originating from the stream endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib/rfnoc/rfnoc_tx_streamer.cpp')
-rw-r--r--host/lib/rfnoc/rfnoc_tx_streamer.cpp69
1 files changed, 61 insertions, 8 deletions
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
index 61d714a85..4fc1a3ff8 100644
--- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -14,6 +14,7 @@ using namespace uhd::rfnoc;
const std::string STREAMER_ID = "TxStreamer";
static std::atomic<uint64_t> streamer_inst_ctr;
+static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000;
rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
const uhd::stream_args_t stream_args)
@@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
, _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
, _stream_args(stream_args)
{
+ _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE);
+
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ register_action_handler(ACTION_KEY_TX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ tx_event_action_info::sptr tx_event_action =
+ std::dynamic_pointer_cast<tx_event_action_info>(action);
+ if (!tx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid TX event action!");
+ return;
+ }
+ _handle_tx_event_action(src, tx_event_action);
+ });
+
// Initialize properties
_scaling_out.reserve(num_chans);
_samp_rate_out.reserve(num_chans);
@@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
for (auto& mtu_prop : _mtu_out) {
mtu_resolver_out.insert(&mtu_prop);
}
- //property_t<size_t>* mtu_out = &_mtu_out.back();
add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out),
[&mtu_out = _mtu_out[i], i, this]() {
@@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel(
const size_t mtu = xport->get_max_payload_size();
set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel});
+ xport->set_enqueue_async_msg_fn(
+ [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) {
+ async_metadata_t md;
+ md.channel = channel;
+ md.event_code = event_code;
+ md.has_time_spec = has_tsf;
+
+ if (has_tsf) {
+ md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate());
+ }
+
+ this->_async_msg_queue->enqueue(md);
+ });
+
tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport));
}
-void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format)
+bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata,
+ double timeout)
+{
+ const auto timeout_ms = static_cast<uint64_t>(timeout * 1000);
+ return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms);
+}
+
+void rfnoc_tx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
{
// Create actual properties and store them
_scaling_out.push_back(property_t<double>(
@@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot
register_property(mtu_out);
// Add resolvers
- add_property_resolver({scaling_out}, {},
- [&scaling_out = *scaling_out, chan, this]() {
+ add_property_resolver(
+ {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);
if (scaling_out.is_valid()) {
this->set_scale_factor(chan, 32767.0 / scaling_out.get());
}
});
- add_property_resolver({samp_rate_out}, {},
- [&samp_rate_out = *samp_rate_out, chan, this]() {
+ add_property_resolver(
+ {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);
if (samp_rate_out.is_valid()) {
this->set_samp_rate(samp_rate_out.get());
}
});
- add_property_resolver({tick_rate_out}, {},
- [&tick_rate_out = *tick_rate_out, chan, this]() {
+ add_property_resolver(
+ {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);
if (tick_rate_out.is_valid()) {
this->set_tick_rate(tick_rate_out.get());
}
});
}
+
+void rfnoc_tx_streamer::_handle_tx_event_action(
+ const res_source_info& src, tx_event_action_info::sptr tx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE);
+
+ uhd::async_metadata_t md;
+ md.event_code = tx_event_action->event_code;
+ md.channel = src.instance;
+ md.has_time_spec = tx_event_action->has_tsf;
+
+ if (md.has_time_spec) {
+ md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate());
+ }
+
+ RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel);
+ _async_msg_queue->enqueue(md);
+}