diff options
| author | Josh Blum <josh@joshknows.com> | 2011-06-12 19:46:06 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2011-06-14 17:27:46 -0700 | 
| commit | c65a1f8e40960d5016788de2cc4775c9e603293a (patch) | |
| tree | 24a404212d7ee4aef3b34ac2b0df9b1e92841162 | |
| parent | 85ebb705fa567e8093aa68c0ad88996d434ed2bf (diff) | |
| download | uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.tar.gz uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.tar.bz2 uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.zip  | |
uhd: supper packet handler support squashed
| -rwxr-xr-x | host/lib/transport/gen_vrt_if_packet.py | 44 | ||||
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 637 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 287 | ||||
| -rw-r--r-- | host/tests/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/tests/sph_recv_test.cpp | 715 | ||||
| -rw-r--r-- | host/tests/sph_send_test.cpp | 204 | 
6 files changed, 1876 insertions, 13 deletions
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index 7440def6a..5f048d8c7 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -62,6 +62,8 @@ static pred_table_type get_pred_unpack_table(void){          if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p);          if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p);          if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); +        if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p); +        if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p);      }      return table;  } @@ -84,9 +86,11 @@ void vrt::if_hdr_pack_$(suffix)(      if (if_packet_info.has_tsi) pred |= $hex($tsi_p);      if (if_packet_info.has_tsf) pred |= $hex($tsf_p);      if (if_packet_info.has_tlr) pred |= $hex($tlr_p); +    if (if_packet_info.eob)     pred |= $hex($eob_p); +    if (if_packet_info.sob)     pred |= $hex($sob_p);      switch(pred){ -    #for $pred in range(2**5) +    #for $pred in range(2**7)      case $pred:          #set $num_header_words = 1          #set $flags = 0 @@ -126,6 +130,13 @@ void vrt::if_hdr_pack_$(suffix)(          #else              #set $num_trailer_words = 0;          #end if +        ########## Burst Flags ########## +        #if $pred & $eob_p +            #set $flags |= (0x1 << 24); +        #end if +        #if $pred & $sob_p +            #set $flags |= (0x1 << 25); +        #end if          ########## Variables ##########              if_packet_info.num_header_words32 = $num_header_words;              if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32; @@ -134,10 +145,6 @@ void vrt::if_hdr_pack_$(suffix)(      #end for      } -    //set the burst flags -    if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25); -    if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24); -      //fill in complete header word      packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0          | (if_packet_info.packet_type << 29) @@ -162,13 +169,11 @@ void vrt::if_hdr_unpack_$(suffix)(      //extract fields from the header      if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29);      if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; -    //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented -    //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented      const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];      switch(pred){ -    #for $pred in range(2**5) +    #for $pred in range(2**7)      case $pred:          #set $has_time_spec = False          #set $num_header_words = 1 @@ -215,6 +220,17 @@ void vrt::if_hdr_unpack_$(suffix)(              if_packet_info.has_tlr = false;              #set $num_trailer_words = 0;          #end if +        ########## Burst Flags ########## +        #if $pred & $eob_p +            if_packet_info.eob = true; +        #else +            if_packet_info.eob = false; +        #end if +        #if $pred & $sob_p +            if_packet_info.sob = true; +        #else +            if_packet_info.sob = false; +        #end if          ########## Variables ##########              //another failure case              if (packet_words32 < $($num_header_words + $num_trailer_words)) @@ -243,9 +259,11 @@ if __name__ == '__main__':      open(sys.argv[1], 'w').write(parse_tmpl(          TMPL_TEXT,          file=__file__, -        sid_p = 0b00001, -        cid_p = 0b00010, -        tsi_p = 0b00100, -        tsf_p = 0b01000, -        tlr_p = 0b10000, +        sid_p = 0b0000001, +        cid_p = 0b0000010, +        tsi_p = 0b0000100, +        tsf_p = 0b0001000, +        tlr_p = 0b0010000, +        sob_p = 0b0100000, +        eob_p = 0b1000000,      )) diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp new file mode 100644 index 000000000..78d8dfce8 --- /dev/null +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -0,0 +1,637 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <boost/format.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +UHD_INLINE boost::uint32_t get_context_code( +    const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info +){ +    //extract the context word (we dont know the endianness so mirror the bytes) +    boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | +              uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); +    return word0 & 0xff; +} + +typedef boost::function<void(void)> handle_overflow_type; +static inline void handle_overflow_nop(void){} + +/*********************************************************************** + * Alignment indexes class: + *  - Access an integer set with very quick operations. + **********************************************************************/ +class alignment_indexes{ +public: +    typedef boost::uint16_t index_type; //16 buffers + +    alignment_indexes(void): +        _indexes(0), +        _sizes(256, 0), +        _fronts(256, ~0) +    { +        //fill the O(1) look up tables for a single byte +        for (size_t i = 0; i < 256; i++){ +            for (size_t j = 0; j < 8; j++){ +                if (i & (1 << j)){ +                    _sizes[i]++; +                    _fronts[i] = j; +                } +            } +        } +    } + +    UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;} + +    UHD_INLINE size_t front(void){ +        //check one byte per iteration +        for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ +            size_t front = _fronts[(_indexes >> i) & 0xff]; +            if (front != size_t(~0)) return front + i; +        } +        if (empty()) throw uhd::runtime_error("cannot call front() when empty"); +        UHD_THROW_INVALID_CODE_PATH(); +    } + +    UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);} + +    UHD_INLINE bool empty(void){return _indexes == 0;} + +    UHD_INLINE size_t size(void){ +        size_t size = 0; +        //check one byte per iteration +        for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ +            size += _sizes[(_indexes >> i) & 0xff]; +        } +        return size; +    } + +private: +    index_type _indexes; +    std::vector<size_t> _sizes; +    std::vector<size_t> _fronts; +}; + +/*********************************************************************** + * Super receive packet handler + * + * A receive packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are received in unison in recv(). + **********************************************************************/ +class recv_packet_handler{ +public: +    typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; +    typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); +    //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; + +    /*! +     * Make a new packet handler for receive +     * \param size the number of transport channels +     */ +    recv_packet_handler(const size_t size = 1): +        _queue_error_for_next_call(false), +        _buffers_infos_index(0) +    { +        UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8); +        this->resize(size); +        set_alignment_failure_threshold(1000); +    } + +    //! Resize the number of transport channels +    void resize(const size_t size){ +        if (this->size() == size) return; +        _props.resize(size); +        _buffers_infos.resize(4, buffers_info_type(size)); +    } + +    //! Get the channel width of this handler +    size_t size(void) const{ +        return _props.size(); +    } + +    //! Setup the vrt unpacker function and offset +    void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){ +        _vrt_unpacker = vrt_unpacker; +        _header_offset_words32 = header_offset_words32; +    } + +    /*! +     * Set the threshold for alignment failure. +     * How many packets throw out before giving up? +     * \param threshold number of packets per channel +     */ +    void set_alignment_failure_threshold(const size_t threshold){ +        _alignment_faulure_threshold = threshold*this->size(); +    } + +    //! Set the rate of ticks per second +    void set_tick_rate(const double rate){ +        _tick_rate = rate; +    } + +    //! Set the rate of samples per second +    void set_samp_rate(const double rate){ +        _samp_rate = rate; +    } + +    /*! +     * Set the function to get a managed buffer. +     * \param xport_chan which transport channel +     * \param get_buff the getter function +     */ +    void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ +        _props.at(xport_chan).get_buff = get_buff; +    } + +    /*! +     * Setup the conversion functions (homogeneous across transports). +     * Here, we load a table of converters for all possible io types. +     * This makes the converter look-up an O(1) operation. +     * \param otw_type the channel data type +     * \param width the streams per channel (usually 1) +     */ +    void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ +        _io_buffs.resize(width); +        _converters.resize(128); +        for (size_t io_type = 0; io_type < _converters.size(); io_type++){ +            try{ +                _converters[io_type] = uhd::convert::get_converter_otw_to_cpu( +                    io_type_t::tid_t(io_type), otw_type, 1, width +                ); +            }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... +        } +        _bytes_per_item = otw_type.get_sample_size(); +    } + +    //! Set the transport channel's overflow handler +    void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){ +        _props.at(xport_chan).handle_overflow = handle_overflow; +    } + +    //! Get a scoped lock object for this instance +    boost::mutex::scoped_lock get_scoped_lock(void){ +        return boost::mutex::scoped_lock(_mutex); +    } + +    /******************************************************************* +     * Receive: +     * The entry point for the fast-path receive calls. +     * Dispatch into combinations of single packet receive calls. +     ******************************************************************/ +    UHD_INLINE size_t recv( +        const uhd::device::recv_buffs_type &buffs, +        const size_t nsamps_per_buff, +        uhd::rx_metadata_t &metadata, +        const uhd::io_type_t &io_type, +        uhd::device::recv_mode_t recv_mode, +        double timeout +    ){ +        boost::mutex::scoped_lock lock(_mutex); + +        //handle metadata queued from a previous receive +        if (_queue_error_for_next_call){ +            _queue_error_for_next_call = false; +            metadata = _queue_metadata; +            //We want to allow a full buffer recv to be cut short by a timeout, +            //but do not want to generate an inline timeout message packet. +            if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0; +        } + +        switch(recv_mode){ + +        //////////////////////////////////////////////////////////////// +        case uhd::device::RECV_MODE_ONE_PACKET:{ +        //////////////////////////////////////////////////////////////// +            return recv_one_packet(buffs, nsamps_per_buff, metadata, io_type, timeout); +        } + +        //////////////////////////////////////////////////////////////// +        case uhd::device::RECV_MODE_FULL_BUFF:{ +        //////////////////////////////////////////////////////////////// +            size_t accum_num_samps = recv_one_packet( +                buffs, nsamps_per_buff, metadata, io_type, timeout +            ); + +            //first recv had an error code set, return immediately +            if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + +            //loop until buffer is filled or error code +            while(accum_num_samps < nsamps_per_buff){ +                size_t num_samps = recv_one_packet( +                    buffs, nsamps_per_buff - accum_num_samps, _queue_metadata, +                    io_type, timeout, accum_num_samps*io_type.size +                ); + +                //metadata had an error code set, store for next call and return +                if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){ +                    _queue_error_for_next_call = true; +                    break; +                } +                accum_num_samps += num_samps; +            } +            return accum_num_samps; +        } + +        default: throw uhd::value_error("unknown recv mode"); +        }//switch(recv_mode) +    } + +private: + +    boost::mutex _mutex; +    vrt_unpacker_type _vrt_unpacker; +    size_t _header_offset_words32; +    double _tick_rate, _samp_rate; +    bool _queue_error_for_next_call; +    size_t _alignment_faulure_threshold; +    rx_metadata_t _queue_metadata; +    struct xport_chan_props_type{ +        xport_chan_props_type(void): +            packet_count(0), +            handle_overflow(&handle_overflow_nop) +        {} +        get_buff_type get_buff; +        size_t packet_count; +        handle_overflow_type handle_overflow; +    }; +    std::vector<xport_chan_props_type> _props; +    std::vector<void *> _io_buffs; //used in conversion +    size_t _bytes_per_item; //used in conversion +    std::vector<uhd::convert::function_type> _converters; //used in conversion + +    //! information stored for a received buffer +    struct per_buffer_info_type{ +        managed_recv_buffer::sptr buff; +        const boost::uint32_t *vrt_hdr; +        vrt::if_packet_info_t ifpi; +        time_spec_t time; +        const char *copy_buff; +    }; + +    //!information stored for a set of aligned buffers +    struct buffers_info_type : std::vector<per_buffer_info_type> { +        buffers_info_type(const size_t size): +            std::vector<per_buffer_info_type>(size), +            alignment_time_valid(false), +            data_bytes_to_copy(0), +            fragment_offset_in_samps(0) +        { +            indexes_to_do.reset(size); +        } +        alignment_indexes indexes_to_do; //used in alignment logic +        time_spec_t alignment_time; //used in alignment logic +        bool alignment_time_valid; //used in alignment logic +        size_t data_bytes_to_copy; //keeps track of state +        size_t fragment_offset_in_samps; //keeps track of state +        rx_metadata_t metadata; //packet description +    }; + +    //! a circular queue of buffer infos +    std::vector<buffers_info_type> _buffers_infos; +    size_t _buffers_infos_index; +    buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];} +    buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];} +    buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];} +    void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;} + +    //! possible return options for the packet receiver +    enum packet_type{ +        PACKET_IF_DATA, +        PACKET_TIMESTAMP_ERROR, +        PACKET_INLINE_MESSAGE, +        PACKET_TIMEOUT_ERROR, +        PACKET_SEQUENCE_ERROR +    }; + +    /******************************************************************* +     * Get and process a single packet from the transport: +     * Receive a single packet at the given index. +     * Extract all the relevant info and store. +     * Check the info to determine the return code. +     ******************************************************************/ +    UHD_INLINE packet_type get_and_process_single_packet( +        const size_t index, +        buffers_info_type &prev_buffer_info, +        buffers_info_type &curr_buffer_info, +        double timeout +    ){ +        //get a single packet from the transport layer +        managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; +        buff = _props[index].get_buff(timeout); +        if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + +        //bounds check before extract +        size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); +        if (num_packet_words32 <= _header_offset_words32){ +            throw std::runtime_error("recv buffer smaller than vrt packet offset"); +        } + +        //extract packet info +        per_buffer_info_type &info = curr_buffer_info[index]; +        info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; +        info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; +        _vrt_unpacker(info.vrt_hdr, info.ifpi); +        info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true +        info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + +        //store the packet count for the next iteration +        #ifndef SRPH_DONT_CHECK_SEQUENCE +        const size_t expected_packet_count = _props[index].packet_count; +        _props[index].packet_count = (info.ifpi.packet_count + 1)%16; +        #endif + +        //-------------------------------------------------------------- +        //-- Determine return conditions: +        //-- The order of these checks is HOLY. +        //-------------------------------------------------------------- + +        //1) check for out of order timestamps +        if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ +            return PACKET_TIMESTAMP_ERROR; +        } + +        //2) check for inline IF message packets +        if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ +            return PACKET_INLINE_MESSAGE; +        } + +        //3) check for sequence errors +        #ifndef SRPH_DONT_CHECK_SEQUENCE +        if (expected_packet_count != info.ifpi.packet_count){ +            return PACKET_SEQUENCE_ERROR; +        } +        #endif + +        //4) otherwise the packet is normal! +        return PACKET_IF_DATA; +    } + +    /******************************************************************* +     * Alignment check: +     * Check the received packet for alignment and mark accordingly. +     ******************************************************************/ +    UHD_INLINE void alignment_check( +        const size_t index, buffers_info_type &info +    ){ +        //if alignment time was not valid or if the sequence id is newer: +        //  use this index's time as the alignment time +        //  reset the indexes list and remove this index +        if (not info.alignment_time_valid or info[index].time > info.alignment_time){ +            info.alignment_time_valid = true; +            info.alignment_time = info[index].time; +            info.indexes_to_do.reset(this->size()); +            info.indexes_to_do.remove(index); +            info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t); +        } + +        //if the sequence id matches: +        //  remove this index from the list and continue +        else if (info[index].time == info.alignment_time){ +            info.indexes_to_do.remove(index); +        } + +        //if the sequence id is older: +        //  continue with the same index to try again +        //else if (info[index].time < info.alignment_time)... +    } + +    /******************************************************************* +     * Get aligned buffers: +     * Iterate through each index and try to accumulate aligned buffers. +     * Handle all of the edge cases like inline messages and errors. +     * The logic will throw out older packets until it finds a match. +     ******************************************************************/ +    UHD_INLINE void get_aligned_buffs(double timeout){ + +        increment_buffer_info(); //increment to next buffer +        buffers_info_type &prev_info = get_prev_buffer_info(); +        buffers_info_type &curr_info = get_curr_buffer_info(); +        buffers_info_type &next_info = get_next_buffer_info(); + +        //Loop until we get a message of an aligned set of buffers: +        // - Receive a single packet and extract its info. +        // - Handle the packet type yielded by the receive. +        // - Check the timestamps for alignment conditions. +        size_t iterations = 0; +        while (not curr_info.indexes_to_do.empty()){ + +            //get the index to process for this iteration +            const size_t index = curr_info.indexes_to_do.front(); +            packet_type packet; + +            //receive a single packet from the transport +            try{ +                packet = get_and_process_single_packet( +                    index, prev_info, curr_info, timeout +                ); +            } + +            //handle the case when the get packet throws +            catch(const std::exception &e){ +                UHD_MSG(error) << boost::format( +                    "The receive packet handler caught an exception.\n%s" +                ) % e.what() << std::endl; +                std::swap(curr_info, next_info); //save progress from curr -> next +                curr_info.metadata.has_time_spec = false; +                curr_info.metadata.time_spec = time_spec_t(0.0); +                curr_info.metadata.more_fragments = false; +                curr_info.metadata.fragment_offset = 0; +                curr_info.metadata.start_of_burst = false; +                curr_info.metadata.end_of_burst = false; +                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; +                return; +            } + +            switch(packet){ +            case PACKET_IF_DATA: +                alignment_check(index, curr_info); +                break; + +            case PACKET_TIMESTAMP_ERROR: +                //If the user changes the device time while streaming or without flushing, +                //we can receive a packet that comes before the previous packet in time. +                //This could cause the alignment logic to discard future received packets. +                //Therefore, when this occurs, we reset the info to restart from scratch. +                if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){ +                    curr_info.alignment_time_valid = false; +                } +                alignment_check(index, curr_info); +                break; + +            case PACKET_INLINE_MESSAGE: +                std::swap(curr_info, next_info); //save progress from curr -> next +                curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsi and next_info[index].ifpi.has_tsf; +                curr_info.metadata.time_spec = next_info[index].time; +                curr_info.metadata.more_fragments = false; +                curr_info.metadata.fragment_offset = 0; +                curr_info.metadata.start_of_burst = false; +                curr_info.metadata.end_of_burst = false; +                curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); +                if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow(); +                UHD_MSG(fastpath) << "O"; +                return; + +            case PACKET_TIMEOUT_ERROR: +                std::swap(curr_info, next_info); //save progress from curr -> next +                curr_info.metadata.has_time_spec = false; +                curr_info.metadata.time_spec = time_spec_t(0.0); +                curr_info.metadata.more_fragments = false; +                curr_info.metadata.fragment_offset = 0; +                curr_info.metadata.start_of_burst = false; +                curr_info.metadata.end_of_burst = false; +                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; +                return; + +            case PACKET_SEQUENCE_ERROR: +                alignment_check(index, curr_info); +                std::swap(curr_info, next_info); //save progress from curr -> next +                curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; +                curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t( +                    0.0, prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_item, _samp_rate); +                curr_info.metadata.more_fragments = false; +                curr_info.metadata.fragment_offset = 0; +                curr_info.metadata.start_of_burst = false; +                curr_info.metadata.end_of_burst = false; +                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; +                UHD_MSG(fastpath) << "O"; +                return; + +            } + +            //too many iterations: detect alignment failure +            if (iterations++ > _alignment_faulure_threshold){ +                UHD_MSG(error) << boost::format( +                    "The receive packet handler failed to time-align packets.\n" +                    "%u received packets were processed by the handler.\n" +                    "However, a timestamp match could not be determined.\n" +                ) % iterations << std::endl; +                std::swap(curr_info, next_info); //save progress from curr -> next +                curr_info.metadata.has_time_spec = false; +                curr_info.metadata.time_spec = time_spec_t(0.0); +                curr_info.metadata.more_fragments = false; +                curr_info.metadata.fragment_offset = 0; +                curr_info.metadata.start_of_burst = false; +                curr_info.metadata.end_of_burst = false; +                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; +                return; +            } + +        } + +        //set the metadata from the buffer information at index zero +        curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsi and curr_info[0].ifpi.has_tsf; +        curr_info.metadata.time_spec = curr_info[0].time; +        curr_info.metadata.more_fragments = false; +        curr_info.metadata.fragment_offset = 0; +        /* TODO SOB on RX not supported in hardware +        static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits +        curr_info.metadata.start_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_sob_flags) != 0); +        */ +        curr_info.metadata.start_of_burst = false; +        static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits +        curr_info.metadata.end_of_burst   = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_eob_flags) != 0); +        curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + +    } + +    /******************************************************************* +     * Receive a single packet: +     * Handles fragmentation, messages, errors, and copy-conversion. +     * When no fragments are available, call the get aligned buffers. +     * Then copy-convert available data into the user's IO buffers. +     ******************************************************************/ +    UHD_INLINE size_t recv_one_packet( +        const uhd::device::recv_buffs_type &buffs, +        const size_t nsamps_per_buff, +        uhd::rx_metadata_t &metadata, +        const uhd::io_type_t &io_type, +        double timeout, +        const size_t buffer_offset_bytes = 0 +    ){ +        //get the next buffer if the current one has expired +        if (get_curr_buffer_info().data_bytes_to_copy == 0){ + +            //reset current buffer info members for reuse +            get_curr_buffer_info().fragment_offset_in_samps = 0; +            get_curr_buffer_info().alignment_time_valid = false; +            get_curr_buffer_info().indexes_to_do.reset(this->size()); + +            //perform receive with alignment logic +            get_aligned_buffs(timeout); +        } + +        buffers_info_type &info = get_curr_buffer_info(); +        metadata = info.metadata; + +        //interpolate the time spec (useful when this is a fragment) +        metadata.time_spec += time_spec_t(0, info.fragment_offset_in_samps, _samp_rate); + +        //extract the number of samples available to copy +        const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_item; +        const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); +        const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_item; +        const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); + +        size_t buff_index = 0; +        BOOST_FOREACH(per_buffer_info_type &buff_info, info){ + +            //fill a vector with pointers to the io buffers +            BOOST_FOREACH(void *&io_buff, _io_buffs){ +                io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; +            } + +            //copy-convert the samples from the recv buffer +            _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.); + +            //update the rx copy buffer to reflect the bytes copied +            buff_info.copy_buff += bytes_to_copy; +        } +        //update the copy buffer's availability +        info.data_bytes_to_copy -= bytes_to_copy; + +        //setup the fragment flags and offset +        metadata.more_fragments = info.data_bytes_to_copy != 0; +        metadata.fragment_offset = info.fragment_offset_in_samps; +        info.fragment_offset_in_samps += nsamps_to_copy; //set for next call + +        return nsamps_to_copy_per_io_buff; +    } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp new file mode 100644 index 000000000..99d445180 --- /dev/null +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -0,0 +1,287 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +/*********************************************************************** + * Super send packet handler + * + * A send packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are sent in unison in send(). + **********************************************************************/ +class send_packet_handler{ +public: +    typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; +    typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); +    //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; + +    /*! +     * Make a new packet handler for send +     * \param size the number of transport channels +     */ +    send_packet_handler(const size_t size = 1): +        _next_packet_seq(0) +    { +        this->resize(size); +    } + +    //! Resize the number of transport channels +    void resize(const size_t size){ +        if (this->size() == size) return; +        _props.resize(size); +        static const boost::uint64_t zero = 0; +        _zero_buffs.resize(size, &zero); +    } + +    //! Get the channel width of this handler +    size_t size(void) const{ +        return _props.size(); +    } + +    //! Setup the vrt packer function and offset +    void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){ +        _vrt_packer = vrt_packer; +        _header_offset_words32 = header_offset_words32; +    } + +    //! Set the rate of ticks per second +    void set_tick_rate(const double rate){ +        _tick_rate = rate; +    } + +    //! Set the rate of samples per second +    void set_samp_rate(const double rate){ +        _samp_rate = rate; +    } + +    /*! +     * Set the function to get a managed buffer. +     * \param xport_chan which transport channel +     * \param get_buff the getter function +     */ +    void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ +        _props.at(xport_chan).get_buff = get_buff; +    } + +    /*! +     * Setup the conversion functions (homogeneous across transports). +     * Here, we load a table of converters for all possible io types. +     * This makes the converter look-up an O(1) operation. +     * \param otw_type the channel data type +     * \param width the streams per channel (usually 1) +     */ +    void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ +        _io_buffs.resize(width); +        _converters.resize(128); +        for (size_t io_type = 0; io_type < _converters.size(); io_type++){ +            try{ +                _converters[io_type] = uhd::convert::get_converter_cpu_to_otw( +                    io_type_t::tid_t(io_type), otw_type, 1, width +                ); +            }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... +        } +        _bytes_per_item = otw_type.get_sample_size(); +    } + +    /*! +     * Set the maximum number of samples per host packet. +     * Ex: A USRP1 in dual channel mode would be half. +     * \param num_samps the maximum samples in a packet +     */ +    void set_max_samples_per_packet(const size_t num_samps){ +        _max_samples_per_packet = num_samps; +    } + +    //! Get a scoped lock object for this instance +    boost::mutex::scoped_lock get_scoped_lock(void){ +        return boost::mutex::scoped_lock(_mutex); +    } + +    /******************************************************************* +     * Send: +     * The entry point for the fast-path send calls. +     * Dispatch into combinations of single packet send calls. +     ******************************************************************/ +    UHD_INLINE size_t send( +        const uhd::device::send_buffs_type &buffs, +        const size_t nsamps_per_buff, +        const uhd::tx_metadata_t &metadata, +        const uhd::io_type_t &io_type, +        uhd::device::send_mode_t send_mode, +        double timeout +    ){ +        boost::mutex::scoped_lock lock(_mutex); + +        //translate the metadata to vrt if packet info +        vrt::if_packet_info_t if_packet_info; +        if_packet_info.has_sid = false; +        if_packet_info.has_cid = false; +        if_packet_info.has_tlr = false; +        if_packet_info.has_tsi = metadata.has_time_spec; +        if_packet_info.has_tsf = metadata.has_time_spec; +        if_packet_info.tsi     = boost::uint32_t(metadata.time_spec.get_full_secs()); +        if_packet_info.tsf     = boost::uint64_t(metadata.time_spec.get_tick_count(_tick_rate)); +        if_packet_info.sob     = metadata.start_of_burst; +        if_packet_info.eob     = metadata.end_of_burst; + +        if (nsamps_per_buff <= _max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; +        switch(send_mode){ + +        //////////////////////////////////////////////////////////////// +        case uhd::device::SEND_MODE_ONE_PACKET:{ +        //////////////////////////////////////////////////////////////// + +            //TODO remove this code when sample counts of zero are supported by hardware +            if (nsamps_per_buff == 0) return send_one_packet( +                _zero_buffs, 1, if_packet_info, io_type, timeout +            ); + +            return send_one_packet( +                buffs, +                std::min(nsamps_per_buff, _max_samples_per_packet), +                if_packet_info, io_type, timeout +            ); +        } + +        //////////////////////////////////////////////////////////////// +        case uhd::device::SEND_MODE_FULL_BUFF:{ +        //////////////////////////////////////////////////////////////// +            size_t total_num_samps_sent = 0; + +            //false until final fragment +            if_packet_info.eob = false; + +            const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet; +            const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1; + +            //loop through the following fragment indexes +            for (size_t i = 0; i < num_fragments; i++){ + +                //send a fragment with the helper function +                const size_t num_samps_sent = send_one_packet( +                    buffs, _max_samples_per_packet, +                    if_packet_info, io_type, timeout, +                    total_num_samps_sent*io_type.size +                ); +                total_num_samps_sent += num_samps_sent; +                if (num_samps_sent == 0) return total_num_samps_sent; + +                //setup metadata for the next fragment +                const time_spec_t time_spec = metadata.time_spec + time_spec_t(0, total_num_samps_sent, _samp_rate); +                if_packet_info.tsi = boost::uint32_t(time_spec.get_full_secs()); +                if_packet_info.tsf = boost::uint64_t(time_spec.get_tick_count(_tick_rate)); +                if_packet_info.sob = false; + +            } + +            //send the final fragment with the helper function +            if_packet_info.eob = metadata.end_of_burst; +            return total_num_samps_sent + send_one_packet( +                buffs, final_length, +                if_packet_info, io_type, timeout, +                total_num_samps_sent*io_type.size +            ); +        } + +        default: throw uhd::value_error("unknown send mode"); +        }//switch(send_mode) +    } + +private: + +    boost::mutex _mutex; +    vrt_packer_type _vrt_packer; +    size_t _header_offset_words32; +    double _tick_rate, _samp_rate; +    struct xport_chan_props_type{ +        get_buff_type get_buff; +    }; +    std::vector<xport_chan_props_type> _props; +    std::vector<const void *> _io_buffs; //used in conversion +    size_t _bytes_per_item; //used in conversion +    std::vector<uhd::convert::function_type> _converters; //used in conversion +    size_t _max_samples_per_packet; +    std::vector<const void *> _zero_buffs; +    size_t _next_packet_seq; + +    /******************************************************************* +     * Send a single packet: +     ******************************************************************/ +    size_t send_one_packet( +        const uhd::device::send_buffs_type &buffs, +        const size_t nsamps_per_buff, +        vrt::if_packet_info_t &if_packet_info, +        const uhd::io_type_t &io_type, +        double timeout, +        const size_t buffer_offset_bytes = 0 +    ){ +        //load the rest of the if_packet_info in here +        if_packet_info.num_payload_words32 = (nsamps_per_buff*_io_buffs.size()*_bytes_per_item)/sizeof(boost::uint32_t); +        if_packet_info.packet_count = _next_packet_seq; + +        size_t buff_index = 0; +        BOOST_FOREACH(xport_chan_props_type &props, _props){ +            managed_send_buffer::sptr buff = props.get_buff(timeout); +            if (buff.get() == NULL) return 0; //timeout + +            //fill a vector with pointers to the io buffers +            BOOST_FOREACH(const void *&io_buff, _io_buffs){ +                io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; +            } +            boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; + +            //pack metadata into a vrt header +            _vrt_packer(otw_mem, if_packet_info); +            otw_mem += if_packet_info.num_header_words32; + +            //copy-convert the samples into the send buffer +            _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.); + +            //commit the samples to the zero-copy interface +            size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); +            buff->commit(num_bytes_total); + +        } +        _next_packet_seq++; //increment sequence after commits +        return nsamps_per_buff; +    } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index b38afccf0..b7bcfb7d5 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -28,6 +28,8 @@ SET(test_sources      gain_group_test.cpp      msg_test.cpp      ranges_test.cpp +    sph_recv_test.cpp +    sph_send_test.cpp      subdev_spec_test.cpp      time_spec_test.cpp      tune_helper_test.cpp diff --git a/host/tests/sph_recv_test.cpp b/host/tests/sph_recv_test.cpp new file mode 100644 index 000000000..6bb5d1175 --- /dev/null +++ b/host/tests/sph_recv_test.cpp @@ -0,0 +1,715 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include "../lib/transport/super_recv_packet_handler.hpp" +#include <boost/shared_array.hpp> +#include <boost/bind.hpp> +#include <complex> +#include <vector> +#include <list> + +#define BOOST_CHECK_TS_CLOSE(a, b) \ +    BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001) + +/*********************************************************************** + * A dummy managed receive buffer for testing + **********************************************************************/ +class dummy_mrb : public uhd::transport::managed_recv_buffer{ +public: +    void release(void){ +        //NOP +    } + +    sptr get_new(boost::shared_array<char> mem, size_t len){ +        _mem = mem; +        _len = len; +        return make_managed_buffer(this); +    } + +private: +    const void *get_buff(void) const{return _mem.get();} +    size_t get_size(void) const{return _len;} + +    boost::shared_array<char> _mem; +    size_t _len; +}; + +/*********************************************************************** + * A dummy transport class to fill with fake data + **********************************************************************/ +class dummy_recv_xport_class{ +public: +    dummy_recv_xport_class(const uhd::otw_type_t &otw_type){ +        _otw_type = otw_type; +    } + +    void push_back_packet( +        uhd::transport::vrt::if_packet_info_t &ifpi, +        const boost::uint32_t optional_msg_word = 0 +    ){ +        const size_t max_pkt_len = (ifpi.num_payload_words32 + uhd::transport::vrt::max_if_hdr_words32 + 1/*tlr*/)*sizeof(boost::uint32_t); +        _mems.push_back(boost::shared_array<char>(new char[max_pkt_len])); +        if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){ +            uhd::transport::vrt::if_hdr_pack_be(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi); +        } +        if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){ +            uhd::transport::vrt::if_hdr_pack_le(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi); +        } +        (reinterpret_cast<boost::uint32_t *>(_mems.back().get()) + ifpi.num_header_words32)[0] = optional_msg_word | uhd::byteswap(optional_msg_word); +        _lens.push_back(ifpi.num_packet_words32*sizeof(boost::uint32_t)); +    } + +    uhd::transport::managed_recv_buffer::sptr get_recv_buff(double){ +        if (_mems.empty()) return uhd::transport::managed_recv_buffer::sptr(); //timeout +        _mrbs.push_back(dummy_mrb()); +        uhd::transport::managed_recv_buffer::sptr mrb = _mrbs.back().get_new(_mems.front(), _lens.front()); +        _mems.pop_front(); +        _lens.pop_front(); +        return mrb; +    } + +private: +    std::list<boost::shared_array<char> > _mems; +    std::list<size_t> _lens; +    std::list<dummy_mrb> _mrbs; //list means no-realloc +    uhd::otw_type_t _otw_type; +}; + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_normal){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    dummy_recv_xport_class dummy_recv_xport(otw_type); +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        dummy_recv_xport.push_back_packet(ifpi); +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(1); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > buff(20); +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(not metadata.more_fragments); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +        num_accum_samps += num_samps_ret; +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_sequence_error){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    dummy_recv_xport_class dummy_recv_xport(otw_type); +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        if (i != NUM_PKTS_TO_TEST/2){ //simulate a lost packet +            dummy_recv_xport.push_back_packet(ifpi); +        } +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(1); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > buff(20); +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        if (i == NUM_PKTS_TO_TEST/2){ +            //must get the soft overflow here +            BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +            BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +            num_accum_samps += 10 + i%10; +        } +        else{ +            BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +            BOOST_CHECK(not metadata.more_fragments); +            BOOST_CHECK(metadata.has_time_spec); +            BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +            BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +            num_accum_samps += num_samps_ret; +        } +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    dummy_recv_xport_class dummy_recv_xport(otw_type); +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        dummy_recv_xport.push_back_packet(ifpi); +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + +        //simulate overflow +        if (i == NUM_PKTS_TO_TEST/2){ +            ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_EXTENSION; +            ifpi.num_payload_words32 = 1; +            dummy_recv_xport.push_back_packet(ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +            ifpi.packet_count++; +            ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +        } +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(1); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > buff(20); +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(not metadata.more_fragments); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +        num_accum_samps += num_samps_ret; +        if (i == NUM_PKTS_TO_TEST/2){ +            handler.recv( +                &buff.front(), buff.size(), metadata, +                uhd::io_type_t::COMPLEX_FLOAT32, +                uhd::device::RECV_MODE_ONE_PACKET, 1.0 +            ); +            std::cout << "metadata.error_code " << metadata.error_code << std::endl; +            BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +            BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        } +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            &buff.front(), buff.size(), metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_normal){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; +    static const size_t NUM_SAMPS_PER_BUFF = 20; +    static const size_t NCHANNELS = 4; + +    std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        for (size_t ch = 0; ch < NCHANNELS; ch++){ +            dummy_recv_xports[ch].push_back_packet(ifpi); +        } +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(NCHANNELS); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); +    } +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); +    std::vector<std::complex<float> *> buffs(NCHANNELS); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; +    } +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(not metadata.more_fragments); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +        num_accum_samps += num_samps_ret; +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } + +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_sequence_error){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; +    static const size_t NUM_SAMPS_PER_BUFF = 20; +    static const size_t NCHANNELS = 4; + +    std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        for (size_t ch = 0; ch < NCHANNELS; ch++){ +            if (i == NUM_PKTS_TO_TEST/2 and ch == 2){ +                continue; //simulates a lost packet +            } +            dummy_recv_xports[ch].push_back_packet(ifpi); +        } +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(NCHANNELS); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); +    } +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); +    std::vector<std::complex<float> *> buffs(NCHANNELS); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; +    } +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        if (i == NUM_PKTS_TO_TEST/2){ +            //must get the soft overflow here +            BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +            BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +            num_accum_samps += 10 + i%10; +        } +        else{ +            BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +            BOOST_CHECK(not metadata.more_fragments); +            BOOST_CHECK(metadata.has_time_spec); +            BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +            BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +            num_accum_samps += num_samps_ret; +        } +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_time_error){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; +    static const size_t NUM_SAMPS_PER_BUFF = 20; +    static const size_t NCHANNELS = 4; + +    std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        for (size_t ch = 0; ch < NCHANNELS; ch++){ +            dummy_recv_xports[ch].push_back_packet(ifpi); +        } +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +        if (i == NUM_PKTS_TO_TEST/2){ +            ifpi.tsf = 0; //simulate the user changing the time +        } +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(NCHANNELS); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); +    } +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); +    std::vector<std::complex<float> *> buffs(NCHANNELS); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; +    } +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(not metadata.more_fragments); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); +        num_accum_samps += num_samps_ret; +        if (i == NUM_PKTS_TO_TEST/2){ +            num_accum_samps = 0; //simulate the user changing the time +        } +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_fragment){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    uhd::transport::vrt::if_packet_info_t ifpi; +    ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; +    ifpi.num_payload_words32 = 0; +    ifpi.packet_count = 0; +    ifpi.sob = true; +    ifpi.eob = false; +    ifpi.has_sid = false; +    ifpi.has_cid = false; +    ifpi.has_tsi = true; +    ifpi.has_tsf = true; +    ifpi.tsi = 0; +    ifpi.tsf = 0; +    ifpi.has_tlr = false; + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; +    static const size_t NUM_SAMPS_PER_BUFF = 10; +    static const size_t NCHANNELS = 4; + +    std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + +    //generate a bunch of packets +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        ifpi.num_payload_words32 = 10 + i%10; +        for (size_t ch = 0; ch < NCHANNELS; ch++){ +            dummy_recv_xports[ch].push_back_packet(ifpi); +        } +        ifpi.packet_count++; +        ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); +    } + +    //create the super receive packet handler +    uhd::transport::sph::recv_packet_handler handler(NCHANNELS); +    handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); +    } +    handler.set_converter(otw_type); + +    //check the received packets +    size_t num_accum_samps = 0; +    std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); +    std::vector<std::complex<float> *> buffs(NCHANNELS); +    for (size_t ch = 0; ch < NCHANNELS; ch++){ +        buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; +    } +    uhd::rx_metadata_t metadata; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        size_t num_samps_ret = handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, 10); +        num_accum_samps += num_samps_ret; + +        if (not metadata.more_fragments) continue; + +        num_samps_ret = handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK(not metadata.more_fragments); +        BOOST_CHECK_EQUAL(metadata.fragment_offset, 10); +        BOOST_CHECK(metadata.has_time_spec); +        BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); +        BOOST_CHECK_EQUAL(num_samps_ret, i%10); +        num_accum_samps += num_samps_ret; +    } + +    //subsequent receives should be a timeout +    for (size_t i = 0; i < 3; i++){ +        std::cout << "timeout check " << i << std::endl; +        handler.recv( +            buffs, NUM_SAMPS_PER_BUFF, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::RECV_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); +    } + +} diff --git a/host/tests/sph_send_test.cpp b/host/tests/sph_send_test.cpp new file mode 100644 index 000000000..ed2f54371 --- /dev/null +++ b/host/tests/sph_send_test.cpp @@ -0,0 +1,204 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include "../lib/transport/super_send_packet_handler.hpp" +#include <boost/shared_array.hpp> +#include <boost/bind.hpp> +#include <complex> +#include <vector> +#include <list> + +#define BOOST_CHECK_TS_CLOSE(a, b) \ +    BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001) + +/*********************************************************************** + * A dummy managed send buffer for testing + **********************************************************************/ +class dummy_msb : public uhd::transport::managed_send_buffer{ +public: +    void commit(size_t len){ +        if (len == 0) return; +        *_len = len; +    } + +    sptr get_new(boost::shared_array<char> mem, size_t *len){ +        _mem = mem; +        _len = len; +        return make_managed_buffer(this); +    } + +private: +    void *get_buff(void) const{return _mem.get();} +    size_t get_size(void) const{return *_len;} + +    boost::shared_array<char> _mem; +    size_t *_len; +}; + +/*********************************************************************** + * A dummy transport class to fill with fake data + **********************************************************************/ +class dummy_send_xport_class{ +public: +    dummy_send_xport_class(const uhd::otw_type_t &otw_type){ +        _otw_type = otw_type; +    } + +    void pop_front_packet( +        uhd::transport::vrt::if_packet_info_t &ifpi +    ){ +        ifpi.num_packet_words32 = _lens.front()/sizeof(boost::uint32_t); +        if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){ +            uhd::transport::vrt::if_hdr_unpack_be(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi); +        } +        if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){ +            uhd::transport::vrt::if_hdr_unpack_le(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi); +        } +        _mems.pop_front(); +        _lens.pop_front(); +    } + +    uhd::transport::managed_send_buffer::sptr get_send_buff(double){ +        _msbs.push_back(dummy_msb()); +        _mems.push_back(boost::shared_array<char>(new char[1000])); +        _lens.push_back(1000); +        uhd::transport::managed_send_buffer::sptr mrb = _msbs.back().get_new(_mems.back(), &_lens.back()); +        return mrb; +    } + +private: +    std::list<boost::shared_array<char> > _mems; +    std::list<size_t> _lens; +    std::list<dummy_msb> _msbs; //list means no-realloc +    uhd::otw_type_t _otw_type; +}; + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_one_packet_mode){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    dummy_send_xport_class dummy_send_xport(otw_type); + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; + +    //create the super send packet handler +    uhd::transport::sph::send_packet_handler handler(1); +    handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1)); +    handler.set_converter(otw_type); +    handler.set_max_samples_per_packet(20); + +    //allocate metadata and buffer +    std::vector<std::complex<float> > buff(20); +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec = uhd::time_spec_t(0.0); + +    //generate the test data +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        metadata.start_of_burst = (i == 0); +        metadata.end_of_burst = (i == NUM_PKTS_TO_TEST-1); +        const size_t num_sent = handler.send( +            &buff.front(), 10 + i%10, metadata, +            uhd::io_type_t::COMPLEX_FLOAT32, +            uhd::device::SEND_MODE_ONE_PACKET, 1.0 +        ); +        BOOST_CHECK_EQUAL(num_sent, 10 + i%10); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); +    } + +    //check the sent packets +    size_t num_accum_samps = 0; +    uhd::transport::vrt::if_packet_info_t ifpi; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        dummy_send_xport.pop_front_packet(ifpi); +        BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 10+i%10); +        BOOST_CHECK(ifpi.has_tsi); +        BOOST_CHECK(ifpi.has_tsf); +        BOOST_CHECK_EQUAL(ifpi.tsi, 0); +        BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE); +        BOOST_CHECK_EQUAL(ifpi.sob, i == 0); +        BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1); +        num_accum_samps += ifpi.num_payload_words32; +    } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_full_buffer_mode){ +//////////////////////////////////////////////////////////////////////// +    uhd::otw_type_t otw_type; +    otw_type.width = 16; +    otw_type.shift = 0; +    otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    dummy_send_xport_class dummy_send_xport(otw_type); + +    static const double TICK_RATE = 100e6; +    static const double SAMP_RATE = 10e6; +    static const size_t NUM_PKTS_TO_TEST = 30; + +    //create the super send packet handler +    uhd::transport::sph::send_packet_handler handler(1); +    handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be); +    handler.set_tick_rate(TICK_RATE); +    handler.set_samp_rate(SAMP_RATE); +    handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1)); +    handler.set_converter(otw_type); +    handler.set_max_samples_per_packet(20); + +    //allocate metadata and buffer +    std::vector<std::complex<float> > buff(20*NUM_PKTS_TO_TEST); +    uhd::tx_metadata_t metadata; +    metadata.start_of_burst = true; +    metadata.end_of_burst = true; +    metadata.has_time_spec = true; +    metadata.time_spec = uhd::time_spec_t(0.0); + +    //generate the test data +    const size_t num_sent = handler.send( +        &buff.front(), buff.size(), metadata, +        uhd::io_type_t::COMPLEX_FLOAT32, +        uhd::device::SEND_MODE_FULL_BUFF, 1.0 +    ); +    BOOST_CHECK_EQUAL(num_sent, buff.size()); + +    //check the sent packets +    size_t num_accum_samps = 0; +    uhd::transport::vrt::if_packet_info_t ifpi; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ +        std::cout << "data check " << i << std::endl; +        dummy_send_xport.pop_front_packet(ifpi); +        BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 20); +        BOOST_CHECK(ifpi.has_tsi); +        BOOST_CHECK(ifpi.has_tsf); +        BOOST_CHECK_EQUAL(ifpi.tsi, 0); +        BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE); +        BOOST_CHECK_EQUAL(ifpi.sob, i == 0); +        BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1); +        num_accum_samps += ifpi.num_payload_words32; +    } +}  | 
