diff options
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 180 |
1 files changed, 144 insertions, 36 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5080182d6..5fdf2594d 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -39,6 +39,13 @@ #include <iostream> #include <vector> +// Included for debugging +#ifdef UHD_TXRX_DEBUG_PRINTS +#include <boost/format.hpp> +#include <boost/thread/thread.hpp> +#include "boost/date_time/posix_time/posix_time.hpp" +#endif + namespace uhd{ namespace transport{ namespace sph{ UHD_INLINE boost::uint32_t get_context_code( @@ -76,6 +83,10 @@ public: _queue_error_for_next_call(false), _buffers_infos_index(0) { + #ifdef ERROR_INJECT_DROPPED_PACKETS + recvd_packets = 0; + #endif + this->resize(size); set_alignment_failure_threshold(1000); } @@ -142,6 +153,32 @@ public: } /*! + * Flush all transports in the streamer: + * This calls into get_and_process_single_packet(), + * so the sequence and flow control are handled. + * However, the packet payload is discarded. + */ + void flush_all(const double timeout = 0.0) + { + increment_buffer_info(); //increment to next buffer + + for (size_t i = 0; i < _props.size(); i++) + { + while (true) //while (_props.at(i).get_buff(timeout)); + { + //receive a single packet from the transport + try + { + if (get_and_process_single_packet(i, + get_prev_buffer_info(), + get_curr_buffer_info(), + timeout) == PACKET_TIMEOUT_ERROR) break; + }catch(...){} + } + } + } + + /*! * Set the function to handle flow control * \param xport_chan which transport channel * \param handle_flowctrl the callback function @@ -213,7 +250,12 @@ public: buffs, nsamps_per_buff, metadata, timeout ); - if (one_packet) return accum_num_samps; + if (one_packet){ +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif + return accum_num_samps; + } //first recv had an error code set, return immediately if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; @@ -232,11 +274,13 @@ public: } accum_num_samps += num_samps; } +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif return accum_num_samps; } private: - vrt_unpacker_type _vrt_unpacker; size_t _header_offset_words32; double _tick_rate, _samp_rate; @@ -264,6 +308,13 @@ private: //! information stored for a received buffer struct per_buffer_info_type{ + void reset() + { + buff.reset(); + vrt_hdr = NULL; + time = time_spec_t(0.0); + copy_buff = NULL; + } managed_recv_buffer::sptr buff; const boost::uint32_t *vrt_hdr; vrt::if_packet_info_t ifpi; @@ -280,6 +331,17 @@ private: data_bytes_to_copy(0), fragment_offset_in_samps(0) {/* NOP */} + void reset() + { + indexes_todo.set(); + alignment_time = time_spec_t(0.0); + alignment_time_valid = false; + data_bytes_to_copy = 0; + fragment_offset_in_samps = 0; + metadata.reset(); + for (size_t i = 0; i < size(); i++) + at(i).reset(); + } boost::dynamic_bitset<> indexes_todo; //used in alignment logic time_spec_t alignment_time; //used in alignment logic bool alignment_time_valid; //used in alignment logic @@ -305,6 +367,10 @@ private: PACKET_SEQUENCE_ERROR }; + #ifdef ERROR_INJECT_DROPPED_PACKETS + int recvd_packets; + #endif + /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -322,6 +388,16 @@ private: buff = _props[index].get_buff(timeout); if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + } + #endif + //bounds check before extract size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); if (num_packet_words32 <= _header_offset_words32){ @@ -339,7 +415,7 @@ private: //handle flow control if (_props[index].handle_flowctrl) { - if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0) + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) { _props[index].handle_flowctrl(info.ifpi.packet_count); } @@ -411,7 +487,10 @@ private: ******************************************************************/ UHD_INLINE void get_aligned_buffs(double timeout){ + get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use + increment_buffer_info(); //increment to next buffer + buffers_info_type &prev_info = get_prev_buffer_info(); buffers_info_type &curr_info = get_curr_buffer_info(); buffers_info_type &next_info = get_next_buffer_info(); @@ -440,12 +519,6 @@ private: "The receive packet handler caught an exception.\n%s" ) % e.what() << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; return; } @@ -470,10 +543,6 @@ private: std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf; curr_info.metadata.time_spec = next_info[index].time; - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ _props[index].handle_overflow(); @@ -483,12 +552,6 @@ private: case PACKET_TIMEOUT_ERROR: std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; return; @@ -498,10 +561,7 @@ private: curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks( prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; + curr_info.metadata.out_of_sequence = true; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; UHD_MSG(fastpath) << "D"; return; @@ -516,12 +576,6 @@ private: "However, a timestamp match could not be determined.\n" ) % iterations << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; _props[index].handle_overflow(); return; @@ -554,13 +608,8 @@ private: const size_t buffer_offset_bytes = 0 ){ //get the next buffer if the current one has expired - if (get_curr_buffer_info().data_bytes_to_copy == 0){ - - //reset current buffer info members for reuse - get_curr_buffer_info().fragment_offset_in_samps = 0; - get_curr_buffer_info().alignment_time_valid = false; - get_curr_buffer_info().indexes_todo.set(); - + if (get_curr_buffer_info().data_bytes_to_copy == 0) + { //perform receive with alignment logic get_aligned_buffs(timeout); } @@ -641,6 +690,65 @@ private: size_t _convert_buffer_offset_bytes; size_t _convert_bytes_to_copy; + /* + * This last section is only for debugging purposes. + * It causes a lot of prints to stderr which can be piped to a file. + * Gathered data can be used to post process it with external tools. + */ +#ifdef UHD_TXRX_DEBUG_PRINTS + struct dbg_recv_stat_t { + dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate): + wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate) + {} + long wallclock; + size_t nsamps_per_buff; + size_t nsamps_recv; + uhd::rx_metadata_t metadata; + double timeout; + bool one_packet; + double samp_rate; + // Create a formatted print line for all the info gathered in this struct. + std::string print_line() { + boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld"); + fmt % wallclock; + fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv; + fmt % (one_packet ? "true":"false"); + fmt % metadata.error_code; + fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false"); + fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset; + fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate); + return fmt.str(); + } + }; + + void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv, + uhd::rx_metadata_t &metadata, const double timeout, + const bool one_packet, + bool dbg_print_directly = true + ) + { + // Initialize a struct with all available data. It can return a formatted string with all infos if wanted. + dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), + nsamps_per_buff, + nsamps_recv, + metadata, + timeout, + one_packet, + _samp_rate + ); + if(dbg_print_directly) { + dbg_print_err(data.print_line()); + } + } + + + + void dbg_print_err(std::string msg) { + std::string dbg_prefix("super_recv_packet_handler,"); + msg = dbg_prefix + msg; + fprintf(stderr, "%s\n", msg.c_str()); + } +#endif }; class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ |