Skip to content

Commit b77f2ff

Browse files
authored
Merge pull request #549 from minrk/simpler-ids
Simpler default cluster / engine set ids
2 parents 5a791db + 04725e1 commit b77f2ff

File tree

5 files changed

+46
-24
lines changed

5 files changed

+46
-24
lines changed

ipyparallel/apps/baseapp.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,6 @@ def _work_dir_changed(self, change):
118118
""",
119119
)
120120

121-
@observe('cluster_id')
122-
def _cluster_id_changed(self, change):
123-
if change['new']:
124-
self.name = '{}-{}'.format(self.__class__.name, change['new'])
125-
else:
126-
self.name = self.__class__.name
127-
128121
loop = Instance(IOLoop)
129122

130123
def _loop_default(self):
@@ -197,7 +190,7 @@ def reinit_logging(self):
197190
pass
198191
if self.log_to_file:
199192
# Start logging to the new log file
200-
log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
193+
log_filename = f"{self.name}-{self.cluster_id}-{os.getpid()}.log"
201194
logfile = os.path.join(log_dir, log_filename)
202195
if sys.__stderr__:
203196
print(f"Sending logs to {logfile}", file=sys.__stderr__)

ipyparallel/cluster/cluster.py

+25-9
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class Cluster(AsyncFirst, LoggingConfigurable):
101101

102102
@default("cluster_id")
103103
def _default_cluster_id(self):
104-
return f"{socket.gethostname()}-{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"
104+
return f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"
105105

106106
profile_dir = Unicode(
107107
help="""The profile directory.
@@ -511,6 +511,7 @@ def from_file(
511511
def write_cluster_file(self):
512512
"""Write cluster info to disk for later loading"""
513513
os.makedirs(os.path.dirname(self.cluster_file), exist_ok=True)
514+
self.log.debug(f"Updating {self.cluster_file}")
514515
with open(self.cluster_file, "w") as f:
515516
json.dump(self.to_dict(), f)
516517

@@ -523,6 +524,14 @@ def remove_cluster_file(self):
523524
else:
524525
self.log.debug(f"Removed cluster file: {self.cluster_file}")
525526

527+
def _is_running(self):
528+
"""Return if we have any running components"""
529+
if self.controller and self.controller.state != 'after':
530+
return True
531+
if any(es.state != 'after' for es in self.engines.values()):
532+
return True
533+
return False
534+
526535
def update_cluster_file(self):
527536
"""Update my cluster file
528537
@@ -533,9 +542,7 @@ def update_cluster_file(self):
533542
# setting cluster_file='' disables saving to disk
534543
return
535544

536-
if (not self.controller or self.controller.state == 'after') and not any(
537-
es.state == 'after' for es in self.engines.values()
538-
):
545+
if not self._is_running():
539546
self.remove_cluster_file()
540547
else:
541548
self.write_cluster_file()
@@ -604,14 +611,23 @@ def _controller_stopped(self, stop_data=None):
604611
self.log.info(f"Controller stopped: {stop_data}")
605612
self.update_cluster_file()
606613

614+
def _new_engine_set_id(self):
615+
"""Generate a new engine set id"""
616+
engine_set_id = base = f"{int(time.time())}"
617+
i = 1
618+
while engine_set_id in self.engines:
619+
engine_set_id = f"{base}-{i}"
620+
i += 1
621+
return engine_set_id
622+
607623
async def start_engines(self, n=None, engine_set_id=None, **kwargs):
608624
"""Start an engine set
609625
610626
Returns an engine set id which can be used in stop_engines
611627
"""
612628
# TODO: send engines connection info
613629
if engine_set_id is None:
614-
engine_set_id = f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"
630+
engine_set_id = self._new_engine_set_id()
615631
engine_set = self.engines[engine_set_id] = self.engine_launcher_class(
616632
work_dir=u'.',
617633
parent=self,
@@ -853,10 +869,10 @@ def load_clusters(
853869
for key, cluster in list(self.clusters.items()):
854870
# remove stopped clusters
855871
# but not *new* clusters that haven't started yet
856-
if (cluster.controller and cluster.controller.state == 'after') and all(
857-
es.state == 'after' for es in cluster.engines.values()
858-
):
859-
self.log.info("Removing stopped cluster {key}")
872+
# if `cluster.controller` is present
873+
# that means it was running at some point
874+
if cluster.controller and not cluster._is_running():
875+
self.log.info(f"Removing stopped cluster {key}")
860876
self.clusters.pop(key)
861877

862878
if profile_dirs is None:

ipyparallel/cluster/launcher.py

+10
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,11 @@ async def join(self, timeout=None):
490490
raise TimeoutError(
491491
f"Process {self.pid} did not complete in {timeout} seconds."
492492
)
493+
if getattr(self, '_stop_waiting', None) and getattr(self, "_wait_thread", None):
494+
self._stop_waiting.set()
495+
# got here, should be done
496+
# wait for wait_thread to cleanup
497+
self._wait_thread.join()
493498

494499
def _stream_file(self, path):
495500
"""Stream one file"""
@@ -1226,6 +1231,11 @@ async def join(self, timeout=None):
12261231
wait()
12271232
else:
12281233
await asyncio.wrap_future(future)
1234+
if getattr(self, '_stop_waiting', None) and getattr(self, "_wait_thread", None):
1235+
self._stop_waiting.set()
1236+
# got here, should be done
1237+
# wait for wait_thread to cleanup
1238+
self._wait_thread.join()
12291239

12301240
def signal(self, sig):
12311241
if self.state == 'running':

ipyparallel/controller/app.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,11 @@ class IPController(BaseParallelApplication):
235235

236236
@observe('cluster_id')
237237
def _cluster_id_changed(self, change):
238-
super()._cluster_id_changed(change)
239-
self.engine_json_file = "%s-engine.json" % self.name
240-
self.client_json_file = "%s-client.json" % self.name
238+
base = 'ipcontroller'
239+
if change.new:
240+
base = f"{base}-{change.new}"
241+
self.engine_json_file = f"{base}-engine.json"
242+
self.client_json_file = f"{base}-client.json"
241243

242244
# internal
243245
children = List()

ipyparallel/tests/test_cluster.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import json
3+
import logging
34
import os
45
import signal
56
import sys
@@ -310,9 +311,9 @@ async def test_default_from_file(Cluster):
310311

311312

312313
async def test_cluster_manager_notice_stop(Cluster):
313-
cm = cluster.ClusterManager()
314+
cm = cluster.ClusterManager(log=logging.getLogger())
314315
cm.load_clusters()
315-
c = Cluster(n=1)
316+
c = Cluster(n=1, log=cm.log)
316317
key = cm._cluster_key(c)
317318
assert key not in cm.clusters
318319

@@ -326,8 +327,8 @@ async def test_cluster_manager_notice_stop(Cluster):
326327
# refresh list, cleans out stopped clusters
327328
# can take some time to notice
328329
tic = time.perf_counter()
329-
deadline = time.perf_counter() + 30
330+
deadline = time.perf_counter() + _timeout
330331
while time.perf_counter() < deadline and key in cm.clusters:
331-
time.sleep(0.2)
332+
await asyncio.sleep(0.2)
332333
cm.load_clusters()
333334
assert key not in cm.clusters

0 commit comments

Comments
 (0)