diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-05-23 20:38:07 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:29 -0800 |
commit | 75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch) | |
tree | 2904b48607cc07158aa6b068ada35ab56c4da516 /host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | |
parent | d8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff) | |
download | uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2 uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip |
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports:
Transports build on I/O service and implements flow control and
sequence number checking.
The rx streamer subclass extends the streamer implementation to connect
it to the rfnoc graph. It receives configuration values from property
propagation and configures the streamer accordingly. It also implements
the issue_stream_cmd rx_streamer API method.
Add implementation of rx streamer creation and method to connect it to
an rfnoc block.
rfnoc_graph: Cache more connection info, clarify contract
Summary of changes:
- rfnoc_graph stores more information about static connections at the
beginning. Some search algorithms are replaced by simpler lookups.
- The contract for connect() was clarified. It is required to call
connect, even for static connections.
Diffstat (limited to 'host/lib/include/uhdlib/transport/tx_streamer_impl.hpp')
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhdlib/transport/tx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: + //! Stores metadata in the cache + UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) + { + _metadata_cache = metadata; + _cached_metadata = true; + } + + //! Checks for cached metadata + UHD_FORCE_INLINE void check(tx_metadata_t& metadata) + { + if (_cached_metadata) { + // Only use cached time_spec if metadata does not have one + if (!metadata.has_time_spec) { + metadata.has_time_spec = _metadata_cache.has_time_spec; + metadata.time_spec = _metadata_cache.time_spec; + } + metadata.start_of_burst = _metadata_cache.start_of_burst; + metadata.end_of_burst = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template <typename transport_t> +class tx_streamer_impl : public tx_streamer +{ +public: + tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_chans) + , _zero_buffs(num_chans, &_zero) + , _out_buffs(num_chans) + { + _setup_converters(num_chans, stream_args); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + _spp = stream_args.args.cast<size_t>("spp", _spp); + } + + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + size_t get_max_num_samps() const + { + return _spp; + } + + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + size_t send(const uhd::tx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t& metadata_, + const double timeout) + { + uhd::tx_metadata_t metadata(metadata_); + + if (nsamps_per_buff == 0 && metadata.start_of_burst) { + _metadata_cache.store(metadata); + return 0; + } + + _metadata_cache.check(metadata); + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + if (nsamps_per_buff == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + timeout_ms); + + return 0; + } else if (nsamps_per_buff <= _spp) { + return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + + } else { + size_t total_num_samps_sent = 0; + const bool eob = metadata.end_of_burst; + metadata.end_of_burst = false; + + const size_t num_fragments = (nsamps_per_buff - 1) / _spp; + const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + + for (size_t i = 0; i < num_fragments; i++) { + const size_t num_samps_sent = _send_one_packet( + buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + + total_num_samps_sent += num_samps_sent; + + if (num_samps_sent == 0) { + return total_num_samps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; + } + + // Send the final fragment + metadata.end_of_burst = eob; + + size_t nsamps_sent = + total_num_samps_sent + + _send_one_packet( + buffs, total_num_samps_sent, final_length, metadata, timeout); + + return nsamps_sent; + } + } + + //! Implementation of rx_streamer API method + bool recv_async_msg( + uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) + { + // TODO: implement me + return false; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Convert samples for one channel and sends a packet + size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, + const size_t buffer_offset_in_samps, + const size_t num_samples, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + if (!_zero_copy_streamer.get_send_buffs( + _out_buffs, num_samples, metadata, timeout_ms)) { + return 0; + } + + size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + + for (size_t i = 0; i < get_num_channels(); i++) { + const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset; + _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + + _zero_copy_streamer.release_send_buff(i); + } + + return num_samples; + } + + //! Create converters and initialize _bytes_per_cpu_item + void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.cpu_format; + id.num_inputs = 1; + id.output_format = stream_args.otw_format + "_chdr"; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_chans; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(32767.0); + } + } + + // Converter item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Manages frame buffers and packet info + tx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Buffer used to handle send calls with no data + std::vector<const void*> _zero_buffs; + + const uint64_t _zero = 0; + + // Container for buffer pointers used in send method + std::vector<void*> _out_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Metadata cache for send calls with no data + detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ |