diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/lib/include/uhd/transport/uhd-dpdk.h | 249 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 36 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/test/Makefile | 53 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/test/test.c | 303 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk.c | 363 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h | 253 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 401 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h | 32 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c | 306 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h | 15 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 456 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h | 30 | 
14 files changed, 2503 insertions, 2 deletions
diff --git a/host/lib/CMakeLists.txt b/host/lib/CMakeLists.txt index 4461612de..99598570e 100644 --- a/host/lib/CMakeLists.txt +++ b/host/lib/CMakeLists.txt @@ -62,6 +62,7 @@ MESSAGE(STATUS "")  FIND_PACKAGE(USB1)  FIND_PACKAGE(GPSD)  FIND_PACKAGE(LIBERIO) +FIND_PACKAGE(DPDK)  LIBUHD_REGISTER_COMPONENT("LIBERIO" ENABLE_LIBERIO ON "ENABLE_LIBUHD;LIBERIO_FOUND" OFF OFF)  LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF)  LIBUHD_REGISTER_COMPONENT("GPSD" ENABLE_GPSD OFF "ENABLE_LIBUHD;ENABLE_GPSD;LIBGPS_FOUND" OFF OFF) @@ -77,6 +78,7 @@ LIBUHD_REGISTER_COMPONENT("MPMD" ENABLE_MPMD ON "ENABLE_LIBUHD" OFF OFF)  LIBUHD_REGISTER_COMPONENT("N300" ENABLE_N300 ON "ENABLE_LIBUHD;ENABLE_MPMD" OFF OFF)  LIBUHD_REGISTER_COMPONENT("E320" ENABLE_E320 ON "ENABLE_LIBUHD;ENABLE_MPMD" OFF OFF)  LIBUHD_REGISTER_COMPONENT("OctoClock" ENABLE_OCTOCLOCK ON "ENABLE_LIBUHD" OFF OFF) +LIBUHD_REGISTER_COMPONENT("DPDK" ENABLE_DPDK ON "ENABLE_MPMD;DPDK_FOUND" OFF OFF)  ########################################################################  # Include subdirectories (different than add) diff --git a/host/lib/include/uhd/transport/uhd-dpdk.h b/host/lib/include/uhd/transport/uhd-dpdk.h new file mode 100644 index 000000000..5f74ee9b4 --- /dev/null +++ b/host/lib/include/uhd/transport/uhd-dpdk.h @@ -0,0 +1,249 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_H_ +#define _UHD_DPDK_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdbool.h> +#include <stdint.h> +#include <rte_mbuf.h> + +/* For MAC address */ +struct eth_addr { +    uint8_t addr[6]; +}; + +/* Opaque type representing a socket + * May NOT be shared between threads + */ +struct uhd_dpdk_socket; + +/* Only support UDP sockets currently */ +enum uhd_dpdk_sock_type { +    UHD_DPDK_SOCK_UDP = 0, +    UHD_DPDK_SOCK_TYPE_COUNT +}; + +/** + * Init UHD-DPDK environment and bring up ports (link UP). + * + * Offload capabilities will be used if available + *  + * @param argc passed directly to rte_eal_init() + * @param argv passed directly to rte_eal_init() + * @param num_ports number of network interfaces to map + * @param port_thread_mapping array of num_ports entries specifying which thread + *     will drive the I/O for a given port (determined by array index) + * @param num_mbufs number of packets in each packet buffer pool (multiplied by num_ports) + *     There is one RX and one TX buffer pool per CPU socket + * @param mbuf_cache_size Number of packet buffers to put in core-local cache + * @param mtu Maximum frame size + * + * @return Returns negative error code if there were issues, else 0 + */ +int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports, +                  int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, +                  int mtu); + +/** + * @return Returns number of ports registered to DPDK. + *         Returns negative error value if uhd-dpdk hasn't been init'd + */ +int uhd_dpdk_port_count(void); + +/** + * @return Returns Ethernet MAC address of requested port + * + * @param portid ID number of network interface + */ +struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid); + +/** + * Get IPv4 address of requested port + * + * @param portid ID number of network interface + * @param ipv4_addr pointer to uint32_t where ipv4 address is stored + *     Must be non-NULL + * @param netmask pointer to uint32_t where netmask is stored + *     May be left NULL + * + * @return Returns + *  0 = success + *  nonzero = failure + */ +int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask); + +/** + * Sets IPv4 address of requested port + * + * @param portid ID number of network interface + * @param ipv4_addr must be in network format + * @param netmask must be in network format + * + * @return Return values: + * 0 = success + * nonzero = failure + */ +int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask); + +/** + * Create new socket of type sock_type on port portid + * Copies needed info from sockarg + * Do NOT share struct uhd_dpdk_socket between threads! + * + * @param portid ID number of network interface + * @param t Type of socket to create (only UDP supported currently) + * @param sockarg Pointer to arguments for corresponding socket type + * + * @return Returns pointer to socket structure on success, else NULL + */ +struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, +                                           enum uhd_dpdk_sock_type t, void *sockarg); + +/** + * Close socket created by uhd_dpdk_sock_open + * + * Note: Outstanding packet buffers must still be freed by user + * + * @param sock Socket to close + * + * @return Returns + *  0 = success + *  nonzero = failure + */ +int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock); + +/** + * Arguments for a UDP socket + * All data should be provided in network format + */ +struct uhd_dpdk_sockarg_udp { +    /*! True for TX socket, false for RX socket */ +    bool     is_tx; +    /*! Local udp port. This is dst_port for RX, src_port for TX */ +    uint16_t local_port; +    /*! Remote udp port. This is dst_port for TX */ +    uint16_t remote_port; +    /*! IPv4 address for destination (TX) */ +    uint32_t dst_addr; +}; + +/** + * Brings all ports and threads down in preparation for a clean program exit + * + * All sockets will need to be closed by the user for a thread to terminate in + * this function. + */ +int uhd_dpdk_destroy(void); + +/** + * Requests num_bufs buffers from sock. Places pointers to buffers in bufs table. + * + * @param sock pointer to socket + * @param bufs pointer to array of buffers (to store buffer locations) + * @param num_bufs number of buffers requested + * + * @return Returns number of buffers retrieved or negative error code + */ +int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs); + +/** + * Enqueues num_bufs buffers in sock TX buffer. Uses pointers to buffers in bufs table. + * + * @param sock pointer to socket + * @param bufs pointer to array of buffers (to retrieve buffer locations) + * @param num_bufs number of buffers requested + * + * @return Returns number of buffers enqueued or negative error code + */ +int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs); + +/** + * Dequeues num_bufs buffers from sock RX buffer. Uses pointers to buffers in bufs table. + * + * @param sock pointer to socket + * @param bufs pointer to array of buffers (to store buffer locations) + * @param num_bufs number of buffers requested + * @param timeout Time (in us) to wait for a packet + * + * @return Returns number of buffers dequeued or negative error code + * + * NOTE: MUST free buffers with uhd_dpdk_free_buf once finished + */ +int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, +                  unsigned int num_bufs, unsigned int timeout); + +/** + * Frees buffer previously received from uhd_dpdk_recv + *      (or unused ones from uhd_dpdk_request_tx_bufs) + * + * @param buf pointer to packet buffer + */ +void uhd_dpdk_free_buf(struct rte_mbuf *buf); + +/** + * Returns pointer to start of data segment of packet buffer + * + * @param sock Socket associated with packet buffer + * @param buf pointer to packet buffer + */ +void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf); + +/** + * Returns size of data segment of packet buffer (in bytes) + * + * This is protocol-dependent. A UDP socket will return the UDP payload size. + * + * @param sock Socket associated with packet buffer + * @param buf pointer to packet buffer + * + * @return Return 0 for success, else failed + */ +int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf); + +/** + * Get IPv4 address of sender (for UDP RX socket) + * + * @param sock Socket associated with packet buffer + * @param buf pointer to packet buffer + * @param ipv4_addr pointer to buffer where ipv4 address will be written + * + * @return Return 0 for success, else failed + */ +int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf, +                          uint32_t *ipv4_addr); + +/** + * Get info (local port, remote port, dst addr, etc.) for UDP socket + * + * @param sock Socket to get information from + * @param sockarg Pointer to location where information will be stored + * + * @return Return 0 for success, else failed + */ +int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, struct uhd_dpdk_sockarg_udp *sockarg); + + +/*********************************************** + * Statistics + ***********************************************/ +/** + * Get dropped packet count of provided socket + * + * @param sock Socket to get information from + * @param count Pointer to location where information will be stored + * + * @return Return 0 for success, else failed + */ +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count); + +#ifdef __cplusplus +} +#endif +#endif /* _UHD_DPDK_H_ */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 15771697a..7c79bc67c 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -130,8 +130,6 @@ IF(ENABLE_X300)  ENDIF(ENABLE_X300)  IF(ENABLE_LIBERIO) -    MESSAGE(STATUS "") -    MESSAGE(STATUS "liberio support enabled.")      INCLUDE_DIRECTORIES(${LIBERIO_INCLUDE_DIRS})      LIBUHD_APPEND_LIBS(${LIBERIO_LIBRARIES})      LIBUHD_APPEND_SOURCES( @@ -139,6 +137,10 @@ IF(ENABLE_LIBERIO)      )  ENDIF(ENABLE_LIBERIO) +IF(ENABLE_DPDK) +    INCLUDE_SUBDIRECTORY(uhd-dpdk) +ENDIF(ENABLE_DPDK) +  # Verbose Debug output for send/recv  SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" )  OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt new file mode 100644 index 000000000..be683aaf2 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -0,0 +1,36 @@ +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0 +# + +######################################################################## +# Add the subdirectories +######################################################################## +if(ENABLE_DPDK) +    if(NOT DEFINED UHD_DPDK_CFLAGS) +        message(STATUS "") +        set(UHD_DPDK_CFLAGS "-march=native" +            CACHE STRING "CFLAGS to use when building uhd-dpdk sources") +        message(STATUS "DPDK: Using default UHD_DPDK_CFLAGS=" ${UHD_DPDK_CFLAGS}) +    endif(NOT DEFINED UHD_DPDK_CFLAGS) + +    include_directories(${CMAKE_CURRENT_SOURCE_DIR}) + +    LIBUHD_APPEND_SOURCES( +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c +    ) +    set_source_files_properties( +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c +        ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c +        PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS} +    ) +    include_directories(${DPDK_INCLUDE_DIR}) +    LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES}) +endif(ENABLE_DPDK) + diff --git a/host/lib/transport/uhd-dpdk/test/Makefile b/host/lib/transport/uhd-dpdk/test/Makefile new file mode 100644 index 000000000..9d6e60372 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/test/Makefile @@ -0,0 +1,53 @@ +#   BSD LICENSE +# +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved. +#   All rights reserved. +# +#   Redistribution and use in source and binary forms, with or without +#   modification, are permitted provided that the following conditions +#   are met: +# +#     * Redistributions of source code must retain the above copyright +#       notice, this list of conditions and the following disclaimer. +#     * Redistributions in binary form must reproduce the above copyright +#       notice, this list of conditions and the following disclaimer in +#       the documentation and/or other materials provided with the +#       distribution. +#     * Neither the name of Intel Corporation nor the names of its +#       contributors may be used to endorse or promote products derived +#       from this software without specific prior written permission. +# +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overridden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = test + +# all source are stored in SRCS-y +SRCS-y := test.c + +CFLAGS += -O0 -g +CFLAGS += $(WERROR_FLAGS) + +EXTRA_CFLAGS=-I${S}/../include +EXTRA_LDFLAGS=-L${O}/lib -luhd-dpdk + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/host/lib/transport/uhd-dpdk/test/test.c b/host/lib/transport/uhd-dpdk/test/test.c new file mode 100644 index 000000000..c324561de --- /dev/null +++ b/host/lib/transport/uhd-dpdk/test/test.c @@ -0,0 +1,303 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +/** + * Benchmark program to check performance of 2 simultaneous links + */ + + +#include <uhd-dpdk.h> +#include <stdio.h> +#include <stdbool.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <errno.h> +#include <arpa/inet.h> + +#define NUM_MBUFS 4095 +#define MBUF_CACHE_SIZE 315 +#define BURST_SIZE 64 + +#define NUM_PORTS 2 +#define TX_CREDITS 28 +#define RX_CREDITS 64 +#define BENCH_SPP 700 +#define BENCH_IFG 220 + + +static uint32_t last_seqno[NUM_PORTS]; +static uint32_t dropped_packets[NUM_PORTS]; +static uint32_t lasts[NUM_PORTS][16], drops[NUM_PORTS][16]; +static uint32_t last_ackno[NUM_PORTS]; +static uint32_t tx_seqno[NUM_PORTS]; +static uint64_t tx_xfer[NUM_PORTS]; + +static void process_udp(int id, uint32_t *udp_data) +{ +	if (udp_data[0] != last_seqno[id] + 1) { +		lasts[id][dropped_packets[id] & 0xf] = last_seqno[id]; +		drops[id][dropped_packets[id] & 0xf] = udp_data[0]; +		dropped_packets[id]++; +	} + +	last_seqno[id] = udp_data[0]; +	last_ackno[id] = udp_data[1]; +} + +static void send_ctrl(struct uhd_dpdk_socket *sock, uint32_t run) +{ +	struct rte_mbuf *mbuf = NULL; +	uhd_dpdk_request_tx_bufs(sock, &mbuf, 1); +	if (unlikely(mbuf == NULL)) +		return; +	uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf); +	tx_data[0] = (BENCH_SPP << 16) | (TX_CREDITS << 4) | run; // spp, tx_credits, run +	tx_data[1] = (RX_CREDITS << 16) | (BENCH_IFG); // credits, ifg +	mbuf->pkt_len = 8; +	mbuf->data_len = 8; +	uhd_dpdk_send(sock, &mbuf, 1); +} + +static void send_udp(struct uhd_dpdk_socket *sock, int id, bool fc_only) +{ +	struct rte_mbuf *mbuf = NULL; +	uhd_dpdk_request_tx_bufs(sock, &mbuf, 1); +	if (unlikely(mbuf == NULL)) +		return; +	uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf); +	tx_data[0] = fc_only ? tx_seqno[id] - 1 : tx_seqno[id]; +	tx_data[1] = last_seqno[id]; +	if (!fc_only) { +		memset(&tx_data[2], last_seqno[id], 8*BENCH_SPP); +		tx_xfer[id] += 8*BENCH_SPP; +	} +	mbuf->pkt_len = 8 + (fc_only ? 0 : 8*BENCH_SPP); +	mbuf->data_len = 8 + (fc_only ? 0 : 8*BENCH_SPP); + +	uhd_dpdk_send(sock, &mbuf, 1); + +	if (!fc_only) { +		tx_seqno[id]++; +	} +} + +static void bench(struct uhd_dpdk_socket **tx, struct uhd_dpdk_socket **rx, struct uhd_dpdk_socket **ctrl, uint32_t nb_ports) +{ +	uint64_t total_xfer[NUM_PORTS]; +	uint32_t id; +	for (id = 0; id < nb_ports; id++) { +		tx_seqno[id] = 1; +		tx_xfer[id] = 0; +		last_ackno[id] = 0; +		last_seqno[id] = 0; +		dropped_packets[id] = 0; +		total_xfer[id] = 0; +	} +	sleep(1); +	struct timeval bench_start, bench_end; +	gettimeofday(&bench_start, NULL); +	for (id = 0; id < nb_ports; id++) { +		send_ctrl(ctrl[id], 0); +		for (int pktno = 0; (pktno < TX_CREDITS*3/4); pktno++) { +			send_udp(tx[id], id, false); +		} +	} +	for (id = 0; id < nb_ports; id++) { +		send_ctrl(ctrl[id], 1); +	} +	/* +	 * The test... +	 */ +	uint64_t total_received = 0; +	uint32_t consec_no_rx = 0; +	while (total_received < 1000000 ) { //&& consec_no_rx < 10000) { +		for (id = 0; id < nb_ports; id++) { + +			/* Get burst of RX packets, from first port of pair. */ +			struct rte_mbuf *bufs[BURST_SIZE]; +			const int64_t nb_rx = uhd_dpdk_recv(rx[id], bufs, BURST_SIZE); + +			if (unlikely(nb_rx <= 0)) { +				consec_no_rx++; +				if (consec_no_rx >= 100000) { +					uint32_t skt_drops = 0; +					uhd_dpdk_get_drop_count(rx[id], &skt_drops); +					printf("TX seq %d, TX ack %d, RX seq %d, %d, drops!\n", tx_seqno[id], last_ackno[id], last_seqno[id], skt_drops); +					consec_no_rx = 0; +					break; +				} +				continue; +			} else { +				consec_no_rx = 0; +			} + +			for (int buf = 0; buf < nb_rx; buf++) { +				total_xfer[id] += bufs[buf]->pkt_len; +				uint64_t ol_flags = bufs[buf]->ol_flags; +				uint32_t *data = (uint32_t *) uhd_dpdk_buf_to_data(rx[id], bufs[buf]); +				if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* FIXME: Deprecated/changed in later release */ +					printf("Buf %d: Bad IP cksum\n", buf); +				} else { +					process_udp(id, data); +				} +			} + +			/* Free buffers. */ +			for (int buf = 0; buf < nb_rx; buf++) +				uhd_dpdk_free_buf(bufs[buf]); +			total_received += nb_rx; +		} + +		for (id = 0; id < nb_ports; id++) { +			/* TX portion */ +			uint32_t window_end = last_ackno[id] + TX_CREDITS; +			//uint32_t window_end = last_seqno[port] + TX_CREDITS; +			if (window_end <= tx_seqno[id]) { +				if (consec_no_rx == 9999) { +					send_udp(tx[id], id, true); +				} +				//send_udp(tx[id], id, true); +				; +			} else { +				for (int pktno = 0; (pktno < BURST_SIZE) && (tx_seqno[id] < window_end); pktno++) { +					send_udp(tx[id], id, false); +				} +			} +		} +	} +	for (id = 0; id < nb_ports; id++) { +		send_ctrl(ctrl[id], 0); +	} +	gettimeofday(&bench_end, NULL); +	printf("Benchmark complete\n\n"); + +	for (id = 0; id < nb_ports; id++) { +		printf("\n"); +		printf("Bytes received = %ld\n", total_xfer[id]); +		printf("Bytes sent = %ld\n", tx_xfer[id]); +		printf("Time taken = %ld us\n", (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec)); +		double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec); +		elapsed_time *= 1.0e-6; +		double elapsed_bytes = total_xfer[id]; +		printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); +		elapsed_bytes = tx_xfer[id]; +		printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); +		uint32_t skt_drops = 0; +		uhd_dpdk_get_drop_count(rx[id], &skt_drops); +		printf("Dropped %d packets\n", dropped_packets[id]); +		printf("Socket reports dropped %d packets\n", skt_drops); +		for (unsigned int i = 0; i < 16; i++) { +			if (i >= dropped_packets[id]) +				break; +			printf("Last(%u), Recv(%u)\n", lasts[id][i], drops[id][i]); +		} +		//printf("%d missed/dropped packets\n", errors); +		printf("\n"); +	} + +} + +int main(int argc, char **argv) +{ +	int port_thread_mapping[2] = {1, 1}; +	int status = uhd_dpdk_init(argc, argv, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE); +	if (status) { +		printf("%d: Something wrong?\n", status); +		return status; +	} + +	uint32_t eth_ip = htonl(0xc0a80008); +	uint32_t eth_mask = htonl(0xffffff00); +	status = uhd_dpdk_set_ipv4_addr(0, eth_ip, eth_mask); +	if (status) { +		printf("Error while setting IP0: %d\n", status); +		return status; +	} +	status = uhd_dpdk_set_ipv4_addr(1, eth_ip, eth_mask); +	if (status) { +		printf("Error while setting IP1: %d\n", status); +		return status; +	} + +	struct uhd_dpdk_socket *eth_rx[2]; +	struct uhd_dpdk_socket *eth_tx[2]; +	struct uhd_dpdk_socket *eth_ctrl[2]; +	struct uhd_dpdk_sockarg_udp sockarg = { +		.is_tx = false, +		.local_port = htons(0xBEE7), +		.remote_port = htons(0xBEE7), +		.dst_addr = htonl(0xc0a80004) +	}; +	eth_rx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_rx[0]) { +		printf("!eth0_rx\n"); +		return -ENODEV; +	} +	eth_rx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_rx[1]) { +		printf("!eth1_rx\n"); +		return -ENODEV; +	} + +	sockarg.is_tx = true; +	eth_tx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_tx[0]) { +		printf("!eth0_tx\n"); +		return -ENODEV; +	} +	eth_tx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_tx[1]) { +		printf("!eth1_tx\n"); +		return -ENODEV; +	} + +	sockarg.local_port = htons(0xB4D); +	sockarg.remote_port = htons(0xB4D); +	eth_ctrl[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_ctrl[0]) { +		printf("!eth0_ctrl\n"); +		return -ENODEV; +	} +	eth_ctrl[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); +	if (!eth_ctrl[1]) { +		printf("!eth1_ctrl\n"); +		return -ENODEV; +	} + +	bench(eth_tx, eth_rx, eth_ctrl, 2); + +	status = uhd_dpdk_sock_close(eth_rx[0]); +	if (status) { +		printf("Bad close RX0 %d\n", status); +		return status; +	} +	status = uhd_dpdk_sock_close(eth_rx[1]); +	if (status) { +		printf("Bad close RX1 %d\n", status); +		return status; +	} +	status = uhd_dpdk_sock_close(eth_tx[0]); +	if (status) { +		printf("Bad close TX0 %d\n", status); +		return status; +	} +	status = uhd_dpdk_sock_close(eth_tx[1]); +	if (status) { +		printf("Bad close TX1 %d\n", status); +		return status; +	} +	status = uhd_dpdk_sock_close(eth_ctrl[0]); +	if (status) { +		printf("Bad close Ctrl0 %d\n", status); +		return status; +	} +	status = uhd_dpdk_sock_close(eth_ctrl[1]); +	if (status) { +		printf("Bad close Ctrl1 %d\n", status); +		return status; +	} +	return status; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c new file mode 100644 index 000000000..2ee74a201 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -0,0 +1,363 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_ctx.h" +#include "uhd_dpdk_udp.h" +#include "uhd_dpdk_driver.h" +#include <stdlib.h> +#include <rte_errno.h> +#include <rte_malloc.h> +#include <rte_log.h> + +/* FIXME: Replace with configurable values */ +#define DEFAULT_RING_SIZE 512 + +/* FIXME: This needs to be protected */ +struct uhd_dpdk_ctx *ctx = NULL; + +/** + * TODO: Probably should provide way to get access to thread for a given port + * UHD's first calling thread will be the master thread + * In UHD, maybe check thread, and if it is different, pass work to that thread and optionally wait() on it (some condition variable) + */ + +/* TODO: For nice scheduling options later, make sure to separate RX and TX activity */ + + +int uhd_dpdk_port_count(void) +{ +    if (!ctx) +        return -ENODEV; +    return ctx->num_ports; +} + +struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) +{ +    struct eth_addr retval; +    memset(retval.addr, 0xff, ETHER_ADDR_LEN); + +    struct uhd_dpdk_port *p = find_port(portid); +    if (p) { +        memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN); +    } +    return retval; +} + +int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask) +{ +    if (!ipv4_addr) +        return -EINVAL; +    struct uhd_dpdk_port *p = find_port(portid); +    if (p) { +        *ipv4_addr = p->ipv4_addr; +        if (netmask) { +            *netmask = p->netmask; +        } +        return 0; +    } +    return -ENODEV; +} + +int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask) +{ +    struct uhd_dpdk_port *p = find_port(portid); +    if (p) { +        p->ipv4_addr = ipv4_addr; +        p->netmask = netmask; +        return 0; +    } +    return -ENODEV; +} + +/* + * Initialize a given port using default settings and with the RX buffers + * coming from the mbuf_pool passed as a parameter. + * FIXME: Starting with assumption of one thread/core per port + */ +static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port, +                                     struct rte_mempool *rx_mbuf_pool, +                                     unsigned int mtu) +{ +    int retval; + +    /* Check for a valid port */ +    if (port->id >= rte_eth_dev_count()) +        return -ENODEV; + +    /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */ +    /* FIXME: Check if hw_ip_checksum is possible */ +    struct rte_eth_conf port_conf = { +        .rxmode = { +            .max_rx_pkt_len = mtu, +            .jumbo_frame = 1, +            .hw_ip_checksum = 1, +        } +    }; +    retval = rte_eth_dev_configure(port->id, 1, 1, &port_conf); +    if (retval != 0) +        return retval; + +    retval = rte_eth_rx_queue_setup(port->id, 0, DEFAULT_RING_SIZE, +                 rte_eth_dev_socket_id(port->id), NULL, rx_mbuf_pool); +    if (retval < 0) +        return retval; + +    retval = rte_eth_tx_queue_setup(port->id, 0, DEFAULT_RING_SIZE, +                 rte_eth_dev_socket_id(port->id), NULL); +    if (retval < 0) +        goto port_init_fail; + +    /* Create the hash table for the RX sockets */ +    char name[32]; +    snprintf(name, sizeof(name), "rx_table_%u", port->id); +    struct rte_hash_parameters hash_params = { +        .name = name, +        .entries = UHD_DPDK_MAX_SOCKET_CNT, +        .key_len = sizeof(struct uhd_dpdk_ipv4_5tuple), +        .hash_func = NULL, +        .hash_func_init_val = 0, +    }; +    port->rx_table = rte_hash_create(&hash_params); +    if (port->rx_table == NULL) { +        retval = rte_errno; +        goto port_init_fail; +    } + +    /* Create ARP table */ +    snprintf(name, sizeof(name), "arp_table_%u", port->id); +    hash_params.name = name; +    hash_params.entries = UHD_DPDK_MAX_SOCKET_CNT; +    hash_params.key_len = sizeof(uint32_t); +    hash_params.hash_func = NULL; +    hash_params.hash_func_init_val = 0; +    port->arp_table = rte_hash_create(&hash_params); +    if (port->arp_table == NULL) { +        retval = rte_errno; +        goto free_rx_table; +    } + +    /* Set up list for TX queues */ +    LIST_INIT(&port->txq_list); + +    /* Start the Ethernet port. */ +    retval = rte_eth_dev_start(port->id); +    if (retval < 0) { +        goto free_arp_table; +    } + +    /* Display the port MAC address. */ +    rte_eth_macaddr_get(port->id, &port->mac_addr); +    RTE_LOG(INFO, EAL, "Port %u MAC: %02x %02x %02x %02x %02x %02x\n", +                (unsigned)port->id, +                port->mac_addr.addr_bytes[0], port->mac_addr.addr_bytes[1], +                port->mac_addr.addr_bytes[2], port->mac_addr.addr_bytes[3], +                port->mac_addr.addr_bytes[4], port->mac_addr.addr_bytes[5]); + +    struct rte_eth_link link; +    rte_eth_link_get(port->id, &link); +    RTE_LOG(INFO, EAL, "Port %u UP: %d\n", port->id, link.link_status); + +    return 0; + +free_arp_table: +    rte_hash_free(port->arp_table); +free_rx_table: +    rte_hash_free(port->rx_table); +port_init_fail: +    return rte_errno; +} + +static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id) +{ +    if (!ctx || !thread) +        return -EINVAL; + +    unsigned int socket_id = rte_lcore_to_socket_id(id); +    thread->id = id; +    thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id]; +    thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id]; +    LIST_INIT(&thread->port_list); + +    char name[32]; +    snprintf(name, sizeof(name), "sockreq_ring_%u", id); +    thread->sock_req_ring = rte_ring_create( +                               name, +                               UHD_DPDK_MAX_PENDING_SOCK_REQS, +                               socket_id, +                               RING_F_SC_DEQ +                            ); +    if (!thread->sock_req_ring) +        return -ENOMEM; +    return 0; +} + + +int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports, +                  int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, +                  int mtu) +{ +    /* Init context only once */ +    if (ctx) +        return 1; + +    if ((num_ports == 0) || (port_thread_mapping == NULL)) { +        return -EINVAL; +    } + +    /* Grabs arguments intended for DPDK's EAL */ +    int ret = rte_eal_init(argc, argv); +    if (ret < 0) +        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); + +    ctx = (struct uhd_dpdk_ctx *) rte_zmalloc("uhd_dpdk_ctx", sizeof(*ctx), rte_socket_id()); +    if (!ctx) +        return -ENOMEM; + +    ctx->num_threads = rte_lcore_count(); +    if (ctx->num_threads <= 1) +        rte_exit(EXIT_FAILURE, "Error: No worker threads enabled\n"); + +    /* Check that we have ports to send/receive on */ +    ctx->num_ports = rte_eth_dev_count(); +    if (ctx->num_ports < 1) +        rte_exit(EXIT_FAILURE, "Error: Found no ports\n"); +    if (ctx->num_ports < num_ports) +        rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); + +    /* Get memory for thread and port data structures */ +    ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0); +    if (!ctx->threads) +        rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for thread data\n"); +    ctx->ports = rte_zmalloc("uhd_dpdk_port", ctx->num_ports*sizeof(struct uhd_dpdk_port), 0); +    if (!ctx->ports) +        rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n"); + +    /* Initialize the thread data structures */ +    for (int i = rte_get_next_lcore(-1, 1, 0); +        (i < RTE_MAX_LCORE); +        i = rte_get_next_lcore(i, 1, 0)) +    { +        /* Do one mempool of RX/TX per socket */ +        unsigned int socket_id = rte_lcore_to_socket_id(i); +        /* FIXME Probably want to take into account actual number of ports per socket */ +        if (ctx->tx_pktbuf_pools[socket_id] == NULL) { +            /* Creates a new mempool in memory to hold the mbufs. +             * This is done for each CPU socket +             */ +            const int mbuf_size = mtu + 2048 + RTE_PKTMBUF_HEADROOM; +            char name[32]; +            snprintf(name, sizeof(name), "rx_mbuf_pool_%u", socket_id); +            ctx->rx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create( +                                               name, +                                               ctx->num_ports*num_mbufs, +                                               mbuf_cache_size, +                                               0, +                                               mbuf_size, +                                               socket_id +                                           ); +            snprintf(name, sizeof(name), "tx_mbuf_pool_%u", socket_id); +            ctx->tx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create( +                                               name, +                                               ctx->num_ports*num_mbufs, +                                               mbuf_cache_size, +                                               0, +                                               mbuf_size, +                                               socket_id +                                           ); +            if ((ctx->rx_pktbuf_pools[socket_id]== NULL) || +                (ctx->tx_pktbuf_pools[socket_id]== NULL)) +                rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); +        } + +        if (uhd_dpdk_thread_init(&ctx->threads[i], i) < 0) +            rte_exit(EXIT_FAILURE, "Error initializing thread %i\n", i); +    } + +    unsigned master_lcore = rte_get_master_lcore(); + +    /* Assign ports to threads and initialize the port data structures */ +    for (unsigned int i = 0; i < num_ports; i++) { +        int thread_id = port_thread_mapping[i]; +        if (thread_id < 0) +            continue; +        if (((unsigned int) thread_id) == master_lcore) +            RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i); +        if (ctx->threads[thread_id].id != (unsigned int) thread_id) +            rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); + +        struct uhd_dpdk_port *port = &ctx->ports[i]; +        port->id = i; +        port->parent = &ctx->threads[thread_id]; +        ctx->threads[thread_id].num_ports++; +        LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry); + +        /* Initialize port. */ +        if (uhd_dpdk_port_init(port, port->parent->rx_pktbuf_pool, mtu) != 0) +            rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", +                    i); +    } + +    RTE_LOG(INFO, EAL, "Init DONE!\n"); + +    /* FIXME: Create functions to do this */ +    RTE_LOG(INFO, EAL, "Starting I/O threads!\n"); + +    for (int i = rte_get_next_lcore(-1, 1, 0); +        (i < RTE_MAX_LCORE); +        i = rte_get_next_lcore(i, 1, 0)) +    { +        struct uhd_dpdk_thread *t = &ctx->threads[i]; +        if (!LIST_EMPTY(&t->port_list)) { +            rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id); +        } +    } +    return 0; +} + +/* FIXME: This will be changed once we have functions to handle the threads */ +int uhd_dpdk_destroy(void) +{ +    if (!ctx) +        return -ENODEV; + +    struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); +    if (!req) +        return -ENOMEM; + +    req->req_type = UHD_DPDK_LCORE_TERM; + +    for (int i = rte_get_next_lcore(-1, 1, 0); +        (i < RTE_MAX_LCORE); +        i = rte_get_next_lcore(i, 1, 0)) +    { +        struct uhd_dpdk_thread *t = &ctx->threads[i]; + +        if (LIST_EMPTY(&t->port_list)) +            continue; + +        if (rte_eal_get_lcore_state(t->id) == FINISHED) +            continue; + +        pthread_mutex_init(&req->mutex, NULL); +        pthread_cond_init(&req->cond, NULL); +        pthread_mutex_lock(&req->mutex); +        if (rte_ring_enqueue(t->sock_req_ring, req)) { +            pthread_mutex_unlock(&req->mutex); +            RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i); +            rte_free(req); +            return -ENOSPC; +        } +        struct timespec timeout = { +            .tv_sec = 1, +            .tv_nsec = 0 +        }; +        pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); +        pthread_mutex_unlock(&req->mutex); +    } + +    rte_free(req); +    return 0; +} + diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h new file mode 100644 index 000000000..31c9dba0c --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -0,0 +1,253 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_CTX_H_ +#define _UHD_DPDK_CTX_H_ + +#include <stdint.h> +#include <sys/queue.h> +#include <sys/types.h> +#include <rte_ethdev.h> +#include <rte_mbuf.h> +#include <rte_hash.h> +#include <rte_eal.h> +#include <uhd/transport/uhd-dpdk.h> +//#include <pthread.h> + +/* For nice scheduling options later, make sure to separate RX and TX activity */ + +#define UHD_DPDK_MAX_SOCKET_CNT 1024 +#define UHD_DPDK_MAX_PENDING_SOCK_REQS 16 +#define UHD_DPDK_TXQ_SIZE 64 +#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) +#define UHD_DPDK_RXQ_SIZE 64 +#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) + +struct uhd_dpdk_port; + +/** + * + * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread + * Rest owned by user thread + * + * port: port servicing this socket + * tid: thread ID that owns this socket (to be associated with TX queue) + * sock_type: Type of socket + * priv: Private data, based on sock_type + * rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX) + * tx_ring: pointer to shared tx_ring (with all sockets for this tid) + * tx_buf_count: Number of buffers currently outside the rings + * tx_entry: List node for TX Queue tracking + * + * If a user closes a socket without outstanding TX buffers, user must free the + * buffers. Otherwise, that memory will be leaked, and usage will grow. + */ +struct uhd_dpdk_socket { +    struct uhd_dpdk_port *port; +    pid_t tid; +    enum uhd_dpdk_sock_type sock_type; +    void *priv; +    struct rte_ring *rx_ring; +    struct rte_ring *tx_ring; +    int tx_buf_count; +    LIST_ENTRY(uhd_dpdk_socket) tx_entry; +}; +LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket); + +/************************************************ + * Configuration + ************************************************/ +enum uhd_dpdk_sock_req { +    UHD_DPDK_SOCK_OPEN = 0, +    UHD_DPDK_SOCK_CLOSE, +    UHD_DPDK_LCORE_TERM, +    UHD_DPDK_SOCK_REQ_COUNT +}; + +/** + * port: port associated with this request + * sock: socket associated with this request + * req_type: Open, Close, or terminate lcore + * sock_type: Only udp is supported + * cond: Used to sleep until socket creation is finished + * mutex: associated with cond + * entry: List node for requests pending ARP responses + * priv: private data + * retval: Result of call (needed post-wakeup) + */ +struct uhd_dpdk_config_req { +    struct uhd_dpdk_port *port; +    struct uhd_dpdk_socket *sock; +    enum uhd_dpdk_sock_req req_type; +    enum uhd_dpdk_sock_type sock_type; +    pthread_cond_t cond; +    pthread_mutex_t mutex; +    LIST_ENTRY(uhd_dpdk_config_req) entry; +    void *priv; +    int retval; +}; +LIST_HEAD(uhd_dpdk_config_head, uhd_dpdk_config_req); + +/************************************************ + * RX Table + ************************************************/ +struct uhd_dpdk_arp_entry { +    struct ether_addr mac_addr; +    struct uhd_dpdk_config_head pending_list; /* Config reqs pending ARP--Thread-unsafe */ +}; + +struct uhd_dpdk_ipv4_5tuple { +    enum uhd_dpdk_sock_type sock_type; +    uint32_t src_ip; +    uint32_t dst_ip; +    uint16_t src_port; +    uint16_t dst_port; +}; + +/************************************************ + * TX Queues + * + * 1 TX Queue per thread sending through a hardware port + * All memory allocation owned by I/O thread + * + * tid: thread id + * queue: TX queue holding threads prepared packets (via send()) + * retry_queue: queue holding packets that couldn't be sent + * freebufs: queue holding empty buffers + * tx_list: list of sockets using this queue + * entry: list node for port to track TX queues + * + * queue, retry_queue, and freebufs are single-producer, single-consumer queues + * retry_queue wholly-owned by I/O thread + * For queue, user thread is producer, I/O thread is consumer + * For freebufs, user thread is consumer, I/O thread is consumer + * + * All queues are same size, and they are shared between all sockets on one + * thread (tid is the identifier) + * 1. Buffers start in freebufs (user gets buffers from freebufs) + * 2. User submits packet to queue + * 3. If packet couldn't be sent, it is (re)enqueued on retry_queue + ************************************************/ +struct uhd_dpdk_tx_queue { +    pid_t tid; +    struct rte_ring *queue; +    struct rte_ring *retry_queue; +    struct rte_ring *freebufs; +    struct uhd_dpdk_tx_head tx_list; +    LIST_ENTRY(uhd_dpdk_tx_queue) entry; +}; +LIST_HEAD(uhd_dpdk_txq_head, uhd_dpdk_tx_queue); + +/************************************************ + * Port structure + * + * All memory allocation owned by I/O thread + * + * id: hardware port id (for DPDK) + * parent: I/O thread servicing this port + * mac_addr: MAC address of this port + * ipv4_addr: IPv4 address of this port + * netmask: Subnet mask of this port + * arp_table: ARP cache for this port + * rx_table: Mapping of 5-tuple key to sockets for RX + * txq_list: List of TX queues associated with this port + * port_entry: List node entry for I/O thread to track + ************************************************/ +struct uhd_dpdk_port { +    unsigned int id; +    struct uhd_dpdk_thread *parent; +    struct ether_addr mac_addr; +    uint32_t ipv4_addr; /* FIXME: Check this before allowing a socket!!! */ +    uint32_t netmask; +    /* Key = IP addr +     * Value = MAC addr (ptr to uhd_dpdk_arp_entry) +     */ +    struct rte_hash *arp_table; +    /* hash map of RX sockets +     * Key = uhd_dpdk_ipv4_5tuple +     * Value = uhd_dpdk_socket +     */ +    struct rte_hash *rx_table; +    /* doubly-linked list of TX sockets */ +    struct uhd_dpdk_txq_head txq_list; +    LIST_ENTRY(uhd_dpdk_port) port_entry; +}; + +LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port); + +/************************************************ + * Thread/lcore-private data structure + * + * All data owned by global context + * + * id: lcore id (from DPDK) + * rx_pktbuf_pool: memory pool for generating buffers for RX packets + * tx_pktbuf_pool: memory pool for generating buffers for TX packets + * num_ports: Number of ports this lcore is servicing + * port_list: List of ports this lcore is servicing + * sock_req_ring: Queue for user threads to submit service requests to the lcore + * + * sock_req_ring is a multi-producer, single-consumer queue + * + * For threads that have ports: + * Launch individually + * For threads without ports: + * Do not launch unless user specifically does it themselves. + * Should also have master lcore returned to user + * REMEMBER: Without args, DPDK creates an lcore for each CPU core! + */ +struct uhd_dpdk_thread { +    unsigned int id; +    struct rte_mempool *rx_pktbuf_pool; +    struct rte_mempool *tx_pktbuf_pool; +    int num_ports; +    struct uhd_dpdk_port_head port_list; +    struct rte_ring *sock_req_ring; +}; + + +/************************************************ + * One global context + * + * num_threads: Number of DPDK lcores tracked + * num_ports: Number of DPDK/NIC ports tracked + * threads: Array of all lcores/threads + * ports: Array of all DPDK/NIC ports + * rx_pktbuf_pools: Array of all packet buffer pools for RX + * tx_pktbuf_pools: Array of all packet buffer pools for TX + * + * The packet buffer pools are memory pools that are associated with a CPU + * socket. They will provide storage close to the socket to accommodate NUMA + * nodes. + ************************************************/ +struct uhd_dpdk_ctx { +    unsigned int num_threads; +    unsigned int num_ports; +    struct uhd_dpdk_thread *threads; +    struct uhd_dpdk_port *ports; +    struct rte_mempool *rx_pktbuf_pools[RTE_MAX_NUMA_NODES]; +    struct rte_mempool *tx_pktbuf_pools[RTE_MAX_NUMA_NODES]; +}; + +extern struct uhd_dpdk_ctx *ctx; + +static inline struct uhd_dpdk_port * find_port(unsigned int portid) +{ +    if (!ctx) +        return NULL; + +    for (unsigned int i = 0; i < ctx->num_threads; i++) { +        struct uhd_dpdk_thread *t = &ctx->threads[i]; +        struct uhd_dpdk_port *p; +        LIST_FOREACH(p, &t->port_list, port_entry) { +            if (p->id == portid) { +                return p; +            } +        } +    } +    return NULL; +} + +#endif /* _UHD_DPDK_CTX_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c new file mode 100644 index 000000000..0af9cc4e5 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -0,0 +1,401 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_driver.h" +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include <rte_malloc.h> +#include <rte_mempool.h> +#include <arpa/inet.h> +#include <unistd.h> + +int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame) +{ +    uint32_t dest_ip = arp_frame->arp_data.arp_sip; +    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + +    struct uhd_dpdk_arp_entry *entry = NULL; +    rte_hash_lookup_data(port->arp_table, &dest_ip, (void **) &entry); +    if (!entry) { +        entry = rte_zmalloc(NULL, sizeof(*entry), 0); +        if (!entry) { +            return -ENOMEM; +        } +        LIST_INIT(&entry->pending_list); +        ether_addr_copy(&dest_addr, &entry->mac_addr); +        if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { +            rte_free(entry); +            return -ENOSPC; +        } +    } else { +        struct uhd_dpdk_config_req *req = NULL; +        ether_addr_copy(&dest_addr, &entry->mac_addr); +        /* Now wake any config reqs waiting for the ARP */ +        LIST_FOREACH(req, &entry->pending_list, entry) { +            _uhd_dpdk_config_req_compl(req, 0); +        } +    } + +    return 0; +} + +int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) +{ +    struct rte_mbuf *mbuf; +    struct ether_hdr *hdr; +    struct arp_hdr *arp_frame; + +    mbuf = rte_pktmbuf_alloc(port->parent->tx_pktbuf_pool); +    if (unlikely(mbuf == NULL)) { +        RTE_LOG(WARNING, MEMPOOL, "Could not allocate packet buffer for ARP request\n"); +        return -ENOMEM; +    } + +    hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); +    arp_frame = (struct arp_hdr *) &hdr[1]; + +    memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); +    ether_addr_copy(&port->mac_addr, &hdr->s_addr); +    hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + +    arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); +    arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); +    arp_frame->arp_hln = 6; +    arp_frame->arp_pln = 4; +    arp_frame->arp_op  = rte_cpu_to_be_16(ARP_OP_REQUEST); +    ether_addr_copy(&port->mac_addr, &arp_frame->arp_data.arp_sha); +    arp_frame->arp_data.arp_sip = port->ipv4_addr; +    memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); +    arp_frame->arp_data.arp_tip = ip; + +    mbuf->pkt_len = 42; +    mbuf->data_len = 42; +    mbuf->ol_flags = PKT_TX_IP_CKSUM; + +    if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { +        RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); +        rte_pktmbuf_free(mbuf); +        return -EAGAIN; +    } +    return 0; +} + + +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +{ +    int status = 0; +    struct uhd_dpdk_ipv4_5tuple ht_key = { +        .sock_type = UHD_DPDK_SOCK_UDP, +        .src_ip = 0, +        .src_port = 0, +        .dst_ip = 0, +        .dst_port = pkt->dst_port +    }; + +    struct uhd_dpdk_socket *sock = NULL; +    rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock); +    if (!sock) { +        status = -ENODEV; +        goto udp_rx_drop; +    } + +    struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; +    status = rte_ring_enqueue(sock->rx_ring, mbuf); +    if (status) { +        pdata->dropped_pkts++; +        goto udp_rx_drop; +    } +    return 0; + +udp_rx_drop: +    rte_pktmbuf_free(mbuf); +    return status; +} + +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +{ +    if (pkt->dst_addr != port->ipv4_addr) { +        rte_pktmbuf_free(mbuf); +        return -ENODEV; +    } +    if (pkt->next_proto_id == 0x11) { +        return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); +    } +    rte_pktmbuf_free(mbuf); +    return -EINVAL; +} + +static int _uhd_dpdk_fill_ipv4_addr(struct uhd_dpdk_port *port, +                                    struct rte_mbuf *mbuf) +{ +    struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); +    struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; +    if (is_broadcast(port, ip_hdr->dst_addr)) { +        memset(eth_hdr->d_addr.addr_bytes, 0xff, ETHER_ADDR_LEN); +    } else { +        /* Lookup dest_addr */ +        struct uhd_dpdk_arp_entry *entry = NULL; +        rte_hash_lookup_data(port->arp_table, &ip_hdr->dst_addr, (void **) &entry); +        if (!entry) { +            RTE_LOG(ERR, USER1, "TX packet on port %d to addr 0x%08x has no ARP entry\n", port->id, ip_hdr->dst_addr); +            return -ENODEV; +        } + +        ether_addr_copy(&entry->mac_addr, ð_hdr->d_addr); +    } +    return 0; +} + +static int _uhd_dpdk_send(struct uhd_dpdk_port *port, +                          struct uhd_dpdk_tx_queue *txq, +                          struct rte_ring *q) +{ +    struct rte_mbuf *buf; + +    unsigned int num_tx = rte_ring_count(q); +    num_tx = (num_tx < UHD_DPDK_TX_BURST_SIZE) ? num_tx : UHD_DPDK_TX_BURST_SIZE; +    for (unsigned int i = 0; i < num_tx; i++) { +        int status = rte_ring_dequeue(q, (void **) &buf); +        if (status) { +            RTE_LOG(ERR, USER1, "%s: Q Count doesn't match actual\n", __func__); +            break; +        } +        struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *); +        if (eth_hdr->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv4)) { +            status = _uhd_dpdk_fill_ipv4_addr(port, buf); +            if (status) { +                return status; +            } +        } +        status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */ +        if (status != 1) { +            status = rte_ring_enqueue(txq->retry_queue, buf); +            if (status) +                RTE_LOG(WARNING, USER1, "%s: Could not re-enqueue pkt %d\n", __func__, i); +            num_tx = i; +            rte_pktmbuf_free(buf); +            break; +        } +    } + +    return num_tx; +} + +static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port, +                                         struct uhd_dpdk_tx_queue *q, +                                         unsigned int num_bufs) +{ +    /* Allocate more buffers to replace the sent ones */ +    struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; +    int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, num_bufs); +    if (status) { +        RTE_LOG(ERR, USER1, "%d %s: Could not restore %u pktmbufs in bulk!\n", status, __func__, num_bufs); +    } + +    /* Enqueue the buffers for the user thread to retrieve */ +    unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL); +    if (enqd != num_bufs) { +        RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n"); +        return status; +    } + +    return num_bufs; +} + +static inline void _uhd_dpdk_disable_ports(struct uhd_dpdk_thread *t) +{ +    struct uhd_dpdk_port *port = NULL; +    LIST_FOREACH(port, &t->port_list, port_entry) { +        rte_eth_dev_stop(port->id); +    } +} + +static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) +{ +    /* Close sockets upon request, but reply to other service requests with +     * errors +     */ +    struct uhd_dpdk_config_req *sock_req; +    if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req)) { +        switch (sock_req->req_type) { +        case UHD_DPDK_SOCK_CLOSE: +            _uhd_dpdk_sock_release(sock_req); +            break; +        default: +            _uhd_dpdk_config_req_compl(sock_req, -ENODEV); +            break; +        } +    } + +    /* Do nothing if there are users remaining */ +    struct uhd_dpdk_port *port = NULL; +    LIST_FOREACH(port, &t->port_list, port_entry) { +        /* Check for RX sockets */ +        const void *hash_key; +        void *hash_sock; +        uint32_t hash_next = 0; +        if (rte_hash_iterate(port->rx_table, &hash_key, +                             &hash_sock, &hash_next) != -ENOENT) +            return -EAGAIN; + +        /* Check for TX sockets */ +        struct uhd_dpdk_tx_queue *q = NULL; +        LIST_FOREACH(q, &port->txq_list, entry) { +            if (!LIST_EMPTY(&q->tx_list)) +                return -EAGAIN; +        } +    } + +    /* Now can free memory */ +    LIST_FOREACH(port, &t->port_list, port_entry) { +        rte_hash_free(port->rx_table); + +        struct uhd_dpdk_tx_queue *q = LIST_FIRST(&port->txq_list); +        while (!LIST_EMPTY(&port->txq_list)) { +            struct uhd_dpdk_tx_queue *nextq = LIST_NEXT(q, entry); +            while (!rte_ring_empty(q->queue)) { +                struct rte_buf *buf = NULL; +                rte_ring_dequeue(q->queue, (void **) &buf); +                rte_free(buf); +            } +            while (!rte_ring_empty(q->freebufs)) { +                struct rte_buf *buf = NULL; +                rte_ring_dequeue(q->freebufs, (void **) &buf); +                rte_free(buf); +            } +            while (!rte_ring_empty(q->retry_queue)) { +                struct rte_buf *buf = NULL; +                rte_ring_dequeue(q->retry_queue, (void **) &buf); +                rte_free(buf); +            } +            rte_ring_free(q->queue); +            rte_ring_free(q->freebufs); +            rte_ring_free(q->retry_queue); +            rte_free(q); +            q = nextq; +        } + +        const void *arp_key; +        uint32_t arp_key_next = 0; +        struct uhd_dpdk_arp_entry *arp_entry = NULL; +        while (rte_hash_iterate(port->arp_table, &arp_key, +                                (void **) &arp_entry, &arp_key_next) >= 0) { +            rte_free(arp_entry); +        } +        rte_hash_free(port->arp_table); +    } + +    return 0; +} + +int _uhd_dpdk_driver_main(void *arg) +{ + +    /* Don't currently have use for arguments */ +    if (arg) +        return -EINVAL; + +    /* Check that this is a valid lcore */ +    unsigned int lcore_id = rte_lcore_id(); +    if (lcore_id == LCORE_ID_ANY) +        return -ENODEV; + +    /* Check that this lcore has ports */ +    struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; +    if (t->id != lcore_id) +        return -ENODEV; + +    RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); +    int status = 0; +    while (!status) { +        /* Check for open()/close() requests and service 1 at a time */ +        struct uhd_dpdk_config_req *sock_req; +        if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) { +            /* FIXME: Not checking return vals */ +            switch (sock_req->req_type) { +            case UHD_DPDK_SOCK_OPEN: +                _uhd_dpdk_sock_setup(sock_req); +                break; +            case UHD_DPDK_SOCK_CLOSE: +                _uhd_dpdk_sock_release(sock_req); +                break; +            case UHD_DPDK_LCORE_TERM: +                RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id); +                status = 1; +                _uhd_dpdk_config_req_compl(sock_req, 0); +                break; +            default: +                RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type); +                break; +            } +        } +        /* For each port, attempt to receive packets and process */ +        struct uhd_dpdk_port *port = NULL; +        LIST_FOREACH(port, &t->port_list, port_entry) { +            struct ether_hdr *hdr; +            char *l2_data; +            struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; +            const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, +                                       bufs, UHD_DPDK_RX_BURST_SIZE); +            if (unlikely(num_rx == 0)) { +                 continue; +            } + +            for (int buf = 0; buf < num_rx; buf++) { +                uint64_t ol_flags = bufs[buf]->ol_flags; +                hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); +                l2_data = (char *) &hdr[1]; +                switch (rte_be_to_cpu_16(hdr->ether_type)) { +                case ETHER_TYPE_ARP: +                    _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); +                    rte_pktmbuf_free(bufs[buf]); +                    break; +                case ETHER_TYPE_IPv4: +                    if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ +                        RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); +                    } else { +                        _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); +                    } +                    break; +                default: +                    rte_pktmbuf_free(bufs[buf]); +                    break; +                } +            } +        } +        /* For each port's TX queues, do TX */ +        LIST_FOREACH(port, &t->port_list, port_entry) { +            struct uhd_dpdk_tx_queue *q = NULL; +            LIST_FOREACH(q, &port->txq_list, entry) { +                if (!rte_ring_empty(q->retry_queue)) { +                    int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); +                    _uhd_dpdk_restore_bufs(port, q, num_retry); +                    if (!rte_ring_empty(q->retry_queue)) { +                        break; +                    } +                } +                if (rte_ring_empty(q->queue)) { +                    continue; +                } +                int num_tx = _uhd_dpdk_send(port, q, q->queue); +                if (num_tx > 0) { +                    _uhd_dpdk_restore_bufs(port, q, num_tx); +                } else { +                    break; +                } +            } +        } +    } + +    /* Now turn off ports */ +    _uhd_dpdk_disable_ports(t); + +    /* Now clean up before exiting */ +    int cleaning = -EAGAIN; +    while (cleaning == -EAGAIN) { +        cleaning = _uhd_dpdk_driver_cleanup(t); +    } +    return status; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h new file mode 100644 index 000000000..b0d3e42cd --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h @@ -0,0 +1,32 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_DRIVER_H_ +#define _UHD_DPDK_DRIVER_H_ + +#include "uhd_dpdk_ctx.h" +#include <rte_mbuf.h> +#include <rte_arp.h> +#include <rte_udp.h> +#include <rte_ip.h> + +static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_addr) +{ +    uint32_t network = port->netmask | ((~port->netmask) & dst_ipv4_addr); +    return (network == 0xffffffff); +} + + +int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame); +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt); +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt); +int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port, +                       struct uhd_dpdk_socket *sock, +                       struct rte_mbuf *mbuf); +int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, +                          uint32_t ip); + +int _uhd_dpdk_driver_main(void *arg); +#endif /* _UHD_DPDK_DRIVER_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c new file mode 100644 index 000000000..3acc3d709 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c @@ -0,0 +1,306 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include <rte_malloc.h> +#include <rte_ip.h> + +/************************************************ + * I/O thread ONLY + * + * TODO: Decide whether to allow blocking on mutex + *     This would cause the I/O thread to sleep, which isn't desireable + *     Could throw in a "request completion cleanup" section in I/O thread's + *     main loop, though. Just keep trying until the requesting thred is woken + *     up. This would be to handle the case where the thread hadn't finished + *     setting itself up to wait on the condition variable, but the I/O thread + *     still got the request. + */ +int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval) +{ +    req->retval = retval; +    int stat = pthread_mutex_trylock(&req->mutex); +    if (stat) { +        RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__); +        return stat; +    } +    stat = pthread_cond_signal(&req->cond); +    pthread_mutex_unlock(&req->mutex); +    if (stat) { +        RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__); +        return stat; +    } +    return 0; +} + +int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) +{ +    int stat = 0; +    switch (req->sock_type) { +    case UHD_DPDK_SOCK_UDP: +        stat = _uhd_dpdk_udp_setup(req); +        break; +    default: +        stat = -EINVAL; +        _uhd_dpdk_config_req_compl(req, -EINVAL); +    } +    return stat; +} + +int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) +{ +    int stat = 0; +    switch (req->sock_type) { +    case UHD_DPDK_SOCK_UDP: +        stat = _uhd_dpdk_udp_release(req); +        break; +    default: +        stat = -EINVAL; +        _uhd_dpdk_config_req_compl(req, -EINVAL); +    } + +    return stat; +} + +/************************************************ + * API calls + */ +struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, +                                           enum uhd_dpdk_sock_type t, void *sockarg) +{ +    if (!ctx || (t >= UHD_DPDK_SOCK_TYPE_COUNT)) { +        return NULL; +    } + +    struct uhd_dpdk_port *port = find_port(portid); +    if (!port) { +        return NULL; +    } + +    if (!port->ipv4_addr) { +        RTE_LOG(WARNING, EAL, "Please set IPv4 address for port %u before opening socket\n", portid); +        return NULL; +    } + + +    struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); +    if (!req) { +        return NULL; +    } + +    struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0); +    if (!s) { +        goto sock_open_end; +    } + +    s->port = port; +    req->sock = s; +    req->req_type = UHD_DPDK_SOCK_OPEN; +    req->sock_type = t; +    req->retval = -ETIMEDOUT; +    pthread_mutex_init(&req->mutex, NULL); +    pthread_condattr_t condattr; +    pthread_condattr_init(&condattr); +    pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC); +    pthread_cond_init(&req->cond, &condattr); + +    switch (t) { +    case UHD_DPDK_SOCK_UDP: +        uhd_dpdk_udp_open(req, sockarg); +        break; +    default: +        break; +    } + +    if (req->retval) { +        rte_free(s); +        s = NULL; +    } + +sock_open_end: +    rte_free(req); +    return s; +} + +int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) +{ +    if (!ctx || !sock) +        return -EINVAL; + +    struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); +    if (!req) +        return -ENOMEM; +    req->sock = sock; +    req->req_type = UHD_DPDK_SOCK_CLOSE; +    req->sock_type = sock->sock_type; +    req->retval = -ETIMEDOUT; +    pthread_mutex_init(&req->mutex, NULL); +    pthread_cond_init(&req->cond, NULL); + +    switch (sock->sock_type) { +    case UHD_DPDK_SOCK_UDP: +        uhd_dpdk_udp_close(req); +        break; +    default: +        break; +    } + +    if (req->retval) { +        rte_free(req); +        return req->retval; +    } + +    rte_free(sock); +    return 0; +} + +/* + * TODO: + *     Add blocking calls with timeout + *     Implementation would involve a condition variable, like config reqs + *     Also would create a cleanup section in I/O main loop (like config reqs) + */ +int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, +                             unsigned int num_bufs) +{ +    if (!sock || !bufs || !num_bufs) { +        return -EINVAL; +    } +    *bufs = NULL; + +    if (!sock->tx_ring) +        return -EINVAL; + +    unsigned int num_tx = rte_ring_count(sock->rx_ring); +    num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; +    if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0) +        return -ENOENT; +    sock->tx_buf_count += num_tx; +    return num_tx; +} + +int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, +                  unsigned int num_bufs) +{ +    if (!sock || !bufs || !num_bufs) +        return -EINVAL; +    if (!sock->tx_ring) +        return -EINVAL; +    unsigned int num_tx = rte_ring_free_count(sock->tx_ring); +    num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; +    switch (sock->sock_type) { +    case UHD_DPDK_SOCK_UDP: +        for (unsigned int i = 0; i < num_tx; i++) { +            uhd_dpdk_udp_prep(sock, bufs[i]); +        } +        break; +    default: +        RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__); +        return -EINVAL; +    } +    int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL); +    if (status == 0) { +        RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n"); +        return status; +    } +    sock->tx_buf_count -= num_tx; +    return num_tx; +} + +/* + * TODO: + *     Add blocking calls with timeout + */ +int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, +                  unsigned int num_bufs, unsigned int timeout) +{ +    if (!sock || !bufs || !num_bufs) +        return -EINVAL; +    if (!sock->rx_ring) +        return -EINVAL; +    unsigned int num_rx = rte_ring_count(sock->rx_ring); +    num_rx = (num_rx < num_bufs) ? num_rx : num_bufs; +    if (num_rx) { +        unsigned int avail = 0; +        unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring, +                                    (void **) bufs, num_rx, &avail); +        if (status == 0) { +            RTE_LOG(ERR, USER1, "Invalid shared usage of RX ring detected\n"); +            RTE_LOG(ERR, USER1, "Requested %u, but %u available\n", +                                 num_rx, avail); +            return -ENOENT; +        } +    } +    return num_rx; +} + +void uhd_dpdk_free_buf(struct rte_mbuf *buf) +{ +    rte_pktmbuf_free(buf); +} + +void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf) +{ +    if (!sock || !buf) +        return NULL; + +    /* TODO: Support for more types? */ +    switch (sock->sock_type) { +    case UHD_DPDK_SOCK_UDP: +        return rte_pktmbuf_mtod_offset(buf, void *, sizeof(struct ether_hdr) + +                                                    sizeof(struct ipv4_hdr) + +                                                    sizeof(struct udp_hdr)); +    default: +        return NULL; +    } +} + + +int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf) +{ +    if (!sock || !buf) +        return -EINVAL; + +    if (sock->sock_type != UHD_DPDK_SOCK_UDP) +        return -EINVAL; + +    struct udp_hdr *hdr = (struct udp_hdr *) ((uint8_t *) uhd_dpdk_buf_to_data(sock, buf) - sizeof(struct udp_hdr)); +    if (!hdr) +        return -EINVAL; + +    /* Report dgram length - header */ +    return ntohs(hdr->dgram_len) - 8; +} + +int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf, +                          uint32_t *ipv4_addr) +{ +    if (!sock || !buf || !ipv4_addr) +        return -EINVAL; + +    if (sock->sock_type != UHD_DPDK_SOCK_UDP) +        return -EINVAL; + +    struct ipv4_hdr *hdr = rte_pktmbuf_mtod_offset(buf, struct ipv4_hdr *, +                                                   sizeof(struct ether_hdr)); + +    *ipv4_addr = hdr->src_addr; +    return 0; +} + +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) +{ +    if (!sock) +        return -EINVAL; +    if (sock->sock_type != UHD_DPDK_SOCK_UDP) +        return -EINVAL; +    if (!sock->priv) +        return -ENODEV; + +    struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; +    *count = pdata->dropped_pkts; +    return 0; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h new file mode 100644 index 000000000..c07c1913a --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h @@ -0,0 +1,15 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_FOPS_H_ +#define _UHD_DPDK_FOPS_H_ + +#include "uhd_dpdk_ctx.h" + +int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval); + +int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req); +int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req); +#endif /* _UHD_DPDK_FOPS_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c new file mode 100644 index 000000000..26cfd43e1 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -0,0 +1,456 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include "uhd_dpdk_driver.h" +#include <rte_ring.h> +#include <rte_malloc.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <arpa/inet.h> + +/************************************************ + * I/O thread ONLY + */ + +static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue) +{ +    *queue = NULL; +    struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); +    if (!q) { +        RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__); +        return -ENOMEM; +    } +    q->tid = tid; +    LIST_INIT(&q->tx_list); + +    char name[32]; +    snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); +    q->queue = rte_ring_create( +                        name, +                        UHD_DPDK_TXQ_SIZE, +                        rte_socket_id(), +                        RING_F_SC_DEQ | RING_F_SP_ENQ +                    ); +    snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); +    q->freebufs = rte_ring_create( +                        name, +                        UHD_DPDK_TXQ_SIZE, +                        rte_socket_id(), +                        RING_F_SC_DEQ | RING_F_SP_ENQ +                    ); +    /* Set up retry queue */ +    snprintf(name, sizeof(name), "retry_queue_%u", port->id); +    q->retry_queue = rte_ring_create( +                               name, +                               UHD_DPDK_TXQ_SIZE, +                               rte_socket_id(), +                               RING_F_SC_DEQ | RING_F_SP_ENQ +                            ); + +    if (!q->queue || !q->freebufs || !q->retry_queue) { +        RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__); +        if (q->queue) +            rte_ring_free(q->queue); +        if (q->freebufs) +            rte_ring_free(q->freebufs); +        if (q->retry_queue) +            rte_ring_free(q->retry_queue); +        rte_free(q); +        return -ENOMEM; +    } +    struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; +    unsigned int num_bufs = rte_ring_free_count(q->freebufs); +    int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); +    if (buf_stat) { +        rte_ring_free(q->freebufs); +        rte_ring_free(q->queue); +        rte_ring_free(q->retry_queue); +        rte_free(q); +        RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); +        return -ENOENT; +    } +    unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); +    if (enqd != num_bufs) { +        RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); +    } +    LIST_INSERT_HEAD(&port->txq_list, q, entry); +    *queue = q; +    return 0; +} + +/* Finish setting up UDP socket (unless ARP needs to be done) + * Not multi-thread safe! + * This call should only be used by the thread servicing the port + * In addition, the code below assumes simplex sockets and unique receive ports + * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...) + */ +int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) +{ +    int retval = 0; +    struct uhd_dpdk_socket *sock = req->sock; +    struct uhd_dpdk_udp_priv *pdata = sock->priv; +    struct uhd_dpdk_port *port = req->sock->port; + +    struct uhd_dpdk_ipv4_5tuple ht_key = { +        .sock_type = UHD_DPDK_SOCK_UDP, +        .src_ip = 0, +        .src_port = 0, +        .dst_ip = 0, +        .dst_port = pdata->dst_port +    }; + +    /* Are we doing RX? */ +    if (sock->rx_ring) { +        /* Add to rx table */ +        if (pdata->dst_port == 0) { +            /* Assign unused one in a very slow fashion */ +            for (uint16_t i = 1; i > 0; i++) { +                ht_key.dst_port = htons(i); +                if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) { +                    pdata->dst_port = htons(i); +                    break; +                } +            } +        } + +        /* Is the port STILL invalid? */ +        if (pdata->dst_port == 0) { +            RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__); +            _uhd_dpdk_config_req_compl(req, -EADDRINUSE); +            return -EADDRINUSE; +        } + +        ht_key.dst_port = pdata->dst_port; +        if (rte_hash_lookup(port->rx_table, &ht_key) > 0) { +            RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__); +            _uhd_dpdk_config_req_compl(req, -EADDRINUSE); +            return -EADDRINUSE; +        } + +        char name[32]; +        snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); +        sock->rx_ring = rte_ring_create( +                            name, +                            UHD_DPDK_RXQ_SIZE, +                            rte_socket_id(), +                            RING_F_SC_DEQ | RING_F_SP_ENQ +                        ); +        if (!sock->rx_ring) { +            RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__); +            _uhd_dpdk_config_req_compl(req, -ENOMEM); +            return -ENOMEM; +        } +        retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock); +        if (retval != 0) { +            RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval); +            rte_ring_free(sock->rx_ring); +            _uhd_dpdk_config_req_compl(req, retval); +            return retval; +        } +        _uhd_dpdk_config_req_compl(req, 0); +    } + +    /* Are we doing TX? */ +    if (sock->tx_ring) { +        sock->tx_ring = NULL; +        struct uhd_dpdk_tx_queue *q = NULL; +        LIST_FOREACH(q, &port->txq_list, entry) { +            if (q->tid == sock->tid) { +                LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); +                sock->tx_ring = q->queue; +                sock->rx_ring = q->freebufs; +                break; +            } +        } +        if (!sock->tx_ring) { +            retval = _alloc_txq(port, sock->tid, &q); +            if (retval) { +                _uhd_dpdk_config_req_compl(req, retval); +                return retval; +            } +            sock->tx_ring = q->queue; +            sock->rx_ring = q->freebufs; +        } +        /* If a broadcast type, just finish setup and return */ +        if (is_broadcast(port, pdata->dst_ipv4_addr)) { +            LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); +            _uhd_dpdk_config_req_compl(req, 0); +            return 0; +        } +        /* Otherwise... Check for entry in ARP table */ +        struct uhd_dpdk_arp_entry *entry = NULL; +        int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); +        if (entry) { +            /* Check for null entry */ +            if ((entry->mac_addr.addr_bytes[0] == 0xFF) && +                (entry->mac_addr.addr_bytes[1] == 0xFF) && +                (entry->mac_addr.addr_bytes[2] == 0xFF) && +                (entry->mac_addr.addr_bytes[3] == 0xFF) && +                (entry->mac_addr.addr_bytes[4] == 0xFF) && +                (entry->mac_addr.addr_bytes[5] == 0xFF)) { +                arp_table_stat = -ENOENT; +            } +        } else { +            /* No entry -> Add null entry */ +            entry = rte_zmalloc(NULL, sizeof(*entry), 0); +            if (!entry) { +                RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__); +                _uhd_dpdk_config_req_compl(req, -ENOMEM); +                return -ENOMEM; +            } +            memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); +            LIST_INIT(&entry->pending_list); + +            if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) { +                rte_free(entry); +                RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__); +                _uhd_dpdk_config_req_compl(req, -ENOMEM); +                return -ENOMEM; +            } +        } + +        /* Was there a valid address? */ +        if (arp_table_stat == -ENOENT) { +            /* Get valid address and have requestor continue waiting */ +            int arp_stat = 0; +            do { /* Keep trying to send request if no descriptor */ +                arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr); +            } while (arp_stat == -EAGAIN); + +            if (arp_stat) { +                /* Config request errors out */ +                RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__); +                _uhd_dpdk_config_req_compl(req, arp_stat); +                return arp_stat; +            } +            /* Append req to pending list. Will signal later. */ +            LIST_INSERT_HEAD(&entry->pending_list, req, entry); +            LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); +        } else { +            /* We have a valid address. All good. */ +            LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); +            _uhd_dpdk_config_req_compl(req, 0); +        } +    } +    return 0; +} + +int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) +{ +    struct uhd_dpdk_socket *sock = req->sock; +    struct uhd_dpdk_port *port = req->sock->port; +    struct uhd_dpdk_config_req *conf_req = NULL; +    struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; +    if (sock->tx_ring) { +        /* Remove from tx_list */ +        LIST_REMOVE(sock, tx_entry); +        /* Check for entry in ARP table */ +        struct uhd_dpdk_arp_entry *entry = NULL; +        rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); +        if (entry) { +            LIST_FOREACH(conf_req, &entry->pending_list, entry) { +                if (conf_req->sock == sock) { +                    LIST_REMOVE(conf_req, entry); +                    break; +                } +            } +        } + +        /* Add outstanding buffers back to TX queue's freebufs */ +        struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; +        int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); +        if (status) { +            RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); +        } + +        unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); +        if (enqd != (unsigned int) sock->tx_buf_count) { +            RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); +            return status; +        } +    } else if (sock->rx_ring) { +        struct uhd_dpdk_ipv4_5tuple ht_key = { +            .sock_type = UHD_DPDK_SOCK_UDP, +            .src_ip = 0, +            .src_port = 0, +            .dst_ip = 0, +            .dst_port = pdata->dst_port +        }; +        rte_hash_del_key(port->rx_table, &ht_key); +        struct rte_mbuf *mbuf = NULL; +        while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) { +            rte_pktmbuf_free(mbuf); +        } +        rte_ring_free(sock->rx_ring); +    } + +    _uhd_dpdk_config_req_compl(req, 0); +    return 0; +} + +/* Configure a socket for UDP + * TODO: Make sure EVERYTHING is configured in socket + * FIXME: Make all of this part of the user thread, except hash table interaction + * and list handling + */ +void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, +                       struct uhd_dpdk_sockarg_udp *arg) +{ +    if (!req) +        return; + +    if (!arg) { +        req->retval = -EINVAL; +        return; +    } + +    struct uhd_dpdk_socket *sock = req->sock; +    pid_t tid = syscall(__NR_gettid); +    sock->tid = tid; + +    /* Create private data */ +    struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0); +    if (!data) { +        req->retval = -ENOMEM; +        return; +    } +    sock->priv = data; + +    data->dst_ipv4_addr = arg->dst_addr; +    if (arg->is_tx) { +        data->src_port = arg->local_port; +        data->dst_port = arg->remote_port; +        sock->tx_ring = (struct rte_ring *) sock; +    } else { +        data->src_port = arg->remote_port; +        data->dst_port = arg->local_port; +        sock->rx_ring = (struct rte_ring *) sock; +    } + +    /* TODO: Add support for I/O thread calling (skip locking and sleep) */ +    /* Add to port's config queue */ +    pthread_mutex_lock(&req->mutex); +    if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { +        pthread_mutex_unlock(&req->mutex); +        rte_free(data); +        req->retval = -ENOSPC; +        return; +    } +    struct timespec timeout; +    clock_gettime(CLOCK_MONOTONIC, &timeout); +    timeout.tv_sec += 1; +    pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); +    pthread_mutex_unlock(&req->mutex); + +    if (req->retval) +        rte_free(data); +} + +void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req) +{ +    if (!req) +        return; + +    pthread_mutex_lock(&req->mutex); +    if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { +        pthread_mutex_unlock(&req->mutex); +        rte_free(req->sock->priv); +        req->retval = -ENOSPC; +        return; +    } +    struct timespec timeout; +    clock_gettime(CLOCK_MONOTONIC, &timeout); +    timeout.tv_sec += 1; +    pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); +    pthread_mutex_unlock(&req->mutex); +    rte_free(req->sock->priv); +} + +/* + * Note: I/O thread will fill in destination MAC address (doesn't happen here) + */ +static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, +                                     struct rte_mbuf *mbuf, +                                     uint32_t dst_ipv4_addr, +                                     uint8_t proto_id, +                                     uint32_t payload_len) +{ +    struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); +    struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + +    ether_addr_copy(&port->mac_addr, ð_hdr->s_addr); +    eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + +    ip_hdr->version_ihl = 0x40 | 5; +    ip_hdr->type_of_service = 0; +    ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len); +    ip_hdr->packet_id = 0; +    ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); +    ip_hdr->time_to_live = 64; +    ip_hdr->next_proto_id = proto_id; +    ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ +    ip_hdr->src_addr = port->ipv4_addr; +    ip_hdr->dst_addr = dst_ipv4_addr; + +    mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +    mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, +                      struct rte_mbuf *mbuf) +{ +    struct ether_hdr *eth_hdr; +    struct ipv4_hdr *ip_hdr; +    struct udp_hdr *tx_hdr; +    struct uhd_dpdk_port *port = sock->port; +    struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + +    if (unlikely(mbuf == NULL || pdata == NULL || port == NULL)) +        return -EINVAL; + +    uint32_t udp_data_len = mbuf->data_len; +    uhd_dpdk_ipv4_prep(port, +                       mbuf, +                       pdata->dst_ipv4_addr, +                       0x11, +                       8 + udp_data_len); + +    eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); +    ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; +    tx_hdr = (struct udp_hdr *) &ip_hdr[1]; + +    tx_hdr->src_port = pdata->src_port; +    tx_hdr->dst_port = pdata->dst_port; +    tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len); +    tx_hdr->dgram_cksum = 0; + +    return 0; +} + +int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, +                          struct uhd_dpdk_sockarg_udp *sockarg) +{ +    if (unlikely(sock == NULL || sockarg == NULL)) +        return -EINVAL; +    if (sock->sock_type != UHD_DPDK_SOCK_UDP) +        return -EINVAL; + +    struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; +        if (sock->tx_ring) { +        sockarg->is_tx = true; +        sockarg->local_port = pdata->src_port; +        sockarg->remote_port = pdata->dst_port; +        sockarg->dst_addr = pdata->dst_ipv4_addr; +    } else { +        sockarg->is_tx = false; +        sockarg->local_port = pdata->dst_port; +        sockarg->remote_port = pdata->src_port; +        sockarg->dst_addr = 0; +    } +    return 0; +} + diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h new file mode 100644 index 000000000..651ae144e --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -0,0 +1,30 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_UDP_H_ +#define _UHD_DPDK_UDP_H_ + +#include "uhd_dpdk_ctx.h" +#include <rte_udp.h> + +struct uhd_dpdk_udp_priv { +    uint16_t src_port; +    uint16_t dst_port; +    uint32_t dst_ipv4_addr; +    uint32_t dropped_pkts; +    /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ +    //struct uhd_dpdk_arp_entry *arp_entry; +}; + +int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req); +int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req); + +void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, +                       struct uhd_dpdk_sockarg_udp *arg); +void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req); + +int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, +                      struct rte_mbuf *mbuf); +#endif /* _UHD_DPDK_UDP_H_ */  | 
