diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-08-08 10:25:20 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:36 -0800 |
commit | bffef674fbbcd892967017e81515bb76e0b850b5 (patch) | |
tree | 8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib/rfnoc/rfnoc_tx_streamer.cpp | |
parent | 91e01c484475600fcd659bb433ab86efa5146426 (diff) | |
download | uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2 uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip |
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources.
Errors can come from the strs packets originating from the stream
endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib/rfnoc/rfnoc_tx_streamer.cpp')
-rw-r--r-- | host/lib/rfnoc/rfnoc_tx_streamer.cpp | 69 |
1 files changed, 61 insertions, 8 deletions
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp index 61d714a85..4fc1a3ff8 100644 --- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -14,6 +14,7 @@ using namespace uhd::rfnoc; const std::string STREAMER_ID = "TxStreamer"; static std::atomic<uint64_t> streamer_inst_ctr; +static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000; rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args) @@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) , _stream_args(stream_args) { + _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE); + // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + register_action_handler(ACTION_KEY_TX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + tx_event_action_info::sptr tx_event_action = + std::dynamic_pointer_cast<tx_event_action_info>(action); + if (!tx_event_action) { + RFNOC_LOG_WARNING("Received invalid TX event action!"); + return; + } + _handle_tx_event_action(src, tx_event_action); + }); + // Initialize properties _scaling_out.reserve(num_chans); _samp_rate_out.reserve(num_chans); @@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, for (auto& mtu_prop : _mtu_out) { mtu_resolver_out.insert(&mtu_prop); } - //property_t<size_t>* mtu_out = &_mtu_out.back(); add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out), [&mtu_out = _mtu_out[i], i, this]() { @@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel( const size_t mtu = xport->get_max_payload_size(); set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel}); + xport->set_enqueue_async_msg_fn( + [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) { + async_metadata_t md; + md.channel = channel; + md.event_code = event_code; + md.has_time_spec = has_tsf; + + if (has_tsf) { + md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate()); + } + + this->_async_msg_queue->enqueue(md); + }); + tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport)); } -void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format) +bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata, + double timeout) +{ + const auto timeout_ms = static_cast<uint64_t>(timeout * 1000); + return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, + const std::string& otw_format) { // Create actual properties and store them _scaling_out.push_back(property_t<double>( @@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot register_property(mtu_out); // Add resolvers - add_property_resolver({scaling_out}, {}, - [&scaling_out = *scaling_out, chan, this]() { + add_property_resolver( + {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); if (scaling_out.is_valid()) { this->set_scale_factor(chan, 32767.0 / scaling_out.get()); } }); - add_property_resolver({samp_rate_out}, {}, - [&samp_rate_out = *samp_rate_out, chan, this]() { + add_property_resolver( + {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); if (samp_rate_out.is_valid()) { this->set_samp_rate(samp_rate_out.get()); } }); - add_property_resolver({tick_rate_out}, {}, - [&tick_rate_out = *tick_rate_out, chan, this]() { + add_property_resolver( + {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); if (tick_rate_out.is_valid()) { this->set_tick_rate(tick_rate_out.get()); } }); } + +void rfnoc_tx_streamer::_handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + + uhd::async_metadata_t md; + md.event_code = tx_event_action->event_code; + md.channel = src.instance; + md.has_time_spec = tx_event_action->has_tsf; + + if (md.has_time_spec) { + md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); + } + + RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel); + _async_msg_queue->enqueue(md); +} |