~maelkum/viuavm

e33ad4f3847fac88d901d60bca3ac8db7593902c — Marek Marecki 9 months ago dc5070c
Refactor test-case function

The number of source lines did not decrease much, but this is because
the code is now better documented. Logic lines were cut, and comment
lines were added.

This should make it easier to understand what is happening during the
next refactoring, or when I need to add some features to the test suite.
1 files changed, 189 insertions(+), 273 deletions(-)

M new/tests/suite.py
M new/tests/suite.py => new/tests/suite.py +189 -273
@@ 777,133 777,44 @@ def detect_check_kind(test_path):
    raise No_check_file_for(test_path)


def test_case_impl(case_log, case_name, test_program, errors):
    check_kind = None
    try:
        check_kind = detect_check_kind(test_program)
    except No_check_file_for:
        return (
            Status.Normal,
            False,
            "no check file",
            None,
            None,
        )

    base_name = os.path.splitext(test_program)[0]

    test_relocatable = f"{base_name}.o"
    test_executable = f"{base_name}.elf"

    start_timepoint = datetime.datetime.now()
    count_runtime = lambda: (datetime.datetime.now() - start_timepoint)

    extra_source_files = glob.glob(f"{base_name}.*.s")

def test_case_impl_asm(case_log, out_path, asm_path):
    asm_args = (
        ASSEMBLER,
        "-o",
        test_relocatable,
        test_program,
        out_path,
        asm_path,
    )
    asm_return = subprocess.call(
    case_log.write(" ".join(asm_args))
    case_log.write("\n")
    r = subprocess.call(
        args=asm_args,
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    case_log.write(" ".join(asm_args))
    case_log.write("\n")
    if asm_return != 0:
        return (
            Status.Normal,
            False,
            ("failed to assemble: " + " ".join(asm_args)),
            count_runtime(),
            None,
        )
    return None if r == 0 else asm_args

    extra_relocatable_files = []
    for extra_source in extra_source_files:
        extra_base_name = os.path.splitext(extra_source)[0]
        extra_relocatable = f"{extra_base_name}.o"
        asm_args = (
            ASSEMBLER,
            "-o",
            extra_relocatable,
            extra_source,
        )
        case_log.write(" ".join(asm_args))
        case_log.write("\n")
        asm_return = subprocess.call(
            args=asm_args,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        )
        if asm_return != 0:
            return (
                Status.Normal,
                False,
                ("failed to assemble: " + " ".join(asm_args)),
                count_runtime(),
                None,
            )

        extra_relocatable_files.append(extra_relocatable)

    if os.path.isfile(test_deps := f"{base_name}.deps"):
        with open(test_deps, "r") as test_deps_fd:
            for dep in test_deps_fd:
                dep = dep.strip()
                if os.path.isfile(dep_o := f"{dep}.o"):
                    extra_relocatable_files.append(dep_o)
                else:
                    return (
                        Status.Normal,
                        False,
                        f"could not locate dependency: {dep}",
                        None,
                        None,
                    )

    # This should fuzz linker bugs related to bad offset calculations and
    # dependent on the order of input files.
    random.shuffle(extra_relocatable_files)

def test_case_impl_ld(case_log, exe_path, reloc_path, extras=()):
    ld_args = (
        LINKER,
        "-o",
        test_executable,
        test_relocatable,
        *extra_relocatable_files,
        exe_path,
        reloc_path,
        *extras,
    )
    case_log.write(" ".join(ld_args))
    case_log.write("\n")
    ld_return = subprocess.call(
    r = subprocess.call(
        args=ld_args,
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    if ld_return != 0:
        return (
            Status.Normal,
            False,
            ("failed to link: " + " ".join(ld_args)),
            count_runtime(),
            None,
        )
    return None if r == 0 else ld_args

    test_stdin = None
    if os.path.isfile(stdin_path := f"{base_name}.stdin"):
        with open(stdin_path, "r") as ifstream:
            test_stdin = ifstream.read()

    run_test = lambda: run_and_capture(
        INTERPRETER,
        test_executable,
        stdin=test_stdin,
    )

    result, ebreak, abort_report, perf = run_test()
def test_case_impl_checks(
    case_log, errors, count_runtime, base_path, check_kind, result, ebreak, abort_report
):
    r_exit = result["exit"]

    if r_exit == -6 and check_kind == "abort":


@@ 918,7 829,7 @@ def test_case_impl(case_log, case_name, test_program, errors):
        )

    if check_kind == "ebreak":
        ebreak_dump = os.path.splitext(test_program)[0] + ".ebreak"
        ebreak_dump = f"{base_path}.ebreak"
        with open(ebreak_dump, "r") as ifstream:
            ebreak_dump = ifstream.read().splitlines()



@@ 963,7 874,7 @@ def test_case_impl(case_log, case_name, test_program, errors):
                None,
            )
    elif check_kind == "abort":
        abort_test = os.path.splitext(test_program)[0] + ".abort"
        abort_test = f"{base_path}.abort"
        with open(abort_test, "r") as ifstream:
            abort_test = ifstream.read().splitlines()



@@ 1037,7 948,7 @@ def test_case_impl(case_log, case_name, test_program, errors):
            )
            raise Unexpected_value()
    elif check_kind == "stdout":
        stdout_test = os.path.splitext(test_program)[0] + ".stdout"
        stdout_test = f"{base_path}.stdout"
        want_stdout: str
        with open(stdout_test, "r") as ifstream:
            want_stdout = ifstream.read()


@@ 1071,213 982,218 @@ def test_case_impl(case_log, case_name, test_program, errors):
                None,
            )

    if SKIP_DISASSEMBLER_TESTS:
        return (
            Status.Normal,
            True,
            None,
            count_runtime(),
            perf,
        )
    # Nothing bad to report. Let's return None to signal that we did not detect
    # anything unexpected, and let the main test-case function create an OK
    # report.
    return None

    test_disassembled_program = test_program + DIS_EXTENSION
    dis_return = subprocess.call(
        args=(
            DISASSEMBLER,
            "-o",
            test_disassembled_program,
            test_executable,
        ),
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    if dis_return != 0:
        return (
            Status.Normal,
            False,
            "failed to disassemble",
            count_runtime(),
            None,
        )

    asm_args = (
        ASSEMBLER,
        "-o",
        test_relocatable,
        test_disassembled_program,
    )
    asm_return = subprocess.call(
        args=asm_args,
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    if asm_return != 0:
        return (
            Status.Normal,
            False,
            ("failed to reassemble: " + " ".join(asm_args)),
            count_runtime(),
            None,
        )
def test_case_impl(case_log, case_name, test_program, errors):
    start_timepoint = datetime.datetime.now()
    count_runtime = lambda: (datetime.datetime.now() - start_timepoint)

    ld_args = (
        LINKER,
        "-o",
        test_executable,
        test_relocatable,
    )
    ld_return = subprocess.call(
        args=ld_args,
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    if ld_return != 0:
    # Detect what kind of check the test requires. Some programs need to have
    # their register or memory contents checked, others must produce something
    # on standard output, and some are expected to just crash.
    check_kind = None
    try:
        check_kind = detect_check_kind(test_program)
    except No_check_file_for:
        return (
            Status.Normal,
            False,
            ("failed to relink: " + " ".join(ld_args)),
            count_runtime(),
            "no check file",
            None,
            None,
        )

    result, ebreak, abort_report, _ = run_test()
    r_exit = result["exit"]
    # All files describing a test share a common base path eg, "tests/asm/foo".
    # Sources are stored in "foo.asm" and "foo.*.s" files; checks in "foo.xyz";
    # etc.
    base_path = os.path.splitext(test_program)[0]

    if r_exit == -6 and check_kind == "abort":
        pass
    elif r_exit != 0:
        return (
            Status.Normal,
            False,
            "crashed after reassembly",
            count_runtime(),
            None,
        )
    # Some tests (usually for I/O) expect to receive some data on standard
    # input. The test suite is responsible for supplying it.
    # This is IMPORTANT, because otherwise they may just hang forever.
    test_stdin = None
    if os.path.isfile(stdin_path := f"{base_path}.stdin"):
        with open(stdin_path, "r") as ifstream:
            test_stdin = ifstream.read()

    if check_kind == "ebreak":
        ebreak_dump = os.path.splitext(test_program)[0] + ".ebreak"
        with open(ebreak_dump, "r") as ifstream:
            ebreak_dump = ifstream.read().splitlines()
    test_relocatable = f"{base_path}.o"
    test_executable = f"{base_path}.elf"

        if not ebreak_dump:
            return (
                Status.Normal,
                False,
                "empty ebreak file",
                count_runtime(),
                None,
            )
    asm = lambda out_reloc, in_asm: test_case_impl_asm(case_log, out_reloc, in_asm)

        if ebreak is None:
            return (
                Status.Normal,
                False,
                "program did not emit ebreak",
                count_runtime(),
                None,
            )
    # Some tests (usually for the linker) have their source split over several
    # files. Gather and assemble them all here, before the main file is
    # processed because we need to know what files to pass to the linker as
    # extra relocatables.
    extra_source_files = glob.glob(f"{base_path}.*.s")
    extra_relocatable_files = []
    for extra_source in extra_source_files:
        extra_base_name = os.path.splitext(extra_source)[0]
        extra_relocatable = f"{extra_base_name}.o"
        match asm(extra_relocatable, extra_source):
            case None:
                pass
            case asm_args:
                return (
                    Status.Normal,
                    False,
                    ("failed to assemble: " + " ".join(asm_args)),
                    count_runtime(),
                    None,
                )

        try:
            walk_ebreak_test(errors, ebreak_dump, ebreak)
        except (
            Missing_value,
            Unexpected_type,
            Unexpected_value,
        ) as e:
        extra_relocatable_files.append(extra_relocatable)

    # Some tests also have dependencies on "standard" modules. This is different
    # than split-source tests as the dependencies do not have to be assembled;
    # they are assumbed to be ALWAYS PRESENT (because they are part of the
    # "standard library").
    if os.path.isfile(test_deps := f"{base_path}.deps"):
        with open(test_deps, "r") as test_deps_fd:
            for dep in test_deps_fd:
                dep = dep.strip()
                if os.path.isfile(dep_o := f"{dep}.o"):
                    extra_relocatable_files.append(dep_o)
                else:
                    return (
                        Status.Normal,
                        False,
                        f"could not locate dependency: {dep}",
                        None,
                        None,
                    )

    # This should fuzz linker bugs related to bad offset calculations and
    # dependent on the order of input files.
    random.shuffle(extra_relocatable_files)

    ld = lambda out_exec, in_reloc, extras=(): test_case_impl_ld(
        case_log, out_exec, in_reloc, extras
    )
    run_test = lambda: run_and_capture(
        INTERPRETER,
        test_executable,
        stdin=test_stdin,
    )
    run_checks = lambda r, e, a: test_case_impl_checks(
        case_log, errors, count_runtime, base_path, check_kind, r, e, a
    )

    # FIRST RUN
    #
    # The first run should assemble, link, and execute the test program, and
    # then ensure that it produced the expected result.
    match asm(test_relocatable, test_program):
        case None:
            pass
        case asm_args:
            return (
                Status.Normal,
                False,
                e.to_string(),
                ("failed to assemble: " + " ".join(asm_args)),
                count_runtime(),
                None,
            )
        except Bad_ebreak_script as e:

    # The extra relocatables (including "standard library" moduels) are only
    # passed to the linker during the first run, because during the second one
    # all the necessary code will be present in the single disassembled source
    # file (due to static linking).
    match ld(test_executable, test_relocatable, extra_relocatable_files):
        case None:
            pass
        case ld_args:
            return (
                Status.Normal,
                False,
                f"bad ebreak script, error on line {e.args[0]}",
                ("failed to link: " + " ".join(ld_args)),
                count_runtime(),
                None,
            )
    elif check_kind == "abort":
        abort_test = os.path.splitext(test_program)[0] + ".abort"
        with open(abort_test, "r") as ifstream:
            abort_test = ifstream.read().splitlines()

        if not abort_test:
    result, ebreak, abort_report, perf = run_test()
    if (fail := run_checks(result, ebreak, abort_report)) is not None:
        return fail

    make_good_report = lambda: (
        Status.Normal,
        True,
        None,
        count_runtime(),
        (None if check_kind == "abort" else perf),
    )

    if SKIP_DISASSEMBLER_TESTS:
        return make_good_report()

    # SECOND RUN
    #
    # The second run is not strictly necessary, as the result produced MUST be
    # EXACTLY the same as what the first run produced so we run the same set of
    # checks twice. Why?
    #
    # To test the disassembler and linker. The first run uses manually written
    # sources, sometimes spread over several files, manually specified
    # dependencies, etc. The second run uses code "produced" by the disassembler
    # in which the symbols, objects, and code may eg, appear in a different
    # order.
    #
    # If both runs succeed we can be reasonably sure that the WHOLE TOOLCHAIN
    # works as intended, and that the code can be assembled, linked,
    # disassembled, and relinked without loss of quality and function.
    test_disassembled_program = test_program + DIS_EXTENSION
    dis_return = subprocess.call(
        args=(
            DISASSEMBLER,
            "-o",
            test_disassembled_program,
            test_executable,
        ),
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )
    if dis_return != 0:
        return (
            Status.Normal,
            False,
            "failed to disassemble",
            count_runtime(),
            None,
        )

    match asm(test_relocatable, test_disassembled_program):
        case None:
            pass
        case asm_args:
            return (
                Status.Normal,
                False,
                "empty abort file",
                ("failed to reassemble: " + " ".join(asm_args)),
                count_runtime(),
                None,
            )

        if abort_report is None:
    match ld(test_executable, test_relocatable):
        case None:
            pass
        case ld_args:
            return (
                Status.Normal,
                False,
                "program did not abort",
                ("failed to relink: " + " ".join(ld_args)),
                count_runtime(),
                None,
            )

        if (want_value := abort_test[0]) != (live_value := abort_report["ip"]):
            leader = f"    aborted IP"
            errors.write(
                "{} is {}\n".format(
                    leader,
                    live_value,
                    colorise("red", live_value),
                )
            )
            errors.write(
                "{} expected {}\n".format(
                    (len(leader) * " "),
                    want_value,
                    colorise("green", want_value),
                )
            )
            raise Unexpected_value()
        if (want_value := abort_test[1]) != (live_value := abort_report["instruction"]):
            leader = f"    aborted instruction"
            errors.write(
                "{} is {}\n".format(
                    leader,
                    live_value,
                    colorise("red", live_value),
                )
            )
            errors.write(
                "{} expected {}\n".format(
                    (len(leader) * " "),
                    want_value,
                    colorise("green", want_value),
                )
            )
            raise Unexpected_value()
        if (want_value := abort_test[2]) != (live_value := abort_report["message"]):
            leader = f"    abort message"
            errors.write(
                "{} is {}\n".format(
                    leader,
                    live_value,
                    colorise("red", live_value),
                )
            )
            errors.write(
                "{} expected {}\n".format(
                    (len(leader) * " "),
                    want_value,
                    colorise("green", want_value),
                )
            )
            raise Unexpected_value()

    if check_kind == "abort":
        perf = None
    result, ebreak, abort_report, _ = run_test()
    if (fail := run_checks(result, ebreak, abort_report)) is not None:
        return fail

    return (Status.Normal, True, None, count_runtime(), perf)
    return make_good_report()


def test_case(case_name, test_program, errors):