359 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			359 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
import argparse
import json
import logging
import os
import pickle
import re
import shlex
import subprocess
import sys
import time
from collections import OrderedDict
from datetime import datetime
from pathlib import Path

import colorama
 | 
						|
 | 
						|
 | 
						|
def system_call(command, get_stdout=True, show_er=True):
    """Run ``command`` through the shell and report its exit status.

    Args:
        command: Shell command line. It is executed with ``shell=True``, so
            callers are responsible for quoting any untrusted fragments.
        get_stdout: When true, capture stdout and return it as a list of
            lines; otherwise the command writes straight to the terminal.
        show_er: When true, log an error if the command exits non-zero.

    Returns:
        A tuple ``(er, stdout)``. ``er`` is the command's exit code.
        ``stdout`` is a list of output lines when ``get_stdout`` is true and
        an empty string otherwise.
    """
    if get_stdout:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
        raw_output, _ = process.communicate()
        # Undecodable bytes are dropped rather than raising.
        stdout = raw_output.decode("ascii", errors="ignore").splitlines()
        er = process.returncode
    else:
        # subprocess.call yields the real exit code; os.system would return
        # the raw wait status (e.g. 768 for "exit 3"), which wraps to 0 when
        # later passed to sys.exit().
        er = subprocess.call(command, shell=True)
        stdout = ""
    if er != 0 and show_er:
        logging.error("failed to {}, er={}".format(command, er))
    return er, stdout
 | 
						|
 | 
						|
 | 
						|
def parse_args(argv):
    """Parse the command line of the prebuilt-object checker.

    Args:
        argv: Full argument vector including the program name at index 0
            (i.e. ``sys.argv``). When ``None``, argparse falls back to
            ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``files``, ``json``, ``driver``,
        ``glibc``, ``uclibc`` and ``rtos`` attributes.
    """
    parser = argparse.ArgumentParser(description='check prebuilt object')
    # NOTE: FileType opens both files while parsing; the caller owns the
    # resulting handles.
    parser.add_argument('-f', '--files',
                        type=argparse.FileType('r'),
                        nargs=2,
                        help='compare 2 files directly')
    parser.add_argument('-j', '--json', metavar='[.json]',
                        dest='json', action='store',
                        help='load json configure file')
    parser.add_argument('-d', '--driver',
                        action="store_true",
                        help='check driver listed in json')
    parser.add_argument('-g', '--glibc',
                        action="store_true",
                        help='check glibc listed in json')
    parser.add_argument('-u', '--uclibc',
                        action="store_true",
                        help='check uclibc listed in json')
    parser.add_argument('-r', '--rtos',
                        action="store_true",
                        help='check rtos listed in json')
    # Honour the vector we were given instead of silently re-reading
    # sys.argv; index 0 is the program name and must be skipped.
    args = parser.parse_args(None if argv is None else argv[1:])
    return args
 | 
						|
 | 
						|
 | 
						|
class ConsoleFormatter(logging.Formatter):
    """Log formatter that colors each message by level with ANSI escapes.

    Every formatted line is the level's escape sequence, the message, and a
    trailing reset sequence.
    """

    # Alternative layout that also prints the HH:MM timestamp:
    # MSG_FMT = "%(class)s%(asctime)s: %(message)s\033[0m"
    MSG_FMT = "%(class)s%(message)s\033[0m"
    # Escape sequence emitted before the message, keyed by level name.
    CSS_CLASSES = {'WARNING': '\033[1;93m',
                   'INFO': '\033[0m',
                   'DEBUG': '\033[0m',
                   'CRITICAL': '\033[1;91m',
                   'ERROR': '\033[1;91m'}
    # Used for level names that are not listed in CSS_CLASSES.
    DEFAULT_CLASS = '\033[0m'

    def __init__(self):
        super().__init__()

    def format(self, record):
        """Return ``record`` rendered through ``MSG_FMT``.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The colored message string.
        """
        # An unknown level falls back to the reset sequence; a plain word
        # here would be printed verbatim in front of the message.
        class_name = self.CSS_CLASSES.get(record.levelname, self.DEFAULT_CLASS)
        tm_now = datetime.now()
        # Only consumed when MSG_FMT references %(asctime)s.
        asctime = "{:02d}:{:02d}".format(tm_now.hour, tm_now.minute)
        return self.MSG_FMT % {
            "class": class_name,
            # getMessage() applies %-style logging arguments to the message.
            "message": record.getMessage(),
            "asctime": asctime,
        }
 | 
						|
 | 
						|
 | 
						|
def initiate():
    """Set up colored console logging on the root logger.

    Initializes colorama, attaches a stream handler that uses
    ``ConsoleFormatter`` to the root logger, and sets the root level to
    DEBUG.

    Returns:
        Always 0.
    """
    colorama.init()

    # Console handler with the level-colored formatter.
    handler = logging.StreamHandler()
    handler.setFormatter(ConsoleFormatter())

    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    return 0
 | 
						|
 | 
						|
 | 
						|
def gen_list():
    """Find every ``*.o`` under ``$NVT_HDAL_DIR`` and pickle the path list.

    The list of paths reported by ``find`` is written to ``objs.pkl`` in the
    current working directory.

    Returns:
        0 on success, -1 when ``NVT_HDAL_DIR`` is not set, otherwise the
        non-zero exit status of ``find``.
    """
    hdal_dir = os.getenv("NVT_HDAL_DIR")
    if hdal_dir is None:
        # os.path.realpath(None) would raise TypeError.
        logging.error("environment variable NVT_HDAL_DIR is not set")
        return -1
    hdal_real_dir = os.path.realpath(hdal_dir)
    # Quote the directory so spaces or shell metacharacters survive.
    cmd = "find {} -name '*.o'".format(shlex.quote(hdal_real_dir))
    er, text = system_call(cmd)
    if er != 0:
        return er
    with open('objs.pkl', 'wb') as f:
        pickle.dump(text, f)
    return 0
 | 
						|
 | 
						|
 | 
						|
def chk_one(path):
    """Compare the two copies of one object file found under ``$NVT_HDAL_DIR``.

    The base name of ``path`` is searched below ``$NVT_HDAL_DIR``. When
    exactly two files are found, each is stripped of debug info,
    disassembled, and the crc32 of the disassembly is compared.

    Args:
        path: Path whose base name identifies the object file to check.

    Returns:
        0 when the pair matches or fewer than two copies exist; -1 when
        ``NVT_HDAL_DIR`` is not set, more than two copies exist, no checksum
        was produced, or the checksums differ; otherwise the non-zero exit
        status of a failed shell command.
    """
    hdal_dir = os.getenv("NVT_HDAL_DIR")
    if hdal_dir is None:
        # os.path.realpath(None) would raise TypeError.
        logging.error("environment variable NVT_HDAL_DIR is not set")
        return -1
    hdal_real_dir = os.path.realpath(hdal_dir)
    file_name = os.path.basename(path)

    # A single walk is enough: a failing search root fails this command too.
    cmd = "find {} -name {}".format(
        shlex.quote(hdal_real_dir), shlex.quote(file_name))
    er, files = system_call(cmd)
    if er != 0:
        return er

    if len(files) < 2:
        logging.info("{} skipped, no object pair".format(file_name))
        return 0
    elif len(files) > 2:
        logging.error(
            "{} exists 3 locations".format(file_name))
        return -1

    crc32 = []
    for obj_path in files:
        # NOTE: the pipeline's status is that of `sed`, so an $OBJDUMP
        # failure is not detected by the exit code alone.
        cmd = (
            "$STRIP --strip-debug {} -o tmp.o && ".format(shlex.quote(obj_path))
            + "$OBJDUMP -S tmp.o | "
            + "sed -e '/file format elf/d' > tmp.asm && "
            + "crc32 tmp.asm"
        )
        er, text = system_call(cmd)
        if er != 0:
            return er
        if not text:
            logging.error("no crc32 output for {}".format(obj_path))
            return -1
        crc32.append(text[0])

    if crc32[0] != crc32[1]:
        logging.error("{} NG, value=0x{} vs value=0x{}".format(
            file_name, crc32[0], crc32[1]))
        return -1

    # Success is logged at WARNING level; presumably so it stands out in
    # color on the console -- confirm before changing.
    logging.warning("{} OK, value=0x{} vs value=0x{}".format(
        file_name, crc32[0], crc32[1]))

    return 0
 | 
						|
 | 
						|
 | 
						|
def find_pair_in_hdal(path):
    """Locate every file under ``$NVT_HDAL_DIR`` named like ``path``.

    Args:
        path: Path whose base name is searched for.

    Returns:
        A tuple ``(er, files)``. ``er`` is 0 on success, -1 when
        ``NVT_HDAL_DIR`` is not set, otherwise the exit status of ``find``.
        ``files`` is the list of matching paths, or ``None`` when the
        environment variable is missing.
    """
    hdal_dir = os.getenv("NVT_HDAL_DIR")
    if hdal_dir is None:
        # os.path.realpath(None) would raise TypeError.
        logging.error("environment variable NVT_HDAL_DIR is not set")
        return -1, None
    hdal_real_dir = os.path.realpath(hdal_dir)
    file_name = os.path.basename(path)

    # One walk only; quoting keeps odd characters in either argument from
    # being interpreted by the shell.
    cmd = "find {} -name {}".format(
        shlex.quote(hdal_real_dir), shlex.quote(file_name))
    er, files = system_call(cmd)
    return er, files
 | 
						|
 | 
						|
 | 
						|
def _common_dir_prefix_len(dir1, dir2):
    """Return the index just past the last '/' shared by both strings (0 if none)."""
    last_slash = 0
    for i, (ch1, ch2) in enumerate(zip(dir1, dir2)):
        if ch1 != ch2:
            break
        if ch1 == '/':
            last_slash = i + 1
    return last_slash


def compare_two(filepath1, filepath2):
    """Compare two object files by the crc32 of their disassembly.

    Each file is stripped of debug info into ``tmp.o``, disassembled with
    ``$OBJDUMP -S`` into ``tmp0.asm`` / ``tmp1.asm`` (with the
    "file format elf" line removed), and checksummed with ``crc32``. The
    intermediate files are left in the current working directory.

    Args:
        filepath1: Path of the first object file.
        filepath2: Path of the second object file.

    Returns:
        0 when the checksums match; -1 when they differ or no checksum was
        produced; otherwise the non-zero exit status of the failed shell
        command.
    """
    crc32 = []
    for idx, obj_path in enumerate((filepath1, filepath2)):
        # NOTE: the pipeline's status is that of `sed`, so an $OBJDUMP
        # failure is not detected by the exit code alone.
        cmd = (
            "$STRIP --strip-debug {} -o tmp.o && ".format(shlex.quote(obj_path))
            + "$OBJDUMP -S tmp.o | "
            + "sed -e '/file format elf/d' > tmp{}.asm && ".format(idx)
            + "crc32 tmp{}.asm".format(idx)
        )
        er, text = system_call(cmd)
        if er != 0:
            return er
        if not text:
            logging.error("no crc32 output for {}".format(obj_path))
            return -1
        crc32.append(text[0])

    # Drop the directory prefix common to both paths to shorten the log.
    dir1 = os.path.dirname(filepath1)
    dir2 = os.path.dirname(filepath2)
    base1 = os.path.basename(filepath1)
    base2 = os.path.basename(filepath2)
    last_slash = _common_dir_prefix_len(dir1, dir2)
    if last_slash == 0:
        short_filename1 = base1
        short_filename2 = base2
    else:
        short_filename1 = os.path.join(dir1[last_slash:], base1)
        short_filename2 = os.path.join(dir2[last_slash:], base2)

    logging.info("[0]: {}".format(short_filename1))
    logging.info("[1]: {}".format(short_filename2))

    if crc32[0] != crc32[1]:
        logging.error("[{}] NG, crc:({}!={})".format(
            base1, crc32[0], crc32[1]))
        return -1
    # Success is logged at WARNING level; presumably so it stands out in
    # color on the console -- confirm before changing.
    logging.warning("[{}] OK, crc:({}=={})".format(
        base1, crc32[0], crc32[1]))
    return 0
 | 
						|
 | 
						|
def chk_list(path_to_objs_pkl):
    """Check every object listed in a pickled path list.

    For each listed path, the copies of that file under ``$NVT_HDAL_DIR``
    are located and, when exactly two exist, compared with ``compare_two``.
    Files whose path contains ``nvt_audlib_`` are skipped.

    Args:
        path_to_objs_pkl: Path of the pickle file holding the list of object
            paths (the format written by ``gen_list``).

    Returns:
        0 when every compared pair matches; -1 when the pickle holds no
        data or a file has more than two copies; otherwise the last
        non-zero status returned by a lookup or comparison.
    """
    # SECURITY: pickle.load can execute arbitrary code; only feed this a
    # file produced by a trusted run of this tool.
    with open(path_to_objs_pkl, 'rb') as f:
        text = pickle.load(f)
    if text is None:
        logging.error("failed to load data from {}".format(path_to_objs_pkl))
        return -1

    total_er = 0
    for line in text:
        er, files = find_pair_in_hdal(line)
        if er != 0:
            return er

        # Name the file from the list entry: `files` may be empty, in which
        # case files[0] would raise IndexError.
        file_name = os.path.basename(line)
        if len(files) < 2:
            logging.info("{} skipped, no object pair".format(file_name))
            continue
        elif len(files) > 2:
            logging.error("{} exists 3 locations".format(file_name))
            return -1

        if "nvt_audlib_" in files[0]:
            continue

        er = compare_two(files[0], files[1])
        if er != 0:
            # Keep going so the report covers every object.
            total_er = er
    return total_er
 | 
						|
 | 
						|
def load_json(json_path):
    """Load the checker's JSON configuration file.

    Args:
        json_path: Path of the JSON file. It is read as UTF-8 with
            undecodable bytes ignored.

    Returns:
        The parsed configuration as an ``OrderedDict`` (key order
        preserved), or ``None`` when the document is not a JSON object whose
        ``"jsontype"`` is ``"verify_prebuilt"``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(json_path, encoding="utf-8", errors='ignore') as file:
        cfg = json.load(file, object_pairs_hook=OrderedDict)
    # A missing "jsontype" key or a non-object document is treated the same
    # as a wrong type instead of raising KeyError/TypeError.
    if not isinstance(cfg, dict) or cfg.get("jsontype") != "verify_prebuilt":
        logging.error("this json file is not for this application.")
        return None
    return cfg
 | 
						|
 | 
						|
def process_json(cfg, item_name):
    """Compare every file pair listed under one section of the configuration.

    Each entry of ``cfg[item_name]`` holds two paths relative to
    ``$LINUX_BUILD_TOP``. Pairs where a file is missing are reported and
    counted as failures; the remaining pairs are passed to ``compare_two``.

    Args:
        cfg: Parsed configuration mapping section names to lists of pairs.
        item_name: Name of the section to check (e.g. ``"driver"``).

    Returns:
        0 when every pair matches; -1 when ``LINUX_BUILD_TOP`` is not set,
        the section is missing, or a file does not exist; otherwise the
        last non-zero status returned by ``compare_two``.
    """
    # check environment
    top_dir = os.getenv("LINUX_BUILD_TOP")
    if top_dir is None:
        logging.error("source build/envsetup.sh first")
        return -1

    logging.info("===== start checking {} =====".format(item_name))
    try:
        items = cfg[item_name]
    except (KeyError, TypeError):
        # Section absent (or cfg is not a mapping): report instead of crashing.
        logging.error("'{}' is not listed in json".format(item_name))
        return -1

    total_er = 0
    for item in items:
        file1 = os.path.join(top_dir, item[0])
        file2 = os.path.join(top_dir, item[1])
        # check file exist; only the first missing file of a pair is reported
        if not os.path.exists(file1):
            logging.error("{} is not existing".format(item[0]))
            total_er = -1
            continue
        if not os.path.exists(file2):
            logging.error("{} is not existing".format(item[1]))
            total_er = -1
            continue
        er = compare_two(file1, file2)
        if er != 0:
            # Keep going so the report covers every pair.
            total_er = er
    logging.info("\n")
    return total_er
 | 
						|
 | 
						|
def rename_obj_to_no_tracer_obj(cfg, item_name):
    """Rewrite the ``.o`` paths of one section to their ``_no_tracer.o`` names.

    The first two entries of every pair in ``cfg[item_name]`` are modified
    in place. Only a trailing ``.o`` suffix is rewritten, so directory
    components that happen to contain ".o" (e.g. ``lib.old/``) are left
    untouched.

    Args:
        cfg: Parsed configuration mapping section names to lists of pairs.
        item_name: Name of the section to rewrite.

    Returns:
        The same ``cfg`` object, or -1 when ``LINUX_BUILD_TOP`` is not set.
    """
    # check environment
    if os.getenv("LINUX_BUILD_TOP") is None:
        logging.error("source build/envsetup.sh first")
        return -1
    old_suffix, new_suffix = ".o", "_no_tracer.o"
    for item in cfg[item_name]:
        for idx in (0, 1):
            if item[idx].endswith(old_suffix):
                item[idx] = item[idx][:-len(old_suffix)] + new_suffix
    return cfg
 | 
						|
 | 
						|
def rename_archive_to_so(cfg, item_name):
    """Rewrite the ``.a`` paths of one section to their ``.so`` names.

    The first two entries of every pair in ``cfg[item_name]`` are modified
    in place. Only a trailing ``.a`` suffix is rewritten, so directory
    components that happen to contain ".a" (e.g. ``lib.arm/``) are left
    untouched.

    Args:
        cfg: Parsed configuration mapping section names to lists of pairs.
        item_name: Name of the section to rewrite.

    Returns:
        The same ``cfg`` object, or -1 when ``LINUX_BUILD_TOP`` is not set.
    """
    # check environment
    if os.getenv("LINUX_BUILD_TOP") is None:
        logging.error("source build/envsetup.sh first")
        return -1
    old_suffix, new_suffix = ".a", ".so"
    for item in cfg[item_name]:
        for idx in (0, 1):
            if item[idx].endswith(old_suffix):
                item[idx] = item[idx][:-len(old_suffix)] + new_suffix
    return cfg
 | 
						|
 | 
						|
def main(argv):
    """Entry point: run the checks selected on the command line.

    ``-f`` compares two files directly. ``-d/-g/-u/-r`` check the matching
    section of the JSON file given with ``-j``; the driver section is
    checked a second time with ``_no_tracer.o`` names and the glibc/uclibc
    sections a second time with ``.so`` names.

    Args:
        argv: Full argument vector (``sys.argv``).

    Returns:
        0 when every requested check passes, otherwise a non-zero status.
    """
    er = initiate()
    if er != 0:
        return er

    # parse options
    args = parse_args(argv)
    if args.files:
        # argparse.FileType already opened both files, but only their names
        # are needed; close the handles instead of leaking them.
        paths = [handle.name for handle in args.files]
        for handle in args.files:
            if handle is not sys.stdin:
                handle.close()
        er = compare_two(paths[0], paths[1])
        if er != 0:
            return -1

    cfg = None
    if args.json:
        cfg = load_json(args.json)
        if cfg is None:
            # Do not report success when the requested file is unusable.
            logging.error("failed to load json file {}".format(args.json))
            return -1

    # (enabled, json section, helper that renames the section for a 2nd pass)
    checks = (
        (args.driver, "driver", rename_obj_to_no_tracer_obj),
        (args.glibc, "glibc", rename_archive_to_so),
        (args.uclibc, "uclibc", rename_archive_to_so),
        (args.rtos, "rtos", None),
    )

    if cfg is None and any(enabled for enabled, _, _ in checks):
        logging.error("lack -j to load json file.")
        return -1

    total_er = 0
    for enabled, item_name, rename in checks:
        if not enabled:
            continue
        er = process_json(cfg, item_name)
        if er != 0:
            total_er = er
        if rename is None:
            continue
        # Second pass on the renamed variants of the same section.
        renamed = rename(cfg, item_name)
        if renamed == -1:
            # Keep the loaded configuration intact for the next sections.
            total_er = -1
            continue
        cfg = renamed
        er = process_json(cfg, item_name)
        if er != 0:
            total_er = er

    return total_er
 | 
						|
 | 
						|
 | 
						|
# Script entry point: the status returned by main() becomes the exit code.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
 |