~amirouche/mutation

e213db1a2c23d1b1faf406249df1316b55e483f1 — Amirouche 7 months ago 412a558 v0.2.9
v0.2.9

- cli: it is possible to pass pytest argument file-or-directory
  without going throught the ceremony of copy pasting the whole
  command.

- add tqdm to show progress.

- bump v0.2.9.

- print output to stdout.

- only give a summary of ignored mutations.

- not interesting != interesting.

- drop the delta.isspace() it never happens.

- rename mutation_tess to mutation_pass, reverse again the logic to
  return False when mutation fails.

- PYTEST: no tb, with summary.

- database_open accept a string or Path.

- run returns an integer.

- add replay command to help with debugging.
3 files changed, 171 insertions(+), 95 deletions(-)

M mutation.py
M poetry.lock
M pyproject.toml
M mutation.py => mutation.py +152 -93
@@ 1,7 1,7 @@
"""Mutation.

Usage:
  mutation play [--verbose] [--exclude=<globs>] [--only-deadcode-detection] [--include=<globs>] [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [-- TEST-COMMAND ...]
  mutation play [--verbose] [--exclude=<globs>] [--only-deadcode-detection] [--include=<globs>] [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- TEST-COMMAND ...]
  mutation show failed
  mutation show MUTATION
  mutation (-h | --help)


@@ 12,10 12,11 @@ Options:
  -h --help     Show this screen.
  --version     Show version.
"""
from tqdm import tqdm
import functools
import asyncio
import fnmatch
import itertools
import operator
import os
import random
import re


@@ 47,7 48,7 @@ from loguru import logger as log
from lsm import LSM
from ulid import ULID

__version__ = (0, 1, 0)
__version__ = (0, 3, 0)


MINUTE = 60  # seconds


@@ 76,7 77,7 @@ PRONOTION = "https://youtu.be/ihZEaj9ml4w?list=PLOSNaPJYYhrtliZqyEWDWL0oqeH0hOHn
log.remove()
if os.environ.get("DEBUG", False):
    log.add(
        sys.stderr,
        sys.stdout,
        format="<level>{level}</level> {message}",
        level="TRACE",
        colorize=True,


@@ 84,7 85,7 @@ if os.environ.get("DEBUG", False):
    )
else:
    log.add(
        sys.stderr,
        sys.stdout,
        format="<level>{level}</level> {message}",
        level="INFO",
        colorize=True,


@@ 398,7 399,7 @@ def mutate(node, index, mutations):
        yield from mutation.mutate(node, index)


def is_interesting(new_node, coverage):
def interesting(new_node, coverage):
    if getattr(new_node, "line", False):
        return new_node.line in coverage
    return new_node.get_first_leaf().line in coverage


@@ 406,22 407,18 @@ def is_interesting(new_node, coverage):

def deltas_compute(source, path, coverage, mutations):
    ast = parso.parse(source)

    ignored = 0
    for (index, node) in zip(itertools.count(0), node_iter(ast)):
        for root, new_node in mutate(node, index, mutations):
            if is_interesting(new_node, coverage):
                msg = "Ignoring mutation because there is no coverage:"
                msg += " path={}, line={}"
                line = getattr(new_node, "line", False)
                line = line or new_node.get_first_leaf().line
                log.trace(msg, path, line)
            if not interesting(new_node, coverage):
                ignored += 1
                continue
            target = root.get_code()
            delta = diff(source, target, path)
            if delta.isspace():
                log.warning("diff is empty!")
            else:
                yield delta
            yield delta
    if ignored > 1:
        msg = "Ignored {} mutations from file at {} because there is no associated coverage."
        log.trace(msg, ignored, path)


async def pool_for_each_par_map(loop, pool, f, p, iterator):


@@ 522,26 519,26 @@ def for_each_par_map(loop, pool, inc, proc, items):
    return out


def mutation_test(args):  # TODO: rename
def mutation_pass(args):  # TODO: rename
    command, uid, timeout = args
    command = command + ["--mutation={}".format(uid.hex)]
    try:
        out = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            timeout=timeout,
        )
    except Exception as exc:  # TODO: remove wide exception
        log.trace("Exception with `{}`, exception=`{}`", uid.hex, exc)
    else:
        if out.returncode == 0:
            msg = "no error with mutation: {}"
            log.error(msg, " ".join(command))
    out = run(command, timeout=timeout)

    if out == 0:
        msg = "no error with mutation: {}"
        log.error(msg, " ".join(command))
        with database_open(".") as db:
            db[lexode.pack([2, uid])] = b"\x42"
            mutation_show(uid.hex)
        mutation_show(uid.hex)
        return False
    else:
        # TODO: pass root path...
        with database_open(".") as db:
            del db[lexode.pack([2, uid])]
        return True


PYTEST = "pytest --exitfirst --no-header --tb=long --showlocals --quiet"
PYTEST = "pytest --exitfirst --no-header --tb=no --quiet"
PYTEST = shlex.split(PYTEST)




@@ 555,6 552,7 @@ def coverage_read(root):


def database_open(root, recreate=False):
    root = root if isinstance(root, Path) else Path(root)
    db = root / ".mutation.okvslite"
    if recreate and db.exists():
        log.trace("Deleting existing database...")


@@ 630,7 628,7 @@ def play_test_tests(root, seed, repository, arguments):
    max_workers = arguments["--max-workers"] or (os.cpu_count() - 1) or 1
    max_workers = int(max_workers)

    log.info("Checking the tests are green...")
    log.info("Let's check that the tests are green...")
    #
    # TODO: use the coverage program instead with something along the
    # lines of:


@@ 639,8 637,12 @@ def play_test_tests(root, seed, repository, arguments):
    #
    # To be able to pass --omit and --include.
    #
    if arguments["<file-or-directory>"] and arguments["TEST-COMMAND"]:
        log.error("<file-or-directory> and TEST-COMMAND are exclusive!")
        sys.exit(1)

    command = arguments["TEST-COMMAND"] or PYTEST
    command = command + [
    command.extend([
        # Use pytest-xdist to make sure it is possible to run the
        # tests in parallel
        "--numprocesses={}".format(max_workers),


@@ 650,7 652,8 @@ def play_test_tests(root, seed, repository, arguments):
        "--no-cov-on-fail",
        # Pass random seed
        "--randomly-seed={}".format(seed),
    ]
    ])
    command.extend(arguments["<file-or-directory>"])

    with timeit() as alpha:
        out = run(command)


@@ 660,7 663,7 @@ def play_test_tests(root, seed, repository, arguments):
        alpha = alpha() * 2
    else:
        msg = "Tests are not green or something... return code is {}..."
        log.warning(msg, out.returncode)
        log.warning(msg, out)
        log.warning("I tried the following command: `{}`", " ".join(command))

        command = arguments["TEST-COMMAND"] or PYTEST


@@ 678,7 681,7 @@ def play_test_tests(root, seed, repository, arguments):

        if out != 0:
            msg = "Tests are definitly red! Return code is {}!!"
            log.error(msg, out.returncode)
            log.error(msg, out)
            log.error("I tried the following command: `{}`", " ".join(command))
            sys.exit(2)



@@ 734,26 737,30 @@ async def play_create_mutations(loop, root, db, repository, max_workers, argumen
        )
        return out

    items = (make_item(blob) for blob in blobs if coverage.get(blob.path, set()))
    items = [make_item(blob) for blob in blobs if coverage.get(blob.path, set())]

    # prepare to create mutations
    total = 0

    def on_mutations_created(items):
        nonlocal total
        total += len(items)
        for path, delta in items:
            # TODO: replace ULID with a content addressable hash.
            uid = ULID().to_uuid()
            # delta is a compressed unified diff
            db[lexode.pack([1, uid])] = lexode.pack([path, delta])

    log.info("Mutation in progress...")
    with timeit() as delta:
        with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(
                loop, pool, on_mutations_created, mutation_create, items
            )
    log.info("Creating mutations from {} files...", len(items))
    with tqdm(total=len(items), desc="Files") as progress:

        def on_mutations_created(items):
            nonlocal total

            progress.update()
            total += len(items)
            for path, delta in items:
                # TODO: replace ULID with a content addressable hash.
                uid = ULID().to_uuid()
                # delta is a compressed unified diff
                db[lexode.pack([1, uid])] = lexode.pack([path, delta])

        with timeit() as delta:
            with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
                await pool_for_each_par_map(
                    loop, pool, on_mutations_created, mutation_create, items
                )

    log.info("It took {} to compute mutations...", humanize(delta()))
    log.info("The number of mutation is {}!", total)


@@ 785,36 792,42 @@ async def play_mutations(loop, db, seed, alpha, total, max_workers, arguments):
    errors = False
    remaining = total

    def on_progress(_):
        nonlocal remaining
        nonlocal errors
        errors = True
        remaining -= 1
        if (remaining % step) == 0 or (total - remaining == 10):
            percent = 100 - ((remaining / total) * 100)
            now = time.perf_counter()
            delta = now - gamma
            eta = (delta / (total - remaining)) * remaining
            msg = "Mutation tests {:.2f}% done..."
            log.debug(msg, percent)
            log.info("ETA {}...", humanize(eta))

    log.info("Testing in progress...")
    with timeit() as delta:
        with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(loop, pool, on_progress, mutation_test, uids)
    log.info("Testing mutations in progress...")

    with tqdm(total=100) as progress:

        def on_progress(_):
            nonlocal remaining
            nonlocal errors

            errors = True
            remaining -= 1

            if (remaining % step) == 0 or (total - remaining == 10):
                percent = 100 - ((remaining / total) * 100)
                now = time.perf_counter()
                delta = now - gamma
                eta = (delta / (total - remaining)) * remaining

                progress.update(int(percent))
                progress.set_description("ETA {}".format(humanize(eta)))

                msg = "Mutation tests {:.2f}% done..."
                log.debug(msg, percent)
                log.debug("ETA {}...", humanize(eta))

        with timeit() as delta:
            with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
                await pool_for_each_par_map(loop, pool, on_progress, mutation_pass, uids)

    msg = "Checking that the test suite is strong against mutations took:"
    msg += " {}..."
    msg += " {}... And it is a success 💚"
    log.info(msg, humanize(delta()))

    return errors


async def play(loop, arguments):
    # TODO: Replay failed tests and remove them from failed if it is
    #       now ok...
    #
    # TODO: Always use git HEAD, and display a message as critical
    #       explaining what is happenning...
    #


@@ 845,24 858,72 @@ async def play(loop, arguments):
    sys.exit(1 if errors else 0)


async def replay(uid=None, interactive=False):
    # Retrieve all failed tests and sort them by diff length
    # decreasing length.
def mutation_diff_size(db, uid):
    _, diff = lexode.unpack(db[lexode.pack([1, uid])])
    out = len(zstd.decompress(diff))
    return out

    # For each mutation:

    # show diff
def replay_mutation(db, uid, alpha, seed, max_workers, arguments):
    print("* Use Ctrl+C to exit.")

    # Run a single test
    repository = git_open(".").index

    #
    # Use raw_input("% ") with the following actions:
    #
    # - replay
    # - commit
    # - skip
    #
    pass
    command = arguments["TEST-COMMAND"] or PYTEST
    command.append("--randomly-seed={}".format(seed))
    if max_workers > 1:
        command.append("--numprocesses={}".format(max_workers))
    timeout = alpha * 2

    while True:
        ok = mutation_pass((command, uid, timeout))
        if not ok:
            mutation_show(uid.hex)
            msg = "* Type 'skip' to go to next mutation or just enter to retry."
            print(msg)
            retry = input("> ") == 'retry'
            if not retry:
                return
            # Otherwise loop to re-test...
        else:
            non_indexed = repository.index.diff(None)
            indexed = repository.index.diff("HEAD")
            if indexed or non_indexed:
                print("* They are uncommited changes, do you want to commit?")
                yes = input("> ").startswith("y")
                if not yes:
                    return

                for file in non_indexed:
                    repository.add(file)
                repository.index.commit("fixed mutation bug uid={}".format(uid.hex))
            return


def replay(arguments):
    root = Path(".").resolve()
    seed = arguments["--randomly-seed"] or int(time.time())
    log.info("Using random seed: {}".format(seed))
    random.seed(seed)
    repository = git_open(root)
    uid = UUID(hex=arguments["uid"])

    alpha, max_workers = play_test_tests(root, seed, repository, arguments)

    with database_open(root) as db:
        while True:
            uids = (lexode.unpack(key)[1] for key, _ in db[lexode.pack([1])])
            uids = sorted(
                uids,
                key=functools.partial(mutation_diff_size, db),
                reverse=True
            )
            if not uids:
                print("No failures!")
                sys.exit(0)
            while uids:
                uid = uids.pop(0)
                replay_mutation(db, uid, alpha, max_workers)


def diff_highlight(diff):


@@ 896,10 957,8 @@ def diff_highlight(diff):


def mutation_show(uid):
    # TODO: pass the printer as an argument or at least make it
    # possible to pass sys.stderr.
    uid = UUID(hex=uid)
    with LSM(".mutation.okvslite") as db:
    with database_open(".") as db:
        path, diff = lexode.unpack(db[lexode.pack([1, uid])])
    diff = zstd.decompress(diff).decode("utf8")
    diff_highlight(diff)


@@ 911,7 970,7 @@ def main():
    if arguments.get("--verbose", False):
        log.remove()
        log.add(
            sys.stderr,
            sys.stdout,
            format="<level>{level}</level> {message}",
            level="DEBUG",
            colorize=True,

M poetry.lock => poetry.lock +17 -1
@@ 590,6 590,18 @@ optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"

[[package]]
name = "tqdm"
version = "4.56.0"
description = "Fast, Extensible Progress Meter"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.7"

[package.extras]
dev = ["py-make (>=0.1.0)", "twine", "wheel"]
telegram = ["requests"]

[[package]]
name = "traitlets"
version = "5.0.5"
description = "Traitlets Python configuration system"


@@ 655,7 667,7 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "77c5532f73027cfb661ee0d3300a5b3a75885face2f5d7f7b1595a159fc43843"
content-hash = "354b8f6a684c5556fff1a8a301d3a394aef6056135f73edfa0974b570db6bae8"

[metadata.files]
aiostream = [


@@ 992,6 1004,10 @@ toml = [
    {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
    {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
]
tqdm = [
    {file = "tqdm-4.56.0-py2.py3-none-any.whl", hash = "sha256:4621f6823bab46a9cc33d48105753ccbea671b68bab2c50a9f0be23d4065cb5a"},
    {file = "tqdm-4.56.0.tar.gz", hash = "sha256:fe3d08dd00a526850568d542ff9de9bbc2a09a791da3c334f3213d8d0bbbca65"},
]
traitlets = [
    {file = "traitlets-5.0.5-py3-none-any.whl", hash = "sha256:69ff3f9d5351f31a7ad80443c2674b7099df13cc41fc5fa6e2f6d3b0330b0426"},
    {file = "traitlets-5.0.5.tar.gz", hash = "sha256:178f4ce988f69189f7e523337a3e11d91c786ded9360174a3d9ca83e79bc5396"},

M pyproject.toml => pyproject.toml +2 -1
@@ 1,6 1,6 @@
[tool.poetry]
name = "mutation"
version = "0.2.1"
version = "0.2.9"
description = "test mutation for pytest."
authors = ["Amirouche <amirouche@hyper.dev>"]
license = "MIT"


@@ 22,6 22,7 @@ pytest-randomly = "^3.5.0"
humanize = "^3.2.0"
diff-highlight = "^1.2.0"
astunparse = "^1.6.3"
tqdm = "^4.56.0"

[tool.poetry.dev-dependencies]
debug = "^0.3.2"