~yerinalexey/pcrond

ref: ca11ceb4431b59d320e1929e3d9b9e37aee792d8 pcrond/test_scheduler.py -rwxr-xr-x 4.4 KiB
ca11ceb4 — Luca Vercelli crond expressions bugfixs 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python
"""Unit tests for pcrond.py"""
import datetime
import unittest

from pcrond import scheduler, Job


def do_nothing():
    pass


def modify_obj(obj):
    def f():
        obj['modified'] = True
    return f


class SchedulerTests(unittest.TestCase):
    def setUp(self):
        scheduler.clear()

    def test_job_constructor_basic(self):
        job = Job("* 4 * * *")
        ###
        assert job.allowed_every_min
        ###
        assert not job.allowed_every_hour
        assert job.allowed_hours == set([4])
        ###
        assert job.allowed_every_dom
        assert not job.allowed_last_dom
        ###
        assert job.allowed_every_month
        ###
        assert job.allowed_every_dow
        ###
        assert job.allowed_every_year

    def test_job_constructor_more_complicated(self):
        job = Job("30 */3 * mar-jun,dec mon")
        ###
        assert not job.allowed_every_min
        assert job.allowed_min == set([30])
        ###
        assert not job.allowed_every_hour
        assert job.allowed_hours == set([0, 3, 6, 9, 12, 15, 18, 21])
        ###
        assert job.allowed_every_dom
        assert not job.allowed_last_dom
        ###
        assert not job.allowed_every_month
        assert job.allowed_months == set([3, 4, 5, 6, 12])
        ###
        assert not job.allowed_every_dow
        assert job.allowed_dow == set([1])
        ###
        assert job.allowed_every_year

    def test_job_constructor_L(self):
        job = Job("* * L * *")
        assert job.allowed_last_dom
        assert job.allowed_dom == set([-1])
        assert job._should_run_at(datetime.datetime(2019, 3, 31))
        assert not job._should_run_at(datetime.datetime(2019, 3, 28))
        assert job._should_run_at(datetime.datetime(2019, 2, 28))

    def test_job_constructor_alias(self):
        job = Job("@hourly")
        assert not job.allowed_every_min
        assert job.allowed_every_hour
        assert job.allowed_every_month
        assert job.allowed_every_year

    def test_job_constructor_reverse_order(self):
        job = Job("* 23-4 * * *")
        assert job.allowed_hours == set([23, 0, 1, 2, 3, 4])

    def test_job_constructor_wrong(self):
        with self.assertRaises(ValueError):
            Job("some silly text")
        with self.assertRaises(ValueError):
            Job("* * goofy * *")
        with self.assertRaises(ValueError):
            Job("* * * * * L")
        with self.assertRaises(ValueError):
            Job("* 1-2-3 * *")
        with self.assertRaises(ValueError):
            Job("* 1;2;3 * *")
        with self.assertRaises(ValueError):
            Job("* @hourly")
        # currently, hour=25 does not raise errors.

    def test_misconfigured_job_wont_break_scheduler(self):
        """
        Ensure an interrupted job definition chain won't break
        the scheduler instance permanently.
        """
        scheduler.cron("* * * * *", do_nothing)
        with self.assertRaises(ValueError):
            scheduler.cron("some very bad string pattern", do_nothing)
        scheduler.run_pending()

    def test_add_job_run_all(self):
        """ schedule a task, then invoke run_all()"""
        test_obj = {'modified': False}
        scheduler.cron("* * * * *", modify_obj(test_obj))
        assert len(scheduler.jobs) == 1
        scheduler.run_all()
        assert test_obj['modified']

    def test_add_job_run_pending(self):
        """ schedule a task for this exact minute, then invoke run_pending()"""
        test_obj = {'modified': False}
        now = datetime.datetime.now()
        scheduler.cron("%d %d * * *" % (now.minute, now.hour), modify_obj(test_obj))
        assert len(scheduler.jobs) == 1
        scheduler.run_pending()
        assert test_obj['modified']

    def test_add_job_run_pending_not(self):
        """ schedule a task for this exact minute+5, then invoke run_pending()"""
        test_obj = {'modified': False}
        now = datetime.datetime.now()
        scheduler.cron("%d %d * * *" % (now.minute+5, now.hour), modify_obj(test_obj))
        assert len(scheduler.jobs) == 1
        scheduler.run_pending()
        assert test_obj['modified'] is False

    def test_load_crontab(self):
        """ load test crontab file """
        import os
        scheduler.load_crontab_file(os.path.join("tests", "crontab.txt"))
        assert len(scheduler.jobs) == 3


if __name__ == '__main__':
    unittest.main()