diff options
| -rwxr-xr-x | tools/gr-usrptest/apps/rx_settling_time.py | 249 | 
1 files changed, 249 insertions, 0 deletions
diff --git a/tools/gr-usrptest/apps/rx_settling_time.py b/tools/gr-usrptest/apps/rx_settling_time.py new file mode 100755 index 000000000..d58eeb9dc --- /dev/null +++ b/tools/gr-usrptest/apps/rx_settling_time.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +RX samples and apply settings as a timed command. Use this tool to analyze the +settling time of analog components as well as the accuracy of timed commands. +Typically, you will need to connect a tone or other signal generator to the +DUT's input. + +Example: This would receive several seconds of data from an X3x0 device, +tune to 1 GHz, and then bump the gain by 30 dB after a set amount of +time: + +$ rx_settling_time.py -a type=x300 -f 1e9 -g 0 --new-gain 30 --plot +""" + +from __future__ import print_function +import argparse +import numpy as np +from six import iteritems +import uhd + + +def parse_args(): +    """Parse the command line arguments""" +    parser = argparse.ArgumentParser( +        description=__doc__ +    ) +    parser.add_argument( +        "-a", "--args", default="", +        help="Device args (e.g., 'type=x300')") +    parser.add_argument( +        "--spec", +        help="Subdev spec (e.g. 'B:0')") +    parser.add_argument( +        "-d", "--duration", default=5.0, type=float, +        help="Total acquisition time") +    parser.add_argument( +        "--setup-delay", default=.5, type=float, +        help="Time before starting receive") +    parser.add_argument( +        "--set-delay", default=2.0, type=float, +        help="Time between starting to receive and changing settings") +    parser.add_argument( +        "--skip-time", default=0.0, type=float, +        help="Time to skip after starting to receive") +    parser.add_argument( +        "-o", "--output-file", type=str, +        help="Name of the output file (e.g., 'output.dat')") +    parser.add_argument( +        "-f", "--freq", type=float, required=True, +        help="Initial frequency") +    parser.add_argument( +        "-g", "--gain", type=float, default=20.0, +        help="Initial gain") +    parser.add_argument( +        "--new-freq", type=float, +        help="Frequency after set time") +    parser.add_argument( +        "--new-gain", +        help="Gain after set time") +    parser.add_argument( +        "-r", "--rate", default=1e6, type=float, +        help="Sampling rate (Hz)") +    parser.add_argument( +        "-c", "--channel", default=0, type=int, help="Channel on which to receive on") +    parser.add_argument( +        "-n", "--numpy", default=False, action="store_true", +        help="Save output file in NumPy format (default: No)") +    parser.add_argument( +        "--plot", default=False, action="store_true", +        help="Show nice pic") +    return parser.parse_args() + + + +def get_rx_streamer(usrp, chan): +    """ +    Return a streamer +    """ +    st_args = uhd.usrp.StreamArgs("fc32", "sc16") +    st_args.channels = [chan,] +    return usrp.get_rx_stream(st_args) + + +def apply_initial_settings(usrp, chan, rate, freq, gain): +    """ +    Apply initial settings for: +    - freq +    - gain +    - rate +    """ +    usrp.set_rx_rate(rate) +    tune_req = uhd.types.TuneRequest(freq) +    usrp.set_rx_freq(tune_req, chan) +    usrp.set_rx_gain(gain, chan) + + +def start_rx_stream(streamer, start_time): +    """ +    Kick off the RX streamer +    """ +    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) +    stream_cmd.stream_now = False +    stream_cmd.time_spec = start_time +    streamer.issue_stream_cmd(stream_cmd) + + +def load_commands(usrp, chan, cmd_time, **kwargs): +    """ +    Load the switching commands. +    """ +    usrp.set_command_time(cmd_time) +    kw_cb_map = { +        'freq': lambda freq: usrp.set_rx_freq(uhd.types.TuneRequest(float(freq)), chan), +        'gain': lambda gain: usrp.set_rx_gain(float(gain), chan), +    } +    for key, callback in iteritems(kw_cb_map): +        if kwargs.get(key) is not None: +            callback(kwargs[key]) +    usrp.clear_command_time() + + +def recv_samples(rx_streamer, total_num_samps, skip_samples): +    """ +    Run the receive loop and crop samples. +    """ +    metadata = uhd.types.RXMetadata() +    result = np.empty((1, total_num_samps), dtype=np.complex64) +    total_samps_recvd = 0 +    timeouts = 0 # This is a bit of a hack, until we can pass timeout values to +                 # Python +    max_timeouts = 20 +    buffer_samps = rx_streamer.get_max_num_samps() +    recv_buffer = np.zeros( +        (1, buffer_samps), dtype=np.complex64) +    while total_samps_recvd < total_num_samps: +        samps_recvd = rx_streamer.recv(recv_buffer, metadata) +        if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: +            timeouts += 1 +            if timeouts >= max_timeouts: +                print("[ERROR] Reached timeout threshold. Exiting.") +                return None +        elif metadata.error_code != uhd.types.RXMetadataErrorCode.none: +            print("[ERROR] " + metadata.strerror()) +            return None +        if samps_recvd: +            samps_recvd = min(total_num_samps - total_samps_recvd, samps_recvd) +            result[:, total_samps_recvd:total_samps_recvd + samps_recvd] = \ +                recv_buffer[:, 0:samps_recvd] +            total_samps_recvd += samps_recvd +    if skip_samples: +        print("Skipping {} samples.".format(skip_samples)) +    return result[0][skip_samples:] + + +def save_to_file(samps, filename, save_as_numpy): +    """ +    Save samples to binary file +    """ +    with open(filename, 'wb') as out_file: +        if save_as_numpy: +            np.save(out_file, samps, allow_pickle=False, fix_imports=False) +        else: +            samps.tofile(out_file) + +def plot_samps(samps, rate, set_offset): +    """ +    Show a nice piccie +    """ +    try: +        import pylab +    except ImportError: +        print("[ERROR] --plot requires pylab.") +        return +    ylim = max( +        max(np.abs(np.real(samps))), +        max(np.abs(np.imag(samps))), +    ) +    time_axis = np.arange(len(samps)) / rate - set_offset +    pylab.plot(time_axis, np.real(samps)) +    pylab.plot(time_axis, np.imag(samps)) +    pylab.ylim((-ylim, ylim)) +    pylab.grid(True) +    pylab.xlabel('Time offset [s]') +    pylab.ylabel('Amplitude') +    pylab.legend(('In-Phase', 'Quadrature')) +    pylab.title('Settling Time') +    pylab.show() + + +def main(): +    """Execute""" +    args = parse_args() +    usrp = uhd.usrp.MultiUSRP(args.args) +    if args.spec is not None: +        usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.spec)) +    rx_streamer = get_rx_streamer(usrp, args.channel) +    total_num_samps = int(args.duration * args.rate) +    skip_samps = int(args.skip_time * args.rate) +    print("Total number of samples to acquire: {}".format(total_num_samps)) +    apply_initial_settings( +        usrp, +        args.channel, +        args.rate, +        args.freq, +        args.gain +    ) +    time_zero = usrp.get_time_now() +    print("Sending stream commands...") +    start_rx_stream( +        rx_streamer, +        time_zero+args.setup_delay, +    ) +    print("Preloading set commands...") +    load_commands( +        usrp=usrp, +        chan=args.channel, +        cmd_time=time_zero+args.setup_delay+args.set_delay, +        freq=args.new_freq, +        gain=args.new_gain, +    ) +    print("Starting receive...") +    samps = recv_samples(rx_streamer, total_num_samps, skip_samps) +    if samps is None: +        return False +    print("Received {} samples.".format(samps.size)) +    print("New settings are applied at sample index {}." +          .format(int((args.set_delay - args.skip_time) * args.rate))) +    if args.plot: +        plot_samps( +            samps, +            args.rate, +            args.set_delay - args.skip_time, +        ) +    if args.output_file: +        save_to_file( +            samps, +            args.output_file, +            args.numpy, +        ) +    return True + +if __name__ == "__main__": +    exit(not main()) +  | 
