diff options
author | Alex Williams <alex.williams@ni.com> | 2018-03-07 15:24:04 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2018-08-29 15:52:00 -0700 |
commit | 2084a5a72df45fbcda82839ea35486f8583227bc (patch) | |
tree | 150695c61e0d2ec3eee906da87dd0a9e5546d725 /host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | |
parent | 3f39388059546d44e4302e098fc241f1a71e6d4e (diff) | |
download | uhd-2084a5a72df45fbcda82839ea35486f8583227bc.tar.gz uhd-2084a5a72df45fbcda82839ea35486f8583227bc.tar.bz2 uhd-2084a5a72df45fbcda82839ea35486f8583227bc.zip |
uhd-dpdk: Add DPDK-based sockets-like library
This library makes available a userspace network stack with a
socket-like interface for applications (except the sockets pass around
pointers to buffers and use the buffers directly--It's sockets + a
put/get for buffer management). Supported services are ARP and UDP.
Destinations can be unicast or broadcast. Multicast is not currently
supported.
The implementation has two driver layers. The upper layer runs within
the caller's context. The caller will make requests through lockless
ring buffers (including socket creation and packet transmission), and
the lower layer will implement the requests and provide a response.
Currently, the lower layer runs in a separate I/O thread, and the caller
will block until it receives a response.
The I/O thread's main body is in src/uhd_dpdk_driver.c. You'll find that
all I/O thread functions are prefixed by an underscore, and user thread
functions do not.
src/uhd_dpdk.c is used to initialize uhd-dpdk and bring up the network
interfaces.
src/uhd_dpdk_fops.c and src/uhd_dpdk_udp.c are for network services.
The test is a benchmark of a flow control loop using a certain made-up
protocol with credits and sequence number tracking.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 456 |
1 files changed, 456 insertions, 0 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c new file mode 100644 index 000000000..26cfd43e1 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -0,0 +1,456 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include "uhd_dpdk_driver.h" +#include <rte_ring.h> +#include <rte_malloc.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <arpa/inet.h> + +/************************************************ + * I/O thread ONLY + */ + +static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue) +{ + *queue = NULL; + struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); + if (!q) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__); + return -ENOMEM; + } + q->tid = tid; + LIST_INIT(&q->tx_list); + + char name[32]; + snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); + q->queue = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); + q->freebufs = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + /* Set up retry queue */ + snprintf(name, sizeof(name), "retry_queue_%u", port->id); + q->retry_queue = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + + if (!q->queue || !q->freebufs || !q->retry_queue) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__); + if (q->queue) + rte_ring_free(q->queue); + if (q->freebufs) + rte_ring_free(q->freebufs); + if (q->retry_queue) + rte_ring_free(q->retry_queue); + rte_free(q); + return -ENOMEM; + } + struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; + unsigned int num_bufs = rte_ring_free_count(q->freebufs); + int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); + if (buf_stat) { + rte_ring_free(q->freebufs); + rte_ring_free(q->queue); + rte_ring_free(q->retry_queue); + rte_free(q); + RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); + return -ENOENT; + } + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); + } + LIST_INSERT_HEAD(&port->txq_list, q, entry); + *queue = q; + return 0; +} + +/* Finish setting up UDP socket (unless ARP needs to be done) + * Not multi-thread safe! + * This call should only be used by the thread servicing the port + * In addition, the code below assumes simplex sockets and unique receive ports + * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...) + */ +int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) +{ + int retval = 0; + struct uhd_dpdk_socket *sock = req->sock; + struct uhd_dpdk_udp_priv *pdata = sock->priv; + struct uhd_dpdk_port *port = req->sock->port; + + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pdata->dst_port + }; + + /* Are we doing RX? */ + if (sock->rx_ring) { + /* Add to rx table */ + if (pdata->dst_port == 0) { + /* Assign unused one in a very slow fashion */ + for (uint16_t i = 1; i > 0; i++) { + ht_key.dst_port = htons(i); + if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) { + pdata->dst_port = htons(i); + break; + } + } + } + + /* Is the port STILL invalid? */ + if (pdata->dst_port == 0) { + RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__); + _uhd_dpdk_config_req_compl(req, -EADDRINUSE); + return -EADDRINUSE; + } + + ht_key.dst_port = pdata->dst_port; + if (rte_hash_lookup(port->rx_table, &ht_key) > 0) { + RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__); + _uhd_dpdk_config_req_compl(req, -EADDRINUSE); + return -EADDRINUSE; + } + + char name[32]; + snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); + sock->rx_ring = rte_ring_create( + name, + UHD_DPDK_RXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + if (!sock->rx_ring) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock); + if (retval != 0) { + RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval); + rte_ring_free(sock->rx_ring); + _uhd_dpdk_config_req_compl(req, retval); + return retval; + } + _uhd_dpdk_config_req_compl(req, 0); + } + + /* Are we doing TX? */ + if (sock->tx_ring) { + sock->tx_ring = NULL; + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (q->tid == sock->tid) { + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + sock->tx_ring = q->queue; + sock->rx_ring = q->freebufs; + break; + } + } + if (!sock->tx_ring) { + retval = _alloc_txq(port, sock->tid, &q); + if (retval) { + _uhd_dpdk_config_req_compl(req, retval); + return retval; + } + sock->tx_ring = q->queue; + sock->rx_ring = q->freebufs; + } + /* If a broadcast type, just finish setup and return */ + if (is_broadcast(port, pdata->dst_ipv4_addr)) { + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + _uhd_dpdk_config_req_compl(req, 0); + return 0; + } + /* Otherwise... Check for entry in ARP table */ + struct uhd_dpdk_arp_entry *entry = NULL; + int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); + if (entry) { + /* Check for null entry */ + if ((entry->mac_addr.addr_bytes[0] == 0xFF) && + (entry->mac_addr.addr_bytes[1] == 0xFF) && + (entry->mac_addr.addr_bytes[2] == 0xFF) && + (entry->mac_addr.addr_bytes[3] == 0xFF) && + (entry->mac_addr.addr_bytes[4] == 0xFF) && + (entry->mac_addr.addr_bytes[5] == 0xFF)) { + arp_table_stat = -ENOENT; + } + } else { + /* No entry -> Add null entry */ + entry = rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + LIST_INIT(&entry->pending_list); + + if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) { + rte_free(entry); + RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + } + + /* Was there a valid address? */ + if (arp_table_stat == -ENOENT) { + /* Get valid address and have requestor continue waiting */ + int arp_stat = 0; + do { /* Keep trying to send request if no descriptor */ + arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr); + } while (arp_stat == -EAGAIN); + + if (arp_stat) { + /* Config request errors out */ + RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__); + _uhd_dpdk_config_req_compl(req, arp_stat); + return arp_stat; + } + /* Append req to pending list. Will signal later. */ + LIST_INSERT_HEAD(&entry->pending_list, req, entry); + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + } else { + /* We have a valid address. All good. */ + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + _uhd_dpdk_config_req_compl(req, 0); + } + } + return 0; +} + +int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + struct uhd_dpdk_port *port = req->sock->port; + struct uhd_dpdk_config_req *conf_req = NULL; + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (sock->tx_ring) { + /* Remove from tx_list */ + LIST_REMOVE(sock, tx_entry); + /* Check for entry in ARP table */ + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); + if (entry) { + LIST_FOREACH(conf_req, &entry->pending_list, entry) { + if (conf_req->sock == sock) { + LIST_REMOVE(conf_req, entry); + break; + } + } + } + + /* Add outstanding buffers back to TX queue's freebufs */ + struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); + if (status) { + RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + } + + unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); + if (enqd != (unsigned int) sock->tx_buf_count) { + RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); + return status; + } + } else if (sock->rx_ring) { + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pdata->dst_port + }; + rte_hash_del_key(port->rx_table, &ht_key); + struct rte_mbuf *mbuf = NULL; + while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) { + rte_pktmbuf_free(mbuf); + } + rte_ring_free(sock->rx_ring); + } + + _uhd_dpdk_config_req_compl(req, 0); + return 0; +} + +/* Configure a socket for UDP + * TODO: Make sure EVERYTHING is configured in socket + * FIXME: Make all of this part of the user thread, except hash table interaction + * and list handling + */ +void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, + struct uhd_dpdk_sockarg_udp *arg) +{ + if (!req) + return; + + if (!arg) { + req->retval = -EINVAL; + return; + } + + struct uhd_dpdk_socket *sock = req->sock; + pid_t tid = syscall(__NR_gettid); + sock->tid = tid; + + /* Create private data */ + struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0); + if (!data) { + req->retval = -ENOMEM; + return; + } + sock->priv = data; + + data->dst_ipv4_addr = arg->dst_addr; + if (arg->is_tx) { + data->src_port = arg->local_port; + data->dst_port = arg->remote_port; + sock->tx_ring = (struct rte_ring *) sock; + } else { + data->src_port = arg->remote_port; + data->dst_port = arg->local_port; + sock->rx_ring = (struct rte_ring *) sock; + } + + /* TODO: Add support for I/O thread calling (skip locking and sleep) */ + /* Add to port's config queue */ + pthread_mutex_lock(&req->mutex); + if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { + pthread_mutex_unlock(&req->mutex); + rte_free(data); + req->retval = -ENOSPC; + return; + } + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 1; + pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); + pthread_mutex_unlock(&req->mutex); + + if (req->retval) + rte_free(data); +} + +void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req) +{ + if (!req) + return; + + pthread_mutex_lock(&req->mutex); + if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { + pthread_mutex_unlock(&req->mutex); + rte_free(req->sock->priv); + req->retval = -ENOSPC; + return; + } + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 1; + pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); + pthread_mutex_unlock(&req->mutex); + rte_free(req->sock->priv); +} + +/* + * Note: I/O thread will fill in destination MAC address (doesn't happen here) + */ +static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, + struct rte_mbuf *mbuf, + uint32_t dst_ipv4_addr, + uint8_t proto_id, + uint32_t payload_len) +{ + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + + ether_addr_copy(&port->mac_addr, ð_hdr->s_addr); + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + + ip_hdr->version_ihl = 0x40 | 5; + ip_hdr->type_of_service = 0; + ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len); + ip_hdr->packet_id = 0; + ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); + ip_hdr->time_to_live = 64; + ip_hdr->next_proto_id = proto_id; + ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ + ip_hdr->src_addr = port->ipv4_addr; + ip_hdr->dst_addr = dst_ipv4_addr; + + mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; + mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, + struct rte_mbuf *mbuf) +{ + struct ether_hdr *eth_hdr; + struct ipv4_hdr *ip_hdr; + struct udp_hdr *tx_hdr; + struct uhd_dpdk_port *port = sock->port; + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + + if (unlikely(mbuf == NULL || pdata == NULL || port == NULL)) + return -EINVAL; + + uint32_t udp_data_len = mbuf->data_len; + uhd_dpdk_ipv4_prep(port, + mbuf, + pdata->dst_ipv4_addr, + 0x11, + 8 + udp_data_len); + + eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + tx_hdr = (struct udp_hdr *) &ip_hdr[1]; + + tx_hdr->src_port = pdata->src_port; + tx_hdr->dst_port = pdata->dst_port; + tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len); + tx_hdr->dgram_cksum = 0; + + return 0; +} + +int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_sockarg_udp *sockarg) +{ + if (unlikely(sock == NULL || sockarg == NULL)) + return -EINVAL; + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (sock->tx_ring) { + sockarg->is_tx = true; + sockarg->local_port = pdata->src_port; + sockarg->remote_port = pdata->dst_port; + sockarg->dst_addr = pdata->dst_ipv4_addr; + } else { + sockarg->is_tx = false; + sockarg->local_port = pdata->dst_port; + sockarg->remote_port = pdata->src_port; + sockarg->dst_addr = 0; + } + return 0; +} + |