diff options
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c new file mode 100644 index 000000000..0af9cc4e5 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -0,0 +1,401 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_driver.h" +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include <rte_malloc.h> +#include <rte_mempool.h> +#include <arpa/inet.h> +#include <unistd.h> + +int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &dest_ip, (void **) &entry); + if (!entry) { + entry = rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + LIST_INIT(&entry->pending_list); + ether_addr_copy(&dest_addr, &entry->mac_addr); + if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { + rte_free(entry); + return -ENOSPC; + } + } else { + struct uhd_dpdk_config_req *req = NULL; + ether_addr_copy(&dest_addr, &entry->mac_addr); + /* Now wake any config reqs waiting for the ARP */ + LIST_FOREACH(req, &entry->pending_list, entry) { + _uhd_dpdk_config_req_compl(req, 0); + } + } + + return 0; +} + +int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) +{ + struct rte_mbuf *mbuf; + struct ether_hdr *hdr; + struct arp_hdr *arp_frame; + + mbuf = rte_pktmbuf_alloc(port->parent->tx_pktbuf_pool); + if (unlikely(mbuf == NULL)) { + RTE_LOG(WARNING, MEMPOOL, "Could not allocate packet buffer for ARP request\n"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + arp_frame = (struct arp_hdr *) &hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + ether_addr_copy(&port->mac_addr, &hdr->s_addr); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + ether_addr_copy(&port->mac_addr, &arp_frame->arp_data.arp_sha); + arp_frame->arp_data.arp_sip = port->ipv4_addr; + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + mbuf->ol_flags = PKT_TX_IP_CKSUM; + + if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { + RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + + +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +{ + int status = 0; + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pkt->dst_port + }; + + struct uhd_dpdk_socket *sock = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock); + if (!sock) { + status = -ENODEV; + goto udp_rx_drop; + } + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + status = rte_ring_enqueue(sock->rx_ring, mbuf); + if (status) { + pdata->dropped_pkts++; + goto udp_rx_drop; + } + return 0; + +udp_rx_drop: + rte_pktmbuf_free(mbuf); + return status; +} + +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +{ + if (pkt->dst_addr != port->ipv4_addr) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == 0x11) { + return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + +static int _uhd_dpdk_fill_ipv4_addr(struct uhd_dpdk_port *port, + struct rte_mbuf *mbuf) +{ + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + if (is_broadcast(port, ip_hdr->dst_addr)) { + memset(eth_hdr->d_addr.addr_bytes, 0xff, ETHER_ADDR_LEN); + } else { + /* Lookup dest_addr */ + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &ip_hdr->dst_addr, (void **) &entry); + if (!entry) { + RTE_LOG(ERR, USER1, "TX packet on port %d to addr 0x%08x has no ARP entry\n", port->id, ip_hdr->dst_addr); + return -ENODEV; + } + + ether_addr_copy(&entry->mac_addr, ð_hdr->d_addr); + } + return 0; +} + +static int _uhd_dpdk_send(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *txq, + struct rte_ring *q) +{ + struct rte_mbuf *buf; + + unsigned int num_tx = rte_ring_count(q); + num_tx = (num_tx < UHD_DPDK_TX_BURST_SIZE) ? num_tx : UHD_DPDK_TX_BURST_SIZE; + for (unsigned int i = 0; i < num_tx; i++) { + int status = rte_ring_dequeue(q, (void **) &buf); + if (status) { + RTE_LOG(ERR, USER1, "%s: Q Count doesn't match actual\n", __func__); + break; + } + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *); + if (eth_hdr->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv4)) { + status = _uhd_dpdk_fill_ipv4_addr(port, buf); + if (status) { + return status; + } + } + status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */ + if (status != 1) { + status = rte_ring_enqueue(txq->retry_queue, buf); + if (status) + RTE_LOG(WARNING, USER1, "%s: Could not re-enqueue pkt %d\n", __func__, i); + num_tx = i; + rte_pktmbuf_free(buf); + break; + } + } + + return num_tx; +} + +static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *q, + unsigned int num_bufs) +{ + /* Allocate more buffers to replace the sent ones */ + struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, num_bufs); + if (status) { + RTE_LOG(ERR, USER1, "%d %s: Could not restore %u pktmbufs in bulk!\n", status, __func__, num_bufs); + } + + /* Enqueue the buffers for the user thread to retrieve */ + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n"); + return status; + } + + return num_bufs; +} + +static inline void _uhd_dpdk_disable_ports(struct uhd_dpdk_thread *t) +{ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + rte_eth_dev_stop(port->id); + } +} + +static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) +{ + /* Close sockets upon request, but reply to other service requests with + * errors + */ + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req)) { + switch (sock_req->req_type) { + case UHD_DPDK_SOCK_CLOSE: + _uhd_dpdk_sock_release(sock_req); + break; + default: + _uhd_dpdk_config_req_compl(sock_req, -ENODEV); + break; + } + } + + /* Do nothing if there are users remaining */ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + /* Check for RX sockets */ + const void *hash_key; + void *hash_sock; + uint32_t hash_next = 0; + if (rte_hash_iterate(port->rx_table, &hash_key, + &hash_sock, &hash_next) != -ENOENT) + return -EAGAIN; + + /* Check for TX sockets */ + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (!LIST_EMPTY(&q->tx_list)) + return -EAGAIN; + } + } + + /* Now can free memory */ + LIST_FOREACH(port, &t->port_list, port_entry) { + rte_hash_free(port->rx_table); + + struct uhd_dpdk_tx_queue *q = LIST_FIRST(&port->txq_list); + while (!LIST_EMPTY(&port->txq_list)) { + struct uhd_dpdk_tx_queue *nextq = LIST_NEXT(q, entry); + while (!rte_ring_empty(q->queue)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->queue, (void **) &buf); + rte_free(buf); + } + while (!rte_ring_empty(q->freebufs)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->freebufs, (void **) &buf); + rte_free(buf); + } + while (!rte_ring_empty(q->retry_queue)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->retry_queue, (void **) &buf); + rte_free(buf); + } + rte_ring_free(q->queue); + rte_ring_free(q->freebufs); + rte_ring_free(q->retry_queue); + rte_free(q); + q = nextq; + } + + const void *arp_key; + uint32_t arp_key_next = 0; + struct uhd_dpdk_arp_entry *arp_entry = NULL; + while (rte_hash_iterate(port->arp_table, &arp_key, + (void **) &arp_entry, &arp_key_next) >= 0) { + rte_free(arp_entry); + } + rte_hash_free(port->arp_table); + } + + return 0; +} + +int _uhd_dpdk_driver_main(void *arg) +{ + + /* Don't currently have use for arguments */ + if (arg) + return -EINVAL; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; + if (t->id != lcore_id) + return -ENODEV; + + RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); + int status = 0; + while (!status) { + /* Check for open()/close() requests and service 1 at a time */ + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) { + /* FIXME: Not checking return vals */ + switch (sock_req->req_type) { + case UHD_DPDK_SOCK_OPEN: + _uhd_dpdk_sock_setup(sock_req); + break; + case UHD_DPDK_SOCK_CLOSE: + _uhd_dpdk_sock_release(sock_req); + break; + case UHD_DPDK_LCORE_TERM: + RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id); + status = 1; + _uhd_dpdk_config_req_compl(sock_req, 0); + break; + default: + RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type); + break; + } + } + /* For each port, attempt to receive packets and process */ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + struct ether_hdr *hdr; + char *l2_data; + struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; + const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, + bufs, UHD_DPDK_RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + continue; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); + l2_data = (char *) &hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); + } else { + _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + } + /* For each port's TX queues, do TX */ + LIST_FOREACH(port, &t->port_list, port_entry) { + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (!rte_ring_empty(q->retry_queue)) { + int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); + _uhd_dpdk_restore_bufs(port, q, num_retry); + if (!rte_ring_empty(q->retry_queue)) { + break; + } + } + if (rte_ring_empty(q->queue)) { + continue; + } + int num_tx = _uhd_dpdk_send(port, q, q->queue); + if (num_tx > 0) { + _uhd_dpdk_restore_bufs(port, q, num_tx); + } else { + break; + } + } + } + } + + /* Now turn off ports */ + _uhd_dpdk_disable_ports(t); + + /* Now clean up before exiting */ + int cleaning = -EAGAIN; + while (cleaning == -EAGAIN) { + cleaning = _uhd_dpdk_driver_cleanup(t); + } + return status; +} |