-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_stages.py
259 lines (224 loc) · 9.75 KB
/
run_stages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python2
import sys
import os
from os.path import join as opj
from copy import copy
import shutil
import datetime
import argparse
import subprocess
import getpass
import socket
from imp import load_source
def check_stage(ist, st, nm, typ):
    '''validates one required entry of a stage dictionary

    ist -- index of the stage within stage_list (only used in error messages)
    st  -- the stage dictionary being checked
    nm  -- key that must be present in st
    typ -- type (or tuple of types) that st[nm] must be an instance of

    raises IOError if nm is missing from st
    raises TypeError if st[nm] is not an instance of typ
    '''
    if nm not in st:
        raise IOError('"{0}" parameter missing from stage_list[{1}]'.format(nm, ist))
    value = st[nm]
    if not isinstance(value, typ):
        # name the offending key and report the type of its value, not the
        # type of the whole stage, so the message points at the real problem
        raise TypeError('stage_list[{0}]["{1}"] is a {2} and not a {3}?'.format(ist, nm, type(value), repr(typ)))
def makedirsif(d):
    '''creates directory d (and any missing parents) if needed

    Does nothing when d is the empty string or is already a directory.
    '''
    if d == '' or os.path.isdir(d):
        return
    os.makedirs(d)
def incname(afile, suffix='_ConflictedCopy', i_start=1):
    '''returns a name derived from afile that does not exist yet

    if afile does not exist, afile itself is returned
    else, suffix and a counter are inserted before the file extension and the
    counter is increased from i_start until the resulting name is unused
    This is a duplicate of the function declared in utils--run_stages.py should not import from local files
    '''
    from os.path import exists, splitext
    if not exists(afile):
        return afile
    stem, extension = splitext(afile)
    counter = i_start
    candidate = stem + suffix + str(counter) + extension
    while exists(candidate):
        counter += 1
        candidate = stem + suffix + str(counter) + extension
    return candidate
def incmove(afile, adir):
    '''moves afile (a file or a directory) into the directory adir

    The destination is adir joined with afile, run through incname so that an
    existing entry with the same name is not overwritten. Missing parent
    directories of the destination are created before moving.

    raises TypeError if adir is not an existing directory
    raises IOError if afile does not exist
    '''
    if not os.path.isdir(adir):
        raise TypeError('{adir} is not a directory!'.format(adir=adir))
    if not os.path.exists(afile):
        raise IOError('{afile} does not exist!'.format(afile=afile))
    # NOTE(review): os.path.join discards adir when afile is an absolute path,
    # in which case the target is derived from afile alone -- confirm that
    # callers only pass paths relative to the current directory
    target = incname(os.path.join(adir, afile))
    target_parent = os.path.dirname(target)
    makedirsif(target_parent)
    shutil.move(afile, target)
def safecall(acommand):
    '''runs acommand through /bin/tcsh and aborts the script if it fails

    acommand -- the command line, as a single string, handed to the shell

    If the command returns a non-zero code, the failure is appended to
    GENERAL_LOG and the interpreter exits with a non-zero status.
    Returns None when the command succeeds.
    '''
    sc = subprocess.call([acommand], shell=True, executable='/bin/tcsh')
    if sc == 0:  # should be 0 if `call` worked
        return
    with open(GENERAL_LOG, 'a') as f:
        f.write("subprocess.call({acomm}) failed. Returned {anum} Goodbye!\n".format(acomm=acommand, anum=sc))
    # a bare sys.exit() would report status 0 (success) to whatever launched
    # this script; pass the failure on instead. subprocess.call returns a
    # negative number when the child was killed by a signal, so map that to 1
    sys.exit(sc if sc > 0 else 1)
def capture_stdouterr(log):
    '''points the file descriptors behind sys.stdout and sys.stderr at log

    log -- path of the file to append to (opened in 'a' mode, so it is
           created when missing)

    The redirection is done with os.dup2 on the descriptors themselves, so
    it stays in effect after the temporary file object is closed.
    '''
    import os
    import sys
    # write out anything still sitting in the python-level buffers first, so
    # it reaches the previous destination rather than the new log
    sys.stdout.flush()
    sys.stderr.flush()
    with open(log, 'a') as f:
        os.dup2(f.fileno(), sys.stdout.fileno())
        os.dup2(f.fileno(), sys.stderr.fileno())
######################################################################
# This is a template of LHCb MC generation
# Michael Wilkinson: Jan 11, 2018
# based on a script by
# Jianchun Wang 01/13/2012
# Updated for 2016 MC by Scott Ely: Aug 30, 2017
######################################################################
# -- basic info -- #
# isoformat('_') gives the same text as str(now) with the space replaced by '_'
DATE = datetime.datetime.now().isoformat('_')
USER = getpass.getuser()
NODE = socket.gethostname()

# -- set or pass job parameters -- #
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('configfile', type=os.path.abspath, help='')
# abandon unknown args, assumed to be handled by configfile
args, _unknown_args = parser.parse_known_args()
configfile = args.configfile

# the config file is loaded as a python module; it must define every name
# read from it below
conf = load_source('conf', configfile)
SIGNAL_NAME = conf.SIGNAL_NAME
RUN_NUMBER = conf.RUN_NUMBER
RUN_SYS = conf.RUN_SYS
CLEANSTAGES = conf.CLEANSTAGES          # remove each stage's to_remove files after it ran
CLEANWORK = conf.CLEANWORK              # empty and delete WORK_DIR at the end
SOME_MISSING = conf.SOME_MISSING        # skip, rather than fail, a stage with missing requirements
WORK_DIR_EXISTS = conf.WORK_DIR_EXISTS  # accept a WORK_DIR that is already there
make_stage_list = conf.make_stage_list

# -- Directories and files -- #
_run_ids = (RUN_SYS, USER, SIGNAL_NAME, RUN_NUMBER)
BASE_NAME = '%s_%d' % (SIGNAL_NAME, RUN_NUMBER)
WORK_DIR = '%s/%s/work/%s/%d' % _run_ids
DATA_DIR = '%s/%s/data/%s/%d' % _run_ids  # because the output dst all have the same name
LOG_DIR = '%s/%s/log/%s/%d' % _run_ids  # because some log output files have the same name
_general_log_name = BASE_NAME + '_{0}_general.log'.format(DATE)
GENERAL_LOG = opj(WORK_DIR, _general_log_name)

# -- set environment parameters missing in Condor -- #
# adjacent literals are joined by the parser into one command line
PRE_SCRIPT = (
    'setenv HOME /home/{USER}'
    ' && setenv PATH /bin:/usr/bin:/usr/local/bin'
    ' && setenv LC_ALL C'
    ' && set MCGEN_DIR = /home/{USER}/lhcbAnal/MCGen'
    ' && setenv User_release_area /home/{USER}/lhcbAnal'
    ' && setenv APPCONFIGOPTS /cvmfs/lhcb.cern.ch/lib/lhcb/DBASE/AppConfig/v3r340/options'
    ' && source /cvmfs/lhcb.cern.ch/group_login.csh'
).format(USER=USER)
# -- check, make, and change directories -- #
# a WORK_DIR that is already there is only accepted when the config allows it
if not WORK_DIR_EXISTS and os.path.isdir(WORK_DIR):
    raise IOError(WORK_DIR + " exists")
for d in (DATA_DIR, LOG_DIR, WORK_DIR):
    makedirsif(d)
os.chdir(WORK_DIR)  # all references should be relative to the WORK_DIR or absolute
# -- redirect stdout and stderr -- #
capture_stdouterr(GENERAL_LOG)
# -- write parameter values to the log -- #
_log_header = '''\
====================================================
NODE:\t\t\t{NODE}
START@:\t\t\t{DATE}
SIGNAL_NAME:\t\t{SIGNAL_NAME}
RUN_NUMBER:\t\t{RUN_NUMBER}
RUN_SYS:\t\t{RUN_SYS}
CLEANSTAGES:\t\t{CLEANSTAGES}
CLEANWORK:\t\t{CLEANWORK}
SOME_MISSING:\t\t{SOME_MISSING}
WORK_DIR_EXISTS:\t{WORK_DIR_EXISTS}
make_stage_list:\t{make_stage_list}
====================================================
'''.format(
    NODE=NODE,
    DATE=DATE,
    SIGNAL_NAME=SIGNAL_NAME,
    RUN_NUMBER=RUN_NUMBER,
    RUN_SYS=RUN_SYS,
    CLEANSTAGES=CLEANSTAGES,
    CLEANWORK=CLEANWORK,
    SOME_MISSING=SOME_MISSING,
    WORK_DIR_EXISTS=WORK_DIR_EXISTS,
    make_stage_list=make_stage_list,
)
# 'w' starts the general log from scratch; later writers append to it
with open(GENERAL_LOG, 'w') as f:
    f.write(_log_header)
# -- make stages -- #
stage_list = make_stage_list(USER, BASE_NAME)
# verify stage_list
if not isinstance(stage_list, list):
    raise TypeError('stage_list is a {0} not a list?'.format(type(stage_list)))
# keys every stage must provide, with the type each value must have
to_check = [
    ('name', str),
    ('scripts', dict),
    ('log', str),
    ('call_string', str),
    ('to_remove', list),
    ('required', list),
    ('data', list),
    ('run', bool),
    ('scriptonly', bool),
]
for istage, stage in enumerate(stage_list):
    if not isinstance(stage, dict):
        raise TypeError('stage_list[{0}] is a {1} and not a dict?'.format(istage, type(stage)))
    for nm, typ in to_check:
        check_stage(istage, stage, nm, typ)
# -- loop stages -- #
# For every stage, in order: write its scripts, make sure its required files
# are in WORK_DIR (fetching them from DATA_DIR when needed), run its command
# with stdout/stderr sent to the stage log, then optionally clean up.
for istage, stage in enumerate(stage_list):
    # is this stage selected to run?
    if not stage['run']:
        with open(GENERAL_LOG, 'a') as f:
            f.write('{name} stage not selected to run. Next stage...\n'.format(name=stage['name']))
        continue

    DATE = str(datetime.datetime.now())
    with open(GENERAL_LOG, 'a') as f:
        f.write('''\
====================================================
Start {name} @ {DATE}
====================================================
'''.format(name=stage['name'], DATE=DATE))

    # create stage scripts
    with open(GENERAL_LOG, 'a') as f:
        f.write("making {name} scripts\n".format(name=stage['name']))
        # .items() behaves the same on python 2 and 3 for this loop
        for scriptname, scriptcontent in stage['scripts'].items():
            makedirsif(os.path.dirname(scriptname))  # create any needed directories
            with open(scriptname, 'w') as stagef:
                stagef.write(scriptcontent)
        if stage['scriptonly']:
            f.write("scriptonly option used. Will not start this stage.\n")
            continue
        f.write('starting {name} stage\n'.format(name=stage['name']))

    # check required files exist
    pass_req_check = True
    for required in stage['required']:
        required = required.rstrip('/')  # if a directory, leaving the slash at the end could cause confusion
        if os.path.exists(required):
            continue
        # from here on `required` is known to be absent from the current
        # directory, so no second existence check is needed before moving
        if os.path.exists(opj(DATA_DIR, required)):  # if the file is in DATA_DIR, move it to WORK_DIR
            makedirsif(os.path.dirname(required))
            shutil.move(opj(DATA_DIR, required), required)
        # if the file still doesn't exist:
        if not os.path.exists(required):
            pass_req_check = False
            if SOME_MISSING:
                with open(GENERAL_LOG, 'a') as f:
                    f.write('\n{r} not found for stage {s}, but SOME_MISSING option used. Will not run this stage.\n'.format(r=required, s=stage['name']))
            else:
                raise Exception('{r} not found for stage {s}'.format(r=required, s=stage['name']))

    # skip this stage if not pass_req_check
    if not pass_req_check:
        if not SOME_MISSING:
            raise Exception('Not all requirements were found for stage {s}!'.format(s=stage['name']))
        continue

    # redirect stdout and stderr
    capture_stdouterr(opj(WORK_DIR, stage['log']))
    # run stage
    safecall(PRE_SCRIPT + ' && ' + stage['call_string'])
    # redirect stdout and stderr
    capture_stdouterr(GENERAL_LOG)

    if CLEANSTAGES:
        for torm in stage['to_remove']:
            if os.path.isfile(torm):
                os.remove(torm)
            else:
                with open(GENERAL_LOG, 'a') as f:
                    f.write("file {TORM} does not exist or is not a file; will not remove\n".format(TORM=torm))

    DATE = str(datetime.datetime.now())
    with open(GENERAL_LOG, 'a') as f:
        f.write('''\
====================================================
Finish {name} @ {DATE}
====================================================
'''.format(name=stage['name'], DATE=DATE))
# -- mv files to final location and cleanup -- #
# Only done when CLEANWORK is set: files named in any stage's 'required' or
# 'data' list go to DATA_DIR, everything else left in WORK_DIR goes to
# LOG_DIR, and WORK_DIR itself is deleted afterwards.
if CLEANWORK:
    with open(GENERAL_LOG, 'a') as f:
        f.write('contents of {WORK_DIR}:\n'.format(WORK_DIR=WORK_DIR))
        f.write(str(os.listdir(WORK_DIR)))
    # Strip trailing slashes, as the stage loop does for 'required' entries:
    # a name such as 'dir/' would otherwise make incmove create DATA_DIR/dir
    # first and then move the directory inside it (DATA_DIR/dir/dir), and
    # 'dir' and 'dir/' would be treated as two different entries.
    to_data = set()
    for x in stage_list:
        for y in x['required'] + x['data']:
            y = y.rstrip('/')
            if os.path.exists(y):
                to_data.add(y)
    for d in to_data:
        incmove(d, DATA_DIR)
    for f in os.listdir(WORK_DIR):  # move everything else to logdir
        incmove(f, LOG_DIR)
    # leave WORK_DIR before deleting it
    os.chdir('../')
    shutil.rmtree(WORK_DIR)