~yerinalexey/pcrond

5e81bfac3222133914e20564d0bc55e96eaff15b — Luca Vercelli 2 years ago 2fed363
Day 0
6 files changed, 477 insertions(+), 0 deletions(-)

A README.rst
A pcrond.py
A pcrond/__init__.py
A pcrond/job.py
A pcrond/sched.py
A test_scheduler.py
A README.rst => README.rst +48 -0
@@ 0,0 1,48 @@
python crond
============
Userspace cron daemon

Some of the code was taken from https://github.com/dbader/schedule, release under MIT license.

This project is not interested in the "human stuff" of the original problem.
We want to launch processes in the same way crond does, and we want to do that in userspace.





.. image:: https://api.travis-ci.org/dbader/schedule.svg?branch=master
        :target: https://travis-ci.org/dbader/schedule

.. image:: https://coveralls.io/repos/dbader/schedule/badge.svg?branch=master
        :target: https://coveralls.io/r/dbader/schedule

.. image:: https://img.shields.io/pypi/v/schedule.svg
        :target: https://pypi.python.org/pypi/schedule


Features
--------
- A simple to use API for scheduling jobs.
- Very lightweight and no external dependencies.
- Excellent test coverage.
- Tested on Python 2.7, 3.5, and 3.6

Usage
-----

.. code-block:: bash

    $ python ./setup.py
    $ ./pcrond.py path/to/my/crontab/file
    

It is also possible to use this library within youir python program, howebver this is not the intended use.
If you want to do this, just mimic what pcrond.py does.

.. code-block:: python

    from pcrond import scheduler
    ...

    
\ No newline at end of file

A pcrond.py => pcrond.py +19 -0
@@ 0,0 1,19 @@
#!/usr/env python

from pcrond import scheduler

crontab_filename=None

def parse_args():
    import argparse
    parser = argparse.ArgumentParser(description='Launch a crond-like daemon in userspace.')
    parser.add_argument('filename', help='the crontab file')
    args = parser.parse_args()
    
    global crontab_filename
    crontab_filename = args.filename

if __name__ == "__main__":
    parse_args()
    scheduler.load_crontab_file(crontab_filename)
    scheduler.main_loop()

A pcrond/__init__.py => pcrond/__init__.py +6 -0
@@ 0,0 1,6 @@

from job import Job
from sched import Scheduler

#default instance
scheduler = Scheduler()

A pcrond/job.py => pcrond/job.py +177 -0
@@ 0,0 1,177 @@

MONTH_OFFSET = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
        'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov':11, 'dec':12}
WEEK_OFFSET = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5,
        'sat': 6}
ALIASES = {
    '@yearly':  '0 0 1 1 *',
    '@annually':  '0 0 1 1 *',
    '@monthly': '0 0 1 * *',
    '@weekly':  '0 0 * * 0',
    '@daily':   '0 0 * * *',
    '@hourly':  '0 * * * *',
}

class Job(object):
    """
    A periodic job as used by :class:`Scheduler`.
    """
    def __init__(self, crontab, job_func=None, scheduler=None):
        """
        Constructor
        :param crontab:
            string containing crontab pattern
            Its tokens may be either: 1 (if alias), 5 (without year token), 6 (with year token)
        :param job_func:
            the job 0-ary function to run
            if None, you should set it later
        :param scheduler:
            scheduler to register with
            if None, you should set it later
        """
        self.job_func = job_func
        self.scheduler = scheduler
        self.running = False

        if crontab in ALIASES.keys():
            crontab = ALIASES[crontab]

        crontab_lst = crontab.split()
        
        if crontab_lst[0] in ALIASES.keys():
            raise ValueError("Cannot mix @Aliases and other tokens")
        
        if len(crontab_lst) == 5:
            crontab_lst.append("*")
        if len(crontab_lst) != 6:
            raise ValueError("Each crontab pattern *must* contain either 5 or 6 items")
        self.allowed_min = self._parse_min(crontab_lst[0])
        self.allowed_hours = self._parse_hour(crontab_lst[1])
        self.allowed_days_of_month = self._parse_day_in_month(crontab_lst[2])
        self.allowed_months = self._parse_month(crontab_lst[3])
        self.allowed_days_of_week = self._parse_day_in_week(crontab_lst[4])
        self.allowed_years = self._parse_year(crontab_lst[5])
        
        self.allowed_last_day_of_month = True if -1 in self.allowed_days_of_month else False
        
        if -1 in self.allowed_years:
            raise ValueError("Wrong format '%s' : 'L' is meaningless talking about Years" % crontab_lst[5])
        
        self.crontab_pattern = crontab_lst

    def _parse_token(self, token, offsets):
        if token in offsets.keys():
            newtoken = offsets[token]
            try:
                return int(newtoken)
            except ValueError:
                raise ValueError("token %s maps to %s, however the latter is not an integer" % (token, newtoken))
        try:
            return int(token)
        except ValueError:
            raise ValueError("token %s is not an integer, nor it is a known constant" % token)
            
    
    def _parse_common(self, s, maxval, offsets={}):
        """
        generate a set of integers, corresponding to "allowed values".
        Work for minute, hours, weeks, month, ad days of week, because they are all "similar".
        Does not work very well for years and days of month
        Supported formats: "*", "*/3", "1,2,3", "L", "1,2-5,jul,10-L", "50-10"
        :param maxval: es. 60 for minutes, 12 for month, ...
        :param offsets: a dict mapping names (es. "mar") to their offsets (es. 2).
        """
        if "L" not in offsets:
            offsets["L"] = maxval-1
        if s == "*":
            return range(0, maxval)     #every minute
        elif s.startswith("*/"):        #every tot minutes
            try:
                step = int(s[2:])
            except ValueError:
                raise ValueError("Wrong format '%s' - expecting an integer after '*/'" % s)
            return range(0, maxval, step)
        else:                           #at given minutes
            ranges = s.split(",")                       #   ["1","2-5","jul","10-L"]
            ranges = [x.split("-") for x in ranges]     #   [["1"],["2","5"],["aug"], ["10","L"]]
            ranges = [[self._parse_token(w, offsets) for w in x] for x in ranges]  #   [[1],[2,5],[7], [10,11]]
            #DEBUG
            #import pdb
            #pdb.set_trace()
            if max([len(x) for x in ranges]) > 2:
                raise ValueError("Wrong format '%s' - a string x-y-z is meaningless" % s)
            ranges1 = [x for x in ranges if len(x)==1]                                             # [[1], [7]]
            ranges1.extend([range(x[0], x[1]+1) for x in ranges if len(x)==2 and x[0]<=x[1]])      # [[2,3,4,5], [10, 11]]
            ranges1.extend([range(x[0], maxval) for x in ranges if len(x)==2 and x[0]>x[1]])       # [] in this case
            ranges1.extend([range(0, x[1]+1) for x in ranges if len(x)==2 and x[0]>x[1]])          # [] in this case
            return set([z for rng in ranges1 for z in rng])

    def _parse_min(self, s):
        return self._parse_common(s, 60)

    def _parse_hour(self, s):
        return self._parse_common(s, 24)

    def _parse_month(self, s):
        return self._parse_common(s, 12, MONTH_OFFSET)

    def _parse_day_in_week(self, s):
        return self._parse_common(s, 7, WEEK_OFFSET)

    def _parse_year(self, s):
        """ to put things simple, I assume a range of years between 0 and 3000 ... This is mostly useless. """
        return self._parse_common(s, 3000, {"L" : -1})

    def _parse_day_in_month(self, s):
        return self._parse_common(s, 31, {"L" : -1})    #this works by chance

    def _check_day_in_month(self, now):
        if self.allowed_last_day_of_month:
            #this is a hack for avoiding to calculate "L" when not needed
            import calendar
            last_day_of_month = calendar.monthrange(now.year, now.month)[1]
            if now.day == last_day_of_month:
                return True;
        return now.day in self.allowed_days_of_month;

    def __lt__(self, other):
        """
        Periodic Jobs are sortable based on the scheduled time they
        run next.
        """
        return self.next_run < other.next_run

    def should_run(self):
        """
        :return: ``True`` if the job should be run now.
        """
        import datetime
        now = datetime.datetime.now()
        #FIXME was: return datetime.datetime.now() >= self.next_run
        return not self.running \
            and now.year in self.allowed_years \
            and now.month in self.allowed_months \
            and now.weekday() in self.allowed_days_in_week \
            and now.hour in self.allowed_hours \
            and now.minute in self.allowed_minutes \
            and self._check_day_in_month(now);

    def run(self):
        """
        Run the job.
        :return: The return value returned by the `job_func`
        """
        self.running = True
        ret = self.job_func()
        self.running = false
        return ret
        
    def run_if_should(self):
        """
        Run the job if needed.
        :return: The return value returned by the `job_func`
        """
        if self.should_run():
            logger.info('Running job %s', self)
            return self.run()


A pcrond/sched.py => pcrond/sched.py +163 -0
@@ 0,0 1,163 @@
#most of the code here comes from https://github.com/dbader/schedule

from job import ALIASES, Job

def std_launch_func(cmd_splitted):
    """
    Default way of executing commands is to invoke subprocess.run()
    """
    def f():
        import subprocess
        subprocess.run(cmd_splitted)
        #not returning anything here
    return f

class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """
    def __init__(self):
        self.delay = 60         #in seconds
        self.jobs = []

    def run_pending(self):
        """
        Run all jobs that are scheduled to run.
        Please note that it is *intended behavior that run_pending()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you only call run_pending()
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in runnable_jobs:
            self._run_job(job)

    def run_all(self, delay_seconds=0):
        """
        Run all jobs regardless if they are scheduled to run or not.
        A delay of `delay` seconds is added between each job. This helps
        distribute system load generated by the jobs more evenly
        over time.
        :param delay_seconds: A delay added between every executed job
        """
        logger.info('Running *all* %i jobs with %is delay inbetween',
                    len(self.jobs), delay_seconds)
        for job in self.jobs[:]:
            self._run_job(job)
            time.sleep(delay_seconds)

    def clear(self):
        """
        Deletes scheduled jobs
        """
        del self.jobs[:]

    def cancel_job(self, job):
        """
        Delete a scheduled job.
        If the job is running it won't be stopped.
        :param job: The job to be unscheduled
        """
        try:
            self.jobs.remove(job)
        except ValueError:
            pass

    def cron(self, crontab, job_func):
        """
        Schedule a new periodic job.
        :param crontab: A crontab-like time specification
        :param job_func_or_command: the function to be executed
        :return: A configured :class:`Job <Job>`
        """
        job = Job(crontab, job_func)
        return job

    def _run_job(self, job):
        ret = job.job_func()

    def add_job(self, crontab, job_func):
        """
        Create a job and add it to this Scheduler
        :param crontab:
            string containing crontab pattern
            Its tokens may be either: 1 (if alias), 5 (without year token), 6 (with year token)
        :param job_func:
            the job 0-ary function to run
        :return: a Job
        """
        job = Job(crontab, job_func, self)
        self.jobs.append(job)
        return job
        
    def _load_crontab_line(self, rownum, crontab_line, job_func_func=std_launch_func):
        """
        create a Job from a single crontab entry, and add it to this Scheduler
        :param crontab_line:
            a line from crontab
            PRE: not empty and it not a comment
        :param job_func_func:
            function to be executed, @see load_crontab_file
        :return: a Job 
        """
        pieces = crontab_line.split()
        
        #is pattern using aliases?
        if pieces[0] in ALIASES.keys():
            try:
                #pattern using alias
                job = self.add_job(pieces[0:1], job_func_func(pieces[1:]))
                return job
            except ValueError:
                print("Error at line %d, cannot parse pattern" % rownum)    #shouldn't happen
                return None
        elif len(pieces) < 6:
            print("Error at line %d, expected at least 6 tokens" % rownum)
            return None
            if len(pieces) >= 7:
                try:
                    #pattern including year
                    job = self.add_job(" ".join(pieces[0:6]), job_func_func(pieces[6:]))
                    return job
                except ValueError:
                    pass
            try:
                #pattern not including  year
                job = self.add_job(" ".join(pieces[0:5]), job_func_func(pieces[5:]))
                return job
            except ValueError:
                print("Error at line %d, cannot parse pattern" % rownum)
                return None

    def load_crontab_file(self, crontab_file, clear=True, job_func_func=std_launch_func):
        """
        Read crontab file, create corresponding jobs in this scheduler
        :param crontab_file:
            crontab file path
        :param job_func_func:
            a function that takes a list of tokens (from crontab file) and returns a 0-args function
        :param clear:
            should the new schedule override the previous ones?
        """
        if clear:
            self.clear()
        with open(crontab_file) as fp:  
            for rownum, line in enumerate(fp):
                if line is not None:	# not sure if this can happen
                    line = line.strip()
                    if line != "" and line[0] != "#":
                        #skip empty lines and comments
                        self._load_crontab_line(rownum, line, job_func_func)

    def main_loop(self):
        """
        Perform main run-and-wait loop.
        """
        import time
        while True:
            self.run_pending()
            time.sleep(self.delay)


A test_scheduler.py => test_scheduler.py +64 -0
@@ 0,0 1,64 @@
"""Unit tests for pcrond.py"""
import datetime
import unittest #use "assert" and "with self.assertRaises(ValueError)"

# Silence "missing docstring", "method could be a function",
# "class already defined", and "too many public methods" messages:
# pylint: disable-msg=R0201,C0111,E0102,R0904,R0901

from pcrond import scheduler, Job, Scheduler


def do_nothing():
    pass

class SchedulerTests(unittest.TestCase):
    def setUp(self):
        scheduler.clear()

    def _test_job_constructor_basic(self):
        job = Job("* * * * *")
        assert len(job.allowed_min) == 60
        assert len(job.allowed_hours) == 24
        assert len(job.allowed_months) == 12
        assert len(job.allowed_days_of_week) == 7
        assert len(job.allowed_days_of_month) == 31
        assert datetime.datetime.now().year in job.allowed_years
        assert not job.allowed_last_day_of_month

    def _test_job_constructor_more_complicated(self):
        job = Job("30 4 * mar-jun,dec mon")
        assert job.allowed_min == set([30])
        assert job.allowed_hours == set([4])
        assert job.allowed_months == set([3,4,5,6,12])
        assert job.allowed_days_of_week == set([1])
        assert len(job.allowed_days_of_month) == 31
        assert datetime.datetime.now().year in job.allowed_years
        assert not job.allowed_last_day_of_month

    def _test_job_constructor_L(self):
        job = Job("* * L * *")
        assert job.allowed_last_day_of_month
        assert job.allowed_days_of_month == set([-1])
        assert job._check_day_in_month(datetime.datetime(2019,3,31))
        assert not job._check_day_in_month(datetime.datetime(2019,3,28))
        assert job._check_day_in_month(datetime.datetime(2019,2,28))

    def test_job_constructor_reverse_order(self):
        job = Job("* 23-4 * * *")
        print(job.allowed_hours)
        assert job.allowed_hours == set([23,0,1,2,3,4])

    def _test_misconfigured_job_wont_break_scheduler(self):
        """
        Ensure an interrupted job definition chain won't break
        the scheduler instance permanently.
        """
        scheduler.add_job("* * * * *", do_nothing)
        with self.assertRaises(ValueError):
            scheduler.add_job("some very bad string pattern", do_nothing)
        scheduler.run_pending()
        
        
if __name__ == '__main__':
    unittest.main()
\ No newline at end of file