-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtests.py
116 lines (100 loc) · 3.71 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
import unittest
from multiprocessing import Queue
import time
import mock
from celery_task import WorkerBootstep
class CallbackMock(object):
    """Callable test double that records its invocations on the class.

    The flags are class attributes, so a call made through any instance is
    observable through ``CallbackMock.called`` / ``CallbackMock.wrong_called``
    (or through attribute lookup on any other instance).

    NOTE(review): presumably the state lives on the class so that calls made
    on copies that went through a ``multiprocessing.Queue`` remain visible to
    the test -- confirm.
    """

    # Raised by __call__ on an instance built with wrong=False.
    called = False
    # Raised by __call__ on an instance built with wrong=True.
    wrong_called = False

    def __init__(self, wrong=False):
        """Remember the callback kind; a regular callback resets both flags.

        :param wrong: when true, this instance marks ``wrong_called`` instead
            of ``called`` and leaves the shared flags untouched on creation.
        """
        self.flag_wrong = wrong
        if wrong:
            return
        CallbackMock.called = False
        CallbackMock.wrong_called = False

    def __call__(self):
        """Raise the class-level flag that matches this instance's kind."""
        if self.flag_wrong:
            CallbackMock.wrong_called = True
        else:
            CallbackMock.called = True
class TestWorkerBootStep(unittest.TestCase):
    """Unit tests for ``WorkerBootstep`` run against a mocked celery worker."""

    def setUp(self):
        """Build a fresh bootstep around a ``mock.Mock`` worker per test."""
        self.celery_worker = mock.Mock()
        self.worker_bootstep = WorkerBootstep(self.celery_worker)

    def test_start(self):
        """``start`` registers ``process_timers`` on the worker timer once."""
        self.worker_bootstep.start(self.celery_worker)
        self.celery_worker.timer.call_repeatedly.assert_called_once_with(
            WorkerBootstep.GRANULARITY,
            self.worker_bootstep.process_timers,
            args=(WorkerBootstep.QUEUE_ADD, WorkerBootstep.QUEUE_REMOVE)
        )

    def test_consume_queues(self):
        """``_consume_queues`` applies queued additions and removals."""
        queue_add = Queue()
        queue_remove = Queue()
        now = time.time()
        callback_add = CallbackMock()
        # Pre-existing entry whose key is queued for removal below; its
        # callback is never expected to run in this test.
        self.worker_bootstep.callbacks = {
            'to_remove': {
                'callback': lambda: 'not',
                'ts_fire': now + 3
            }
        }
        queue_add.put(('to_add', callback_add, now + 3))
        queue_remove.put('to_remove')

        self.worker_bootstep._consume_queues(queue_add, queue_remove)

        callbacks = self.worker_bootstep.callbacks
        self.assertIn('to_add', callbacks)
        # Compare by type: the object read back from the queue is not
        # required to be the very instance that was put in.
        self.assertIsInstance(callbacks['to_add']['callback'],
                              type(callback_add))
        self.assertEqual(callbacks['to_add']['ts_fire'], now + 3)
        # The removal half of the scenario was set up but never verified.
        self.assertNotIn('to_remove', callbacks)

    @mock.patch('celery_task.time', new=mock.Mock(time=lambda: 5))
    def test_fire_timers(self):
        """With the clock pinned at 5, only timers due at or before 5 fire."""
        fired_call = mock.Mock()
        not_fired_call = mock.Mock()
        self.worker_bootstep.callbacks = {
            'fired': {
                'callback': fired_call,
                'ts_fire': 2,
            },
            'not_fired': {
                'callback': not_fired_call,
                'ts_fire': 6,
            }
        }

        self.worker_bootstep._fire_timers()

        self.assertIn('not_fired', self.worker_bootstep.callbacks)
        self.assertNotIn('fired', self.worker_bootstep.callbacks)
        self.assertTrue(fired_call.called)
        # A timer that is still pending must not have been invoked.
        self.assertFalse(not_fired_call.called)

    def test_process_timers(self):
        """``process_timers`` consumes the queues and fires due callbacks."""
        queue_add = Queue()
        queue_remove = Queue()
        now = time.time()
        callback_add = CallbackMock()
        callback_remove = CallbackMock(wrong=True)
        self.worker_bootstep.callbacks = {
            'to_remove': {
                'callback': callback_remove,
                'ts_fire': now + 0.01
            }
        }

        # Restore the class-level queues afterwards so the replacement does
        # not leak into tests that run later (e.g. test_start).
        self.addCleanup(setattr, WorkerBootstep, 'QUEUE_ADD',
                        WorkerBootstep.QUEUE_ADD)
        self.addCleanup(setattr, WorkerBootstep, 'QUEUE_REMOVE',
                        WorkerBootstep.QUEUE_REMOVE)
        WorkerBootstep.QUEUE_ADD = queue_add
        WorkerBootstep.QUEUE_REMOVE = queue_remove

        queue_add.put(('to_add', callback_add, now + 0.01))
        queue_remove.put('to_remove')

        # First pass: the added callback is scheduled 10 ms ahead of `now`,
        # so it is expected not to have fired yet.
        self.worker_bootstep.process_timers(self.worker_bootstep.QUEUE_ADD,
                                            self.worker_bootstep.QUEUE_REMOVE)
        self.assertFalse(callback_add.called)

        # Second pass, after the fire time has elapsed.
        time.sleep(0.1)
        self.worker_bootstep.process_timers(self.worker_bootstep.QUEUE_ADD,
                                            self.worker_bootstep.QUEUE_REMOVE)
        self.assertTrue(callback_add.called)
        # The entry queued for removal must never have been invoked.
        self.assertFalse(callback_remove.wrong_called)
if __name__ == '__main__':
    # Executed as a script (not imported): hand control to unittest's
    # command-line runner with verbosity level 2.
    unittest.main(verbosity=2)