diff options
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 205 |
1 files changed, 163 insertions, 42 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5080182d6..5c84327a4 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -39,6 +39,13 @@ #include <iostream> #include <vector> +// Included for debugging +#ifdef UHD_TXRX_DEBUG_PRINTS +#include <boost/format.hpp> +#include <boost/thread/thread.hpp> +#include "boost/date_time/posix_time/posix_time.hpp" +#endif + namespace uhd{ namespace transport{ namespace sph{ UHD_INLINE boost::uint32_t get_context_code( @@ -76,6 +83,10 @@ public: _queue_error_for_next_call(false), _buffers_infos_index(0) { + #ifdef ERROR_INJECT_DROPPED_PACKETS + recvd_packets = 0; + #endif + this->resize(size); set_alignment_failure_threshold(1000); } @@ -142,6 +153,16 @@ public: } /*! + * Flush all transports in the streamer: + * The packet payload is discarded. + */ + void flush_all(const double timeout = 0.0) + { + _flush_all(timeout); + return; + } + + /*! * Set the function to handle flow control * \param xport_chan which transport channel * \param handle_flowctrl the callback function @@ -213,7 +234,12 @@ public: buffs, nsamps_per_buff, metadata, timeout ); - if (one_packet) return accum_num_samps; + if (one_packet){ +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif + return accum_num_samps; + } //first recv had an error code set, return immediately if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; @@ -232,11 +258,13 @@ public: } accum_num_samps += num_samps; } +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif return accum_num_samps; } private: - vrt_unpacker_type _vrt_unpacker; size_t _header_offset_words32; double _tick_rate, _samp_rate; @@ -264,6 +292,13 @@ private: //! information stored for a received buffer struct per_buffer_info_type{ + void reset() + { + buff.reset(); + vrt_hdr = NULL; + time = time_spec_t(0.0); + copy_buff = NULL; + } managed_recv_buffer::sptr buff; const boost::uint32_t *vrt_hdr; vrt::if_packet_info_t ifpi; @@ -280,6 +315,17 @@ private: data_bytes_to_copy(0), fragment_offset_in_samps(0) {/* NOP */} + void reset() + { + indexes_todo.set(); + alignment_time = time_spec_t(0.0); + alignment_time_valid = false; + data_bytes_to_copy = 0; + fragment_offset_in_samps = 0; + metadata.reset(); + for (size_t i = 0; i < size(); i++) + at(i).reset(); + } boost::dynamic_bitset<> indexes_todo; //used in alignment logic time_spec_t alignment_time; //used in alignment logic bool alignment_time_valid; //used in alignment logic @@ -305,6 +351,10 @@ private: PACKET_SEQUENCE_ERROR }; + #ifdef ERROR_INJECT_DROPPED_PACKETS + int recvd_packets; + #endif + /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -313,15 +363,25 @@ private: ******************************************************************/ UHD_INLINE packet_type get_and_process_single_packet( const size_t index, - buffers_info_type &prev_buffer_info, - buffers_info_type &curr_buffer_info, + per_buffer_info_type &prev_buffer_info, + per_buffer_info_type &curr_buffer_info, double timeout ){ //get a single packet from the transport layer - managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; + managed_recv_buffer::sptr &buff = curr_buffer_info.buff; buff = _props[index].get_buff(timeout); if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + } + #endif + //bounds check before extract size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); if (num_packet_words32 <= _header_offset_words32){ @@ -329,7 +389,7 @@ private: } //extract packet info - per_buffer_info_type &info = curr_buffer_info[index]; + per_buffer_info_type &info = curr_buffer_info; info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; _vrt_unpacker(info.vrt_hdr, info.ifpi); @@ -339,7 +399,7 @@ private: //handle flow control if (_props[index].handle_flowctrl) { - if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0) + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) { _props[index].handle_flowctrl(info.ifpi.packet_count); } @@ -366,7 +426,7 @@ private: #endif //3) check for out of order timestamps - if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){ return PACKET_TIMESTAMP_ERROR; } @@ -374,6 +434,33 @@ private: return PACKET_IF_DATA; } + void _flush_all(double timeout) + { + for (size_t i = 0; i < _props.size(); i++) + { + per_buffer_info_type prev_buffer_info, curr_buffer_info; + while (true) + { + //receive a single packet from the transport + try + { + // call into get_and_process_single_packet() + // to make sure flow control is handled + if (get_and_process_single_packet( + i, + prev_buffer_info, + curr_buffer_info, + timeout) == PACKET_TIMEOUT_ERROR) break; + } catch(...){} + prev_buffer_info = curr_buffer_info; + curr_buffer_info.reset(); + } + } + get_prev_buffer_info().reset(); + get_curr_buffer_info().reset(); + get_next_buffer_info().reset(); + } + /******************************************************************* * Alignment check: * Check the received packet for alignment and mark accordingly. @@ -411,7 +498,10 @@ private: ******************************************************************/ UHD_INLINE void get_aligned_buffs(double timeout){ + get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use + increment_buffer_info(); //increment to next buffer + buffers_info_type &prev_info = get_prev_buffer_info(); buffers_info_type &curr_info = get_curr_buffer_info(); buffers_info_type &next_info = get_next_buffer_info(); @@ -430,7 +520,7 @@ private: //receive a single packet from the transport try{ packet = get_and_process_single_packet( - index, prev_info, curr_info, timeout + index, prev_info[index], curr_info[index], timeout ); } @@ -440,12 +530,6 @@ private: "The receive packet handler caught an exception.\n%s" ) % e.what() << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; return; } @@ -470,25 +554,17 @@ private: std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf; curr_info.metadata.time_spec = next_info[index].time; - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); + curr_info.metadata = metadata; UHD_MSG(fastpath) << "O"; } return; case PACKET_TIMEOUT_ERROR: std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; return; @@ -498,10 +574,7 @@ private: curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks( prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; + curr_info.metadata.out_of_sequence = true; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; UHD_MSG(fastpath) << "D"; return; @@ -516,12 +589,6 @@ private: "However, a timestamp match could not be determined.\n" ) % iterations << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next - curr_info.metadata.has_time_spec = false; - curr_info.metadata.time_spec = time_spec_t(0.0); - curr_info.metadata.more_fragments = false; - curr_info.metadata.fragment_offset = 0; - curr_info.metadata.start_of_burst = false; - curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; _props[index].handle_overflow(); return; @@ -554,13 +621,8 @@ private: const size_t buffer_offset_bytes = 0 ){ //get the next buffer if the current one has expired - if (get_curr_buffer_info().data_bytes_to_copy == 0){ - - //reset current buffer info members for reuse - get_curr_buffer_info().fragment_offset_in_samps = 0; - get_curr_buffer_info().alignment_time_valid = false; - get_curr_buffer_info().indexes_todo.set(); - + if (get_curr_buffer_info().data_bytes_to_copy == 0) + { //perform receive with alignment logic get_aligned_buffs(timeout); } @@ -641,6 +703,65 @@ private: size_t _convert_buffer_offset_bytes; size_t _convert_bytes_to_copy; + /* + * This last section is only for debugging purposes. + * It causes a lot of prints to stderr which can be piped to a file. + * Gathered data can be used to post process it with external tools. + */ +#ifdef UHD_TXRX_DEBUG_PRINTS + struct dbg_recv_stat_t { + dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate): + wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate) + {} + long wallclock; + size_t nsamps_per_buff; + size_t nsamps_recv; + uhd::rx_metadata_t metadata; + double timeout; + bool one_packet; + double samp_rate; + // Create a formatted print line for all the info gathered in this struct. + std::string print_line() { + boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld"); + fmt % wallclock; + fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv; + fmt % (one_packet ? "true":"false"); + fmt % metadata.error_code; + fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false"); + fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset; + fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate); + return fmt.str(); + } + }; + + void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv, + uhd::rx_metadata_t &metadata, const double timeout, + const bool one_packet, + bool dbg_print_directly = true + ) + { + // Initialize a struct with all available data. It can return a formatted string with all infos if wanted. + dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), + nsamps_per_buff, + nsamps_recv, + metadata, + timeout, + one_packet, + _samp_rate + ); + if(dbg_print_directly) { + dbg_print_err(data.print_line()); + } + } + + + + void dbg_print_err(std::string msg) { + std::string dbg_prefix("super_recv_packet_handler,"); + msg = dbg_prefix + msg; + fprintf(stderr, "%s\n", msg.c_str()); + } +#endif }; class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ |
