~amirouche/mutation

6d1053fcc936941790bf71c6a3907cca1ccd94b0 — Amirouche 8 months ago 2621bbe
add tests file ex.py and test.py...
5 files changed, 78 insertions(+), 66 deletions(-)

M .gitignore
A README.md
A foobar/ex.py
M mutation.py
A test.py
M .gitignore => .gitignore +1 -0
@@ 127,3 127,4 @@ dmypy.json

# Pyre type checker
.pyre/
.mutation.okvslite

A README.md => README.md +1 -0
@@ 0,0 1,1 @@
# mutation

A foobar/ex.py => foobar/ex.py +2 -0
@@ 0,0 1,2 @@
def decrement(a):
    return a - 2

M mutation.py => mutation.py +66 -66
@@ 3,6 3,7 @@
Usage:
  mutation play [--verbose] [--exclude=<globs>] [--only-deadcode-detection] [--include=<globs>] [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- TEST-COMMAND ...]
  mutation replay
  mutation list
  mutation show MUTATION
  mutation apply MUTATION
  mutation (-h | --help)


@@ 165,12 166,11 @@ def glob2predicate(patterns):
def node_iter(node, level=1):
    yield node
    for child in node.children:
        level += 1
        if not getattr(child, "children", False):
            yield child
            continue

        yield from node_iter(child, level)
        yield from node_iter(child, level + 1)


def node_copy_tree(node, index):


@@ 311,11 311,12 @@ class MutateKeyword(metaclass=Mutation):

    def mutate(self, node, index):
        value = node.value
        targets = (
            type(self).KEYWORDS
            if value in type(self).KEYWORDS
            else type(self).SINGLETON
        )
        for targets in [self.KEYWORDS, self.SINGLETON, self.BOOLEAN]:
            if value in targets:
                break
        else:
            raise NotImplementedError

        for target in targets:
            if target == value:
                continue


@@ 463,29 464,27 @@ def install_module_loader(uid):

    patched = patch(diff, source)

    # TODO: replace with a type(...) call with toplevel functions.
    class MutationLoader(SourceLoader):

        __slots__ = ("fullname", "path")

        def __init__(self, fullname, path):
            self.fullname = fullname
            self.path = path
    import imp
    module_path = path[:-3].replace('/', '.')

        def get_filename(self, fullname):
            return self.path
    class MyLoader:
        def load_module(self, fullname):
            try:
                return sys.modules[fullname]
            except KeyError:
                pass
            my_module = imp.new_module(fullname)
            exec(patched, my_module.__dict__)
            sys.modules.setdefault(fullname, my_module)
            return my_module

        def get_data(self, filepath):
            """exec_module is already defined for us, we just have to provide a way
            of getting the source code of the module"""
            if filepath.endswith(path):
                return patched
            # TODO: fetch files from git...
            with open(filepath) as fp:
                return fp.read()
    class MyFinder:
        def find_module(self, fullname, path=None):
            if fullname == module_path:
                return MyLoader()
            return None

    # insert the path hook before of other path hooks
    sys.path_hooks.insert(0, FileFinder.path_hook((MutationLoader, [".py"])))
    sys.meta_path.insert(0, MyFinder())


def pytest_configure(config):


@@ 517,7 516,7 @@ def mutation_pass(args):  # TODO: rename
        msg = "no error with mutation: {}"
        log.error(msg, " ".join(command))
        with database_open(".") as db:
            db[lexode.pack([2, uid])] = b"\x42"
            db[lexode.pack([2, uid])] = b"\x00"
        return False
    else:
        # TODO: pass root path...


@@ 526,7 525,7 @@ def mutation_pass(args):  # TODO: rename
        return True


PYTEST = "pytest --exitfirst --no-header --tb=no --quiet"
PYTEST = "pytest --exitfirst --tb=no --quiet --assert=plain"
PYTEST = shlex.split(PYTEST)




@@ 569,7 568,10 @@ def git_open(root):
def run(command, timeout=None):
    try:
        out = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=None
            command,
            # stdout=subprocess.DEVNULL,
            # stderr=subprocess.DEVNULL,
            timeout=None,
        )
    except Exception:
        return -1


@@ 781,6 783,7 @@ async def play_mutations(loop, db, seed, alpha, total, max_workers, arguments):
    # prepare to run tests against mutations
    command = list(arguments["TEST-COMMAND"] or PYTEST)
    command.append("--randomly-seed={}".format(seed))
    command.extend(arguments["<file-or-directory>"])

    timeout = alpha * 2
    uids = db[lexode.pack([1]) : lexode.pack([2])]


@@ 886,6 889,7 @@ def replay_mutation(db, uid, alpha, seed, max_workers, arguments):

    command = list(arguments["TEST-COMMAND"] or PYTEST)
    command.append("--randomly-seed={}".format(seed))

    max_workers = 1
    if max_workers > 1:
        command.append("--numprocesses={}".format(max_workers))


@@ 897,8 901,9 @@ def replay_mutation(db, uid, alpha, seed, max_workers, arguments):
            mutation_show(uid.hex)
            msg = "* Type 'skip' to go to next mutation or just enter to retry."
            print(msg)
            retry = input("> ") == "retry"
            if not retry:
            skip = input("> ") == "skip"
            if skip:
                db[lexode.pack([2, uid])] = b"\x01"
                return
            # Otherwise loop to re-test...
        else:


@@ 912,7 917,8 @@ def replay_mutation(db, uid, alpha, seed, max_workers, arguments):

                for file in non_indexed:
                    repository.add(file)
                repository.index.commit("fixed mutation bug uid={}".format(uid.hex))
                repository.index.commit("fixed mutation bug uid={}.".format(uid.hex))
            del db[lexode.pack([2, uid])]
            return




@@ 927,15 933,17 @@ def replay(arguments):
    command = dict(command)
    seed = command.pop("seed")
    random.seed(seed)
    command = command.pop("command")
    command = list(command.pop("command"))

    alpha, max_workers = play_test_tests(root, seed, repository, arguments, command)

    with database_open(root) as db:
        while True:
            uids = (lexode.unpack(key)[1] for key, _ in db[lexode.pack([2]) :])
            uids = (lexode.unpack(k)[1] for k, v in db[lexode.pack([2]):] if v == b"\x00")
            uids = sorted(
                uids, key=functools.partial(mutation_diff_size, db), reverse=True
                uids,
                key=functools.partial(mutation_diff_size, db),
                reverse=True,
            )
            if not uids:
                log.info("No mutation failures 👍")


@@ 945,42 953,30 @@ def replay(arguments):
                replay_mutation(db, uid, alpha, max_workers)


def diff_highlight(diff):
    # adapted from diff-highlight
    import re

    from highlights.command import show_hunk, write

    new, old = [], []
    in_header = True

    for line in diff.split("\n"):
        if in_header:
            if re.match("^(@|commit \w+$)", line):
                in_header = False
        else:
            if not re.match("^(?:[ +\-@\\\\]|diff)", line):
                in_header = True

        if not in_header and line.startswith("+"):
            new.append(line)
        elif not in_header and line.startswith("-"):
            old.append(line)
        else:
            show_hunk(new, old)
            new, old = [], []
            # XXX: add a new line here
            write(line + "\n")

    show_hunk(new, old)  # flush last hunk
def mutation_list():
    with database_open(".") as db:
        uids = ((lexode.unpack(k)[1], v) for k, v in db[lexode.pack([2]):])
        uids = sorted(
            uids,
            key=lambda x: mutation_diff_size(x[0], db),
            reverse=True
        )
    if not uids:
        log.info("No mutation failures 👍")
        sys.exit(0)
    for (uid, type) in uids:
        print("{}\n{}".format(uid.hex, "skipped" if type == b"\x01" else ""))


def mutation_show(uid):
    uid = UUID(hex=uid)
    log.info("mutation show {}", uid.hex)
    log.info("")
    with database_open(".") as db:
        path, diff = lexode.unpack(db[lexode.pack([1, uid])])
    diff = zstd.decompress(diff).decode("utf8")
    diff_highlight(diff)
    for line in diff.split("\n"):
        log.info(line)


def mutation_apply(uid):


@@ 1016,12 1012,16 @@ def main():
        replay(arguments)
        sys.exit(0)

    if arguments.get("list", False):
        mutation_list()
        sys.exit(0)

    if arguments.get("show", False):
        mutation_show(arguments["MUTATION"])
        sys.exit(0)

    if arguments.get("apply", False):
        mutation_show(arguments["MUTATION"])
        mutation_apply(arguments["MUTATION"])
        sys.exit(0)

    loop = asyncio.get_event_loop()

A test.py => test.py +8 -0
@@ 0,0 1,8 @@
import sys
from foobar.ex import decrement


def test_foobar():
    print(sys.meta_path)
    x = decrement(10)
    assert 7 < x < 9