diff options
Diffstat (limited to 'fpga/usrp3/tools/scripts/launch_vivado.py')
-rwxr-xr-x | fpga/usrp3/tools/scripts/launch_vivado.py | 475 |
1 files changed, 475 insertions, 0 deletions
diff --git a/fpga/usrp3/tools/scripts/launch_vivado.py b/fpga/usrp3/tools/scripts/launch_vivado.py new file mode 100755 index 000000000..01774bef3 --- /dev/null +++ b/fpga/usrp3/tools/scripts/launch_vivado.py @@ -0,0 +1,475 @@ +#!/usr/bin/env python +# +# Notice: Some parts of this file were copied from PyBOMBS, which has a +# different copyright, and is highlighted appropriately. The following +# copyright notice pertains to all the parts written specifically for this +# script. +# +# Copyright 2016 Ettus Research +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +""" +Run Vivado builds +""" + +from __future__ import print_function +import os +import sys +import re +import json +from datetime import datetime +import time +import argparse +import subprocess +import threading +try: + from Queue import Queue, Empty +except ImportError: + from queue import Queue, Empty # Py3k + +READ_TIMEOUT = 0.1 # s + +############################################################################# +# The following functions were copied with minor modifications from PyBOMBS: +def get_console_width(): + ''' + Returns width of console. + + http://stackoverflow.com/questions/566746/how-to-get-console-window-width-in-python + ''' + env = os.environ + def ioctl_GWINSZ(fd): + try: + import fcntl, termios, struct + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) + except: + return + return cr + cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) + if not cr: + try: + fd = os.open(os.ctermid(), os.O_RDONLY) + cr = ioctl_GWINSZ(fd) + os.close(fd) + except: + pass + if not cr: + cr = (env.get('LINES', 25), env.get('COLUMNS', 80)) + return cr[1] + +def which(program): + """ + Equivalent to Unix' `which` command. + Returns None if the executable `program` can't be found. + + If a full path is given (e.g. /usr/bin/foo), it will return + the path if the executable can be found, or None otherwise. + + If no path is given, it will search PATH. + """ + def is_exe(fpath): + " Check fpath is an executable " + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) + if os.path.split(program)[0] and is_exe(program): + return program + else: + for path in os.environ.get("PATH", "").split(os.pathsep): + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + return None +# +# End of functions copied from PyBOMBS. +############################################################################# + +def print_timer(time_delta): + """docstring for print_timer""" + hours, secs = divmod(time_delta.seconds, 3600) + mins, secs = divmod(secs, 60) + return "[{h:02}:{m:02}:{s:02}]".format( + h=hours, m=mins, s=secs, + ) + +def list_search(patterns, string): + " Returns True if string matches any element of pattern " + for pattern in patterns: + if re.search(pattern, string) is not None: + return True + return False + +def parse_args(): + " Parses args for this script, and for Vivado. " + parser = argparse.ArgumentParser( + description="Run Vivado and parse output.", + ) + parser.add_argument( + '--no-color', action="store_true", + help="Don't colorize output.", + ) + parser.add_argument( + '--vivado-command', default=None, + help="Vivado command.", + ) + parser.add_argument( + '--parse-config', default=None, + help="Additional parser configurations", + ) + parser.add_argument( + '-v', '--verbose', default=False, + action='store_true', + help="Print Vivado output") + parser.add_argument( + '--warnings', default=False, + action='store_true', + help="Print Vivado warnings") + our_args, viv_args = parser.parse_known_args() + return our_args, " ".join(viv_args) + +class VivadoRunner(object): + " Vivado Runner " + colors = { + 'warning': '\033[0;35m', + 'critical warning': '\033[33m', + 'error': '\033[1;31m', + 'fatal': '\033[1;31m', + 'task': '\033[32m', + 'cmd': '\033[1;34m', + 'normal': '\033[0m', + } + # Black 0;30 Dark Gray 1;30 + # Blue 0;34 Light Blue 1;34 + # Green 0;32 Light Green 1;32 + # Cyan 0;36 Light Cyan 1;36 + # Red 0;31 Light Red 1;31 + # Purple 0;35 Light Purple 1;35 + # Brown 0;33 Yellow 1;33 + # Light Gray 0;37 White 1;37 + + viv_tcl_cmds = { + 'synth_design' : 'Synthesis', + 'opt_design': 'Logic Optimization', + 'place_design': 'Placer', + 'route_design': 'Routing', + 'phys_opt_design': 'Physical Synthesis', + 'report_timing' : 'Timing Reporting', + 'report_power': 'Power Reporting', + 'report_drc': 'DRC', + 'write_bitstream': 'Write Bitstream', + } + + def __init__(self, args, viv_args): + self.status = '' + self.args = args + self.current_task = "Initialization" + self.current_phase = "Starting" + self.command = args.vivado_command + " " + viv_args + self.notif_queue = Queue() + self.msg_counters = {} + self.fatal_error_found = False + self.line_types = { + 'cmd': { + 'regexes': [ + '^Command: .+', + ], + 'action': self.show_cmd, + 'id': "Command", + }, + 'task': { + 'regexes': [ + '^Starting .* Task', + '^.*Translating synthesized netlist.*', + '^\[TEST CASE .*', + ], + 'action': self.update_task, + 'id': "Task", + }, + 'phase': { + 'regexes': [ + '^Phase (?P<id>[a-zA-Z0-9/. ]*)$', + '^Start (?P<id>[a-zA-Z0-9/. ]*)$', + '^(?P<id>TESTBENCH STARTED: [\w_]*)$', + ], + 'action': self.update_phase, + 'id': "Phase", + }, + 'warning': { + 'regexes': [ + '^WARNING' + ], + 'action': lambda x: self.act_on_build_msg('warning', x), + 'id': "Warning", + 'fatal': [ + ] + }, + 'critical warning': { + 'regexes': [ + '^CRITICAL WARNING' + ], + 'action': lambda x: self.act_on_build_msg('critical warning', x), + 'id': "Critical Warning", + 'fatal': [ + ] + }, + 'error': { + 'regexes': [ + '^ERROR', + 'no such file or directory', + '^Result: FAILED' + ], + 'action': lambda x: self.act_on_build_msg('error', x), + 'id': "Error", + 'fatal': [ + '.', # All errors are fatal by default + ] + }, + 'test': { + 'regexes': [ + '^ - T' + '^Result: ' + ], + 'action': self.update_testbench, + 'id': "Test" + } + } + self.parse_config = None + if args.parse_config is not None: + try: + args.parse_config = os.path.normpath(args.parse_config) + parse_config = json.load(open(args.parse_config)) + self.add_notification( + "Using parser configuration from: {pc}".format(pc=args.parse_config), + color=self.colors.get('normal') + ) + loadables = ('regexes', 'ignore', 'fatal') + for line_type in self.line_types: + for loadable in loadables: + self.line_types[line_type][loadable] = \ + self.line_types[line_type].get(loadable, []) + \ + parse_config.get(line_type, {}).get(loadable, []) + except (IOError, ValueError): + self.add_notification( + "Could not read parser configuration from: {pc}".format(pc=args.parse_config), + color=self.colors.get('warning') + ) + self.tty = sys.stdout.isatty() + self.timer = datetime.now() # Make sure this is the last line in ctor + + def run(self): + """ + Kick off Vivado build. + + Returns True if it all passed. + """ + def enqueue_output(stdout_data, stdout_queue): + " Puts the output from the process into the queue " + for line in iter(stdout_data.readline, b''): + stdout_queue.put(line) + stdout_data.close() + def poll_queue(q): + " Safe polling from queue " + try: + return q.get(timeout=READ_TIMEOUT).decode('utf-8') + except UnicodeDecodeError: + pass + except Empty: + pass + return "" + # Start process + self.add_notification( + "Executing command: {cmd}".format(cmd=self.command), add_time=True, color=self.colors.get('cmd') + ) + proc = subprocess.Popen( + self.command, + shell=True, # Yes we run this in a shell. Unsafe but helps with Vivado. + stdout=subprocess.PIPE, stderr=subprocess.STDOUT # Pipe it all out via stdout + ) + # Init thread and queue + q_stdout = Queue() + t = threading.Thread(target=enqueue_output, args=(proc.stdout, q_stdout)) + # End the thread when the program terminates + t.daemon = True + t.start() + status_line_t = threading.Thread(target=VivadoRunner.run_loop, args=(self.print_status_line, 0.5 if self.tty else 60*10)) + status_line_t.daemon = True + status_line_t.start() + # Run loop + while proc.poll() is None or not q_stdout.empty(): # Run while process is alive + line_stdout = poll_queue(q_stdout) + self.update_output(line_stdout) + success = (proc.returncode == 0) and not self.fatal_error_found + self.cleanup_output(success) + return success + + def update_output(self, lines): + " Receives a line from Vivado output and acts upon it. " + self.process_line(lines) + + @staticmethod + def run_loop(func, delay, *args, **kwargs): + while True: + func(*args, **kwargs) + time.sleep(delay) + + def print_status_line(self): + " Prints status on stdout" + old_status_line_len = len(self.status) + self.update_status_line() + sys.stdout.write("\x1b[2K\r") # Scroll cursor back to beginning and clear last line + self.flush_notification_queue(old_status_line_len) + sys.stdout.write(self.status) + sys.stdout.flush() + # Make sure we print enough spaces to clear out all of the previous message + # if not msgs_printed: + # sys.stdout.write(" " * max(0, old_status_line_len - len(self.status))) + + def cleanup_output(self, success): + " Run final printery after all is said and done. " + # Check message counts are within limits + self.update_phase("Finished") + self.add_notification( + "Process terminated. Status: {status}".format(status='Success' if success else 'Failure'), + add_time=True, + color=self.colors.get("task" if success else "error") + ) + sys.stdout.write("\n") + self.flush_notification_queue(len(self.status)) + print("") + print("========================================================") + print("Warnings: ", self.msg_counters.get('warning', 0)) + print("Critical Warnings: ", self.msg_counters.get('critical warning', 0)) + print("Errors: ", self.msg_counters.get('error', 0)) + print("") + sys.stdout.flush() + + def process_line(self, lines): + " process line " + for line in [l.rstrip() for l in lines.split("\n") if len(l.strip())]: + line_info, line_data = self.classify_line(line) + if line_info is not None: + self.line_types[line_info]['action'](line_data) + elif self.args.verbose: + print(line) + + def classify_line(self, line): + """ + Identify the current line. Return None if the line can't be identified. + """ + for line_type in self.line_types: + for regex in self.line_types[line_type]['regexes']: + re_obj = re.search(regex, line) + if re_obj is not None: + return line_type, re_obj.groupdict().get('id', line) + return None, None + + def update_status_line(self): + " Update self.status. Does not print anything! " + status_line = "{timer} Current task: {task} +++ Current Phase: {phase}" + self.status = status_line.format( + timer=print_timer(datetime.now() - self.timer), + task=self.current_task.strip(), + phase=self.current_phase.strip(), + ) + + def add_notification(self, msg, add_time=False, color=None): + """ + Format msg and add it as a notification to the queue. + """ + if add_time: + msg = print_timer(datetime.now() - self.timer) + " " + msg + if color is not None and not self.args.no_color: + msg = color + msg + self.colors.get('normal') + self.notif_queue.put(msg) + + def flush_notification_queue(self, min_len): + " Print all strings in the notification queue. " + msg_printed = False + while not self.notif_queue.empty(): + msg = self.notif_queue.get().strip() + print(msg) + msg_printed = True + return msg_printed + + def act_on_build_msg(self, msg_type, msg): + """ + Act on a warning, error, critical warning, etc. + """ + if list_search(self.line_types[msg_type].get('fatal', []), msg): + self.add_notification(msg, color=self.colors.get('fatal')) + self.fatal_error_found = True + elif not list_search(self.line_types[msg_type].get('ignore', []), msg): + self.add_notification(msg, color=self.colors.get(msg_type)) + self.msg_counters[msg_type] = self.msg_counters.get(msg_type, 0) + 1 + + def show_cmd(self, tcl_cmd): + " Show the current command " + self.update_phase("Finished") + tcl_cmd = tcl_cmd.replace("Command:", "").strip() + #sys.stdout.write("\n") + self.add_notification("Executing Tcl: " + tcl_cmd, + add_time=True, color=self.colors.get("cmd")) + cmd = tcl_cmd.strip().split()[0]; + if cmd in self.viv_tcl_cmds: + cmd = self.viv_tcl_cmds[cmd] + self.update_task("Starting " + cmd + " Command", is_new=False) + #self.flush_notification_queue(len(self.status)) + + def update_task(self, task, is_new=True): + " Update current task " + # Special case: Treat "translation" as a phase as well + if "Translating synthesized netlist" in task: + task = "Translating Synthesized Netlist" + filtered_task = task.replace("Starting", "").replace("Task", "").replace("Command", "") + if is_new and (filtered_task != self.current_task): + self.update_phase("Finished") + self.current_task = filtered_task + self.current_phase = "Starting" + self.add_notification(task, add_time=True, color=self.colors.get("task")) + sys.stdout.write("\n") + self.print_status_line() + + def update_phase(self, phase): + " Update current phase " + self.current_phase = phase.strip() + self.current_task = self.current_task.replace("Phase", "") + sys.stdout.write("\n") + self.print_status_line() + + def update_testbench(self, testbench): + pass # Do nothing + + +def main(): + " Go, go, go! " + args, viv_args = parse_args() + if args.vivado_command is None: + if which("vivado"): + args.vivado_command = "vivado" + elif which("vivado_lab"): + args.vivado_command = "vivado_lab" + else: + print("Cannot find Vivado executable!") + return False + try: + return VivadoRunner(args, viv_args).run() + except KeyboardInterrupt: + print("") + print("") + print("Caught Ctrl-C. Exiting.") + print("") + return False + +if __name__ == "__main__": + exit(not main()) + |