Skip to content

Commit 068e133

Browse files
author
Michael Hammann
committed
feature: implement depends_on parameter to spawn processes only when all dependees are in RUNNING state
1 parent f4ff37d commit 068e133

File tree

4 files changed

+159
-13
lines changed

4 files changed

+159
-13
lines changed

Diff for: supervisor/options.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,8 @@ def get(section, opt, *args, **kwargs):
932932
serverurl = get(section, 'serverurl', None)
933933
if serverurl and serverurl.strip().upper() == 'AUTO':
934934
serverurl = None
935+
depends_on = get(section, 'depends_on', None)
936+
spawn_timeout = int(get(section, 'spawn_timeout', 60))
935937

936938
# find uid from "user" option
937939
user = get(section, 'user', None)
@@ -1057,7 +1059,10 @@ def get(section, opt, *args, **kwargs):
10571059
exitcodes=exitcodes,
10581060
redirect_stderr=redirect_stderr,
10591061
environment=environment,
1060-
serverurl=serverurl)
1062+
serverurl=serverurl,
1063+
depends_on=depends_on,
1064+
spawn_timeout=spawn_timeout,
1065+
)
10611066

10621067
programs.append(pconfig)
10631068

@@ -1875,7 +1880,8 @@ class ProcessConfig(Config):
18751880
'stderr_events_enabled', 'stderr_syslog',
18761881
'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
18771882
'exitcodes', 'redirect_stderr' ]
1878-
optional_param_names = [ 'environment', 'serverurl' ]
1883+
optional_param_names = [ 'environment', 'serverurl',
1884+
'depends_on', 'spawn_timeout' ]
18791885

18801886
def __init__(self, options, **params):
18811887
self.options = options

Diff for: supervisor/process.py

+27-9
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,29 @@ def record_spawnerr(self, msg):
189189
self.spawnerr = msg
190190
self.config.options.logger.info("spawnerr: %s" % msg)
191191

192-
def spawn(self):
192+
def queue_all_dependee_processes(self, supervisor):
193+
if (self.config.name not in supervisor.process_spawn_dict.keys() and
194+
self.config.name not in supervisor.process_started_dict.keys()):
195+
supervisor.process_spawn_dict[self.config.name] = self
196+
if self.config.depends_on is not None:
197+
for dependee in self.config.depends_on.values():
198+
if dependee.state is not ProcessStates.RUNNING and dependee.state is not ProcessStates.STARTING:
199+
if (dependee.config.name not in supervisor.process_spawn_dict.keys() and
200+
dependee.config.name not in supervisor.process_started_dict.keys()):
201+
supervisor.process_spawn_dict[dependee.config.name] = dependee
202+
dependee.queue_all_dependee_processes(supervisor)
203+
204+
def spawn(self, supervisor=None):
193205
"""Start the subprocess. It must not be running already.
194206
195207
Return the process id. If the fork() call fails, return None.
196208
"""
209+
if self.config.depends_on is not None:
210+
if any([dependee.state is not ProcessStates.RUNNING for dependee in
211+
self.config.depends_on.values()]):
212+
self.queue_all_dependee_processes(supervisor)
213+
return
214+
197215
options = self.config.options
198216
processname = as_string(self.config.name)
199217

@@ -648,7 +666,7 @@ def __repr__(self):
648666
def get_state(self):
649667
return self.state
650668

651-
def transition(self):
669+
def transition(self, supervisor=None):
652670
now = time.time()
653671
state = self.state
654672

@@ -660,22 +678,22 @@ def transition(self):
660678
# dont start any processes if supervisor is shutting down
661679
if state == ProcessStates.EXITED:
662680
if self.config.autorestart:
681+
# STOPPED -> STARTING
663682
if self.config.autorestart is RestartUnconditionally:
664683
# EXITED -> STARTING
665-
self.spawn()
684+
self.spawn(supervisor)
666685
else: # autorestart is RestartWhenExitUnexpected
667686
if self.exitstatus not in self.config.exitcodes:
668687
# EXITED -> STARTING
669-
self.spawn()
688+
self.spawn(supervisor)
670689
elif state == ProcessStates.STOPPED and not self.laststart:
671690
if self.config.autostart:
672-
# STOPPED -> STARTING
673-
self.spawn()
691+
self.spawn(supervisor)
674692
elif state == ProcessStates.BACKOFF:
675693
if self.backoff <= self.config.startretries:
676694
if now > self.delay:
677695
# BACKOFF -> STARTING
678-
self.spawn()
696+
self.spawn(supervisor)
679697

680698
processname = as_string(self.config.name)
681699
if state == ProcessStates.STARTING:
@@ -837,9 +855,9 @@ def before_remove(self):
837855
pass
838856

839857
class ProcessGroup(ProcessGroupBase):
840-
def transition(self):
858+
def transition(self, supervisor=None):
841859
for proc in self.processes.values():
842-
proc.transition()
860+
proc.transition(supervisor)
843861

844862
class FastCGIProcessGroup(ProcessGroup):
845863

Diff for: supervisor/rpcinterface.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ def startProcess(self, name, wait=True):
281281
@return boolean result Always true unless error
282282
283283
"""
284+
## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
285+
group, process = self._getGroupAndProcess(name)
286+
284287
self._update('startProcess')
285288
group, process = self._getGroupAndProcess(name)
286289
if process is None:
@@ -303,7 +306,11 @@ def startProcess(self, name, wait=True):
303306
raise RPCError(Faults.FAILED,
304307
"%s is in an unknown process state" % name)
305308

306-
process.spawn()
309+
process.spawn(self.supervisord)
310+
# if process has dependees, return succesfull start -
311+
# errors will be handled in main loop and inside process.spawn()
312+
if process.config.depends_on is not None:
313+
return True
307314

308315
# We call reap() in order to more quickly obtain the side effects of
309316
# process.finish(), which reap() eventually ends up calling. This
@@ -592,6 +599,8 @@ def getAllConfigInfo(self):
592599
'stderr_logfile_backups': pconfig.stderr_logfile_backups,
593600
'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
594601
'stderr_syslog': pconfig.stderr_syslog,
602+
'depends_on': pconfig.depends_on,
603+
'spawn_timeout': pconfig.spawn_timeout,
595604
}
596605
# no support for these types in xml-rpc
597606
d.update((k, 'auto') for k, v in d.items() if v is Automatic)

Diff for: supervisor/supervisord.py

+114-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
from supervisor import events
4545
from supervisor.states import SupervisorStates
4646
from supervisor.states import getProcessStateDescription
47+
from supervisor.graphutils import Graph
48+
49+
from supervisor.states import ProcessStates
4750

4851
class Supervisor:
4952
stopping = False # set after we detect that we are handling a stop request
@@ -55,6 +58,8 @@ def __init__(self, options):
5558
self.options = options
5659
self.process_groups = {}
5760
self.ticks = {}
61+
self.process_spawn_dict = dict()
62+
self.process_started_dict = dict()
5863

5964
def main(self):
6065
if not self.options.first:
@@ -84,6 +89,29 @@ def run(self):
8489
try:
8590
for config in self.options.process_group_configs:
8691
self.add_process_group(config)
92+
# add processes to directed graph, to check for dependency cycles
93+
g = Graph(len(self.options.process_group_configs))
94+
# replace depends_on string with actual process object
95+
for config in (self.options.process_group_configs):
96+
# check dependencies for all programs in group:
97+
for conf in enumerate(config.process_configs):
98+
if config.process_configs[conf[0]].depends_on is not None:
99+
process_dict=dict({})
100+
# split to get all processes in case there are multiple dependencies
101+
dependent_processes = (config.process_configs[conf[0]].depends_on).split()
102+
for process in dependent_processes:
103+
# this can be of form group:process or simply process
104+
try:
105+
dependent_group, dependent_process=process.split(":")
106+
except:
107+
dependent_group=dependent_process=process
108+
g.addEdge(config.process_configs[conf[0]].name, dependent_process)
109+
process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
110+
config.process_configs[conf[0]].depends_on = process_dict
111+
# check for cyclical process dependencies
112+
if g.cyclic() == 1:
113+
raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!')
114+
87115
self.options.openhttpservers(self)
88116
self.options.setsignals()
89117
if (not self.options.nodaemon) and self.options.first:
@@ -239,7 +267,10 @@ def runforever(self):
239267
combined_map[fd].handle_error()
240268

241269
for group in pgroups:
242-
group.transition()
270+
group.transition(self)
271+
272+
self._spawn_dependee_queue()
273+
self._handle_spawn_timeout()
243274

244275
self.reap()
245276
self.handle_signal()
@@ -316,6 +347,88 @@ def handle_signal(self):
316347
def get_state(self):
317348
return self.options.mood
318349

350+
def _spawn_dependee_queue(self):
351+
"""
352+
Iterate over processes that are not started but added to
353+
process_spawn_dict. Spawn all processes which are ready
354+
(All dependees RUNNING or process without dependees)
355+
"""
356+
if self.process_spawn_dict:
357+
for process_name, process_object in list(self.process_spawn_dict.items()):
358+
if process_object.config.depends_on is not None:
359+
if any([dependee.state is ProcessStates.FATAL for dependee in
360+
process_object.config.depends_on.values()]):
361+
self._set_fatal_state_and_empty_queue()
362+
break
363+
if all([dependee.state is ProcessStates.RUNNING for dependee in
364+
process_object.config.depends_on.values()]):
365+
self._spawn_process_from_process_dict(process_name, process_object)
366+
else:
367+
self._spawn_process_from_process_dict(process_name, process_object)
368+
369+
def _spawn_process_from_process_dict(self, process_name, process_object):
370+
self.process_started_dict[process_name] = process_object
371+
del self.process_spawn_dict[process_name]
372+
# only spawn if the process is not running yet (could be started in the meanwhile)
373+
if (process_object.state is not ProcessStates.STARTING and
374+
process_object.state is not ProcessStates.RUNNING):
375+
process_object.spawn(self)
376+
process_object.notify_timer = 5
377+
378+
def _set_fatal_state_and_empty_queue(self):
379+
for process_name, process_object in self.process_spawn_dict.items():
380+
process_object.record_spawnerr(
381+
'Dependee process did not start - set FATAL state for {}'
382+
.format(process_name))
383+
process_object.change_state(ProcessStates.FATAL)
384+
self.process_spawn_set = set()
385+
self.process_spawn_dict = dict()
386+
387+
def _handle_spawn_timeout(self):
388+
"""
389+
Log info message each 5 seconds if some process is waiting on a dependee
390+
Timeout if a process needs longer than spawn_timeout (default=60 seconds)
391+
to reach RUNNING
392+
"""
393+
# check if any of the processes that was started did not make it and remove RUNNING ones.
394+
if self.process_started_dict:
395+
for process_name, process_object in list(self.process_started_dict.items()):
396+
if process_object.state is ProcessStates.RUNNING:
397+
del self.process_started_dict[process_name]
398+
# handle timeout error.
399+
elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout:
400+
self._timeout_process(process_name, process_object)
401+
# notify user about waiting
402+
elif (time.time() - process_object.laststart) >= process_object.notify_timer:
403+
self._notfiy_user_about_waiting(process_name, process_object)
404+
405+
def _timeout_process(self, process_name, process_object):
406+
msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned"
407+
.format(process_name,
408+
getProcessStateDescription(process_object.state),
409+
process_object.config.spawn_timeout,
410+
[process for process in self.process_spawn_dict.keys()]))
411+
process_object.config.options.logger.warn(msg)
412+
process_object.record_spawnerr(
413+
'timeout: Process {} did not reach RUNNING state within {} seconds'
414+
.format(process_name,
415+
process_object.config.spawn_timeout))
416+
process_object.change_state(ProcessStates.FATAL)
417+
for process_name, process_object in self.process_spawn_dict.items():
418+
process_object.record_spawnerr(
419+
'Dependee process did not start - set FATAL state for {}'
420+
.format(process_name))
421+
process_object.change_state(ProcessStates.FATAL)
422+
self.process_spawn_dict = dict()
423+
self.process_started_dict = dict()
424+
425+
def _notfiy_user_about_waiting(self, process_name, process_object):
426+
process_object.notify_timer += 5
427+
msg = ("waiting for dependee process {} in {} state to be RUNNING"
428+
.format(process_name,
429+
getProcessStateDescription(process_object.state)))
430+
process_object.config.options.logger.info(msg)
431+
319432
def timeslice(period, when):
320433
return int(when - (when % period))
321434

0 commit comments

Comments
 (0)