diff options
author | Aaron Rossetto <aaron.rossetto@ni.com> | 2019-09-27 14:56:25 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:32 -0800 |
commit | 2f97f8bd0167d4179427efa8a955046fbf417e91 (patch) | |
tree | 255c660b8bdff86047937a75a0f3b3798b1da73b /host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | |
parent | 41f142050fb39ad533f82256b574b5c08c160bc1 (diff) | |
download | uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.gz uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.bz2 uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.zip |
transport: Implement eov indications for Rx and Tx streams
Diffstat (limited to 'host/lib/include/uhdlib/transport/tx_streamer_impl.hpp')
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 215 |
1 files changed, 174 insertions, 41 deletions
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index fa84026fe..c594dd530 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -60,6 +60,41 @@ private: uhd::tx_metadata_t _metadata_cache; }; +class tx_eov_data_wrapper +{ +public: + tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata) + : _eov_positions(metadata.eov_positions) + , _eov_positions_size(metadata.eov_positions_size) + , _remaining(metadata.eov_positions_size) + , _read_pos(0) + { + } + + UHD_FORCE_INLINE size_t* data() const + { + return _eov_positions; + } + + UHD_FORCE_INLINE size_t remaining() const + { + return _remaining; + } + + UHD_FORCE_INLINE size_t pop_front() + { + assert(_eov_positions && _remaining > 0); + _remaining--; + return _eov_positions[_read_pos++]; + } + +private: + size_t* _eov_positions; + size_t _eov_positions_size; + size_t _remaining; + size_t _read_pos; +}; + } // namespace detail /*! @@ -125,60 +160,157 @@ public: _metadata_cache.check(metadata); - const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + const bool eob_on_last_packet = metadata.end_of_burst; - if (nsamps_per_buff == 0) { - // Send requests with no samples are handled here, such as end of - // burst. Send packets need to have at least one sample based on the - // chdr specification, so we use _zero_buffs here. - _send_one_packet(_zero_buffs.data(), - 0, // buffer offset - 1, // num samples - metadata, - timeout_ms); + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); - return 0; - } else if (nsamps_per_buff <= _spp) { - return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + detail::tx_eov_data_wrapper eov_positions(metadata); + + // If there are EOVs specified in the metadata, it will be necessary + // to break up the packet sends based on where the EOVs should be + // generated in the sequence of packets. + // + // `nsamps_to_send_remaining` represents the total number of + // samples remaining to send to fulfill the caller's request. + size_t nsamps_to_send_remaining = nsamps_per_buff; + + // `nsamps_to_send` represents a subset of the total number of + // samples to send based on whether or not the caller's metadata + // specifies EOV positions. + // * If there are no EOVs, it represents the entire send request + // made by the caller. It may be broken up into chunks no larger + // than _spp later on in the function, but it will not be broken up + // due to EOV. There will only be one iteration through the do/ + // while loop. + // * If there are EOVs, `nsamps_to_send` represents the number of + // samples to send to get to the next EOV position. Again, it may + // be broken up into chunks no larger than _spp, but note that the + // final chunk will have EOV signalled in its header. There may be + // multiple iterations through the do/while loop to fulfill the + // caller's entire send request. + size_t nsamps_to_send; + + // `num_samps_sent` is the return value from each individual call + // to `_send_one_packet()`. + size_t num_samps_sent = 0; + + // `total_nsamps_sent` accumulates the total number of samples sent + // in each chunk, and is used to determine the offset within `buffs` + // to pass to `_send_one_packet()`. + size_t total_nsamps_sent = 0; + + size_t last_eov_position = 0; + bool eov; + + do { + if (eov_positions.data() and eov_positions.remaining() > 0) { + size_t next_eov_position = eov_positions.pop_front(); + // Check basic requirements: EOV positions must be monotonically + // increasing + if (next_eov_position <= last_eov_position) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] > eov_pos[n-1])"); + } + // EOV position must be within the range of the samples written + if (next_eov_position > nsamps_per_buff) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] <= nsamps_per_buff)"); + } + nsamps_to_send = next_eov_position - last_eov_position; + eov = true; + } else { + // No EOVs, or the EOV position list has been exhausted: + // simply send the remaining samples + nsamps_to_send = nsamps_to_send_remaining; + eov = false; + } - } else { - size_t total_num_samps_sent = 0; - const bool eob = metadata.end_of_burst; - metadata.end_of_burst = false; + if (nsamps_to_send == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + false, + timeout_ms); - const size_t num_fragments = (nsamps_per_buff - 1) / _spp; - const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + return 0; - for (size_t i = 0; i < num_fragments; i++) { - const size_t num_samps_sent = _send_one_packet( - buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + } else if (nsamps_to_send <= _spp) { + // If last packet, apply saved EOB state to metadata + metadata.end_of_burst = + (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining); - total_num_samps_sent += num_samps_sent; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms); - if (num_samps_sent == 0) { - return total_num_samps_sent; + metadata.start_of_burst = false; + } else { + // Note: since `nsamps_to_send` is guaranteed to be > _spp + // if the code reaches this else clause, `num_fragments` will + // always be at least 1. + const size_t num_fragments = (nsamps_to_send - 1) / _spp; + const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1; + + metadata.end_of_burst = false; + + for (size_t i = 0; i < num_fragments; i++) { + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms); + + // Advance sample accumulator and decrement remaining + // samples for this segment + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; + + if (num_samps_sent == 0) { + return total_nsamps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; } - // Setup timespec for the next fragment - if (metadata.has_time_spec) { - metadata.time_spec = - metadata.time_spec - + time_spec_t::from_ticks(num_samps_sent, _samp_rate); - } + // Send the final fragment + metadata.end_of_burst = + (eob_on_last_packet and final_length == nsamps_to_send_remaining); - metadata.start_of_burst = false; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, final_length, metadata, eov, timeout); } - // Send the final fragment - metadata.end_of_burst = eob; + // Advance sample accumulator and decrement remaining samples + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; - size_t nsamps_sent = - total_num_samps_sent - + _send_one_packet( - buffs, total_num_samps_sent, final_length, metadata, timeout); + // Loop exit condition: return from `_send_one_packet()` indicates + // an error + if (num_samps_sent == 0) { + break; + } - return nsamps_sent; - } + // If there are more samples to be sent, thus requiring another + // trip around the do/while loop, update the timespec in the + // metadata for the next fragment (if desired) + if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + last_eov_position = total_nsamps_sent; + + } while (nsamps_to_send_remaining > 0); + + return total_nsamps_sent; } protected: @@ -233,10 +365,11 @@ private: const size_t buffer_offset_in_samps, const size_t num_samples, const tx_metadata_t& metadata, + const bool eov, const int32_t timeout_ms) { if (!_zero_copy_streamer.get_send_buffs( - _out_buffs, num_samples, metadata, timeout_ms)) { + _out_buffs, num_samples, metadata, eov, timeout_ms)) { return 0; } |