diff options
author | Alex Williams <alex.williams@ni.com> | 2018-08-31 11:35:07 -0700 |
---|---|---|
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-15 17:14:57 -0800 |
commit | e2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch) | |
tree | 296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c | |
parent | e2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff) | |
download | uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2 uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip |
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration
requests had their blocking calls re-implemented on top of this API, and
it is also used to service requests to wait on RX packets (w/ timeout).
The wait API involves a multi-producer, single-consumer queue per I/O
thread (waiter_ring), with a condition variable used for sleeping. The
data structure is shared between user thread and I/O thread, and because
timeouts make resource release time non-deterministic, we use reference
counting on the shared resource.
One reference is generated by the user thread and passed to the I/O
thread to consume. A user thread that still needs the data after waking
must get() another reference, to postpone the destruction of the
resource until it is done.
Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0
indicates blocking indefinitely, and a negative timeout indicates no
timeout is desired.
Also drop timeout for closing sockets in uhd-dpdk.
The timeout would allow a user thread to pre-empt the I/O thread's
cleanup process. The user thread would free data structures the I/O
thread still needed to function. Since this timeout is superfluous
anyway, let's just get rid of it.
Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c | 135 |
1 files changed, 90 insertions, 45 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c index 309e5e643..605f01de3 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c @@ -5,6 +5,7 @@ // #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" +#include "uhd_dpdk_wait.h" #include <rte_malloc.h> #include <rte_ip.h> @@ -22,24 +23,14 @@ int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval) { req->retval = retval; - int stat = pthread_mutex_trylock(&req->mutex); - if (stat) { - RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__); - return stat; - } - stat = pthread_cond_signal(&req->cond); - pthread_mutex_unlock(&req->mutex); - if (stat) { - RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__); - return stat; - } - return 0; + int stat = _uhd_dpdk_waiter_wake(req->waiter, req->sock->port->parent); + return stat; } int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) { int stat = 0; - switch (req->sock_type) { + switch (req->sock->sock_type) { case UHD_DPDK_SOCK_UDP: stat = _uhd_dpdk_udp_setup(req); break; @@ -53,7 +44,7 @@ int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) { int stat = 0; - switch (req->sock_type) { + switch (req->sock->sock_type) { case UHD_DPDK_SOCK_UDP: stat = _uhd_dpdk_udp_release(req); break; @@ -65,6 +56,22 @@ int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) return stat; } +int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key) +{ + int stat = 0; + if (!key) + return -EINVAL; + + switch (sock->sock_type) { + case UHD_DPDK_SOCK_UDP: + stat = _uhd_dpdk_udp_rx_key(sock, key); + break; + default: + stat = -EINVAL; + } + return stat; +} /************************************************ * API calls */ @@ -85,12 +92,17 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, return NULL; } - struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); if (!req) { return NULL; } + req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!req->waiter) { + req->retval = -ENOMEM; + goto sock_open_end; + } + struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0); if (!s) { goto sock_open_end; @@ -99,13 +111,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, s->port = port; req->sock = s; req->req_type = UHD_DPDK_SOCK_OPEN; - req->sock_type = t; + req->sock->sock_type = t; req->retval = -ETIMEDOUT; - pthread_mutex_init(&req->mutex, NULL); - pthread_condattr_t condattr; - pthread_condattr_init(&condattr); - pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC); - pthread_cond_init(&req->cond, &condattr); switch (t) { case UHD_DPDK_SOCK_UDP: @@ -121,6 +128,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, } sock_open_end: + if (req->waiter) + uhd_dpdk_waiter_put(req->waiter); rte_free(req); return s; } @@ -133,12 +142,15 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); if (!req) return -ENOMEM; + + req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!req->waiter) { + rte_free(req); + return -ENOMEM; + } req->sock = sock; req->req_type = UHD_DPDK_SOCK_CLOSE; - req->sock_type = sock->sock_type; req->retval = -ETIMEDOUT; - pthread_mutex_init(&req->mutex, NULL); - pthread_cond_init(&req->cond, NULL); switch (sock->sock_type) { case UHD_DPDK_SOCK_UDP: @@ -148,6 +160,8 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) break; } + uhd_dpdk_waiter_put(req->waiter); + if (req->retval) { rte_free(req); return req->retval; @@ -157,26 +171,34 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) return 0; } -/* - * TODO: - * Add blocking calls with timeout - * Implementation would involve a condition variable, like config reqs - * Also would create a cleanup section in I/O main loop (like config reqs) - */ int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, - unsigned int num_bufs) + unsigned int num_bufs, int timeout) { if (!sock || !bufs || !num_bufs) { return -EINVAL; } *bufs = NULL; - if (!sock->tx_ring) + if (!sock->tx_queue) return -EINVAL; - unsigned int num_tx = rte_ring_count(sock->rx_ring); + if (!sock->tx_queue->freebufs) + return -EINVAL; + + struct rte_ring *freebufs = sock->tx_queue->freebufs; + unsigned int num_tx = rte_ring_count(freebufs); + if (timeout != 0 && num_tx == 0) { + struct uhd_dpdk_wait_req *req = + uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_TX_BUF); + req->sock = sock; + uhd_dpdk_waiter_wait(req, timeout, sock->port->parent); + uhd_dpdk_waiter_put(req); + num_tx = rte_ring_count(freebufs); + if (!num_tx) + return -ETIMEDOUT; + } num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; - if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0) + if (rte_ring_dequeue_bulk(freebufs, (void **) bufs, num_tx, NULL) == 0) return -ENOENT; sock->tx_buf_count += num_tx; return num_tx; @@ -187,9 +209,12 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, { if (!sock || !bufs || !num_bufs) return -EINVAL; - if (!sock->tx_ring) + if (!sock->tx_queue) return -EINVAL; - unsigned int num_tx = rte_ring_free_count(sock->tx_ring); + if (!sock->tx_queue->queue) + return -EINVAL; + struct rte_ring *tx_ring = sock->tx_queue->queue; + unsigned int num_tx = rte_ring_free_count(tx_ring); num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; switch (sock->sock_type) { case UHD_DPDK_SOCK_UDP: @@ -201,7 +226,7 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__); return -EINVAL; } - int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL); + int status = rte_ring_enqueue_bulk(tx_ring, (void **) bufs, num_tx, NULL); if (status == 0) { RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n"); return status; @@ -210,10 +235,6 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, return num_tx; } -/* - * TODO: - * Add blocking calls with timeout - */ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs, int timeout) { @@ -221,11 +242,21 @@ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, return -EINVAL; if (!sock->rx_ring) return -EINVAL; + unsigned int num_rx = rte_ring_count(sock->rx_ring); + if (timeout != 0 && num_rx == 0) { + struct uhd_dpdk_wait_req *req = + uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_RX); + req->sock = sock; + uhd_dpdk_waiter_wait(req, timeout, sock->port->parent); + uhd_dpdk_waiter_put(req); + num_rx = rte_ring_count(sock->rx_ring); + if (!num_rx) + return -ETIMEDOUT; + } + num_rx = (num_rx < num_bufs) ? num_rx : num_bufs; - /* if ((timeout > 0) && (num_rx != num_bufs)) { - // Wait for enough bufs - } else*/ if (num_rx) { + if (num_rx) { unsigned int avail = 0; unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_rx, &avail); @@ -293,7 +324,7 @@ int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf, return 0; } -int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, size_t *count) { if (!sock) return -EINVAL; @@ -306,3 +337,17 @@ int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) *count = pdata->dropped_pkts; return 0; } + +int uhd_dpdk_get_xfer_count(struct uhd_dpdk_socket *sock, size_t *count) +{ + if (!sock) + return -EINVAL; + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + if (!sock->priv) + return -ENODEV; + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + *count = pdata->xferd_pkts; + return 0; +} |