Skip to content

Commit 947be39

Browse files
committed
handle 'after' state in launchers
- don't reserialize clusters that have stopped
- load_clusters purges stopped clusters
1 parent fd57cd4 commit 947be39

File tree

4 files changed

+56
-11
lines changed

4 files changed

+56
-11
lines changed

ipyparallel/cluster/cluster.py

+25-5
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ def __repr__(self):
378378
fields["profile_dir"] = repr(profile_dir)
379379

380380
if self.controller:
381-
fields["controller"] = "<running>"
381+
fields["controller"] = f"<{self.controller.state}>"
382382
if self.engines:
383383
fields["engine_sets"] = list(self.engines)
384384

@@ -398,7 +398,7 @@ def _cls_str(cls):
398398

399399
cluster_info["class"] = _cls_str(self.__class__)
400400

401-
if self.controller:
401+
if self.controller and self.controller.state != 'after':
402402
d["controller"] = {
403403
"class": _cls_str(self.controller_launcher_class),
404404
"state": None,
@@ -411,7 +411,8 @@ def _cls_str(cls):
411411
}
412412
sets = d["engines"]["sets"]
413413
for engine_set_id, engine_launcher in self.engines.items():
414-
sets[engine_set_id] = engine_launcher.to_dict()
414+
if engine_launcher.state != 'after':
415+
sets[engine_set_id] = engine_launcher.to_dict()
415416
return d
416417

417418
@classmethod
@@ -447,13 +448,15 @@ def from_dict(cls, d, **kwargs):
447448
)
448449
except launcher.NotRunning as e:
449450
self.log.error(f"Controller for {cluster_key} not running: {e}")
451+
else:
452+
self.controller.on_stop(self._controller_stopped)
450453

451454
engine_info = d.get("engines")
452455
if engine_info:
453456
cls = self.engine_launcher_class = import_item(engine_info["class"])
454457
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
455458
try:
456-
self.engines[engine_set_id] = cls.from_dict(
459+
self.engines[engine_set_id] = engine_set = cls.from_dict(
457460
engine_state,
458461
engine_set_id=engine_set_id,
459462
parent=self,
@@ -462,6 +465,9 @@ def from_dict(cls, d, **kwargs):
462465
self.log.error(
463466
f"Engine set {cluster_key}{engine_set_id} not running: {e}"
464467
)
468+
else:
469+
engine_set.on_stop(partial(self._engines_stopped, engine_set_id))
470+
465471
# check if state changed
466472
if self.to_dict() != d:
467473
# if so, update our cluster file
@@ -527,7 +533,9 @@ def update_cluster_file(self):
527533
# setting cluster_file='' disables saving to disk
528534
return
529535

530-
if not self.controller and not self.engines:
536+
if (not self.controller or self.controller.state == 'after') and not any(
537+
es.state == 'after' for es in self.engines.values()
538+
):
531539
self.remove_cluster_file()
532540
else:
533541
self.write_cluster_file()
@@ -594,6 +602,7 @@ def add_args(args):
594602
def _controller_stopped(self, stop_data=None):
595603
"""Callback when a controller stops"""
596604
self.log.info(f"Controller stopped: {stop_data}")
605+
self.update_cluster_file()
597606

598607
async def start_engines(self, n=None, engine_set_id=None, **kwargs):
599608
"""Start an engine set
@@ -839,6 +848,17 @@ def load_clusters(
839848
- single profile by name
840849
- all IPython profiles, if nothing else specified
841850
"""
851+
852+
# first, check our current clusters
853+
for key, cluster in list(self.clusters.items()):
854+
# remove stopped clusters
855+
# but not *new* clusters that haven't started yet
856+
if (cluster.controller and cluster.controller.state == 'after') and all(
857+
es.state == 'after' for es in cluster.engines.values()
858+
):
859+
self.log.info("Removing stopped cluster {key}")
860+
self.clusters.pop(key)
861+
842862
if profile_dirs is None:
843863
if profile_dir is not None:
844864
profile_dirs = [profile_dir]

ipyparallel/cluster/launcher.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class BaseLauncher(LoggingConfigurable):
112112
start_data = Any()
113113
stop_data = Any()
114114

115-
identifier = Any(
115+
identifier = Unicode(
116116
help="Used for lookup in e.g. EngineSetLauncher during notify_stop and default log files"
117117
)
118118

@@ -521,7 +521,7 @@ def get_output(self, remove=False):
521521
with open(self.output_file) as f:
522522
self._output = f.read()
523523
except FileNotFoundError:
524-
self.log.error(f"Missing output file: {self.output_file}")
524+
self.log.debug(f"Missing output file: {self.output_file}")
525525
self._output = ""
526526
else:
527527
self._output = ""
@@ -658,13 +658,14 @@ def from_dict(cls, d, **kwargs):
658658
n = 0
659659
for i, engine_dict in d['engines'].items():
660660
try:
661-
self.launchers[i] = self.launcher_class.from_dict(
662-
engine_dict, parent=self
661+
self.launchers[i] = el = self.launcher_class.from_dict(
662+
engine_dict, identifier=i, parent=self
663663
)
664664
except NotRunning as e:
665665
self.log.error(f"Engine {i} not running: {e}")
666666
else:
667667
n += 1
668+
el.on_stop(self._notice_engine_stopped)
668669
if n == 0:
669670
raise NotRunning("No engines left")
670671
else:
@@ -676,15 +677,16 @@ def start(self, n):
676677
self.n = n
677678
dlist = []
678679
for i in range(n):
680+
identifier = str(i)
679681
if i > 0:
680682
time.sleep(self.delay)
681-
el = self.launchers[i] = self.launcher_class(
683+
el = self.launchers[identifier] = self.launcher_class(
682684
work_dir=self.work_dir,
683685
parent=self,
684686
log=self.log,
685687
profile_dir=self.profile_dir,
686688
cluster_id=self.cluster_id,
687-
identifier=i,
689+
identifier=identifier,
688690
output_file=os.path.join(
689691
self.profile_dir,
690692
"log",

ipyparallel/tests/test_cluster.py

+20
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,23 @@ async def test_default_from_file(Cluster):
307307
assert cluster2.cluster_file == cluster.cluster_file
308308
with await cluster.connect_client() as rc:
309309
assert len(rc) == 1
310+
311+
312+
async def test_cluster_manager_notice_stop(Cluster):
    """A ClusterManager stops listing a cluster after that cluster is stopped.

    Starts a one-engine cluster, checks that ``load_clusters`` picks it up,
    stops the cluster, and checks that a later ``load_clusters`` call no
    longer lists it.
    """
    # imported locally so this test does not depend on a module-level import
    import asyncio

    cm = cluster.ClusterManager()
    cm.load_clusters()
    c = Cluster(n=1)
    key = cm._cluster_key(c)
    # the cluster has not been started, so the manager must not list it yet
    assert key not in cm.clusters

    await c.start_cluster()
    cm.load_clusters()
    assert key in cm.clusters

    await c.stop_cluster()
    # give it a moment to notice.
    # Yield to the event loop instead of blocking it with time.sleep(),
    # so any stop callbacks scheduled on the loop can run while we wait.
    await asyncio.sleep(5)
    # refresh list, cleans out stopped clusters
    cm.load_clusters()
    assert key not in cm.clusters

lab/src/clusters.tsx

+3
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,9 @@ function ClusterListingItem(props: IClusterListingItemProps) {
717717
let cluster_state = "Stopped";
718718
if (cluster.controller) {
719719
cluster_state = cluster.controller.state.state;
720+
if (cluster_state == "after") {
721+
cluster_state = "Stopped";
722+
}
720723
}
721724

722725
// stop action is 'delete' for already-stopped clusters

0 commit comments

Comments
 (0)