-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_task.py
130 lines (101 loc) · 3.82 KB
/
celery_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from multiprocessing import Queue
import threading
from Queue import Empty
import time
import json
import requests
from celery import Celery, bootsteps
# Module-level multiprocessing queues drained by WorkerBootstep:
#   queue_add    -- (key_id, callback, ts_to_be_fired) tuples to schedule
#   queue_remove -- keys of scheduled callbacks that must be cancelled
queue_add, queue_remove = Queue(), Queue()
class WorkerBootstep(bootsteps.StartStopStep):
    """Worker bootstep that fires delayed callbacks from a repeating timer.

    Callbacks are scheduled and cancelled through two queues
    (``QUEUE_ADD`` and ``QUEUE_REMOVE``). Every ``GRANULARITY`` seconds the
    worker timer drains both queues into ``self.callbacks`` and starts one
    thread for each callback whose fire timestamp has been reached.
    """

    # Queue of (key_id, callback, ts_to_be_fired) tuples to schedule.
    QUEUE_ADD = queue_add
    # Queue of keys whose pending callback must be dropped.
    QUEUE_REMOVE = queue_remove
    # Interval handed to the worker timer between two processing passes.
    GRANULARITY = 1

    def __init__(self, worker, **kwargs):
        """Create the empty registry of pending callbacks.

        ``worker`` and ``kwargs`` are accepted because Celery passes them
        when it instantiates the step; they are not used here.
        """
        print("Setting up Worker Bootstep")
        # key -> {'callback': callable, 'ts_fire': timestamp to fire at}
        self.callbacks = {}

    def start(self, worker):
        """ Called by Celery when Bootstep is processed.

        Registers ``process_timers`` on the worker's timer so that it runs
        every ``GRANULARITY`` seconds with both queues as arguments.
        """
        print("Setting up the timer task")
        worker.timer.call_repeatedly(
            self.GRANULARITY, self.process_timers,
            args=(self.QUEUE_ADD, self.QUEUE_REMOVE)
        )

    def process_timers(self, _queue_add, _queue_remove):
        """ Method fired repeatedly by the Celery Timer.

        Applies the queued additions/removals first, then fires whatever
        is due, so a callback removed in this pass is never fired by it.
        """
        self._consume_queues(_queue_add, _queue_remove)
        self._fire_timers()

    def _consume_queues(self, _queue_add, _queue_remove):
        """
        Drain both queues and add/remove callbacks in ``self.callbacks``.

        In the queue to add, the expected payload is a tuple like so:
            (key_id, callback, ts_to_be_fired)
        Adding an already known key replaces its previous entry.

        In the queue to remove, only the key is expected. Removing an
        unknown key is reported and otherwise ignored.
        """
        # Non-blocking reads: stop as soon as a queue reports Empty.
        while True:
            try:
                key, callback, ts_fire = _queue_add.get(block=False)
            except Empty:
                break
            self.callbacks[key] = {'callback': callback,
                                   'ts_fire': ts_fire}

        while True:
            try:
                key = _queue_remove.get(block=False)
            except Empty:
                break
            try:
                del self.callbacks[key]
            except KeyError:
                # May happen if the callback was fired before the removal
                # request was processed; report it instead of hiding it.
                print("{0} not pending, nothing to remove".format(key))

    def _fire_timers(self):
        """ Fire every callback whose timestamp is less than or equal to now.

        Callbacks are fired on their own Thread and are then removed from
        the callbacks dict. Callbacks that are not due yet are reported
        together with the time remaining until they fire.
        """
        # Sample the clock once so every entry is judged against the same
        # instant within a pass.
        now = time.time()
        fired_keys = []
        for key, entry in self.callbacks.items():
            if entry['ts_fire'] <= now:
                fired_keys.append(key)
                print("Firing {0}".format(key))
                threading.Thread(target=entry['callback']).start()
            else:
                # ts_fire - now, so the remaining time is reported as a
                # positive number.
                print("{0} still not called, remaining: {1}".format(
                    key, entry['ts_fire'] - now
                ))
        # Deletion is deferred: the dict must not change while iterated.
        for key in fired_keys:
            del self.callbacks[key]
def scream():
    """ Demo callback. Requests httpbin.org/ip and prints your ip.

    The ``origin`` field of the JSON response is printed; the result can
    be seen in the Celery worker's log. Requires the requests module.

    Raises:
        requests.exceptions.RequestException: if the request fails or does
            not complete within the timeout.
        KeyError: if the response JSON has no ``origin`` field.
    """
    # Without a timeout a stalled connection would block this callback's
    # thread indefinitely.
    res = requests.get('http://httpbin.org/ip', timeout=10)
    ip = json.loads(res.text)['origin']
    print("*** TO BE SEEN IN CELERY LOG ***\nYour IP is: {0}\n".format(ip))
# Celery application object; "tasks" is passed as its main name.
app = Celery("tasks")
# Add WorkerBootstep to the worker's bootsteps so that Celery processes it
# (its start() method installs the repeating callback timer).
app.steps['worker'].add(WorkerBootstep)
@app.task
def foo(*args):
    """ Demo Celery task: schedule ``scream`` to be fired in 5 seconds.

    Each received message puts one entry on ``queue_add``; the positional
    arguments of the message are accepted but not used.

    When publishing messages from outside Celery, the payload must be a
    JSON object with these keys:
    {
        "task": "celery_task.foo",  # Name of the celery task
        "id": 123123,               # Task id; can be random for now
        "args": ["foo"]             # List with the arguments for the task
    }
    """
    delay = 5
    fire_at = time.time() + delay
    # The key embeds the current time so each call adds a separate entry.
    callback_key = 'demo callback_{0}'.format(time.time())
    queue_add.put((callback_key, scream, fire_at))