-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqsubber
executable file
·174 lines (148 loc) · 5.39 KB
/
qsubber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Distributed under terms of the Community Research and Academic Programming
# License
'''
==============================================================================
FILE: qsubber
AUTHOR: Peter Combs, [email protected]
ORGANIZATION: Leland Stanford Junior University
LICENSE: CRAPL v0
CREATED: 11 August 2015
LAST MODIFIED: 12 August 2015
DESCRIPTION: Interactively submit the given command to the appropriate PBS
queue.
==============================================================================
'''
from argparse import ArgumentParser, REMAINDER
import subprocess as sp
from sys import stdin, stdout, stderr, exit
from tempfile import NamedTemporaryFile as tmpf
import os
import stat
import signal
from time import sleep
from random import random
def sigterm_handler(signum, frame):
if 'id' in globals():
id = globals()['id']
print("Killing", id)
proc = sp.Popen(['qdel', str(id)],)
proc.wait()
exit(9)
pass
def check_job(id):
try:
proc = sp.Popen(['qstat', str(id)],
stdout=sp.PIPE)
output, _ = proc.communicate()
line = output.decode().splitlines()[-1].split()
#print(line)
return line[-2]
except IndexError:
return 'ERROR'
signal.signal(signal.SIGTERM, sigterm_handler)
signal.siginterrupt(signal.SIGTERM, False)
default_threads = 1
default_commands = default_threads
default_queue = 'default'
parser = ArgumentParser(prefix_chars='-')
parser.add_argument('--sleep-random', type=float, default=3,
metavar='time',
help='sleep for a random period of time up to %(metavar)s before running the job')
parser.add_argument('--job-name', default=None)
parser.add_argument('--resource', '-l', default=None, nargs='*')
parser.add_argument('--local', '-L', action='store_true',
help='Run this command on the local '
'computer (i.e. do not use qsub)')
parser.add_argument('--keep-temporary', '-k', default=None,
metavar='DIR',
help='If flag is given, create a directory to keep the script (if necessary)')
parser.add_argument('--log-base', '-O', default=None)
parser.add_argument('-t', '--threads', default=default_threads,
help=("Over-ride number of threads per node, you should use this "
"if you want less than "+ str(default_threads)+ " to run at "
"once on a single node. Note that you will still be billed for "
"all "+ str(default_threads)+ " cores. This is a good idea if "
"you want a few jobs only to run. e.g. for a job requiring 30G "
"of memory, you will want one job per node, so you can set -t "
"to 1."))
parser.add_argument('-q', '--queue', default=None,
help=''.join(["Queue Choice, Default: ", default_queue]) )
parser.add_argument('-m', '--load-module', default=False,
help="Load a module")
parser.add_argument('--wait', default=False, action='store_true')
parser.add_argument('command', nargs=REMAINDER)
args = parser.parse_args()
#print(args)
call = []
delete = True
if args.keep_temporary:
if not os.path.isdir(args.keep_temporary):
os.makedirs(args.keep_temporary)
delete = False
scriptfile = tmpf(dir=args.keep_temporary, delete=delete)
os.chmod(scriptfile.name, 0x777)
#print( ' '.join(args.command))
scriptfile.write(bytes('#!/usr/bin/bash \n', 'ascii'))
if args.load_module:
scriptfile.write(bytes('\nmodule load STAR\n', 'ascii'))
scriptfile.write(bytes('module load {}\n'.format(args.load_module), 'ascii'))
scriptfile.write(bytes(' '.join(args.command),'ascii'))
scriptfile.write(bytes('\n', 'ascii'))
scriptfile.flush()
if args.local:
call.extend(['bash', scriptfile.name])
out=stdout
else:
call.extend(['qsub' ])
call.extend(['-d', '.'])
if args.job_name:
call.extend(['-N', args.job_name])
if args.resource:
for resource in args.resource:
call.extend(['-l', resource])
if args.queue:
call.extend(['-q', args.queue])
if args.log_base:
call.extend(['-o', args.log_base + '.o.log'])
call.extend(['-e', args.log_base + '.e.log'])
node_string = 'nodes=1:ppn={}'.format(args.threads)
call.extend(['-l', node_string])
call.append('-V')
call.append(scriptfile.name)
out=sp.PIPE
print(' '.join(call))
sleep(random()*args.sleep_random)
popen = sp.Popen(call, stdin=stdin, stdout=out, stderr=stderr)
if args.local:
status = popen.wait()
else:
output, _ = popen.communicate()
if popen.returncode:
for i in range(5):
sleep(2**(i))
popen = sp.Popen(call, stdin=stdin, stdout=out, stderr=stderr)
output, _ = popen.communicate()
if popen.returncode == 0:
break
else:
# Note the for-else construct. Raise exception if after 5 more
# tries we can't submit the job.
raise OSError("Unknown issue with the queue")
id = output.decode().split('.')[0]
print("ID: '''{}'''".format(id))
while check_job(id) != 'C':
try:
sleep(5)
except KeyboardInterrupt:
print("Killing", id)
proc = sp.Popen(['qdel', str(id)],)
proc.wait()
exit(9)
status = 0
sf = open(scriptfile.name)
sf.seek(0)
#print('-'*32, sf.read(), '-'*32, sep='\n')
#print("Finishing with status: {}".format(status))
exit(status)