aboutsummaryrefslogtreecommitdiffstats
path: root/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
diff options
context:
space:
mode:
authorAndrej Rode <andrej.rode@ettus.com>2016-10-24 10:25:42 -0700
committerMartin Braun <martin.braun@ettus.com>2017-05-26 16:01:37 -0700
commit76e9e6393fe1d5a0ccc9e05deb431694921850ec (patch)
treeb7f1097a04fd7a7d0a423a353fbeba037746b782 /tools/gr-usrptest/python/flowgraphs/selftest_fg.py
parent06ff34eaa9e147b11d2698308fa91a5260fee2d2 (diff)
downloaduhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.gz
uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.bz2
uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.zip
gr-usrptest: Initial creation
- new OOT-blocks: phase_calc_ccf hier-block, measurement_sink_f - new python submodules: flowgraphs, functions, rts_tests - new apps: usrp_phasealignment.py - cmdline example for manual testing OOT-Blocks: - phase_calc_ccf takes two complex input streams and conjugate multiplys them and extracts the phase from the result and converts it to degree scale - measurement_sink_f: takes a float input stream and calculates average and stddev for a specified number of samples. Start of a measurement is invoked by a call of start_run() on the block. After a couple of runs average and stddev can be extracted. Python modules: - flowgrahps contains reconfigurable flowgraphs for different GNU Radio RF test cases - functions contains functions which are used in different apps/RTS scripts - rts_tests contains test cases which are meant to be executed from the RTS system. Depends on TinyDB, labview_automation Apps: - usrp_phasealignment.py is an example how to use the underlying flowgraph to measure phase differences. Commandline arguments of uhd_app can be used and several additional arguments can/have to be specified. Runs a phase difference measurement --runs number of times and averages phase difference over --duration seconds. Between measurements USRP sinks are retuned to random frequencies in daughterboard range. Results are displayed using motherboard serial and daughterboard serial
Diffstat (limited to 'tools/gr-usrptest/python/flowgraphs/selftest_fg.py')
-rw-r--r--tools/gr-usrptest/python/flowgraphs/selftest_fg.py122
1 files changed, 122 insertions, 0 deletions
diff --git a/tools/gr-usrptest/python/flowgraphs/selftest_fg.py b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
new file mode 100644
index 000000000..cdfc35e74
--- /dev/null
+++ b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+
+from gnuradio import blocks
+from gnuradio import gr
+from gnuradio import uhd
+from usrptest import phase_calc_ccf
+from gnuradio.uhd.uhd_app import UHDApp
+import numpy as np
+
+class selftest_fg(gr.top_block):
+
+ def __init__(self, freq, samp_rate, dphase, devices=list()):
+ gr.top_block.__init__(self, "Generate Signal extract phase")
+
+ ##################################################
+ # Variables
+ ##################################################
+ self.samp_rate = samp_rate
+ self.freq = 10e3
+ self.devices = devices
+ self.dphase = dphase
+ self.tx_gain = 50
+ self.rx_gain = 50
+ self.center_freq = freq
+ self.omega = 2*np.pi*self.freq
+ self.steps = np.arange(
+ self.samp_rate)*float(self.omega)/float(self.samp_rate)
+ self.reference = self.complex_sine(self.steps)
+ self.test_signal = self.complex_sine(self.steps+0.5*np.pi)
+ self.device_test = False
+
+ ##################################################
+ # Block dicts
+ ##################################################
+ self.rx_devices = dict()
+ self.tx_devices = dict()
+ self.sink_dict = dict()
+ self.phase_dict = dict()
+ self.reference_source = blocks.vector_source_c(self.reference)
+
+ if len(self.devices):
+ ##################################################
+ # Devices
+ ##################################################
+ self.device_test = True
+ #To reuse existing setup_usrp() command
+ for device in self.devices:
+ # Create and configure all devices
+ self.rx_devices[device] = uhd.usrp_source(
+ device, uhd.stream_args(
+ cpu_format="fc32", channel=range(1)))
+ self.tx_devices[device] = uhd.usrp_sink(
+ device, uhd.stream_args(
+ cpu_format="fc32", channel=range(1)))
+ self.rx_devices[device].set_samp_rate(self.samp_rate)
+ self.rx_devices[device].set_center_freq(self.center_freq, 0)
+ self.rx_devices[device].set_gain(self.rx_gain, 0)
+ self.tx_devices[device].set_samp_rate(self.samp_rate)
+ self.tx_devices[device].set_center_freq(self.center_freq, 0)
+ self.tx_devices[device].set_gain(self.tx_gain, 0)
+ self.sink_dict[device] = blocks.vector_sink_f()
+ self.phase_dict[device] = phase_calc_ccf(
+ self.samp_rate, self.freq)
+ for device in self.tx_devices.values():
+ self.connect((self.reference_source, 0), (device, 0))
+
+ for device_key in self.rx_devices.keys():
+ self.connect(
+ (self.rx_devices[device_key], 0), (self.phase_dict[device_key], 0))
+ self.connect((self.reference_source, 0),
+ (self.phase_dict[device_key], 1))
+ self.connect(
+ (self.phase_dict[device_key], 0), (self.sink_dict[device_key], 0))
+ # Debug options
+ # self.sink_list.append(blocks.vector_sink_c())
+ #self.connect((device, 0), (self.sink_list[-1], 0))
+ # self.sink_list.append(blocks.vector_sink_c())
+ #self.connect((self.reference_source, 0), (self.sink_list[-1], 0))
+ else:
+ ##################################################
+ # Blocks
+ ##################################################
+ self.result = blocks.vector_sink_f(1)
+ self.test_source = blocks.vector_source_c(self.test_signal)
+ self.block_phase_calc = phase_calc_ccf(
+ self.samp_rate, self.freq)
+
+ ##################################################
+ # Connections
+ ##################################################
+ self.connect((self.reference_source, 0), (self.block_phase_calc, 1))
+ self.connect((self.test_source, 0), (self.block_phase_calc, 0))
+ self.connect((self.block_phase_calc, 0), (self.result, 0))
+ def complex_sine(self, steps):
+ return np.exp(1j*steps)
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+
+ def run(self):
+ self.start()
+ self.wait()
+ if self.device_test:
+ data = dict()
+ for device_key in self.sink_dict.keys():
+ curr_data = self.sink_dict[device_key].data()
+ curr_data = curr_data[int(0.2*self.samp_rate):-int(0.2*self.samp_rate)]
+ phase_avg = np.average(curr_data)
+ if (np.max(curr_data) < phase_avg+self.dphase*0.5) and (np.min(curr_data) > phase_avg-self.dphase*0.5):
+ data[device_key] = phase_avg
+ else:
+ print("Error phase not settled")
+
+ #Debug
+ # plt.ylim(-1, 1)
+ # plt.xlim(self.samp_rate/2.), (self.samp_rate/2.)+1000)
+ #for key in data:
+ # plt.plot(data[key])
+ return data