Merge pull request #91 from ArturAkh/main
Several improvements to gbasf2 handling

The following options for using gbasf2 batch processing are improved or added:

* possibility to add input data files with the `gbasf2_input_datafiles` option; these are downloaded from the SEs (storage elements) in addition to the input sandbox. This is useful in case the sandbox files would exceed the 10 MB limit (see the setting sketch below).
* improved rescheduling: instead of rescheduling each failed job separately, all failed jobs are rescheduled at once. The per-job `n_retries` bookkeeping is still maintained in the implementation of this pull request (see the command sketch below).
* improved downloading of datasets: in case of failed downloads, only the files that actually failed are downloaded again, based on a collection of LFNs parsed from the `gb2_ds_get` stdout (see the parsing sketch below).
* fix of the RuntimeError -> RuntimeWarning conversion: the first argument of `warnings.warn` should be a string; otherwise an uncaught TypeError occurs, followed by a pipe error in luigi (see the sketch directly after this list).
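A minimal sketch of the `warnings.warn` fix (the same pattern as in `get_job_status` in the diff below; the error message is made up):

```python
import warnings

try:
    raise RuntimeError("download of output dataset was not complete")  # made-up error
except RuntimeError as err:
    # Passing `err` itself as the first argument can end in an uncaught
    # TypeError inside the warnings machinery (followed by a pipe error in
    # luigi); converting the exception to a string first avoids this:
    warnings.warn(repr(err), RuntimeWarning)
```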

Edit 19.04.2021:

* added improved handling of the `JobStatus` for `Done` jobs, since in some (rare) cases `JobStatus` is set to `Done` while `ApplicationStatus` is not `Done` (in particular, on an upload error for the output file); see the sketch after this list
* implemented suggested minor code changes
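The gist of the improved `Done` handling as a standalone sketch; the dictionary layout mirrors the job status dictionaries used in the diff below, the status values are illustrative:

```python
def job_really_done(job_info: dict) -> bool:
    # A job only counts as done if the application finished as well:
    # 'Status' == 'Done' with a different 'ApplicationStatus' (e.g. an
    # upload error for the output file) is treated as a failure.
    return job_info["Status"] == "Done" and job_info["ApplicationStatus"] == "Done"

assert job_really_done({"Status": "Done", "ApplicationStatus": "Done"})
assert not job_really_done({"Status": "Done", "ApplicationStatus": "Upload error"})  # illustrative value
```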
meliache authored Apr 27, 2021
2 parents e1160fe + c2ff35a commit df77eab
Showing 16 changed files with 347 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.6.2
current_version = 0.6.3
commit = True
tag = True

1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -6,6 +6,7 @@
- id: check-json
- id: check-toml
- id: end-of-file-fixer
exclude: ^tests/batch/_gbasf2_project_download_stdouts/
- id: trailing-whitespace
- id: check-added-large-files
- id: check-symlinks
2 changes: 1 addition & 1 deletion b2luigi/__init__.py
@@ -1,5 +1,5 @@
"""Task scheduling and batch running for basf2 jobs made simple"""
__version__ = "0.6.2"
__version__ = "0.6.3"

from luigi import *
from luigi.util import inherits, copies
111 changes: 88 additions & 23 deletions b2luigi/batch/processes/gbasf2.py
Expand Up @@ -159,7 +159,9 @@ def output(self):
The following optional settings correspond to the equally named ``gbasf2`` command line options
(without the ``gbasf2_`` prefix) that you can set to customize your gbasf2 project:
``gbasf2_noscout``,
``gbasf2_additional_files``,
``gbasf2_input_datafiles``,
``gbasf2_n_repition_job``,
``gbasf2_force_submission``,
``gbasf2_cputime``,
@@ -259,6 +261,14 @@ def get_job_status(self):
for _, job_info in job_status_dict.items():
n_jobs_by_status[job_info["Status"]] += 1

# recheck the status more closely, and correct
# reason: sometimes 'Status' marked as 'Done',
# while 'ApplicationStatus' is not 'Done'
for _, job_info in job_status_dict.items():
if job_info["Status"] == "Done" and (job_info["ApplicationStatus"] != "Done"):
n_jobs_by_status["Done"] -= 1
n_jobs_by_status["Failed"] += 1

# print summary of jobs in project if setting is set and job status changed
if (get_setting("gbasf2_print_status_updates", default=True, task=self.task) and
n_jobs_by_status != self._n_jobs_by_status):
@@ -295,7 +305,7 @@ def get_job_status(self):
# RuntimeError might occur when download of output dataset was not complete. This is
# frequent, so we want to catch that error and just mark the job as failed
except RuntimeError as err:
warnings.warn(err, RuntimeWarning)
warnings.warn(repr(err), RuntimeWarning)
return JobStatus.aborted

return JobStatus.successful
@@ -317,7 +327,8 @@ def _on_failure_action(self):
"""
job_status_dict = get_gbasf2_project_job_status_dict(self.gbasf2_project_name, dirac_user=self.dirac_user)
failed_job_dict = {job_id: job_info for job_id, job_info in job_status_dict.items()
if job_info["Status"] == "Failed"}
if job_info["Status"] == "Failed" or
(job_info["Status"] == "Done" and job_info["ApplicationStatus"] != "Done")}
n_failed = len(failed_job_dict)
print(f"{n_failed} failed jobs:\n{failed_job_dict}")
if get_setting("gbasf2_download_logs", default=True, task=self.task):
@@ -328,31 +339,43 @@ def _reschedule_failed_jobs(self):
Tries to reschedule failed jobs in the project if ``self.max_retries`` has not been reached
and returns ``True`` if rescheduling has been successful.
"""
jobs_to_be_rescheduled = []
jobs_hitting_max_n_retries = []
job_status_dict = get_gbasf2_project_job_status_dict(
self.gbasf2_project_name, dirac_user=self.dirac_user)

for job_id, job_info in job_status_dict.items():
if job_info["Status"] == "Failed":
if self.n_retries_by_job[job_id] >= self.max_retries:
warnings.warn(
f"Reached maximum number of rescheduling tries ({self.max_retries}) for job {job_id}.",
RuntimeWarning
)
return False
self._reschedule_job(job_id)
self.n_retries_by_job[job_id] += 1
with open(self.retries_file_path, "w") as retries_file:
json.dump(self.n_retries_by_job, retries_file)
if job_info["Status"] == "Failed" or (job_info["Status"] == "Done" and job_info["ApplicationStatus"] != "Done"):
if self.n_retries_by_job[job_id] < self.max_retries:
self.n_retries_by_job[job_id] += 1
jobs_to_be_rescheduled.append(job_id)
else:
jobs_hitting_max_n_retries.append(job_id)

if jobs_to_be_rescheduled:
self._reschedule_jobs(jobs_to_be_rescheduled)
with open(self.retries_file_path, "w") as retries_file:
json.dump(self.n_retries_by_job, retries_file)

if jobs_hitting_max_n_retries:
warnings.warn(
f"Reached maximum number of rescheduling tries ({self.max_retries}) for following jobs:\n\t" +
"\n\t".join(str(j) for j in jobs_hitting_max_n_retries) + "\n",
RuntimeWarning
)
return False

return True

def _reschedule_job(self, job_id):
def _reschedule_jobs(self, job_ids):
"""
Reschedule job if the number of retries for it is below ``self.max_retries``
Reschedule chosen list of jobs.
"""
n_retries = self.n_retries_by_job[job_id]
print(f"Rescheduling job {job_id} (retry no. {n_retries + 1}).")
if self.n_retries_by_job[job_id] < self.max_retries:
reschedule_command = shlex.split(f"gb2_job_reschedule --jobid {job_id} --force")
run_with_gbasf2(reschedule_command)
print("Rescheduling jobs:")
print("\t" + "\n\t".join(f"{job_id} ({self.n_retries_by_job[job_id]} retries)" for job_id in job_ids))

reschedule_command = shlex.split(f"gb2_job_reschedule --jobid {' '.join(job_ids)} --force")
run_with_gbasf2(reschedule_command)

def start_job(self):
"""
@@ -410,6 +433,10 @@ def _build_gbasf2_submit_command(self):
gbasf2_command_str = (f"gbasf2 {self.wrapper_file_path} -f {' '.join(gbasf2_input_sandbox_files)} " +
f"-p {self.gbasf2_project_name} -s {gbasf2_release} ")

gbasf2_noscout = get_setting("gbasf2_noscout", default=False, task=self.task)
if gbasf2_noscout:
gbasf2_command_str += " --noscout "

gbasf2_input_dataset = get_setting("gbasf2_input_dataset", default=False, task=self.task)
gbasf2_input_dslist = get_setting("gbasf2_input_dslist", default=False, task=self.task)

@@ -429,6 +456,10 @@ def _build_gbasf2_submit_command(self):
if gbasf2_n_repition_jobs is not False:
gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "

gbasf2_input_datafiles = get_setting("gbasf2_input_datafiles", default=[], task=self.task)
if gbasf2_input_datafiles:
gbasf2_command_str += f" --input_datafiles {' '.join(gbasf2_input_datafiles)}"

# now add some additional optional options to the gbasf2 job submission string

# whether to ask user for confirmation before submitting job
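A sketch of how a task might set the new `gbasf2_input_datafiles` option, assuming (as for the other gbasf2 settings) that it can be given as a task attribute; the class name and LFN are placeholders:

```python
import b2luigi

class MyGbasf2Task(b2luigi.Task):
    batch_system = "gbasf2"
    # these files are downloaded from the SEs in addition to the input
    # sandbox, useful when the sandbox files would exceed the 10 MB limit
    gbasf2_input_datafiles = ["/belle/user/someuser/some_extra_file.root"]  # placeholder LFN
```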
@@ -570,6 +601,15 @@ def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir:
)
return False

def _failed_files_from_dataset_download(self, stdout):
"""
Parse stdout from gb2_ds_get dataset download command to extract LFN's of failed file downloads.
"""
failed_files = stdout.split('Failed files:')[-1].\
split("Files with duplicated jobID, not downloaded:")[0].strip().split('\n')
failed_files = [line for line in failed_files if len(line.strip()) > 0]
return failed_files

def _download_dataset(self):
"""
Download the task outputs from the gbasf2 project dataset.
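A usage sketch of the new parsing helper, fed with stdout of the shape recorded in the test fixtures under `tests/batch/_gbasf2_project_download_stdouts/` further below:

```python
stdout = """
Successfully downloaded files:
/output/sub00/output_000001_job1234567890_00.root in /local/dir/sub00/output_000001_job1234567890_00.root


Failed files:
/output/sub00/output_000002_job1234567891_00.root
/output/sub00/output_000003_job1234567892_00.root
"""

# same splitting logic as _failed_files_from_dataset_download above
failed_files = stdout.split('Failed files:')[-1].split(
    "Files with duplicated jobID, not downloaded:")[0].strip().split('\n')
failed_files = [line for line in failed_files if line.strip()]
print(failed_files)
# ['/output/sub00/output_000002_job1234567891_00.root',
#  '/output/sub00/output_000003_job1234567892_00.root']
```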
@@ -598,12 +638,36 @@ def _download_dataset(self):
tmp_output_dir_path = f"{output_dir_path}.partial"
os.makedirs(tmp_output_dir_path, exist_ok=True)

ds_get_command = shlex.split(f"gb2_ds_get --force {dataset_query_string}")
print("Downloading dataset with command ", " ".join(ds_get_command))
# Need a file to repeat download for FAILED ones only
monitoring_failed_downloads_file = os.path.join(tmp_output_dir_path, "failed_files.txt")

# In case of first download, this file does not exist
if not os.path.isfile(monitoring_failed_downloads_file):

ds_get_command = shlex.split(f"gb2_ds_get --force {dataset_query_string}")
print("Downloading dataset with command ", " ".join(ds_get_command))

# Any further time is based on the list of files from failed downloads
else:
ds_get_command = shlex.split(f"gb2_ds_get --force --input_dslist {monitoring_failed_downloads_file}")
print("Downloading remaining files from dataset with command ", " ".join(ds_get_command))

stdout = run_with_gbasf2(ds_get_command, cwd=tmp_output_dir_path, capture_output=True).stdout
print(stdout)
if "No file found" in stdout:
raise RuntimeError(f"No output data for gbasf2 project {self.gbasf2_project_name} found.")

failed_files = self._failed_files_from_dataset_download(stdout)
if failed_files:
with open(monitoring_failed_downloads_file, 'w') as ffs:
ffs.write("\n".join(failed_files))
else:
try:
os.remove(monitoring_failed_downloads_file)
except OSError as e:
if e.errno != errno.ENOENT: # errno.ENOENT = no such file or directory
raise # re-raise exception if a different error occurred

tmp_output_dir = os.path.join(tmp_output_dir_path, self.gbasf2_project_name, 'sub00')
if not self._local_gb2_dataset_is_complete(output_file_name, check_temp_dir=True, verbose=True):
raise RuntimeError(
@@ -678,7 +742,8 @@ def exists(self):
if check_project_exists(self.project_name, dirac_user=self.dirac_user):
# if there's data named after that project on the grid, ensure there are no jobs writing to it
project_status_dict = get_gbasf2_project_job_status_dict(self.project_name, self.dirac_user)
all_jobs_done = all(job_info["Status"] == "Done" for job_info in project_status_dict.values())
all_jobs_done = all(job_info["Status"] == "Done" and
job_info["ApplicationStatus"] == "Done" for job_info in project_status_dict.values())
if not all_jobs_done:
return False
return True
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -26,7 +26,7 @@
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = '0.6.2'
release = '0.6.3'


# -- General configuration ---------------------------------------------------
15 changes: 15 additions & 0 deletions tests/batch/_gbasf2_project_download_stdouts/all_failed.txt
@@ -0,0 +1,15 @@

Download 3 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_00.root to /local/dir/sub00/output_000001_job1234567890_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000002_job1234567891_00.root to /local/dir/sub00/output_000002_job1234567891_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000003_job1234567892_00.root to /local/dir/sub00/output_000003_job1234567892_00.root

Successfully downloaded files:


Failed files:
/output/sub00/output_000001_job1234567890_00.root
/output/sub00/output_000002_job1234567891_00.root
/output/sub00/output_000003_job1234567892_00.root


@@ -0,0 +1,23 @@

Download 3 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_04.root to /local/dir/sub00/output_000001_job1234567890_04.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000002_job1234567891_00.root to /local/dir/sub00/output_000002_job1234567891_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000003_job1234567892_00.root to /local/dir/sub00/output_000003_job1234567892_00.root

Successfully downloaded files:


Failed files:
/output/sub00/output_000001_job1234567890_04.root
/output/sub00/output_000002_job1234567891_00.root
/output/sub00/output_000003_job1234567892_00.root


Files with duplicated jobID, not downloaded:
/output/sub00/output_000001_job1234567890_00.root
/output/sub00/output_000001_job1234567890_01.root
/output/sub00/output_000001_job1234567890_02.root
/output/sub00/output_000001_job1234567890_03.root
(See https://confluence.desy.de/display/BI/GBasf2+FAQ#GBasf2FAQ-OutputfileswithduplicatedJobID)


12 changes: 12 additions & 0 deletions tests/batch/_gbasf2_project_download_stdouts/all_successful.txt
@@ -0,0 +1,12 @@
/local/dir/sub00 already exists

Download 1 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_00.root to /local/dir/sub00/output_000001_job1234567890_00.root

Successfully downloaded files:
/output/sub00/output_000001_job1234567890_00.root in /ceph/akhmet/test_fei_gbasf2_example_input/sub00/output_000001_job1234567890_00.root


Failed files:


@@ -0,0 +1,20 @@

Download 1 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_04.root to /local/dir/sub00/output_000001_job1234567890_04.root

Successfully downloaded files:
/output/sub00/output_000001_job1234567890_04.root in /local/dir/sub00/output_000001_job1234567890_04.root


Failed files:


Files with duplicated jobID, not downloaded:
/output/sub00/output_000001_job1234567890_00.root
/output/sub00/output_000001_job1234567890_01.root
/output/sub00/output_000001_job1234567890_02.root
/output/sub00/output_000001_job1234567890_03.root
(See https://confluence.desy.de/display/BI/GBasf2+FAQ#GBasf2FAQ-OutputfileswithduplicatedJobID)



@@ -0,0 +1,15 @@

Download 3 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_00.root to /local/dir/sub00/output_000001_job1234567890_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000002_job1234567891_00.root to /local/dir/sub00/output_000002_job1234567891_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000003_job1234567892_00.root to /local/dir/sub00/output_000003_job1234567892_00.root

Successfully downloaded files:
/output/sub00/output_000001_job1234567890_00.root in /local/dir/sub00/output_000001_job1234567890_00.root


Failed files:
/output/sub00/output_000002_job1234567891_00.root
/output/sub00/output_000003_job1234567892_00.root


@@ -0,0 +1,23 @@

Download 3 files from SE
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000001_job1234567890_04.root to /local/dir/sub00/output_000001_job1234567890_04.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000002_job1234567891_00.root to /local/dir/sub00/output_000002_job1234567891_00.root
Trying to download srm://srm-storage-element:8443/srm/managerv2?SFN=/pnfs/to/output/sub00/output_000003_job1234567892_00.root to /local/dir/sub00/output_000003_job1234567892_00.root

Successfully downloaded files:
/output/sub00/output_000001_job1234567890_04.root in /local/dir/sub00/output_000001_job1234567890_04.root


Failed files:
/output/sub00/output_000002_job1234567891_00.root
/output/sub00/output_000003_job1234567892_00.root


Files with duplicated jobID, not downloaded:
/output/sub00/output_000001_job1234567890_00.root
/output/sub00/output_000001_job1234567890_01.root
/output/sub00/output_000001_job1234567890_02.root
/output/sub00/output_000001_job1234567890_03.root
(See https://confluence.desy.de/display/BI/GBasf2+FAQ#GBasf2FAQ-OutputfileswithduplicatedJobID)

