diff options
author | Alex Williams <alex.williams@ni.com> | 2018-08-31 11:35:07 -0700 |
---|---|---|
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-15 17:14:57 -0800 |
commit | e2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch) | |
tree | 296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | |
parent | e2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff) | |
download | uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2 uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip |
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration
requests had their blocking calls re-implemented on top of this API, and
it is also used to service requests to wait on RX packets (w/ timeout).
The wait API involves a multi-producer, single-consumer queue per I/O
thread (waiter_ring), with a condition variable used for sleeping. The
data structure is shared between user thread and I/O thread, and because
timeouts make resource release time non-deterministic, we use reference
counting on the shared resource.
One reference is generated by the user thread and passed to the I/O
thread to consume. A user thread that still needs the data after waking
must get() another reference, to postpone the destruction of the
resource until it is done.
Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0
indicates blocking indefinitely, and a negative timeout indicates no
timeout is desired.
Also drop timeout for closing sockets in uhd-dpdk.
The timeout would allow a user thread to pre-empt the I/O thread's
cleanup process. The user thread would free data structures the I/O
thread still needed to function. Since this timeout is superfluous
anyway, let's just get rid of it.
Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 158 |
1 files changed, 99 insertions, 59 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c index 26cfd43e1..6ea77b930 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -6,17 +6,20 @@ #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" #include "uhd_dpdk_driver.h" +#include "uhd_dpdk_wait.h" #include <rte_ring.h> #include <rte_malloc.h> #include <unistd.h> #include <sys/syscall.h> #include <arpa/inet.h> +#define MAX_UDP_PORT 65535 + /************************************************ * I/O thread ONLY */ -static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue) +static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue) { *queue = NULL; struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); @@ -108,7 +111,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) /* Add to rx table */ if (pdata->dst_port == 0) { /* Assign unused one in a very slow fashion */ - for (uint16_t i = 1; i > 0; i++) { + for (uint16_t i = MAX_UDP_PORT; i > 0; i--) { ht_key.dst_port = htons(i); if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) { pdata->dst_port = htons(i); @@ -144,9 +147,22 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) _uhd_dpdk_config_req_compl(req, -ENOMEM); return -ENOMEM; } - retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock); + + struct uhd_dpdk_rx_entry *entry = (struct uhd_dpdk_rx_entry *) + rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + rte_ring_free(sock->rx_ring); + RTE_LOG(ERR, USER1, "%s: Cannot create RX entry\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + entry->sock = sock; + entry->waiter = NULL; + + retval = rte_hash_add_key_data(port->rx_table, &ht_key, entry); if (retval != 0) { RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval); + rte_free(entry); rte_ring_free(sock->rx_ring); _uhd_dpdk_config_req_compl(req, retval); return retval; @@ -155,25 +171,25 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) } /* Are we doing TX? */ - if (sock->tx_ring) { - sock->tx_ring = NULL; + if (sock->tx_queue) { + sock->tx_queue = NULL; struct uhd_dpdk_tx_queue *q = NULL; - LIST_FOREACH(q, &port->txq_list, entry) { - if (q->tid == sock->tid) { - LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); - sock->tx_ring = q->queue; - sock->rx_ring = q->freebufs; - break; - } - } - if (!sock->tx_ring) { + // FIXME Not sharing txq across all thread's sockets for now + //LIST_FOREACH(q, &port->txq_list, entry) { + // if (pthread_equal(q->tid, sock->tid)) { + // LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + // sock->tx_ring = q->queue; + // sock->rx_ring = q->freebufs; + // break; + // } + //} + if (!sock->tx_queue) { retval = _alloc_txq(port, sock->tid, &q); if (retval) { _uhd_dpdk_config_req_compl(req, retval); return retval; } - sock->tx_ring = q->queue; - sock->rx_ring = q->freebufs; + sock->tx_queue = q; } /* If a broadcast type, just finish setup and return */ if (is_broadcast(port, pdata->dst_ipv4_addr)) { @@ -242,10 +258,23 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) { struct uhd_dpdk_socket *sock = req->sock; + if (req->sock == NULL) { + RTE_LOG(ERR, USER1, "%s: no sock in req\n", __func__); + return -EINVAL; + } struct uhd_dpdk_port *port = req->sock->port; struct uhd_dpdk_config_req *conf_req = NULL; struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - if (sock->tx_ring) { + if (pdata == NULL) { + RTE_LOG(ERR, USER1, "%s: no pdata in sock\n", __func__); + return -EINVAL; + } + if (sock->tx_queue) { + // FIXME not sharing buffers anymore + LIST_REMOVE(sock->tx_queue, entry); + rte_ring_free(sock->tx_queue->queue); + rte_ring_free(sock->tx_queue->retry_queue); + /* Remove from tx_list */ LIST_REMOVE(sock, tx_entry); /* Check for entry in ARP table */ @@ -260,18 +289,32 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) } } - /* Add outstanding buffers back to TX queue's freebufs */ - struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; - int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); - if (status) { - RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + // FIXME not sharing buffers anymore + // Remove outstanding buffers from TX queue's freebufs */ + unsigned int bufs = rte_ring_count(sock->tx_queue->freebufs); + for (unsigned int i = 0; i < bufs; i++) { + struct rte_mbuf *buf = NULL; + if (rte_ring_dequeue(sock->tx_queue->freebufs, (void **) &buf)) { + RTE_LOG(ERR, USER1, "%s: Could not dequeue freebufs\n", __func__); + } else if (buf) { + rte_pktmbuf_free(buf); + } } + rte_ring_free(sock->tx_queue->freebufs); + rte_free(sock->tx_queue); - unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); - if (enqd != (unsigned int) sock->tx_buf_count) { - RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); - return status; - } + /* Add outstanding buffers back to TX queue's freebufs */ + //struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + //int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); + //if (status) { + // RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + //} + + //unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); + //if (enqd != (unsigned int) sock->tx_buf_count) { + // RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); + // return status; + //} } else if (sock->rx_ring) { struct uhd_dpdk_ipv4_5tuple ht_key = { .sock_type = UHD_DPDK_SOCK_UDP, @@ -280,6 +323,13 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) .dst_ip = 0, .dst_port = pdata->dst_port }; + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry); + if (entry) { + if (entry->waiter) + uhd_dpdk_waiter_put(entry->waiter); + rte_free(entry); + } rte_hash_del_key(port->rx_table, &ht_key); struct rte_mbuf *mbuf = NULL; while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) { @@ -292,10 +342,21 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) return 0; } +int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key) +{ + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (!pdata) + return -EINVAL; + key->sock_type = UHD_DPDK_SOCK_UDP; + key->src_ip = 0; + key->src_port = 0; + key->dst_ip = 0; + key->dst_port = pdata->dst_port; + return 0; +} + /* Configure a socket for UDP - * TODO: Make sure EVERYTHING is configured in socket - * FIXME: Make all of this part of the user thread, except hash table interaction - * and list handling */ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, struct uhd_dpdk_sockarg_udp *arg) @@ -309,8 +370,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, } struct uhd_dpdk_socket *sock = req->sock; - pid_t tid = syscall(__NR_gettid); - sock->tid = tid; + sock->tid = pthread_self(); /* Create private data */ struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0); @@ -324,7 +384,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, if (arg->is_tx) { data->src_port = arg->local_port; data->dst_port = arg->remote_port; - sock->tx_ring = (struct rte_ring *) sock; + sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock; } else { data->src_port = arg->remote_port; data->dst_port = arg->local_port; @@ -333,19 +393,10 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, /* TODO: Add support for I/O thread calling (skip locking and sleep) */ /* Add to port's config queue */ - pthread_mutex_lock(&req->mutex); - if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { - pthread_mutex_unlock(&req->mutex); - rte_free(data); - req->retval = -ENOSPC; - return; - } - struct timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += 1; - pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); - pthread_mutex_unlock(&req->mutex); - + int status = uhd_dpdk_config_req_submit(req, -1, sock->port->parent); + if (status) + req->retval = status; + if (req->retval) rte_free(data); } @@ -355,18 +406,7 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req) if (!req) return; - pthread_mutex_lock(&req->mutex); - if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { - pthread_mutex_unlock(&req->mutex); - rte_free(req->sock->priv); - req->retval = -ENOSPC; - return; - } - struct timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += 1; - pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); - pthread_mutex_unlock(&req->mutex); + uhd_dpdk_config_req_submit(req, -1, req->sock->port->parent); rte_free(req->sock->priv); } @@ -440,7 +480,7 @@ int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, return -EINVAL; struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - if (sock->tx_ring) { + if (sock->tx_queue) { sockarg->is_tx = true; sockarg->local_port = pdata->src_port; sockarg->remote_port = pdata->dst_port; |