Skip to content

Commit bb89fc9

Browse files
committed
Improve task cleanup tests
1 parent 978988d commit bb89fc9

File tree

3 files changed

+220
-191
lines changed

3 files changed

+220
-191
lines changed

tests/test_job.py

+4-191
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
import os
22
import signal
3-
import subprocess
43
import time
54
from datetime import datetime
65
from os.path import isdir, isfile, join
76
from pathlib import Path
8-
from typing import Callable, Optional
7+
from typing import Optional
98

109
import pytest
1110

1211
from .conftest import HqEnv
1312
from .utils import wait_for_job_state
1413
from .utils.cmd import python
15-
from .utils.io import check_file_contents, read_file
14+
from .utils.io import check_file_contents, read_file, read_task_pid
1615
from .utils.job import default_task_output, list_jobs
17-
from .utils.wait import wait_for_pid_exit, wait_for_worker_state, wait_until
16+
from .utils.wait import wait_until
1817

1918

2019
def test_job_submit(hq_env: HqEnv):
@@ -437,88 +436,6 @@ def test_cancel_all(hq_env: HqEnv):
437436
assert "Job 3 canceled" in r[0]
438437

439438

440-
def test_cancel_terminate_process_children(hq_env: HqEnv):
441-
def cancel(worker_process):
442-
hq_env.command(["job", "cancel", "1"])
443-
wait_for_job_state(hq_env, 1, "CANCELED")
444-
445-
check_child_process_exited(hq_env, cancel)
446-
447-
448-
def test_cancel_send_sigint(hq_env: HqEnv):
449-
hq_env.start_server()
450-
hq_env.start_worker()
451-
452-
hq_env.command(
453-
[
454-
"submit",
455-
"--",
456-
*python(
457-
"""
458-
import sys
459-
import time
460-
import signal
461-
462-
def signal_handler(sig, frame):
463-
print("sigint", flush=True)
464-
sys.exit(0)
465-
466-
signal.signal(signal.SIGINT, signal_handler)
467-
468-
print("ready", flush=True)
469-
time.sleep(3600)
470-
"""
471-
),
472-
]
473-
)
474-
wait_for_job_state(hq_env, 1, "RUNNING")
475-
wait_until(lambda: read_file(default_task_output()).strip() == "ready")
476-
477-
hq_env.command(["job", "cancel", "1"])
478-
wait_for_job_state(hq_env, 1, "CANCELED")
479-
480-
wait_until(lambda: read_file(default_task_output()).splitlines(keepends=False)[1] == "sigint")
481-
482-
483-
def test_cancel_kill_if_sigint_fails(hq_env: HqEnv):
484-
hq_env.start_server()
485-
hq_env.start_worker()
486-
487-
hq_env.command(
488-
[
489-
"submit",
490-
"--",
491-
*python(
492-
"""
493-
import os
494-
import sys
495-
import time
496-
import signal
497-
498-
def signal_handler(sig, frame):
499-
print(os.getpid(), flush=True)
500-
time.sleep(3600)
501-
502-
signal.signal(signal.SIGINT, signal_handler)
503-
504-
print("ready", flush=True)
505-
time.sleep(3600)
506-
"""
507-
),
508-
]
509-
)
510-
wait_for_job_state(hq_env, 1, "RUNNING")
511-
wait_until(lambda: read_file(default_task_output()).strip() == "ready")
512-
513-
hq_env.command(["job", "cancel", "1"])
514-
wait_for_job_state(hq_env, 1, "CANCELED")
515-
516-
wait_until(lambda: len(read_file(default_task_output()).splitlines()) == 2)
517-
518-
pid = int(read_file(default_task_output()).splitlines()[1])
519-
wait_for_pid_exit(pid)
520-
521-
522439
def test_reporting_state_after_worker_lost(hq_env: HqEnv):
523440
hq_env.start_server()
524441
hq_env.start_workers(2, cpus=1)
@@ -1113,71 +1030,6 @@ def test_crashing_job_by_files(hq_env: HqEnv):
11131030
assert not os.path.exists("output.txt")
11141031

11151032

1116-
def test_kill_task_when_worker_dies(hq_env: HqEnv):
1117-
hq_env.start_server()
1118-
hq_env.start_worker()
1119-
1120-
hq_env.command(
1121-
[
1122-
"submit",
1123-
"--",
1124-
*python(
1125-
"""
1126-
import os
1127-
import time
1128-
1129-
print(os.getpid(), flush=True)
1130-
time.sleep(3600)
1131-
"""
1132-
),
1133-
]
1134-
)
1135-
wait_for_job_state(hq_env, 1, "RUNNING")
1136-
1137-
def get_pid():
1138-
pid = read_file(default_task_output()).strip()
1139-
if not pid:
1140-
return None
1141-
return int(pid)
1142-
1143-
pid = wait_until(get_pid)
1144-
1145-
hq_env.kill_worker(1)
1146-
1147-
wait_for_pid_exit(pid)
1148-
1149-
1150-
def test_kill_task_subprocess_when_worker_is_interrupted(hq_env: HqEnv):
1151-
def interrupt_worker(worker_process):
1152-
hq_env.kill_worker(1, signal=signal.SIGINT)
1153-
1154-
check_child_process_exited(hq_env, interrupt_worker)
1155-
1156-
1157-
def test_kill_task_subprocess_when_worker_is_terminated(hq_env: HqEnv):
1158-
def terminate_worker(worker_process):
1159-
hq_env.kill_worker(1, signal=signal.SIGTERM)
1160-
1161-
check_child_process_exited(hq_env, terminate_worker)
1162-
1163-
1164-
@pytest.mark.xfail
1165-
def test_kill_task_subprocess_when_worker_is_killed(hq_env: HqEnv):
1166-
def terminate_worker(worker_process):
1167-
hq_env.kill_worker(1, signal=signal.SIGKILL)
1168-
1169-
check_child_process_exited(hq_env, terminate_worker)
1170-
1171-
1172-
def test_kill_task_subprocess_when_worker_is_stopped(hq_env: HqEnv):
1173-
def stop_worker(worker_process):
1174-
hq_env.command(["worker", "stop", "1"])
1175-
wait_for_worker_state(hq_env, 1, "STOPPED")
1176-
hq_env.check_process_exited(worker_process)
1177-
1178-
check_child_process_exited(hq_env, stop_worker)
1179-
1180-
11811033
def test_kill_task_report_signal(hq_env: HqEnv):
11821034
hq_env.start_server()
11831035
hq_env.start_worker()
@@ -1196,9 +1048,7 @@ def test_kill_task_report_signal(hq_env: HqEnv):
11961048
),
11971049
]
11981050
)
1199-
wait_for_job_state(hq_env, 1, "RUNNING")
1200-
wait_until(lambda: len(read_file(default_task_output()).splitlines()) == 1)
1201-
pid = int(read_file(default_task_output()))
1051+
pid = read_task_pid(hq_env, 1)
12021052
os.kill(pid, signal.SIGKILL)
12031053

12041054
wait_for_job_state(hq_env, 1, "FAILED")
@@ -1218,43 +1068,6 @@ def test_fail_to_start_issue629(hq_env: HqEnv, tmpdir):
12181068
wait_for_job_state(hq_env, 1, "FAILED")
12191069

12201070

1221-
def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Popen], None]):
1222-
"""
1223-
Creates a task that spawns a child, and then calls `stop_fn`, which should kill either the task
1224-
or the worker. The function then checks that both the task process and its child have been killed.
1225-
"""
1226-
hq_env.start_server()
1227-
worker_process = hq_env.start_worker()
1228-
1229-
hq_env.command(
1230-
[
1231-
"submit",
1232-
"--",
1233-
*python(
1234-
"""
1235-
import os
1236-
import sys
1237-
import time
1238-
print(os.getpid(), flush=True)
1239-
pid = os.fork()
1240-
if pid > 0:
1241-
print(pid, flush=True)
1242-
time.sleep(3600)
1243-
"""
1244-
),
1245-
]
1246-
)
1247-
wait_for_job_state(hq_env, 1, "RUNNING")
1248-
wait_until(lambda: len(read_file(default_task_output()).splitlines()) == 2)
1249-
pids = [int(pid) for pid in read_file(default_task_output()).splitlines()]
1250-
1251-
stop_fn(worker_process)
1252-
1253-
parent, child = pids
1254-
wait_for_pid_exit(parent)
1255-
wait_for_pid_exit(child)
1256-
1257-
12581071
def test_job_task_ids(hq_env: HqEnv):
12591072
hq_env.start_server()
12601073
hq_env.command(

0 commit comments

Comments
 (0)