# Copyright (c) 2017-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################
"""Primitives for running multiple single-GPU jobs in parallel over subranges of
data. These are used for running multi-GPU inference. Subprocesses are used to
avoid the GIL since inference may involve non-trivial amounts of Python code.
"""

# from __future__ import absolute_import
# from __future__ import division
# from __future__ import print_function
# from __future__ import unicode_literals

from io import IOBase
import logging
import os
import subprocess

from six.moves import shlex_quote
from six.moves import cPickle as pickle
import yaml

import numpy as np
import torch

from core.config import cfg

logger = logging.getLogger(__name__)

def process_in_parallel(
        tag, total_range_size, binary, output_dir,
        load_ckpt, load_detectron, opts=''):
    """Run the specified binary NUM_GPUS times in parallel, each time as a
    subprocess that uses one GPU. The binary must accept the command line
    arguments `--range {start} {end}` that specify a data processing range.
    """
    # Snapshot the current cfg state in order to pass to the inference
    # subprocesses
    cfg_file = os.path.join(output_dir, '{}_range_config.yaml'.format(tag))
    with open(cfg_file, 'w') as f:
        yaml.dump(cfg, stream=f)

    subprocess_env = os.environ.copy()
    processes = []
    NUM_GPUS = torch.cuda.device_count()
    # Split [0, total_range_size) into one contiguous subrange per GPU
    subinds = np.array_split(range(total_range_size), NUM_GPUS)

    # Determine GPUs to use
    cuda_visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES')
    if cuda_visible_devices:
        gpu_inds = list(map(int, cuda_visible_devices.split(',')))
        assert -1 not in gpu_inds, \
            'Hiding GPU indices using the \'-1\' index is not supported'
    else:
        gpu_inds = range(NUM_GPUS)
    gpu_inds = list(gpu_inds)

    # Run the binary in NUM_GPUS subprocesses, pinning each to a single GPU
    for i, gpu_ind in enumerate(gpu_inds):
        start = subinds[i][0]
        end = subinds[i][-1] + 1
        subprocess_env['CUDA_VISIBLE_DEVICES'] = str(gpu_ind)
        cmd = ('python {binary} --range {start} {end} --cfg {cfg_file} --set {opts} '
               '--output_dir {output_dir}')
        if load_ckpt is not None:
            cmd += ' --load_ckpt {load_ckpt}'
        elif load_detectron is not None:
            cmd += ' --load_detectron {load_detectron}'
        cmd = cmd.format(
            binary=shlex_quote(binary),
            start=int(start),
            end=int(end),
            cfg_file=shlex_quote(cfg_file),
            output_dir=output_dir,
            load_ckpt=load_ckpt,
            load_detectron=load_detectron,
            opts=' '.join([shlex_quote(opt) for opt in opts])
        )
        logger.info('{} range command {}: {}'.format(tag, i, cmd))
        if i == 0:
            # Pipe the first subprocess so its output can be streamed live
            subprocess_stdout = subprocess.PIPE
        else:
            # Redirect the remaining subprocesses to per-range log files
            filename = os.path.join(
                output_dir, '%s_range_%s_%s.stdout' % (tag, start, end)
            )
            subprocess_stdout = open(filename, 'w')
        p = subprocess.Popen(
            cmd,
            shell=True,
            env=subprocess_env,
            stdout=subprocess_stdout,
            stderr=subprocess.STDOUT,
            bufsize=1
        )
        processes.append((i, p, start, end, subprocess_stdout))

    # Log output from inference processes and collate their results
    outputs = []
    for i, p, start, end, subprocess_stdout in processes:
        log_subprocess_output(i, p, output_dir, tag, start, end)
        if isinstance(subprocess_stdout, IOBase):
            subprocess_stdout.close()
        range_file = os.path.join(
            output_dir, '%s_range_%s_%s.pkl' % (tag, start, end)
        )
        with open(range_file, 'rb') as f:
            range_data = pickle.load(f)
        outputs.append(range_data)
    return outputs
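
# A minimal usage sketch (illustrative only; `tools/test_net.py`, the paths,
# and the checkpoint name below are hypothetical, not defined by this module).
# Note that `opts` is iterated element-wise and shell-quoted, so it should be
# a list of strings rather than the default '' whenever overrides are passed:
#
#   outputs = process_in_parallel(
#       tag='detection',
#       total_range_size=5000,               # e.g. number of test images
#       binary='tools/test_net.py',          # hypothetical child script
#       output_dir='/tmp/detectron_output',  # hypothetical output directory
#       load_ckpt='model_final.pth',         # hypothetical checkpoint file
#       load_detectron=None,
#       opts=['TEST.SCORE_THRESH', '0.05'],  # cfg key/value overrides
#   )
#   # `outputs` is the list of unpickled per-range results, in range order.
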
def log_subprocess_output(i, p, output_dir, tag, start, end):
    """Capture the output of each subprocess and log it in the parent process.
    The first subprocess's output is logged in realtime. The output from the
    other subprocesses is buffered and then printed all at once (in order)
    when the subprocesses finish.
    """
    outfile = os.path.join(
        output_dir, '%s_range_%s_%s.stdout' % (tag, start, end)
    )
    logger.info('# ' + '-' * 76 + ' #')
    logger.info(
        'stdout of subprocess %s with range [%s, %s]' % (i, start + 1, end)
    )
    logger.info('# ' + '-' * 76 + ' #')
    if i == 0:
        # Stream the piped stdout from the first subprocess in realtime,
        # echoing each line and mirroring it to the log file
        with open(outfile, 'w') as f:
            for line in iter(p.stdout.readline, b''):
                print(line.rstrip().decode('ascii'))
                f.write(str(line, encoding='ascii'))
        p.stdout.close()
        ret = p.wait()
    else:
        # For subprocesses >= 1, wait until completion and then dump the
        # buffered log file all at once
        ret = p.wait()
        with open(outfile, 'r') as f:
            print(''.join(f.readlines()))
    assert ret == 0, 'Range subprocess failed (exit code: {})'.format(ret)
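
# For reference, a hedged sketch of the argument interface a child binary
# would need in order to be driven by process_in_parallel. It mirrors the
# flags assembled into `cmd` above; the parser below is an assumption about
# the child script, not code from this repository:
#
#   import argparse
#   parser = argparse.ArgumentParser()
#   parser.add_argument('--range', nargs=2, type=int)  # [start, end) subrange
#   parser.add_argument('--cfg', dest='cfg_file')      # snapshotted yaml cfg
#   parser.add_argument('--set', dest='opts', nargs=argparse.REMAINDER)
#   parser.add_argument('--output_dir')
#   parser.add_argument('--load_ckpt', default=None)
#   parser.add_argument('--load_detectron', default=None)
#
# The child is then expected to process entries in [start, end) and write its
# results to a '<tag>_range_<start>_<end>.pkl' file under output_dir, which
# process_in_parallel unpickles when collating results.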