aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/rfnoc_tx_streamer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/rfnoc_tx_streamer.cpp')
-rw-r--r--host/lib/rfnoc/rfnoc_tx_streamer.cpp69
1 files changed, 61 insertions, 8 deletions
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
index 61d714a85..4fc1a3ff8 100644
--- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -14,6 +14,7 @@ using namespace uhd::rfnoc;
const std::string STREAMER_ID = "TxStreamer";
static std::atomic<uint64_t> streamer_inst_ctr;
+static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000;
rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
const uhd::stream_args_t stream_args)
@@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
, _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
, _stream_args(stream_args)
{
+ _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE);
+
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ register_action_handler(ACTION_KEY_TX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ tx_event_action_info::sptr tx_event_action =
+ std::dynamic_pointer_cast<tx_event_action_info>(action);
+ if (!tx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid TX event action!");
+ return;
+ }
+ _handle_tx_event_action(src, tx_event_action);
+ });
+
// Initialize properties
_scaling_out.reserve(num_chans);
_samp_rate_out.reserve(num_chans);
@@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
for (auto& mtu_prop : _mtu_out) {
mtu_resolver_out.insert(&mtu_prop);
}
- //property_t<size_t>* mtu_out = &_mtu_out.back();
add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out),
[&mtu_out = _mtu_out[i], i, this]() {
@@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel(
const size_t mtu = xport->get_max_payload_size();
set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel});
+ xport->set_enqueue_async_msg_fn(
+ [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) {
+ async_metadata_t md;
+ md.channel = channel;
+ md.event_code = event_code;
+ md.has_time_spec = has_tsf;
+
+ if (has_tsf) {
+ md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate());
+ }
+
+ this->_async_msg_queue->enqueue(md);
+ });
+
tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport));
}
-void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format)
+bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata,
+ double timeout)
+{
+ const auto timeout_ms = static_cast<uint64_t>(timeout * 1000);
+ return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms);
+}
+
+void rfnoc_tx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
{
// Create actual properties and store them
_scaling_out.push_back(property_t<double>(
@@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot
register_property(mtu_out);
// Add resolvers
- add_property_resolver({scaling_out}, {},
- [&scaling_out = *scaling_out, chan, this]() {
+ add_property_resolver(
+ {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);
if (scaling_out.is_valid()) {
this->set_scale_factor(chan, 32767.0 / scaling_out.get());
}
});
- add_property_resolver({samp_rate_out}, {},
- [&samp_rate_out = *samp_rate_out, chan, this]() {
+ add_property_resolver(
+ {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);
if (samp_rate_out.is_valid()) {
this->set_samp_rate(samp_rate_out.get());
}
});
- add_property_resolver({tick_rate_out}, {},
- [&tick_rate_out = *tick_rate_out, chan, this]() {
+ add_property_resolver(
+ {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);
if (tick_rate_out.is_valid()) {
this->set_tick_rate(tick_rate_out.get());
}
});
}
+
+void rfnoc_tx_streamer::_handle_tx_event_action(
+ const res_source_info& src, tx_event_action_info::sptr tx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE);
+
+ uhd::async_metadata_t md;
+ md.event_code = tx_event_action->event_code;
+ md.channel = src.instance;
+ md.has_time_spec = tx_event_action->has_tsf;
+
+ if (md.has_time_spec) {
+ md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate());
+ }
+
+ RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel);
+ _async_msg_queue->enqueue(md);
+}