
Commit b461a74

Committed on Jan 8, 2022
Fix resource handling in CI without GPU
1 parent aaacc17 commit b461a74


2 files changed, +87 −37 lines changed

 

fluid/fluid_executor.py

+87 −32
@@ -7,6 +7,7 @@
 import random
 import time
 import traceback
+import warnings
 from contextlib import contextmanager
 from typing import TYPE_CHECKING, NamedTuple

@@ -66,11 +67,20 @@ class FluidExecutor(TrialExecutor):
     def __init__(self, **kwargs):
         super().__init__(queue_trials=True)  # type: ignore

+        # whether we are in a testing environment without GPUs
+        self._fake_gpus = False
+
         # resources
         self._avail_resources = Resources(cpu=0, gpu=0)
         self._committed_resources = Resources(cpu=0, gpu=0)
         self._resources_initialized = False
         self._last_resource_refresh = float("-inf")
+        # set of trials that have resources committed.
+        # These are usually the trials in jobs_running, but a trial may be in
+        # _trials_running without being in jobs_running, because fetch_result
+        # was called on it.
+        # Maintained solely by _commit_resources/_return_resources.
+        self._trials_running: List[Trial] = set()

         # make sure our own GPU resources are created first in the cluster
         create_custom_gpu_res()
@@ -169,6 +179,9 @@ def _fluid(self, meta: TrialGroupMeta):
         self._dump_groups()
         # set of trials to consider
         A = {trial.trial_id for trial in self._trial_group(meta.grp)}
+        logger.debug(
+            f"_fluid: meta.perf.trials_missing_info={meta.perf.trials_missing_info} meta.trials={meta.trials}, meta.grp={meta.grp}, trial_groups={self.trial_groups}, A={A}"
+        )
         # assignment of resources
         W: Dict[str, Resources] = {}
         # compute new idle resources if every trials in this group were stopped
@@ -209,8 +222,8 @@ def _fluid(self, meta: TrialGroupMeta):
         # \frac{1}{c}),
         # d
         # )$$
-        c = 1 / 2  # TODO: calc c
-        d = 4  # TODO: calc d
+        c = 1 / 2
+        d = 4
         w = np.minimum(
             np.maximum(np.floor(H1 * np.size(H1) / np.sum(H1)), 1 / c), d
         )
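For reference, the change above fixes c and d, so each trial's weight becomes w = min(max(floor(H1 * |H1| / sum(H1)), 1/c), d). A minimal standalone sketch with a made-up H1 (the array is illustrative, not taken from the executor):

import numpy as np

# Worked example of the weight rule used in _fluid, with the constants this
# commit hard-codes.  H1 is a hypothetical per-trial metric vector.
c = 1 / 2
d = 4
H1 = np.array([1.0, 1.0, 1.0, 1.0, 20.0])
w = np.minimum(np.maximum(np.floor(H1 * np.size(H1) / np.sum(H1)), 1 / c), d)
print(w)  # [2. 2. 2. 2. 4.]: every weight is clamped into the [1/c, d] band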
@@ -224,7 +237,7 @@ def _fluid(self, meta: TrialGroupMeta):

     def _ensure_W(self, W: Dict[str, Resources], meta: TrialGroupMeta):
         """Adjust group resources given in W"""
-        logger.debug(f"ensure_W: meta.trials={meta.trials}")
+        logger.debug(f"ensure_W: W={W} meta.trials={meta.trials}")
         # stop any trials with 0 res
         # this has to be done first to free up resources for others to use
         for trial_id, res in W.items():
@@ -233,31 +246,37 @@ def _ensure_W(self, W: Dict[str, Resources], meta: TrialGroupMeta):
                 # add to paused, then ensure_stop, we do not change trial's status which is visible outside
                 running = self._find_running(trial)
                 if running is not None:
+                    # don't call pause_trial, which will trigger another fluid reschedule
                     self.jobs_paused[running.in_flight_future] = running
-                    self._ensure_stop(running.trial)
-                else:
-                    trial.resources = res
-                    self.start_trial(trial)
+                    self._ensure_stop(running.trial)
+                trial.resources = res
+                # add to pending
+                self.start_trial(trial)
         # adjust any trials with different res, including any not already running
         for trial_id, res in W.items():
             # use trial group to map trial_id to trial
             trial = self.trial_groups[trial_id].trial

             if res.cpu_total() + res.gpu_total() == 0:
+                # already handled in the loop above
                 continue

-            running = self._find_running(trial)
-            if running is not None and (
-                # trial.resources != res
+            if (
+                # current_res != res
                 Resources.subtract(trial.resources, res).is_nonnegative()
                 != Resources.subtract(res, trial.resources).is_nonnegative()
             ):
-                self.jobs_paused[running.in_flight_future] = running
-                self._ensure_stop(running.trial)
+                running = self._find_running(trial)
+                if running is not None:
+                    # don't call pause_trial, which will trigger another fluid reschedule
+                    self.jobs_paused[running.in_flight_future] = running

-            # construct PendingJob and use _kickoff to start the trial
-            pending = PendingJob(trial, None, True)
-            self._kickoff(pending, res)
+                self._ensure_stop(trial)
+
+            # at this point, the job is always stopped but not in the pending queue,
+            # because fluid clears the pending queue.
+            trial.resources = res
+            self._kickoff(PendingJob(trial, None, True), res)

     def _find_group(self, trial: Trial) -> TrialGroupMeta:
         return self.trial_group_meta[self.trial_groups[trial.trial_id].group]
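The restart condition above uses Resources.subtract twice as a cheap "did the allocation change" test: it fires exactly when one of the two allocations dominates the other componentwise. A simplified standalone sketch, where Res is a stand-in for the real Resources class rather than code from this repo:

from typing import NamedTuple

class Res(NamedTuple):
    cpu: float
    gpu: float

    @staticmethod
    def subtract(a: "Res", b: "Res") -> "Res":
        return Res(a.cpu - b.cpu, a.gpu - b.gpu)

    def is_nonnegative(self) -> bool:
        return self.cpu >= 0 and self.gpu >= 0

def changed(current: Res, new: Res) -> bool:
    # True when exactly one direction of the subtraction is non-negative,
    # i.e. one allocation strictly dominates the other.
    return Res.subtract(current, new).is_nonnegative() != Res.subtract(new, current).is_nonnegative()

print(changed(Res(4, 1), Res(4, 1)))  # False: identical, no restart needed
print(changed(Res(4, 1), Res(4, 2)))  # True: the new allocation grew
print(changed(Res(4, 1), Res(2, 2)))  # False: neither dominates, treated as unchanged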
@@ -280,6 +299,9 @@ def _find_running(self, trial: Trial) -> Optional[RunningJob]:
         for _, job in self.jobs_running.items():
             if job.trial == trial:
                 return job
+        logger.debug(
+            f"Could not find running trial: {trial}, currently running ones are {[job for _, job in self.jobs_running.items()]}"
+        )

     def _find_pending(self, trial: Trial) -> Optional[PendingJob]:
         for job in self.jobs_pending:
@@ -296,7 +318,7 @@ def _setup_remote_runner(

         cls = ray.remote(
             num_cpus=res.cpu,
-            num_gpus=res.gpu,
+            num_gpus=0 if self._fake_gpus else res.gpu,
             memory=res.memory,
             object_store_memory=res.object_store_memory,
             resources=res.custom_resources,
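When _fake_gpus is set, the trainable is declared with num_gpus=0 so Ray can still place it on a GPU-less CI machine. A minimal sketch of the same pattern; Worker and the resource numbers are illustrative, not taken from this repo:

import ray

ray.init(num_cpus=2, num_gpus=0)  # e.g. a CI machine with no GPUs

fake_gpus = True  # what the executor's GPU detection would conclude here

@ray.remote
class Worker:
    def ping(self) -> str:
        return "ok"

# With num_gpus=0 the actor can be scheduled even though the cluster reports
# no GPU resources; asking for num_gpus=1 on this machine would leave the
# actor pending forever.
actor = Worker.options(num_cpus=1, num_gpus=0 if fake_gpus else 1).remote()
print(ray.get(actor.ping.remote()))  # ok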
@@ -335,12 +357,11 @@ def _kickoff(self, pending: PendingJob, res: Resources) -> Optional[RunningJob]:
         May return None if failed to start
         """
         trial = pending.trial
-        self._commit_resources(res)
-
         # this is needed for the Trainer to setup distributed training
         # TODO: figure what config key is also needed to set resource info
         trial.resources = res

+        self._commit_resources(trial)
         try:
             reuse_allowed = pending.checkpoint is not None or trial.has_checkpoint()
             runner = self._setup_remote_runner(trial, res, reuse_allowed)
@@ -382,6 +403,7 @@ def _kickoff(self, pending: PendingJob, res: Resources) -> Optional[RunningJob]:
                 stop_logger=True,
                 # NOTE that we don't return the resources, since they may have been lost.
                 release_resources=False,
+                update_status=True,
             )

     def _ensure_train(self, trial: Trial) -> RunningJob:
@@ -395,31 +417,40 @@ def _ensure_train(self, trial: Trial) -> RunningJob:
             fut = _LocalWrapper(fut)
         running = RunningJob(trial, fut)
         self.jobs_running[fut] = running
+        logger.debug(f"Set trial to running: {trial}, jobs_running={self.jobs_running}")
         return running

     def _ensure_stop(
-        self, trial, error=False, error_msg="", stop_logger=True, release_resources=True
+        self,
+        trial,
+        error=False,
+        error_msg="",
+        stop_logger=True,
+        release_resources=True,
+        update_status=False,
     ):
         """Stops the trial and its logger
         Handles any error
         """
+        logger.debug(f"_ensure_stop: trial.resources={trial.resources}")
         if stop_logger:
             trial.close_logger()

         prior_status = trial.status
-        self.set_status(trial, Trial.ERROR if error else Trial.TERMINATED)
         trial.set_location(Location())
+        if update_status:
+            self.set_status(trial, Trial.ERROR if error else Trial.TERMINATED)

         # remove from running
         in_flight = [j for _, j in self.jobs_running.items() if j.trial == trial]
         for j in in_flight:
             self.jobs_running.pop(j.in_flight_future)
-        if release_resources:
-            logger.debug("Trial %s: Returning resources.", trial)
-            self._return_resources(trial.resources)
         if in_flight:
             if prior_status not in [Trial.RUNNING, Trial.ERROR]:
                 assert False, "trial status invalid"
+        # release resources
+        if release_resources:
+            self._return_resources(trial)

         # remove from trial group
         # del self.trial_groups[trial.trial_id]
@@ -451,7 +482,7 @@ def start_trial(self, trial, checkpoint=None, train=True):
     def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True):
         """Add to to-stop queue and reschedule"""
         logger.debug("stop_trial %s", trial)
-        self._ensure_stop(trial, error, error_msg, stop_logger)
+        self._ensure_stop(trial, error, error_msg, stop_logger, update_status=True)
         meta = self._find_group(trial)
         self._fluid(meta)

@@ -540,6 +571,13 @@ def get_next_failed_trial(self) -> Optional[Trial]:
         return None

     def fetch_result(self, trial):
+        """
+        Note that this will remove the trial from the running queue,
+        so actions must be taken later to either continue_training/stop/pause,
+        to maintain a consistent system state.
+
+        This is usually called from the runner, knowing that the future for this trial is ready.
+        """
         running_job = self._find_running(trial)
         assert running_job, "Trial was not running"
         self.jobs_running.pop(running_job.in_flight_future)
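The new docstring spells out a contract for callers: once fetch_result returns, the trial is no longer tracked in jobs_running, so the caller must immediately decide what happens next. A hypothetical caller sketch; should_stop is an illustrative predicate, not an API of this repo:

def handle_ready_trial(executor, trial, should_stop) -> None:
    # Hypothetical sketch of the contract described above: after fetch_result
    # the trial is no longer in jobs_running, so decide its fate right away.
    result = executor.fetch_result(trial)
    if should_stop(result):
        executor.stop_trial(trial)         # returns resources and reschedules the group
    else:
        executor.continue_training(trial)  # puts the trial back in flight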
@@ -675,11 +713,6 @@ def export_trial_if_needed(self, trial: Trial):
     def cleanup(self):
         self._trial_cleanup.cleanup(partial=False)

-    def has_gpus(self):
-        if not self._resources_initialized:
-            self._update_avail_resources()
-        return self._avail_resources.gpu > 0
-
     def on_step_begin(self, trial_runner):
         """Before step() called, update the available resources."""
         self._update_avail_resources()
@@ -722,6 +755,16 @@ def _update_avail_resources(self, num_retries=5):
         )
         custom_resources = resources

+        if num_gpus == 0:
+            warnings.warn(
+                "No GPU resources found, assuming local test, using CPU resources instead"
+            )
+            # local test
+            num_gpus = num_cpus
+            self._fake_gpus = True
+        else:
+            self._fake_gpus = False
+
         avail_resources = Resources(
             int(num_cpus),
             int(num_gpus),
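The fallback kicks in when the cluster reports zero GPUs, which is the normal situation in CI. A standalone sketch of the same detection against a local Ray instance; the init arguments simulate a GPU-less machine:

import ray
import warnings

ray.init(num_cpus=2, num_gpus=0)  # simulate a CI machine without GPUs

resources = ray.cluster_resources()
num_cpus = int(resources.get("CPU", 0))
num_gpus = int(resources.get("GPU", 0))

if num_gpus == 0:
    # Same idea as the executor: pretend each CPU is a GPU so the
    # scheduling math still produces non-empty allocations locally.
    warnings.warn("No GPU resources found, assuming local test, using CPU resources instead")
    num_gpus = num_cpus
    fake_gpus = True
else:
    fake_gpus = False

print(num_cpus, num_gpus, fake_gpus)  # 2 2 True on a machine without GPUs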
@@ -742,7 +785,10 @@ def _update_avail_resources(self, num_retries=5):
     def idle_resources(self) -> Resources:
         return Resources.subtract(self._avail_resources, self._committed_resources)

-    def _commit_resources(self, resources):
+    def _commit_resources(self, trial: Trial):
+        resources = trial.resources
+        self._trials_running.add(trial)
+
         committed = self._committed_resources
         all_keys = set(resources.custom_resources).union(
             set(committed.custom_resources)
@@ -759,8 +805,15 @@ def _commit_resources(self, resources):
             committed.object_store_memory + resources.object_store_memory_total(),
             custom_resources=custom_resources,
         )
+        logger.debug(f"Committed res={resources} -> {self._committed_resources}")
+
+    def _return_resources(self, trial: Trial):
+        if trial not in self._trials_running:
+            return
+        logger.debug("Trial %s: Returning resources.", trial)
+        self._trials_running.remove(trial)
+        resources = trial.resources

-    def _return_resources(self, resources):
         committed = self._committed_resources

         all_keys = set(resources.custom_resources).union(
@@ -778,7 +831,9 @@ def _return_resources(self, resources):

         assert (
             self._committed_resources.is_nonnegative()
-        ), "Resource invalid: {}".format(resources)
+        ), "Resource invalid: {} - {} = {}".format(
+            committed, resources, self._committed_resources
+        )

     def on_no_available_trials(self, trial_runner):
         """This is called when we get all trial from a batch from the search algo"""

fluid/trainer.py

+0 −5
@@ -187,11 +187,6 @@ def data_creator(config):

     """

-    # TODO: Implement autoscaling. If num_workers=-1, the trainer will use as
-    # many resources as available. Upon each train call, TorchTrainer will
-    # query the Ray global state for total available resources and resize
-    # its remote workers to consume all available resources.
-
     def __init__(
         self,
         *,
