dSQBatch.py
#!/usr/bin/env python
from __future__ import print_function
from datetime import datetime
from functools import partial
from os import path
from subprocess import Popen
import argparse
import os
import platform
import signal
import sys

__version__ = 1.05


def forward_signal_to_child(pid, signum, frame):
    # Log the signal, then relay it to the child process running the job line.
    print("[dSQ]: ", pid, signum, frame)
    os.kill(pid, signum)


def exec_job(job_str):
    # Launch the job line in a shell, forward SIGCONT/SIGTERM from Slurm
    # to the child, and return the child's exit code.
    process = Popen(job_str, shell=True)
    signal.signal(signal.SIGCONT, partial(forward_signal_to_child, process.pid))
    signal.signal(signal.SIGTERM, partial(forward_signal_to_child, process.pid))
    return_code = process.wait()
    return return_code
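
# Illustrative note: if Slurm signals a task (e.g. `scancel --signal=TERM
# <jobid>_<taskid>`), this wrapper receives the signal and relays it to the
# child shell started by exec_job, so the job line gets a chance to clean up
# before the allocation ends.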

desc = """Dead Simple Queue Batch v{}
https://github.com/ycrc/dSQ
A wrapper script to run job arrays from job files, where each line in the plain-text file is a self-contained job. This script is usually called from a batch script generated by dsq.
""".format(
    __version__
)
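
# Illustrative example (file name and commands are hypothetical): a job file
# is plain text with one complete, self-contained command per line, e.g.
# jobfile.txt containing
#
#   python process.py --input sample_0.fastq
#   python process.py --input sample_1.fastq
#
# Line 0 runs as array task 0, line 1 as task 1, and so on.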

def parse_args():
    parser = argparse.ArgumentParser(
        description=desc,
        usage="%(prog)s --job-file jobfile.txt [--suppress-stats-file | --status-dir dir/ ]",
        formatter_class=argparse.RawTextHelpFormatter,
        prog=path.basename(sys.argv[0]),
    )
    parser.add_argument(
        "-v", "--version", action="version", version="%(prog)s {}".format(__version__)
    )
    parser.add_argument(
        "--job-file",
        nargs=1,
        help="Job file, one job per line (not your job submission script).",
    )
    parser.add_argument(
        "--suppress-stats-file",
        action="store_true",
        help="Don't save job stats to job_jobid_status.tsv",
    )
    parser.add_argument(
        "--status-dir",
        metavar="dir",
        nargs=1,
        default=["."],
        help="Directory to save the job_jobid_status.tsv file to. Defaults to the current working directory.",
    )
    return parser.parse_args()

def run_job(args):
    jid = int(os.environ.get("SLURM_ARRAY_JOB_ID"))
    tid = int(os.environ.get("SLURM_ARRAY_TASK_ID"))
    # Slurm calls individual job array indices "tasks"
    hostname = platform.node()

    # use the task id to pull this task's job line out of the job file
    mycmd = ""
    with open(args.job_file[0], "r") as tf:
        for i, l in enumerate(tf):
            if i == tid:
                mycmd = l.strip()
                break

    # run the job and track its execution time
    if mycmd == "":
        st = datetime.now()
        mycmd = "# could not find zero-indexed line {} in job file {}".format(
            tid, args.job_file[0]
        )
        print(mycmd, file=sys.stderr)
        ret = 1
        et = datetime.now()
    else:
        st = datetime.now()
        ret = exec_job(mycmd)
        et = datetime.now()

    if not args.suppress_stats_file:
        # set up job stats
        out_cols = [
            "Array_Task_ID",
            "Exit_Code",
            "Hostname",
            "T_Start",
            "T_End",
            "T_Elapsed",
            "Task",
        ]
        time_fmt = "%Y-%m-%d %H:%M:%S"
        time_start = st.strftime(time_fmt)
        time_end = et.strftime(time_fmt)
        time_elapsed = (et - st).total_seconds()
        out_dict = dict(
            zip(
                out_cols,
                [tid, ret, hostname, time_start, time_end, time_elapsed, mycmd],
            )
        )
        # append the job stats to the status file
        with open(
            path.join(args.status_dir[0], "job_{}_status.tsv".format(jid)), "a"
        ) as out_status:
            print(
                "{Array_Task_ID}\t{Exit_Code}\t{Hostname}\t{T_Start}\t{T_End}\t{T_Elapsed:.02f}\t{Task}".format(
                    **out_dict
                ),
                file=out_status,
            )
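        # Illustrative example (values are hypothetical): each task appends one
        # tab-separated line to job_<jobid>_status.tsv, e.g. (tabs shown as
        # wide gaps):
        #   3    0    node042    2024-01-01 10:00:00    2024-01-01 10:05:00    300.00    python process.py --input sample_3.fastq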

    sys.exit(ret)


if __name__ == "__main__":
    args = parse_args()
    run_job(args)
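

# Minimal sketch of how a dsq-generated batch script might invoke this wrapper.
# The directives and paths below are assumptions for illustration, not verbatim
# dsq output:
#
#   #!/bin/bash
#   #SBATCH --array=0-99
#   #SBATCH --output=dsq-jobfile-%A_%a.out
#   /path/to/dSQBatch.py --job-file jobfile.txt --status-dir .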