aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c')
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c158
1 files changed, 99 insertions, 59 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
index 26cfd43e1..6ea77b930 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -6,17 +6,20 @@
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
#include "uhd_dpdk_driver.h"
+#include "uhd_dpdk_wait.h"
#include <rte_ring.h>
#include <rte_malloc.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <arpa/inet.h>
+#define MAX_UDP_PORT 65535
+
/************************************************
* I/O thread ONLY
*/
-static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue)
+static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue)
{
*queue = NULL;
struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
@@ -108,7 +111,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
/* Add to rx table */
if (pdata->dst_port == 0) {
/* Assign unused one in a very slow fashion */
- for (uint16_t i = 1; i > 0; i++) {
+ for (uint16_t i = MAX_UDP_PORT; i > 0; i--) {
ht_key.dst_port = htons(i);
if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) {
pdata->dst_port = htons(i);
@@ -144,9 +147,22 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
_uhd_dpdk_config_req_compl(req, -ENOMEM);
return -ENOMEM;
}
- retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock);
+
+ struct uhd_dpdk_rx_entry *entry = (struct uhd_dpdk_rx_entry *)
+ rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ rte_ring_free(sock->rx_ring);
+ RTE_LOG(ERR, USER1, "%s: Cannot create RX entry\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ entry->sock = sock;
+ entry->waiter = NULL;
+
+ retval = rte_hash_add_key_data(port->rx_table, &ht_key, entry);
if (retval != 0) {
RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval);
+ rte_free(entry);
rte_ring_free(sock->rx_ring);
_uhd_dpdk_config_req_compl(req, retval);
return retval;
@@ -155,25 +171,25 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
}
/* Are we doing TX? */
- if (sock->tx_ring) {
- sock->tx_ring = NULL;
+ if (sock->tx_queue) {
+ sock->tx_queue = NULL;
struct uhd_dpdk_tx_queue *q = NULL;
- LIST_FOREACH(q, &port->txq_list, entry) {
- if (q->tid == sock->tid) {
- LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
- sock->tx_ring = q->queue;
- sock->rx_ring = q->freebufs;
- break;
- }
- }
- if (!sock->tx_ring) {
+ // FIXME Not sharing txq across all thread's sockets for now
+ //LIST_FOREACH(q, &port->txq_list, entry) {
+ // if (pthread_equal(q->tid, sock->tid)) {
+ // LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ // sock->tx_ring = q->queue;
+ // sock->rx_ring = q->freebufs;
+ // break;
+ // }
+ //}
+ if (!sock->tx_queue) {
retval = _alloc_txq(port, sock->tid, &q);
if (retval) {
_uhd_dpdk_config_req_compl(req, retval);
return retval;
}
- sock->tx_ring = q->queue;
- sock->rx_ring = q->freebufs;
+ sock->tx_queue = q;
}
/* If a broadcast type, just finish setup and return */
if (is_broadcast(port, pdata->dst_ipv4_addr)) {
@@ -242,10 +258,23 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
{
struct uhd_dpdk_socket *sock = req->sock;
+ if (req->sock == NULL) {
+ RTE_LOG(ERR, USER1, "%s: no sock in req\n", __func__);
+ return -EINVAL;
+ }
struct uhd_dpdk_port *port = req->sock->port;
struct uhd_dpdk_config_req *conf_req = NULL;
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- if (sock->tx_ring) {
+ if (pdata == NULL) {
+ RTE_LOG(ERR, USER1, "%s: no pdata in sock\n", __func__);
+ return -EINVAL;
+ }
+ if (sock->tx_queue) {
+ // FIXME not sharing buffers anymore
+ LIST_REMOVE(sock->tx_queue, entry);
+ rte_ring_free(sock->tx_queue->queue);
+ rte_ring_free(sock->tx_queue->retry_queue);
+
/* Remove from tx_list */
LIST_REMOVE(sock, tx_entry);
/* Check for entry in ARP table */
@@ -260,18 +289,32 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
}
}
- /* Add outstanding buffers back to TX queue's freebufs */
- struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
- int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
- if (status) {
- RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ // FIXME not sharing buffers anymore
+ // Remove outstanding buffers from TX queue's freebufs */
+ unsigned int bufs = rte_ring_count(sock->tx_queue->freebufs);
+ for (unsigned int i = 0; i < bufs; i++) {
+ struct rte_mbuf *buf = NULL;
+ if (rte_ring_dequeue(sock->tx_queue->freebufs, (void **) &buf)) {
+ RTE_LOG(ERR, USER1, "%s: Could not dequeue freebufs\n", __func__);
+ } else if (buf) {
+ rte_pktmbuf_free(buf);
+ }
}
+ rte_ring_free(sock->tx_queue->freebufs);
+ rte_free(sock->tx_queue);
- unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
- if (enqd != (unsigned int) sock->tx_buf_count) {
- RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
- return status;
- }
+ /* Add outstanding buffers back to TX queue's freebufs */
+ //struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ //int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
+ //if (status) {
+ // RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ //}
+
+ //unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
+ //if (enqd != (unsigned int) sock->tx_buf_count) {
+ // RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
+ // return status;
+ //}
} else if (sock->rx_ring) {
struct uhd_dpdk_ipv4_5tuple ht_key = {
.sock_type = UHD_DPDK_SOCK_UDP,
@@ -280,6 +323,13 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
.dst_ip = 0,
.dst_port = pdata->dst_port
};
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry);
+ if (entry) {
+ if (entry->waiter)
+ uhd_dpdk_waiter_put(entry->waiter);
+ rte_free(entry);
+ }
rte_hash_del_key(port->rx_table, &ht_key);
struct rte_mbuf *mbuf = NULL;
while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) {
@@ -292,10 +342,21 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
return 0;
}
+int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key)
+{
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (!pdata)
+ return -EINVAL;
+ key->sock_type = UHD_DPDK_SOCK_UDP;
+ key->src_ip = 0;
+ key->src_port = 0;
+ key->dst_ip = 0;
+ key->dst_port = pdata->dst_port;
+ return 0;
+}
+
/* Configure a socket for UDP
- * TODO: Make sure EVERYTHING is configured in socket
- * FIXME: Make all of this part of the user thread, except hash table interaction
- * and list handling
*/
void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
struct uhd_dpdk_sockarg_udp *arg)
@@ -309,8 +370,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
}
struct uhd_dpdk_socket *sock = req->sock;
- pid_t tid = syscall(__NR_gettid);
- sock->tid = tid;
+ sock->tid = pthread_self();
/* Create private data */
struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0);
@@ -324,7 +384,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
if (arg->is_tx) {
data->src_port = arg->local_port;
data->dst_port = arg->remote_port;
- sock->tx_ring = (struct rte_ring *) sock;
+ sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock;
} else {
data->src_port = arg->remote_port;
data->dst_port = arg->local_port;
@@ -333,19 +393,10 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
/* TODO: Add support for I/O thread calling (skip locking and sleep) */
/* Add to port's config queue */
- pthread_mutex_lock(&req->mutex);
- if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
- pthread_mutex_unlock(&req->mutex);
- rte_free(data);
- req->retval = -ENOSPC;
- return;
- }
- struct timespec timeout;
- clock_gettime(CLOCK_MONOTONIC, &timeout);
- timeout.tv_sec += 1;
- pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
- pthread_mutex_unlock(&req->mutex);
-
+ int status = uhd_dpdk_config_req_submit(req, -1, sock->port->parent);
+ if (status)
+ req->retval = status;
+
if (req->retval)
rte_free(data);
}
@@ -355,18 +406,7 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req)
if (!req)
return;
- pthread_mutex_lock(&req->mutex);
- if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
- pthread_mutex_unlock(&req->mutex);
- rte_free(req->sock->priv);
- req->retval = -ENOSPC;
- return;
- }
- struct timespec timeout;
- clock_gettime(CLOCK_MONOTONIC, &timeout);
- timeout.tv_sec += 1;
- pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
- pthread_mutex_unlock(&req->mutex);
+ uhd_dpdk_config_req_submit(req, -1, req->sock->port->parent);
rte_free(req->sock->priv);
}
@@ -440,7 +480,7 @@ int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock,
return -EINVAL;
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- if (sock->tx_ring) {
+ if (sock->tx_queue) {
sockarg->is_tx = true;
sockarg->local_port = pdata->src_port;
sockarg->remote_port = pdata->dst_port;