~yerinalexey/pcrond

1f29ae5fafdbbc2a45f0fabe4307dcfe297f8a10 — Alexey Yerin 5 months ago cf699d5
refactor: completely remove logging
3 files changed, 0 insertions(+), 28 deletions(-)

M pcrond/job.py
M pcrond/sched.py
M test_scheduler.py
M pcrond/job.py => pcrond/job.py +0 -3
@@ -1,7 +1,5 @@

from datetime import datetime
import logging
logger = logging.getLogger('pcrond')

reboot_time = datetime.now()



@@ -163,7 +161,6 @@ class Job(object):
        Run the job.
        :return: The return value returned by the `job_func`
        """
        logger.info('Running job %s', self)
        self.running = True
        ret = self.job_func()
        self.running = False

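Note: with this hunk, Job.run() no longer emits the 'Running job ...' record. An application that still wants that line can wrap its own job callables before handing them to the scheduler. A minimal sketch, assuming only that job functions are plain zero-argument callables; the logged() decorator and the "myapp" logger name are illustrative, not part of pcrond:

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("myapp")

    def logged(job_func):
        # Caller-side replacement for the log line removed from Job.run()
        def wrapper():
            log.info("Running job %s", job_func.__name__)
            return job_func()
        return wrapper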
M pcrond/sched.py => pcrond/sched.py +0 -14
@@ -1,25 +1,19 @@
# most of the code here comes from https://github.com/dbader/schedule

from .job import Job, ALIASES
import logging
import time

logger = logging.getLogger('pcrond')


def std_launch_func(cmd_splitted, stdin=None):
    """
    Default way of executing commands is to invoke subprocess.run()
    """
    if stdin is None:
        def f():
            logger.info("Now running: " + str(cmd_splitted))
            from subprocess import Popen
            Popen(cmd_splitted, stdin=None, stdout=None, stderr=None)
            # not returning anything here
    else:
        def f():
            logger.info("Now running: " + str(cmd_splitted))
            from subprocess import Popen, PIPE
            p = Popen(cmd_splitted, stdin=PIPE, stdout=None, stderr=None)
            p.communicate(input=stdin)


@@ -47,9 +41,7 @@ class Scheduler(object):
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        logger.debug("available jobs: " + str(self.jobs))
        runnable_jobs = (job for job in self.jobs if job.should_run())
        logger.debug("runnable jobs: " + str(self.jobs))
        for job in runnable_jobs:
            job.run()



@@ -61,7 +53,6 @@ class Scheduler(object):
        over time.
        :param delay_seconds: A delay added between every executed job
        """
        logger.info('Running *all* %i jobs with %is delay inbetween',
                    len(self.jobs), delay_seconds)
        for job in self.jobs[:]:
            job.run()


@@ -72,7 +63,6 @@ class Scheduler(object):
        Deletes scheduled jobs
        """
        del self.jobs[:]
        logger.info("jobs cleared")

    def cancel_job(self, job):
        """


@@ -119,11 +109,9 @@ class Scheduler(object):
                return job
            except ValueError as e:
                # shouldn't happen
                logger.error(("Error at line %d, cannot parse pattern, the line will be ignored.\r\n" +
                              "Inner Exception: %s") % (rownum, str(e)))
                return None
        if len(pieces) < 6:
            logger.error("Error at line %d, expected at least 6 tokens" % rownum)
            return None
        if len(pieces) >= 7:
            try:


@@ -137,7 +125,6 @@ class Scheduler(object):
            job = self.cron(" ".join(pieces[0:5]), job_func_func(pieces[5:]))
            return job
        except ValueError as e:
            logger.error(("Error at line %d, cannot parse pattern, the line will be ignored.\r\n" +
                          "Inner Exception: %s") % (rownum, str(e)))
            return None



@@ -177,7 +164,6 @@ class Scheduler(object):
                        stdin = pieces[1] if len(pieces) > 1 else None
                        self._load_crontab_line(rownum, pieces[0], job_func_func, stdin)
                        # TODO support % sign inside command, should consider pieces[1] if any
        logger.info(str(len(self.jobs)) + " jobs loaded from configuration file")

    def main_loop(self):
        """

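Note: after this change the scheduler runs silently; the 'Now running: ...' messages and the 'N jobs loaded from configuration file' summary are gone. Equivalent visibility can be recovered on the caller's side by supplying a launch function that logs before spawning the process. A rough sketch modeled on the std_launch_func shown above; the logging_launch_func name and the trailing return f are assumptions, not pcrond API:

    import logging
    from subprocess import Popen, PIPE

    log = logging.getLogger("myapp")

    def logging_launch_func(cmd_splitted, stdin=None):
        def f():
            # caller-side stand-in for the removed "Now running: ..." log line
            log.info("Now running: %s", cmd_splitted)
            if stdin is None:
                Popen(cmd_splitted, stdin=None, stdout=None, stderr=None)
            else:
                # stdin must already be bytes, since Popen is used without text=True
                p = Popen(cmd_splitted, stdin=PIPE, stdout=None, stderr=None)
                p.communicate(input=stdin)
        return f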
M test_scheduler.py => test_scheduler.py +0 -11
@@ -1,21 +1,10 @@
#!/usr/bin/env python
"""Unit tests for pcrond.py"""
import unittest
import logging
import sys
from datetime import datetime as d, timedelta
from pcrond import scheduler, Job, Parser

# when tests with a logger fail, you can set this to True
SHOW_LOGGING = False

logger = logging.getLogger()
if SHOW_LOGGING:
    logging.basicConfig(level=logging.DEBUG)
else:
    logger.addHandler(logging.NullHandler())  # do not show logs.


def do_nothing():
    pass
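Note: the SHOW_LOGGING toggle and the NullHandler setup only silenced (or surfaced) the library's own logger; with logging removed from pcrond they have nothing left to control, so the test module shrinks to its imports and helpers. A short usage sketch of what those imports still allow; the test name, the "* * * * *" pattern, and the assumption that cron() registers the job in scheduler.jobs (as the run-all loop in sched.py suggests) are illustrative only:

    import unittest
    from pcrond import scheduler, Job

    def do_nothing():
        pass

    class SchedulerTest(unittest.TestCase):
        def test_cron_registers_a_job(self):
            del scheduler.jobs[:]   # mirrors the clear implementation shown above
            job = scheduler.cron("* * * * *", do_nothing)
            self.assertIsInstance(job, Job)
            self.assertEqual(len(scheduler.jobs), 1)

    if __name__ == "__main__":
        unittest.main()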