diff options
Diffstat (limited to 'host/lib/transport/nirio/rpc/rpc_client.cpp')
-rw-r--r-- | host/lib/transport/nirio/rpc/rpc_client.cpp | 185 |
1 files changed, 104 insertions, 81 deletions
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp index e9bf4bbb4..efc802e89 100644 --- a/host/lib/transport/nirio/rpc/rpc_client.cpp +++ b/host/lib/transport/nirio/rpc/rpc_client.cpp @@ -12,35 +12,36 @@ #include <boost/version.hpp> -#define CHAIN_BLOCKING_XFER(func, exp, status) \ - if (status) { \ - status = (static_cast<size_t>((func)) == exp); \ - } else { \ +#define CHAIN_BLOCKING_XFER(func, exp, status) \ + if (status) { \ + status = (static_cast<size_t>((func)) == exp); \ + } else { \ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client operation skipped: " #func "\n"; \ - } \ + } namespace uhd { namespace usrprio_rpc { using boost::asio::ip::tcp; -rpc_client::rpc_client ( - const std::string& server, +rpc_client::rpc_client(const std::string& server, const std::string& port, uint32_t process_id, - uint32_t host_id -) : _socket(_io_service) + uint32_t host_id) + : _socket(_io_service) { - //Fill in handshake info - _hshake_args_client.version = CURRENT_VERSION; + // Fill in handshake info + _hshake_args_client.version = CURRENT_VERSION; _hshake_args_client.oldest_comp_version = OLDEST_COMPATIBLE_VERSION; - _hshake_args_client.client_id = build_client_id(host_id, process_id); - _hshake_args_client.boost_archive_version = boost_serialization_archive_utils::get_version(); + _hshake_args_client.client_id = build_client_id(host_id, process_id); + _hshake_args_client.boost_archive_version = + boost_serialization_archive_utils::get_version(); try { - //Synchronous resolve + connect + // Synchronous resolve + connect tcp::resolver resolver(_io_service); - //Create flags object with all special flags disabled. Especially the following: - //- address_configured: Only return addresses if a non-loopback address is configured for the system. + // Create flags object with all special flags disabled. Especially the following: + //- address_configured: Only return addresses if a non-loopback address is + // configured for the system. //- numeric_host: No name resolution should be attempted for host //- numeric_service: No name resolution should be attempted for service tcp::resolver::query::flags query_flags(tcp::resolver::query::passive); @@ -48,56 +49,64 @@ rpc_client::rpc_client ( tcp::resolver::iterator iterator = resolver.resolve(query); boost::asio::connect(_socket, iterator); - UHD_LOGGER_TRACE("NIRIO") << "rpc_client connected to server." ; + UHD_LOGGER_TRACE("NIRIO") << "rpc_client connected to server."; try { - //Perform handshake + // Perform handshake bool status = true; - CHAIN_BLOCKING_XFER( - boost::asio::write(_socket, boost::asio::buffer(&_hshake_args_client, sizeof(_hshake_args_client))), - sizeof(_hshake_args_client), status); - CHAIN_BLOCKING_XFER( - boost::asio::read(_socket, boost::asio::buffer(&_hshake_args_server, sizeof(_hshake_args_server))), - sizeof(_hshake_args_server), status); + CHAIN_BLOCKING_XFER(boost::asio::write(_socket, + boost::asio::buffer(&_hshake_args_client, + sizeof(_hshake_args_client))), + sizeof(_hshake_args_client), + status); + CHAIN_BLOCKING_XFER(boost::asio::read(_socket, + boost::asio::buffer(&_hshake_args_server, + sizeof(_hshake_args_server))), + sizeof(_hshake_args_server), + status); _request.header.client_id = _hshake_args_server.client_id; - if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version && - _hshake_args_client.version >= _hshake_args_server.oldest_comp_version && - status) - { - UHD_LOGGER_TRACE("NIRIO") << "rpc_client bound to server." ; + if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version + && _hshake_args_client.version >= _hshake_args_server.oldest_comp_version + && status) { + UHD_LOGGER_TRACE("NIRIO") << "rpc_client bound to server."; _wait_for_next_response_header(); - //Spawn a thread for the io_service callback handler. This thread will run until rpc_client is destroyed. - _io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service))); + // Spawn a thread for the io_service callback handler. This thread will + // run until rpc_client is destroyed. + _io_service_thread.reset(new boost::thread( + boost::bind(&boost::asio::io_service::run, &_io_service))); } else { - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake failed." ; - _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category()); + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake failed."; + _exec_err.assign(boost::asio::error::connection_refused, + boost::asio::error::get_system_category()); } - UHD_LOGGER_TRACE("NIRIO") << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") % - _hshake_args_client.boost_archive_version % - _hshake_args_server.boost_archive_version; + UHD_LOGGER_TRACE("NIRIO") + << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") + % _hshake_args_client.boost_archive_version + % _hshake_args_server.boost_archive_version; } catch (boost::exception&) { - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake aborted." ; - _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category()); + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake aborted."; + _exec_err.assign(boost::asio::error::connection_refused, + boost::asio::error::get_system_category()); } } catch (boost::exception&) { - UHD_LOGGER_TRACE("NIRIO") << "rpc_client connection request cancelled/aborted." ; - _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); + UHD_LOGGER_TRACE("NIRIO") << "rpc_client connection request cancelled/aborted."; + _exec_err.assign(boost::asio::error::connection_aborted, + boost::asio::error::get_system_category()); } } -rpc_client::~rpc_client () { +rpc_client::~rpc_client() +{ _stop_io_service(); } -const boost::system::error_code& rpc_client::call( - func_id_t func_id, +const boost::system::error_code& rpc_client::call(func_id_t func_id, const func_args_writer_t& in_args, - func_args_reader_t &out_args, - boost::posix_time::milliseconds timeout -) + func_args_reader_t& out_args, + boost::posix_time::milliseconds timeout) { boost::mutex::scoped_lock lock(_mutex); @@ -108,61 +117,70 @@ const boost::system::error_code& rpc_client::call( _exec_err.clear(); - //Send function call header and args + // Send function call header and args bool status = true; try { CHAIN_BLOCKING_XFER( - boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))), - sizeof(_request.header), status); - if (not _request.data.empty()) - { - CHAIN_BLOCKING_XFER( - boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())), - _request.data.size(), status); + boost::asio::write(_socket, + boost::asio::buffer(&_request.header, sizeof(_request.header))), + sizeof(_request.header), + status); + if (not _request.data.empty()) { + CHAIN_BLOCKING_XFER(boost::asio::write(_socket, + boost::asio::buffer(&(*_request.data.begin()), + _request.data.size())), + _request.data.size(), + status); } } catch (boost::exception&) { status = false; } - //Wait for response using condition variable + // Wait for response using condition variable if (status) { if (!_exec_gate.timed_wait(lock, timeout)) { - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client function timed out." ; - _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category()); + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client function timed out."; + _exec_err.assign(boost::asio::error::timed_out, + boost::asio::error::get_system_category()); } } else { - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client connection dropped." ; - _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client connection dropped."; + _exec_err.assign(boost::asio::error::connection_aborted, + boost::asio::error::get_system_category()); _stop_io_service(); } - //Verify that we are talking to the correct endpoint + // Verify that we are talking to the correct endpoint if ((_request.header.client_id != _response.header.client_id) && !_exec_err) { - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client confused about who its talking to." ; - _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client confused about who its talking to."; + _exec_err.assign(boost::asio::error::operation_aborted, + boost::asio::error::get_system_category()); } - if (!_exec_err) out_args.load(_response.data); + if (!_exec_err) + out_args.load(_response.data); } return _exec_err; } -void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size_t transferred, size_t expected) +void rpc_client::_handle_response_hdr( + const boost::system::error_code& err, size_t transferred, size_t expected) { boost::mutex::scoped_lock lock(_mutex); _exec_err = err; if (!_exec_err && (transferred == expected)) { - //Response header received. Verify that it is expected + // Response header received. Verify that it is expected if (func_args_header_t::match_function(_request.header, _response.header)) { - if (_response.header.func_args_size) - { + if (_response.header.func_args_size) { _response.data.resize(_response.header.func_args_size); - //Wait for response data + // Wait for response data boost::asio::async_read(_socket, - boost::asio::buffer(&(*_response.data.begin()), _response.data.size()), - boost::bind(&rpc_client::_handle_response_data, this, + boost::asio::buffer( + &(*_response.data.begin()), _response.data.size()), + boost::bind(&rpc_client::_handle_response_data, + this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, _response.data.size())); @@ -170,23 +188,27 @@ void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size _handle_response_data(err, 0, 0); } } else { - //Unexpected response. Ignore it. - UHD_LOGGER_DEBUG("NIRIO") << "rpc_client received garbage responses." ; - _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); + // Unexpected response. Ignore it. + UHD_LOGGER_DEBUG("NIRIO") << "rpc_client received garbage responses."; + _exec_err.assign(boost::asio::error::operation_aborted, + boost::asio::error::get_system_category()); _wait_for_next_response_header(); } } - if (_exec_err) _exec_gate.notify_all(); + if (_exec_err) + _exec_gate.notify_all(); } -void rpc_client::_handle_response_data(const boost::system::error_code& err, size_t transferred, size_t expected) +void rpc_client::_handle_response_data( + const boost::system::error_code& err, size_t transferred, size_t expected) { boost::mutex::scoped_lock lock(_mutex); _exec_err = err; if (transferred != expected) { - _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); + _exec_err.assign(boost::asio::error::operation_aborted, + boost::asio::error::get_system_category()); } _exec_gate.notify_all(); @@ -194,15 +216,16 @@ void rpc_client::_handle_response_data(const boost::system::error_code& err, siz _wait_for_next_response_header(); } -void rpc_client::_wait_for_next_response_header() { +void rpc_client::_wait_for_next_response_header() +{ //_mutex must be locked when this call is made - boost::asio::async_read( - _socket, + boost::asio::async_read(_socket, boost::asio::buffer(&_response.header, sizeof(_response.header)), - boost::bind(&rpc_client::_handle_response_hdr, this, + boost::bind(&rpc_client::_handle_response_hdr, + this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, sizeof(_response.header))); } -}} +}} // namespace uhd::usrprio_rpc |