forked from ahmetb/personal-dashboard
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskhost.py
executable file
·117 lines (93 loc) · 3.32 KB
/
taskhost.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
# coding: utf-8
import sys
import time
import json
import logging
import datetime
from apscheduler.scheduler import Scheduler
import simplegauges
import tasks

# Path to the JSON configuration file read by main() at startup.
_tasks_config_file = 'tasks.config'
def main():
    """Bootstrap every task once, then keep them running on a cron schedule.

    Reads the JSON config file, wires the simplegauges datastore into the
    tasks module, runs each fixture task once as a smoke test, and finally
    schedules all tasks with apscheduler. Exits the process (status 1) if
    the config cannot be read or the fixture defines no tasks.
    """
    configure_logging()
    logger = logging.getLogger('taskhost')

    config = {}
    try:
        with open(_tasks_config_file) as f:
            config = json.loads(f.read())
            logger.debug('Successfully read configuration file.')
    except Exception as e:
        # Config is mandatory; nothing can run without it.
        logger.critical('Cannot read configuration file: {0}'
                        .format(_tasks_config_file))
        logger.critical(e)
        sys.exit(1)

    # Imported lazily so the Azure dependency is only needed once config
    # (account/key/table) is known to be present.
    from simplegauges.datastores.azuretable import AzureGaugeDatastore
    gauges_ds = AzureGaugeDatastore(config['azure.account'],
                                    config['azure.key'], config['azure.table'])
    gauge_factory = simplegauges.gauge_factory(gauges_ds)
    tasks.set_simplegauges_factory(gauge_factory)
    tasks.set_config(config)

    import fixture  # should be imported after setting configs for decorators
    if not fixture.tasks:
        logger.error('No tasks found in the fixture.py')
        sys.exit(1)

    # Run every task once up front. Each fixture entry is a pair:
    # task[0] is the callable, task[1] its cron tuple (see parse_cron_tuple).
    errors = False
    for task in fixture.tasks:
        method = task[0]
        name = '{0}.{1}'.format(method.__module__, method.__name__)
        try:
            task[0]()
            logger.info('Successfully bootstrapped: {0}'.format(name))
        except Exception as e:
            # A failing task does not abort startup; it is logged and the
            # scheduler start is merely delayed below.
            errors = True
            logger.error('Error while bootstrapping: {0}'.format(name))
            logger.error(e)

    if errors:
        # Brief grace period so transient failures (e.g. network) can clear.
        logger.info('Starting scheduler in 10 seconds...')
        time.sleep(10)
    else:
        logger.info('Starting scheduler...')

    # At this point every task has been attempted once (not necessarily
    # successfully -- failures were logged above).
    sched = Scheduler()

    # schedule tasks
    for task in fixture.tasks:
        cron_kwargs = parse_cron_tuple(task[1])
        sched.add_cron_job(task[0], **cron_kwargs)

    sched.start()
    logger.info('Scheduler started with {0} jobs.'
                .format(len(sched.get_jobs())))

    now = datetime.datetime.now()
    for j in sched.get_jobs():
        logger.debug('Scheduled: {0}.{1}, next run:{2}'
                     .format(j.func.__module__, j.func.__name__,
                             j.compute_next_run_time(now)))

    # Daemonize: apscheduler runs jobs on background threads, so the main
    # thread just needs to stay alive.
    while True:
        time.sleep(10)
def parse_cron_tuple(cron_tuple):
    """Parse a cron scheduling definition into apscheduler kwargs.

    Accepts ``(hour,)``, ``(hour, minute)`` or ``(hour, minute, second)``
    and maps the positions onto the corresponding keyword names. Elements
    beyond the third are ignored, matching the positional semantics of
    ``Scheduler.add_cron_job``.

    Args:
        cron_tuple: tuple of up to three cron field values.

    Returns:
        dict with keys 'hour', 'minute', 'second' for the positions present.

    Raises:
        TypeError: if cron_tuple is not a tuple.
    """
    # isinstance (rather than an exact type check) also accepts tuple
    # subclasses such as namedtuples; TypeError is the conventional
    # exception for a wrong argument type.
    if not isinstance(cron_tuple, tuple):
        raise TypeError('Given cron format is not tuple: {0}'
                        .format(cron_tuple))
    # zip stops at the shorter sequence, so shorter tuples simply omit
    # the trailing keys and extra elements are dropped.
    return dict(zip(('hour', 'minute', 'second'), cron_tuple))
def configure_logging():
    """Configure root logging to a stream handler and quiet noisy libraries.

    Installs a DEBUG-level basicConfig with a timestamp/level/name format,
    then raises the threshold of chatty third-party loggers.
    """
    log_format = '[%(asctime)s] %(levelname)s [%(name)s] %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    # reduce noise from 3rd party packages
    noisy_loggers = (
        ('requests.packages.urllib3.connectionpool', logging.CRITICAL),
        ('apscheduler', logging.WARNING),
    )
    for logger_name, level in noisy_loggers:
        logging.getLogger(logger_name).setLevel(level)
# Run the task host only when executed as a script, not when imported.
if __name__ == '__main__':
    main()