diff options
author | Alex Williams <alex.williams@ni.com> | 2018-08-31 11:35:07 -0700 |
---|---|---|
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-15 17:14:57 -0800 |
commit | e2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch) | |
tree | 296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | |
parent | e2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff) | |
download | uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2 uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip |
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration
requests had their blocking calls re-implemented on top of this API, and
it is also used to service requests to wait on RX packets (w/ timeout).
The wait API involves a multi-producer, single-consumer queue per I/O
thread (waiter_ring), with a condition variable used for sleeping. The
data structure is shared between user thread and I/O thread, and because
timeouts make resource release time non-deterministic, we use reference
counting on the shared resource.
One reference is generated by the user thread and passed to the I/O
thread to consume. A user thread that still needs the data after waking
must get() another reference, to postpone the destruction of the
resource until it is done.
Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0
indicates blocking indefinitely, and a negative timeout indicates no
timeout is desired.
Also drop timeout for closing sockets in uhd-dpdk.
The timeout would allow a user thread to pre-empt the I/O thread's
cleanup process. The user thread would free data structures the I/O
thread still needed to function. Since this timeout is superfluous
anyway, let's just get rid of it.
Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 299 |
1 files changed, 226 insertions, 73 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c index 8388359e7..f603f1f8f 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -6,6 +6,7 @@ #include "uhd_dpdk_driver.h" #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" +#include "uhd_dpdk_wait.h" #include <rte_malloc.h> #include <rte_mempool.h> #include <arpa/inet.h> @@ -145,19 +146,25 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str .dst_port = pkt->dst_port }; - struct uhd_dpdk_socket *sock = NULL; - rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock); - if (!sock) { + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry); + if (!entry) { status = -ENODEV; + //RTE_LOG(WARNING, USER1, "%s: Dropping packet to UDP port %d\n", __func__, ntohs(pkt->dst_port)); goto udp_rx_drop; } - struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - status = rte_ring_enqueue(sock->rx_ring, mbuf); + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv; + status = rte_ring_enqueue(entry->sock->rx_ring, mbuf); + if (entry->waiter) { + _uhd_dpdk_waiter_wake(entry->waiter, port->parent); + entry->waiter = NULL; + } if (status) { pdata->dropped_pkts++; goto udp_rx_drop; } + pdata->xferd_pkts++; return 0; udp_rx_drop: @@ -220,6 +227,7 @@ static int _uhd_dpdk_send(struct uhd_dpdk_port *port, return status; } } + status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */ if (status != 1) { status = rte_ring_enqueue(txq->retry_queue, buf); @@ -247,6 +255,10 @@ static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port, /* Enqueue the buffers for the user thread to retrieve */ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL); + if (q->waiter && rte_ring_count(q->freebufs) > 0) { + _uhd_dpdk_waiter_wake(q->waiter, port->parent); + q->waiter = NULL; + } if (enqd != num_bufs) { RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n"); return status; @@ -299,7 +311,19 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) } } - /* Now can free memory */ + /* Now clean up waiters + * TODO: Determine if better to wake threads + */ + int num_waiters = rte_ring_count(t->waiter_ring); + for (int i = 0; i < num_waiters; i++) { + struct uhd_dpdk_wait_req *req = NULL; + rte_ring_dequeue(t->waiter_ring, (void **) &req); + uhd_dpdk_waiter_put(req); + } + if (rte_ring_count(t->waiter_ring)) + return -EAGAIN; + + /* Now can free memory, except sock_req_ring and waiter_ring */ LIST_FOREACH(port, &t->port_list, port_entry) { rte_hash_free(port->rx_table); @@ -341,29 +365,12 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) return 0; } -int _uhd_dpdk_driver_main(void *arg) +static inline int _uhd_dpdk_service_config_req(struct rte_ring *sock_req_ring) { - - /* Don't currently have use for arguments */ - if (arg) - return -EINVAL; - - /* Check that this is a valid lcore */ - unsigned int lcore_id = rte_lcore_id(); - if (lcore_id == LCORE_ID_ANY) - return -ENODEV; - - /* Check that this lcore has ports */ - struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; - if (t->id != lcore_id) - return -ENODEV; - - RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); int status = 0; - while (!status) { - /* Check for open()/close() requests and service 1 at a time */ - struct uhd_dpdk_config_req *sock_req; - if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) { + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(sock_req_ring, (void **) &sock_req) == 0) { + if (sock_req) { /* FIXME: Not checking return vals */ switch (sock_req->req_type) { case UHD_DPDK_SOCK_OPEN: @@ -373,7 +380,7 @@ int _uhd_dpdk_driver_main(void *arg) _uhd_dpdk_sock_release(sock_req); break; case UHD_DPDK_LCORE_TERM: - RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id); + RTE_LOG(INFO, EAL, "Terminating lcore %u\n", rte_lcore_id()); status = 1; _uhd_dpdk_config_req_compl(sock_req, 0); break; @@ -381,61 +388,207 @@ int _uhd_dpdk_driver_main(void *arg) RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type); break; } + } else { + RTE_LOG(ERR, USER1, "%s: NULL service request received\n", __func__); + } + } + return status; +} + +/* Do a burst of RX on port */ +static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port) +{ + struct ether_hdr *hdr; + char *l2_data; + struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; + const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, + bufs, UHD_DPDK_RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); + l2_data = (char *) &hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); + } else { + _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } +} + +/* Do a burst of TX on port's tx q */ +static inline int _uhd_dpdk_tx_burst(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *q) +{ + if (!rte_ring_empty(q->retry_queue)) { + int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); + _uhd_dpdk_restore_bufs(port, q, num_retry); + if (!rte_ring_empty(q->retry_queue)) { + return -EAGAIN; } + } + if (rte_ring_empty(q->queue)) { + return 0; + } + int num_tx = _uhd_dpdk_send(port, q, q->queue); + if (num_tx > 0) { + _uhd_dpdk_restore_bufs(port, q, num_tx); + return 0; + } else { + return num_tx; + } +} + +/* Process threads requesting to block on RX */ +static inline void _uhd_dpdk_process_rx_wait(struct uhd_dpdk_thread *t, + struct uhd_dpdk_wait_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + if (!sock) + goto rx_wait_skip; + if (!sock->port) + goto rx_wait_skip; + if (!sock->port->rx_table) + goto rx_wait_skip; + + if (!rte_ring_empty(sock->rx_ring)) + goto rx_wait_skip; + + struct uhd_dpdk_ipv4_5tuple ht_key; + if (_uhd_dpdk_sock_rx_key(sock, &ht_key)) + goto rx_wait_skip; + + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(sock->port->rx_table, &ht_key, (void **) &entry); + entry->waiter = req; + return; + +rx_wait_skip: + _uhd_dpdk_waiter_wake(req, t); +} + +/* Process threads requesting to block on TX bufs*/ +static inline void _uhd_dpdk_process_tx_buf_wait(struct uhd_dpdk_thread *t, + struct uhd_dpdk_wait_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + if (!sock) + goto tx_wait_skip; + if (!sock->port) + goto tx_wait_skip; + if (!sock->tx_queue) + goto tx_wait_skip; + + struct uhd_dpdk_tx_queue *q = sock->tx_queue; + if (!q->freebufs || !q->retry_queue || !q->queue) + goto tx_wait_skip; + + if (!rte_ring_empty(q->freebufs)) + goto tx_wait_skip; + + sock->tx_queue->waiter = req; + + // Attempt to restore bufs only if failed before + unsigned int num_bufs = sock->tx_buf_count + rte_ring_count(q->queue) + + rte_ring_count(q->retry_queue); + unsigned int max_bufs = rte_ring_get_size(q->freebufs); + if (num_bufs < max_bufs) { + _uhd_dpdk_restore_bufs(sock->port, q, max_bufs - num_bufs); + } + return; + +tx_wait_skip: + _uhd_dpdk_waiter_wake(req, t); +} + +/* Process threads making requests to wait */ +static inline void _uhd_dpdk_process_waiters(struct uhd_dpdk_thread *t) +{ + int num_waiters = rte_ring_count(t->waiter_ring); + num_waiters = (num_waiters > UHD_DPDK_MAX_PENDING_SOCK_REQS) ? + UHD_DPDK_MAX_PENDING_SOCK_REQS : + num_waiters; + for (int i = 0; i < num_waiters; i++) { + /* Dequeue */ + struct uhd_dpdk_wait_req *req = NULL; + if (rte_ring_dequeue(t->waiter_ring, (void **) &req)) + break; + switch (req->reason) { + case UHD_DPDK_WAIT_SIMPLE: + _uhd_dpdk_waiter_wake(req, t); + break; + case UHD_DPDK_WAIT_RX: + _uhd_dpdk_process_rx_wait(t, req); + break; + default: + RTE_LOG(ERR, USER2, "Invalid reason associated with wait request\n"); + _uhd_dpdk_waiter_wake(req, t); + break; + } + } +} + +int _uhd_dpdk_driver_main(void *arg) +{ + + /* Don't currently have use for arguments */ + if (arg) + return -EINVAL; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; + if (t->lcore != lcore_id) + return -ENODEV; + + pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), + &t->cpu_affinity); + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%u", lcore_id); + pthread_setname_np(pthread_self(), name); + + RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); + int status = 0; + while (!status) { + /* Check for open()/close()/term() requests and service 1 at a time */ + status = _uhd_dpdk_service_config_req(t->sock_req_ring); /* For each port, attempt to receive packets and process */ struct uhd_dpdk_port *port = NULL; LIST_FOREACH(port, &t->port_list, port_entry) { - struct ether_hdr *hdr; - char *l2_data; - struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; - const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, - bufs, UHD_DPDK_RX_BURST_SIZE); - if (unlikely(num_rx == 0)) { - continue; - } - - for (int buf = 0; buf < num_rx; buf++) { - uint64_t ol_flags = bufs[buf]->ol_flags; - hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); - l2_data = (char *) &hdr[1]; - switch (rte_be_to_cpu_16(hdr->ether_type)) { - case ETHER_TYPE_ARP: - _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); - rte_pktmbuf_free(bufs[buf]); - break; - case ETHER_TYPE_IPv4: - if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ - RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); - } else { - _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); - } - break; - default: - rte_pktmbuf_free(bufs[buf]); - break; - } - } + _uhd_dpdk_rx_burst(port); } + + /* TODO: Handle waiter_ring + * Also use it for config_req wake retries + * Also take care of RX table with new struct w/ waiter + * (construction, adding, destruction) + */ + _uhd_dpdk_process_waiters(t); + /* For each port's TX queues, do TX */ LIST_FOREACH(port, &t->port_list, port_entry) { struct uhd_dpdk_tx_queue *q = NULL; LIST_FOREACH(q, &port->txq_list, entry) { - if (!rte_ring_empty(q->retry_queue)) { - int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); - _uhd_dpdk_restore_bufs(port, q, num_retry); - if (!rte_ring_empty(q->retry_queue)) { - break; - } - } - if (rte_ring_empty(q->queue)) { - continue; - } - int num_tx = _uhd_dpdk_send(port, q, q->queue); - if (num_tx > 0) { - _uhd_dpdk_restore_bufs(port, q, num_tx); - } else { + if (_uhd_dpdk_tx_burst(port, q)) break; - } } } } |