aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp180
1 files changed, 144 insertions, 36 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5080182d6..5fdf2594d 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -39,6 +39,13 @@
#include <iostream>
#include <vector>
+// Included for debugging
+#ifdef UHD_TXRX_DEBUG_PRINTS
+#include <boost/format.hpp>
+#include <boost/thread/thread.hpp>
+#include "boost/date_time/posix_time/posix_time.hpp"
+#endif
+
namespace uhd{ namespace transport{ namespace sph{
UHD_INLINE boost::uint32_t get_context_code(
@@ -76,6 +83,10 @@ public:
_queue_error_for_next_call(false),
_buffers_infos_index(0)
{
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ recvd_packets = 0;
+ #endif
+
this->resize(size);
set_alignment_failure_threshold(1000);
}
@@ -142,6 +153,32 @@ public:
}
/*!
+ * Flush all transports in the streamer:
+ * This calls into get_and_process_single_packet(),
+ * so the sequence and flow control are handled.
+ * However, the packet payload is discarded.
+ */
+ void flush_all(const double timeout = 0.0)
+ {
+ increment_buffer_info(); //increment to next buffer
+
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ while (true) //while (_props.at(i).get_buff(timeout));
+ {
+ //receive a single packet from the transport
+ try
+ {
+ if (get_and_process_single_packet(i,
+ get_prev_buffer_info(),
+ get_curr_buffer_info(),
+ timeout) == PACKET_TIMEOUT_ERROR) break;
+ }catch(...){}
+ }
+ }
+ }
+
+ /*!
* Set the function to handle flow control
* \param xport_chan which transport channel
* \param handle_flowctrl the callback function
@@ -213,7 +250,12 @@ public:
buffs, nsamps_per_buff, metadata, timeout
);
- if (one_packet) return accum_num_samps;
+ if (one_packet){
+#ifdef UHD_TXRX_DEBUG_PRINTS
+ dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
+#endif
+ return accum_num_samps;
+ }
//first recv had an error code set, return immediately
if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;
@@ -232,11 +274,13 @@ public:
}
accum_num_samps += num_samps;
}
+#ifdef UHD_TXRX_DEBUG_PRINTS
+ dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
+#endif
return accum_num_samps;
}
private:
-
vrt_unpacker_type _vrt_unpacker;
size_t _header_offset_words32;
double _tick_rate, _samp_rate;
@@ -264,6 +308,13 @@ private:
//! information stored for a received buffer
struct per_buffer_info_type{
+ void reset()
+ {
+ buff.reset();
+ vrt_hdr = NULL;
+ time = time_spec_t(0.0);
+ copy_buff = NULL;
+ }
managed_recv_buffer::sptr buff;
const boost::uint32_t *vrt_hdr;
vrt::if_packet_info_t ifpi;
@@ -280,6 +331,17 @@ private:
data_bytes_to_copy(0),
fragment_offset_in_samps(0)
{/* NOP */}
+ void reset()
+ {
+ indexes_todo.set();
+ alignment_time = time_spec_t(0.0);
+ alignment_time_valid = false;
+ data_bytes_to_copy = 0;
+ fragment_offset_in_samps = 0;
+ metadata.reset();
+ for (size_t i = 0; i < size(); i++)
+ at(i).reset();
+ }
boost::dynamic_bitset<> indexes_todo; //used in alignment logic
time_spec_t alignment_time; //used in alignment logic
bool alignment_time_valid; //used in alignment logic
@@ -305,6 +367,10 @@ private:
PACKET_SEQUENCE_ERROR
};
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ int recvd_packets;
+ #endif
+
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -322,6 +388,16 @@ private:
buff = _props[index].get_buff(timeout);
if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ if (++recvd_packets > 1000)
+ {
+ recvd_packets = 0;
+ buff.reset();
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
+ }
+ #endif
+
//bounds check before extract
size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
if (num_packet_words32 <= _header_offset_words32){
@@ -339,7 +415,7 @@ private:
//handle flow control
if (_props[index].handle_flowctrl)
{
- if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0)
+ if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
{
_props[index].handle_flowctrl(info.ifpi.packet_count);
}
@@ -411,7 +487,10 @@ private:
******************************************************************/
UHD_INLINE void get_aligned_buffs(double timeout){
+ get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use
+
increment_buffer_info(); //increment to next buffer
+
buffers_info_type &prev_info = get_prev_buffer_info();
buffers_info_type &curr_info = get_curr_buffer_info();
buffers_info_type &next_info = get_next_buffer_info();
@@ -440,12 +519,6 @@ private:
"The receive packet handler caught an exception.\n%s"
) % e.what() << std::endl;
std::swap(curr_info, next_info); //save progress from curr -> next
- curr_info.metadata.has_time_spec = false;
- curr_info.metadata.time_spec = time_spec_t(0.0);
- curr_info.metadata.more_fragments = false;
- curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.start_of_burst = false;
- curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
return;
}
@@ -470,10 +543,6 @@ private:
std::swap(curr_info, next_info); //save progress from curr -> next
curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
curr_info.metadata.time_spec = next_info[index].time;
- curr_info.metadata.more_fragments = false;
- curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.start_of_burst = false;
- curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
_props[index].handle_overflow();
@@ -483,12 +552,6 @@ private:
case PACKET_TIMEOUT_ERROR:
std::swap(curr_info, next_info); //save progress from curr -> next
- curr_info.metadata.has_time_spec = false;
- curr_info.metadata.time_spec = time_spec_t(0.0);
- curr_info.metadata.more_fragments = false;
- curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.start_of_burst = false;
- curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
return;
@@ -498,10 +561,7 @@ private:
curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks(
prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate);
- curr_info.metadata.more_fragments = false;
- curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.start_of_burst = false;
- curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.out_of_sequence = true;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
UHD_MSG(fastpath) << "D";
return;
@@ -516,12 +576,6 @@ private:
"However, a timestamp match could not be determined.\n"
) % iterations << std::endl;
std::swap(curr_info, next_info); //save progress from curr -> next
- curr_info.metadata.has_time_spec = false;
- curr_info.metadata.time_spec = time_spec_t(0.0);
- curr_info.metadata.more_fragments = false;
- curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.start_of_burst = false;
- curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
_props[index].handle_overflow();
return;
@@ -554,13 +608,8 @@ private:
const size_t buffer_offset_bytes = 0
){
//get the next buffer if the current one has expired
- if (get_curr_buffer_info().data_bytes_to_copy == 0){
-
- //reset current buffer info members for reuse
- get_curr_buffer_info().fragment_offset_in_samps = 0;
- get_curr_buffer_info().alignment_time_valid = false;
- get_curr_buffer_info().indexes_todo.set();
-
+ if (get_curr_buffer_info().data_bytes_to_copy == 0)
+ {
//perform receive with alignment logic
get_aligned_buffs(timeout);
}
@@ -641,6 +690,65 @@ private:
size_t _convert_buffer_offset_bytes;
size_t _convert_bytes_to_copy;
+ /*
+ * This last section is only for debugging purposes.
+ * It causes a lot of prints to stderr which can be piped to a file.
+ * Gathered data can be used to post process it with external tools.
+ */
+#ifdef UHD_TXRX_DEBUG_PRINTS
+ struct dbg_recv_stat_t {
+ dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate):
+ wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate)
+ {}
+ long wallclock;
+ size_t nsamps_per_buff;
+ size_t nsamps_recv;
+ uhd::rx_metadata_t metadata;
+ double timeout;
+ bool one_packet;
+ double samp_rate;
+ // Create a formatted print line for all the info gathered in this struct.
+ std::string print_line() {
+ boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld");
+ fmt % wallclock;
+ fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv;
+ fmt % (one_packet ? "true":"false");
+ fmt % metadata.error_code;
+ fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false");
+ fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset;
+ fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate);
+ return fmt.str();
+ }
+ };
+
+ void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv,
+ uhd::rx_metadata_t &metadata, const double timeout,
+ const bool one_packet,
+ bool dbg_print_directly = true
+ )
+ {
+ // Initialize a struct with all available data. It can return a formatted string with all infos if wanted.
+ dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(),
+ nsamps_per_buff,
+ nsamps_recv,
+ metadata,
+ timeout,
+ one_packet,
+ _samp_rate
+ );
+ if(dbg_print_directly) {
+ dbg_print_err(data.print_line());
+ }
+ }
+
+
+
+ void dbg_print_err(std::string msg) {
+ std::string dbg_prefix("super_recv_packet_handler,");
+ msg = dbg_prefix + msg;
+ fprintf(stderr, "%s\n", msg.c_str());
+ }
+#endif
};
class recv_packet_streamer : public recv_packet_handler, public rx_streamer{