about summary refs log tree commit diff stats
path: root/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2018-08-31 11:35:07 -0700
committerBrent Stapleton <brent.stapleton@ettus.com>2019-01-15 17:14:57 -0800
commite2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch)
tree296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
parente2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff)
downloaduhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait-queue API to uhd-dpdk. Socket configuration requests had their blocking calls re-implemented on top of this API, and it is also used to service requests to wait on RX packets (with timeout). The wait API involves a multi-producer, single-consumer queue per I/O thread (waiter_ring), with a condition variable used for sleeping. The data structure is shared between the user thread and the I/O thread, and because timeouts make the resource release time non-deterministic, we use reference counting on the shared resource. One reference is generated by the user thread and passed to the I/O thread to consume. A user thread that still needs the data after waking must get() another reference, to postpone the destruction of the resource until it is done. Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0 indicates blocking indefinitely, and a negative timeout indicates that no timeout is desired. Also drop the timeout for closing sockets in uhd-dpdk. That timeout would allow a user thread to pre-empt the I/O thread's cleanup process, freeing data structures the I/O thread still needed to function. Since this timeout is superfluous anyway, remove it. Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c')
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c299
1 files changed, 226 insertions, 73 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
index 8388359e7..f603f1f8f 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -6,6 +6,7 @@
#include "uhd_dpdk_driver.h"
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_wait.h"
#include <rte_malloc.h>
#include <rte_mempool.h>
#include <arpa/inet.h>
@@ -145,19 +146,25 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str
.dst_port = pkt->dst_port
};
- struct uhd_dpdk_socket *sock = NULL;
- rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock);
- if (!sock) {
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry);
+ if (!entry) {
status = -ENODEV;
+ //RTE_LOG(WARNING, USER1, "%s: Dropping packet to UDP port %d\n", __func__, ntohs(pkt->dst_port));
goto udp_rx_drop;
}
- struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- status = rte_ring_enqueue(sock->rx_ring, mbuf);
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv;
+ status = rte_ring_enqueue(entry->sock->rx_ring, mbuf);
+ if (entry->waiter) {
+ _uhd_dpdk_waiter_wake(entry->waiter, port->parent);
+ entry->waiter = NULL;
+ }
if (status) {
pdata->dropped_pkts++;
goto udp_rx_drop;
}
+ pdata->xferd_pkts++;
return 0;
udp_rx_drop:
@@ -220,6 +227,7 @@ static int _uhd_dpdk_send(struct uhd_dpdk_port *port,
return status;
}
}
+
status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */
if (status != 1) {
status = rte_ring_enqueue(txq->retry_queue, buf);
@@ -247,6 +255,10 @@ static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port,
/* Enqueue the buffers for the user thread to retrieve */
unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL);
+ if (q->waiter && rte_ring_count(q->freebufs) > 0) {
+ _uhd_dpdk_waiter_wake(q->waiter, port->parent);
+ q->waiter = NULL;
+ }
if (enqd != num_bufs) {
RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n");
return status;
@@ -299,7 +311,19 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
}
}
- /* Now can free memory */
+ /* Now clean up waiters
+ * TODO: Determine if better to wake threads
+ */
+ int num_waiters = rte_ring_count(t->waiter_ring);
+ for (int i = 0; i < num_waiters; i++) {
+ struct uhd_dpdk_wait_req *req = NULL;
+ rte_ring_dequeue(t->waiter_ring, (void **) &req);
+ uhd_dpdk_waiter_put(req);
+ }
+ if (rte_ring_count(t->waiter_ring))
+ return -EAGAIN;
+
+ /* Now can free memory, except sock_req_ring and waiter_ring */
LIST_FOREACH(port, &t->port_list, port_entry) {
rte_hash_free(port->rx_table);
@@ -341,29 +365,12 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
return 0;
}
-int _uhd_dpdk_driver_main(void *arg)
+static inline int _uhd_dpdk_service_config_req(struct rte_ring *sock_req_ring)
{
-
- /* Don't currently have use for arguments */
- if (arg)
- return -EINVAL;
-
- /* Check that this is a valid lcore */
- unsigned int lcore_id = rte_lcore_id();
- if (lcore_id == LCORE_ID_ANY)
- return -ENODEV;
-
- /* Check that this lcore has ports */
- struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
- if (t->id != lcore_id)
- return -ENODEV;
-
- RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
int status = 0;
- while (!status) {
- /* Check for open()/close() requests and service 1 at a time */
- struct uhd_dpdk_config_req *sock_req;
- if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) {
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(sock_req_ring, (void **) &sock_req) == 0) {
+ if (sock_req) {
/* FIXME: Not checking return vals */
switch (sock_req->req_type) {
case UHD_DPDK_SOCK_OPEN:
@@ -373,7 +380,7 @@ int _uhd_dpdk_driver_main(void *arg)
_uhd_dpdk_sock_release(sock_req);
break;
case UHD_DPDK_LCORE_TERM:
- RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id);
+ RTE_LOG(INFO, EAL, "Terminating lcore %u\n", rte_lcore_id());
status = 1;
_uhd_dpdk_config_req_compl(sock_req, 0);
break;
@@ -381,61 +388,207 @@ int _uhd_dpdk_driver_main(void *arg)
RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type);
break;
}
+ } else {
+ RTE_LOG(ERR, USER1, "%s: NULL service request received\n", __func__);
+ }
+ }
+ return status;
+}
+
+/* Do a burst of RX on port */
+static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port)
+{
+ struct ether_hdr *hdr;
+ char *l2_data;
+ struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
+ const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
+ bufs, UHD_DPDK_RX_BURST_SIZE);
+ if (unlikely(num_rx == 0)) {
+ return;
+ }
+
+ for (int buf = 0; buf < num_rx; buf++) {
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
+ l2_data = (char *) &hdr[1];
+ switch (rte_be_to_cpu_16(hdr->ether_type)) {
+ case ETHER_TYPE_ARP:
+ _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ case ETHER_TYPE_IPv4:
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
+ } else {
+ _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
+ }
+ break;
+ default:
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ }
+ }
+}
+
+/* Do a burst of TX on port's tx q */
+static inline int _uhd_dpdk_tx_burst(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *q)
+{
+ if (!rte_ring_empty(q->retry_queue)) {
+ int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
+ _uhd_dpdk_restore_bufs(port, q, num_retry);
+ if (!rte_ring_empty(q->retry_queue)) {
+ return -EAGAIN;
}
+ }
+ if (rte_ring_empty(q->queue)) {
+ return 0;
+ }
+ int num_tx = _uhd_dpdk_send(port, q, q->queue);
+ if (num_tx > 0) {
+ _uhd_dpdk_restore_bufs(port, q, num_tx);
+ return 0;
+ } else {
+ return num_tx;
+ }
+}
+
+/* Process threads requesting to block on RX */
+static inline void _uhd_dpdk_process_rx_wait(struct uhd_dpdk_thread *t,
+ struct uhd_dpdk_wait_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ if (!sock)
+ goto rx_wait_skip;
+ if (!sock->port)
+ goto rx_wait_skip;
+ if (!sock->port->rx_table)
+ goto rx_wait_skip;
+
+ if (!rte_ring_empty(sock->rx_ring))
+ goto rx_wait_skip;
+
+ struct uhd_dpdk_ipv4_5tuple ht_key;
+ if (_uhd_dpdk_sock_rx_key(sock, &ht_key))
+ goto rx_wait_skip;
+
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(sock->port->rx_table, &ht_key, (void **) &entry);
+ entry->waiter = req;
+ return;
+
+rx_wait_skip:
+ _uhd_dpdk_waiter_wake(req, t);
+}
+
+/* Process threads requesting to block on TX bufs*/
+static inline void _uhd_dpdk_process_tx_buf_wait(struct uhd_dpdk_thread *t,
+ struct uhd_dpdk_wait_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ if (!sock)
+ goto tx_wait_skip;
+ if (!sock->port)
+ goto tx_wait_skip;
+ if (!sock->tx_queue)
+ goto tx_wait_skip;
+
+ struct uhd_dpdk_tx_queue *q = sock->tx_queue;
+ if (!q->freebufs || !q->retry_queue || !q->queue)
+ goto tx_wait_skip;
+
+ if (!rte_ring_empty(q->freebufs))
+ goto tx_wait_skip;
+
+ sock->tx_queue->waiter = req;
+
+ // Attempt to restore bufs only if failed before
+ unsigned int num_bufs = sock->tx_buf_count + rte_ring_count(q->queue) +
+ rte_ring_count(q->retry_queue);
+ unsigned int max_bufs = rte_ring_get_size(q->freebufs);
+ if (num_bufs < max_bufs) {
+ _uhd_dpdk_restore_bufs(sock->port, q, max_bufs - num_bufs);
+ }
+ return;
+
+tx_wait_skip:
+ _uhd_dpdk_waiter_wake(req, t);
+}
+
+/* Process threads making requests to wait */
+static inline void _uhd_dpdk_process_waiters(struct uhd_dpdk_thread *t)
+{
+ int num_waiters = rte_ring_count(t->waiter_ring);
+ num_waiters = (num_waiters > UHD_DPDK_MAX_PENDING_SOCK_REQS) ?
+ UHD_DPDK_MAX_PENDING_SOCK_REQS :
+ num_waiters;
+ for (int i = 0; i < num_waiters; i++) {
+ /* Dequeue */
+ struct uhd_dpdk_wait_req *req = NULL;
+ if (rte_ring_dequeue(t->waiter_ring, (void **) &req))
+ break;
+ switch (req->reason) {
+ case UHD_DPDK_WAIT_SIMPLE:
+ _uhd_dpdk_waiter_wake(req, t);
+ break;
+ case UHD_DPDK_WAIT_RX:
+ _uhd_dpdk_process_rx_wait(t, req);
+ break;
+ default:
+ RTE_LOG(ERR, USER2, "Invalid reason associated with wait request\n");
+ _uhd_dpdk_waiter_wake(req, t);
+ break;
+ }
+ }
+}
+
+int _uhd_dpdk_driver_main(void *arg)
+{
+
+ /* Don't currently have use for arguments */
+ if (arg)
+ return -EINVAL;
+
+ /* Check that this is a valid lcore */
+ unsigned int lcore_id = rte_lcore_id();
+ if (lcore_id == LCORE_ID_ANY)
+ return -ENODEV;
+
+ /* Check that this lcore has ports */
+ struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
+ if (t->lcore != lcore_id)
+ return -ENODEV;
+
+ pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t),
+ &t->cpu_affinity);
+ char name[16];
+ snprintf(name, sizeof(name), "dpdk-io_%u", lcore_id);
+ pthread_setname_np(pthread_self(), name);
+
+ RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
+ int status = 0;
+ while (!status) {
+ /* Check for open()/close()/term() requests and service 1 at a time */
+ status = _uhd_dpdk_service_config_req(t->sock_req_ring);
/* For each port, attempt to receive packets and process */
struct uhd_dpdk_port *port = NULL;
LIST_FOREACH(port, &t->port_list, port_entry) {
- struct ether_hdr *hdr;
- char *l2_data;
- struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
- const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
- bufs, UHD_DPDK_RX_BURST_SIZE);
- if (unlikely(num_rx == 0)) {
- continue;
- }
-
- for (int buf = 0; buf < num_rx; buf++) {
- uint64_t ol_flags = bufs[buf]->ol_flags;
- hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
- l2_data = (char *) &hdr[1];
- switch (rte_be_to_cpu_16(hdr->ether_type)) {
- case ETHER_TYPE_ARP:
- _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
- rte_pktmbuf_free(bufs[buf]);
- break;
- case ETHER_TYPE_IPv4:
- if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
- RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
- } else {
- _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
- }
- break;
- default:
- rte_pktmbuf_free(bufs[buf]);
- break;
- }
- }
+ _uhd_dpdk_rx_burst(port);
}
+
+ /* TODO: Handle waiter_ring
+ * Also use it for config_req wake retries
+ * Also take care of RX table with new struct w/ waiter
+ * (construction, adding, destruction)
+ */
+ _uhd_dpdk_process_waiters(t);
+
/* For each port's TX queues, do TX */
LIST_FOREACH(port, &t->port_list, port_entry) {
struct uhd_dpdk_tx_queue *q = NULL;
LIST_FOREACH(q, &port->txq_list, entry) {
- if (!rte_ring_empty(q->retry_queue)) {
- int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
- _uhd_dpdk_restore_bufs(port, q, num_retry);
- if (!rte_ring_empty(q->retry_queue)) {
- break;
- }
- }
- if (rte_ring_empty(q->queue)) {
- continue;
- }
- int num_tx = _uhd_dpdk_send(port, q, q->queue);
- if (num_tx > 0) {
- _uhd_dpdk_restore_bufs(port, q, num_tx);
- } else {
+ if (_uhd_dpdk_tx_burst(port, q))
break;
- }
}
}
}