aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2018-08-31 11:35:07 -0700
committerBrent Stapleton <brent.stapleton@ettus.com>2019-01-15 17:14:57 -0800
commite2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch)
tree296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
parente2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff)
downloaduhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration requests had their blocking calls re-implemented on top of this API, and it is also used to service requests to wait on RX packets (w/ timeout). The wait API involves a multi-producer, single-consumer queue per I/O thread (waiter_ring), with a condition variable used for sleeping. The data structure is shared between user thread and I/O thread, and because timeouts make resource release time non-deterministic, we use reference counting on the shared resource. One reference is generated by the user thread and passed to the I/O thread to consume. A user thread that still needs the data after waking must get() another reference, to postpone the destruction of the resource until it is done. Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0 indicates blocking indefinitely, and a negative timeout indicates no timeout is desired. Also drop timeout for closing sockets in uhd-dpdk. The timeout would allow a user thread to pre-empt the I/O thread's cleanup process. The user thread would free data structures the I/O thread still needed to function. Since this timeout is superfluous anyway, let's just get rid of it. Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c')
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c135
1 files changed, 90 insertions, 45 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
index 309e5e643..605f01de3 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
@@ -5,6 +5,7 @@
//
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_wait.h"
#include <rte_malloc.h>
#include <rte_ip.h>
@@ -22,24 +23,14 @@
int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval)
{
req->retval = retval;
- int stat = pthread_mutex_trylock(&req->mutex);
- if (stat) {
- RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__);
- return stat;
- }
- stat = pthread_cond_signal(&req->cond);
- pthread_mutex_unlock(&req->mutex);
- if (stat) {
- RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__);
- return stat;
- }
- return 0;
+ int stat = _uhd_dpdk_waiter_wake(req->waiter, req->sock->port->parent);
+ return stat;
}
int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
{
int stat = 0;
- switch (req->sock_type) {
+ switch (req->sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
stat = _uhd_dpdk_udp_setup(req);
break;
@@ -53,7 +44,7 @@ int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
{
int stat = 0;
- switch (req->sock_type) {
+ switch (req->sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
stat = _uhd_dpdk_udp_release(req);
break;
@@ -65,6 +56,22 @@ int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
return stat;
}
+int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key)
+{
+ int stat = 0;
+ if (!key)
+ return -EINVAL;
+
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_rx_key(sock, key);
+ break;
+ default:
+ stat = -EINVAL;
+ }
+ return stat;
+}
/************************************************
* API calls
*/
@@ -85,12 +92,17 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
return NULL;
}
-
struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
if (!req) {
return NULL;
}
+ req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!req->waiter) {
+ req->retval = -ENOMEM;
+ goto sock_open_end;
+ }
+
struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0);
if (!s) {
goto sock_open_end;
@@ -99,13 +111,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
s->port = port;
req->sock = s;
req->req_type = UHD_DPDK_SOCK_OPEN;
- req->sock_type = t;
+ req->sock->sock_type = t;
req->retval = -ETIMEDOUT;
- pthread_mutex_init(&req->mutex, NULL);
- pthread_condattr_t condattr;
- pthread_condattr_init(&condattr);
- pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
- pthread_cond_init(&req->cond, &condattr);
switch (t) {
case UHD_DPDK_SOCK_UDP:
@@ -121,6 +128,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
}
sock_open_end:
+ if (req->waiter)
+ uhd_dpdk_waiter_put(req->waiter);
rte_free(req);
return s;
}
@@ -133,12 +142,15 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
if (!req)
return -ENOMEM;
+
+ req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!req->waiter) {
+ rte_free(req);
+ return -ENOMEM;
+ }
req->sock = sock;
req->req_type = UHD_DPDK_SOCK_CLOSE;
- req->sock_type = sock->sock_type;
req->retval = -ETIMEDOUT;
- pthread_mutex_init(&req->mutex, NULL);
- pthread_cond_init(&req->cond, NULL);
switch (sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
@@ -148,6 +160,8 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
break;
}
+ uhd_dpdk_waiter_put(req->waiter);
+
if (req->retval) {
rte_free(req);
return req->retval;
@@ -157,26 +171,34 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
return 0;
}
-/*
- * TODO:
- * Add blocking calls with timeout
- * Implementation would involve a condition variable, like config reqs
- * Also would create a cleanup section in I/O main loop (like config reqs)
- */
int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
- unsigned int num_bufs)
+ unsigned int num_bufs, int timeout)
{
if (!sock || !bufs || !num_bufs) {
return -EINVAL;
}
*bufs = NULL;
- if (!sock->tx_ring)
+ if (!sock->tx_queue)
return -EINVAL;
- unsigned int num_tx = rte_ring_count(sock->rx_ring);
+ if (!sock->tx_queue->freebufs)
+ return -EINVAL;
+
+ struct rte_ring *freebufs = sock->tx_queue->freebufs;
+ unsigned int num_tx = rte_ring_count(freebufs);
+ if (timeout != 0 && num_tx == 0) {
+ struct uhd_dpdk_wait_req *req =
+ uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_TX_BUF);
+ req->sock = sock;
+ uhd_dpdk_waiter_wait(req, timeout, sock->port->parent);
+ uhd_dpdk_waiter_put(req);
+ num_tx = rte_ring_count(freebufs);
+ if (!num_tx)
+ return -ETIMEDOUT;
+ }
num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
- if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0)
+ if (rte_ring_dequeue_bulk(freebufs, (void **) bufs, num_tx, NULL) == 0)
return -ENOENT;
sock->tx_buf_count += num_tx;
return num_tx;
@@ -187,9 +209,12 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
{
if (!sock || !bufs || !num_bufs)
return -EINVAL;
- if (!sock->tx_ring)
+ if (!sock->tx_queue)
return -EINVAL;
- unsigned int num_tx = rte_ring_free_count(sock->tx_ring);
+ if (!sock->tx_queue->queue)
+ return -EINVAL;
+ struct rte_ring *tx_ring = sock->tx_queue->queue;
+ unsigned int num_tx = rte_ring_free_count(tx_ring);
num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
switch (sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
@@ -201,7 +226,7 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__);
return -EINVAL;
}
- int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL);
+ int status = rte_ring_enqueue_bulk(tx_ring, (void **) bufs, num_tx, NULL);
if (status == 0) {
RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n");
return status;
@@ -210,10 +235,6 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
return num_tx;
}
-/*
- * TODO:
- * Add blocking calls with timeout
- */
int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
unsigned int num_bufs, int timeout)
{
@@ -221,11 +242,21 @@ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
return -EINVAL;
if (!sock->rx_ring)
return -EINVAL;
+
unsigned int num_rx = rte_ring_count(sock->rx_ring);
+ if (timeout != 0 && num_rx == 0) {
+ struct uhd_dpdk_wait_req *req =
+ uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_RX);
+ req->sock = sock;
+ uhd_dpdk_waiter_wait(req, timeout, sock->port->parent);
+ uhd_dpdk_waiter_put(req);
+ num_rx = rte_ring_count(sock->rx_ring);
+ if (!num_rx)
+ return -ETIMEDOUT;
+ }
+
num_rx = (num_rx < num_bufs) ? num_rx : num_bufs;
- /* if ((timeout > 0) && (num_rx != num_bufs)) {
- // Wait for enough bufs
- } else*/ if (num_rx) {
+ if (num_rx) {
unsigned int avail = 0;
unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring,
(void **) bufs, num_rx, &avail);
@@ -293,7 +324,7 @@ int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf,
return 0;
}
-int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
+int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, size_t *count)
{
if (!sock)
return -EINVAL;
@@ -306,3 +337,17 @@ int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
*count = pdata->dropped_pkts;
return 0;
}
+
+int uhd_dpdk_get_xfer_count(struct uhd_dpdk_socket *sock, size_t *count)
+{
+ if (!sock)
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+ if (!sock->priv)
+ return -ENODEV;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ *count = pdata->xferd_pkts;
+ return 0;
+}