diff options
Diffstat (limited to 'tools/u_boot_pylib')
-rw-r--r-- | tools/u_boot_pylib/__init__.py | 4 | ||||
-rwxr-xr-x | tools/u_boot_pylib/__main__.py | 23 | ||||
-rw-r--r-- | tools/u_boot_pylib/command.py | 139 | ||||
-rw-r--r-- | tools/u_boot_pylib/cros_subprocess.py | 401 | ||||
-rw-r--r-- | tools/u_boot_pylib/terminal.py | 270 | ||||
-rw-r--r-- | tools/u_boot_pylib/test_util.py | 216 | ||||
-rw-r--r-- | tools/u_boot_pylib/tools.py | 596 | ||||
-rw-r--r-- | tools/u_boot_pylib/tout.py | 179 | ||||
l--------- | tools/u_boot_pylib/u_boot_pylib | 1 |
9 files changed, 1829 insertions, 0 deletions
diff --git a/tools/u_boot_pylib/__init__.py b/tools/u_boot_pylib/__init__.py new file mode 100644 index 0000000000..63c88e85ec --- /dev/null +++ b/tools/u_boot_pylib/__init__.py @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: GPL-2.0+ + +__all__ = ['command', 'cros_subprocess','terminal', 'test_util', 'tools', + 'tout'] diff --git a/tools/u_boot_pylib/__main__.py b/tools/u_boot_pylib/__main__.py new file mode 100755 index 0000000000..8f98d7bd9f --- /dev/null +++ b/tools/u_boot_pylib/__main__.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright 2023 Google LLC +# + +import os +import sys + +if __name__ == "__main__": + # Allow 'from u_boot_pylib import xxx to work' + our_path = os.path.dirname(os.path.realpath(__file__)) + sys.path.append(os.path.join(our_path, '..')) + + # Run tests + from u_boot_pylib import terminal + from u_boot_pylib import test_util + + result = test_util.run_test_suites( + 'u_boot_pylib', False, False, False, None, None, None, + ['terminal']) + + sys.exit(0 if result.wasSuccessful() else 1) diff --git a/tools/u_boot_pylib/command.py b/tools/u_boot_pylib/command.py new file mode 100644 index 0000000000..9bbfc5bdd8 --- /dev/null +++ b/tools/u_boot_pylib/command.py @@ -0,0 +1,139 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2011 The Chromium OS Authors. +# + +import os + +from u_boot_pylib import cros_subprocess + +"""Shell command ease-ups for Python.""" + +class CommandResult: + """A class which captures the result of executing a command. + + Members: + stdout: stdout obtained from command, as a string + stderr: stderr obtained from command, as a string + return_code: Return code from command + exception: Exception received, or None if all ok + """ + def __init__(self, stdout='', stderr='', combined='', return_code=0, + exception=None): + self.stdout = stdout + self.stderr = stderr + self.combined = combined + self.return_code = return_code + self.exception = exception + + def to_output(self, binary): + if not binary: + self.stdout = self.stdout.decode('utf-8') + self.stderr = self.stderr.decode('utf-8') + self.combined = self.combined.decode('utf-8') + return self + + +# This permits interception of RunPipe for test purposes. If it is set to +# a function, then that function is called with the pipe list being +# executed. Otherwise, it is assumed to be a CommandResult object, and is +# returned as the result for every run_pipe() call. +# When this value is None, commands are executed as normal. +test_result = None + +def run_pipe(pipe_list, infile=None, outfile=None, + capture=False, capture_stderr=False, oneline=False, + raise_on_error=True, cwd=None, binary=False, + output_func=None, **kwargs): + """ + Perform a command pipeline, with optional input/output filenames. + + Args: + pipe_list: List of command lines to execute. Each command line is + piped into the next, and is itself a list of strings. For + example [ ['ls', '.git'] ['wc'] ] will pipe the output of + 'ls .git' into 'wc'. + infile: File to provide stdin to the pipeline + outfile: File to store stdout + capture: True to capture output + capture_stderr: True to capture stderr + oneline: True to strip newline chars from output + output_func: Output function to call with each output fragment + (if it returns True the function terminates) + kwargs: Additional keyword arguments to cros_subprocess.Popen() + Returns: + CommandResult object + """ + if test_result: + if hasattr(test_result, '__call__'): + # pylint: disable=E1102 + result = test_result(pipe_list=pipe_list) + if result: + return result + else: + return test_result + # No result: fall through to normal processing + result = CommandResult(b'', b'', b'') + last_pipe = None + pipeline = list(pipe_list) + user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list]) + kwargs['stdout'] = None + kwargs['stderr'] = None + while pipeline: + cmd = pipeline.pop(0) + if last_pipe is not None: + kwargs['stdin'] = last_pipe.stdout + elif infile: + kwargs['stdin'] = open(infile, 'rb') + if pipeline or capture: + kwargs['stdout'] = cros_subprocess.PIPE + elif outfile: + kwargs['stdout'] = open(outfile, 'wb') + if capture_stderr: + kwargs['stderr'] = cros_subprocess.PIPE + + try: + last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs) + except Exception as err: + result.exception = err + if raise_on_error: + raise Exception("Error running '%s': %s" % (user_pipestr, str)) + result.return_code = 255 + return result.to_output(binary) + + if capture: + result.stdout, result.stderr, result.combined = ( + last_pipe.communicate_filter(output_func)) + if result.stdout and oneline: + result.output = result.stdout.rstrip(b'\r\n') + result.return_code = last_pipe.wait() + else: + result.return_code = os.waitpid(last_pipe.pid, 0)[1] + if raise_on_error and result.return_code: + raise Exception("Error running '%s'" % user_pipestr) + return result.to_output(binary) + +def output(*cmd, **kwargs): + kwargs['raise_on_error'] = kwargs.get('raise_on_error', True) + return run_pipe([cmd], capture=True, **kwargs).stdout + +def output_one_line(*cmd, **kwargs): + """Run a command and output it as a single-line string + + The command us expected to produce a single line of output + + Returns: + String containing output of command + """ + raise_on_error = kwargs.pop('raise_on_error', True) + result = run_pipe([cmd], capture=True, oneline=True, + raise_on_error=raise_on_error, **kwargs).stdout.strip() + return result + +def run(*cmd, **kwargs): + return run_pipe([cmd], **kwargs).stdout + +def run_list(cmd): + return run_pipe([cmd], capture=True).stdout + +def stop_all(): + cros_subprocess.stay_alive = False diff --git a/tools/u_boot_pylib/cros_subprocess.py b/tools/u_boot_pylib/cros_subprocess.py new file mode 100644 index 0000000000..cd614f38a6 --- /dev/null +++ b/tools/u_boot_pylib/cros_subprocess.py @@ -0,0 +1,401 @@ +# Copyright (c) 2012 The Chromium OS Authors. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +# +# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> +# Licensed to PSF under a Contributor Agreement. +# See http://www.python.org/2.4/license for licensing details. + +"""Subprocess execution + +This module holds a subclass of subprocess.Popen with our own required +features, mainly that we get access to the subprocess output while it +is running rather than just at the end. This makes it easier to show +progress information and filter output in real time. +""" + +import errno +import os +import pty +import select +import subprocess +import sys +import unittest + + +# Import these here so the caller does not need to import subprocess also. +PIPE = subprocess.PIPE +STDOUT = subprocess.STDOUT +PIPE_PTY = -3 # Pipe output through a pty +stay_alive = True + + +class Popen(subprocess.Popen): + """Like subprocess.Popen with ptys and incremental output + + This class deals with running a child process and filtering its output on + both stdout and stderr while it is running. We do this so we can monitor + progress, and possibly relay the output to the user if requested. + + The class is similar to subprocess.Popen, the equivalent is something like: + + Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + But this class has many fewer features, and two enhancement: + + 1. Rather than getting the output data only at the end, this class sends it + to a provided operation as it arrives. + 2. We use pseudo terminals so that the child will hopefully flush its output + to us as soon as it is produced, rather than waiting for the end of a + line. + + Use communicate_filter() to handle output from the subprocess. + + """ + + def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY, + shell=False, cwd=None, env=None, **kwargs): + """Cut-down constructor + + Args: + args: Program and arguments for subprocess to execute. + stdin: See subprocess.Popen() + stdout: See subprocess.Popen(), except that we support the sentinel + value of cros_subprocess.PIPE_PTY. + stderr: See subprocess.Popen(), except that we support the sentinel + value of cros_subprocess.PIPE_PTY. + shell: See subprocess.Popen() + cwd: Working directory to change to for subprocess, or None if none. + env: Environment to use for this subprocess, or None to inherit parent. + kwargs: No other arguments are supported at the moment. Passing other + arguments will cause a ValueError to be raised. + """ + stdout_pty = None + stderr_pty = None + + if stdout == PIPE_PTY: + stdout_pty = pty.openpty() + stdout = os.fdopen(stdout_pty[1]) + if stderr == PIPE_PTY: + stderr_pty = pty.openpty() + stderr = os.fdopen(stderr_pty[1]) + + super(Popen, self).__init__(args, stdin=stdin, + stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env, + **kwargs) + + # If we're on a PTY, we passed the slave half of the PTY to the subprocess. + # We want to use the master half on our end from now on. Setting this here + # does make some assumptions about the implementation of subprocess, but + # those assumptions are pretty minor. + + # Note that if stderr is STDOUT, then self.stderr will be set to None by + # this constructor. + if stdout_pty is not None: + self.stdout = os.fdopen(stdout_pty[0]) + if stderr_pty is not None: + self.stderr = os.fdopen(stderr_pty[0]) + + # Insist that unit tests exist for other arguments we don't support. + if kwargs: + raise ValueError("Unit tests do not test extra args - please add tests") + + def convert_data(self, data): + """Convert stdout/stderr data to the correct format for output + + Args: + data: Data to convert, or None for '' + + Returns: + Converted data, as bytes + """ + if data is None: + return b'' + return data + + def communicate_filter(self, output, input_buf=''): + """Interact with process: Read data from stdout and stderr. + + This method runs until end-of-file is reached, then waits for the + subprocess to terminate. + + The output function is sent all output from the subprocess and must be + defined like this: + + def output([self,] stream, data) + Args: + stream: the stream the output was received on, which will be + sys.stdout or sys.stderr. + data: a string containing the data + + Returns: + True to terminate the process + + Note: The data read is buffered in memory, so do not use this + method if the data size is large or unlimited. + + Args: + output: Function to call with each fragment of output. + + Returns: + A tuple (stdout, stderr, combined) which is the data received on + stdout, stderr and the combined data (interleaved stdout and stderr). + + Note that the interleaved output will only be sensible if you have + set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on + the timing of the output in the subprocess. If a subprocess flips + between stdout and stderr quickly in succession, by the time we come to + read the output from each we may see several lines in each, and will read + all the stdout lines, then all the stderr lines. So the interleaving + may not be correct. In this case you might want to pass + stderr=cros_subprocess.STDOUT to the constructor. + + This feature is still useful for subprocesses where stderr is + rarely used and indicates an error. + + Note also that if you set stderr to STDOUT, then stderr will be empty + and the combined output will just be the same as stdout. + """ + + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + if self.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + self.stdin.flush() + if input_buf: + write_set.append(self.stdin) + else: + self.stdin.close() + if self.stdout: + read_set.append(self.stdout) + stdout = bytearray() + if self.stderr and self.stderr != self.stdout: + read_set.append(self.stderr) + stderr = bytearray() + combined = bytearray() + + stop_now = False + input_offset = 0 + while read_set or write_set: + try: + rlist, wlist, _ = select.select(read_set, write_set, [], 0.2) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + if not stay_alive: + self.terminate() + + if self.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + chunk = input_buf[input_offset : input_offset + 512] + bytes_written = os.write(self.stdin.fileno(), chunk) + input_offset += bytes_written + if input_offset >= len(input_buf): + self.stdin.close() + write_set.remove(self.stdin) + + if self.stdout in rlist: + data = b'' + # We will get an error on read if the pty is closed + try: + data = os.read(self.stdout.fileno(), 1024) + except OSError: + pass + if not len(data): + self.stdout.close() + read_set.remove(self.stdout) + else: + stdout += data + combined += data + if output: + stop_now = output(sys.stdout, data) + if self.stderr in rlist: + data = b'' + # We will get an error on read if the pty is closed + try: + data = os.read(self.stderr.fileno(), 1024) + except OSError: + pass + if not len(data): + self.stderr.close() + read_set.remove(self.stderr) + else: + stderr += data + combined += data + if output: + stop_now = output(sys.stderr, data) + if stop_now: + self.terminate() + + # All data exchanged. Translate lists into strings. + stdout = self.convert_data(stdout) + stderr = self.convert_data(stderr) + combined = self.convert_data(combined) + + self.wait() + return (stdout, stderr, combined) + + +# Just being a unittest.TestCase gives us 14 public methods. Unless we +# disable this, we can only have 6 tests in a TestCase. That's not enough. +# +# pylint: disable=R0904 + +class TestSubprocess(unittest.TestCase): + """Our simple unit test for this module""" + + class MyOperation: + """Provides a operation that we can pass to Popen""" + def __init__(self, input_to_send=None): + """Constructor to set up the operation and possible input. + + Args: + input_to_send: a text string to send when we first get input. We will + add \r\n to the string. + """ + self.stdout_data = '' + self.stderr_data = '' + self.combined_data = '' + self.stdin_pipe = None + self._input_to_send = input_to_send + if input_to_send: + pipe = os.pipe() + self.stdin_read_pipe = pipe[0] + self._stdin_write_pipe = os.fdopen(pipe[1], 'w') + + def output(self, stream, data): + """Output handler for Popen. Stores the data for later comparison""" + if stream == sys.stdout: + self.stdout_data += data + if stream == sys.stderr: + self.stderr_data += data + self.combined_data += data + + # Output the input string if we have one. + if self._input_to_send: + self._stdin_write_pipe.write(self._input_to_send + '\r\n') + self._stdin_write_pipe.flush() + + def _basic_check(self, plist, oper): + """Basic checks that the output looks sane.""" + self.assertEqual(plist[0], oper.stdout_data) + self.assertEqual(plist[1], oper.stderr_data) + self.assertEqual(plist[2], oper.combined_data) + + # The total length of stdout and stderr should equal the combined length + self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2])) + + def test_simple(self): + """Simple redirection: Get process list""" + oper = TestSubprocess.MyOperation() + plist = Popen(['ps']).communicate_filter(oper.output) + self._basic_check(plist, oper) + + def test_stderr(self): + """Check stdout and stderr""" + oper = TestSubprocess.MyOperation() + cmd = 'echo fred >/dev/stderr && false || echo bad' + plist = Popen([cmd], shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'bad\r\n') + self.assertEqual(plist [1], 'fred\r\n') + + def test_shell(self): + """Check with and without shell works""" + oper = TestSubprocess.MyOperation() + cmd = 'echo test >/dev/stderr' + self.assertRaises(OSError, Popen, [cmd], shell=False) + plist = Popen([cmd], shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(len(plist [0]), 0) + self.assertEqual(plist [1], 'test\r\n') + + def test_list_args(self): + """Check with and without shell works using list arguments""" + oper = TestSubprocess.MyOperation() + cmd = ['echo', 'test', '>/dev/stderr'] + plist = Popen(cmd, shell=False).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n') + self.assertEqual(len(plist [1]), 0) + + oper = TestSubprocess.MyOperation() + + # this should be interpreted as 'echo' with the other args dropped + cmd = ['echo', 'test', '>/dev/stderr'] + plist = Popen(cmd, shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], '\r\n') + + def test_cwd(self): + """Check we can change directory""" + for shell in (False, True): + oper = TestSubprocess.MyOperation() + plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter( + oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], '/tmp\r\n') + + def test_env(self): + """Check we can change environment""" + for add in (False, True): + oper = TestSubprocess.MyOperation() + env = os.environ + if add: + env ['FRED'] = 'fred' + cmd = 'echo $FRED' + plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n') + + def test_extra_args(self): + """Check we can't add extra arguments""" + self.assertRaises(ValueError, Popen, 'true', close_fds=False) + + def test_basic_input(self): + """Check that incremental input works + + We set up a subprocess which will prompt for name. When we see this prompt + we send the name as input to the process. It should then print the name + properly to stdout. + """ + oper = TestSubprocess.MyOperation('Flash') + prompt = 'What is your name?: ' + cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt + plist = Popen([cmd], stdin=oper.stdin_read_pipe, + shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(len(plist [1]), 0) + self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n') + + def test_isatty(self): + """Check that ptys appear as terminals to the subprocess""" + oper = TestSubprocess.MyOperation() + cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; ' + 'else echo "not %d" >&%d; fi;') + both_cmds = '' + for fd in (1, 2): + both_cmds += cmd % (fd, fd, fd, fd, fd) + plist = Popen(both_cmds, shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'terminal 1\r\n') + self.assertEqual(plist [1], 'terminal 2\r\n') + + # Now try with PIPE and make sure it is not a terminal + oper = TestSubprocess.MyOperation() + plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'not 1\n') + self.assertEqual(plist [1], 'not 2\n') + +if __name__ == '__main__': + unittest.main() diff --git a/tools/u_boot_pylib/terminal.py b/tools/u_boot_pylib/terminal.py new file mode 100644 index 0000000000..40d79f8ac0 --- /dev/null +++ b/tools/u_boot_pylib/terminal.py @@ -0,0 +1,270 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2011 The Chromium OS Authors. +# + +"""Terminal utilities + +This module handles terminal interaction including ANSI color codes. +""" + +import os +import re +import shutil +import sys + +# Selection of when we want our output to be colored +COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3) + +# Initially, we are set up to print to the terminal +print_test_mode = False +print_test_list = [] + +# The length of the last line printed without a newline +last_print_len = None + +# credit: +# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python +ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + +class PrintLine: + """A line of text output + + Members: + text: Text line that was printed + newline: True to output a newline after the text + colour: Text colour to use + """ + def __init__(self, text, colour, newline=True, bright=True): + self.text = text + self.newline = newline + self.colour = colour + self.bright = bright + + def __eq__(self, other): + return (self.text == other.text and + self.newline == other.newline and + self.colour == other.colour and + self.bright == other.bright) + + def __str__(self): + return ("newline=%s, colour=%s, bright=%d, text='%s'" % + (self.newline, self.colour, self.bright, self.text)) + + +def calc_ascii_len(text): + """Calculate the length of a string, ignoring any ANSI sequences + + When displayed on a terminal, ANSI sequences don't take any space, so we + need to ignore them when calculating the length of a string. + + Args: + text: Text to check + + Returns: + Length of text, after skipping ANSI sequences + + >>> col = Color(COLOR_ALWAYS) + >>> text = col.build(Color.RED, 'abc') + >>> len(text) + 14 + >>> calc_ascii_len(text) + 3 + >>> + >>> text += 'def' + >>> calc_ascii_len(text) + 6 + >>> text += col.build(Color.RED, 'abc') + >>> calc_ascii_len(text) + 9 + """ + result = ansi_escape.sub('', text) + return len(result) + +def trim_ascii_len(text, size): + """Trim a string containing ANSI sequences to the given ASCII length + + The string is trimmed with ANSI sequences being ignored for the length + calculation. + + >>> col = Color(COLOR_ALWAYS) + >>> text = col.build(Color.RED, 'abc') + >>> len(text) + 14 + >>> calc_ascii_len(trim_ascii_len(text, 4)) + 3 + >>> calc_ascii_len(trim_ascii_len(text, 2)) + 2 + >>> text += 'def' + >>> calc_ascii_len(trim_ascii_len(text, 4)) + 4 + >>> text += col.build(Color.RED, 'ghi') + >>> calc_ascii_len(trim_ascii_len(text, 7)) + 7 + """ + if calc_ascii_len(text) < size: + return text + pos = 0 + out = '' + left = size + + # Work through each ANSI sequence in turn + for m in ansi_escape.finditer(text): + # Find the text before the sequence and add it to our string, making + # sure it doesn't overflow + before = text[pos:m.start()] + toadd = before[:left] + out += toadd + + # Figure out how much non-ANSI space we have left + left -= len(toadd) + + # Add the ANSI sequence and move to the position immediately after it + out += m.group() + pos = m.start() + len(m.group()) + + # Deal with text after the last ANSI sequence + after = text[pos:] + toadd = after[:left] + out += toadd + + return out + + +def tprint(text='', newline=True, colour=None, limit_to_line=False, bright=True): + """Handle a line of output to the terminal. + + In test mode this is recorded in a list. Otherwise it is output to the + terminal. + + Args: + text: Text to print + newline: True to add a new line at the end of the text + colour: Colour to use for the text + """ + global last_print_len + + if print_test_mode: + print_test_list.append(PrintLine(text, colour, newline, bright)) + else: + if colour: + col = Color() + text = col.build(colour, text, bright=bright) + if newline: + print(text) + last_print_len = None + else: + if limit_to_line: + cols = shutil.get_terminal_size().columns + text = trim_ascii_len(text, cols) + print(text, end='', flush=True) + last_print_len = calc_ascii_len(text) + +def print_clear(): + """Clear a previously line that was printed with no newline""" + global last_print_len + + if last_print_len: + print('\r%s\r' % (' '* last_print_len), end='', flush=True) + last_print_len = None + +def set_print_test_mode(enable=True): + """Go into test mode, where all printing is recorded""" + global print_test_mode + + print_test_mode = enable + get_print_test_lines() + +def get_print_test_lines(): + """Get a list of all lines output through tprint() + + Returns: + A list of PrintLine objects + """ + global print_test_list + + ret = print_test_list + print_test_list = [] + return ret + +def echo_print_test_lines(): + """Print out the text lines collected""" + for line in print_test_list: + if line.colour: + col = Color() + print(col.build(line.colour, line.text), end='') + else: + print(line.text, end='') + if line.newline: + print() + + +class Color(object): + """Conditionally wraps text in ANSI color escape sequences.""" + BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) + BOLD = -1 + BRIGHT_START = '\033[1;%dm' + NORMAL_START = '\033[22;%dm' + BOLD_START = '\033[1m' + RESET = '\033[0m' + + def __init__(self, colored=COLOR_IF_TERMINAL): + """Create a new Color object, optionally disabling color output. + + Args: + enabled: True if color output should be enabled. If False then this + class will not add color codes at all. + """ + try: + self._enabled = (colored == COLOR_ALWAYS or + (colored == COLOR_IF_TERMINAL and + os.isatty(sys.stdout.fileno()))) + except: + self._enabled = False + + def start(self, color, bright=True): + """Returns a start color code. + + Args: + color: Color to use, .e.g BLACK, RED, etc. + + Returns: + If color is enabled, returns an ANSI sequence to start the given + color, otherwise returns empty string + """ + if self._enabled: + base = self.BRIGHT_START if bright else self.NORMAL_START + return base % (color + 30) + return '' + + def stop(self): + """Returns a stop color code. + + Returns: + If color is enabled, returns an ANSI color reset sequence, + otherwise returns empty string + """ + if self._enabled: + return self.RESET + return '' + + def build(self, color, text, bright=True): + """Returns text with conditionally added color escape sequences. + + Keyword arguments: + color: Text color -- one of the color constants defined in this + class. + text: The text to color. + + Returns: + If self._enabled is False, returns the original text. If it's True, + returns text with color escape sequences based on the value of + color. + """ + if not self._enabled: + return text + if color == self.BOLD: + start = self.BOLD_START + else: + base = self.BRIGHT_START if bright else self.NORMAL_START + start = base % (color + 30) + return start + text + self.RESET diff --git a/tools/u_boot_pylib/test_util.py b/tools/u_boot_pylib/test_util.py new file mode 100644 index 0000000000..e7564e10c9 --- /dev/null +++ b/tools/u_boot_pylib/test_util.py @@ -0,0 +1,216 @@ +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright (c) 2016 Google, Inc +# + +from contextlib import contextmanager +import doctest +import glob +import multiprocessing +import os +import sys +import unittest + +from u_boot_pylib import command + +from io import StringIO + +use_concurrent = True +try: + from concurrencytest import ConcurrentTestSuite + from concurrencytest import fork_for_tests +except: + use_concurrent = False + + +def run_test_coverage(prog, filter_fname, exclude_list, build_dir, required=None, + extra_args=None): + """Run tests and check that we get 100% coverage + + Args: + prog: Program to run (with be passed a '-t' argument to run tests + filter_fname: Normally all *.py files in the program's directory will + be included. If this is not None, then it is used to filter the + list so that only filenames that don't contain filter_fname are + included. + exclude_list: List of file patterns to exclude from the coverage + calculation + build_dir: Build directory, used to locate libfdt.py + required: List of modules which must be in the coverage report + extra_args (str): Extra arguments to pass to the tool before the -t/test + arg + + Raises: + ValueError if the code coverage is not 100% + """ + # This uses the build output from sandbox_spl to get _libfdt.so + path = os.path.dirname(prog) + if filter_fname: + glob_list = glob.glob(os.path.join(path, '*.py')) + glob_list = [fname for fname in glob_list if filter_fname in fname] + else: + glob_list = [] + glob_list += exclude_list + glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*'] + glob_list += ['*concurrencytest*'] + test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t' + prefix = '' + if build_dir: + prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir + cmd = ('%spython3-coverage run ' + '--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list), + prog, extra_args or '', test_cmd)) + os.system(cmd) + stdout = command.output('python3-coverage', 'report') + lines = stdout.splitlines() + if required: + # Convert '/path/to/name.py' just the module name 'name' + test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0] + for line in lines if '/etype/' in line]) + missing_list = required + missing_list.discard('__init__') + missing_list.difference_update(test_set) + if missing_list: + print('Missing tests for %s' % (', '.join(missing_list))) + print(stdout) + ok = False + + coverage = lines[-1].split(' ')[-1] + ok = True + print(coverage) + if coverage != '100%': + print(stdout) + print("To get a report in 'htmlcov/index.html', type: python3-coverage html") + print('Coverage error: %s, but should be 100%%' % coverage) + ok = False + if not ok: + raise ValueError('Test coverage failure') + + +# Use this to suppress stdout/stderr output: +# with capture_sys_output() as (stdout, stderr) +# ...do something... +@contextmanager +def capture_sys_output(): + capture_out, capture_err = StringIO(), StringIO() + old_out, old_err = sys.stdout, sys.stderr + try: + sys.stdout, sys.stderr = capture_out, capture_err + yield capture_out, capture_err + finally: + sys.stdout, sys.stderr = old_out, old_err + + +class FullTextTestResult(unittest.TextTestResult): + """A test result class that can print extended text results to a stream + + This is meant to be used by a TestRunner as a result class. Like + TextTestResult, this prints out the names of tests as they are run, + errors as they occur, and a summary of the results at the end of the + test run. Beyond those, this prints information about skipped tests, + expected failures and unexpected successes. + + Args: + stream: A file-like object to write results to + descriptions (bool): True to print descriptions with test names + verbosity (int): Detail of printed output per test as they run + Test stdout and stderr always get printed when buffering + them is disabled by the test runner. In addition to that, + 0: Print nothing + 1: Print a dot per test + 2: Print test names + """ + def __init__(self, stream, descriptions, verbosity): + self.verbosity = verbosity + super().__init__(stream, descriptions, verbosity) + + def printErrors(self): + "Called by TestRunner after test run to summarize the tests" + # The parent class doesn't keep unexpected successes in the same + # format as the rest. Adapt it to what printErrorList expects. + unexpected_successes = [ + (test, 'Test was expected to fail, but succeeded.\n') + for test in self.unexpectedSuccesses + ] + + super().printErrors() # FAIL and ERROR + self.printErrorList('SKIP', self.skipped) + self.printErrorList('XFAIL', self.expectedFailures) + self.printErrorList('XPASS', unexpected_successes) + + def addSkip(self, test, reason): + """Called when a test is skipped.""" + # Add empty line to keep spacing consistent with other results + if not reason.endswith('\n'): + reason += '\n' + super().addSkip(test, reason) + + +def run_test_suites(toolname, debug, verbosity, test_preserve_dirs, processes, + test_name, toolpath, class_and_module_list): + """Run a series of test suites and collect the results + + Args: + toolname: Name of the tool that ran the tests + debug: True to enable debugging, which shows a full stack trace on error + verbosity: Verbosity level to use (0-4) + test_preserve_dirs: True to preserve the input directory used by tests + so that it can be examined afterwards (only useful for debugging + tests). If a single test is selected (in args[0]) it also preserves + the output directory for this test. Both directories are displayed + on the command line. + processes: Number of processes to use to run tests (None=same as #CPUs) + test_name: Name of test to run, or None for all + toolpath: List of paths to use for tools + class_and_module_list: List of test classes (type class) and module + names (type str) to run + """ + sys.argv = [sys.argv[0]] + if debug: + sys.argv.append('-D') + if verbosity: + sys.argv.append('-v%d' % verbosity) + if toolpath: + for path in toolpath: + sys.argv += ['--toolpath', path] + + suite = unittest.TestSuite() + loader = unittest.TestLoader() + runner = unittest.TextTestRunner( + stream=sys.stdout, + verbosity=(1 if verbosity is None else verbosity), + resultclass=FullTextTestResult, + ) + + if use_concurrent and processes != 1: + suite = ConcurrentTestSuite(suite, + fork_for_tests(processes or multiprocessing.cpu_count())) + + for module in class_and_module_list: + if isinstance(module, str) and (not test_name or test_name == module): + suite.addTests(doctest.DocTestSuite(module)) + + for module in class_and_module_list: + if isinstance(module, str): + continue + # Test the test module about our arguments, if it is interested + if hasattr(module, 'setup_test_args'): + setup_test_args = getattr(module, 'setup_test_args') + setup_test_args(preserve_indir=test_preserve_dirs, + preserve_outdirs=test_preserve_dirs and test_name is not None, + toolpath=toolpath, verbosity=verbosity) + if test_name: + # Since Python v3.5 If an ImportError or AttributeError occurs + # while traversing a name then a synthetic test that raises that + # error when run will be returned. Check that the requested test + # exists, otherwise these errors are included in the results. + if test_name in loader.getTestCaseNames(module): + suite.addTests(loader.loadTestsFromName(test_name, module)) + else: + suite.addTests(loader.loadTestsFromTestCase(module)) + + print(f" Running {toolname} tests ".center(70, "=")) + result = runner.run(suite) + print() + + return result diff --git a/tools/u_boot_pylib/tools.py b/tools/u_boot_pylib/tools.py new file mode 100644 index 0000000000..187725b501 --- /dev/null +++ b/tools/u_boot_pylib/tools.py @@ -0,0 +1,596 @@ +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright (c) 2016 Google, Inc +# + +import glob +import os +import shlex +import shutil +import sys +import tempfile +import urllib.request + +from u_boot_pylib import command +from u_boot_pylib import tout + +# Output directly (generally this is temporary) +outdir = None + +# True to keep the output directory around after exiting +preserve_outdir = False + +# Path to the Chrome OS chroot, if we know it +chroot_path = None + +# Search paths to use for filename(), used to find files +search_paths = [] + +tool_search_paths = [] + +# Tools and the packages that contain them, on debian +packages = { + 'lz4': 'liblz4-tool', + } + +# List of paths to use when looking for an input file +indir = [] + +def prepare_output_dir(dirname, preserve=False): + """Select an output directory, ensuring it exists. + + This either creates a temporary directory or checks that the one supplied + by the user is valid. For a temporary directory, it makes a note to + remove it later if required. + + Args: + dirname: a string, name of the output directory to use to store + intermediate and output files. If is None - create a temporary + directory. + preserve: a Boolean. If outdir above is None and preserve is False, the + created temporary directory will be destroyed on exit. + + Raises: + OSError: If it cannot create the output directory. + """ + global outdir, preserve_outdir + + preserve_outdir = dirname or preserve + if dirname: + outdir = dirname + if not os.path.isdir(outdir): + try: + os.makedirs(outdir) + except OSError as err: + raise ValueError( + f"Cannot make output directory 'outdir': 'err.strerror'") + tout.debug("Using output directory '%s'" % outdir) + else: + outdir = tempfile.mkdtemp(prefix='binman.') + tout.debug("Using temporary directory '%s'" % outdir) + +def _remove_output_dir(): + global outdir + + shutil.rmtree(outdir) + tout.debug("Deleted temporary directory '%s'" % outdir) + outdir = None + +def finalise_output_dir(): + global outdir, preserve_outdir + + """Tidy up: delete output directory if temporary and not preserved.""" + if outdir and not preserve_outdir: + _remove_output_dir() + outdir = None + +def get_output_filename(fname): + """Return a filename within the output directory. + + Args: + fname: Filename to use for new file + + Returns: + The full path of the filename, within the output directory + """ + return os.path.join(outdir, fname) + +def get_output_dir(): + """Return the current output directory + + Returns: + str: The output directory + """ + return outdir + +def _finalise_for_test(): + """Remove the output directory (for use by tests)""" + global outdir + + if outdir: + _remove_output_dir() + outdir = None + +def set_input_dirs(dirname): + """Add a list of input directories, where input files are kept. + + Args: + dirname: a list of paths to input directories to use for obtaining + files needed by binman to place in the image. + """ + global indir + + indir = dirname + tout.debug("Using input directories %s" % indir) + +def get_input_filename(fname, allow_missing=False): + """Return a filename for use as input. + + Args: + fname: Filename to use for new file + allow_missing: True if the filename can be missing + + Returns: + fname, if indir is None; + full path of the filename, within the input directory; + None, if file is missing and allow_missing is True + + Raises: + ValueError if file is missing and allow_missing is False + """ + if not indir or fname[:1] == '/': + return fname + for dirname in indir: + pathname = os.path.join(dirname, fname) + if os.path.exists(pathname): + return pathname + + if allow_missing: + return None + raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" % + (fname, ','.join(indir), os.getcwd())) + +def get_input_filename_glob(pattern): + """Return a list of filenames for use as input. + + Args: + pattern: Filename pattern to search for + + Returns: + A list of matching files in all input directories + """ + if not indir: + return glob.glob(pattern) + files = [] + for dirname in indir: + pathname = os.path.join(dirname, pattern) + files += glob.glob(pathname) + return sorted(files) + +def align(pos, align): + if align: + mask = align - 1 + pos = (pos + mask) & ~mask + return pos + +def not_power_of_two(num): + return num and (num & (num - 1)) + +def set_tool_paths(toolpaths): + """Set the path to search for tools + + Args: + toolpaths: List of paths to search for tools executed by run() + """ + global tool_search_paths + + tool_search_paths = toolpaths + +def path_has_file(path_spec, fname): + """Check if a given filename is in the PATH + + Args: + path_spec: Value of PATH variable to check + fname: Filename to check + + Returns: + True if found, False if not + """ + for dir in path_spec.split(':'): + if os.path.exists(os.path.join(dir, fname)): + return True + return False + +def get_host_compile_tool(env, name): + """Get the host-specific version for a compile tool + + This checks the environment variables that specify which version of + the tool should be used (e.g. ${HOSTCC}). + + The following table lists the host-specific versions of the tools + this function resolves to: + + Compile Tool | Host version + --------------+---------------- + as | ${HOSTAS} + ld | ${HOSTLD} + cc | ${HOSTCC} + cpp | ${HOSTCPP} + c++ | ${HOSTCXX} + ar | ${HOSTAR} + nm | ${HOSTNM} + ldr | ${HOSTLDR} + strip | ${HOSTSTRIP} + objcopy | ${HOSTOBJCOPY} + objdump | ${HOSTOBJDUMP} + dtc | ${HOSTDTC} + + Args: + name: Command name to run + + Returns: + host_name: Exact command name to run instead + extra_args: List of extra arguments to pass + """ + host_name = None + extra_args = [] + if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', + 'objcopy', 'objdump', 'dtc'): + host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ') + elif name == 'c++': + host_name, *host_args = env.get('HOSTCXX', '').split(' ') + + if host_name: + return host_name, extra_args + return name, [] + +def get_target_compile_tool(name, cross_compile=None): + """Get the target-specific version for a compile tool + + This first checks the environment variables that specify which + version of the tool should be used (e.g. ${CC}). If those aren't + specified, it checks the CROSS_COMPILE variable as a prefix for the + tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc). + + The following table lists the target-specific versions of the tools + this function resolves to: + + Compile Tool | First choice | Second choice + --------------+----------------+---------------------------- + as | ${AS} | ${CROSS_COMPILE}as + ld | ${LD} | ${CROSS_COMPILE}ld.bfd + | | or ${CROSS_COMPILE}ld + cc | ${CC} | ${CROSS_COMPILE}gcc + cpp | ${CPP} | ${CROSS_COMPILE}gcc -E + c++ | ${CXX} | ${CROSS_COMPILE}g++ + ar | ${AR} | ${CROSS_COMPILE}ar + nm | ${NM} | ${CROSS_COMPILE}nm + ldr | ${LDR} | ${CROSS_COMPILE}ldr + strip | ${STRIP} | ${CROSS_COMPILE}strip + objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy + objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump + dtc | ${DTC} | (no CROSS_COMPILE version) + + Args: + name: Command name to run + + Returns: + target_name: Exact command name to run instead + extra_args: List of extra arguments to pass + """ + env = dict(os.environ) + + target_name = None + extra_args = [] + if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', + 'objcopy', 'objdump', 'dtc'): + target_name, *extra_args = env.get(name.upper(), '').split(' ') + elif name == 'c++': + target_name, *extra_args = env.get('CXX', '').split(' ') + + if target_name: + return target_name, extra_args + + if cross_compile is None: + cross_compile = env.get('CROSS_COMPILE', '') + + if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'): + target_name = cross_compile + name + elif name == 'ld': + try: + if run(cross_compile + 'ld.bfd', '-v'): + target_name = cross_compile + 'ld.bfd' + except: + target_name = cross_compile + 'ld' + elif name == 'cc': + target_name = cross_compile + 'gcc' + elif name == 'cpp': + target_name = cross_compile + 'gcc' + extra_args = ['-E'] + elif name == 'c++': + target_name = cross_compile + 'g++' + else: + target_name = name + return target_name, extra_args + +def get_env_with_path(): + """Get an updated environment with the PATH variable set correctly + + If there are any search paths set, these need to come first in the PATH so + that these override any other version of the tools. + + Returns: + dict: New environment with PATH updated, or None if there are not search + paths + """ + if tool_search_paths: + env = dict(os.environ) + env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH'] + return env + +def run_result(name, *args, **kwargs): + """Run a tool with some arguments + + This runs a 'tool', which is a program used by binman to process files and + perhaps produce some output. Tools can be located on the PATH or in a + search path. + + Args: + name: Command name to run + args: Arguments to the tool + for_host: True to resolve the command to the version for the host + for_target: False to run the command as-is, without resolving it + to the version for the compile target + raise_on_error: Raise an error if the command fails (True by default) + + Returns: + CommandResult object + """ + try: + binary = kwargs.get('binary') + for_host = kwargs.get('for_host', False) + for_target = kwargs.get('for_target', not for_host) + raise_on_error = kwargs.get('raise_on_error', True) + env = get_env_with_path() + if for_target: + name, extra_args = get_target_compile_tool(name) + args = tuple(extra_args) + args + elif for_host: + name, extra_args = get_host_compile_tool(env, name) + args = tuple(extra_args) + args + name = os.path.expanduser(name) # Expand paths containing ~ + all_args = (name,) + args + result = command.run_pipe([all_args], capture=True, capture_stderr=True, + env=env, raise_on_error=False, binary=binary) + if result.return_code: + if raise_on_error: + raise ValueError("Error %d running '%s': %s" % + (result.return_code,' '.join(all_args), + result.stderr or result.stdout)) + return result + except ValueError: + if env and not path_has_file(env['PATH'], name): + msg = "Please install tool '%s'" % name + package = packages.get(name) + if package: + msg += " (e.g. from package '%s')" % package + raise ValueError(msg) + raise + +def tool_find(name): + """Search the current path for a tool + + This uses both PATH and any value from set_tool_paths() to search for a tool + + Args: + name (str): Name of tool to locate + + Returns: + str: Full path to tool if found, else None + """ + name = os.path.expanduser(name) # Expand paths containing ~ + paths = [] + pathvar = os.environ.get('PATH') + if pathvar: + paths = pathvar.split(':') + if tool_search_paths: + paths += tool_search_paths + for path in paths: + fname = os.path.join(path, name) + if os.path.isfile(fname) and os.access(fname, os.X_OK): + return fname + +def run(name, *args, **kwargs): + """Run a tool with some arguments + + This runs a 'tool', which is a program used by binman to process files and + perhaps produce some output. Tools can be located on the PATH or in a + search path. + + Args: + name: Command name to run + args: Arguments to the tool + for_host: True to resolve the command to the version for the host + for_target: False to run the command as-is, without resolving it + to the version for the compile target + + Returns: + CommandResult object + """ + result = run_result(name, *args, **kwargs) + if result is not None: + return result.stdout + +def filename(fname): + """Resolve a file path to an absolute path. + + If fname starts with ##/ and chroot is available, ##/ gets replaced with + the chroot path. If chroot is not available, this file name can not be + resolved, `None' is returned. + + If fname is not prepended with the above prefix, and is not an existing + file, the actual file name is retrieved from the passed in string and the + search_paths directories (if any) are searched to for the file. If found - + the path to the found file is returned, `None' is returned otherwise. + + Args: + fname: a string, the path to resolve. + + Returns: + Absolute path to the file or None if not found. + """ + if fname.startswith('##/'): + if chroot_path: + fname = os.path.join(chroot_path, fname[3:]) + else: + return None + + # Search for a pathname that exists, and return it if found + if fname and not os.path.exists(fname): + for path in search_paths: + pathname = os.path.join(path, os.path.basename(fname)) + if os.path.exists(pathname): + return pathname + + # If not found, just return the standard, unchanged path + return fname + +def read_file(fname, binary=True): + """Read and return the contents of a file. + + Args: + fname: path to filename to read, where ## signifiies the chroot. + + Returns: + data read from file, as a string. + """ + with open(filename(fname), binary and 'rb' or 'r') as fd: + data = fd.read() + #self._out.Info("Read file '%s' size %d (%#0x)" % + #(fname, len(data), len(data))) + return data + +def write_file(fname, data, binary=True): + """Write data into a file. + + Args: + fname: path to filename to write + data: data to write to file, as a string + """ + #self._out.Info("Write file '%s' size %d (%#0x)" % + #(fname, len(data), len(data))) + with open(filename(fname), binary and 'wb' or 'w') as fd: + fd.write(data) + +def get_bytes(byte, size): + """Get a string of bytes of a given size + + Args: + byte: Numeric byte value to use + size: Size of bytes/string to return + + Returns: + A bytes type with 'byte' repeated 'size' times + """ + return bytes([byte]) * size + +def to_bytes(string): + """Convert a str type into a bytes type + + Args: + string: string to convert + + Returns: + A bytes type + """ + return string.encode('utf-8') + +def to_string(bval): + """Convert a bytes type into a str type + + Args: + bval: bytes value to convert + + Returns: + Python 3: A bytes type + Python 2: A string type + """ + return bval.decode('utf-8') + +def to_hex(val): + """Convert an integer value (or None) to a string + + Returns: + hex value, or 'None' if the value is None + """ + return 'None' if val is None else '%#x' % val + +def to_hex_size(val): + """Return the size of an object in hex + + Returns: + hex value of size, or 'None' if the value is None + """ + return 'None' if val is None else '%#x' % len(val) + +def print_full_help(fname): + """Print the full help message for a tool using an appropriate pager. + + Args: + fname: Path to a file containing the full help message + """ + pager = shlex.split(os.getenv('PAGER', '')) + if not pager: + lesspath = shutil.which('less') + pager = [lesspath] if lesspath else None + if not pager: + pager = ['more'] + command.run(*pager, fname) + +def download(url, tmpdir_pattern='.patman'): + """Download a file to a temporary directory + + Args: + url (str): URL to download + tmpdir_pattern (str): pattern to use for the temporary directory + + Returns: + Tuple: + Full path to the downloaded archive file in that directory, + or None if there was an error while downloading + Temporary directory name + """ + print('- downloading: %s' % url) + leaf = url.split('/')[-1] + tmpdir = tempfile.mkdtemp(tmpdir_pattern) + response = urllib.request.urlopen(url) + fname = os.path.join(tmpdir, leaf) + fd = open(fname, 'wb') + meta = response.info() + size = int(meta.get('Content-Length')) + done = 0 + block_size = 1 << 16 + status = '' + + # Read the file in chunks and show progress as we go + while True: + buffer = response.read(block_size) + if not buffer: + print(chr(8) * (len(status) + 1), '\r', end=' ') + break + + done += len(buffer) + fd.write(buffer) + status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024, + done * 100 // size) + status = status + chr(8) * (len(status) + 1) + print(status, end=' ') + sys.stdout.flush() + print('\r', end='') + sys.stdout.flush() + fd.close() + if done != size: + print('Error, failed to download') + os.remove(fname) + fname = None + return fname, tmpdir diff --git a/tools/u_boot_pylib/tout.py b/tools/u_boot_pylib/tout.py new file mode 100644 index 0000000000..6bd2806f88 --- /dev/null +++ b/tools/u_boot_pylib/tout.py @@ -0,0 +1,179 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2016 Google, Inc +# +# Terminal output logging. +# + +import sys + +from u_boot_pylib import terminal + +# Output verbosity levels that we support +ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(6) + +in_progress = False + +""" +This class handles output of progress and other useful information +to the user. It provides for simple verbosity level control and can +output nothing but errors at verbosity zero. + +The idea is that modules set up an Output object early in their years and pass +it around to other modules that need it. This keeps the output under control +of a single class. + +Public properties: + verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug +""" +def __enter__(): + return + +def __exit__(unused1, unused2, unused3): + """Clean up and remove any progress message.""" + clear_progress() + return False + +def user_is_present(): + """This returns True if it is likely that a user is present. + + Sometimes we want to prompt the user, but if no one is there then this + is a waste of time, and may lock a script which should otherwise fail. + + Returns: + True if it thinks the user is there, and False otherwise + """ + return stdout_is_tty and verbose > 0 + +def clear_progress(): + """Clear any active progress message on the terminal.""" + global in_progress + if verbose > 0 and stdout_is_tty and in_progress: + _stdout.write('\r%s\r' % (" " * len (_progress))) + _stdout.flush() + in_progress = False + +def progress(msg, warning=False, trailer='...'): + """Display progress information. + + Args: + msg: Message to display. + warning: True if this is a warning.""" + global in_progress + clear_progress() + if verbose > 0: + _progress = msg + trailer + if stdout_is_tty: + col = _color.YELLOW if warning else _color.GREEN + _stdout.write('\r' + _color.build(col, _progress)) + _stdout.flush() + in_progress = True + else: + _stdout.write(_progress + '\n') + +def _output(level, msg, color=None): + """Output a message to the terminal. + + Args: + level: Verbosity level for this message. It will only be displayed if + this as high as the currently selected level. + msg; Message to display. + error: True if this is an error message, else False. + """ + if verbose >= level: + clear_progress() + if color: + msg = _color.build(color, msg) + if level < NOTICE: + print(msg, file=sys.stderr) + else: + print(msg) + +def do_output(level, msg): + """Output a message to the terminal. + + Args: + level: Verbosity level for this message. It will only be displayed if + this as high as the currently selected level. + msg; Message to display. + """ + _output(level, msg) + +def error(msg): + """Display an error message + + Args: + msg; Message to display. + """ + _output(ERROR, msg, _color.RED) + +def warning(msg): + """Display a warning message + + Args: + msg; Message to display. + """ + _output(WARNING, msg, _color.YELLOW) + +def notice(msg): + """Display an important infomation message + + Args: + msg; Message to display. + """ + _output(NOTICE, msg) + +def info(msg): + """Display an infomation message + + Args: + msg; Message to display. + """ + _output(INFO, msg) + +def detail(msg): + """Display a detailed message + + Args: + msg; Message to display. + """ + _output(DETAIL, msg) + +def debug(msg): + """Display a debug message + + Args: + msg; Message to display. + """ + _output(DEBUG, msg) + +def user_output(msg): + """Display a message regardless of the current output level. + + This is used when the output was specifically requested by the user. + Args: + msg; Message to display. + """ + _output(0, msg) + +def init(_verbose=WARNING, stdout=sys.stdout): + """Initialize a new output object. + + Args: + verbose: Verbosity level (0-4). + stdout: File to use for stdout. + """ + global verbose, _progress, _color, _stdout, stdout_is_tty + + verbose = _verbose + _progress = '' # Our last progress message + _color = terminal.Color() + _stdout = stdout + + # TODO(sjg): Move this into Chromite libraries when we have them + stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() + stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty() + +def uninit(): + clear_progress() + +init() diff --git a/tools/u_boot_pylib/u_boot_pylib b/tools/u_boot_pylib/u_boot_pylib new file mode 120000 index 0000000000..5a427d1942 --- /dev/null +++ b/tools/u_boot_pylib/u_boot_pylib @@ -0,0 +1 @@ +__main__.py
\ No newline at end of file |