diff options
| -rw-r--r-- | host/tests/dpdk_test.cpp | 140 | 
1 files changed, 100 insertions, 40 deletions
diff --git a/host/tests/dpdk_test.cpp b/host/tests/dpdk_test.cpp index 833ec0b7c..8a4e51af7 100644 --- a/host/tests/dpdk_test.cpp +++ b/host/tests/dpdk_test.cpp @@ -19,8 +19,11 @@  #include <sys/syscall.h>  #include "../transport/dpdk_zero_copy.hpp"  #include <boost/program_options.hpp> +#include <boost/regex.hpp>  #include <iostream> +static const boost::regex colons(":"); +  namespace po = boost::program_options;  namespace { @@ -39,6 +42,10 @@ struct dpdk_test_args {      std::string src_port;      std::string dst_ip;      std::string dst_port; +    pthread_cond_t *cond; +    pthread_mutex_t mutex; +    bool started; +    int cpu;  };  struct dpdk_test_stats { @@ -51,6 +58,7 @@ struct dpdk_test_stats {      uint64_t tx_xfer;  }; +  static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stats)  {      if (udp_data[0] != stats[id].last_seqno + 1) { @@ -108,7 +116,7 @@ static void bench(uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_port       */      uint64_t total_received = 0;      uint32_t consec_no_rx = 0; -    while (total_received < 1000000 ) { //&& consec_no_rx < 10000) { +    while ((total_received / nb_ports) < 1000000 ) { //&& consec_no_rx < 10000) {          for (id = 0; id < nb_ports; id++) {              unsigned int nb_rx = 0;              uhd::transport::managed_recv_buffer::sptr bufs[BURST_SIZE]; @@ -188,18 +196,27 @@ static void bench(uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_port  } -void *prepare_and_bench_blocking(void *arg) +static inline void set_cpu(pthread_t t, int cpu)  { -    struct dpdk_test_args *args = (struct dpdk_test_args *) arg; -    pthread_t t = pthread_self();      cpu_set_t cpuset;      CPU_ZERO(&cpuset); -    /* FIXME: Make an argument to the test */ -    CPU_SET(4+args->portid, &cpuset); +    CPU_SET(cpu, &cpuset);      int status = pthread_setaffinity_np(t, sizeof(cpu_set_t), &cpuset);      if (status) {          perror("Could not set affinity"); +    } else { +        printf("Set CPU to %d\n", cpu);      } +} + +void *prepare_and_bench_blocking(void *arg) +{ +    struct dpdk_test_args *args = (struct dpdk_test_args *) arg; +    pthread_mutex_lock(&args->mutex); +    pthread_t t = pthread_self(); +    set_cpu(t, args->cpu); +    args->started = true; +    pthread_cond_wait(args->cond, &args->mutex);      auto &ctx = uhd::transport::uhd_dpdk_ctx::get();      uhd::transport::dpdk_zero_copy::sptr eth_data[1];      uhd::transport::zero_copy_xport_params buff_args; @@ -225,20 +242,20 @@ void *prepare_and_bench_blocking(void *arg)  void prepare_and_bench_polling(void)  {      auto &ctx = uhd::transport::uhd_dpdk_ctx::get(); -    struct dpdk_test_args bench_args[2] = { -        { -            .portid = 0, -            .src_port = "0xBEE7", -            .dst_ip = "192.168.0.4", -            .dst_port = "0xBEE7", -        }, -        { -            .portid = 1, -            .src_port = "0xBEE7", -            .dst_ip = "192.168.0.3", -            .dst_port = "0xBEE7", -        } -    }; +    struct dpdk_test_args bench_args[2]; +    bench_args[0].cond = NULL; +    bench_args[0].started = true; +    bench_args[0].portid = 0; +    bench_args[0].src_port = "0xBEE7"; +    bench_args[0].dst_ip = "192.168.0.4"; +    bench_args[0].dst_port = "0xBEE7"; +    bench_args[1].cond = NULL; +    bench_args[1].started = true; +    bench_args[1].portid = 1; +    bench_args[1].src_port = "0xBEE7"; +    bench_args[1].dst_ip = "192.168.0.3"; +    bench_args[1].dst_port = "0xBEE7"; +      uhd::transport::dpdk_zero_copy::sptr eth_data[NUM_PORTS];      uhd::transport::zero_copy_xport_params buff_args;      buff_args.recv_frame_size = 8000; @@ -263,14 +280,14 @@ void prepare_and_bench_polling(void)  int main(int argc, char **argv)  { -    int retval; -    std::string args, core_map; +    int retval, io0_cpu = 1, io1_cpu = 1, user0_cpu = 0, user1_cpu = 2; +    std::string args, cpusets;      po::options_description desc("Allowed options");      desc.add_options()          ("help", "help message")          ("args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")          ("polling-mode", "Use polling mode (single thread on own core)") -        ("core-map", po::value<std::string>(&core_map)->default_value(""), "which core(s) to use (specify \"0\", \"1\", \"0,1\", etc)") +        ("cpusets", po::value<std::string>(&cpusets)->default_value(""), "which core(s) to use for a given thread (specify something like \"io0=1,io1=1,user0=0,user1=2\")")      ;      po::variables_map vm;      po::store(po::parse_command_line(argc, argv, desc), vm); @@ -281,11 +298,30 @@ int main(int argc, char **argv)         return 0;      } -    int port_thread_mapping[2] = {1, 1}; +    auto dpdk_args = uhd::device_addr_t(args); +    for (std::string &key : dpdk_args.keys()) { +        /* device_addr_t splits on commas, so we use colons and replace */ +        if (key == "corelist" || key == "coremap") { +            dpdk_args[key] = boost::regex_replace(dpdk_args[key], colons, ","); +        } +    } + +    auto cpuset_map = uhd::device_addr_t(cpusets); +    for (std::string &key : cpuset_map.keys()) { +        if (key == "io0") { +            io0_cpu = std::stoi(cpuset_map[key], NULL, 0); +        } else if (key == "io1") { +            io1_cpu = std::stoi(cpuset_map[key], NULL, 0); +        } else if (key == "user0") { +            user0_cpu = std::stoi(cpuset_map[key], NULL, 0); +        } else if (key == "user1") { +            user1_cpu = std::stoi(cpuset_map[key], NULL, 0); +        } +    } + +    int port_thread_mapping[2] = {io0_cpu, io1_cpu};      auto &ctx = uhd::transport::uhd_dpdk_ctx::get(); -    auto dpdk_args = uhd::dict<std::string, std::string>(); -    //dpdk_args.set("key", "val");      ctx.init(dpdk_args, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE, 9000);      uint32_t eth_ip = htonl(0xc0a80003); @@ -305,24 +341,48 @@ int main(int argc, char **argv)      if (vm.count("polling-mode")) {          prepare_and_bench_polling();      } else { -        struct dpdk_test_args bench_args[2] = { -            { -                .portid = 0, -                .src_port = "0xBEE7", -                .dst_ip = "192.168.0.4", -                .dst_port = "0xBEE7", -            }, -            { -                .portid = 1, -                .src_port = "0xBEE7", -                .dst_ip = "192.168.0.3", -                .dst_port = "0xBEE7", -            } -        }; +        pthread_cond_t cond; +        pthread_cond_init(&cond, NULL); +        struct dpdk_test_args bench_args[2]; +        pthread_mutex_init(&bench_args[0].mutex, NULL); +        pthread_mutex_init(&bench_args[1].mutex, NULL); +        bench_args[0].cpu = user0_cpu; +        bench_args[0].cond = &cond; +        bench_args[0].started = false; +        bench_args[0].portid = 0; +        bench_args[0].src_port = "0xBEE7"; +        bench_args[0].dst_ip = "192.168.0.4"; +        bench_args[0].dst_port = "0xBEE7"; +        bench_args[1].cpu = user1_cpu; +        bench_args[1].cond = &cond; +        bench_args[1].started = false; +        bench_args[1].portid = 1; +        bench_args[1].src_port = "0xBEE7"; +        bench_args[1].dst_ip = "192.168.0.3"; +        bench_args[1].dst_port = "0xBEE7"; +          pthread_t threads[2];          pthread_create(&threads[0], NULL, prepare_and_bench_blocking, &bench_args[0]);          pthread_create(&threads[1], NULL, prepare_and_bench_blocking, &bench_args[1]); +        do { +           pthread_mutex_lock(&bench_args[0].mutex); +           if (bench_args[0].started) +               break; +           pthread_mutex_unlock(&bench_args[0].mutex); +        } while (true); +        pthread_mutex_unlock(&bench_args[0].mutex); + +        do { +           pthread_mutex_lock(&bench_args[1].mutex); +           if (bench_args[1].started) +               break; +           pthread_mutex_unlock(&bench_args[1].mutex); +        } while (true); +        pthread_mutex_unlock(&bench_args[1].mutex); + +        pthread_cond_broadcast(&cond); +          status = pthread_join(threads[0], (void **) &retval);          if (status) {              perror("Error while joining thread");  | 
