diff options
| -rw-r--r-- | host/include/uhd/transport/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 69 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.ipp | 144 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 13 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 187 | ||||
| -rw-r--r-- | host/test/buffer_test.cpp | 51 | 
6 files changed, 150 insertions, 316 deletions
| diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt index 2c84c0724..ec3b7b113 100644 --- a/host/include/uhd/transport/CMakeLists.txt +++ b/host/include/uhd/transport/CMakeLists.txt @@ -17,8 +17,6 @@  INSTALL(FILES -    alignment_buffer.hpp -    alignment_buffer.ipp      bounded_buffer.hpp      bounded_buffer.ipp      convert_types.hpp diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp deleted file mode 100644 index f44a037f8..000000000 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ /dev/null @@ -1,69 +0,0 @@ -// -// Copyright 2010 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program.  If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP -#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP - -#include <uhd/config.hpp> -#include <uhd/transport/bounded_buffer.hpp> //time_duration_t -#include <boost/shared_ptr.hpp> -#include <vector> - -namespace uhd{ namespace transport{ - -    /*! -     * Implement a templated alignment buffer: -     * Used for aligning asynchronously pushed elements with matching ids. -     */ -    template <typename elem_type, typename seq_type> class alignment_buffer{ -    public: -        typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr; - -        /*! -         * Make a new alignment buffer object. -         * \param capacity the maximum elements per index -         * \param width the number of elements to align -         */ -        static sptr make(size_t capacity, size_t width); - -        /*! -         * Push an element with sequence id into the buffer at index. -         * \param elem the element to push -         * \param seq the sequence identifier -         * \param index the buffer index -         * \return true if the element fit without popping for space -         */ -        virtual bool push_with_pop_on_full( -            const elem_type &elem, const seq_type &seq, size_t index -        ) = 0; - -        /*! -         * Pop an aligned set of elements from this alignment buffer. -         * \param elems a collection to store the aligned elements -         * \param timeout the timeout in seconds -         * \return false when the operation times out -         */ -        virtual bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, double timeout -        ) = 0; -    }; - -}} //namespace - -#include <uhd/transport/alignment_buffer.ipp> - -#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */ diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp deleted file mode 100644 index 833b5d399..000000000 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ /dev/null @@ -1,144 +0,0 @@ -// -// Copyright 2010 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program.  If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP -#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP - -#include <uhd/transport/bounded_buffer.hpp> -#include <boost/thread/condition_variable.hpp> -#include <utility> - -namespace uhd{ namespace transport{ namespace{ /*anon*/ - -    /*! -     * Imlement a templated alignment buffer: -     * Used for aligning asynchronously pushed elements with matching ids. -     */ -    template <typename elem_type, typename seq_type> -    class alignment_buffer_impl : public alignment_buffer<elem_type, seq_type>{ -    public: - -        alignment_buffer_impl(size_t capacity, size_t width) : _last_seqs(width){ -            for (size_t i = 0; i < width; i++){ -                _buffs.push_back(bounded_buffer<buff_contents_type>::make(capacity)); -                _all_indexes.push_back(i); -            } -            _there_was_a_clear = false; -        } - -        UHD_INLINE bool push_with_pop_on_full( -            const elem_type &elem, const seq_type &seq, size_t index -        ){ -            //clear the buffer for this index if the seqs are mis-ordered -            if (seq < _last_seqs[index]){ -                _buffs[index]->clear(); -                _there_was_a_clear = true; -            } _last_seqs[index] = seq; -            return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq)); -        } - -        UHD_INLINE bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, double timeout -        ){ -            boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); -            buff_contents_type buff_contents_tmp; -            std::list<size_t> indexes_to_do(_all_indexes); - -            //do an initial pop to load an initial sequence id -            size_t index = indexes_to_do.front(); -            if (not _buffs[index]->pop_with_timed_wait( -                buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -            )) return false; -            elems[index] = buff_contents_tmp.first; -            seq_type expected_seq_id = buff_contents_tmp.second; -            indexes_to_do.pop_front(); - -            //get an aligned set of elements from the buffers: -            while(indexes_to_do.size() != 0){ - -                //respond to a clear by starting from scratch -                if(_there_was_a_clear){ -                    _there_was_a_clear = false; -                    indexes_to_do = _all_indexes; -                    index = indexes_to_do.front(); -                    if (not _buffs[index]->pop_with_timed_wait( -                        buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -                    )) return false; -                    elems[index] = buff_contents_tmp.first; -                    expected_seq_id = buff_contents_tmp.second; -                    indexes_to_do.pop_front(); -                } - -                //pop an element off for this index -                index = indexes_to_do.front(); -                if (not _buffs[index]->pop_with_timed_wait( -                    buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -                )) return false; - -                //if the sequence id matches: -                //  store the popped element into the output, -                //  remove this index from the list and continue -                if (buff_contents_tmp.second == expected_seq_id){ -                    elems[index] = buff_contents_tmp.first; -                    indexes_to_do.pop_front(); -                    continue; -                } - -                //if the sequence id is older: -                //  continue with the same index to try again -                if (buff_contents_tmp.second < expected_seq_id){ -                    continue; -                } - -                //if the sequence id is newer: -                //  store the popped element into the output, -                //  add all other indexes back into the list -                if (buff_contents_tmp.second > expected_seq_id){ -                    elems[index] = buff_contents_tmp.first; -                    expected_seq_id = buff_contents_tmp.second; -                    indexes_to_do = _all_indexes; -                    indexes_to_do.remove(index); -                    continue; -                } -            } -            return true; -        } - -    private: -        //a vector of bounded buffers for each index -        typedef std::pair<elem_type, seq_type> buff_contents_type; -        std::vector<typename bounded_buffer<buff_contents_type>::sptr> _buffs; -        std::vector<seq_type> _last_seqs; -        std::list<size_t> _all_indexes; -        bool _there_was_a_clear; -    }; - -}}} //namespace - -namespace uhd{ namespace transport{ - -    template <typename elem_type, typename seq_type> -    typename alignment_buffer<elem_type, seq_type>::sptr -    alignment_buffer<elem_type, seq_type>::make(size_t capacity, size_t width){ -        return typename alignment_buffer<elem_type, seq_type>::sptr( -            new alignment_buffer_impl<elem_type, seq_type>(capacity, width) -        ); -    } - -}} //namespace - -#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP */ diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index edc7faa06..f7915d866 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -26,14 +26,6 @@  namespace uhd{ namespace transport{ namespace{ /*anon*/ -    static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ -        return boost::posix_time::microseconds(long(timeout*1e6)); -    } - -    static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ -        return 1e-6*time_dur.total_microseconds(); -    } -      template <typename elem_type>      class bounded_buffer_impl : public bounded_buffer<elem_type>{      public: @@ -127,6 +119,11 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _buffer.pop_back();              return elem;          } + +        static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +            return boost::posix_time::microseconds(long(timeout*1e6)); +        } +      };  }}} //namespace diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index cbc0a0817..c8e4b7096 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -21,11 +21,13 @@  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/transport/convert_types.hpp> -#include <uhd/transport/alignment_buffer.hpp> +#include <uhd/transport/bounded_buffer.hpp>  #include <boost/format.hpp>  #include <boost/bind.hpp>  #include <boost/thread.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp>  #include <iostream> +#include <list>  using namespace uhd;  using namespace uhd::usrp; @@ -108,16 +110,24 @@ private:   * - vrt packet handler states   **********************************************************************/  struct usrp2_impl::io_impl{ -    typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; -    io_impl(size_t num_recv_frames, size_t send_frame_size, size_t width): +    io_impl(size_t send_frame_size, size_t width):          packet_handler_recv_state(width), -        recv_pirate_booty(alignment_buffer_type::make(num_recv_frames-3, width)),          async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/))      { -        for (size_t i = 0; i < width; i++) fc_mons.push_back( -            flow_control_monitor::sptr(new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)) -        ); +        for (size_t i = 0; i < width; i++){ +            fc_mons.push_back(flow_control_monitor::sptr( +                new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size) +            )); +            //init empty packet infos +            vrt::if_packet_info_t packet_info; +            packet_info.packet_count = 0; +            packet_info.has_tsi = true; +            packet_info.tsi = 0; +            packet_info.has_tsf = true; +            packet_info.tsf = 0; +            prev_infos.push_back(packet_info); +        }      }      ~io_impl(void){ @@ -126,11 +136,6 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout); -    } -      bool get_send_buffs(          const std::vector<zero_copy_if::sptr> &trans,          vrt_packet_handler::managed_send_buffs_t &buffs, @@ -151,6 +156,15 @@ struct usrp2_impl::io_impl{          return true;      } +    bool get_recv_buffs( +        const std::vector<zero_copy_if::sptr> xports, +        vrt_packet_handler::managed_recv_buffs_t &buffs, +        double timeout +    ); + +    //previous state for each buffer +    std::vector<vrt::if_packet_info_t> prev_infos; +      //flow control monitors      std::vector<flow_control_monitor::sptr> fc_mons; @@ -162,29 +176,28 @@ struct usrp2_impl::io_impl{      void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);      boost::thread_group recv_pirate_crew;      bool recv_pirate_crew_raiding; -    alignment_buffer_type::sptr recv_pirate_booty;      bounded_buffer<async_metadata_t>::sptr async_msg_fifo;      boost::mutex spawn_mutex;  };  /***********************************************************************   * Receive Pirate Loop - * - while raiding, loot for recv buffers - * - put booty into the alignment buffer + * - while raiding, loot for message packet + * - update flow control condition count + * - put async message packets into queue   **********************************************************************/  void usrp2_impl::io_impl::recv_pirate_loop( -    zero_copy_if::sptr zc_if, +    zero_copy_if::sptr zc_if_err0,      usrp2_mboard_impl::sptr mboard,      size_t index  ){      set_thread_priority_safe();      recv_pirate_crew_raiding = true; -    size_t next_packet_seq = 0;      spawn_mutex.unlock();      while(recv_pirate_crew_raiding){ -        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); +        managed_recv_buffer::sptr buff = zc_if_err0->get_recv_buff();          if (not buff.get()) continue; //ignore timeout/error buffers          try{ @@ -194,26 +207,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(              const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();              vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info); -            //handle the rx data stream -            if (if_packet_info.sid == usrp2_impl::RECV_SID){ -                //handle the packet count / sequence number -                if (if_packet_info.packet_count != next_packet_seq){ -                    //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16; -                    std::cerr << "O" << std::flush; //report overflow (drops in the kernel) -                } -                next_packet_seq = (if_packet_info.packet_count+1)%16; - -                //extract the timespec and round to the nearest packet -                UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); -                time_spec_t time( -                    time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() -                ); - -                //push the packet into the buffer with the new time -                recv_pirate_booty->push_with_pop_on_full(buff, time, index); -                continue; -            } -              //handle a tx async report message              if (if_packet_info.sid == usrp2_impl::ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ @@ -253,11 +246,10 @@ void usrp2_impl::io_impl::recv_pirate_loop(  void usrp2_impl::io_init(void){      //the assumption is that all data transports should be identical -    const size_t num_recv_frames = _data_transports.front()->get_num_recv_frames();      const size_t send_frame_size = _data_transports.front()->get_send_frame_size();      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (num_recv_frames, send_frame_size, _data_transports.size())); +    _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size()));      //TODO temporary fix for weird power up state, remove when FPGA fixed      { @@ -276,7 +268,7 @@ void usrp2_impl::io_init(void){          //spawn a new pirate to plunder the recv booty          _io_impl->recv_pirate_crew.create_thread(boost::bind(              &usrp2_impl::io_impl::recv_pirate_loop, -            _io_impl.get(), _data_transports.at(i), +            _io_impl.get(), _err0_transports.at(i),              _mboards.at(i), i          ));          //block here until the spawned thread unlocks @@ -328,6 +320,117 @@ size_t usrp2_impl::send(  }  /*********************************************************************** + * Alignment logic on receive + **********************************************************************/ +static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +    return boost::posix_time::microseconds(long(timeout*1e6)); +} + +static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ +    return 1e-6*time_dur.total_microseconds(); +} + +static UHD_INLINE time_spec_t extract_time_spec(const vrt::if_packet_info_t &packet_info){ +    return time_spec_t( //assumes has_tsi and has_tsf are true +        time_t(packet_info.tsi), size_t(packet_info.tsf), +        100e6 //tick rate does not have to be correct for comparison purposes +    ); +} + +static UHD_INLINE void extract_packet_info( +    managed_recv_buffer::sptr buff, +    vrt::if_packet_info_t &prev_info, +    time_spec_t &time, bool &clear +){ +    //extract packet info +    vrt::if_packet_info_t next_info; +    vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info); + +    //handle the packet count / sequence number +    if ((prev_info.packet_count+1)%16 != next_info.packet_count){ +        std::cerr << "O" << std::flush; //report overflow (drops in the kernel) +    } + +    time = extract_time_spec(next_info); +    clear = extract_time_spec(prev_info) > time; +    prev_info = next_info; +} + +UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( +    const std::vector<zero_copy_if::sptr> xports, +    vrt_packet_handler::managed_recv_buffs_t &buffs, +    double timeout +){ +    if (buffs.size() == 1){ +        buffs[0] = xports[0]->get_recv_buff(timeout); +        if (buffs[0].get() == NULL) return false; +        bool clear; time_spec_t time; //unused variables +        //call extract_packet_info to handle printing the overflows +        extract_packet_info(buffs[0], this->prev_infos[0], time, clear); +        return true; +    } +    //-------------------- begin alignment logic ---------------------// +    boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); +    managed_recv_buffer::sptr buff_tmp; +    std::list<size_t> _all_indexes, indexes_to_do; +    for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i); +    bool clear; +    time_spec_t expected_time; + +    //respond to a clear by starting from scratch +    got_clear: +    indexes_to_do = _all_indexes; +    clear = false; + +    //do an initial pop to load an initial sequence id +    size_t index = indexes_to_do.front(); +    buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); +    if (buff_tmp.get() == NULL) return false; +    extract_packet_info(buff_tmp, this->prev_infos[index], expected_time, clear); +    if (clear) goto got_clear; +    buffs[index] = buff_tmp; +    indexes_to_do.pop_front(); + +    //get an aligned set of elements from the buffers: +    while(indexes_to_do.size() != 0){ + +        //pop an element off for this index +        index = indexes_to_do.front(); +        buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); +        if (buff_tmp.get() == NULL) return false; +        time_spec_t this_time; +        extract_packet_info(buff_tmp, this->prev_infos[index], this_time, clear); +        if (clear) goto got_clear; +        buffs[index] = buff_tmp; + +        //if the sequence id matches: +        //  remove this index from the list and continue +        if (this_time == expected_time){ +            indexes_to_do.pop_front(); +            continue; +        } + +        //if the sequence id is older: +        //  continue with the same index to try again +        else if (this_time < expected_time){ +            continue; +        } + +        //if the sequence id is newer: +        //  use the new expected time for comparison +        //  add all other indexes back into the list +        else{ +            expected_time = this_time; +            indexes_to_do = _all_indexes; +            indexes_to_do.remove(index); +            continue; +        } +    } +    return true; +    //-------------------- end alignment logic -----------------------// +} + +/***********************************************************************   * Receive Data   **********************************************************************/  size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ @@ -357,7 +460,7 @@ size_t usrp2_impl::recv(          io_type, _rx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_be, -        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout), +        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout),          boost::bind(&handle_overflow, _mboards, _1)      );  } diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp index 8445412e7..e7bc88699 100644 --- a/host/test/buffer_test.cpp +++ b/host/test/buffer_test.cpp @@ -17,7 +17,6 @@  #include <boost/test/unit_test.hpp>  #include <uhd/transport/bounded_buffer.hpp> -#include <uhd/transport/alignment_buffer.hpp>  #include <boost/assign/list_of.hpp>  using namespace boost::assign; @@ -63,53 +62,3 @@ BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){      BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 3);  } - -BOOST_AUTO_TEST_CASE(test_alignment_buffer){ -    alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3)); -    //load index 0 with all good seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0)); - -    //load index 1 with some skipped seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1)); - -    //load index 2 with all good seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2)); - -    //readback aligned values -    std::vector<int> aligned_elems(3); - -    static const std::vector<int> expected_elems0 = list_of(0)(10)(20); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems0.begin(), expected_elems0.end() -    ); - -    static const std::vector<int> expected_elems1 = list_of(1)(11)(21); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems1.begin(), expected_elems1.end() -    ); - -    //there was a skip now find 4 - -    static const std::vector<int> expected_elems4 = list_of(4)(14)(24); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems4.begin(), expected_elems4.end() -    ); -} | 
