diff options
Diffstat (limited to 'tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_api.py')
-rw-r--r-- | tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_api.py | 1841 |
1 files changed, 0 insertions, 1841 deletions
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_api.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_api.py deleted file mode 100644 index bca2da527..000000000 --- a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_api.py +++ /dev/null @@ -1,1841 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2014 ARM Limited - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com> -""" - -import os -import re -import sys -import json -import uuid -import pprint -import random -import optparse -import datetime -import threading -from types import ListType -from colorama import Fore, Back, Style -from prettytable import PrettyTable - -from time import sleep, time -from Queue import Queue, Empty -from os.path import join, exists, basename -from threading import Thread, Lock -from subprocess import Popen, PIPE - -# Imports related to mbed build api -from workspace_tools.tests import TESTS -from workspace_tools.tests import TEST_MAP -from workspace_tools.paths import BUILD_DIR -from workspace_tools.paths import HOST_TESTS -from workspace_tools.utils import ToolException -from workspace_tools.utils import construct_enum -from workspace_tools.targets import TARGET_MAP -from workspace_tools.test_db import BaseDBAccess -from workspace_tools.build_api import build_project, build_mbed_libs, build_lib -from workspace_tools.build_api import get_target_supported_toolchains -from workspace_tools.build_api import write_build_report -from workspace_tools.libraries import LIBRARIES, LIBRARY_MAP -from workspace_tools.toolchains import TOOLCHAIN_BIN_PATH -from workspace_tools.test_exporters import ReportExporter, ResultExporterType - - -import workspace_tools.host_tests.host_tests_plugins as host_tests_plugins - -try: - import mbed_lstools -except: - pass - - -class ProcessObserver(Thread): - def __init__(self, proc): - Thread.__init__(self) - self.proc = proc - self.queue = Queue() - self.daemon = True - self.active = True - self.start() - - def run(self): - while self.active: - c = self.proc.stdout.read(1) - self.queue.put(c) - - def stop(self): - self.active = False - try: - self.proc.terminate() - except Exception, _: - pass - - -class SingleTestExecutor(threading.Thread): - """ Example: Single test class in separate thread usage - """ - def __init__(self, single_test): - self.single_test = single_test - threading.Thread.__init__(self) - - def run(self): - start = time() - # Execute tests depending on options and filter applied - test_summary, shuffle_seed, test_summary_ext, test_suite_properties_ext = self.single_test.execute() - elapsed_time = time() - start - - # Human readable summary - if not self.single_test.opts_suppress_summary: - # prints well-formed summary with results (SQL table like) - print self.single_test.generate_test_summary(test_summary, shuffle_seed) - if self.single_test.opts_test_x_toolchain_summary: - # prints well-formed summary with results (SQL table like) - # table shows text x toolchain test result matrix - print self.single_test.generate_test_summary_by_target(test_summary, shuffle_seed) - print "Completed in %.2f sec"% (elapsed_time) - - -class SingleTestRunner(object): - """ Object wrapper for single test run which may involve multiple MUTs - """ - RE_DETECT_TESTCASE_RESULT = None - - # Return codes for test script - TEST_RESULT_OK = "OK" - TEST_RESULT_FAIL = "FAIL" - TEST_RESULT_ERROR = "ERROR" - TEST_RESULT_UNDEF = "UNDEF" - TEST_RESULT_IOERR_COPY = "IOERR_COPY" - TEST_RESULT_IOERR_DISK = "IOERR_DISK" - TEST_RESULT_IOERR_SERIAL = "IOERR_SERIAL" - TEST_RESULT_TIMEOUT = "TIMEOUT" - TEST_RESULT_NO_IMAGE = "NO_IMAGE" - TEST_RESULT_MBED_ASSERT = "MBED_ASSERT" - - GLOBAL_LOOPS_COUNT = 1 # How many times each test should be repeated - TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id - TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count} - - muts = {} # MUTs descriptor (from external file) - test_spec = {} # Test specification (from external file) - - # mbed test suite -> SingleTestRunner - TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK, - "failure" : TEST_RESULT_FAIL, - "error" : TEST_RESULT_ERROR, - "ioerr_copy" : TEST_RESULT_IOERR_COPY, - "ioerr_disk" : TEST_RESULT_IOERR_DISK, - "ioerr_serial" : TEST_RESULT_IOERR_SERIAL, - "timeout" : TEST_RESULT_TIMEOUT, - "no_image" : TEST_RESULT_NO_IMAGE, - "end" : TEST_RESULT_UNDEF, - "mbed_assert" : TEST_RESULT_MBED_ASSERT - } - - def __init__(self, - _global_loops_count=1, - _test_loops_list=None, - _muts={}, - _clean=False, - _opts_db_url=None, - _opts_log_file_name=None, - _opts_report_html_file_name=None, - _opts_report_junit_file_name=None, - _opts_report_build_file_name=None, - _test_spec={}, - _opts_goanna_for_mbed_sdk=None, - _opts_goanna_for_tests=None, - _opts_shuffle_test_order=False, - _opts_shuffle_test_seed=None, - _opts_test_by_names=None, - _opts_peripheral_by_names=None, - _opts_test_only_peripheral=False, - _opts_test_only_common=False, - _opts_verbose_skipped_tests=False, - _opts_verbose_test_result_only=False, - _opts_verbose=False, - _opts_firmware_global_name=None, - _opts_only_build_tests=False, - _opts_parallel_test_exec=False, - _opts_suppress_summary=False, - _opts_test_x_toolchain_summary=False, - _opts_copy_method=None, - _opts_mut_reset_type=None, - _opts_jobs=None, - _opts_waterfall_test=None, - _opts_extend_test_timeout=None): - """ Let's try hard to init this object - """ - from colorama import init - init() - - PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}" - self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN) - # Settings related to test loops counters - try: - _global_loops_count = int(_global_loops_count) - except: - _global_loops_count = 1 - if _global_loops_count < 1: - _global_loops_count = 1 - self.GLOBAL_LOOPS_COUNT = _global_loops_count - self.TEST_LOOPS_LIST = _test_loops_list if _test_loops_list else [] - self.TEST_LOOPS_DICT = self.test_loop_list_to_dict(_test_loops_list) - - self.shuffle_random_seed = 0.0 - self.SHUFFLE_SEED_ROUND = 10 - - # MUT list and test specification storage - self.muts = _muts - self.test_spec = _test_spec - - # Settings passed e.g. from command line - self.opts_db_url = _opts_db_url - self.opts_log_file_name = _opts_log_file_name - self.opts_report_html_file_name = _opts_report_html_file_name - self.opts_report_junit_file_name = _opts_report_junit_file_name - self.opts_report_build_file_name = _opts_report_build_file_name - self.opts_goanna_for_mbed_sdk = _opts_goanna_for_mbed_sdk - self.opts_goanna_for_tests = _opts_goanna_for_tests - self.opts_shuffle_test_order = _opts_shuffle_test_order - self.opts_shuffle_test_seed = _opts_shuffle_test_seed - self.opts_test_by_names = _opts_test_by_names - self.opts_peripheral_by_names = _opts_peripheral_by_names - self.opts_test_only_peripheral = _opts_test_only_peripheral - self.opts_test_only_common = _opts_test_only_common - self.opts_verbose_skipped_tests = _opts_verbose_skipped_tests - self.opts_verbose_test_result_only = _opts_verbose_test_result_only - self.opts_verbose = _opts_verbose - self.opts_firmware_global_name = _opts_firmware_global_name - self.opts_only_build_tests = _opts_only_build_tests - self.opts_parallel_test_exec = _opts_parallel_test_exec - self.opts_suppress_summary = _opts_suppress_summary - self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary - self.opts_copy_method = _opts_copy_method - self.opts_mut_reset_type = _opts_mut_reset_type - self.opts_jobs = _opts_jobs if _opts_jobs is not None else 1 - self.opts_waterfall_test = _opts_waterfall_test - self.opts_extend_test_timeout = _opts_extend_test_timeout - self.opts_clean = _clean - - # File / screen logger initialization - self.logger = CLITestLogger(file_name=self.opts_log_file_name) # Default test logger - - # Database related initializations - self.db_logger = factory_db_logger(self.opts_db_url) - self.db_logger_build_id = None # Build ID (database index of build_id table) - # Let's connect to database to set up credentials and confirm database is ready - if self.db_logger: - self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object - if self.db_logger.is_connected(): - # Get hostname and uname so we can use it as build description - # when creating new build_id in external database - (_hostname, _uname) = self.db_logger.get_hostname() - _host_location = os.path.dirname(os.path.abspath(__file__)) - build_id_type = None if self.opts_only_build_tests is None else self.db_logger.BUILD_ID_TYPE_BUILD_ONLY - self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname, location=_host_location, type=build_id_type) - self.db_logger.disconnect() - - def dump_options(self): - """ Function returns data structure with common settings passed to SingelTestRunner - It can be used for example to fill _extra fields in database storing test suite single run data - Example: - data = self.dump_options() - or - data_str = json.dumps(self.dump_options()) - """ - result = {"db_url" : str(self.opts_db_url), - "log_file_name" : str(self.opts_log_file_name), - "shuffle_test_order" : str(self.opts_shuffle_test_order), - "shuffle_test_seed" : str(self.opts_shuffle_test_seed), - "test_by_names" : str(self.opts_test_by_names), - "peripheral_by_names" : str(self.opts_peripheral_by_names), - "test_only_peripheral" : str(self.opts_test_only_peripheral), - "test_only_common" : str(self.opts_test_only_common), - "verbose" : str(self.opts_verbose), - "firmware_global_name" : str(self.opts_firmware_global_name), - "only_build_tests" : str(self.opts_only_build_tests), - "copy_method" : str(self.opts_copy_method), - "mut_reset_type" : str(self.opts_mut_reset_type), - "jobs" : str(self.opts_jobs), - "extend_test_timeout" : str(self.opts_extend_test_timeout), - "_dummy" : '' - } - return result - - def shuffle_random_func(self): - return self.shuffle_random_seed - - def is_shuffle_seed_float(self): - """ return true if function parameter can be converted to float - """ - result = True - try: - float(self.shuffle_random_seed) - except ValueError: - result = False - return result - - # This will store target / toolchain specific properties - test_suite_properties_ext = {} # target : toolchain - # Here we store test results - test_summary = [] - # Here we store test results in extended data structure - test_summary_ext = {} - execute_thread_slice_lock = Lock() - - def execute_thread_slice(self, q, target, toolchains, clean, test_ids, build_report): - for toolchain in toolchains: - # Toolchain specific build successes and failures - build_report[toolchain] = { - "mbed_failure": False, - "library_failure": False, - "library_build_passing": [], - "library_build_failing": [], - "test_build_passing": [], - "test_build_failing": [] - } - # print target, toolchain - # Test suite properties returned to external tools like CI - test_suite_properties = {} - test_suite_properties['jobs'] = self.opts_jobs - test_suite_properties['clean'] = clean - test_suite_properties['target'] = target - test_suite_properties['test_ids'] = ', '.join(test_ids) - test_suite_properties['toolchain'] = toolchain - test_suite_properties['shuffle_random_seed'] = self.shuffle_random_seed - - - # print '=== %s::%s ===' % (target, toolchain) - # Let's build our test - if target not in TARGET_MAP: - print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found'% (target)) - continue - - T = TARGET_MAP[target] - build_mbed_libs_options = ["analyze"] if self.opts_goanna_for_mbed_sdk else None - clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean or self.opts_clean else None - - try: - build_mbed_libs_result = build_mbed_libs(T, - toolchain, - options=build_mbed_libs_options, - clean=clean_mbed_libs_options, - jobs=self.opts_jobs) - - if not build_mbed_libs_result: - print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target'% (T.name, toolchain)) - continue - except ToolException: - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building MBED libs for %s using %s'% (target, toolchain)) - build_report[toolchain]["mbed_failure"] = True - #return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext - continue - - build_dir = join(BUILD_DIR, "test", target, toolchain) - - test_suite_properties['build_mbed_libs_result'] = build_mbed_libs_result - test_suite_properties['build_dir'] = build_dir - test_suite_properties['skipped'] = [] - - # Enumerate through all tests and shuffle test order if requested - test_map_keys = sorted(TEST_MAP.keys()) - - if self.opts_shuffle_test_order: - random.shuffle(test_map_keys, self.shuffle_random_func) - # Update database with shuffle seed f applicable - if self.db_logger: - self.db_logger.reconnect(); - if self.db_logger.is_connected(): - self.db_logger.update_build_id_info(self.db_logger_build_id, _shuffle_seed=self.shuffle_random_func()) - self.db_logger.disconnect(); - - if self.db_logger: - self.db_logger.reconnect(); - if self.db_logger.is_connected(): - # Update MUTs and Test Specification in database - self.db_logger.update_build_id_info(self.db_logger_build_id, _muts=self.muts, _test_spec=self.test_spec) - # Update Extra information in database (some options passed to test suite) - self.db_logger.update_build_id_info(self.db_logger_build_id, _extra=json.dumps(self.dump_options())) - self.db_logger.disconnect(); - - valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids) - skipped_test_map_keys = self.get_skipped_tests(test_map_keys, valid_test_map_keys) - - for skipped_test_id in skipped_test_map_keys: - test_suite_properties['skipped'].append(skipped_test_id) - - - # First pass through all tests and determine which libraries need to be built - libraries = set() - for test_id in valid_test_map_keys: - test = TEST_MAP[test_id] - - # Detect which lib should be added to test - # Some libs have to compiled like RTOS or ETH - for lib in LIBRARIES: - if lib['build_dir'] in test.dependencies: - libraries.add(lib['id']) - - - build_project_options = ["analyze"] if self.opts_goanna_for_tests else None - clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None - - # Build all required libraries - for lib_id in libraries: - try: - build_lib(lib_id, - T, - toolchain, - options=build_project_options, - verbose=self.opts_verbose, - clean=clean_mbed_libs_options, - jobs=self.opts_jobs) - - build_report[toolchain]["library_build_passing"].append(lib_id) - - except ToolException: - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building library %s'% (lib_id)) - build_report[toolchain]["library_failure"] = True - build_report[toolchain]["library_build_failing"].append(lib_id) - #return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext - continue - - - - - for test_id in valid_test_map_keys: - test = TEST_MAP[test_id] - - test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries) - - # TODO: move this 2 below loops to separate function - INC_DIRS = [] - for lib_id in libraries: - if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']: - INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext']) - - MACROS = [] - for lib_id in libraries: - if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']: - MACROS.extend(LIBRARY_MAP[lib_id]['macros']) - MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target) - MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id) - test_uuid = uuid.uuid4() - MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid)) - - project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None - try: - path = build_project(test.source_dir, - join(build_dir, test_id), - T, - toolchain, - test.dependencies, - options=build_project_options, - clean=clean_project_options, - verbose=self.opts_verbose, - name=project_name, - macros=MACROS, - inc_dirs=INC_DIRS, - jobs=self.opts_jobs) - build_report[toolchain]["test_build_passing"].append(test_id) - - except ToolException: - project_name_str = project_name if project_name is not None else test_id - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str)) - build_report[toolchain]["test_build_failing"].append(test_id) - # return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext - continue - - if self.opts_only_build_tests: - # With this option we are skipping testing phase - continue - - # Test duration can be increased by global value - test_duration = test.duration - if self.opts_extend_test_timeout is not None: - test_duration += self.opts_extend_test_timeout - - # For an automated test the duration act as a timeout after - # which the test gets interrupted - test_spec = self.shape_test_request(target, path, test_id, test_duration) - test_loops = self.get_test_loop_count(test_id) - - test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration - test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops - test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path - - # read MUTs, test specification and perform tests - single_test_result, detailed_test_results = self.handle(test_spec, target, toolchain, test_loops=test_loops) - - # Append test results to global test summary - if single_test_result is not None: - self.test_summary.append(single_test_result) - - # Prepare extended test results data structure (it can be used to generate detailed test report) - if toolchain not in self.test_summary_ext: - self.test_summary_ext[toolchain] = {} # test_summary_ext : toolchain - if target not in self.test_summary_ext[toolchain]: - self.test_summary_ext[toolchain][target] = {} # test_summary_ext : toolchain : target - if target not in self.test_summary_ext[toolchain][target]: - self.test_summary_ext[toolchain][target][test_id] = detailed_test_results # test_summary_ext : toolchain : target : test_it - - test_suite_properties['skipped'] = ', '.join(test_suite_properties['skipped']) - self.test_suite_properties_ext[target][toolchain] = test_suite_properties - - # return self.test_summary, self.shuffle_random_seed, test_summary_ext, self.test_suite_properties_ext - q.put(target + '_'.join(toolchains)) - return - - def execute(self): - clean = self.test_spec.get('clean', False) - test_ids = self.test_spec.get('test_ids', []) - q = Queue() - - # Generate seed for shuffle if seed is not provided in - self.shuffle_random_seed = round(random.random(), self.SHUFFLE_SEED_ROUND) - if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float(): - self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND) - - build_reports = [] - - if self.opts_parallel_test_exec: - ################################################################### - # Experimental, parallel test execution per singletest instance. - ################################################################### - execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests - # Note: We are building here in parallel for each target separately! - # So we are not building the same thing multiple times and compilers - # in separate threads do not collide. - # Inside execute_thread_slice() function function handle() will be called to - # get information about available MUTs (per target). - for target, toolchains in self.test_spec['targets'].iteritems(): - self.test_suite_properties_ext[target] = {} - cur_build_report = {} - t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, cur_build_report)) - build_reports.append({ "target": target, "report": cur_build_report}) - t.daemon = True - t.start() - execute_threads.append(t) - - for t in execute_threads: - q.get() # t.join() would block some threads because we should not wait in any order for thread end - else: - # Serialized (not parallel) test execution - for target, toolchains in self.test_spec['targets'].iteritems(): - if target not in self.test_suite_properties_ext: - self.test_suite_properties_ext[target] = {} - - cur_build_report = {} - self.execute_thread_slice(q, target, toolchains, clean, test_ids, cur_build_report) - build_reports.append({ "target": target, "report": cur_build_report}) - q.get() - - build_report = [] - - for target_build_report in build_reports: - cur_report = { - "target": target_build_report["target"], - "passing": [], - "failing": [] - } - - for toolchain in sorted(target_build_report["report"], key=target_build_report["report"].get): - print "%s - %s" % (target_build_report["target"], toolchain) - report = target_build_report["report"][toolchain] - - if report["mbed_failure"]: - cur_report["failing"].append({ - "toolchain": toolchain, - "project": "mbed library" - }) - else: - for passing_library in report["library_build_failing"]: - cur_report["failing"].append({ - "toolchain": toolchain, - "project": "Library::%s" % (passing_library) - }) - - for failing_library in report["library_build_passing"]: - cur_report["passing"].append({ - "toolchain": toolchain, - "project": "Library::%s" % (failing_library) - }) - - for passing_test in report["test_build_passing"]: - cur_report["passing"].append({ - "toolchain": toolchain, - "project": "Test::%s" % (passing_test) - }) - - for failing_test in report["test_build_failing"]: - cur_report["failing"].append({ - "toolchain": toolchain, - "project": "Test::%s" % (failing_test) - }) - - - build_report.append(cur_report) - - if self.db_logger: - self.db_logger.reconnect(); - if self.db_logger.is_connected(): - self.db_logger.update_build_id_info(self.db_logger_build_id, _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED) - self.db_logger.disconnect(); - - return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext, build_report - - def get_valid_tests(self, test_map_keys, target, toolchain, test_ids): - valid_test_map_keys = [] - - for test_id in test_map_keys: - test = TEST_MAP[test_id] - if self.opts_test_by_names and test_id not in self.opts_test_by_names.split(','): - continue - - if test_ids and test_id not in test_ids: - continue - - if self.opts_test_only_peripheral and not test.peripherals: - if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target)) - continue - - if self.opts_peripheral_by_names and test.peripherals and not len([i for i in test.peripherals if i in self.opts_peripheral_by_names.split(',')]): - # We will skip tests not forced with -p option - if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target)) - continue - - if self.opts_test_only_common and test.peripherals: - if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target)) - continue - - if test.automated and test.is_supported(target, toolchain): - if test.peripherals is None and self.opts_only_build_tests: - # When users are using 'build only flag' and test do not have - # specified peripherals we can allow test building by default - pass - elif self.opts_peripheral_by_names and test_id not in self.opts_peripheral_by_names.split(','): - # If we force peripheral with option -p we expect test - # to pass even if peripheral is not in MUTs file. - pass - elif not self.is_peripherals_available(target, test.peripherals): - if self.opts_verbose_skipped_tests: - if test.peripherals: - print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test.peripherals), target)) - else: - print self.logger.log_line(self.logger.LogType.INFO, 'Test %s skipped for target %s'% (test_id, target)) - continue - - # The test has made it through all the filters, so add it to the valid tests list - valid_test_map_keys.append(test_id) - - return valid_test_map_keys - - def get_skipped_tests(self, all_test_map_keys, valid_test_map_keys): - # NOTE: This will not preserve order - return list(set(all_test_map_keys) - set(valid_test_map_keys)) - - def generate_test_summary_by_target(self, test_summary, shuffle_seed=None): - """ Prints well-formed summary with results (SQL table like) - table shows text x toolchain test result matrix - """ - RESULT_INDEX = 0 - TARGET_INDEX = 1 - TOOLCHAIN_INDEX = 2 - TEST_INDEX = 3 - DESC_INDEX = 4 - - unique_targets = get_unique_value_from_summary(test_summary, TARGET_INDEX) - unique_tests = get_unique_value_from_summary(test_summary, TEST_INDEX) - unique_test_desc = get_unique_value_from_summary_ext(test_summary, TEST_INDEX, DESC_INDEX) - unique_toolchains = get_unique_value_from_summary(test_summary, TOOLCHAIN_INDEX) - - result = "Test summary:\n" - for target in unique_targets: - result_dict = {} # test : { toolchain : result } - unique_target_toolchains = [] - for test in test_summary: - if test[TARGET_INDEX] == target: - if test[TOOLCHAIN_INDEX] not in unique_target_toolchains: - unique_target_toolchains.append(test[TOOLCHAIN_INDEX]) - if test[TEST_INDEX] not in result_dict: - result_dict[test[TEST_INDEX]] = {} - result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX] - - pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains - pt = PrettyTable(pt_cols) - for col in pt_cols: - pt.align[col] = "l" - pt.padding_width = 1 # One space between column edges and contents (default) - - for test in unique_tests: - if test in result_dict: - test_results = result_dict[test] - if test in unique_test_desc: - row = [target, test, unique_test_desc[test]] - for toolchain in unique_toolchains: - if toolchain in test_results: - row.append(test_results[toolchain]) - pt.add_row(row) - result += pt.get_string() - shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND, - shuffle_seed if shuffle_seed else self.shuffle_random_seed) - result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') - return result - - def generate_test_summary(self, test_summary, shuffle_seed=None): - """ Prints well-formed summary with results (SQL table like) - table shows target x test results matrix across - """ - result = "Test summary:\n" - # Pretty table package is used to print results - pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description", - "Elapsed Time (sec)", "Timeout (sec)", "Loops"]) - pt.align["Result"] = "l" # Left align - pt.align["Target"] = "l" # Left align - pt.align["Toolchain"] = "l" # Left align - pt.align["Test ID"] = "l" # Left align - pt.align["Test Description"] = "l" # Left align - pt.padding_width = 1 # One space between column edges and contents (default) - - result_dict = {self.TEST_RESULT_OK : 0, - self.TEST_RESULT_FAIL : 0, - self.TEST_RESULT_ERROR : 0, - self.TEST_RESULT_UNDEF : 0, - self.TEST_RESULT_IOERR_COPY : 0, - self.TEST_RESULT_IOERR_DISK : 0, - self.TEST_RESULT_IOERR_SERIAL : 0, - self.TEST_RESULT_NO_IMAGE : 0, - self.TEST_RESULT_TIMEOUT : 0, - self.TEST_RESULT_MBED_ASSERT : 0 - } - - for test in test_summary: - if test[0] in result_dict: - result_dict[test[0]] += 1 - pt.add_row(test) - result += pt.get_string() - result += "\n" - - # Print result count - result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()]) - shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND, - shuffle_seed if shuffle_seed else self.shuffle_random_seed) - result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') - return result - - def test_loop_list_to_dict(self, test_loops_str): - """ Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count} - """ - result = {} - if test_loops_str: - test_loops = test_loops_str.split(',') - for test_loop in test_loops: - test_loop_count = test_loop.split('=') - if len(test_loop_count) == 2: - _test_id, _test_loops = test_loop_count - try: - _test_loops = int(_test_loops) - except: - continue - result[_test_id] = _test_loops - return result - - def get_test_loop_count(self, test_id): - """ This function returns no. of loops per test (deducted by test_id_. - If test is not in list of redefined loop counts it will use default value. - """ - result = self.GLOBAL_LOOPS_COUNT - if test_id in self.TEST_LOOPS_DICT: - result = self.TEST_LOOPS_DICT[test_id] - return result - - def delete_file(self, file_path): - """ Remove file from the system - """ - result = True - resutl_msg = "" - try: - os.remove(file_path) - except Exception, e: - resutl_msg = e - result = False - return result, resutl_msg - - def handle(self, test_spec, target_name, toolchain_name, test_loops=1): - """ Function determines MUT's mbed disk/port and copies binary to - target. - Test is being invoked afterwards. - """ - data = json.loads(test_spec) - # Get test information, image and test timeout - test_id = data['test_id'] - test = TEST_MAP[test_id] - test_description = TEST_MAP[test_id].get_description() - image = data["image"] - duration = data.get("duration", 10) - - # Find a suitable MUT: - mut = None - for id, m in self.muts.iteritems(): - if m['mcu'] == data['mcu']: - mut = m - break - - if mut is None: - print "Error: No Mbed available: MUT[%s]" % data['mcu'] - return None - - disk = mut.get('disk') - port = mut.get('port') - - if disk is None or port is None: - return None - - target_by_mcu = TARGET_MAP[mut['mcu']] - # Some extra stuff can be declared in MUTs structure - reset_type = mut.get('reset_type') # reboot.txt, reset.txt, shutdown.txt - reset_tout = mut.get('reset_tout') # COPY_IMAGE -> RESET_PROC -> SLEEP(RESET_TOUT) - image_dest = mut.get('image_dest') # Image file destination DISK + IMAGE_DEST + BINARY_NAME - images_config = mut.get('images_config') # Available images selection via config file - mobo_config = mut.get('mobo_config') # Available board configuration selection e.g. core selection etc. - copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc. - - # When the build and test system were separate, this was relative to a - # base network folder base path: join(NETWORK_BASE_PATH, ) - image_path = image - - if self.db_logger: - self.db_logger.reconnect() - - selected_copy_method = self.opts_copy_method if copy_method is None else copy_method - - # Tests can be looped so test results must be stored for the same test - test_all_result = [] - # Test results for one test ran few times - detailed_test_results = {} # { Loop_number: { results ... } } - - for test_index in range(test_loops): - # Host test execution - start_host_exec_time = time() - - single_test_result = self.TEST_RESULT_UNDEF # single test run result - _copy_method = selected_copy_method - - if not exists(image_path): - single_test_result = self.TEST_RESULT_NO_IMAGE - elapsed_time = 0 - single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path) - print single_test_output - else: - # Host test execution - start_host_exec_time = time() - - host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose - host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type - host_test_result = self.run_host_test(test.host_test, - image_path, disk, port, duration, - micro=target_name, - verbose=host_test_verbose, - reset=host_test_reset, - reset_tout=reset_tout, - copy_method=selected_copy_method, - program_cycle_s=target_by_mcu.program_cycle_s()) - single_test_result, single_test_output, single_testduration, single_timeout = host_test_result - - # Store test result - test_all_result.append(single_test_result) - total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset - elapsed_time = single_testduration # TIme of single test case execution after reset - - detailed_test_results[test_index] = { - 'single_test_result' : single_test_result, - 'single_test_output' : single_test_output, - 'target_name' : target_name, - 'toolchain_name' : toolchain_name, - 'test_id' : test_id, - 'test_description' : test_description, - 'elapsed_time' : round(elapsed_time, 2), - 'duration' : single_timeout, - 'copy_method' : _copy_method, - } - - print self.print_test_result(single_test_result, target_name, toolchain_name, - test_id, test_description, elapsed_time, single_timeout) - - # Update database entries for ongoing test - if self.db_logger and self.db_logger.is_connected(): - test_type = 'SingleTest' - self.db_logger.insert_test_entry(self.db_logger_build_id, - target_name, - toolchain_name, - test_type, - test_id, - single_test_result, - single_test_output, - elapsed_time, - single_timeout, - test_index) - - # If we perform waterfall test we test until we get OK and we stop testing - if self.opts_waterfall_test and single_test_result == self.TEST_RESULT_OK: - break - - if self.db_logger: - self.db_logger.disconnect() - - return (self.shape_global_test_loop_result(test_all_result), - target_name, - toolchain_name, - test_id, - test_description, - round(elapsed_time, 2), - single_timeout, - self.shape_test_loop_ok_result_count(test_all_result)), detailed_test_results - - def print_test_result(self, test_result, target_name, toolchain_name, - test_id, test_description, elapsed_time, duration): - """ Use specific convention to print test result and related data - """ - tokens = [] - tokens.append("TargetTest") - tokens.append(target_name) - tokens.append(toolchain_name) - tokens.append(test_id) - tokens.append(test_description) - separator = "::" - time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration) - result = separator.join(tokens) + " [" + test_result +"]" + time_info - return Fore.MAGENTA + result + Fore.RESET - - def shape_test_loop_ok_result_count(self, test_all_result): - """ Reformats list of results to simple string - """ - test_loop_count = len(test_all_result) - test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK) - return "%d/%d"% (test_loop_ok_result, test_loop_count) - - def shape_global_test_loop_result(self, test_all_result): - """ Reformats list of results to simple string - """ - result = self.TEST_RESULT_FAIL - if all(test_all_result[0] == res for res in test_all_result): - result = test_all_result[0] - return result - - def run_host_test(self, name, image_path, disk, port, duration, - micro=None, reset=None, reset_tout=None, - verbose=False, copy_method=None, program_cycle_s=None): - """ Function creates new process with host test configured with particular test case. - Function also is pooling for serial port activity from process to catch all data - printed by test runner and host test during test execution - """ - - def get_char_from_queue(obs): - """ Get character from queue safe way - """ - try: - c = obs.queue.get(block=True, timeout=0.5) - except Empty, _: - c = None - return c - - def filter_queue_char(c): - """ Filters out non ASCII characters from serial port - """ - if ord(c) not in range(128): - c = ' ' - return c - - def get_test_result(output): - """ Parse test 'output' data - """ - result = self.TEST_RESULT_TIMEOUT - for line in "".join(output).splitlines(): - search_result = self.RE_DETECT_TESTCASE_RESULT.search(line) - if search_result and len(search_result.groups()): - result = self.TEST_RESULT_MAPPING[search_result.groups(0)[0]] - break - return result - - def get_auto_property_value(property_name, line): - """ Scans auto detection line from MUT and returns scanned parameter 'property_name' - Returns string - """ - result = None - if re.search("HOST: Property '%s'"% property_name, line) is not None: - property = re.search("HOST: Property '%s' = '([\w\d _]+)'"% property_name, line) - if property is not None and len(property.groups()) == 1: - result = property.groups()[0] - return result - - # print "{%s} port:%s disk:%s" % (name, port, disk), - cmd = ["python", - '%s.py'% name, - '-d', disk, - '-f', '"%s"'% image_path, - '-p', port, - '-t', str(duration), - '-C', str(program_cycle_s)] - - # Add extra parameters to host_test - if copy_method is not None: - cmd += ["-c", copy_method] - if micro is not None: - cmd += ["-m", micro] - if reset is not None: - cmd += ["-r", reset] - if reset_tout is not None: - cmd += ["-R", str(reset_tout)] - - if verbose: - print Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET - print "Test::Output::Start" - - proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS) - obs = ProcessObserver(proc) - update_once_flag = {} # Stores flags checking if some auto-parameter was already set - line = '' - output = [] - start_time = time() - while (time() - start_time) < (2 * duration): - c = get_char_from_queue(obs) - if c: - if verbose: - sys.stdout.write(c) - c = filter_queue_char(c) - output.append(c) - # Give the mbed under test a way to communicate the end of the test - if c in ['\n', '\r']: - - # Checking for auto-detection information from the test about MUT reset moment - if 'reset_target' not in update_once_flag and "HOST: Reset target..." in line: - # We will update this marker only once to prevent multiple time resets - update_once_flag['reset_target'] = True - start_time = time() - - # Checking for auto-detection information from the test about timeout - auto_timeout_val = get_auto_property_value('timeout', line) - if 'timeout' not in update_once_flag and auto_timeout_val is not None: - # We will update this marker only once to prevent multiple time resets - update_once_flag['timeout'] = True - duration = int(auto_timeout_val) - - # Detect mbed assert: - if 'mbed assertation failed: ' in line: - output.append('{{mbed_assert}}') - break - - # Check for test end - if '{end}' in line: - break - line = '' - else: - line += c - end_time = time() - testcase_duration = end_time - start_time # Test case duration from reset to {end} - - c = get_char_from_queue(obs) - - if c: - if verbose: - sys.stdout.write(c) - c = filter_queue_char(c) - output.append(c) - - if verbose: - print "Test::Output::Finish" - # Stop test process - obs.stop() - - result = get_test_result(output) - return (result, "".join(output), testcase_duration, duration) - - def is_peripherals_available(self, target_mcu_name, peripherals=None): - """ Checks if specified target should run specific peripheral test case defined in MUTs file - """ - if peripherals is not None: - peripherals = set(peripherals) - for id, mut in self.muts.iteritems(): - # Target MCU name check - if mut["mcu"] != target_mcu_name: - continue - # Peripherals check - if peripherals is not None: - if 'peripherals' not in mut: - continue - if not peripherals.issubset(set(mut['peripherals'])): - continue - return True - return False - - def shape_test_request(self, mcu, image_path, test_id, duration=10): - """ Function prepares JSON structure describing test specification - """ - test_spec = { - "mcu": mcu, - "image": image_path, - "duration": duration, - "test_id": test_id, - } - return json.dumps(test_spec) - - -def get_unique_value_from_summary(test_summary, index): - """ Gets list of unique target names - """ - result = [] - for test in test_summary: - target_name = test[index] - if target_name not in result: - result.append(target_name) - return sorted(result) - - -def get_unique_value_from_summary_ext(test_summary, index_key, index_val): - """ Gets list of unique target names and return dictionary - """ - result = {} - for test in test_summary: - key = test[index_key] - val = test[index_val] - if key not in result: - result[key] = val - return result - - -def show_json_file_format_error(json_spec_filename, line, column): - """ Prints JSON broken content - """ - with open(json_spec_filename) as data_file: - line_no = 1 - for json_line in data_file: - if line_no + 5 >= line: # Print last few lines before error - print 'Line %d:\t'%line_no + json_line, # Prints line - if line_no == line: - print ' ' * len('Line %d:'%line_no) + '\t', '-' * (column-1) + '^' - break - line_no += 1 - - -def json_format_error_defect_pos(json_error_msg): - """ Gets first error line and column in JSON file format. - Parsed from exception thrown by json.loads() string - """ - result = None - line, column = 0, 0 - # Line value search - line_search = re.search('line [0-9]+', json_error_msg) - if line_search is not None: - ls = line_search.group().split(' ') - if len(ls) == 2: - line = int(ls[1]) - # Column position search - column_search = re.search('column [0-9]+', json_error_msg) - if column_search is not None: - cs = column_search.group().split(' ') - if len(cs) == 2: - column = int(cs[1]) - result = [line, column] - return result - - -def get_json_data_from_file(json_spec_filename, verbose=False): - """ Loads from file JSON formatted string to data structure - """ - result = None - try: - with open(json_spec_filename) as data_file: - try: - result = json.load(data_file) - except ValueError as json_error_msg: - result = None - print 'JSON file %s parsing failed. Reason: %s' % (json_spec_filename, json_error_msg) - # We can print where error occurred inside JSON file if we can parse exception msg - json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg)) - if json_format_defect_pos is not None: - line = json_format_defect_pos[0] - column = json_format_defect_pos[1] - print - show_json_file_format_error(json_spec_filename, line, column) - - except IOError as fileopen_error_msg: - print 'JSON file %s not opened. Reason: %s'% (json_spec_filename, fileopen_error_msg) - print - if verbose and result: - pp = pprint.PrettyPrinter(indent=4) - pp.pprint(result) - return result - - -def print_muts_configuration_from_json(json_data, join_delim=", ", platform_filter=None): - """ Prints MUTs configuration passed to test script for verboseness - """ - muts_info_cols = [] - # We need to check all unique properties for each defined MUT - for k in json_data: - mut_info = json_data[k] - for mut_property in mut_info: - if mut_property not in muts_info_cols: - muts_info_cols.append(mut_property) - - # Prepare pretty table object to display all MUTs - pt_cols = ["index"] + muts_info_cols - pt = PrettyTable(pt_cols) - for col in pt_cols: - pt.align[col] = "l" - - # Add rows to pretty print object - for k in json_data: - row = [k] - mut_info = json_data[k] - - add_row = True - if platform_filter and 'mcu' in mut_info: - add_row = re.search(platform_filter, mut_info['mcu']) is not None - if add_row: - for col in muts_info_cols: - cell_val = mut_info[col] if col in mut_info else None - if type(cell_val) == ListType: - cell_val = join_delim.join(cell_val) - row.append(cell_val) - pt.add_row(row) - return pt.get_string() - - -def print_test_configuration_from_json(json_data, join_delim=", "): - """ Prints test specification configuration passed to test script for verboseness - """ - toolchains_info_cols = [] - # We need to check all toolchains for each device - for k in json_data: - # k should be 'targets' - targets = json_data[k] - for target in targets: - toolchains = targets[target] - for toolchain in toolchains: - if toolchain not in toolchains_info_cols: - toolchains_info_cols.append(toolchain) - - # Prepare pretty table object to display test specification - pt_cols = ["mcu"] + sorted(toolchains_info_cols) - pt = PrettyTable(pt_cols) - for col in pt_cols: - pt.align[col] = "l" - - # { target : [conflicted toolchains] } - toolchain_conflicts = {} - toolchain_path_conflicts = [] - for k in json_data: - # k should be 'targets' - targets = json_data[k] - for target in targets: - target_supported_toolchains = get_target_supported_toolchains(target) - if not target_supported_toolchains: - target_supported_toolchains = [] - target_name = target if target in TARGET_MAP else "%s*"% target - row = [target_name] - toolchains = targets[target] - - for toolchain in sorted(toolchains_info_cols): - # Check for conflicts: target vs toolchain - conflict = False - conflict_path = False - if toolchain in toolchains: - if toolchain not in target_supported_toolchains: - conflict = True - if target not in toolchain_conflicts: - toolchain_conflicts[target] = [] - toolchain_conflicts[target].append(toolchain) - # Add marker inside table about target usage / conflict - cell_val = 'Yes' if toolchain in toolchains else '-' - if conflict: - cell_val += '*' - # Check for conflicts: toolchain vs toolchain path - if toolchain in TOOLCHAIN_BIN_PATH: - toolchain_path = TOOLCHAIN_BIN_PATH[toolchain] - if not os.path.isdir(toolchain_path): - conflict_path = True - if toolchain not in toolchain_path_conflicts: - toolchain_path_conflicts.append(toolchain) - if conflict_path: - cell_val += '#' - row.append(cell_val) - pt.add_row(row) - - # generate result string - result = pt.get_string() # Test specification table - if toolchain_conflicts or toolchain_path_conflicts: - result += "\n" - result += "Toolchain conflicts:\n" - for target in toolchain_conflicts: - if target not in TARGET_MAP: - result += "\t* Target %s unknown\n"% (target) - conflict_target_list = join_delim.join(toolchain_conflicts[target]) - sufix = 's' if len(toolchain_conflicts[target]) > 1 else '' - result += "\t* Target %s does not support %s toolchain%s\n"% (target, conflict_target_list, sufix) - - for toolchain in toolchain_path_conflicts: - # Let's check toolchain configuration - if toolchain in TOOLCHAIN_BIN_PATH: - toolchain_path = TOOLCHAIN_BIN_PATH[toolchain] - if not os.path.isdir(toolchain_path): - result += "\t# Toolchain %s path not found: %s\n"% (toolchain, toolchain_path) - return result - - -def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',',platform_filter=None): - """ Generates table summary with all test cases and additional test cases - information using pretty print functionality. Allows test suite user to - see test cases - """ - # get all unique test ID prefixes - unique_test_id = [] - for test in TESTS: - split = test['id'].split('_')[:-1] - test_id_prefix = '_'.join(split) - if test_id_prefix not in unique_test_id: - unique_test_id.append(test_id_prefix) - unique_test_id.sort() - counter_dict_test_id_types = dict((t, 0) for t in unique_test_id) - counter_dict_test_id_types_all = dict((t, 0) for t in unique_test_id) - - test_properties = ['id', - 'automated', - 'description', - 'peripherals', - 'host_test', - 'duration'] if cols is None else cols - - # All tests status table print - pt = PrettyTable(test_properties) - for col in test_properties: - pt.align[col] = "l" - pt.align['duration'] = "r" - - counter_all = 0 - counter_automated = 0 - pt.padding_width = 1 # One space between column edges and contents (default) - - for test_id in sorted(TEST_MAP.keys()): - if platform_filter is not None: - # FIlter out platforms using regex - if re.search(platform_filter, test_id) is None: - continue - row = [] - test = TEST_MAP[test_id] - split = test_id.split('_')[:-1] - test_id_prefix = '_'.join(split) - - for col in test_properties: - col_value = test[col] - if type(test[col]) == ListType: - col_value = join_delim.join(test[col]) - elif test[col] == None: - col_value = "-" - - row.append(col_value) - if test['automated'] == True: - counter_dict_test_id_types[test_id_prefix] += 1 - counter_automated += 1 - pt.add_row(row) - # Update counters - counter_all += 1 - counter_dict_test_id_types_all[test_id_prefix] += 1 - result = pt.get_string() - result += "\n\n" - - if result_summary and not platform_filter: - # Automation result summary - test_id_cols = ['automated', 'all', 'percent [%]', 'progress'] - pt = PrettyTable(test_id_cols) - pt.align['automated'] = "r" - pt.align['all'] = "r" - pt.align['percent [%]'] = "r" - - percent_progress = round(100.0 * counter_automated / float(counter_all), 1) - str_progress = progress_bar(percent_progress, 75) - pt.add_row([counter_automated, counter_all, percent_progress, str_progress]) - result += "Automation coverage:\n" - result += pt.get_string() - result += "\n\n" - - # Test automation coverage table print - test_id_cols = ['id', 'automated', 'all', 'percent [%]', 'progress'] - pt = PrettyTable(test_id_cols) - pt.align['id'] = "l" - pt.align['automated'] = "r" - pt.align['all'] = "r" - pt.align['percent [%]'] = "r" - for unique_id in unique_test_id: - # print "\t\t%s: %d / %d" % (unique_id, counter_dict_test_id_types[unique_id], counter_dict_test_id_types_all[unique_id]) - percent_progress = round(100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1) - str_progress = progress_bar(percent_progress, 75) - row = [unique_id, - counter_dict_test_id_types[unique_id], - counter_dict_test_id_types_all[unique_id], - percent_progress, - "[" + str_progress + "]"] - pt.add_row(row) - result += "Test automation coverage:\n" - result += pt.get_string() - result += "\n\n" - return result - - -def progress_bar(percent_progress, saturation=0): - """ This function creates progress bar with optional simple saturation mark - """ - step = int(percent_progress / 2) # Scale by to (scale: 1 - 50) - str_progress = '#' * step + '.' * int(50 - step) - c = '!' if str_progress[38] == '.' else '|' - if saturation > 0: - saturation = saturation / 2 - str_progress = str_progress[:saturation] + c + str_progress[saturation:] - return str_progress - - -def singletest_in_cli_mode(single_test): - """ Runs SingleTestRunner object in CLI (Command line interface) mode - """ - start = time() - # Execute tests depending on options and filter applied - test_summary, shuffle_seed, test_summary_ext, test_suite_properties_ext, build_report = single_test.execute() - elapsed_time = time() - start - - # Human readable summary - if not single_test.opts_suppress_summary: - # prints well-formed summary with results (SQL table like) - print single_test.generate_test_summary(test_summary, shuffle_seed) - if single_test.opts_test_x_toolchain_summary: - # prints well-formed summary with results (SQL table like) - # table shows text x toolchain test result matrix - print single_test.generate_test_summary_by_target(test_summary, shuffle_seed) - print "Completed in %.2f sec"% (elapsed_time) - - # Store extra reports in files - if single_test.opts_report_html_file_name: - # Export results in form of HTML report to separate file - report_exporter = ReportExporter(ResultExporterType.HTML) - report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name, test_suite_properties=test_suite_properties_ext) - if single_test.opts_report_junit_file_name: - # Export results in form of JUnit XML report to separate file - report_exporter = ReportExporter(ResultExporterType.JUNIT) - report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name, test_suite_properties=test_suite_properties_ext) - if single_test.opts_report_build_file_name: - # Export build results as html report to sparate file - write_build_report(build_report, 'tests_build/report.html', single_test.opts_report_build_file_name) - - -class TestLogger(): - """ Super-class for logging and printing ongoing events for test suite pass - """ - def __init__(self, store_log=True): - """ We can control if logger actually stores log in memory - or just handled all log entries immediately - """ - self.log = [] - self.log_to_file = False - self.log_file_name = None - self.store_log = store_log - - self.LogType = construct_enum(INFO='Info', - WARN='Warning', - NOTIF='Notification', - ERROR='Error', - EXCEPT='Exception') - - self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file - APPEND=2) # Append to existing log file - - def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'): - """ Log one line of text - """ - log_timestamp = time() - log_entry = {'log_type' : LogType, - 'log_timestamp' : log_timestamp, - 'log_line' : log_line, - '_future' : None - } - # Store log in memory - if self.store_log: - self.log.append(log_entry) - return log_entry - - -class CLITestLogger(TestLogger): - """ Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed - """ - def __init__(self, store_log=True, file_name=None): - TestLogger.__init__(self) - self.log_file_name = file_name - #self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time - self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only - - def log_print(self, log_entry, timestamp=True): - """ Prints on screen formatted log entry - """ - ts = log_entry['log_timestamp'] - timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT) if timestamp else '' - log_line_str = "%(log_type)s: %(log_line)s"% (log_entry) - return timestamp_str + log_line_str - - def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'): - """ Logs line, if log file output was specified log line will be appended - at the end of log file - """ - log_entry = TestLogger.log_line(self, LogType, log_line) - log_line_str = self.log_print(log_entry, timestamp) - if self.log_file_name is not None: - try: - with open(self.log_file_name, 'a') as f: - f.write(log_line_str + line_delim) - except IOError: - pass - return log_line_str - - -def factory_db_logger(db_url): - """ Factory database driver depending on database type supplied in database connection string db_url - """ - if db_url is not None: - from workspace_tools.test_mysql import MySQLDBAccess - connection_info = BaseDBAccess().parse_db_connection_string(db_url) - if connection_info is not None: - (db_type, username, password, host, db_name) = BaseDBAccess().parse_db_connection_string(db_url) - if db_type == 'mysql': - return MySQLDBAccess() - return None - - -def detect_database_verbose(db_url): - """ uses verbose mode (prints) database detection sequence to check it database connection string is valid - """ - result = BaseDBAccess().parse_db_connection_string(db_url) - if result is not None: - # Parsing passed - (db_type, username, password, host, db_name) = result - #print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result - # Let's try to connect - db_ = factory_db_logger(db_url) - if db_ is not None: - print "Connecting to database '%s'..."% db_url, - db_.connect(host, username, password, db_name) - if db_.is_connected(): - print "ok" - print "Detecting database..." - print db_.detect_database(verbose=True) - print "Disconnecting...", - db_.disconnect() - print "done" - else: - print "Database type '%s' unknown"% db_type - else: - print "Parse error: '%s' - DB Url error"% (db_url) - - -def get_module_avail(module_name): - """ This function returns True if module_name is already impored module - """ - return module_name in sys.modules.keys() - - -def get_autodetected_MUTS(mbeds_list, platform_name_filter=None): - """ Function detects all connected to host mbed-enabled devices and generates artificial MUTS file. - If function fails to auto-detect devices it will return empty dictionary. - - if get_module_avail('mbed_lstools'): - mbeds = mbed_lstools.create() - mbeds_list = mbeds.list_mbeds() - - @param mbeds_list list of mbeds captured from mbed_lstools - @param platform_name You can filter 'platform_name' with list of filtered targets from 'platform_name_filter' - """ - result = {} # Should be in muts_all.json format - # Align mbeds_list from mbed_lstools to MUT file format (JSON dictionary with muts) - # mbeds_list = [{'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:', 'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}] - index = 1 - for mut in mbeds_list: - m = {'mcu' : mut['platform_name'], - 'port' : mut['serial_port'], - 'disk' : mut['mount_point'], - 'peripherals' : [] # No peripheral detection - } - if index not in result: - result[index] = {} - result[index] = m - index += 1 - return result - - -def get_autodetected_TEST_SPEC(mbeds_list, - use_default_toolchain=True, - use_supported_toolchains=False, - toolchain_filter=None, - platform_name_filter=None): - """ Function detects all connected to host mbed-enabled devices and generates artificial test_spec file. - If function fails to auto-detect devices it will return empty 'targets' test_spec description. - - use_default_toolchain - if True add default toolchain to test_spec - use_supported_toolchains - if True add all supported toolchains to test_spec - toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec - """ - result = {'targets': {} } - - for mut in mbeds_list: - mcu = mut['platform_name'] - if platform_name_filter is None or (platform_name_filter and mut['platform_name'] in platform_name_filter): - if mcu in TARGET_MAP: - default_toolchain = TARGET_MAP[mcu].default_toolchain - supported_toolchains = TARGET_MAP[mcu].supported_toolchains - - # Decide which toolchains should be added to test specification toolchain pool for each target - toolchains = [] - if use_default_toolchain: - toolchains.append(default_toolchain) - if use_supported_toolchains: - toolchains += supported_toolchains - if toolchain_filter is not None: - all_toolchains = supported_toolchains + [default_toolchain] - for toolchain in toolchain_filter.split(','): - if toolchain in all_toolchains: - toolchains.append(toolchain) - - result['targets'][mcu] = list(set(toolchains)) - return result - - -def get_default_test_options_parser(): - """ Get common test script options used by CLI, web services etc. - """ - parser = optparse.OptionParser() - parser.add_option('-i', '--tests', - dest='test_spec_filename', - metavar="FILE", - help='Points to file with test specification') - - parser.add_option('-M', '--MUTS', - dest='muts_spec_filename', - metavar="FILE", - help='Points to file with MUTs specification (overwrites settings.py and private_settings.py)') - - parser.add_option("-j", "--jobs", - dest='jobs', - metavar="NUMBER", - type="int", - help="Define number of compilation jobs. Default value is 1") - - if get_module_avail('mbed_lstools'): - # Additional features available when mbed_lstools is installed on host and imported - # mbed_lstools allow users to detect connected to host mbed-enabled devices - parser.add_option('', '--auto', - dest='auto_detect', - metavar=False, - action="store_true", - help='Use mbed-ls module to detect all connected mbed devices') - - parser.add_option('', '--tc', - dest='toolchains_filter', - help="Toolchain filter for --auto option. Use toolcahins names separated by comma, 'default' or 'all' to select toolchains") - - parser.add_option('', '--clean', - dest='clean', - metavar=False, - action="store_true", - help='Clean the build directory') - - parser.add_option('-P', '--only-peripherals', - dest='test_only_peripheral', - default=False, - action="store_true", - help='Test only peripheral declared for MUT and skip common tests') - - parser.add_option('-C', '--only-commons', - dest='test_only_common', - default=False, - action="store_true", - help='Test only board internals. Skip perpherials tests and perform common tests.') - - parser.add_option('-n', '--test-by-names', - dest='test_by_names', - help='Runs only test enumerated it this switch. Use comma to separate test case names.') - - parser.add_option('-p', '--peripheral-by-names', - dest='peripheral_by_names', - help='Forces discovery of particular peripherals. Use comma to separate peripheral names.') - - copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod') - copy_methods_str = "Plugin support: " + ', '.join(copy_methods) - - parser.add_option('-c', '--copy-method', - dest='copy_method', - help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s"% copy_methods_str) - - reset_methods = host_tests_plugins.get_plugin_caps('ResetMethod') - reset_methods_str = "Plugin support: " + ', '.join(reset_methods) - - parser.add_option('-r', '--reset-type', - dest='mut_reset_type', - default=None, - help='Extra reset method used to reset MUT by host test script. %s'% reset_methods_str) - - parser.add_option('-g', '--goanna-for-tests', - dest='goanna_for_tests', - metavar=False, - action="store_true", - help='Run Goanna static analyse tool for tests. (Project will be rebuilded)') - - parser.add_option('-G', '--goanna-for-sdk', - dest='goanna_for_mbed_sdk', - metavar=False, - action="store_true", - help='Run Goanna static analyse tool for mbed SDK (Project will be rebuilded)') - - parser.add_option('-s', '--suppress-summary', - dest='suppress_summary', - default=False, - action="store_true", - help='Suppresses display of wellformatted table with test results') - - parser.add_option('-t', '--test-summary', - dest='test_x_toolchain_summary', - default=False, - action="store_true", - help='Displays wellformatted table with test x toolchain test result per target') - - parser.add_option('-A', '--test-automation-report', - dest='test_automation_report', - default=False, - action="store_true", - help='Prints information about all tests and exits') - - parser.add_option('-R', '--test-case-report', - dest='test_case_report', - default=False, - action="store_true", - help='Prints information about all test cases and exits') - - parser.add_option("-S", "--supported-toolchains", - action="store_true", - dest="supported_toolchains", - default=False, - help="Displays supported matrix of MCUs and toolchains") - - parser.add_option("-O", "--only-build", - action="store_true", - dest="only_build_tests", - default=False, - help="Only build tests, skips actual test procedures (flashing etc.)") - - parser.add_option('', '--parallel', - dest='parallel_test_exec', - default=False, - action="store_true", - help='Experimental, you execute test runners for connected to your host MUTs in parallel (speeds up test result collection)') - - parser.add_option('', '--config', - dest='verbose_test_configuration_only', - default=False, - action="store_true", - help='Displays full test specification and MUTs configration and exits') - - parser.add_option('', '--loops', - dest='test_loops_list', - help='Set no. of loops per test. Format: TEST_1=1,TEST_2=2,TEST_3=3') - - parser.add_option('', '--global-loops', - dest='test_global_loops_value', - help='Set global number of test loops per test. Default value is set 1') - - parser.add_option('-W', '--waterfall', - dest='waterfall_test', - default=False, - action="store_true", - help='Used with --loops or --global-loops options. Tests until OK result occurs and assumes test passed.') - - parser.add_option('-N', '--firmware-name', - dest='firmware_global_name', - help='Set global name for all produced projects. Note, proper file extension will be added by buid scripts.') - - parser.add_option('-u', '--shuffle', - dest='shuffle_test_order', - default=False, - action="store_true", - help='Shuffles test execution order') - - parser.add_option('', '--shuffle-seed', - dest='shuffle_test_seed', - default=None, - help='Shuffle seed (If you want to reproduce your shuffle order please use seed provided in test summary)') - - parser.add_option('-f', '--filter', - dest='general_filter_regex', - default=None, - help='For some commands you can use filter to filter out results') - - parser.add_option('', '--inc-timeout', - dest='extend_test_timeout', - metavar="NUMBER", - type="int", - help='You can increase global timeout for each test by specifying additional test timeout in seconds') - - parser.add_option('', '--db', - dest='db_url', - help='This specifies what database test suite uses to store its state. To pass DB connection info use database connection string. Example: \'mysql://username:password@127.0.0.1/db_name\'') - - parser.add_option('-l', '--log', - dest='log_file_name', - help='Log events to external file (note not all console entries may be visible in log file)') - - parser.add_option('', '--report-html', - dest='report_html_file_name', - help='You can log test suite results in form of HTML report') - - parser.add_option('', '--report-junit', - dest='report_junit_file_name', - help='You can log test suite results in form of JUnit compliant XML report') - - parser.add_option("", "--report-build", - dest="report_build_file_name", - help="Output the build results to an html file") - - parser.add_option('', '--verbose-skipped', - dest='verbose_skipped_tests', - default=False, - action="store_true", - help='Prints some extra information about skipped tests') - - parser.add_option('-V', '--verbose-test-result', - dest='verbose_test_result_only', - default=False, - action="store_true", - help='Prints test serial output') - - parser.add_option('-v', '--verbose', - dest='verbose', - default=False, - action="store_true", - help='Verbose mode (prints some extra information)') - - parser.add_option('', '--version', - dest='version', - default=False, - action="store_true", - help='Prints script version and exits') - return parser |