 host/lib/transport/udp_zero_copy_asio.cpp | 140 ++++++++++++++++++++---------
 1 file changed, 101 insertions(+), 39 deletions(-)
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 7e28caf2d..798cc657d 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -35,6 +35,10 @@ namespace asio = boost::asio;
 
 /***********************************************************************
  * Constants
  **********************************************************************/
+//Define this to use the boost async io calls to perform receive.
+//Otherwise, get_recv_buff uses a blocking receive with timeout.
+//#define USE_ASIO_ASYNC_RECV
+
 //enough buffering for half a second of samples at full rate on usrp2
 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
@@ -43,8 +47,15 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
 //but may change with host-based flow control.
 static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
 
-//the number of async frames to allocate for each send and recv
-static const size_t DEFAULT_NUM_FRAMES = 32;
+//The number of async frames to allocate for each send and recv:
+//The non-async recv can have a very large number of recv frames
+//because the CPU overhead is independent of the number of frames.
+#ifdef USE_ASIO_ASYNC_RECV
+static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
+#else
+static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
+#endif
+static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
 
 //a single concurrent thread for io_service seems to be the fastest
 static const size_t CONCURRENCY_HINT = 1;
@@ -67,9 +78,9 @@ public:
     ):
         _io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
         _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
-        _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+        _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))),
         _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
-        _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES)))
+        _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES)))
     {
         //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
 
@@ -82,6 +93,13 @@ public:
         _socket = new asio::ip::udp::socket(_io_service);
         _socket->open(asio::ip::udp::v4());
         _socket->connect(receiver_endpoint);
+        _sock_fd = _socket->native();
+    }
+
+    ~udp_zero_copy_asio_impl(void){
+        delete _work; //allow io_service run to complete
+        _thread_group.join_all(); //wait for service threads to exit
+        delete _socket;
     }
 
     void init(void){
@@ -106,10 +124,9 @@ public:
         );
     }
 
-    ~udp_zero_copy_asio_impl(void){
-        delete _work; //allow io_service run to complete
-        _thread_group.join_all(); //wait for service threads to exit
-        delete _socket;
+    void service(void){
+        set_thread_priority_safe();
+        _io_service.run();
     }
 
     //get size for internal socket buffer
@@ -126,6 +143,9 @@ public:
         return get_buff_size<Opt>();
     }
 
+    ////////////////////////////////////////////////////////////////////
+    #ifdef USE_ASIO_ASYNC_RECV
+    ////////////////////////////////////////////////////////////////////
     //! pop a filled recv buffer off of the fifo and bind with the release callback
     managed_recv_buffer::sptr get_recv_buff(double timeout){
         boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -142,6 +162,74 @@ public:
         return managed_recv_buffer::sptr();
     }
 
+    //! handle a recv callback -> push the filled memory into the fifo
+    void handle_recv(void *mem, size_t len){
+        boost::this_thread::disable_interruption di; //disable because the wait can throw
+        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+    }
+
+    //! release a recv buffer -> start an async recv on the buffer
+    void release(void *mem){
+        _socket->async_receive(
+            boost::asio::buffer(mem, _recv_frame_size),
+            boost::bind(
+                &udp_zero_copy_asio_impl::handle_recv,
+                shared_from_this(), mem,
+                asio::placeholders::bytes_transferred
+            )
+        );
+    }
+
+    ////////////////////////////////////////////////////////////////////
+    #else /*USE_ASIO_ASYNC_RECV*/
+    ////////////////////////////////////////////////////////////////////
+    managed_recv_buffer::sptr get_recv_buff(double timeout){
+        boost::this_thread::disable_interruption di; //disable because the wait can throw
+
+        //setup timeval for timeout
+        timeval tv;
+        tv.tv_sec = 0;
+        tv.tv_usec = timeout*1e6;
+
+        //setup rset for timeout
+        fd_set rset;
+        FD_ZERO(&rset);
+        FD_SET(_sock_fd, &rset);
+
+        //call select to perform timed wait
+        if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0)
+            return managed_recv_buffer::sptr();
+
+        //grab an available buffer
+        asio::mutable_buffer buff;
+        if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout))
+            return managed_recv_buffer::sptr();
+
+        //receive and return the buffer
+        return managed_recv_buffer::make_safe(
+            asio::buffer(
+                boost::asio::buffer_cast<void *>(buff),
+                _socket->receive(boost::asio::buffer(buff))
+            ),
+            boost::bind(
+                &udp_zero_copy_asio_impl::release,
+                shared_from_this(),
+                asio::buffer_cast<void*>(buff)
+            )
+        );
+    }
+
+    void release(void *mem){
+        boost::this_thread::disable_interruption di; //disable because the wait can throw
+        _pending_recv_buffs->push_with_wait(
+            boost::asio::buffer(mem, this->get_recv_frame_size())
+        );
+    }
+
+    ////////////////////////////////////////////////////////////////////
+    #endif /*USE_ASIO_ASYNC_RECV*/
+    ////////////////////////////////////////////////////////////////////
+
     size_t get_num_recv_frames(void) const {return _num_recv_frames;}
     size_t get_recv_frame_size(void) const {return _recv_frame_size;}
 
@@ -161,37 +249,6 @@ public:
         return managed_send_buffer::sptr();
     }
 
-    size_t get_num_send_frames(void) const {return _num_send_frames;}
-    size_t get_send_frame_size(void) const {return _send_frame_size;}
-
-private:
-    void service(void){
-        set_thread_priority_safe();
-        _io_service.run();
-    }
-
-    /*******************************************************************
-     * The async send and receive callbacks
-     ******************************************************************/
-
-    //! handle a recv callback -> push the filled memory into the fifo
-    void handle_recv(void *mem, size_t len){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
-    }
-
-    //! release a recv buffer -> start an async recv on the buffer
-    void release(void *mem){
-        _socket->async_receive(
-            boost::asio::buffer(mem, _recv_frame_size),
-            boost::bind(
-                &udp_zero_copy_asio_impl::handle_recv,
-                shared_from_this(), mem,
-                asio::placeholders::bytes_transferred
-            )
-        );
-    }
-
     //! handle a send callback -> push the emptied memory into the fifo
     void handle_send(void *mem){
         boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -209,10 +266,15 @@ private:
         );
     }
 
+    size_t get_num_send_frames(void) const {return _num_send_frames;}
+    size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
     //asio guts -> socket and service
     asio::ip::udp::socket   *_socket;
     asio::io_service        _io_service;
     asio::io_service::work  *_work;
+    asio::ip::udp::socket::native_type _sock_fd;
 
     //memory management -> buffers and fifos
     boost::thread_group _thread_group;
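The interesting change above is the non-ASIO receive path: instead of keeping boost::asio async receives in flight, get_recv_buff() now does a plain select() timed wait on the socket's native descriptor, then a synchronous receive. For readers who want to try that pattern outside of UHD, here is a minimal standalone sketch of the select()-plus-recv() idiom; the socket setup, port number, and the helper name timed_recv are illustrative and not part of the patch.

// Standalone sketch of a select()-based timed UDP receive (POSIX sockets).
// Names and the port number below are illustrative, not from the patch.
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdio>
#include <cstring>
#include <cstddef>

//Wait up to timeout seconds for data on fd, then receive into buff.
//Returns the number of bytes received, or 0 on timeout/error.
static size_t timed_recv(int fd, void *buff, size_t len, double timeout){
    //split the timeout into whole seconds and microseconds
    timeval tv;
    tv.tv_sec = time_t(timeout);
    tv.tv_usec = suseconds_t((timeout - double(tv.tv_sec))*1e6);

    //watch only this descriptor for readability
    fd_set rset;
    FD_ZERO(&rset);
    FD_SET(fd, &rset);

    //select returns <= 0 on timeout or error -> nothing to receive
    if (::select(fd+1, &rset, NULL, NULL, &tv) <= 0) return 0;

    const ssize_t ret = ::recv(fd, buff, len, 0);
    return (ret > 0)? size_t(ret) : 0;
}

int main(void){
    //hypothetical UDP socket bound to an arbitrary local port
    const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(49152);
    ::bind(fd, (const sockaddr *)&addr, sizeof(addr));

    char buff[1500];
    const size_t n = timed_recv(fd, buff, sizeof(buff), 0.1);
    std::printf("received %zu bytes\n", n);
    return 0;
}

One difference worth noting: the patch folds the whole timeout into tv_usec (tv_sec is left at 0), which is fine for the sub-second timeouts typical of streaming; the sketch splits seconds and microseconds so longer timeouts also behave.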

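For a sense of scale behind the "very large number of recv frames" comment in the patch: dividing the minimum receive socket buffer by an MTU-sized frame yields tens of thousands of frames for the non-async path, versus 32 for the async path. The arithmetic below assumes an Ethernet-class MTU of 1500 bytes for illustration; the actual udp_simple::mtu constant may be somewhat smaller once IP/UDP headers are subtracted.

//Back-of-the-envelope for the non-async recv frame count.
//assumed_mtu is an assumption; udp_simple::mtu may differ.
#include <cstdio>
#include <cstddef>

int main(void){
    //half a second of 4-byte samples at 25 Msps, as in the patch (~50 MB)
    const size_t min_recv_sock_buff_size = size_t(4 * 25e6 * 0.5);
    const size_t assumed_mtu = 1500; //bytes per recv frame (assumed)

    //same formula as DEFAULT_NUM_RECV_FRAMES in the non-async branch
    const size_t num_recv_frames = min_recv_sock_buff_size/assumed_mtu;
    std::printf("%zu recv frames (async path uses 32)\n", num_recv_frames);
    return 0;
}

With these numbers the result is roughly 33,000 frames, which is why the comment stresses that the CPU overhead of the blocking path is independent of the frame count.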