diff options
Diffstat (limited to 'tools/patman')
-rw-r--r-- | tools/patman/__init__.py | 7 | ||||
-rwxr-xr-x | tools/patman/__main__.py | 10 | ||||
-rw-r--r-- | tools/patman/checkpatch.py | 50 | ||||
-rw-r--r-- | tools/patman/command.py | 139 | ||||
-rw-r--r-- | tools/patman/control.py | 4 | ||||
-rw-r--r-- | tools/patman/cros_subprocess.py | 401 | ||||
-rw-r--r-- | tools/patman/func_test.py | 8 | ||||
-rw-r--r-- | tools/patman/get_maintainer.py | 2 | ||||
-rw-r--r-- | tools/patman/gitutil.py | 4 | ||||
-rw-r--r-- | tools/patman/patchstream.py | 2 | ||||
-rw-r--r-- | tools/patman/patman.rst | 12 | ||||
-rw-r--r-- | tools/patman/pyproject.toml | 29 | ||||
-rw-r--r-- | tools/patman/series.py | 112 | ||||
-rw-r--r-- | tools/patman/status.py | 4 | ||||
-rw-r--r-- | tools/patman/terminal.py | 270 | ||||
-rw-r--r-- | tools/patman/test_checkpatch.py | 6 | ||||
-rw-r--r-- | tools/patman/test_settings.py | 2 | ||||
-rw-r--r-- | tools/patman/test_util.py | 247 | ||||
-rw-r--r-- | tools/patman/tools.py | 596 | ||||
-rw-r--r-- | tools/patman/tout.py | 179 |
20 files changed, 183 insertions, 1901 deletions
diff --git a/tools/patman/__init__.py b/tools/patman/__init__.py index 1b98ec7fee..08eeffdf6d 100644 --- a/tools/patman/__init__.py +++ b/tools/patman/__init__.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: GPL-2.0+ -__all__ = ['checkpatch', 'command', 'commit', 'control', 'cros_subprocess', - 'func_test', 'get_maintainer', 'gitutil', '__main__', 'patchstream', - 'project', 'series', 'setup', 'settings', 'terminal', - 'test_checkpatch', 'test_util', 'tools', 'tout'] +__all__ = ['checkpatch', 'commit', 'control', 'func_test', 'get_maintainer', + 'gitutil', '__main__', 'patchstream', 'project', 'series', + 'settings','setup', 'status', 'test_checkpatch', 'test_settings'] diff --git a/tools/patman/__main__.py b/tools/patman/__main__.py index 749e6348b6..48ffbc8ead 100755 --- a/tools/patman/__main__.py +++ b/tools/patman/__main__.py @@ -24,10 +24,9 @@ from patman import func_test from patman import gitutil from patman import project from patman import settings -from patman import terminal -from patman import test_util -from patman import test_checkpatch -from patman import tools +from u_boot_pylib import terminal +from u_boot_pylib import test_util +from u_boot_pylib import tools epilog = '''Create patches from commits in a branch, check them and email them as specified by tags you place in the commits. Use -n to do a dry run first.''' @@ -146,11 +145,12 @@ if not args.debug: # Run our meagre tests if args.cmd == 'test': from patman import func_test + from patman import test_checkpatch result = test_util.run_test_suites( 'patman', False, False, False, None, None, None, [test_checkpatch.TestPatch, func_test.TestFunctional, - 'gitutil', 'settings', 'terminal']) + 'gitutil', 'settings']) sys.exit(0 if result.wasSuccessful() else 1) diff --git a/tools/patman/checkpatch.py b/tools/patman/checkpatch.py index d1b902dd96..e03cac115e 100644 --- a/tools/patman/checkpatch.py +++ b/tools/patman/checkpatch.py @@ -3,13 +3,14 @@ # import collections +import concurrent.futures import os import re import sys -from patman import command from patman import gitutil -from patman import terminal +from u_boot_pylib import command +from u_boot_pylib import terminal EMACS_PREFIX = r'(?:[0-9]{4}.*\.patch:[0-9]+: )?' TYPE_NAME = r'([A-Z_]+:)?' @@ -244,26 +245,31 @@ def check_patches(verbose, args, use_tree): error_count, warning_count, check_count = 0, 0, 0 col = terminal.Color() - for fname in args: - result = check_patch(fname, verbose, use_tree=use_tree) - if not result.ok: - error_count += result.errors - warning_count += result.warnings - check_count += result.checks - print('%d errors, %d warnings, %d checks for %s:' % (result.errors, - result.warnings, result.checks, col.build(col.BLUE, fname))) - if (len(result.problems) != result.errors + result.warnings + - result.checks): - print("Internal error: some problems lost") - # Python seems to get confused by this - # pylint: disable=E1133 - for item in result.problems: - sys.stderr.write( - get_warning_msg(col, item.get('type', '<unknown>'), - item.get('file', '<unknown>'), - item.get('line', 0), item.get('msg', 'message'))) - print - #print(stdout) + with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: + futures = [] + for fname in args: + f = executor.submit(check_patch, fname, verbose, use_tree=use_tree) + futures.append(f) + + for fname, f in zip(args, futures): + result = f.result() + if not result.ok: + error_count += result.errors + warning_count += result.warnings + check_count += result.checks + print('%d errors, %d warnings, %d checks for %s:' % (result.errors, + result.warnings, result.checks, col.build(col.BLUE, fname))) + if (len(result.problems) != result.errors + result.warnings + + result.checks): + print("Internal error: some problems lost") + # Python seems to get confused by this + # pylint: disable=E1133 + for item in result.problems: + sys.stderr.write( + get_warning_msg(col, item.get('type', '<unknown>'), + item.get('file', '<unknown>'), + item.get('line', 0), item.get('msg', 'message'))) + print if error_count or warning_count or check_count: str = 'checkpatch.pl found %d error(s), %d warning(s), %d checks(s)' color = col.GREEN diff --git a/tools/patman/command.py b/tools/patman/command.py deleted file mode 100644 index 92c453b5c1..0000000000 --- a/tools/patman/command.py +++ /dev/null @@ -1,139 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0+ -# Copyright (c) 2011 The Chromium OS Authors. -# - -import os - -from patman import cros_subprocess - -"""Shell command ease-ups for Python.""" - -class CommandResult: - """A class which captures the result of executing a command. - - Members: - stdout: stdout obtained from command, as a string - stderr: stderr obtained from command, as a string - return_code: Return code from command - exception: Exception received, or None if all ok - """ - def __init__(self, stdout='', stderr='', combined='', return_code=0, - exception=None): - self.stdout = stdout - self.stderr = stderr - self.combined = combined - self.return_code = return_code - self.exception = exception - - def to_output(self, binary): - if not binary: - self.stdout = self.stdout.decode('utf-8') - self.stderr = self.stderr.decode('utf-8') - self.combined = self.combined.decode('utf-8') - return self - - -# This permits interception of RunPipe for test purposes. If it is set to -# a function, then that function is called with the pipe list being -# executed. Otherwise, it is assumed to be a CommandResult object, and is -# returned as the result for every run_pipe() call. -# When this value is None, commands are executed as normal. -test_result = None - -def run_pipe(pipe_list, infile=None, outfile=None, - capture=False, capture_stderr=False, oneline=False, - raise_on_error=True, cwd=None, binary=False, - output_func=None, **kwargs): - """ - Perform a command pipeline, with optional input/output filenames. - - Args: - pipe_list: List of command lines to execute. Each command line is - piped into the next, and is itself a list of strings. For - example [ ['ls', '.git'] ['wc'] ] will pipe the output of - 'ls .git' into 'wc'. - infile: File to provide stdin to the pipeline - outfile: File to store stdout - capture: True to capture output - capture_stderr: True to capture stderr - oneline: True to strip newline chars from output - output_func: Output function to call with each output fragment - (if it returns True the function terminates) - kwargs: Additional keyword arguments to cros_subprocess.Popen() - Returns: - CommandResult object - """ - if test_result: - if hasattr(test_result, '__call__'): - # pylint: disable=E1102 - result = test_result(pipe_list=pipe_list) - if result: - return result - else: - return test_result - # No result: fall through to normal processing - result = CommandResult(b'', b'', b'') - last_pipe = None - pipeline = list(pipe_list) - user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list]) - kwargs['stdout'] = None - kwargs['stderr'] = None - while pipeline: - cmd = pipeline.pop(0) - if last_pipe is not None: - kwargs['stdin'] = last_pipe.stdout - elif infile: - kwargs['stdin'] = open(infile, 'rb') - if pipeline or capture: - kwargs['stdout'] = cros_subprocess.PIPE - elif outfile: - kwargs['stdout'] = open(outfile, 'wb') - if capture_stderr: - kwargs['stderr'] = cros_subprocess.PIPE - - try: - last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs) - except Exception as err: - result.exception = err - if raise_on_error: - raise Exception("Error running '%s': %s" % (user_pipestr, str)) - result.return_code = 255 - return result.to_output(binary) - - if capture: - result.stdout, result.stderr, result.combined = ( - last_pipe.communicate_filter(output_func)) - if result.stdout and oneline: - result.output = result.stdout.rstrip(b'\r\n') - result.return_code = last_pipe.wait() - else: - result.return_code = os.waitpid(last_pipe.pid, 0)[1] - if raise_on_error and result.return_code: - raise Exception("Error running '%s'" % user_pipestr) - return result.to_output(binary) - -def output(*cmd, **kwargs): - kwargs['raise_on_error'] = kwargs.get('raise_on_error', True) - return run_pipe([cmd], capture=True, **kwargs).stdout - -def output_one_line(*cmd, **kwargs): - """Run a command and output it as a single-line string - - The command us expected to produce a single line of output - - Returns: - String containing output of command - """ - raise_on_error = kwargs.pop('raise_on_error', True) - result = run_pipe([cmd], capture=True, oneline=True, - raise_on_error=raise_on_error, **kwargs).stdout.strip() - return result - -def run(*cmd, **kwargs): - return run_pipe([cmd], **kwargs).stdout - -def run_list(cmd): - return run_pipe([cmd], capture=True).stdout - -def stop_all(): - cros_subprocess.stay_alive = False diff --git a/tools/patman/control.py b/tools/patman/control.py index 38e98dab84..916ddf8fcf 100644 --- a/tools/patman/control.py +++ b/tools/patman/control.py @@ -14,7 +14,7 @@ import sys from patman import checkpatch from patman import gitutil from patman import patchstream -from patman import terminal +from u_boot_pylib import terminal def setup(): """Do required setup before doing anything""" @@ -85,7 +85,7 @@ def check_patches(series, patch_files, run_checkpatch, verbose, use_tree): # Do a few checks on the series series.DoChecks() - # Check the patches, and run them through 'git am' just to be sure + # Check the patches if run_checkpatch: ok = checkpatch.check_patches(verbose, patch_files, use_tree) else: diff --git a/tools/patman/cros_subprocess.py b/tools/patman/cros_subprocess.py deleted file mode 100644 index cd614f38a6..0000000000 --- a/tools/patman/cros_subprocess.py +++ /dev/null @@ -1,401 +0,0 @@ -# Copyright (c) 2012 The Chromium OS Authors. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -# -# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> -# Licensed to PSF under a Contributor Agreement. -# See http://www.python.org/2.4/license for licensing details. - -"""Subprocess execution - -This module holds a subclass of subprocess.Popen with our own required -features, mainly that we get access to the subprocess output while it -is running rather than just at the end. This makes it easier to show -progress information and filter output in real time. -""" - -import errno -import os -import pty -import select -import subprocess -import sys -import unittest - - -# Import these here so the caller does not need to import subprocess also. -PIPE = subprocess.PIPE -STDOUT = subprocess.STDOUT -PIPE_PTY = -3 # Pipe output through a pty -stay_alive = True - - -class Popen(subprocess.Popen): - """Like subprocess.Popen with ptys and incremental output - - This class deals with running a child process and filtering its output on - both stdout and stderr while it is running. We do this so we can monitor - progress, and possibly relay the output to the user if requested. - - The class is similar to subprocess.Popen, the equivalent is something like: - - Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - But this class has many fewer features, and two enhancement: - - 1. Rather than getting the output data only at the end, this class sends it - to a provided operation as it arrives. - 2. We use pseudo terminals so that the child will hopefully flush its output - to us as soon as it is produced, rather than waiting for the end of a - line. - - Use communicate_filter() to handle output from the subprocess. - - """ - - def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY, - shell=False, cwd=None, env=None, **kwargs): - """Cut-down constructor - - Args: - args: Program and arguments for subprocess to execute. - stdin: See subprocess.Popen() - stdout: See subprocess.Popen(), except that we support the sentinel - value of cros_subprocess.PIPE_PTY. - stderr: See subprocess.Popen(), except that we support the sentinel - value of cros_subprocess.PIPE_PTY. - shell: See subprocess.Popen() - cwd: Working directory to change to for subprocess, or None if none. - env: Environment to use for this subprocess, or None to inherit parent. - kwargs: No other arguments are supported at the moment. Passing other - arguments will cause a ValueError to be raised. - """ - stdout_pty = None - stderr_pty = None - - if stdout == PIPE_PTY: - stdout_pty = pty.openpty() - stdout = os.fdopen(stdout_pty[1]) - if stderr == PIPE_PTY: - stderr_pty = pty.openpty() - stderr = os.fdopen(stderr_pty[1]) - - super(Popen, self).__init__(args, stdin=stdin, - stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env, - **kwargs) - - # If we're on a PTY, we passed the slave half of the PTY to the subprocess. - # We want to use the master half on our end from now on. Setting this here - # does make some assumptions about the implementation of subprocess, but - # those assumptions are pretty minor. - - # Note that if stderr is STDOUT, then self.stderr will be set to None by - # this constructor. - if stdout_pty is not None: - self.stdout = os.fdopen(stdout_pty[0]) - if stderr_pty is not None: - self.stderr = os.fdopen(stderr_pty[0]) - - # Insist that unit tests exist for other arguments we don't support. - if kwargs: - raise ValueError("Unit tests do not test extra args - please add tests") - - def convert_data(self, data): - """Convert stdout/stderr data to the correct format for output - - Args: - data: Data to convert, or None for '' - - Returns: - Converted data, as bytes - """ - if data is None: - return b'' - return data - - def communicate_filter(self, output, input_buf=''): - """Interact with process: Read data from stdout and stderr. - - This method runs until end-of-file is reached, then waits for the - subprocess to terminate. - - The output function is sent all output from the subprocess and must be - defined like this: - - def output([self,] stream, data) - Args: - stream: the stream the output was received on, which will be - sys.stdout or sys.stderr. - data: a string containing the data - - Returns: - True to terminate the process - - Note: The data read is buffered in memory, so do not use this - method if the data size is large or unlimited. - - Args: - output: Function to call with each fragment of output. - - Returns: - A tuple (stdout, stderr, combined) which is the data received on - stdout, stderr and the combined data (interleaved stdout and stderr). - - Note that the interleaved output will only be sensible if you have - set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on - the timing of the output in the subprocess. If a subprocess flips - between stdout and stderr quickly in succession, by the time we come to - read the output from each we may see several lines in each, and will read - all the stdout lines, then all the stderr lines. So the interleaving - may not be correct. In this case you might want to pass - stderr=cros_subprocess.STDOUT to the constructor. - - This feature is still useful for subprocesses where stderr is - rarely used and indicates an error. - - Note also that if you set stderr to STDOUT, then stderr will be empty - and the combined output will just be the same as stdout. - """ - - read_set = [] - write_set = [] - stdout = None # Return - stderr = None # Return - - if self.stdin: - # Flush stdio buffer. This might block, if the user has - # been writing to .stdin in an uncontrolled fashion. - self.stdin.flush() - if input_buf: - write_set.append(self.stdin) - else: - self.stdin.close() - if self.stdout: - read_set.append(self.stdout) - stdout = bytearray() - if self.stderr and self.stderr != self.stdout: - read_set.append(self.stderr) - stderr = bytearray() - combined = bytearray() - - stop_now = False - input_offset = 0 - while read_set or write_set: - try: - rlist, wlist, _ = select.select(read_set, write_set, [], 0.2) - except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise - - if not stay_alive: - self.terminate() - - if self.stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - chunk = input_buf[input_offset : input_offset + 512] - bytes_written = os.write(self.stdin.fileno(), chunk) - input_offset += bytes_written - if input_offset >= len(input_buf): - self.stdin.close() - write_set.remove(self.stdin) - - if self.stdout in rlist: - data = b'' - # We will get an error on read if the pty is closed - try: - data = os.read(self.stdout.fileno(), 1024) - except OSError: - pass - if not len(data): - self.stdout.close() - read_set.remove(self.stdout) - else: - stdout += data - combined += data - if output: - stop_now = output(sys.stdout, data) - if self.stderr in rlist: - data = b'' - # We will get an error on read if the pty is closed - try: - data = os.read(self.stderr.fileno(), 1024) - except OSError: - pass - if not len(data): - self.stderr.close() - read_set.remove(self.stderr) - else: - stderr += data - combined += data - if output: - stop_now = output(sys.stderr, data) - if stop_now: - self.terminate() - - # All data exchanged. Translate lists into strings. - stdout = self.convert_data(stdout) - stderr = self.convert_data(stderr) - combined = self.convert_data(combined) - - self.wait() - return (stdout, stderr, combined) - - -# Just being a unittest.TestCase gives us 14 public methods. Unless we -# disable this, we can only have 6 tests in a TestCase. That's not enough. -# -# pylint: disable=R0904 - -class TestSubprocess(unittest.TestCase): - """Our simple unit test for this module""" - - class MyOperation: - """Provides a operation that we can pass to Popen""" - def __init__(self, input_to_send=None): - """Constructor to set up the operation and possible input. - - Args: - input_to_send: a text string to send when we first get input. We will - add \r\n to the string. - """ - self.stdout_data = '' - self.stderr_data = '' - self.combined_data = '' - self.stdin_pipe = None - self._input_to_send = input_to_send - if input_to_send: - pipe = os.pipe() - self.stdin_read_pipe = pipe[0] - self._stdin_write_pipe = os.fdopen(pipe[1], 'w') - - def output(self, stream, data): - """Output handler for Popen. Stores the data for later comparison""" - if stream == sys.stdout: - self.stdout_data += data - if stream == sys.stderr: - self.stderr_data += data - self.combined_data += data - - # Output the input string if we have one. - if self._input_to_send: - self._stdin_write_pipe.write(self._input_to_send + '\r\n') - self._stdin_write_pipe.flush() - - def _basic_check(self, plist, oper): - """Basic checks that the output looks sane.""" - self.assertEqual(plist[0], oper.stdout_data) - self.assertEqual(plist[1], oper.stderr_data) - self.assertEqual(plist[2], oper.combined_data) - - # The total length of stdout and stderr should equal the combined length - self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2])) - - def test_simple(self): - """Simple redirection: Get process list""" - oper = TestSubprocess.MyOperation() - plist = Popen(['ps']).communicate_filter(oper.output) - self._basic_check(plist, oper) - - def test_stderr(self): - """Check stdout and stderr""" - oper = TestSubprocess.MyOperation() - cmd = 'echo fred >/dev/stderr && false || echo bad' - plist = Popen([cmd], shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], 'bad\r\n') - self.assertEqual(plist [1], 'fred\r\n') - - def test_shell(self): - """Check with and without shell works""" - oper = TestSubprocess.MyOperation() - cmd = 'echo test >/dev/stderr' - self.assertRaises(OSError, Popen, [cmd], shell=False) - plist = Popen([cmd], shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(len(plist [0]), 0) - self.assertEqual(plist [1], 'test\r\n') - - def test_list_args(self): - """Check with and without shell works using list arguments""" - oper = TestSubprocess.MyOperation() - cmd = ['echo', 'test', '>/dev/stderr'] - plist = Popen(cmd, shell=False).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n') - self.assertEqual(len(plist [1]), 0) - - oper = TestSubprocess.MyOperation() - - # this should be interpreted as 'echo' with the other args dropped - cmd = ['echo', 'test', '>/dev/stderr'] - plist = Popen(cmd, shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], '\r\n') - - def test_cwd(self): - """Check we can change directory""" - for shell in (False, True): - oper = TestSubprocess.MyOperation() - plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter( - oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], '/tmp\r\n') - - def test_env(self): - """Check we can change environment""" - for add in (False, True): - oper = TestSubprocess.MyOperation() - env = os.environ - if add: - env ['FRED'] = 'fred' - cmd = 'echo $FRED' - plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n') - - def test_extra_args(self): - """Check we can't add extra arguments""" - self.assertRaises(ValueError, Popen, 'true', close_fds=False) - - def test_basic_input(self): - """Check that incremental input works - - We set up a subprocess which will prompt for name. When we see this prompt - we send the name as input to the process. It should then print the name - properly to stdout. - """ - oper = TestSubprocess.MyOperation('Flash') - prompt = 'What is your name?: ' - cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt - plist = Popen([cmd], stdin=oper.stdin_read_pipe, - shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(len(plist [1]), 0) - self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n') - - def test_isatty(self): - """Check that ptys appear as terminals to the subprocess""" - oper = TestSubprocess.MyOperation() - cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; ' - 'else echo "not %d" >&%d; fi;') - both_cmds = '' - for fd in (1, 2): - both_cmds += cmd % (fd, fd, fd, fd, fd) - plist = Popen(both_cmds, shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], 'terminal 1\r\n') - self.assertEqual(plist [1], 'terminal 2\r\n') - - # Now try with PIPE and make sure it is not a terminal - oper = TestSubprocess.MyOperation() - plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - shell=True).communicate_filter(oper.output) - self._basic_check(plist, oper) - self.assertEqual(plist [0], 'not 1\n') - self.assertEqual(plist [1], 'not 2\n') - -if __name__ == '__main__': - unittest.main() diff --git a/tools/patman/func_test.py b/tools/patman/func_test.py index c25a47bdeb..42ac4ed77b 100644 --- a/tools/patman/func_test.py +++ b/tools/patman/func_test.py @@ -23,9 +23,9 @@ from patman import patchstream from patman.patchstream import PatchStream from patman.series import Series from patman import settings -from patman import terminal -from patman import tools -from patman.test_util import capture_sys_output +from u_boot_pylib import terminal +from u_boot_pylib import tools +from u_boot_pylib.test_util import capture_sys_output import pygit2 from patman import status @@ -240,6 +240,8 @@ class TestFunctional(unittest.TestCase): self.assertEqual('Change log missing for v3', next(lines)) self.assertEqual('Change log for unknown version v4', next(lines)) self.assertEqual("Alias 'pci' not found", next(lines)) + while next(lines) != 'Cc processing complete': + pass self.assertIn('Dry run', next(lines)) self.assertEqual('', next(lines)) self.assertIn('Send a total of %d patches' % count, next(lines)) diff --git a/tools/patman/get_maintainer.py b/tools/patman/get_maintainer.py index f7011be1e4..8df3d124ba 100644 --- a/tools/patman/get_maintainer.py +++ b/tools/patman/get_maintainer.py @@ -7,8 +7,8 @@ import os import shlex import shutil -from patman import command from patman import gitutil +from u_boot_pylib import command def find_get_maintainer(script_file_name): diff --git a/tools/patman/gitutil.py b/tools/patman/gitutil.py index 5e742102c2..6700057359 100644 --- a/tools/patman/gitutil.py +++ b/tools/patman/gitutil.py @@ -5,9 +5,9 @@ import os import sys -from patman import command from patman import settings -from patman import terminal +from u_boot_pylib import command +from u_boot_pylib import terminal # True to use --no-decorate - we check this in setup() use_no_decorate = True diff --git a/tools/patman/patchstream.py b/tools/patman/patchstream.py index fb6a6036f3..f91669a940 100644 --- a/tools/patman/patchstream.py +++ b/tools/patman/patchstream.py @@ -14,10 +14,10 @@ import queue import shutil import tempfile -from patman import command from patman import commit from patman import gitutil from patman.series import Series +from u_boot_pylib import command # Tags that we detect and remove RE_REMOVE = re.compile(r'^BUG=|^TEST=|^BRANCH=|^Review URL:' diff --git a/tools/patman/patman.rst b/tools/patman/patman.rst index 6113962fb4..038b651ee8 100644 --- a/tools/patman/patman.rst +++ b/tools/patman/patman.rst @@ -41,6 +41,18 @@ In Linux and U-Boot this will also call get_maintainer.pl on each of your patches automatically (unless you use -m to disable this). +Installation +------------ + +You can install patman using:: + + pip install patch-manager + +The name is chosen since patman conflicts with an existing package. + +If you are using patman within the U-Boot tree, it may be easiest to add a +symlink from your local `~/.bin` directory to `/path/to/tools/patman/patman`. + How to use this tool -------------------- diff --git a/tools/patman/pyproject.toml b/tools/patman/pyproject.toml new file mode 100644 index 0000000000..c5dc7c7e27 --- /dev/null +++ b/tools/patman/pyproject.toml @@ -0,0 +1,29 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "patch-manager" +version = "0.0.2" +authors = [ + { name="Simon Glass", email="sjg@chromium.org" }, +] +dependencies = ["u_boot_pylib"] +description = "Patman patch manager" +readme = "README.rst" +requires-python = ">=3.7" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)", + "Operating System :: OS Independent", +] + +[project.urls] +"Homepage" = "https://u-boot.readthedocs.io/en/latest/develop/patman.html" +"Bug Tracker" = "https://source.denx.de/groups/u-boot/-/issues" + +[project.scripts] +patman = "patman.__main__:run_patman" + +[tool.setuptools.package-data] +patman = ["*.rst"] diff --git a/tools/patman/series.py b/tools/patman/series.py index 2eeeef71dc..6866e1dbd0 100644 --- a/tools/patman/series.py +++ b/tools/patman/series.py @@ -5,14 +5,17 @@ from __future__ import print_function import collections +import concurrent.futures import itertools import os +import sys +import time from patman import get_maintainer from patman import gitutil from patman import settings -from patman import terminal -from patman import tools +from u_boot_pylib import terminal +from u_boot_pylib import tools # Series-xxx tags that we understand valid_series = ['to', 'cc', 'version', 'changes', 'prefix', 'notes', 'name', @@ -234,6 +237,49 @@ class Series(dict): str = 'Change log exists, but no version is set' print(col.build(col.RED, str)) + def GetCcForCommit(self, commit, process_tags, warn_on_error, + add_maintainers, limit, get_maintainer_script, + all_skips): + """Get the email CCs to use with a particular commit + + Uses subject tags and get_maintainers.pl script to find people to cc + on a patch + + Args: + commit (Commit): Commit to process + process_tags (bool): Process tags as if they were aliases + warn_on_error (bool): True to print a warning when an alias fails to + match, False to ignore it. + add_maintainers (bool or list of str): Either: + True/False to call the get_maintainers to CC maintainers + List of maintainers to include (for testing) + limit (int): Limit the length of the Cc list (None if no limit) + get_maintainer_script (str): The file name of the get_maintainer.pl + script (or compatible). + all_skips (set of str): Updated to include the set of bouncing email + addresses that were dropped from the output. This is essentially + a return value from this function. + + Returns: + list of str: List of email addresses to cc + """ + cc = [] + if process_tags: + cc += gitutil.build_email_list(commit.tags, + warn_on_error=warn_on_error) + cc += gitutil.build_email_list(commit.cc_list, + warn_on_error=warn_on_error) + if type(add_maintainers) == type(cc): + cc += add_maintainers + elif add_maintainers: + cc += get_maintainer.get_maintainer(get_maintainer_script, + commit.patch) + all_skips |= set(cc) & set(settings.bounces) + cc = list(set(cc) - set(settings.bounces)) + if limit is not None: + cc = cc[:limit] + return cc + def MakeCcFile(self, process_tags, cover_fname, warn_on_error, add_maintainers, limit, get_maintainer_script): """Make a cc file for us to use for per-commit Cc automation @@ -241,15 +287,15 @@ class Series(dict): Also stores in self._generated_cc to make ShowActions() faster. Args: - process_tags: Process tags as if they were aliases - cover_fname: If non-None the name of the cover letter. - warn_on_error: True to print a warning when an alias fails to match, - False to ignore it. - add_maintainers: Either: + process_tags (bool): Process tags as if they were aliases + cover_fname (str): If non-None the name of the cover letter. + warn_on_error (bool): True to print a warning when an alias fails to + match, False to ignore it. + add_maintainers (bool or list of str): Either: True/False to call the get_maintainers to CC maintainers List of maintainers to include (for testing) - limit: Limit the length of the Cc list (None if no limit) - get_maintainer_script: The file name of the get_maintainer.pl + limit (int): Limit the length of the Cc list (None if no limit) + get_maintainer_script (str): The file name of the get_maintainer.pl script (or compatible). Return: Filename of temp file created @@ -259,28 +305,42 @@ class Series(dict): fname = '/tmp/patman.%d' % os.getpid() fd = open(fname, 'w', encoding='utf-8') all_ccs = [] + all_skips = set() + with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: + for i, commit in enumerate(self.commits): + commit.seq = i + commit.future = executor.submit( + self.GetCcForCommit, commit, process_tags, warn_on_error, + add_maintainers, limit, get_maintainer_script, all_skips) + + # Show progress any commits that are taking forever + lastlen = 0 + while True: + left = [commit for commit in self.commits + if not commit.future.done()] + if not left: + break + names = ', '.join(f'{c.seq + 1}:{c.subject}' + for c in left[:2]) + out = f'\r{len(left)} remaining: {names}'[:79] + spaces = ' ' * (lastlen - len(out)) + if lastlen: # Don't print anything the first time + print(out, spaces, end='') + sys.stdout.flush() + lastlen = len(out) + time.sleep(.25) + print(f'\rdone{" " * lastlen}\r', end='') + print('Cc processing complete') + for commit in self.commits: - cc = [] - if process_tags: - cc += gitutil.build_email_list(commit.tags, - warn_on_error=warn_on_error) - cc += gitutil.build_email_list(commit.cc_list, - warn_on_error=warn_on_error) - if type(add_maintainers) == type(cc): - cc += add_maintainers - elif add_maintainers: - - cc += get_maintainer.get_maintainer(get_maintainer_script, - commit.patch) - for x in set(cc) & set(settings.bounces): - print(col.build(col.YELLOW, 'Skipping "%s"' % x)) - cc = list(set(cc) - set(settings.bounces)) - if limit is not None: - cc = cc[:limit] + cc = commit.future.result() all_ccs += cc print(commit.patch, '\0'.join(sorted(set(cc))), file=fd) self._generated_cc[commit.patch] = cc + for x in sorted(all_skips): + print(col.build(col.YELLOW, f'Skipping "{x}"')) + if cover_fname: cover_cc = gitutil.build_email_list(self.get('cover_cc', '')) cover_cc = list(set(cover_cc + all_ccs)) diff --git a/tools/patman/status.py b/tools/patman/status.py index 47ed6d61d4..5fb436e08f 100644 --- a/tools/patman/status.py +++ b/tools/patman/status.py @@ -18,8 +18,8 @@ import requests from patman import patchstream from patman.patchstream import PatchStream -from patman import terminal -from patman import tout +from u_boot_pylib import terminal +from u_boot_pylib import tout # Patches which are part of a multi-patch series are shown with a prefix like # [prefix, version, sequence], for example '[RFC, v2, 3/5]'. All but the last diff --git a/tools/patman/terminal.py b/tools/patman/terminal.py deleted file mode 100644 index 40d79f8ac0..0000000000 --- a/tools/patman/terminal.py +++ /dev/null @@ -1,270 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0+ -# Copyright (c) 2011 The Chromium OS Authors. -# - -"""Terminal utilities - -This module handles terminal interaction including ANSI color codes. -""" - -import os -import re -import shutil -import sys - -# Selection of when we want our output to be colored -COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3) - -# Initially, we are set up to print to the terminal -print_test_mode = False -print_test_list = [] - -# The length of the last line printed without a newline -last_print_len = None - -# credit: -# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python -ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - -class PrintLine: - """A line of text output - - Members: - text: Text line that was printed - newline: True to output a newline after the text - colour: Text colour to use - """ - def __init__(self, text, colour, newline=True, bright=True): - self.text = text - self.newline = newline - self.colour = colour - self.bright = bright - - def __eq__(self, other): - return (self.text == other.text and - self.newline == other.newline and - self.colour == other.colour and - self.bright == other.bright) - - def __str__(self): - return ("newline=%s, colour=%s, bright=%d, text='%s'" % - (self.newline, self.colour, self.bright, self.text)) - - -def calc_ascii_len(text): - """Calculate the length of a string, ignoring any ANSI sequences - - When displayed on a terminal, ANSI sequences don't take any space, so we - need to ignore them when calculating the length of a string. - - Args: - text: Text to check - - Returns: - Length of text, after skipping ANSI sequences - - >>> col = Color(COLOR_ALWAYS) - >>> text = col.build(Color.RED, 'abc') - >>> len(text) - 14 - >>> calc_ascii_len(text) - 3 - >>> - >>> text += 'def' - >>> calc_ascii_len(text) - 6 - >>> text += col.build(Color.RED, 'abc') - >>> calc_ascii_len(text) - 9 - """ - result = ansi_escape.sub('', text) - return len(result) - -def trim_ascii_len(text, size): - """Trim a string containing ANSI sequences to the given ASCII length - - The string is trimmed with ANSI sequences being ignored for the length - calculation. - - >>> col = Color(COLOR_ALWAYS) - >>> text = col.build(Color.RED, 'abc') - >>> len(text) - 14 - >>> calc_ascii_len(trim_ascii_len(text, 4)) - 3 - >>> calc_ascii_len(trim_ascii_len(text, 2)) - 2 - >>> text += 'def' - >>> calc_ascii_len(trim_ascii_len(text, 4)) - 4 - >>> text += col.build(Color.RED, 'ghi') - >>> calc_ascii_len(trim_ascii_len(text, 7)) - 7 - """ - if calc_ascii_len(text) < size: - return text - pos = 0 - out = '' - left = size - - # Work through each ANSI sequence in turn - for m in ansi_escape.finditer(text): - # Find the text before the sequence and add it to our string, making - # sure it doesn't overflow - before = text[pos:m.start()] - toadd = before[:left] - out += toadd - - # Figure out how much non-ANSI space we have left - left -= len(toadd) - - # Add the ANSI sequence and move to the position immediately after it - out += m.group() - pos = m.start() + len(m.group()) - - # Deal with text after the last ANSI sequence - after = text[pos:] - toadd = after[:left] - out += toadd - - return out - - -def tprint(text='', newline=True, colour=None, limit_to_line=False, bright=True): - """Handle a line of output to the terminal. - - In test mode this is recorded in a list. Otherwise it is output to the - terminal. - - Args: - text: Text to print - newline: True to add a new line at the end of the text - colour: Colour to use for the text - """ - global last_print_len - - if print_test_mode: - print_test_list.append(PrintLine(text, colour, newline, bright)) - else: - if colour: - col = Color() - text = col.build(colour, text, bright=bright) - if newline: - print(text) - last_print_len = None - else: - if limit_to_line: - cols = shutil.get_terminal_size().columns - text = trim_ascii_len(text, cols) - print(text, end='', flush=True) - last_print_len = calc_ascii_len(text) - -def print_clear(): - """Clear a previously line that was printed with no newline""" - global last_print_len - - if last_print_len: - print('\r%s\r' % (' '* last_print_len), end='', flush=True) - last_print_len = None - -def set_print_test_mode(enable=True): - """Go into test mode, where all printing is recorded""" - global print_test_mode - - print_test_mode = enable - get_print_test_lines() - -def get_print_test_lines(): - """Get a list of all lines output through tprint() - - Returns: - A list of PrintLine objects - """ - global print_test_list - - ret = print_test_list - print_test_list = [] - return ret - -def echo_print_test_lines(): - """Print out the text lines collected""" - for line in print_test_list: - if line.colour: - col = Color() - print(col.build(line.colour, line.text), end='') - else: - print(line.text, end='') - if line.newline: - print() - - -class Color(object): - """Conditionally wraps text in ANSI color escape sequences.""" - BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) - BOLD = -1 - BRIGHT_START = '\033[1;%dm' - NORMAL_START = '\033[22;%dm' - BOLD_START = '\033[1m' - RESET = '\033[0m' - - def __init__(self, colored=COLOR_IF_TERMINAL): - """Create a new Color object, optionally disabling color output. - - Args: - enabled: True if color output should be enabled. If False then this - class will not add color codes at all. - """ - try: - self._enabled = (colored == COLOR_ALWAYS or - (colored == COLOR_IF_TERMINAL and - os.isatty(sys.stdout.fileno()))) - except: - self._enabled = False - - def start(self, color, bright=True): - """Returns a start color code. - - Args: - color: Color to use, .e.g BLACK, RED, etc. - - Returns: - If color is enabled, returns an ANSI sequence to start the given - color, otherwise returns empty string - """ - if self._enabled: - base = self.BRIGHT_START if bright else self.NORMAL_START - return base % (color + 30) - return '' - - def stop(self): - """Returns a stop color code. - - Returns: - If color is enabled, returns an ANSI color reset sequence, - otherwise returns empty string - """ - if self._enabled: - return self.RESET - return '' - - def build(self, color, text, bright=True): - """Returns text with conditionally added color escape sequences. - - Keyword arguments: - color: Text color -- one of the color constants defined in this - class. - text: The text to color. - - Returns: - If self._enabled is False, returns the original text. If it's True, - returns text with color escape sequences based on the value of - color. - """ - if not self._enabled: - return text - if color == self.BOLD: - start = self.BOLD_START - else: - base = self.BRIGHT_START if bright else self.NORMAL_START - start = base % (color + 30) - return start + text + self.RESET diff --git a/tools/patman/test_checkpatch.py b/tools/patman/test_checkpatch.py index 4c2ab6e590..a8bb364e42 100644 --- a/tools/patman/test_checkpatch.py +++ b/tools/patman/test_checkpatch.py @@ -452,6 +452,12 @@ index 0000000..2234c87 self.check_strl("cat"); self.check_strl("cpy"); + def test_schema(self): + """Check for uses of strn(cat|cpy)""" + pm = PatchMaker() + pm.add_line('arch/sandbox/dts/sandbox.dtsi', '\tu-boot,dm-pre-proper;') + self.check_single_message(pm, 'PRE_SCHEMA', 'error') + if __name__ == "__main__": unittest.main() gitutil.RunTests() diff --git a/tools/patman/test_settings.py b/tools/patman/test_settings.py index c768a2fc64..06b7cbc3ab 100644 --- a/tools/patman/test_settings.py +++ b/tools/patman/test_settings.py @@ -10,7 +10,7 @@ import sys import tempfile from patman import settings -from patman import tools +from u_boot_pylib import tools @contextlib.contextmanager diff --git a/tools/patman/test_util.py b/tools/patman/test_util.py deleted file mode 100644 index 0f6d1aa902..0000000000 --- a/tools/patman/test_util.py +++ /dev/null @@ -1,247 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0+ -# -# Copyright (c) 2016 Google, Inc -# - -from contextlib import contextmanager -import doctest -import glob -import multiprocessing -import os -import sys -import unittest - -from patman import command - -from io import StringIO - -buffer_outputs = True -use_concurrent = True -try: - from concurrencytest.concurrencytest import ConcurrentTestSuite - from concurrencytest.concurrencytest import fork_for_tests -except: - use_concurrent = False - - -def run_test_coverage(prog, filter_fname, exclude_list, build_dir, required=None, - extra_args=None): - """Run tests and check that we get 100% coverage - - Args: - prog: Program to run (with be passed a '-t' argument to run tests - filter_fname: Normally all *.py files in the program's directory will - be included. If this is not None, then it is used to filter the - list so that only filenames that don't contain filter_fname are - included. - exclude_list: List of file patterns to exclude from the coverage - calculation - build_dir: Build directory, used to locate libfdt.py - required: List of modules which must be in the coverage report - extra_args (str): Extra arguments to pass to the tool before the -t/test - arg - - Raises: - ValueError if the code coverage is not 100% - """ - # This uses the build output from sandbox_spl to get _libfdt.so - path = os.path.dirname(prog) - if filter_fname: - glob_list = glob.glob(os.path.join(path, '*.py')) - glob_list = [fname for fname in glob_list if filter_fname in fname] - else: - glob_list = [] - glob_list += exclude_list - glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*'] - glob_list += ['*concurrencytest*'] - test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t' - prefix = '' - if build_dir: - prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir - cmd = ('%spython3-coverage run ' - '--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list), - prog, extra_args or '', test_cmd)) - os.system(cmd) - stdout = command.output('python3-coverage', 'report') - lines = stdout.splitlines() - if required: - # Convert '/path/to/name.py' just the module name 'name' - test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0] - for line in lines if '/etype/' in line]) - missing_list = required - missing_list.discard('__init__') - missing_list.difference_update(test_set) - if missing_list: - print('Missing tests for %s' % (', '.join(missing_list))) - print(stdout) - ok = False - - coverage = lines[-1].split(' ')[-1] - ok = True - print(coverage) - if coverage != '100%': - print(stdout) - print("To get a report in 'htmlcov/index.html', type: python3-coverage html") - print('Coverage error: %s, but should be 100%%' % coverage) - ok = False - if not ok: - raise ValueError('Test coverage failure') - - -# Use this to suppress stdout/stderr output: -# with capture_sys_output() as (stdout, stderr) -# ...do something... -@contextmanager -def capture_sys_output(): - capture_out, capture_err = StringIO(), StringIO() - old_out, old_err = sys.stdout, sys.stderr - try: - sys.stdout, sys.stderr = capture_out, capture_err - yield capture_out, capture_err - finally: - sys.stdout, sys.stderr = old_out, old_err - - -class FullTextTestResult(unittest.TextTestResult): - """A test result class that can print extended text results to a stream - - This is meant to be used by a TestRunner as a result class. Like - TextTestResult, this prints out the names of tests as they are run, - errors as they occur, and a summary of the results at the end of the - test run. Beyond those, this prints information about skipped tests, - expected failures and unexpected successes. - - Args: - stream: A file-like object to write results to - descriptions (bool): True to print descriptions with test names - verbosity (int): Detail of printed output per test as they run - Test stdout and stderr always get printed when buffering - them is disabled by the test runner. In addition to that, - 0: Print nothing - 1: Print a dot per test - 2: Print test names - 3: Print test names, and buffered outputs for failing tests - """ - def __init__(self, stream, descriptions, verbosity): - self.verbosity = verbosity - super().__init__(stream, descriptions, verbosity) - - def printErrors(self): - "Called by TestRunner after test run to summarize the tests" - # The parent class doesn't keep unexpected successes in the same - # format as the rest. Adapt it to what printErrorList expects. - unexpected_successes = [ - (test, 'Test was expected to fail, but succeeded.\n') - for test in self.unexpectedSuccesses - ] - - super().printErrors() # FAIL and ERROR - self.printErrorList('SKIP', self.skipped) - self.printErrorList('XFAIL', self.expectedFailures) - self.printErrorList('XPASS', unexpected_successes) - - def addError(self, test, err): - """Called when an error has occurred.""" - super().addError(test, err) - self._mirrorOutput &= self.verbosity >= 3 - - def addFailure(self, test, err): - """Called when a test has failed.""" - super().addFailure(test, err) - self._mirrorOutput &= self.verbosity >= 3 - - def addSubTest(self, test, subtest, err): - """Called at the end of a subtest.""" - super().addSubTest(test, subtest, err) - self._mirrorOutput &= self.verbosity >= 3 - - def addSuccess(self, test): - """Called when a test has completed successfully""" - super().addSuccess(test) - # Don't print stdout/stderr for successful tests - self._mirrorOutput = False - - def addSkip(self, test, reason): - """Called when a test is skipped.""" - # Add empty line to keep spacing consistent with other results - if not reason.endswith('\n'): - reason += '\n' - super().addSkip(test, reason) - self._mirrorOutput &= self.verbosity >= 3 - - def addExpectedFailure(self, test, err): - """Called when an expected failure/error occurred.""" - super().addExpectedFailure(test, err) - self._mirrorOutput &= self.verbosity >= 3 - - -def run_test_suites(toolname, debug, verbosity, test_preserve_dirs, processes, - test_name, toolpath, class_and_module_list): - """Run a series of test suites and collect the results - - Args: - toolname: Name of the tool that ran the tests - debug: True to enable debugging, which shows a full stack trace on error - verbosity: Verbosity level to use (0-4) - test_preserve_dirs: True to preserve the input directory used by tests - so that it can be examined afterwards (only useful for debugging - tests). If a single test is selected (in args[0]) it also preserves - the output directory for this test. Both directories are displayed - on the command line. - processes: Number of processes to use to run tests (None=same as #CPUs) - test_name: Name of test to run, or None for all - toolpath: List of paths to use for tools - class_and_module_list: List of test classes (type class) and module - names (type str) to run - """ - sys.argv = [sys.argv[0]] - if debug: - sys.argv.append('-D') - if verbosity: - sys.argv.append('-v%d' % verbosity) - if toolpath: - for path in toolpath: - sys.argv += ['--toolpath', path] - - suite = unittest.TestSuite() - loader = unittest.TestLoader() - runner = unittest.TextTestRunner( - stream=sys.stdout, - verbosity=(1 if verbosity is None else verbosity), - buffer=False if test_name else buffer_outputs, - resultclass=FullTextTestResult, - ) - - if use_concurrent and processes != 1: - suite = ConcurrentTestSuite(suite, - fork_for_tests(processes or multiprocessing.cpu_count(), - buffer=False if test_name else buffer_outputs)) - - for module in class_and_module_list: - if isinstance(module, str) and (not test_name or test_name == module): - suite.addTests(doctest.DocTestSuite(module)) - - for module in class_and_module_list: - if isinstance(module, str): - continue - # Test the test module about our arguments, if it is interested - if hasattr(module, 'setup_test_args'): - setup_test_args = getattr(module, 'setup_test_args') - setup_test_args(preserve_indir=test_preserve_dirs, - preserve_outdirs=test_preserve_dirs and test_name is not None, - toolpath=toolpath, verbosity=verbosity) - if test_name: - # Since Python v3.5 If an ImportError or AttributeError occurs - # while traversing a name then a synthetic test that raises that - # error when run will be returned. Check that the requested test - # exists, otherwise these errors are included in the results. - if test_name in loader.getTestCaseNames(module): - suite.addTests(loader.loadTestsFromName(test_name, module)) - else: - suite.addTests(loader.loadTestsFromTestCase(module)) - - print(f" Running {toolname} tests ".center(70, "=")) - result = runner.run(suite) - print() - - return result diff --git a/tools/patman/tools.py b/tools/patman/tools.py deleted file mode 100644 index 2ac814d476..0000000000 --- a/tools/patman/tools.py +++ /dev/null @@ -1,596 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0+ -# -# Copyright (c) 2016 Google, Inc -# - -import glob -import os -import shlex -import shutil -import sys -import tempfile -import urllib.request - -from patman import command -from patman import tout - -# Output directly (generally this is temporary) -outdir = None - -# True to keep the output directory around after exiting -preserve_outdir = False - -# Path to the Chrome OS chroot, if we know it -chroot_path = None - -# Search paths to use for filename(), used to find files -search_paths = [] - -tool_search_paths = [] - -# Tools and the packages that contain them, on debian -packages = { - 'lz4': 'liblz4-tool', - } - -# List of paths to use when looking for an input file -indir = [] - -def prepare_output_dir(dirname, preserve=False): - """Select an output directory, ensuring it exists. - - This either creates a temporary directory or checks that the one supplied - by the user is valid. For a temporary directory, it makes a note to - remove it later if required. - - Args: - dirname: a string, name of the output directory to use to store - intermediate and output files. If is None - create a temporary - directory. - preserve: a Boolean. If outdir above is None and preserve is False, the - created temporary directory will be destroyed on exit. - - Raises: - OSError: If it cannot create the output directory. - """ - global outdir, preserve_outdir - - preserve_outdir = dirname or preserve - if dirname: - outdir = dirname - if not os.path.isdir(outdir): - try: - os.makedirs(outdir) - except OSError as err: - raise ValueError( - f"Cannot make output directory 'outdir': 'err.strerror'") - tout.debug("Using output directory '%s'" % outdir) - else: - outdir = tempfile.mkdtemp(prefix='binman.') - tout.debug("Using temporary directory '%s'" % outdir) - -def _remove_output_dir(): - global outdir - - shutil.rmtree(outdir) - tout.debug("Deleted temporary directory '%s'" % outdir) - outdir = None - -def finalise_output_dir(): - global outdir, preserve_outdir - - """Tidy up: delete output directory if temporary and not preserved.""" - if outdir and not preserve_outdir: - _remove_output_dir() - outdir = None - -def get_output_filename(fname): - """Return a filename within the output directory. - - Args: - fname: Filename to use for new file - - Returns: - The full path of the filename, within the output directory - """ - return os.path.join(outdir, fname) - -def get_output_dir(): - """Return the current output directory - - Returns: - str: The output directory - """ - return outdir - -def _finalise_for_test(): - """Remove the output directory (for use by tests)""" - global outdir - - if outdir: - _remove_output_dir() - outdir = None - -def set_input_dirs(dirname): - """Add a list of input directories, where input files are kept. - - Args: - dirname: a list of paths to input directories to use for obtaining - files needed by binman to place in the image. - """ - global indir - - indir = dirname - tout.debug("Using input directories %s" % indir) - -def get_input_filename(fname, allow_missing=False): - """Return a filename for use as input. - - Args: - fname: Filename to use for new file - allow_missing: True if the filename can be missing - - Returns: - fname, if indir is None; - full path of the filename, within the input directory; - None, if file is missing and allow_missing is True - - Raises: - ValueError if file is missing and allow_missing is False - """ - if not indir or fname[:1] == '/': - return fname - for dirname in indir: - pathname = os.path.join(dirname, fname) - if os.path.exists(pathname): - return pathname - - if allow_missing: - return None - raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" % - (fname, ','.join(indir), os.getcwd())) - -def get_input_filename_glob(pattern): - """Return a list of filenames for use as input. - - Args: - pattern: Filename pattern to search for - - Returns: - A list of matching files in all input directories - """ - if not indir: - return glob.glob(pattern) - files = [] - for dirname in indir: - pathname = os.path.join(dirname, pattern) - files += glob.glob(pathname) - return sorted(files) - -def align(pos, align): - if align: - mask = align - 1 - pos = (pos + mask) & ~mask - return pos - -def not_power_of_two(num): - return num and (num & (num - 1)) - -def set_tool_paths(toolpaths): - """Set the path to search for tools - - Args: - toolpaths: List of paths to search for tools executed by run() - """ - global tool_search_paths - - tool_search_paths = toolpaths - -def path_has_file(path_spec, fname): - """Check if a given filename is in the PATH - - Args: - path_spec: Value of PATH variable to check - fname: Filename to check - - Returns: - True if found, False if not - """ - for dir in path_spec.split(':'): - if os.path.exists(os.path.join(dir, fname)): - return True - return False - -def get_host_compile_tool(env, name): - """Get the host-specific version for a compile tool - - This checks the environment variables that specify which version of - the tool should be used (e.g. ${HOSTCC}). - - The following table lists the host-specific versions of the tools - this function resolves to: - - Compile Tool | Host version - --------------+---------------- - as | ${HOSTAS} - ld | ${HOSTLD} - cc | ${HOSTCC} - cpp | ${HOSTCPP} - c++ | ${HOSTCXX} - ar | ${HOSTAR} - nm | ${HOSTNM} - ldr | ${HOSTLDR} - strip | ${HOSTSTRIP} - objcopy | ${HOSTOBJCOPY} - objdump | ${HOSTOBJDUMP} - dtc | ${HOSTDTC} - - Args: - name: Command name to run - - Returns: - host_name: Exact command name to run instead - extra_args: List of extra arguments to pass - """ - host_name = None - extra_args = [] - if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', - 'objcopy', 'objdump', 'dtc'): - host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ') - elif name == 'c++': - host_name, *host_args = env.get('HOSTCXX', '').split(' ') - - if host_name: - return host_name, extra_args - return name, [] - -def get_target_compile_tool(name, cross_compile=None): - """Get the target-specific version for a compile tool - - This first checks the environment variables that specify which - version of the tool should be used (e.g. ${CC}). If those aren't - specified, it checks the CROSS_COMPILE variable as a prefix for the - tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc). - - The following table lists the target-specific versions of the tools - this function resolves to: - - Compile Tool | First choice | Second choice - --------------+----------------+---------------------------- - as | ${AS} | ${CROSS_COMPILE}as - ld | ${LD} | ${CROSS_COMPILE}ld.bfd - | | or ${CROSS_COMPILE}ld - cc | ${CC} | ${CROSS_COMPILE}gcc - cpp | ${CPP} | ${CROSS_COMPILE}gcc -E - c++ | ${CXX} | ${CROSS_COMPILE}g++ - ar | ${AR} | ${CROSS_COMPILE}ar - nm | ${NM} | ${CROSS_COMPILE}nm - ldr | ${LDR} | ${CROSS_COMPILE}ldr - strip | ${STRIP} | ${CROSS_COMPILE}strip - objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy - objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump - dtc | ${DTC} | (no CROSS_COMPILE version) - - Args: - name: Command name to run - - Returns: - target_name: Exact command name to run instead - extra_args: List of extra arguments to pass - """ - env = dict(os.environ) - - target_name = None - extra_args = [] - if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', - 'objcopy', 'objdump', 'dtc'): - target_name, *extra_args = env.get(name.upper(), '').split(' ') - elif name == 'c++': - target_name, *extra_args = env.get('CXX', '').split(' ') - - if target_name: - return target_name, extra_args - - if cross_compile is None: - cross_compile = env.get('CROSS_COMPILE', '') - - if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'): - target_name = cross_compile + name - elif name == 'ld': - try: - if run(cross_compile + 'ld.bfd', '-v'): - target_name = cross_compile + 'ld.bfd' - except: - target_name = cross_compile + 'ld' - elif name == 'cc': - target_name = cross_compile + 'gcc' - elif name == 'cpp': - target_name = cross_compile + 'gcc' - extra_args = ['-E'] - elif name == 'c++': - target_name = cross_compile + 'g++' - else: - target_name = name - return target_name, extra_args - -def get_env_with_path(): - """Get an updated environment with the PATH variable set correctly - - If there are any search paths set, these need to come first in the PATH so - that these override any other version of the tools. - - Returns: - dict: New environment with PATH updated, or None if there are not search - paths - """ - if tool_search_paths: - env = dict(os.environ) - env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH'] - return env - -def run_result(name, *args, **kwargs): - """Run a tool with some arguments - - This runs a 'tool', which is a program used by binman to process files and - perhaps produce some output. Tools can be located on the PATH or in a - search path. - - Args: - name: Command name to run - args: Arguments to the tool - for_host: True to resolve the command to the version for the host - for_target: False to run the command as-is, without resolving it - to the version for the compile target - raise_on_error: Raise an error if the command fails (True by default) - - Returns: - CommandResult object - """ - try: - binary = kwargs.get('binary') - for_host = kwargs.get('for_host', False) - for_target = kwargs.get('for_target', not for_host) - raise_on_error = kwargs.get('raise_on_error', True) - env = get_env_with_path() - if for_target: - name, extra_args = get_target_compile_tool(name) - args = tuple(extra_args) + args - elif for_host: - name, extra_args = get_host_compile_tool(env, name) - args = tuple(extra_args) + args - name = os.path.expanduser(name) # Expand paths containing ~ - all_args = (name,) + args - result = command.run_pipe([all_args], capture=True, capture_stderr=True, - env=env, raise_on_error=False, binary=binary) - if result.return_code: - if raise_on_error: - raise ValueError("Error %d running '%s': %s" % - (result.return_code,' '.join(all_args), - result.stderr or result.stdout)) - return result - except ValueError: - if env and not path_has_file(env['PATH'], name): - msg = "Please install tool '%s'" % name - package = packages.get(name) - if package: - msg += " (e.g. from package '%s')" % package - raise ValueError(msg) - raise - -def tool_find(name): - """Search the current path for a tool - - This uses both PATH and any value from set_tool_paths() to search for a tool - - Args: - name (str): Name of tool to locate - - Returns: - str: Full path to tool if found, else None - """ - name = os.path.expanduser(name) # Expand paths containing ~ - paths = [] - pathvar = os.environ.get('PATH') - if pathvar: - paths = pathvar.split(':') - if tool_search_paths: - paths += tool_search_paths - for path in paths: - fname = os.path.join(path, name) - if os.path.isfile(fname) and os.access(fname, os.X_OK): - return fname - -def run(name, *args, **kwargs): - """Run a tool with some arguments - - This runs a 'tool', which is a program used by binman to process files and - perhaps produce some output. Tools can be located on the PATH or in a - search path. - - Args: - name: Command name to run - args: Arguments to the tool - for_host: True to resolve the command to the version for the host - for_target: False to run the command as-is, without resolving it - to the version for the compile target - - Returns: - CommandResult object - """ - result = run_result(name, *args, **kwargs) - if result is not None: - return result.stdout - -def filename(fname): - """Resolve a file path to an absolute path. - - If fname starts with ##/ and chroot is available, ##/ gets replaced with - the chroot path. If chroot is not available, this file name can not be - resolved, `None' is returned. - - If fname is not prepended with the above prefix, and is not an existing - file, the actual file name is retrieved from the passed in string and the - search_paths directories (if any) are searched to for the file. If found - - the path to the found file is returned, `None' is returned otherwise. - - Args: - fname: a string, the path to resolve. - - Returns: - Absolute path to the file or None if not found. - """ - if fname.startswith('##/'): - if chroot_path: - fname = os.path.join(chroot_path, fname[3:]) - else: - return None - - # Search for a pathname that exists, and return it if found - if fname and not os.path.exists(fname): - for path in search_paths: - pathname = os.path.join(path, os.path.basename(fname)) - if os.path.exists(pathname): - return pathname - - # If not found, just return the standard, unchanged path - return fname - -def read_file(fname, binary=True): - """Read and return the contents of a file. - - Args: - fname: path to filename to read, where ## signifiies the chroot. - - Returns: - data read from file, as a string. - """ - with open(filename(fname), binary and 'rb' or 'r') as fd: - data = fd.read() - #self._out.Info("Read file '%s' size %d (%#0x)" % - #(fname, len(data), len(data))) - return data - -def write_file(fname, data, binary=True): - """Write data into a file. - - Args: - fname: path to filename to write - data: data to write to file, as a string - """ - #self._out.Info("Write file '%s' size %d (%#0x)" % - #(fname, len(data), len(data))) - with open(filename(fname), binary and 'wb' or 'w') as fd: - fd.write(data) - -def get_bytes(byte, size): - """Get a string of bytes of a given size - - Args: - byte: Numeric byte value to use - size: Size of bytes/string to return - - Returns: - A bytes type with 'byte' repeated 'size' times - """ - return bytes([byte]) * size - -def to_bytes(string): - """Convert a str type into a bytes type - - Args: - string: string to convert - - Returns: - A bytes type - """ - return string.encode('utf-8') - -def to_string(bval): - """Convert a bytes type into a str type - - Args: - bval: bytes value to convert - - Returns: - Python 3: A bytes type - Python 2: A string type - """ - return bval.decode('utf-8') - -def to_hex(val): - """Convert an integer value (or None) to a string - - Returns: - hex value, or 'None' if the value is None - """ - return 'None' if val is None else '%#x' % val - -def to_hex_size(val): - """Return the size of an object in hex - - Returns: - hex value of size, or 'None' if the value is None - """ - return 'None' if val is None else '%#x' % len(val) - -def print_full_help(fname): - """Print the full help message for a tool using an appropriate pager. - - Args: - fname: Path to a file containing the full help message - """ - pager = shlex.split(os.getenv('PAGER', '')) - if not pager: - lesspath = shutil.which('less') - pager = [lesspath] if lesspath else None - if not pager: - pager = ['more'] - command.run(*pager, fname) - -def download(url, tmpdir_pattern='.patman'): - """Download a file to a temporary directory - - Args: - url (str): URL to download - tmpdir_pattern (str): pattern to use for the temporary directory - - Returns: - Tuple: - Full path to the downloaded archive file in that directory, - or None if there was an error while downloading - Temporary directory name - """ - print('- downloading: %s' % url) - leaf = url.split('/')[-1] - tmpdir = tempfile.mkdtemp(tmpdir_pattern) - response = urllib.request.urlopen(url) - fname = os.path.join(tmpdir, leaf) - fd = open(fname, 'wb') - meta = response.info() - size = int(meta.get('Content-Length')) - done = 0 - block_size = 1 << 16 - status = '' - - # Read the file in chunks and show progress as we go - while True: - buffer = response.read(block_size) - if not buffer: - print(chr(8) * (len(status) + 1), '\r', end=' ') - break - - done += len(buffer) - fd.write(buffer) - status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024, - done * 100 // size) - status = status + chr(8) * (len(status) + 1) - print(status, end=' ') - sys.stdout.flush() - print('\r', end='') - sys.stdout.flush() - fd.close() - if done != size: - print('Error, failed to download') - os.remove(fname) - fname = None - return fname, tmpdir diff --git a/tools/patman/tout.py b/tools/patman/tout.py deleted file mode 100644 index ff0fd92afc..0000000000 --- a/tools/patman/tout.py +++ /dev/null @@ -1,179 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0+ -# Copyright (c) 2016 Google, Inc -# -# Terminal output logging. -# - -import sys - -from patman import terminal - -# Output verbosity levels that we support -ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(6) - -in_progress = False - -""" -This class handles output of progress and other useful information -to the user. It provides for simple verbosity level control and can -output nothing but errors at verbosity zero. - -The idea is that modules set up an Output object early in their years and pass -it around to other modules that need it. This keeps the output under control -of a single class. - -Public properties: - verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug -""" -def __enter__(): - return - -def __exit__(unused1, unused2, unused3): - """Clean up and remove any progress message.""" - clear_progress() - return False - -def user_is_present(): - """This returns True if it is likely that a user is present. - - Sometimes we want to prompt the user, but if no one is there then this - is a waste of time, and may lock a script which should otherwise fail. - - Returns: - True if it thinks the user is there, and False otherwise - """ - return stdout_is_tty and verbose > 0 - -def clear_progress(): - """Clear any active progress message on the terminal.""" - global in_progress - if verbose > 0 and stdout_is_tty and in_progress: - _stdout.write('\r%s\r' % (" " * len (_progress))) - _stdout.flush() - in_progress = False - -def progress(msg, warning=False, trailer='...'): - """Display progress information. - - Args: - msg: Message to display. - warning: True if this is a warning.""" - global in_progress - clear_progress() - if verbose > 0: - _progress = msg + trailer - if stdout_is_tty: - col = _color.YELLOW if warning else _color.GREEN - _stdout.write('\r' + _color.build(col, _progress)) - _stdout.flush() - in_progress = True - else: - _stdout.write(_progress + '\n') - -def _output(level, msg, color=None): - """Output a message to the terminal. - - Args: - level: Verbosity level for this message. It will only be displayed if - this as high as the currently selected level. - msg; Message to display. - error: True if this is an error message, else False. - """ - if verbose >= level: - clear_progress() - if color: - msg = _color.build(color, msg) - if level < NOTICE: - print(msg, file=sys.stderr) - else: - print(msg) - -def do_output(level, msg): - """Output a message to the terminal. - - Args: - level: Verbosity level for this message. It will only be displayed if - this as high as the currently selected level. - msg; Message to display. - """ - _output(level, msg) - -def error(msg): - """Display an error message - - Args: - msg; Message to display. - """ - _output(ERROR, msg, _color.RED) - -def warning(msg): - """Display a warning message - - Args: - msg; Message to display. - """ - _output(WARNING, msg, _color.YELLOW) - -def notice(msg): - """Display an important infomation message - - Args: - msg; Message to display. - """ - _output(NOTICE, msg) - -def info(msg): - """Display an infomation message - - Args: - msg; Message to display. - """ - _output(INFO, msg) - -def detail(msg): - """Display a detailed message - - Args: - msg; Message to display. - """ - _output(DETAIL, msg) - -def debug(msg): - """Display a debug message - - Args: - msg; Message to display. - """ - _output(DEBUG, msg) - -def user_output(msg): - """Display a message regardless of the current output level. - - This is used when the output was specifically requested by the user. - Args: - msg; Message to display. - """ - _output(0, msg) - -def init(_verbose=WARNING, stdout=sys.stdout): - """Initialize a new output object. - - Args: - verbose: Verbosity level (0-4). - stdout: File to use for stdout. - """ - global verbose, _progress, _color, _stdout, stdout_is_tty - - verbose = _verbose - _progress = '' # Our last progress message - _color = terminal.Color() - _stdout = stdout - - # TODO(sjg): Move this into Chromite libraries when we have them - stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() - stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty() - -def uninit(): - clear_progress() - -init() |