aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/nirio/rpc/rpc_client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/nirio/rpc/rpc_client.cpp')
-rw-r--r--host/lib/transport/nirio/rpc/rpc_client.cpp185
1 files changed, 104 insertions, 81 deletions
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp
index e9bf4bbb4..efc802e89 100644
--- a/host/lib/transport/nirio/rpc/rpc_client.cpp
+++ b/host/lib/transport/nirio/rpc/rpc_client.cpp
@@ -12,35 +12,36 @@
#include <boost/version.hpp>
-#define CHAIN_BLOCKING_XFER(func, exp, status) \
- if (status) { \
- status = (static_cast<size_t>((func)) == exp); \
- } else { \
+#define CHAIN_BLOCKING_XFER(func, exp, status) \
+ if (status) { \
+ status = (static_cast<size_t>((func)) == exp); \
+ } else { \
UHD_LOGGER_DEBUG("NIRIO") << "rpc_client operation skipped: " #func "\n"; \
- } \
+ }
namespace uhd { namespace usrprio_rpc {
using boost::asio::ip::tcp;
-rpc_client::rpc_client (
- const std::string& server,
+rpc_client::rpc_client(const std::string& server,
const std::string& port,
uint32_t process_id,
- uint32_t host_id
-) : _socket(_io_service)
+ uint32_t host_id)
+ : _socket(_io_service)
{
- //Fill in handshake info
- _hshake_args_client.version = CURRENT_VERSION;
+ // Fill in handshake info
+ _hshake_args_client.version = CURRENT_VERSION;
_hshake_args_client.oldest_comp_version = OLDEST_COMPATIBLE_VERSION;
- _hshake_args_client.client_id = build_client_id(host_id, process_id);
- _hshake_args_client.boost_archive_version = boost_serialization_archive_utils::get_version();
+ _hshake_args_client.client_id = build_client_id(host_id, process_id);
+ _hshake_args_client.boost_archive_version =
+ boost_serialization_archive_utils::get_version();
try {
- //Synchronous resolve + connect
+ // Synchronous resolve + connect
tcp::resolver resolver(_io_service);
- //Create flags object with all special flags disabled. Especially the following:
- //- address_configured: Only return addresses if a non-loopback address is configured for the system.
+ // Create flags object with all special flags disabled. Especially the following:
+ //- address_configured: Only return addresses if a non-loopback address is
+ // configured for the system.
//- numeric_host: No name resolution should be attempted for host
//- numeric_service: No name resolution should be attempted for service
tcp::resolver::query::flags query_flags(tcp::resolver::query::passive);
@@ -48,56 +49,64 @@ rpc_client::rpc_client (
tcp::resolver::iterator iterator = resolver.resolve(query);
boost::asio::connect(_socket, iterator);
- UHD_LOGGER_TRACE("NIRIO") << "rpc_client connected to server." ;
+ UHD_LOGGER_TRACE("NIRIO") << "rpc_client connected to server.";
try {
- //Perform handshake
+ // Perform handshake
bool status = true;
- CHAIN_BLOCKING_XFER(
- boost::asio::write(_socket, boost::asio::buffer(&_hshake_args_client, sizeof(_hshake_args_client))),
- sizeof(_hshake_args_client), status);
- CHAIN_BLOCKING_XFER(
- boost::asio::read(_socket, boost::asio::buffer(&_hshake_args_server, sizeof(_hshake_args_server))),
- sizeof(_hshake_args_server), status);
+ CHAIN_BLOCKING_XFER(boost::asio::write(_socket,
+ boost::asio::buffer(&_hshake_args_client,
+ sizeof(_hshake_args_client))),
+ sizeof(_hshake_args_client),
+ status);
+ CHAIN_BLOCKING_XFER(boost::asio::read(_socket,
+ boost::asio::buffer(&_hshake_args_server,
+ sizeof(_hshake_args_server))),
+ sizeof(_hshake_args_server),
+ status);
_request.header.client_id = _hshake_args_server.client_id;
- if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version &&
- _hshake_args_client.version >= _hshake_args_server.oldest_comp_version &&
- status)
- {
- UHD_LOGGER_TRACE("NIRIO") << "rpc_client bound to server." ;
+ if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version
+ && _hshake_args_client.version >= _hshake_args_server.oldest_comp_version
+ && status) {
+ UHD_LOGGER_TRACE("NIRIO") << "rpc_client bound to server.";
_wait_for_next_response_header();
- //Spawn a thread for the io_service callback handler. This thread will run until rpc_client is destroyed.
- _io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service)));
+ // Spawn a thread for the io_service callback handler. This thread will
+ // run until rpc_client is destroyed.
+ _io_service_thread.reset(new boost::thread(
+ boost::bind(&boost::asio::io_service::run, &_io_service)));
} else {
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake failed." ;
- _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake failed.";
+ _exec_err.assign(boost::asio::error::connection_refused,
+ boost::asio::error::get_system_category());
}
- UHD_LOGGER_TRACE("NIRIO") << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") %
- _hshake_args_client.boost_archive_version %
- _hshake_args_server.boost_archive_version;
+ UHD_LOGGER_TRACE("NIRIO")
+ << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.")
+ % _hshake_args_client.boost_archive_version
+ % _hshake_args_server.boost_archive_version;
} catch (boost::exception&) {
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake aborted." ;
- _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake aborted.";
+ _exec_err.assign(boost::asio::error::connection_refused,
+ boost::asio::error::get_system_category());
}
} catch (boost::exception&) {
- UHD_LOGGER_TRACE("NIRIO") << "rpc_client connection request cancelled/aborted." ;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
+ UHD_LOGGER_TRACE("NIRIO") << "rpc_client connection request cancelled/aborted.";
+ _exec_err.assign(boost::asio::error::connection_aborted,
+ boost::asio::error::get_system_category());
}
}
-rpc_client::~rpc_client () {
+rpc_client::~rpc_client()
+{
_stop_io_service();
}
-const boost::system::error_code& rpc_client::call(
- func_id_t func_id,
+const boost::system::error_code& rpc_client::call(func_id_t func_id,
const func_args_writer_t& in_args,
- func_args_reader_t &out_args,
- boost::posix_time::milliseconds timeout
-)
+ func_args_reader_t& out_args,
+ boost::posix_time::milliseconds timeout)
{
boost::mutex::scoped_lock lock(_mutex);
@@ -108,61 +117,70 @@ const boost::system::error_code& rpc_client::call(
_exec_err.clear();
- //Send function call header and args
+ // Send function call header and args
bool status = true;
try {
CHAIN_BLOCKING_XFER(
- boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))),
- sizeof(_request.header), status);
- if (not _request.data.empty())
- {
- CHAIN_BLOCKING_XFER(
- boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
- _request.data.size(), status);
+ boost::asio::write(_socket,
+ boost::asio::buffer(&_request.header, sizeof(_request.header))),
+ sizeof(_request.header),
+ status);
+ if (not _request.data.empty()) {
+ CHAIN_BLOCKING_XFER(boost::asio::write(_socket,
+ boost::asio::buffer(&(*_request.data.begin()),
+ _request.data.size())),
+ _request.data.size(),
+ status);
}
} catch (boost::exception&) {
status = false;
}
- //Wait for response using condition variable
+ // Wait for response using condition variable
if (status) {
if (!_exec_gate.timed_wait(lock, timeout)) {
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client function timed out." ;
- _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category());
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client function timed out.";
+ _exec_err.assign(boost::asio::error::timed_out,
+ boost::asio::error::get_system_category());
}
} else {
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client connection dropped." ;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client connection dropped.";
+ _exec_err.assign(boost::asio::error::connection_aborted,
+ boost::asio::error::get_system_category());
_stop_io_service();
}
- //Verify that we are talking to the correct endpoint
+ // Verify that we are talking to the correct endpoint
if ((_request.header.client_id != _response.header.client_id) && !_exec_err) {
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client confused about who its talking to." ;
- _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client confused about who its talking to.";
+ _exec_err.assign(boost::asio::error::operation_aborted,
+ boost::asio::error::get_system_category());
}
- if (!_exec_err) out_args.load(_response.data);
+ if (!_exec_err)
+ out_args.load(_response.data);
}
return _exec_err;
}
-void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size_t transferred, size_t expected)
+void rpc_client::_handle_response_hdr(
+ const boost::system::error_code& err, size_t transferred, size_t expected)
{
boost::mutex::scoped_lock lock(_mutex);
_exec_err = err;
if (!_exec_err && (transferred == expected)) {
- //Response header received. Verify that it is expected
+ // Response header received. Verify that it is expected
if (func_args_header_t::match_function(_request.header, _response.header)) {
- if (_response.header.func_args_size)
- {
+ if (_response.header.func_args_size) {
_response.data.resize(_response.header.func_args_size);
- //Wait for response data
+ // Wait for response data
boost::asio::async_read(_socket,
- boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
- boost::bind(&rpc_client::_handle_response_data, this,
+ boost::asio::buffer(
+ &(*_response.data.begin()), _response.data.size()),
+ boost::bind(&rpc_client::_handle_response_data,
+ this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
_response.data.size()));
@@ -170,23 +188,27 @@ void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size
_handle_response_data(err, 0, 0);
}
} else {
- //Unexpected response. Ignore it.
- UHD_LOGGER_DEBUG("NIRIO") << "rpc_client received garbage responses." ;
- _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
+ // Unexpected response. Ignore it.
+ UHD_LOGGER_DEBUG("NIRIO") << "rpc_client received garbage responses.";
+ _exec_err.assign(boost::asio::error::operation_aborted,
+ boost::asio::error::get_system_category());
_wait_for_next_response_header();
}
}
- if (_exec_err) _exec_gate.notify_all();
+ if (_exec_err)
+ _exec_gate.notify_all();
}
-void rpc_client::_handle_response_data(const boost::system::error_code& err, size_t transferred, size_t expected)
+void rpc_client::_handle_response_data(
+ const boost::system::error_code& err, size_t transferred, size_t expected)
{
boost::mutex::scoped_lock lock(_mutex);
_exec_err = err;
if (transferred != expected) {
- _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
+ _exec_err.assign(boost::asio::error::operation_aborted,
+ boost::asio::error::get_system_category());
}
_exec_gate.notify_all();
@@ -194,15 +216,16 @@ void rpc_client::_handle_response_data(const boost::system::error_code& err, siz
_wait_for_next_response_header();
}
-void rpc_client::_wait_for_next_response_header() {
+void rpc_client::_wait_for_next_response_header()
+{
//_mutex must be locked when this call is made
- boost::asio::async_read(
- _socket,
+ boost::asio::async_read(_socket,
boost::asio::buffer(&_response.header, sizeof(_response.header)),
- boost::bind(&rpc_client::_handle_response_hdr, this,
+ boost::bind(&rpc_client::_handle_response_hdr,
+ this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
sizeof(_response.header)));
}
-}}
+}} // namespace uhd::usrprio_rpc