~yerinalexey/pcrond

ref: cf699d50cc90d3569245cbb453b48ff7a9818e40 pcrond/pcrond/sched.py -rw-r--r-- 7.0 KiB
cf699d50 — Alexey Yerin refactor: remove default Scheduler instance 6 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# most of the code here comes from https://github.com/dbader/schedule

from .job import Job, ALIASES
import logging
import time

logger = logging.getLogger('pcrond')


def std_launch_func(cmd_splitted, stdin=None):
    """
    Default way of executing commands: build a 0-args function that starts
    the command with subprocess.Popen().

    :param cmd_splitted:
        the command and its arguments, as a list of tokens
    :param stdin:
        optional data to send to the standard input of the command, either
        text or bytes. Text is encoded as UTF-8, because the pipe to the
        child process is opened in binary mode.
    :return:
        a 0-args function that logs the command and launches it; that
        function does not return anything
    """
    from subprocess import PIPE, Popen

    if stdin is None:
        def f():
            logger.info("Now running: " + str(cmd_splitted))
            # the child is started and not waited for
            Popen(cmd_splitted, stdin=None, stdout=None, stderr=None)
            # not returning anything here
        return f

    # communicate() on a binary pipe only accepts bytes-like input
    if not isinstance(stdin, (bytes, bytearray)):
        stdin = stdin.encode("utf-8")

    def f():
        logger.info("Now running: " + str(cmd_splitted))
        p = Popen(cmd_splitted, stdin=PIPE, stdout=None, stderr=None)
        # communicate() writes the input, closes the pipe and waits for the
        # child to terminate
        p.communicate(input=stdin)
        # not returning anything here
    return f


class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """

    # longest uninterrupted sleep (in seconds) done by main_loop(), so that
    # a request to stop is noticed without waiting for a whole `delay`
    _STOP_POLL_SECONDS = 1

    def __init__(self):
        self.delay = 60         # in seconds, pause between two run_pending()
        self.jobs = []          # jobs registered on this scheduler
        self.ask_for_stop = False   # set to True to make main_loop() return

    def run_pending(self):
        """
        Run all jobs that are scheduled to run.
        Please note that it is *intended behavior that run_pending()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you only call run_pending()
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        logger.debug("available jobs: " + str(self.jobs))
        # build the selection up front: the log then shows the jobs that are
        # really going to run, and the iteration no longer walks self.jobs
        # while jobs are being executed
        runnable_jobs = [job for job in self.jobs if job.should_run()]
        logger.debug("runnable jobs: " + str(runnable_jobs))
        for job in runnable_jobs:
            job.run()

    def run_all(self, delay_seconds=0):
        """
        Run all jobs regardless if they are scheduled to run or not.
        A delay of `delay_seconds` seconds is added after each job. This
        helps distribute system load generated by the jobs more evenly
        over time.
        :param delay_seconds: A delay added after every executed job
        """
        logger.info('Running *all* %i jobs with %is delay inbetween',
                    len(self.jobs), delay_seconds)
        # iterate over a copy, the list may change while jobs are running
        for job in self.jobs[:]:
            job.run()
            time.sleep(delay_seconds)

    def clear(self):
        """
        Deletes scheduled jobs
        """
        # emptied in place, so that other references to the list stay valid
        del self.jobs[:]
        logger.info("jobs cleared")

    def cancel_job(self, job):
        """
        Delete a scheduled job.
        If the job is running it won't be stopped.
        Nothing happens if the job is not scheduled here.
        :param job: The job to be unscheduled
        """
        try:
            self.jobs.remove(job)
        except ValueError:
            # the job was not in the list, nothing to cancel
            pass

    def cron(self, crontab, job_func):
        """
        Create a job and add it to this Scheduler
        :param crontab:
            string containing crontab pattern
            Its tokens may be either: 1 (if alias), 5 (without year token),
            6 (with year token)
        :param job_func:
            the job 0-ary function to run
        :return: a Job
        """
        job = Job(crontab, job_func, self)
        self.jobs.append(job)
        return job

    def _load_crontab_line(self, rownum, crontab_line, job_func_func=std_launch_func, stdin=None):
        """
        create a Job from a single crontab entry, and add it to this Scheduler
        :param rownum:
            line number, only used in error messages
        :param crontab_line:
            a line from crontab
            PRE: it is not a comment
        :param job_func_func:
            function to be executed, @see load_crontab_file
        :param stdin:
            text following the first unescaped % of the line, if any.
            It is accepted but not forwarded to job_func_func yet.
        :return: a Job, or None if the line could not be parsed
        """
        pieces = crontab_line.split()

        if not pieces:
            # e.g. a line starting with %, whose command part is empty
            logger.error("Error at line %d, expected at least 6 tokens" % rownum)
            return None

        if pieces[0] in ALIASES:
            try:
                # CASE 1 - pattern using alias
                job = self.cron(pieces[0], job_func_func(pieces[1:]))
                return job
            except ValueError as e:
                # shouldn't happen
                logger.error(("Error at line %d, cannot parse pattern, the line will be ignored.\r\n" +
                              "Inner Exception: %s") % (rownum, str(e)))
                return None
        if len(pieces) < 6:
            # 5 pattern tokens plus at least one command token are needed
            logger.error("Error at line %d, expected at least 6 tokens" % rownum)
            return None
        if len(pieces) >= 7:
            try:
                # CASE 2 - pattern including year
                job = self.cron(" ".join(pieces[0:6]), job_func_func(pieces[6:]))
                return job
            except ValueError:
                # the sixth token is not a year but the beginning of the
                # command: try again without year
                pass
        try:
            # CASE 3 - pattern not including  year
            job = self.cron(" ".join(pieces[0:5]), job_func_func(pieces[5:]))
            return job
        except ValueError as e:
            logger.error(("Error at line %d, cannot parse pattern, the line will be ignored.\r\n" +
                          "Inner Exception: %s") % (rownum, str(e)))
            return None

    def _split_input_line(self, s):
        """
        Command is split in command and stdin using %, not %%
        The first single % separates the command from its stdin, the
        following ones become newlines, and every %% becomes a literal %.
        :return:
            a list holding the command and, only if a single % was found,
            the stdin text
        """
        # s == aaaa%%bbbbbb%cccc%dd%%ee
        pieces = [x.split('%') for x in s.split('%%')]
        # pieces == [[aaaa],[bbbbbb,cccc,dd],[ee]]
        rejoin = "%".join(["\n".join(x) for x in pieces])
        # rejoin == aaaa%bbbbbb\ncccc\ndd%ee
        # result == [aaaa%bbbbbb, cccc\ndd%ee]
        return rejoin.split('\n', 1)

    def load_crontab_file(self, crontab_file, clear=True, job_func_func=std_launch_func):
        """
        Read crontab file, create corresponding jobs in this scheduler
        Lines that cannot be parsed are reported in the log and skipped.
        :param crontab_file:
            crontab file path
        :param job_func_func:
            a function that takes a list of tokens (from crontab file) and
            returns a 0-args function
        :param clear:
            should the new schedule override the previous ones?
        """
        if clear:
            self.clear()
        with open(crontab_file) as fp:
            # lines are counted from 1, as text editors do, so that error
            # messages point to the right line
            for rownum, line in enumerate(fp, 1):
                line = line.strip()
                if line == "" or line[0] == "#":
                    # skip empty lines and comments
                    continue
                pieces = self._split_input_line(line)
                stdin = pieces[1] if len(pieces) > 1 else None
                self._load_crontab_line(rownum, pieces[0], job_func_func, stdin)
                # TODO support % sign inside command, should consider pieces[1] if any
        logger.info(str(len(self.jobs)) + " jobs loaded from configuration file")

    def main_loop(self):
        """
        Perform main run-and-wait loop.
        Pending jobs are run every `self.delay` seconds, until
        `self.ask_for_stop` becomes true.
        """
        while not self.ask_for_stop:
            self.run_pending()
            self._sleep_unless_stopped(self.delay)

    def _sleep_unless_stopped(self, seconds):
        """
        Sleep for `seconds` seconds, giving up early once a stop is requested
        :param seconds: how long to wait, at most
        """
        remaining = seconds
        while True:
            # short naps, so that ask_for_stop is looked at regularly
            nap = min(remaining, self._STOP_POLL_SECONDS)
            time.sleep(nap)
            remaining -= nap
            if remaining <= 0 or self.ask_for_stop:
                break