~amirouche/mutation

72dd13df65d458c70c450640c7d7db99f7fe2231 — Amirouche 7 months ago e1fd85d
refactor + black + isort.
1 files changed, 187 insertions(+), 135 deletions(-)

M mutation.py
M mutation.py => mutation.py +187 -135
@@ 12,12 12,10 @@ Options:
  -h --help     Show this screen.
  --version     Show version.
"""
from ast import Constant
import operator
import asyncio
import concurrent.futures
import fnmatch
import itertools
import operator
import os
import random
import re


@@ 25,6 23,8 @@ import shlex
import subprocess
import sys
import time
from ast import Constant
from concurrent import futures
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta


@@ 39,14 39,13 @@ import lexode
import parso
import zstandard as zstd
from aiostream import pipe, stream
from astunparse import unparse
from coverage import Coverage
from docopt import docopt
from humanize import precisedelta
from loguru import logger as log
from lsm import LSM
from ulid import ULID
from astunparse import unparse


__version__ = (0, 1, 0)



@@ 259,9 258,7 @@ class DefinitionDrop(metaclass=Mutation):


def chunks(iterable, n):
    """Yield successive n-sized chunks from iterable.

    """
    """Yield successive n-sized chunks from iterable."""
    it = iter(iterable)
    while chunk := tuple(itertools.islice(it, n)):
        yield chunk


@@ 278,9 275,12 @@ class MutateNumber(metaclass=Mutation):
        value = eval(node.value)

        if isinstance(value, int):

            def randomize(x):
                return random.randint(0, x)

        else:

            def randomize(x):
                return random.random() * x



@@ 295,7 295,6 @@ class MutateNumber(metaclass=Mutation):


class MutateString(metaclass=Mutation):

    def predicate(self, node):
        # str or bytes.
        return node.type == "string"


@@ 304,10 303,10 @@ class MutateString(metaclass=Mutation):
        root, new = node_copy_tree(node, index)
        value = eval(new.value)
        if isinstance(value, bytes):
            value = b'coffeebad' + value
            value = b"coffeebad" + value
        else:
            value = "mutated string " + value
        value = Constant(value=value, kind='')
        value = Constant(value=value, kind="")
        value = unparse(value).strip()
        new.value = value
        yield root, new


@@ 327,7 326,11 @@ class MutateKeyword(metaclass=Mutation):

    def mutate(self, node, index):
        value = node.value
        targets = type(self).KEYWORDS if value in type(self).KEYWORDS else type(self).SINGLETON
        targets = (
            type(self).KEYWORDS
            if value in type(self).KEYWORDS
            else type(self).SINGLETON
        )
        for target in targets:
            if target == value:
                continue


@@ 337,9 340,8 @@ class MutateKeyword(metaclass=Mutation):


class Comparison(metaclass=Mutation):

    def predicate(self, node):
        return node == 'comparison'
        return node == "comparison"

    def mutate(self, node, index):
        root, new = node_copy_tree(node, index)


@@ 381,7 383,7 @@ class MutateOperator(metaclass=Mutation):
                yield root, new


def diff(source, target, filename=''):
def diff(source, target, filename=""):
    lines = unified_diff(
        source.split("\n"), target.split("\n"), filename, filename, lineterm=""
    )


@@ 397,27 399,25 @@ def mutate(node, index, mutations):


def is_interesting(new_node, coverage):
    if getattr(new_node, 'line', False):
    if getattr(new_node, "line", False):
        return new_node.line in coverage
    return new_node.get_first_leaf().line in coverage


def deltas_compute(source, path, coverage, predicate):
def deltas_compute(source, path, coverage, mutations):
    ast = parso.parse(source)

    mutations = [m for m in Mutation.ALL if predicate(m)]

    for (index, node) in zip(itertools.count(0), node_iter(ast)):
        for root, new_node in mutate(node, index, mutations):
            if is_interesting(new_node, coverage):
                msg = "Ignoring mutation because there is no coverage:"
                msg += " path={}, line={}"
                log.trace(msg, path, getattr(new_node, 'line', 'unknown'))
                log.trace(msg, path, getattr(new_node, "line", "unknown"))
                continue
            target = root.get_code()
            delta = diff(source, target, path)
            if delta.isspace():
                log.warning('diff is empty!')
                log.warning("diff is empty!")
            else:
                yield delta



@@ 449,12 449,13 @@ async def pool_for_each_par_map(loop, pool, f, p, iterator):
            limit = pool._max_workers - len(unfinished)


def proc(item):  # TODO: rename
    path, source, coverage, node_predicate = item
def mutation_create(item):
    path, source, coverage, mutation_predicate = item
    log.trace("Mutating file: {}...", path)
    mutations = [m for m in Mutation.ALL if mutation_predicate(m)]
    deltas = deltas_compute(source, path, coverage, mutations)
    # return the compressed deltas to save some time in the
    # mainthread.
    deltas = deltas_compute(source, path, coverage, node_predicate)
    out = [(path, zstd.compress(x.encode("utf8"))) for x in deltas]
    log.trace("There is {} mutations for the file `{}`", len(out), path)
    return out


@@ 534,7 535,7 @@ def run(args):  # TODO: rename
        if out.returncode == 0:
            msg = "no error with mutation: {}"
            log.error(msg, " ".join(command))
            db[lexode.pack([2, uid])] = b'\x42'
            db[lexode.pack([2, uid])] = b"\x42"
            mutation_show(uid.hex)




@@ 551,44 552,81 @@ def coverage_read(root):
    return out


async def play(loop, arguments):
    # TODO: Replay failed tests and remove them from failed if it is
    #       now ok...
    #
    # TODO: Always use git HEAD, and display a message as critical
    #       explaining what is happenning...
    #
    # TODO: mutation run -n ie. dryrun: display files taken into consideration
    #
    # TODO: use transactions to make tests as failed...
    #
    # TODO: mutation show all to display all failed tests with diff.
    #
    # TODO: pass plain foobar to pytest and capture output and store
    #       it when test is failed.

    max_workers = arguments["--max-workers"] or (os.cpu_count() - 1) or 1
    max_workers = int(max_workers)
def database_open(root, recreate=False):
    db = root / ".mutation.okvslite"
    if recreate and db.exists():
        log.trace("Deleting existing database...")
        for file in root.glob(".mutation.okvslite*"):
            file.unlink()

    include = arguments.get("--include") or "*.py"
    include = include.split(",")
    include = glob2predicate(include)
    db = LSM(str(db))

    exclude = arguments.get("--exclude") or "*test*"
    exclude = exclude.split(",")
    exclude = glob2predicate(exclude)
    return db

    root = Path(".").resolve()

def git_open(root):
    try:
        git.Repo(str(root))
        repository = git.Repo(str(root))
    except git.exc.InvalidGitRepositoryError:
        log.error("There is no git repository at {}", root)
        sys.exit(1)
        sys.exit(2)
    else:
        return repository

    seed = arguments["--randomly-seed"] or int(time.time())
    log.info("Using random seed: {}".format(seed))
    random.seed(seed)

def run(command, timeout=None):
    try:
        out = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=None
        )
    except Exception:
        return -1
    else:
        return out.returncode


def sampling_setup(sampling, total):
    if sampling is None:
        return total, lambda x: x

    if sampling.endswith("%"):
        # randomly choose percent mutations
        cutoff = int(sampling[:-1]) / 100

        def sampler(iterable):
            for item in iterable:
                value = random.random()
                if value < cutoff:
                    yield item

        total = int(total * cutoff)
    elif sampling.isdigit():
        # otherwise, it is the first COUNT mutations that are used.
        total = int(sampling)

        def sampler(iterable):
            remaining = total
            for item in iterable:
                yield item
                remaining -= 1
                if remaining == 0:
                    return

    else:
        msg = "Sampling passed via --sampling option must be a positive"
        msg += " integer or a percentage!"
        log.error(msg)
        sys.exit(2)

    if sampling:
        log.info("Taking into account sampling there is {} mutations.", total)

    return sampler, total


def play_test_tests(root, seed, repository, arguments):
    max_workers = arguments["--max-workers"] or (os.cpu_count() - 1) or 1
    max_workers = int(max_workers)

    log.info("Checking the tests are green...")
    #


@@ 613,13 651,13 @@ async def play(loop, arguments):
    ]

    with timeit() as alpha:
        out = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    if out.returncode != 0:
        msg = "Tests are not green or something... return code is {}"
        out = run(command)

    if out == 0:
        log.info("Tests are green!")
        alpha = alpha() * 2
    else:
        msg = "Tests are not green or something... return code is {}..."
        log.warning(msg, out.returncode)
        log.warning("I tried the following command: `{}`", " ".join(command))



@@ 633,43 671,54 @@ async def play(loop, arguments):
            "--randomly-seed={}".format(seed),
        ]

        # alpha is the approximate time in seconds to execute the test
        # suite once.  Multiply the difference with two because there
        # is two processus running the tests.
        with timeit() as alpha:
            out = subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            out = run(command)

        if out.returncode != 0:
            msg = "Tests are definitly red! Return code is {}"
        if out != 0:
            msg = "Tests are definitly red! Return code is {}!!"
            log.error(msg, out.returncode)
            log.error("I tried the following command: `{}`", " ".join(command))
            sys.exit(1)
            sys.exit(2)

        # Otherwise, it is possible to run the tests but without
        # parallelization.
        log.info("Overriding max_workers=1 because tests do not pass in parallel")
        max_workers = 1
        alpha = alpha()
    else:
        # TODO: do that in the first branch
        log.info("Tests are green!")
        alpha = alpha() * 2

    msg = "Time required to run the full test suite once: {}..."
    log.info(msg, humanize(alpha))

    coverage = coverage_read(root)
    return alpha, max_workers


async def play_create_mutations(loop, root, db, repository, max_workers, arguments):
    # Go through all blobs in head, and produce mutations, take into
    # account include pattern, and exclude patterns.  Also, exclude
    # what has no coverage.
    include = arguments.get("--include") or "*.py"
    include = include.split(",")
    include = glob2predicate(include)

    exclude = arguments.get("--exclude") or "*test*"
    exclude = exclude.split(",")
    exclude = glob2predicate(exclude)

    blobs = repository_iter_latest_files(repository)
    blobs = (x for x in blobs if include(x.path) and not exclude(x.path))

    # setup coverage support
    coverage = coverage_read(root)
    only_dead_code = arguments["--only-deadcode-detection"]
    if only_dead_code:
        node_predicate = operator.attrgetter('deadcode_detection')

        def node_predicate(x):
            return getattr(x, "deadcode_detection", False)

    else:
        node_predicate = operator.attrgetter('predicate')

    blobs = repository_iter_latest_files(str(root))
    blobs = (x for x in blobs if include(x.path) and not exclude(x.path))
        def node_predicate(x):
            return True

    def make_item(blob):
        out = (


@@ 682,17 731,10 @@ async def play(loop, arguments):

    items = (make_item(blob) for blob in blobs)

    db = root / ".mutation.okvslite"
    if db.exists():
        log.trace("Deleting existing database...")
        for file in root.glob(".mutation.okvslite*"):
            file.unlink()

    db = LSM(str(db))

    # prepare to create mutations
    total = 0

    def increment(items):
    def on_mutations_created(items):
        nonlocal total
        total += len(items)
        for path, delta in items:


@@ 703,15 745,20 @@ async def play(loop, arguments):

    log.info("Mutation in progress...")
    with timeit() as delta:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(loop, pool, increment, proc, items)
        with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(
                loop, pool, on_mutations_created, mutation_create, items
            )

    log.info("It took {} to compute mutations...", humanize(delta()))
    log.info("The number of mutation is {}!", total)

    log.info("Testing in progress...")
    return total


async def play_mutations(loop, db, seed, alpha, total, max_workers, arguments):
    # prepare to run tests against mutations
    command = arguments["TEST-COMMAND"] or PYTEST
    # fix random seed...
    command.append("--randomly-seed={}".format(seed))

    timeout = alpha * 2


@@ 719,40 766,8 @@ async def play(loop, arguments):

    # sampling
    sampling = arguments["--sampling"]

    if sampling and sampling.endswith("%"):
        # randomly choose percent mutations
        cutoff = int(sampling[:-1]) / 100

        def sampler(iterable):
            for item in iterable:
                value = random.random()
                if value < cutoff:
                    yield item

        total = int(total * cutoff)
        uids = sampler(uids)
    elif sampling and sampling.isdigit():
        # otherwise, it is the first COUNT mutations that are used.
        total = int(sampling)

        def sampler(iterable):
            remaining = total
            for item in iterable:
                yield item
                remaining -= 1
                if remaining == 0:
                    return

        uids = sampler(uids)
    elif sampling is not None:
        msg = "Sampling passed via --sampling option must be a positive"
        msg += " integer or a percentage!"
        log.error(msg)
        sys.exit(1)

    if sampling:
        log.info("Taking into account sampling there is {} mutations.", total)
    total, sampler = sampling_setup(sampling)
    uids = sampler(uids)

    for speed in [10_000, 1_000, 100, 10, 1]:
        if total // speed == 0:


@@ 762,10 777,13 @@ async def play(loop, arguments):

    gamma = time.perf_counter()

    errors = False
    remaining = total

    def progress(_):
    def on_progress(_):
        nonlocal remaining
        nonlocal errors
        errors = True
        remaining -= 1
        if (remaining % step) == 0 or (total - remaining == 10):
            percent = 100 - ((remaining / total) * 100)


@@ 776,17 794,50 @@ async def play(loop, arguments):
            log.debug(msg, percent)
            log.info("ETA {}...", humanize(eta))

    log.info("Testing in progress...")
    with timeit() as delta:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(loop, pool, progress, run, uids)
        with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            await pool_for_each_par_map(loop, pool, on_progress, mutation_test, uids)

    msg = "Checking that the test suite is strong against mutations took:"
    msg += " {}..."
    log.info(msg, humanize(delta()))

    db.close()
    return errors


async def play(loop, arguments):
    # TODO: Replay failed tests and remove them from failed if it is
    #       now ok...
    #
    # TODO: Always use git HEAD, and display a message as critical
    #       explaining what is happenning...
    #
    # TODO: mutation run -n ie. dryrun: display files taken into consideration
    #
    # TODO: use transactions to make tests as failed...
    #
    # TODO: mutation show all to display all failed tests with diff.
    #
    # TODO: pass plain foobar to pytest and capture output and store
    #       it when test is failed.

    root = Path(".").resolve()
    repository = git_open(root)

    seed = arguments["--randomly-seed"] or int(time.time())
    log.info("Using random seed: {}".format(seed))
    random.seed(seed)

    alpha, max_workers = play_test_tests(root, seed, repository, arguments)

    with database_open(root, recreate=True) as db:
        await play_create_mutations(loop, root, db, repository, max_workers, arguments)
        errors = await play_mutations(
            loop, db, seed, alpha, total, max_workers, arguments
        )

    return None
    sys.exit(1 if errors else 0)


async def replay(uid=None, interactive=False):


@@ 806,6 857,7 @@ async def replay(uid=None, interactive=False):
    # - commit
    # - skip
    #
    pass


def diff_highlight(diff):


@@ 870,7 922,7 @@ def main():
        sys.exit(0)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(_main(loop, arguments))
    loop.run_until_complete(play(loop, arguments))
    loop.close()