469 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			469 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # SPDX-License-Identifier: GPL-2.0
 | |
| # Copyright (c) 2015 Stephen Warren
 | |
| # Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
 | |
| 
 | |
| # Common logic to interact with U-Boot via the console. This class provides
 | |
| # the interface that tests use to execute U-Boot shell commands and wait for
 | |
| # their results. Sub-classes exist to perform board-type-specific setup
 | |
| # operations, such as spawning a sub-process for Sandbox, or attaching to the
 | |
| # serial console of real hardware.
 | |
| 
 | |
| import multiplexed_log
 | |
| import os
 | |
| import pytest
 | |
| import re
 | |
| import sys
 | |
| import u_boot_spawn
 | |
| 
 | |
| # Regexes for text we expect U-Boot to send to the console.
 | |
| pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
 | |
| pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
 | |
| pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
 | |
| pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
 | |
| pattern_error_notification = re.compile('## Error: ')
 | |
| pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')
 | |
| 
 | |
| PAT_ID = 0
 | |
| PAT_RE = 1
 | |
| 
 | |
| bad_pattern_defs = (
 | |
|     ('spl_signon', pattern_u_boot_spl_signon),
 | |
|     ('main_signon', pattern_u_boot_main_signon),
 | |
|     ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
 | |
|     ('unknown_command', pattern_unknown_command),
 | |
|     ('error_notification', pattern_error_notification),
 | |
|     ('error_please_reset', pattern_error_please_reset),
 | |
| )
 | |
| 
 | |
| class ConsoleDisableCheck(object):
 | |
|     """Context manager (for Python's with statement) that temporarily disables
 | |
|     the specified console output error check. This is useful when deliberately
 | |
|     executing a command that is known to trigger one of the error checks, in
 | |
|     order to test that the error condition is actually raised. This class is
 | |
|     used internally by ConsoleBase::disable_check(); it is not intended for
 | |
|     direct usage."""
 | |
| 
 | |
|     def __init__(self, console, check_type):
 | |
|         self.console = console
 | |
|         self.check_type = check_type
 | |
| 
 | |
|     def __enter__(self):
 | |
|         self.console.disable_check_count[self.check_type] += 1
 | |
|         self.console.eval_bad_patterns()
 | |
| 
 | |
|     def __exit__(self, extype, value, traceback):
 | |
|         self.console.disable_check_count[self.check_type] -= 1
 | |
|         self.console.eval_bad_patterns()
 | |
| 
 | |
| class ConsoleSetupTimeout(object):
 | |
|     """Context manager (for Python's with statement) that temporarily sets up
 | |
|     timeout for specific command. This is useful when execution time is greater
 | |
|     then default 30s."""
 | |
| 
 | |
|     def __init__(self, console, timeout):
 | |
|         self.p = console.p
 | |
|         self.orig_timeout = self.p.timeout
 | |
|         self.p.timeout = timeout
 | |
| 
 | |
|     def __enter__(self):
 | |
|         return self
 | |
| 
 | |
|     def __exit__(self, extype, value, traceback):
 | |
|         self.p.timeout = self.orig_timeout
 | |
| 
 | |
| class ConsoleBase(object):
 | |
|     """The interface through which test functions interact with the U-Boot
 | |
|     console. This primarily involves executing shell commands, capturing their
 | |
|     results, and checking for common error conditions. Some common utilities
 | |
|     are also provided too."""
 | |
| 
 | |
|     def __init__(self, log, config, max_fifo_fill):
 | |
|         """Initialize a U-Boot console connection.
 | |
| 
 | |
|         Can only usefully be called by sub-classes.
 | |
| 
 | |
|         Args:
 | |
|             log: A mulptiplex_log.Logfile object, to which the U-Boot output
 | |
|                 will be logged.
 | |
|             config: A configuration data structure, as built by conftest.py.
 | |
|             max_fifo_fill: The maximum number of characters to send to U-Boot
 | |
|                 command-line before waiting for U-Boot to echo the characters
 | |
|                 back. For UART-based HW without HW flow control, this value
 | |
|                 should be set less than the UART RX FIFO size to avoid
 | |
|                 overflow, assuming that U-Boot can't keep up with full-rate
 | |
|                 traffic at the baud rate.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         self.log = log
 | |
|         self.config = config
 | |
|         self.max_fifo_fill = max_fifo_fill
 | |
| 
 | |
|         self.logstream = self.log.get_stream('console', sys.stdout)
 | |
| 
 | |
|         # Array slice removes leading/trailing quotes
 | |
|         self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
 | |
|         self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
 | |
|         self.p = None
 | |
|         self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
 | |
|         self.eval_bad_patterns()
 | |
| 
 | |
|         self.at_prompt = False
 | |
|         self.at_prompt_logevt = None
 | |
| 
 | |
|     def eval_bad_patterns(self):
 | |
|         self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \
 | |
|             if self.disable_check_count[pat[PAT_ID]] == 0]
 | |
|         self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \
 | |
|             if self.disable_check_count[pat[PAT_ID]] == 0]
 | |
| 
 | |
|     def close(self):
 | |
|         """Terminate the connection to the U-Boot console.
 | |
| 
 | |
|         This function is only useful once all interaction with U-Boot is
 | |
|         complete. Once this function is called, data cannot be sent to or
 | |
|         received from U-Boot.
 | |
| 
 | |
|         Args:
 | |
|             None.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         if self.p:
 | |
|             self.p.close()
 | |
|         self.logstream.close()
 | |
| 
 | |
|     def run_command(self, cmd, wait_for_echo=True, send_nl=True,
 | |
|             wait_for_prompt=True):
 | |
|         """Execute a command via the U-Boot console.
 | |
| 
 | |
|         The command is always sent to U-Boot.
 | |
| 
 | |
|         U-Boot echoes any command back to its output, and this function
 | |
|         typically waits for that to occur. The wait can be disabled by setting
 | |
|         wait_for_echo=False, which is useful e.g. when sending CTRL-C to
 | |
|         interrupt a long-running command such as "ums".
 | |
| 
 | |
|         Command execution is typically triggered by sending a newline
 | |
|         character. This can be disabled by setting send_nl=False, which is
 | |
|         also useful when sending CTRL-C.
 | |
| 
 | |
|         This function typically waits for the command to finish executing, and
 | |
|         returns the console output that it generated. This can be disabled by
 | |
|         setting wait_for_prompt=False, which is useful when invoking a long-
 | |
|         running command such as "ums".
 | |
| 
 | |
|         Args:
 | |
|             cmd: The command to send.
 | |
|             wait_for_echo: Boolean indicating whether to wait for U-Boot to
 | |
|                 echo the command text back to its output.
 | |
|             send_nl: Boolean indicating whether to send a newline character
 | |
|                 after the command string.
 | |
|             wait_for_prompt: Boolean indicating whether to wait for the
 | |
|                 command prompt to be sent by U-Boot. This typically occurs
 | |
|                 immediately after the command has been executed.
 | |
| 
 | |
|         Returns:
 | |
|             If wait_for_prompt == False:
 | |
|                 Nothing.
 | |
|             Else:
 | |
|                 The output from U-Boot during command execution. In other
 | |
|                 words, the text U-Boot emitted between the point it echod the
 | |
|                 command string and emitted the subsequent command prompts.
 | |
|         """
 | |
| 
 | |
|         if self.at_prompt and \
 | |
|                 self.at_prompt_logevt != self.logstream.logfile.cur_evt:
 | |
|             self.logstream.write(self.prompt, implicit=True)
 | |
| 
 | |
|         try:
 | |
|             self.at_prompt = False
 | |
|             if send_nl:
 | |
|                 cmd += '\n'
 | |
|             while cmd:
 | |
|                 # Limit max outstanding data, so UART FIFOs don't overflow
 | |
|                 chunk = cmd[:self.max_fifo_fill]
 | |
|                 cmd = cmd[self.max_fifo_fill:]
 | |
|                 self.p.send(chunk)
 | |
|                 if not wait_for_echo:
 | |
|                     continue
 | |
|                 chunk = re.escape(chunk)
 | |
|                 chunk = chunk.replace('\\\n', '[\r\n]')
 | |
|                 m = self.p.expect([chunk] + self.bad_patterns)
 | |
|                 if m != 0:
 | |
|                     self.at_prompt = False
 | |
|                     raise Exception('Bad pattern found on console: ' +
 | |
|                                     self.bad_pattern_ids[m - 1])
 | |
|             if not wait_for_prompt:
 | |
|                 return
 | |
|             m = self.p.expect([self.prompt_compiled] + self.bad_patterns)
 | |
|             if m != 0:
 | |
|                 self.at_prompt = False
 | |
|                 raise Exception('Bad pattern found on console: ' +
 | |
|                                 self.bad_pattern_ids[m - 1])
 | |
|             self.at_prompt = True
 | |
|             self.at_prompt_logevt = self.logstream.logfile.cur_evt
 | |
|             # Only strip \r\n; space/TAB might be significant if testing
 | |
|             # indentation.
 | |
|             return self.p.before.strip('\r\n')
 | |
|         except Exception as ex:
 | |
|             self.log.error(str(ex))
 | |
|             self.cleanup_spawn()
 | |
|             raise
 | |
|         finally:
 | |
|             self.log.timestamp()
 | |
| 
 | |
|     def run_command_list(self, cmds):
 | |
|         """Run a list of commands.
 | |
| 
 | |
|         This is a helper function to call run_command() with default arguments
 | |
|         for each command in a list.
 | |
| 
 | |
|         Args:
 | |
|             cmd: List of commands (each a string).
 | |
|         Returns:
 | |
|             A list of output strings from each command, one element for each
 | |
|             command.
 | |
|         """
 | |
|         output = []
 | |
|         for cmd in cmds:
 | |
|             output.append(self.run_command(cmd))
 | |
|         return output
 | |
| 
 | |
|     def ctrlc(self):
 | |
|         """Send a CTRL-C character to U-Boot.
 | |
| 
 | |
|         This is useful in order to stop execution of long-running synchronous
 | |
|         commands such as "ums".
 | |
| 
 | |
|         Args:
 | |
|             None.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         self.log.action('Sending Ctrl-C')
 | |
|         self.run_command(chr(3), wait_for_echo=False, send_nl=False)
 | |
| 
 | |
|     def wait_for(self, text):
 | |
|         """Wait for a pattern to be emitted by U-Boot.
 | |
| 
 | |
|         This is useful when a long-running command such as "dfu" is executing,
 | |
|         and it periodically emits some text that should show up at a specific
 | |
|         location in the log file.
 | |
| 
 | |
|         Args:
 | |
|             text: The text to wait for; either a string (containing raw text,
 | |
|                 not a regular expression) or an re object.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         if type(text) == type(''):
 | |
|             text = re.escape(text)
 | |
|         m = self.p.expect([text] + self.bad_patterns)
 | |
|         if m != 0:
 | |
|             raise Exception('Bad pattern found on console: ' +
 | |
|                             self.bad_pattern_ids[m - 1])
 | |
| 
 | |
|     def drain_console(self):
 | |
|         """Read from and log the U-Boot console for a short time.
 | |
| 
 | |
|         U-Boot's console output is only logged when the test code actively
 | |
|         waits for U-Boot to emit specific data. There are cases where tests
 | |
|         can fail without doing this. For example, if a test asks U-Boot to
 | |
|         enable USB device mode, then polls until a host-side device node
 | |
|         exists. In such a case, it is useful to log U-Boot's console output
 | |
|         in case U-Boot printed clues as to why the host-side even did not
 | |
|         occur. This function will do that.
 | |
| 
 | |
|         Args:
 | |
|             None.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         # If we are already not connected to U-Boot, there's nothing to drain.
 | |
|         # This should only happen when a previous call to run_command() or
 | |
|         # wait_for() failed (and hence the output has already been logged), or
 | |
|         # the system is shutting down.
 | |
|         if not self.p:
 | |
|             return
 | |
| 
 | |
|         orig_timeout = self.p.timeout
 | |
|         try:
 | |
|             # Drain the log for a relatively short time.
 | |
|             self.p.timeout = 1000
 | |
|             # Wait for something U-Boot will likely never send. This will
 | |
|             # cause the console output to be read and logged.
 | |
|             self.p.expect(['This should never match U-Boot output'])
 | |
|         except:
 | |
|             # We expect a timeout, since U-Boot won't print what we waited
 | |
|             # for. Squash it when it happens.
 | |
|             #
 | |
|             # Squash any other exception too. This function is only used to
 | |
|             # drain (and log) the U-Boot console output after a failed test.
 | |
|             # The U-Boot process will be restarted, or target board reset, once
 | |
|             # this function returns. So, we don't care about detecting any
 | |
|             # additional errors, so they're squashed so that the rest of the
 | |
|             # post-test-failure cleanup code can continue operation, and
 | |
|             # correctly terminate any log sections, etc.
 | |
|             pass
 | |
|         finally:
 | |
|             self.p.timeout = orig_timeout
 | |
| 
 | |
|     def ensure_spawned(self):
 | |
|         """Ensure a connection to a correctly running U-Boot instance.
 | |
| 
 | |
|         This may require spawning a new Sandbox process or resetting target
 | |
|         hardware, as defined by the implementation sub-class.
 | |
| 
 | |
|         This is an internal function and should not be called directly.
 | |
| 
 | |
|         Args:
 | |
|             None.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         if self.p:
 | |
|             return
 | |
|         try:
 | |
|             self.log.start_section('Starting U-Boot')
 | |
|             self.at_prompt = False
 | |
|             self.p = self.get_spawn()
 | |
|             # Real targets can take a long time to scroll large amounts of
 | |
|             # text if LCD is enabled. This value may need tweaking in the
 | |
|             # future, possibly per-test to be optimal. This works for 'help'
 | |
|             # on board 'seaboard'.
 | |
|             if not self.config.gdbserver:
 | |
|                 self.p.timeout = 30000
 | |
|             self.p.logfile_read = self.logstream
 | |
|             bcfg = self.config.buildconfig
 | |
|             config_spl = bcfg.get('config_spl', 'n') == 'y'
 | |
|             config_spl_serial_support = bcfg.get('config_spl_serial_support',
 | |
|                                                  'n') == 'y'
 | |
|             env_spl_skipped = self.config.env.get('env__spl_skipped',
 | |
|                                                   False)
 | |
|             if config_spl and config_spl_serial_support and not env_spl_skipped:
 | |
|                 m = self.p.expect([pattern_u_boot_spl_signon] +
 | |
|                                   self.bad_patterns)
 | |
|                 if m != 0:
 | |
|                     raise Exception('Bad pattern found on SPL console: ' +
 | |
|                                     self.bad_pattern_ids[m - 1])
 | |
|             m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns)
 | |
|             if m != 0:
 | |
|                 raise Exception('Bad pattern found on console: ' +
 | |
|                                 self.bad_pattern_ids[m - 1])
 | |
|             self.u_boot_version_string = self.p.after
 | |
|             while True:
 | |
|                 m = self.p.expect([self.prompt_compiled,
 | |
|                     pattern_stop_autoboot_prompt] + self.bad_patterns)
 | |
|                 if m == 0:
 | |
|                     break
 | |
|                 if m == 1:
 | |
|                     self.p.send(' ')
 | |
|                     continue
 | |
|                 raise Exception('Bad pattern found on console: ' +
 | |
|                                 self.bad_pattern_ids[m - 2])
 | |
|             self.at_prompt = True
 | |
|             self.at_prompt_logevt = self.logstream.logfile.cur_evt
 | |
|         except Exception as ex:
 | |
|             self.log.error(str(ex))
 | |
|             self.cleanup_spawn()
 | |
|             raise
 | |
|         finally:
 | |
|             self.log.timestamp()
 | |
|             self.log.end_section('Starting U-Boot')
 | |
| 
 | |
|     def cleanup_spawn(self):
 | |
|         """Shut down all interaction with the U-Boot instance.
 | |
| 
 | |
|         This is used when an error is detected prior to re-establishing a
 | |
|         connection with a fresh U-Boot instance.
 | |
| 
 | |
|         This is an internal function and should not be called directly.
 | |
| 
 | |
|         Args:
 | |
|             None.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         try:
 | |
|             if self.p:
 | |
|                 self.p.close()
 | |
|         except:
 | |
|             pass
 | |
|         self.p = None
 | |
| 
 | |
|     def restart_uboot(self):
 | |
|         """Shut down and restart U-Boot."""
 | |
|         self.cleanup_spawn()
 | |
|         self.ensure_spawned()
 | |
| 
 | |
|     def get_spawn_output(self):
 | |
|         """Return the start-up output from U-Boot
 | |
| 
 | |
|         Returns:
 | |
|             The output produced by ensure_spawed(), as a string.
 | |
|         """
 | |
|         if self.p:
 | |
|             return self.p.get_expect_output()
 | |
|         return None
 | |
| 
 | |
|     def validate_version_string_in_text(self, text):
 | |
|         """Assert that a command's output includes the U-Boot signon message.
 | |
| 
 | |
|         This is primarily useful for validating the "version" command without
 | |
|         duplicating the signon text regex in a test function.
 | |
| 
 | |
|         Args:
 | |
|             text: The command output text to check.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing. An exception is raised if the validation fails.
 | |
|         """
 | |
| 
 | |
|         assert(self.u_boot_version_string in text)
 | |
| 
 | |
|     def disable_check(self, check_type):
 | |
|         """Temporarily disable an error check of U-Boot's output.
 | |
| 
 | |
|         Create a new context manager (for use with the "with" statement) which
 | |
|         temporarily disables a particular console output error check.
 | |
| 
 | |
|         Args:
 | |
|             check_type: The type of error-check to disable. Valid values may
 | |
|             be found in self.disable_check_count above.
 | |
| 
 | |
|         Returns:
 | |
|             A context manager object.
 | |
|         """
 | |
| 
 | |
|         return ConsoleDisableCheck(self, check_type)
 | |
| 
 | |
|     def temporary_timeout(self, timeout):
 | |
|         """Temporarily set up different timeout for commands.
 | |
| 
 | |
|         Create a new context manager (for use with the "with" statement) which
 | |
|         temporarily change timeout.
 | |
| 
 | |
|         Args:
 | |
|             timeout: Time in milliseconds.
 | |
| 
 | |
|         Returns:
 | |
|             A context manager object.
 | |
|         """
 | |
| 
 | |
|         return ConsoleSetupTimeout(self, timeout)
 | 
