aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
diff options
context:
space:
mode:
authorAaron Rossetto <aaron.rossetto@ni.com>2019-09-27 14:56:25 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:32 -0800
commit2f97f8bd0167d4179427efa8a955046fbf417e91 (patch)
tree255c660b8bdff86047937a75a0f3b3798b1da73b /host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
parent41f142050fb39ad533f82256b574b5c08c160bc1 (diff)
downloaduhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.gz
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.bz2
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.zip
transport: Implement eov indications for Rx and Tx streams
Diffstat (limited to 'host/lib/include/uhdlib/transport/tx_streamer_impl.hpp')
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp215
1 files changed, 174 insertions, 41 deletions
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index fa84026fe..c594dd530 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -60,6 +60,41 @@ private:
uhd::tx_metadata_t _metadata_cache;
};
+class tx_eov_data_wrapper
+{
+public:
+ tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata)
+ : _eov_positions(metadata.eov_positions)
+ , _eov_positions_size(metadata.eov_positions_size)
+ , _remaining(metadata.eov_positions_size)
+ , _read_pos(0)
+ {
+ }
+
+ UHD_FORCE_INLINE size_t* data() const
+ {
+ return _eov_positions;
+ }
+
+ UHD_FORCE_INLINE size_t remaining() const
+ {
+ return _remaining;
+ }
+
+ UHD_FORCE_INLINE size_t pop_front()
+ {
+ assert(_eov_positions && _remaining > 0);
+ _remaining--;
+ return _eov_positions[_read_pos++];
+ }
+
+private:
+ size_t* _eov_positions;
+ size_t _eov_positions_size;
+ size_t _remaining;
+ size_t _read_pos;
+};
+
} // namespace detail
/*!
@@ -125,60 +160,157 @@ public:
_metadata_cache.check(metadata);
- const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ const bool eob_on_last_packet = metadata.end_of_burst;
- if (nsamps_per_buff == 0) {
- // Send requests with no samples are handled here, such as end of
- // burst. Send packets need to have at least one sample based on the
- // chdr specification, so we use _zero_buffs here.
- _send_one_packet(_zero_buffs.data(),
- 0, // buffer offset
- 1, // num samples
- metadata,
- timeout_ms);
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
- return 0;
- } else if (nsamps_per_buff <= _spp) {
- return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+ detail::tx_eov_data_wrapper eov_positions(metadata);
+
+ // If there are EOVs specified in the metadata, it will be necessary
+ // to break up the packet sends based on where the EOVs should be
+ // generated in the sequence of packets.
+ //
+ // `nsamps_to_send_remaining` represents the total number of
+ // samples remaining to send to fulfill the caller's request.
+ size_t nsamps_to_send_remaining = nsamps_per_buff;
+
+ // `nsamps_to_send` represents a subset of the total number of
+ // samples to send based on whether or not the caller's metadata
+ // specifies EOV positions.
+ // * If there are no EOVs, it represents the entire send request
+ // made by the caller. It may be broken up into chunks no larger
+ // than _spp later on in the function, but it will not be broken up
+ // due to EOV. There will only be one iteration through the do/
+ // while loop.
+ // * If there are EOVs, `nsamps_to_send` represents the number of
+ // samples to send to get to the next EOV position. Again, it may
+ // be broken up into chunks no larger than _spp, but note that the
+ // final chunk will have EOV signalled in its header. There may be
+ // multiple iterations through the do/while loop to fulfill the
+ // caller's entire send request.
+ size_t nsamps_to_send;
+
+ // `num_samps_sent` is the return value from each individual call
+ // to `_send_one_packet()`.
+ size_t num_samps_sent = 0;
+
+ // `total_nsamps_sent` accumulates the total number of samples sent
+ // in each chunk, and is used to determine the offset within `buffs`
+ // to pass to `_send_one_packet()`.
+ size_t total_nsamps_sent = 0;
+
+ size_t last_eov_position = 0;
+ bool eov;
+
+ do {
+ if (eov_positions.data() and eov_positions.remaining() > 0) {
+ size_t next_eov_position = eov_positions.pop_front();
+ // Check basic requirements: EOV positions must be monotonically
+ // increasing
+ if (next_eov_position <= last_eov_position) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] > eov_pos[n-1])");
+ }
+ // EOV position must be within the range of the samples written
+ if (next_eov_position > nsamps_per_buff) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] <= nsamps_per_buff)");
+ }
+ nsamps_to_send = next_eov_position - last_eov_position;
+ eov = true;
+ } else {
+ // No EOVs, or the EOV position list has been exhausted:
+ // simply send the remaining samples
+ nsamps_to_send = nsamps_to_send_remaining;
+ eov = false;
+ }
- } else {
- size_t total_num_samps_sent = 0;
- const bool eob = metadata.end_of_burst;
- metadata.end_of_burst = false;
+ if (nsamps_to_send == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ false,
+ timeout_ms);
- const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
- const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+ return 0;
- for (size_t i = 0; i < num_fragments; i++) {
- const size_t num_samps_sent = _send_one_packet(
- buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+ } else if (nsamps_to_send <= _spp) {
+ // If last packet, apply saved EOB state to metadata
+ metadata.end_of_burst =
+ (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining);
- total_num_samps_sent += num_samps_sent;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms);
- if (num_samps_sent == 0) {
- return total_num_samps_sent;
+ metadata.start_of_burst = false;
+ } else {
+ // Note: since `nsamps_to_send` is guaranteed to be > _spp
+ // if the code reaches this else clause, `num_fragments` will
+ // always be at least 1.
+ const size_t num_fragments = (nsamps_to_send - 1) / _spp;
+ const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1;
+
+ metadata.end_of_burst = false;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms);
+
+ // Advance sample accumulator and decrement remaining
+ // samples for this segment
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_nsamps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
}
- // Setup timespec for the next fragment
- if (metadata.has_time_spec) {
- metadata.time_spec =
- metadata.time_spec
- + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
- }
+ // Send the final fragment
+ metadata.end_of_burst =
+ (eob_on_last_packet and final_length == nsamps_to_send_remaining);
- metadata.start_of_burst = false;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, final_length, metadata, eov, timeout);
}
- // Send the final fragment
- metadata.end_of_burst = eob;
+ // Advance sample accumulator and decrement remaining samples
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
- size_t nsamps_sent =
- total_num_samps_sent
- + _send_one_packet(
- buffs, total_num_samps_sent, final_length, metadata, timeout);
+ // Loop exit condition: return from `_send_one_packet()` indicates
+ // an error
+ if (num_samps_sent == 0) {
+ break;
+ }
- return nsamps_sent;
- }
+ // If there are more samples to be sent, thus requiring another
+ // trip around the do/while loop, update the timespec in the
+ // metadata for the next fragment (if desired)
+ if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ last_eov_position = total_nsamps_sent;
+
+ } while (nsamps_to_send_remaining > 0);
+
+ return total_nsamps_sent;
}
protected:
@@ -233,10 +365,11 @@ private:
const size_t buffer_offset_in_samps,
const size_t num_samples,
const tx_metadata_t& metadata,
+ const bool eov,
const int32_t timeout_ms)
{
if (!_zero_copy_streamer.get_send_buffs(
- _out_buffs, num_samples, metadata, timeout_ms)) {
+ _out_buffs, num_samples, metadata, eov, timeout_ms)) {
return 0;
}