From ef223896340c61f0862068f556cd827cc97168ba Mon Sep 17 00:00:00 2001 From: Eric Neilsen Date: Tue, 18 Mar 2025 08:31:32 -0700 Subject: [PATCH 1/2] move opsim simulation archive submodule out of rubin_scheduler into rubin_sim --- batch/compile_prenight_metadata_cache.sh | 34 + batch/run_prenight_sims.sh | 69 ++ docs/api.rst | 4 +- docs/archive.rst | 276 +++++++ docs/sim-archive-api.rst | 12 + docs/user-guide.rst | 5 +- pyproject.toml | 5 +- rubin_sim/sim_archive/__init__.py | 11 + rubin_sim/sim_archive/make_snapshot.py | 188 +++++ rubin_sim/sim_archive/prenight.py | 386 ++++++++++ rubin_sim/sim_archive/sim_archive.py | 933 +++++++++++++++++++++++ tests/sim_archive/test_make_snapshot.py | 24 + tests/sim_archive/test_prenight.py | 28 + tests/sim_archive/test_sim_archive.py | 139 ++++ 14 files changed, 2109 insertions(+), 5 deletions(-) create mode 100755 batch/compile_prenight_metadata_cache.sh create mode 100755 batch/run_prenight_sims.sh create mode 100644 docs/archive.rst create mode 100644 docs/sim-archive-api.rst create mode 100644 rubin_sim/sim_archive/__init__.py create mode 100644 rubin_sim/sim_archive/make_snapshot.py create mode 100644 rubin_sim/sim_archive/prenight.py create mode 100644 rubin_sim/sim_archive/sim_archive.py create mode 100644 tests/sim_archive/test_make_snapshot.py create mode 100644 tests/sim_archive/test_prenight.py create mode 100644 tests/sim_archive/test_sim_archive.py diff --git a/batch/compile_prenight_metadata_cache.sh b/batch/compile_prenight_metadata_cache.sh new file mode 100755 index 000000000..7c0b265ea --- /dev/null +++ b/batch/compile_prenight_metadata_cache.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +#SBATCH --account=rubin:developers # Account name +#SBATCH --job-name=auxtel_prenight_daily # Job name +#SBATCH --output=/sdf/data/rubin/shared/scheduler/prenight/sbatch/compile_prenight_metadata_cache.out # Output file (stdout) +#SBATCH --error=/sdf/data/rubin/shared/scheduler/prenight/sbatch/compile_prenight_metadata_cache.err # Error file (stderr) +#SBATCH --partition=milano # Partition (queue) names +#SBATCH --nodes=1 # Number of nodes +#SBATCH --ntasks=1 # Number of tasks run in parallel +#SBATCH --cpus-per-task=1 # Number of CPUs per task +#SBATCH --mem=16G # Requested memory +#SBATCH --time=1:00:00 # Wall time (hh:mm:ss) + +echo "******** START of compile_prenight_metadata_cache.sh **********" + +# Source global definitions +if [ -f /etc/bashrc ]; then + . 
/etc/bashrc +fi + +# SLAC S3DF - source all files under ~/.profile.d +if [[ -e ~/.profile.d && -n "$(ls -A ~/.profile.d/)" ]]; then + source <(cat $(find -L ~/.profile.d -name '*.conf')) +fi + +source /sdf/group/rubin/sw/w_latest/loadLSST.sh +conda activate /sdf/data/rubin/shared/scheduler/envs/prenight +export AWS_PROFILE=prenight +WORK_DIR=$(date '+/sdf/data/rubin/shared/scheduler/prenight/work/compile_prenight_metadata_cache/%Y-%m-%dT%H%M%S' --utc) +echo "Working in $WORK_DIR" +mkdir ${WORK_DIR} +cd ${WORK_DIR} +printenv > env.out +compile_sim_archive_metadata_resource --append +echo "******* END of compile_prenight_metadata_cache.sh *********" diff --git a/batch/run_prenight_sims.sh b/batch/run_prenight_sims.sh new file mode 100755 index 000000000..68ad5462f --- /dev/null +++ b/batch/run_prenight_sims.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash +#SBATCH --account=rubin:developers # Account name +#SBATCH --job-name=auxtel_prenight_daily # Job name +#SBATCH --output=/sdf/data/rubin/shared/scheduler/prenight/sbatch/run_prenight_sims.out # Output file (stdout) +#SBATCH --error=/sdf/data/rubin/shared/scheduler/prenight/sbatch/run_prenight_sims.err # Error file (stderr) +#SBATCH --partition=milano # Partition (queue) names +#SBATCH --nodes=1 # Number of nodes +#SBATCH --ntasks=1 # Number of tasks run in parallel +#SBATCH --cpus-per-task=1 # Number of CPUs per task +#SBATCH --mem=16G # Requested memory +#SBATCH --time=1:00:00 # Wall time (hh:mm:ss) + +echo "******** START of run_prenight_sims.sh **********" + +# Source global definitions +if [ -f /etc/bashrc ]; then + . /etc/bashrc +fi + +# SLAC S3DF - source all files under ~/.profile.d +if [[ -e ~/.profile.d && -n "$(ls -A ~/.profile.d/)" ]]; then + source <(cat $(find -L ~/.profile.d -name '*.conf')) +fi + +date --iso=s + +source /sdf/group/rubin/sw/w_latest/loadLSST.sh +conda activate /sdf/data/rubin/shared/scheduler/envs/prenight + +set -o xtrace + +export AWS_PROFILE=prenight +WORK_DIR=$(date '+/sdf/data/rubin/shared/scheduler/prenight/work/run_prenight_sims/%Y-%m-%dT%H%M%S' --utc) +echo "Working in $WORK_DIR" +mkdir ${WORK_DIR} +cd ${WORK_DIR} + +# Get ts_ocs_config +TS_CONFIG_OCS_VERSION=$(obs_version_at_time ts_config_ocs) +curl --location --output ts_config_ocs.zip https://github.com/lsst-ts/ts_config_ocs/archive/${TS_CONFIG_OCS_VERSION}.zip +unzip ts_config_ocs.zip +mv $(find . -maxdepth 1 -type d -name ts_config_ocs\*) ts_config_ocs + +# Install required python packages +PACKAGE_DIR=$(readlink -f ${WORK_DIR}/packages) +mkdir ${PACKAGE_DIR} + +# Get the scheduler version from the EFD and install it. +RUBIN_SCHEDULER_TAG=v$(obs_version_at_time rubin_scheduler) +pip install --no-deps --target=${PACKAGE_DIR} git+https://github.com/lsst/rubin_scheduler.git@${RUBIN_SCHEDULER_TAG} + +# Cannot get ts_fbs_utils from the EFD, so just guess the highest semantic version tag in the repo. +TS_FBS_UTILS_TAG=$(curl -s https://api.github.com/repos/lsst-ts/ts_fbs_utils/tags | jq -r '.[].name' | egrep '^v[0-9]+.[0-9]+.[0-9]+$' | sort -V | tail -1) +pip install --no-deps --target=${PACKAGE_DIR} git+https://github.com/lsst-ts/ts_fbs_utils.git@${TS_FBS_UTILS_TAG} + +# Get the scheduler configuration script +SCHEDULER_CONFIG_SCRIPT=$(scheduler_config_at_time latiss) + +# Get the path to prenight_sim as provided by the current environment, +# so we do not accidentally run one from the adjusted PATH below. 
+PRENIGHT_SIM=$(which prenight_sim)
+
+export PYTHONPATH=${PACKAGE_DIR}:${PYTHONPATH}
+export PATH=${PACKAGE_DIR}/bin:${PATH}
+printenv > env.out
+date --iso=s
+time ${PRENIGHT_SIM} --scheduler auxtel.pickle.xz --opsim None --script ${SCHEDULER_CONFIG_SCRIPT}
+date --iso=s
+echo "******* END of run_prenight_sims.sh *********"
diff --git a/docs/api.rst b/docs/api.rst
index 228b21ef6..730a75909 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -20,4 +20,6 @@ API
 
    Self Calibration
 
-   Skybrightness
\ No newline at end of file
+   Sim archive
+
+   Skybrightness
diff --git a/docs/archive.rst b/docs/archive.rst
new file mode 100644
index 000000000..7bc83b3a4
--- /dev/null
+++ b/docs/archive.rst
@@ -0,0 +1,276 @@
+.. py:currentmodule:: rubin_sim.sim_archive
+
+.. _archive:
+
+========================================================
+The Prototype OpSim Archive for ``schedview`` Dashboards
+========================================================
+
+Introduction
+------------
+
+Several tools will require an archive that provides access to simulations produced with ``rubin_scheduler``.
+For example, the prenight briefing dashboard supplied by ``schedview`` is a tool for visualizing simulations of a night of observing, and it requires access to such simulations.
+There will eventually be other dashboards that will also need to read such tables for visualization, as well as tables describing visits completed by the actual instruments.
+Users of these dashboards will need to select which data sets are to be visualized: the dashboard code needs to provide both user interface elements that let the user select the desired table of visits, and a way to actually load the table itself.
+The dashboard servers run within containers on kubernetes-based infrastructure, which operates best when persistent data is stored outside the containers themselves.
+Therefore, these dashboards require a resource, external to the container, that supports searching for available simulations and access to the data itself.
+
+This archive design is intended primarily as a prototype: something to experiment with for a better informed development of requirements.
+So, flexibility and speed of implementation have been prioritized, with the intention that there be a significant refactoring (or even outright replacement) when requirements have been more thoroughly developed.
+
+Design
+------
+
+The archive itself is a directory tree.
+The URI of an archive is the URI of the root of the directory tree, as supported by the ``lsst.resources`` package.
+Each simulation in the archive has its own directory in this directory tree, named according to the ISO-8601 date on which the simulation was added to the archive, plus a simple incrementing index separating different simulations added on the same day, such that the format for the directory for a specific simulation is::
+
+    ${ARCHIVE_URI}/{ISO_DATE}/{ADDITION_INDEX}
+
+For example, if the URI of the archive is::
+
+    file:///my_data/sim_archive
+
+then the URI of the third simulation added on June 21, 2030 will be::
+
+    file:///my_data/sim_archive/2030-06-21/3
+
+Each simulation directory contains a metadata yaml file named ``sim_metadata.yaml``.
+In the above example, the URI for this metadata file would be::
+
+    file:///my_data/sim_archive/2030-06-21/3/sim_metadata.yaml
+
+A minimal ``sim_metadata.yaml`` file specifies the name of the sqlite3 database file with the visits.
+For example, if the URI for the visit database in the above example is ``file:///my_data/sim_archive/2030-06-21/3/opsim.db``, then the minimal content of ``sim_metadata.yaml`` would be::
+
+    files:
+        observations:
+            name: 'opsim.db'
+
+All other data in the metadata file is optional, but additional metadata will be required if the archived simulation is to be used for some use cases.
+For example, if ``schedview``'s ``prenight`` dashboard is to be able to load the reward data, it must be able to locate the reward data from the metadata file, so the metadata file needs to look something like this::
+
+    files:
+        observations:
+            name: 'opsim.db'
+        rewards:
+            name: 'rewards.h5'
+
+Clients of the archive will also need to search available simulations for those meeting relevant criteria.
+For example, the ``prenight`` dashboard will search for simulations that include a desired night, in which case the range of nights covered by the simulation must be included in the metadata.
+
+A sample metadata file that includes an early guess at what the ``prenight`` dashboard will use looks like this::
+
+    files:
+        observations:
+            name: opsim.db
+        rewards:
+            name: rewards.h5
+    label: Notebook test on 2024-01-04 16:49:44.299
+    simulated_dates:
+        first: '2025-05-05'
+        last: '2025-05-05'
+
+In the above:
+
+``label``
+    Simulations will appear in drop-down selection widgets in dashboards such as the pre-night dashboard.
+    The ``label`` element determines how the simulation will appear in the dropdown.
+    In other applications, this element may also be used as a plot annotation or a column or row heading.
+
+``simulated_dates``
+    Shows the range of dates covered by the simulation.
+    When the user specifies a night, the ``prenight`` dashboard will restrict the simulations offered to those that cover the specified date.
+
+Finally, a number of other elements may be included for debugging purposes.
+A full file might look something like this::
+
+    files:
+        environment:
+            md5: 4381d7cc82049141c70216121e39f56d
+            name: environment.txt
+        notebook:
+            md5: 6b75c1dd8c4a3b83797c873f3270cc04
+            name: notebook.ipynb
+        observations:
+            md5: 1909d1afaf744ee50bdcf1a9625826ab
+            name: opsim.db
+        pypi:
+            md5: 9c86ea9b4e7aa40d3e206fad1a59ea31
+            name: pypi.json
+        rewards:
+            md5: 6d3c9d3e0dd7764ed60312e459586e1b
+            name: rewards.h5
+        scheduler:
+            md5: 5e88dfee657e6283dbc7a343f048db92
+            name: scheduler.pickle.xz
+        statistics:
+            md5: c515ba27d83bdbfa9e65cdefff2d9d75
+            name: obs_stats.txt
+    label: Notebook test on 2024-01-04 16:49:44.299
+    simulated_dates:
+        first: '2025-05-05'
+        last: '2025-05-05'
+    scheduler_version: 1.0.1.dev25+gba1ca4d.d20240102
+    sim_runner_kwargs:
+        mjd_start: 60800.9565967191
+        record_rewards: true
+        survey_length: 0.5155218997970223
+    tags:
+    - notebook
+    - devel
+    host: neilsen-nb
+    username: neilsen
+
+This example has a number of additional elements useful for debugging, and which perhaps might be useful for future applications, but which are not used (or planned to be used) by the prenight dashboard.
+
+``files/*``
+    A number of other types of files associated with specific simulations may be included.
+    These may be useful in future applications, or for debugging only.
+    See below for descriptions of the extra types of files in this example.
+``files/${TYPE}/md5``
+    Checksums for various files.
+    These can be useful both for checking for corruption, and for determining whether two simulations are identical without needing to download either.
+``scheduler_version``
+    The version of the scheduler used to produce the simulations.
+``sim_runner_kwargs``
+    The arguments to the execution of ``sim_runner`` used to run the simulation.
+``tags``
+    A list of ad-hoc keywords.
+    For example, simulations used to test a specific jira issue may all have the name of the issue as a keyword.
+    Simulations used to support a given tech note may have the name of the tech note.
+``host``
+    The hostname on which the simulation was run.
+``username``
+    The username of the user who ran the simulation.
+
+Optional (for debugging or speculative future uses only) file types listed above are:
+
+``environment``
+    The conda environment specification for the environment used to run the simulation.
+``notebook``
+    The notebook used to create the simulation, for example as created using the ``%notebook`` jupyter magic.
+``pypi``
+    The list of packages installed with ``pip`` in the environment used to run the simulation.
+    If the simulation is run using only conda-installed packages, this will be redundant with ``environment``.
+``scheduler``
+    A python pickle of the scheduler, in the state as of the start of the simulation.
+``statistics``
+    Basic statistics for the visit database.
+
+Metadata cache
+--------------
+
+Reading each ``sim_metadata.yaml`` file individually when loading metadata for a large number of simulations can be slow.
+Therefore, metadata for sets of simulations can be compiled into a ``compiled_metadata_cache.h5`` file.
+This file stores four tables in ``hdf5`` format: ``simulations``, ``files``, ``kwargs``, and ``tags``.
+Each of these tables is indexed by the URI of a simulation.
+
+The ``files`` table contains one column for each key in the ``files`` dictionary in the yaml metadata file for the simulation, providing the metadata needed to reconstruct this element of the dictionary.
+
+The ``kwargs`` table contains one column for each key in the ``sim_runner_kwargs`` dictionary in the yaml metadata file for the simulation, providing the metadata needed to reconstruct this element of the dictionary.
+If a keyword argument is not set, a ``numpy.nan`` value is stored in the table.
+
+The ``tags`` table contains a single ``tag`` column, with one row for each tag in each simulation.
+
+The ``simulations`` table contains one column for every other keyword found in the metadata yaml files.
+If a keyword is not set, a ``numpy.nan`` value is stored in the table.
+
+The ``compile_sim_archive_metadata_resource`` command in ``rubin_sim`` maintains the ``compiled_metadata_cache.h5`` file in an archive.
+By default, it reads every ``sim_metadata.yaml`` file in the archive and builds the corresponding cache hdf5 file from scratch.
+If called with the ``--append`` flag, it reads the existing metadata cache file, reads the ``sim_metadata.yaml`` files for simulations added more recently than the last entry in the existing cache, appends them to the previous contents of the cache, and writes the result back to the cache.
+The ``--append`` flag therefore speeds up the update considerably, but does not pick up changes to previously added simulations (including deletions).
+
+The ``compile_sim_archive_metadata_resource`` command needs to be run to update the cache.
+Normally, a cron job will execute this command routinely to keep the cache reasonably up to date.
+Because the tools read the metadata yaml files for any simulations added after the most recent cache update, they will function correctly even if the cache is out of date (just more slowly).
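+
+As a minimal sketch (not part of the archive design itself), the example below shows one way a client might load metadata using the cache.
+The archive URI, and the assumption that the cache is stored as ``compiled_metadata_cache.h5`` at the archive root, are illustrative values only; use whatever resource your ``compile_sim_archive_metadata_resource`` job actually writes::
+
+    import pandas as pd
+
+    from rubin_sim.sim_archive import read_archived_sim_metadata
+
+    archive_uri = "file:///my_data/sim_archive/"
+    cache_uri = archive_uri + "compiled_metadata_cache.h5"
+
+    # Load metadata for simulations added in the last 5 days, using the cache
+    # where possible and falling back to individual sim_metadata.yaml files
+    # for anything added after the cache was last compiled.
+    metadata = read_archived_sim_metadata(
+        archive_uri, num_nights=5, compilation_resource=cache_uri
+    )
+
+    # The cache tables can also be inspected directly with pandas,
+    # here from a local copy of the cache file.
+    simulations = pd.read_hdf("compiled_metadata_cache.h5", "simulations")
+    tags = pd.read_hdf("compiled_metadata_cache.h5", "tags")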
+
+Automatic archiving of generated data
+-------------------------------------
+
+The ``rubin_sim`` package provides a tool that combines running a simulation and adding the results to an archive, including any metadata that can be derived automatically.
+The ``rubin_sim.sim_archive.drive_sim`` function is a wrapper around ``rubin_sim.scheduler.sim_runner`` that incorporates this metadata collection and the creation of the entry in an archive.
+It takes all of the same arguments that ``sim_runner`` does, and passes them directly to ``sim_runner``.
+In addition, it takes a few arguments that specify the archive into which the simulation is to be added (``archive_uri``), the label to be included in the metadata (``label``), and the code used to run the simulation (either ``script`` or ``notebook``).
+Details are available in the ``drive_sim`` docstring.
+
+For example, if this code is put into a file and run as a script, it will run the specified simulation and add it to the specified archive::
+
+    from astropy.time import Time
+
+    from rubin_sim.scheduler.example import example_scheduler
+    from rubin_sim.scheduler.model_observatory import ModelObservatory
+    from rubin_sim.sim_archive import drive_sim
+
+    sim_mjd_start = Time("2025-05-05").mjd + 0.5
+    # The start date of the simulation.
+    # Offset by 0.5 to avoid starting late when the MJD rollover occurs during or
+    # after twilight. See dayObs in SITCOMTN-32: https://sitcomtn-032.lsst.io/ .
+
+    sim_length = 1.0
+    # Passed to sim_runner, in units of days.
+
+    archive_uri = "file:///sdf/data/rubin/user/neilsen/data/test_sim_archive/"
+    # The URI of the root of the archive. The trailing "/" is required.
+
+    observatory = ModelObservatory()
+    scheduler = example_scheduler()
+    scheduler.keep_rewards = True
+
+    results = drive_sim(
+        observatory=observatory,
+        scheduler=scheduler,
+        archive_uri=archive_uri,
+        label=f"Example simulation started at {Time.now().iso}.",
+        script=__file__,
+        tags=["example"],
+        mjd_start=sim_mjd_start,
+        survey_length=sim_length,
+        record_rewards=True,
+    )
+
+The result looks like this::
+
+    bash$ ls /sdf/data/rubin/user/neilsen/data/test_sim_archive/2024-01-18/1
+    environment.txt  example_archived_sim_driver.py  obs_stats.txt  opsim.db  pypi.json  rewards.h5  scheduler.pickle.xz  sim_metadata.yaml
+    bash$ cat /sdf/data/rubin/user/neilsen/data/test_sim_archive/2024-01-18/1/sim_metadata.yaml
+    files:
+        environment:
+            md5: 33f94ddf8975f9641a1f524fd22e362e
+            name: environment.txt
+        observations:
+            md5: 8b1ee9a604a88d2708d2bfd924ac3cd9
+            name: opsim.db
+        pypi:
+            md5: 51a8deee5018f59f20d5741fd1a64778
+            name: pypi.json
+        rewards:
+            md5: 10e4ab9397382bfa108fa21354da3526
+            name: rewards.h5
+        scheduler:
+            md5: 35713860dc9ba7a425500f63939d0e02
+            name: scheduler.pickle.xz
+        script:
+            md5: b4a476a4fd1231ea1ca44149784f1c3f
+            name: example_archived_sim_driver.py
+        statistics:
+            md5: 7c6a6af38aff3ce4145146e35f929b47
+            name: obs_stats.txt
+    host: sdfrome002.sdf.slac.stanford.edu
+    label: Example simulation started at 2024-01-18 15:46:27.758.
+    scheduler_version: 1.0.1.dev25+gba1ca4d.d20240102
+    sim_runner_kwargs:
+        mjd_start: 60800.5
+        record_rewards: true
+        survey_length: 1.0
+    simulated_dates:
+        first: '2025-05-04'
+        last: '2025-05-04'
+    tags:
+    - example
+    username: neilsen
+
+Alternately, the simulation can be run from a ``jupyter`` notebook in a similar way, except that instead of saving the script that generated the simulation, a notebook containing the cells that created the simulation, up to and including the cell that runs it, can be stored instead.
+An example can be found in the ``archive/sim_and_archive.ipynb`` in the `rubin_sim_notebook github repository `_. diff --git a/docs/sim-archive-api.rst b/docs/sim-archive-api.rst new file mode 100644 index 000000000..9c55a596c --- /dev/null +++ b/docs/sim-archive-api.rst @@ -0,0 +1,12 @@ +.. py:currentmodule:: rubin_sim.sim_archive + +.. _sim-archive-api: + +=============== +sim_archive API +=============== + +.. automodule:: rubin_sim.sim_archive + :imported-members: + :members: + :show-inheritance: diff --git a/docs/user-guide.rst b/docs/user-guide.rst index d1d726cfd..47bec5da7 100644 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -21,7 +21,6 @@ User Guide Self Calibration - Skybrightness - + Simulation archive - + Skybrightness diff --git a/pyproject.toml b/pyproject.toml index 4f7a14416..5886061b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,7 +72,10 @@ run_selfcal_metric = "rubin_sim.maf.run_selfcal_metric:run_selfcal_metric" make_fbs_tracking_db = "rubin_sim.maf.make_fbs_tracking_db:make_fbs_tracking_db" show_maf = "rubin_sim.maf.show_maf:show_maf" make_lsst_obs = "rubin_sim.moving_objects.make_lsst_obs:make_lsst_obs" - +archive_sim = "rubin_sim.sim_archive:make_sim_archive_cli" +compile_sim_archive_metadata_resource = "rubin_sim.sim_archive.sim_archive:compile_sim_archive_metadata_cli" +prenight_sim = "rubin_sim.sim_archive:prenight_sim_cli" +scheduler_snapshot = "rubin_sim.sim_archive:make_scheduler_snapshot_cli" [tool.setuptools.dynamic] version = { attr = "setuptools_scm.get_version" } diff --git a/rubin_sim/sim_archive/__init__.py b/rubin_sim/sim_archive/__init__.py new file mode 100644 index 000000000..40426bd53 --- /dev/null +++ b/rubin_sim/sim_archive/__init__.py @@ -0,0 +1,11 @@ +import importlib.util +import logging + +from .make_snapshot import * + +HAVE_LSST_RESOURCES = importlib.util.find_spec("lsst") and importlib.util.find_spec("lsst.resources") +if HAVE_LSST_RESOURCES: + from .prenight import * + from .sim_archive import * +else: + logging.error("rubin_sim.sim_archive requires lsst.resources.") diff --git a/rubin_sim/sim_archive/make_snapshot.py b/rubin_sim/sim_archive/make_snapshot.py new file mode 100644 index 000000000..5a1b20afe --- /dev/null +++ b/rubin_sim/sim_archive/make_snapshot.py @@ -0,0 +1,188 @@ +__all__ = [ + "get_scheduler", + "save_scheduler", + "add_make_scheduler_snapshot_args", + "make_scheduler_snapshot_cli", + "get_scheduler_instance_from_repo", +] + +import argparse +import bz2 +import gzip +import importlib.util +import lzma +import pickle +import sys +import types +import typing +from pathlib import Path +from tempfile import TemporaryDirectory + +from git import Repo +from rubin_scheduler.scheduler.example import example_scheduler +from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler + + +def get_scheduler_instance_from_path(config_script_path: str | Path) -> CoreScheduler: + """Generate a CoreScheduler according to a configuration in a file. + + Parameters + ---------- + config_script_path : `str` + The configuration script path (relative to the repository root). + + Returns + ------- + scheduler : `CoreScheduler` + An instance of the Rubin Observatory FBS. + + Raises + ------ + ValueError + If the config file is invalid, or has invalid content. 
+ """ + + config_module_name: str = "scheduler_config" + config_module_spec = importlib.util.spec_from_file_location(config_module_name, config_script_path) + if config_module_spec is None or config_module_spec.loader is None: + # Make type checking happy + raise ValueError(f"Cannot load config file {config_script_path}") + + config_module: types.ModuleType = importlib.util.module_from_spec(config_module_spec) + sys.modules[config_module_name] = config_module + config_module_spec.loader.exec_module(config_module) + + scheduler: CoreScheduler = config_module.get_scheduler()[1] + return scheduler + + +def get_scheduler_instance_from_repo( + config_repo: str, + config_script: str, + config_branch: str = "main", +) -> CoreScheduler: + """Generate a CoreScheduler according to a configuration in git. + + Parameters + ---------- + config_repo : `str` + The git repository with the configuration. + config_script : `str` + The configuration script path (relative to the repository root). + config_branch : `str`, optional + The branch of the repository to use, by default "main" + + Returns + ------- + scheduler : `CoreScheduler` + An instance of the Rubin Observatory FBS. + + Raises + ------ + ValueError + If the config file is invalid, or has invalid content. + """ + + with TemporaryDirectory() as local_config_repo_parent: + repo: Repo = Repo.clone_from(config_repo, local_config_repo_parent, branch=config_branch) + full_config_script_path: Path = Path(repo.working_dir).joinpath(config_script) + scheduler = get_scheduler_instance_from_path(full_config_script_path) + + return scheduler + + +def get_scheduler( + config_repo: str | None, + config_script: str | None, + config_branch: str = "main", +) -> CoreScheduler: + """Generate a CoreScheduler according to a configuration in git. + + Parameters + ---------- + config_repo : `str` + The git repository with the configuration. + config_script : `str` + The configuration script path (relative to the repository root). + config_branch : `str`, optional + The branch of the repository to use, by default "main" + + Returns + ------- + scheduler : `CoreScheduler` + An instance of the Rubin Observatory FBS. + + Raises + ------ + ValueError + If the config file is invalid, or has invalid content. + """ + if config_repo is not None and len(config_repo) > 0: + if config_script is None: + raise ValueError("If the config repo is set, the script must be as well.") + scheduler = get_scheduler_instance_from_repo( + config_repo=config_repo, config_script=config_script, config_branch=config_branch + ) + elif config_script is not None: + scheduler = get_scheduler_instance_from_path(config_script) + else: + example_scheduler_result = example_scheduler() + if isinstance(example_scheduler_result, CoreScheduler): + scheduler = example_scheduler_result + else: + # It might return a observatory, scheduler, observations tuple + # instead. + scheduler = example_scheduler_result[1] + + return scheduler + + +def save_scheduler(scheduler: CoreScheduler, file_name: str) -> None: + """Save an instances of the scheduler in a pickle file, + compressed according to its extension. + + Parameters + ---------- + scheduler : `CoreScheduler` + The scheduler to save. + file_name : `str` + The file in which to save the schedulers. 
+ """ + opener: typing.Callable = open + + if file_name.endswith(".bz2"): + opener = bz2.open + elif file_name.endswith(".xz"): + opener = lzma.open + elif file_name.endswith(".gz"): + opener = gzip.open + + with opener(file_name, "wb") as pio: + pickle.dump(scheduler, pio) + + +def add_make_scheduler_snapshot_args(parser: argparse.ArgumentParser) -> None: + """Add arguments needed for saving a scheduler to an argument parser.""" + parser.add_argument("--scheduler_fname", type=str, help="The file in which to save the scheduler.") + parser.add_argument( + "--repo", type=str, default=None, help="The repository from which to load the configuration." + ) + parser.add_argument( + "--script", type=str, default=None, help="The path to the config script (relative to the repo root)." + ) + parser.add_argument( + "--branch", type=str, default="main", help="The branch of the repo from which to get the script" + ) + + +def make_scheduler_snapshot_cli(cli_args: list = []) -> None: + parser = argparse.ArgumentParser(description="Create a scheduler pickle") + add_make_scheduler_snapshot_args(parser) + args: argparse.Namespace = parser.parse_args() if len(cli_args) == 0 else parser.parse_args(cli_args) + + scheduler: CoreScheduler = get_scheduler(args.repo, args.config, args.branch) + save_scheduler(scheduler, args.scheduler_fname) + + +if __name__ == "__main__": + make_scheduler_snapshot_cli() diff --git a/rubin_sim/sim_archive/prenight.py b/rubin_sim/sim_archive/prenight.py new file mode 100644 index 000000000..f91bc33dd --- /dev/null +++ b/rubin_sim/sim_archive/prenight.py @@ -0,0 +1,386 @@ +"""Tools for running the set of simulations used for a pre-night briefing.""" + +__all__ = [ + "AnomalousOverheadFunc", + "run_prenights", + "prenight_sim_cli", +] + +import argparse +import bz2 +import gzip +import io +import logging +import lzma +import os +import pickle +import typing +from datetime import datetime +from functools import partial +from tempfile import TemporaryFile +from typing import Callable, Optional, Sequence +from warnings import warn + +import numpy as np +import numpy.typing as npt +from astropy.time import Time +from matplotlib.pylab import Generator +from rubin_scheduler.scheduler.example import example_scheduler +from rubin_scheduler.scheduler.model_observatory import ModelObservatory +from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler +from rubin_scheduler.scheduler.utils import SchemaConverter +from rubin_scheduler.site_models import Almanac + +from rubin_sim.sim_archive.sim_archive import drive_sim + +from .make_snapshot import add_make_scheduler_snapshot_args, get_scheduler, save_scheduler + +try: + from rubin_sim.data import get_baseline # type: ignore +except ModuleNotFoundError: + get_baseline = partial(warn, "Cannot find default baseline because rubin_sim is not installed.") + +DEFAULT_ARCHIVE_URI = "s3://rubin-scheduler-prenight/opsim/" + + +def _run_sim( + sim_start_mjd: float, + archive_uri: str, + scheduler_io: io.BufferedRandom, + label: str, + tags: Sequence[str] = tuple(), + sim_duration: float = 2, + anomalous_overhead_func: Optional[Callable] = None, + opsim_metadata: dict | None = None, +) -> None: + logging.info(f"Running {label}.") + + scheduler_io.seek(0) + scheduler = pickle.load(scheduler_io) + + observatory = ModelObservatory( + nside=scheduler.nside, + cloud_data="ideal", + seeing_data="ideal", + downtimes="ideal", + ) + + drive_sim( + observatory=observatory, + scheduler=scheduler, + archive_uri=archive_uri, + label=label, + 
tags=tags, + script=__file__, + sim_start_mjd=sim_start_mjd, + sim_duration=sim_duration, + record_rewards=True, + anomalous_overhead_func=anomalous_overhead_func, + opsim_metadata=opsim_metadata, + ) + + +def _mjd_now() -> float: + # Used instead of just Time.now().mjd to make type checker happy. + mjd = Time.now().mjd + assert isinstance(mjd, float) + return mjd + + +def _iso8601_now() -> str: + # Used instead of just Time.now().mjd to make type checker happy. + now_iso = Time.now().iso + assert isinstance(now_iso, str) + return now_iso + + +def _create_scheduler_io( + day_obs_mjd: float, + scheduler_fname: Optional[str] = None, + scheduler_instance: Optional[CoreScheduler] = None, + opsim_db: str | None = None, +) -> io.BufferedRandom: + if scheduler_instance is not None: + scheduler = scheduler_instance + elif scheduler_fname is None: + sample_scheduler = example_scheduler() + if not isinstance(sample_scheduler, CoreScheduler): + raise TypeError() + + scheduler = sample_scheduler + + else: + opener: typing.Callable = open + + if scheduler_fname.endswith(".bz2"): + opener = bz2.open + elif scheduler_fname.endswith(".xz"): + opener = lzma.open + elif scheduler_fname.endswith(".gz"): + opener = gzip.open + + with opener(scheduler_fname, "rb") as sched_io: + scheduler = pickle.load(sched_io) + + scheduler.keep_rewards = True + + if opsim_db is not None: + last_preexisting_obs_mjd = day_obs_mjd + 0.5 + obs = SchemaConverter().opsim2obs(opsim_db) + obs = obs[obs["mjd"] < last_preexisting_obs_mjd] + if len(obs) > 0: + scheduler.add_observations_array(obs) + + scheduler_io = TemporaryFile() + pickle.dump(scheduler, scheduler_io) + scheduler_io.seek(0) + return scheduler_io + + +class AnomalousOverheadFunc: + """Callable to return random overhead. + + Parameters + ---------- + seed : `int` + Random number seed. + slew_scale : `float` + The scale for the scatter in the slew offest (seconds). + visit_scale : `float`, optional + The scale for the scatter in the visit overhead offset (seconds). + Defaults to 0.0. + slew_loc : `float`, optional + The location of the scatter in the slew offest (seconds). + Defaults to 0.0. + visit_loc : `float`, optional + The location of the scatter in the visit offset (seconds). + Defaults to 0.0. + """ + + def __init__( + self, + seed: int, + slew_scale: float, + visit_scale: float = 0.0, + slew_loc: float = 0.0, + visit_loc: float = 0.0, + ) -> None: + self.rng: Generator = np.random.default_rng(seed) + self.visit_loc: float = visit_loc + self.visit_scale: float = visit_scale + self.slew_loc: float = slew_loc + self.slew_scale: float = slew_scale + + def __call__(self, visittime: float, slewtime: float) -> float: + """Return a randomized offset for the visit overhead. + + Parameters + ---------- + visittime : `float` + The visit time (seconds). + slewtime : `float` + The slew time (seconds). + + Returns + ------- + offset: `float` + Random offset (seconds). + """ + + slew_overhead: float = slewtime * self.rng.normal(self.slew_loc, self.slew_scale) + + # Slew might be faster that expected, but it will never take negative + # time. + if (slewtime + slew_overhead) < 0: + slew_overhead = 0.0 + + visit_overhead: float = visittime * self.rng.normal(self.slew_loc, self.slew_scale) + # There might be anomalous overhead that makes visits take longer, + # but faster is unlikely. 
+ if visit_overhead < 0: + visit_overhead = 0.0 + + return slew_overhead + visit_overhead + + +def run_prenights( + day_obs_mjd: float, + archive_uri: str, + scheduler_file: Optional[str] = None, + opsim_db: Optional[str] = None, + minutes_delays: tuple[float, ...] = (0, 1, 10, 60, 240), + anomalous_overhead_seeds: tuple[int, ...] = (101, 102, 103, 104, 105), + sim_nights: int = 2, + opsim_metadata: dict | None = None, +) -> None: + """Run the set of scheduler simulations needed to prepare for a night. + + Parameters + ---------- + day_obs_mjd : `float` + The starting MJD. + archive_uri : `str` + The URI of visits completed before this night. + scheduler_file : `str`, optional + File from which to load the scheduler. None defaults to the example + scheduler in rubin_sim, if it is installed. + The default is None. + opsim_db : `str`, optional + The file name of the visit database for visits preceeding the + simulation. + The default is None. + minutes_delays : `tuple` of `float` + Delayed starts to be simulated. + anomalous_overhead_seeds: `tuple` of `int` + Random number seeds to use for anomalous overhead runs. + sim_nights: `int` + Number of nights to simulate. Defaults to 2. + opsim_metadata: `dict` + Extra metadata for the archive + """ + + exec_time: str = _iso8601_now() + scheduler_io: io.BufferedRandom = _create_scheduler_io( + day_obs_mjd, scheduler_fname=scheduler_file, opsim_db=opsim_db + ) + + # Assign args common to all sims for this execution. + run_sim = partial( + _run_sim, archive_uri=archive_uri, scheduler_io=scheduler_io, opsim_metadata=opsim_metadata + ) + + # Find the start of observing for the specified day_obs. + # Almanac.get_sunset_info does not use day_obs, so just index + # Almanac.sunsets for what we want directly. + all_sun_n12_setting: npt.NDArray[np.float_] = Almanac().sunsets["sun_n12_setting"] + before_first_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting < day_obs_mjd + 0.5 + after_first_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting > day_obs_mjd + 1.5 + on_first_day_obs: npt.NDArray[np.bool_] = ~(before_first_day_obs | after_first_day_obs) + sim_start_mjd: float = all_sun_n12_setting[on_first_day_obs].item() + + all_sun_n12_rising: npt.NDArray[np.float_] = Almanac().sunsets["sun_n12_rising"] + before_last_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting < day_obs_mjd + sim_nights + 0.5 + after_last_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting > day_obs_mjd + sim_nights + 1.5 + on_last_day_obs: npt.NDArray[np.bool_] = ~(before_last_day_obs | after_last_day_obs) + sim_end_mjd: float = all_sun_n12_rising[on_last_day_obs].item() + sim_duration: float = sim_end_mjd - sim_start_mjd + + # Begin with an ideal pure model sim. + completed_run_without_delay: bool = False + if len(minutes_delays) == 0 or (np.array(minutes_delays) == 0).any(): + run_sim( + sim_start_mjd, + label=f"Nominal start and overhead, ideal conditions, run at {exec_time}", + tags=["ideal", "nominal"], + ) + completed_run_without_delay = True + + # Delayed start + for minutes_delay in minutes_delays: + if completed_run_without_delay and minutes_delay == 0: + # Did this already. 
+ continue + + delayed_sim_start_mjd = sim_start_mjd + minutes_delay / (24.0 * 60) + sim_duration = sim_end_mjd - delayed_sim_start_mjd + + run_sim( + delayed_sim_start_mjd, + label=f"Start time delayed by {minutes_delay} minutes," + + f" Nominal slew and visit overhead, ideal conditions, run at {exec_time}", + tags=["ideal", f"delay_{minutes_delay}"], + sim_duration=sim_duration, + ) + + # Run a few different scatters of visit time + anomalous_overhead_scale = 0.1 + for anomalous_overhead_seed in anomalous_overhead_seeds: + anomalous_overhead_func = AnomalousOverheadFunc(anomalous_overhead_seed, anomalous_overhead_scale) + run_sim( + sim_start_mjd, + label=f"Anomalous overhead {anomalous_overhead_seed, anomalous_overhead_scale}," + + f" Nominal start, ideal conditions, run at {exec_time}", + tags=["ideal", "anomalous_overhead"], + anomalous_overhead_func=anomalous_overhead_func, + sim_duration=sim_duration, + ) + + +def _parse_dayobs_to_mjd(dayobs: str | float) -> float: + try: + day_obs_mjd = Time(dayobs).mjd + except ValueError: + try: + day_obs_mjd = Time(datetime.strptime(str(dayobs), "%Y%m%d")) + except ValueError: + day_obs_mjd = Time(dayobs, format="mjd") + + if not isinstance(day_obs_mjd, float): + raise ValueError + + return day_obs_mjd + + +def prenight_sim_cli(cli_args: list = []) -> None: + parser = argparse.ArgumentParser(description="Run prenight simulations") + default_time = Time(int(_mjd_now() - 0.5), format="mjd") + parser.add_argument( + "--dayobs", + type=str, + default=default_time.iso, + help="The day_obs of the night to simulate.", + ) + parser.add_argument( + "--archive", + type=str, + default=DEFAULT_ARCHIVE_URI, + help="Archive in which to store simulation results.", + ) + parser.add_argument("--scheduler", type=str, default=None, help="pickle file of the scheduler to run.") + + # Only pass a default if we have an opsim + baseline = get_baseline() + if baseline is not None: + parser.add_argument( + "--opsim", default=baseline, type=str, help="Opsim database from which to load previous visits." 
+ ) + else: + parser.add_argument("--opsim", type=str, help="Opsim database from which to load previous visits.") + + add_make_scheduler_snapshot_args(parser) + + args = parser.parse_args() if len(cli_args) == 0 else parser.parse_args(cli_args) + + day_obs_mjd = _parse_dayobs_to_mjd(args.dayobs) + archive_uri = args.archive + opsim_db = None if args.opsim in ("", "None") else args.opsim + + scheduler_file = args.scheduler + if args.repo is not None or args.script is not None: + if os.path.exists(scheduler_file): + raise ValueError(f"File {scheduler_file} already exists!") + + scheduler: CoreScheduler = get_scheduler(args.repo, args.script, args.branch) + save_scheduler(scheduler, scheduler_file) + + opsim_metadata = { + "opsim_config_repository": args.repo, + "opsim_config_script": args.script, + "opsim_config_branch": args.branch, + } + else: + opsim_metadata = None + + run_prenights( + day_obs_mjd, + archive_uri, + scheduler_file, + opsim_db, + minutes_delays=(0, 1, 10, 60), + anomalous_overhead_seeds=(101, 102), + opsim_metadata=opsim_metadata, + ) + + +if __name__ == "__main__": + prenight_sim_cli() diff --git a/rubin_sim/sim_archive/sim_archive.py b/rubin_sim/sim_archive/sim_archive.py new file mode 100644 index 000000000..d451c95ce --- /dev/null +++ b/rubin_sim/sim_archive/sim_archive.py @@ -0,0 +1,933 @@ +"""Tools for maintaining an archive of opsim output and metadata.""" + +__all__ = [ + "make_sim_archive_dir", + "transfer_archive_dir", + "check_opsim_archive_resource", + "read_archived_sim_metadata", + "make_sim_archive_cli", + "drive_sim", +] + +import argparse +import datetime +import hashlib +import json +import logging +import lzma +import os +import pickle +import shutil +import socket +import sys +from contextlib import redirect_stdout +from numbers import Integral, Number +from pathlib import Path +from tempfile import TemporaryDirectory +from urllib.parse import urlparse + +import numpy as np +import pandas as pd +import rubin_scheduler +import yaml +from astropy.time import Time +from rubin_scheduler.scheduler import sim_runner +from rubin_scheduler.scheduler.utils import SchemaConverter + +LOGGER = logging.getLogger(__name__) + +try: + from lsst.resources import ResourcePath +except ModuleNotFoundError: + LOGGER.error("Module lsst.resources required to use rubin_sim.sim_archive.") + +try: + from conda.cli.main_list import print_packages + from conda.gateways.disk.test import is_conda_environment + + have_conda = True +except ModuleNotFoundError: + have_conda = False + LOGGER.warning("No conda module found, no conda envirment data will be saved") + + +def make_sim_archive_dir( + observations, + reward_df=None, + obs_rewards=None, + in_files={}, + sim_runner_kwargs={}, + tags=[], + label=None, + data_path=None, + capture_env=True, + opsim_metadata=None, +): + """Create or fill a local simulation archive directory. + + Parameters + ---------- + observations : `numpy.recarray` + The observations data, in the "obs" format as accepted and + created by `rubin_scheduler.scheduler.utils.SchemaConverter`. + reward_df : `pandas.DataFrame`, optional + The reward data, by default None. + obs_rewards : `pandas.DataFrame`, optional + The observation rewards data, by default None. + in_files : `dict`, optional + Additional input files to be included in the archive, + by default {}. + sim_runner_kwargs : `dict`, optional + Additional simulation runner keyword arguments, by default {}. 
+ tags : `list` [`str`], optional + A list of tags/keywords to be included in the metadata, by + default []. + label : `str`, optional + A label to be included in the metadata, by default None. + data_path : `str` or `pathlib.Path`, optional + The path to the simulation archive directory, by default None. + capture_env : `bool` + Use the current environment as the sim environment. + Defaults to True. + opsim_metadata : `dict` + Metadata to be included. + + Returns + ------- + data_dir : `pathlib.Path` or `tempfile.TemporaryDirectory` + The temporary directory containing the simulation archive. + """ + if data_path is None: + data_dir = TemporaryDirectory() + data_path = Path(data_dir.name) + else: + data_dir = None + + if not isinstance(data_path, Path): + data_path = Path(data_path) + + files = {} + + # Save the observations + files["observations"] = {"name": "opsim.db"} + opsim_output_fname = data_path.joinpath(files["observations"]["name"]) + SchemaConverter().obs2opsim(observations, filename=opsim_output_fname) + + # Save the rewards + if reward_df is not None and obs_rewards is not None: + files["rewards"] = {"name": "rewards.h5"} + rewards_fname = data_path.joinpath(files["rewards"]["name"]) + if reward_df is not None: + reward_df.to_hdf(rewards_fname, key="reward_df") + if obs_rewards is not None: + obs_rewards.to_hdf(rewards_fname, key="obs_rewards") + + # Save basic statistics + files["statistics"] = {"name": "obs_stats.txt"} + stats_fname = data_path.joinpath(files["statistics"]["name"]) + with open(stats_fname, "w") as stats_io: + print(SchemaConverter().obs2opsim(observations).describe().T.to_csv(sep="\t"), file=stats_io) + + if capture_env: + # Save the conda environment + conda_prefix = Path(sys.executable).parent.parent.as_posix() + if have_conda and is_conda_environment(conda_prefix): + conda_base_fname = "environment.txt" + environment_fname = data_path.joinpath(conda_base_fname).as_posix() + + # Python equivilent of + # conda list --export -p $conda_prefix > $environment_fname + with open(environment_fname, "w") as environment_io: + with redirect_stdout(environment_io): + print_packages(conda_prefix, format="export") + + files["environment"] = {"name": conda_base_fname} + + # Save pypi packages + pypi_base_fname = "pypi.json" + pypi_fname = data_path.joinpath(pypi_base_fname).as_posix() + + pip_json_output = os.popen("pip list --format json") + pip_list = json.loads(pip_json_output.read()) + + with open(pypi_fname, "w") as pypi_io: + print(json.dumps(pip_list, indent=4), file=pypi_io) + + files["pypi"] = {"name": pypi_base_fname} + + # Add supplied files + for file_type, fname in in_files.items(): + files[file_type] = {"name": Path(fname).name} + try: + shutil.copyfile(fname, data_path.joinpath(files[file_type]["name"])) + except shutil.SameFileError: + pass + + # Add file hashes + for file_type in files: + fname = data_path.joinpath(files[file_type]["name"]) + with open(fname, "rb") as file_io: + content = file_io.read() + + files[file_type]["md5"] = hashlib.md5(content).hexdigest() + + def convert_mjd_to_dayobs(mjd): + # Use dayObs defn. 
from SITCOMTN-32: https://sitcomtn-032.lsst.io/ + evening_local_mjd = np.floor(mjd - 0.5).astype(int) + evening_local_iso = Time(evening_local_mjd, format="mjd").iso[:10] + return evening_local_iso + + if opsim_metadata is None: + opsim_metadata = {} + + if capture_env: + opsim_metadata["scheduler_version"] = rubin_scheduler.__version__ + opsim_metadata["host"] = socket.getfqdn() + + opsim_metadata["username"] = os.environ["USER"] + + simulation_dates = {} + if "sim_start_mjd" in sim_runner_kwargs: + simulation_dates["first"] = convert_mjd_to_dayobs(sim_runner_kwargs["sim_start_mjd"]) + + if "sim_duration" in sim_runner_kwargs: + simulation_dates["last"] = convert_mjd_to_dayobs( + sim_runner_kwargs["sim_start_mjd"] + sim_runner_kwargs["sim_duration"] - 1 + ) + else: + simulation_dates["first"] = convert_mjd_to_dayobs(observations["mjd"].min()) + simulation_dates["last"] = convert_mjd_to_dayobs(observations["mjd"].max()) + + if len(sim_runner_kwargs) > 0: + opsim_metadata["sim_runner_kwargs"] = {} + for key, value in sim_runner_kwargs.items(): + # Cast numpy number types to ints, floats, and reals to avoid + # confusing the yaml module. + match value: + case bool(): + opsim_metadata["sim_runner_kwargs"][key] = value + case Integral(): + opsim_metadata["sim_runner_kwargs"][key] = int(value) + case Number(): + opsim_metadata["sim_runner_kwargs"][key] = float(value) + case _: + opsim_metadata["sim_runner_kwargs"][key] = str(value) + + opsim_metadata["simulated_dates"] = simulation_dates + opsim_metadata["files"] = files + + if len(tags) > 0: + for tag in tags: + assert isinstance(tag, str), "Tags must be strings." + opsim_metadata["tags"] = tags + + if label is not None: + assert isinstance(label, str), "The sim label must be a string." + opsim_metadata["label"] = label + + sim_metadata_fname = data_path.joinpath("sim_metadata.yaml") + with open(sim_metadata_fname, "w") as sim_metadata_io: + print(yaml.dump(opsim_metadata, indent=4), file=sim_metadata_io) + + files["metadata"] = {"name": sim_metadata_fname} + + if data_dir is not None: + # If we created a temporary directory, if we do not return it, it + # will get automatically cleaned up, losing our work. + # So, return it. + return data_dir + + return data_path + + +def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin-scheduler-prenight/opsim/"): + """Transfer the contents of an archive directory to an resource. + + Parameters + ---------- + archive_dir : `str` + The path to the archive directory containing the files to be + transferred. + archive_base_uri : `str`, optional + The base URI where the archive files will be transferred to. + Default is "s3://rubin-scheduler-prenight/opsim/". + + Returns + ------- + resource_rpath : `ResourcePath` + The destination resource. + """ + + metadata_fname = Path(archive_dir).joinpath("sim_metadata.yaml") + with open(metadata_fname, "r") as metadata_io: + sim_metadata = yaml.safe_load(metadata_io) + + insert_date = datetime.datetime.utcnow().date().isoformat() + insert_date_rpath = ResourcePath(archive_base_uri).join(insert_date, forceDirectory=True) + if not insert_date_rpath.exists(): + insert_date_rpath.mkdir() + + # Number the sims in the insert date dir by + # looing for all the interger directories, and choosing the next one. 
+ found_ids = [] + for base_dir, found_dirs, found_files in insert_date_rpath.walk(): + if base_dir == insert_date_rpath: + for found_dir in found_dirs: + try: + found_dir_index = found_dir[:-1] if found_dir.endswith("/") else found_dir + found_ids.append(int(found_dir_index)) + except ValueError: + pass + + new_id = max(found_ids) + 1 if len(found_ids) > 0 else 1 + resource_rpath = insert_date_rpath.join(f"{new_id}", forceDirectory=True) + resource_rpath.mkdir() + + # Include the metadata file itself. + sim_metadata["files"]["metadata"] = {"name": "sim_metadata.yaml"} + + for file_info in sim_metadata["files"].values(): + source_fname = Path(archive_dir).joinpath(file_info["name"]) + with open(source_fname, "rb") as source_io: + content = source_io.read() + + destination_rpath = resource_rpath.join(file_info["name"]) + destination_rpath.write(content) + + LOGGER.info(f"Copied {source_fname} to {destination_rpath}") + + return resource_rpath + + +def check_opsim_archive_resource(archive_uri): + """Check the contents of an opsim archive resource. + + Parameters + ---------- + archive_uri : `str` + The URI of the archive resource to be checked. + + Returns + ------- + validity: `dict` + A dictionary of files checked, and their validity. + """ + metadata_path = ResourcePath(archive_uri).join("sim_metadata.yaml") + with metadata_path.open(mode="r") as metadata_io: + sim_metadata = yaml.safe_load(metadata_io) + + results = {} + + for file_info in sim_metadata["files"].values(): + resource_path = ResourcePath(archive_uri).join(file_info["name"]) + content = resource_path.read() + + results[file_info["name"]] = file_info["md5"] == hashlib.md5(content).hexdigest() + + return results + + +def _build_archived_sim_label(base_uri, metadata_resource, metadata): + label_base = metadata_resource.dirname().geturl().removeprefix(base_uri).rstrip("/").lstrip("/") + + # If a label is supplied by the metadata, use it + if "label" in metadata: + label = f"{label_base} {metadata['label']}" + return label + + try: + sim_dates = metadata["simulated_dates"] + first_date = sim_dates["first"] + last_date = sim_dates["last"] + label = f"{label_base} of {first_date}" + if last_date != first_date: + label = f"{label} through {last_date}" + except KeyError: + label = label_base + + if "scheduler_version" in metadata: + label = f"{label} with {metadata['scheduler_version']}" + + return label + + +def read_archived_sim_metadata( + base_uri, latest=None, num_nights=5, compilation_resource=None, verify_compilation=False +): + """Read metadata for a time range of archived opsim output. + + Parameters + ---------- + base_uri : `str` + The base URI of the archive resource to be checked. + latest : `str`, optional + The date of the latest simulation whose metadata should be loaded. + This is the date on which the simulations was added to the archive, + not necessarily the date on which the simulation was run, or any + of the dates simulated. + Default is today. + num_nights : `int` + The number of nights of the date window to load. + compilation_resource : `ResourcePath` or `str` or `None` + The ResourcePath to an hdf5 compilation of metadata. + verify_compilation : `bool` + Verify that metadata in compilation corresponds to an existing + resource, and the all existing resources have metadata. Defaults + to False, which will work provided that the compilation is complete + and correct up to the date of the last simulation it includes. 
+ + Returns + ------- + sim_metadata: `dict` + A dictionary of metadata for simulations in the date range. + """ + latest_mjd = int(Time.now().mjd if latest is None else Time(latest).mjd) + earliest_mjd = int(latest_mjd - (num_nights - 1)) + + compilation = {} + compiled_uris_by_date = {} + max_compiled_date = "1900-01-01" + if compilation_resource is not None: + try: + compilation.update(read_sim_metadata_from_hdf(compilation_resource)) + for uri in compilation: + iso_date = Path(urlparse(uri).path).parts[-2] + if iso_date not in compiled_uris_by_date: + compiled_uris_by_date[iso_date] = [] + compiled_uris_by_date[iso_date].append(uri) + max_compiled_date = max(max_compiled_date, iso_date) + except FileNotFoundError: + LOGGER.warning(f"No metadata cache {compilation_resource}, not using cache.") + pass + + all_metadata = {} + for mjd in range(earliest_mjd, latest_mjd + 1): + iso_date = Time(mjd, format="mjd").iso[:10] + + # Make the comparison >=, rather than >, so + # it won't miss sims in which the compilation does not complete + # the final date. + if verify_compilation or (iso_date >= max_compiled_date): + date_resource = ResourcePath(base_uri).join(iso_date, forceDirectory=True) + if date_resource.exists(): + for base_dir, found_dirs, found_files in date_resource.walk( + file_filter=r".*sim_metadata.yaml" + ): + for found_file in found_files: + found_resource = ResourcePath(base_dir).join(found_file) + sim_uri = str(found_resource.dirname()) + if sim_uri in compilation: + these_metadata = compilation[sim_uri] + else: + these_metadata = yaml.safe_load(found_resource.read().decode("utf-8")) + these_metadata["label"] = _build_archived_sim_label( + base_uri, found_resource, these_metadata + ) + if iso_date < max_compiled_date: + print(f"Simulation at {sim_uri} expected but not found in compilation.") + all_metadata[sim_uri] = these_metadata + else: + if iso_date in compiled_uris_by_date: + for sim_uri in compiled_uris_by_date[iso_date]: + all_metadata[sim_uri] = compilation[sim_uri] + + if verify_compilation: + if iso_date in compiled_uris_by_date: + for sim_uri in compiled_uris_by_date[iso_date]: + if sim_uri not in all_metadata: + print(f"Simulation at {sim_uri} in compiled metadata but not archive.") + + return all_metadata + + +def make_sim_archive_cli(*args): + parser = argparse.ArgumentParser(description="Add files to sim archive") + parser.add_argument( + "label", + type=str, + help="A label for the simulation.", + ) + parser.add_argument( + "opsim", + type=str, + help="File name of opsim database.", + ) + parser.add_argument("--rewards", type=str, default=None, help="A rewards HDF5 file.") + parser.add_argument( + "--scheduler_version", + type=str, + default=None, + help="The version of the scheduler that producte the opsim database.", + ) + parser.add_argument( + "--scheduler", + type=str, + default=None, + help="A snapshot of the scheduler used to produce the database, at the start of the simulation.", + ) + parser.add_argument( + "--script", type=str, default=None, help="The file name of the script run to create the simulation." + ) + + notebook_help = "The file name of the notebook run to create the simulation." + notebook_help = notebook_help + " This can be produced using the %%notebook magic." 
+ parser.add_argument( + "--notebook", + type=str, + default=None, + help=notebook_help, + ) + parser.add_argument( + "--current_env", + action="store_true", + help="Record the current environment as the simulation environment.", + ) + parser.add_argument( + "--archive_base_uri", + type=str, + default="s3://rubin-scheduler-prenight/opsim/", + help="Base URI for the archive", + ) + parser.add_argument("--tags", type=str, default=[], nargs="*", help="The tags on the simulation.") + arg_values = parser.parse_args() if len(args) == 0 else parser.parse_args(args) + + observations = SchemaConverter().opsim2obs(arg_values.opsim) + + if arg_values.rewards is not None: + try: + reward_df = pd.read_hdf(arg_values.rewards, "reward_df") + except KeyError: + reward_df = None + + try: + obs_rewards = pd.read_hdf(arg_values.rewards, "obs_rewards") + except KeyError: + obs_rewards = None + else: + reward_df = None + obs_rewards = None + + filename_args = ["scheduler", "script", "notebook"] + in_files = {} + for filename_arg in filename_args: + try: + filename = getattr(arg_values, filename_arg) + if filename is not None: + in_files[filename] = filename + except AttributeError: + pass + + data_path = make_sim_archive_dir( + observations, + reward_df, + obs_rewards, + in_files, + tags=arg_values.tags, + label=arg_values.label, + capture_env=arg_values.current_env, + ) + + sim_archive_uri = transfer_archive_dir(data_path.name, arg_values.archive_base_uri) + + return sim_archive_uri + + +def compile_sim_metadata( + archive_uri: str, compilation_resource: str | ResourcePath, num_nights: int = 10000, append=False +) -> str: + """Read sim archive metadata and export it to tables in an hdf5 files. + + Parameters + ---------- + archive_uri : `str` + URI of the sim archive from which to read metadata. + compilation_resource : `str` or `ResourcePath` + Resource for hdf5 file to be written to + num_nights : `int`, optional + Number of nights to include, by default 10000 + append : `bool`, optional + Do not rebuild the whole compilation, but instead reread what is + there already, and add new metadata after the last date already + include. Defaults to False. + + Returns + ------- + compilation_fname : `ResourcePath` + The resource to which the hdf5 file was written. 
+    """
+
+    if append:
+        sim_metadata = read_archived_sim_metadata(
+            archive_uri, num_nights=num_nights, compilation_resource=compilation_resource
+        )
+    else:
+        sim_metadata = read_archived_sim_metadata(archive_uri, num_nights=num_nights)
+
+    sim_rows = []
+    file_rows = []
+    sim_runner_kwargs = []
+    tags = []
+    for uri, metadata in list(sim_metadata.items()):
+        sim_row = {"sim_uri": uri}
+        for key, value in metadata.items():
+            match key:
+                case "files":
+                    for file_type, file_metadata in value.items():
+                        this_file = {"sim_uri": uri, "file_type": file_type} | file_metadata
+                        file_rows.append(this_file)
+                case "sim_runner_kwargs":
+                    these_args = {"sim_uri": uri} | value
+                    sim_runner_kwargs.append(these_args)
+                case "tags":
+                    for tag in value:
+                        tags.append({"sim_uri": uri, "tag": tag})
+                case "simulated_dates":
+                    sim_row["first_simulated_date"] = value["start"] if "start" in value else value["first"]
+                    sim_row["last_simulated_date"] = value["end"] if "end" in value else value["last"]
+                case _:
+                    sim_row[key] = value
+        sim_rows.append(sim_row)
+
+    sim_df = pd.DataFrame(sim_rows).set_index("sim_uri")
+    file_df = pd.DataFrame(file_rows).set_index("sim_uri")
+    sim_runner_kwargs_df = pd.DataFrame(sim_runner_kwargs).set_index("sim_uri")
+    tags_df = pd.DataFrame(tags).set_index("sim_uri")
+
+    with TemporaryDirectory() as local_data_dir:
+        local_hdf_fname = Path(local_data_dir).joinpath("compiled_metadata.h5")
+        sim_df.to_hdf(local_hdf_fname, key="simulations", format="table")
+        file_df.to_hdf(local_hdf_fname, key="files", format="table")
+        sim_runner_kwargs_df.to_hdf(local_hdf_fname, key="kwargs", format="table")
+        tags_df.to_hdf(local_hdf_fname, key="tags", format="table")
+
+        with open(local_hdf_fname, "rb") as local_hdf_io:
+            hdf_bytes = local_hdf_io.read()
+
+    if isinstance(compilation_resource, str):
+        compilation_resource = ResourcePath(compilation_resource)
+
+    assert isinstance(compilation_resource, ResourcePath)
+    LOGGER.info(f"Writing metadata compilation to {compilation_resource}")
+    compilation_resource.write(hdf_bytes)
+
+    return compilation_resource
+
+
+def read_sim_metadata_from_hdf(compilation_resource: str | ResourcePath) -> dict:
+    """Read sim archive metadata from an hdf5 file.
+
+    Return a dict as if it were generated by read_archived_sim_metadata.
+
+    Parameters
+    ----------
+    compilation_resource : `str` or `ResourcePath`
+        The source of the hdf5 data to read.
+
+    Returns
+    -------
+    sim_archive_metadata: `dict`
+        A nested dictionary with the simulation metadata.
+    """
+
+    if isinstance(compilation_resource, str):
+        compilation_resource = ResourcePath(compilation_resource)
+    assert isinstance(compilation_resource, ResourcePath)
+
+    with compilation_resource.as_local() as local_compilation_resource:
+        compilation_fname: str = local_compilation_resource.ospath
+        sim_df = pd.read_hdf(compilation_fname, "simulations")
+        file_df = pd.read_hdf(compilation_fname, "files")
+        sim_runner_kwargs_df = pd.read_hdf(compilation_fname, "kwargs")
+        tags_df = pd.read_hdf(compilation_fname, "tags")
+
+    def make_file_dict(g):
+        if isinstance(g, pd.Series):
+            g = g.to_frame().T
+
+        return g.set_index("file_type").T.to_dict()
+
+    sim_df["files"] = file_df.groupby("sim_uri").apply(make_file_dict)
+
+    def make_kwargs_dict(g):
+        sim_kwargs = g.to_dict(orient="records")[0]
+        # Keyword args that are not set get recorded as nans.
+        # Do not include them in the dictionary.
+        nan_keys = []
+        for key in sim_kwargs:
+            try:
+                if np.isnan(sim_kwargs[key]):
+                    nan_keys.append(key)
+            except TypeError:
+                pass
+
+        for key in nan_keys:
+            del sim_kwargs[key]
+
+        return sim_kwargs
+
+    sim_df["sim_runner_kwargs"] = sim_runner_kwargs_df.groupby("sim_uri").apply(make_kwargs_dict)
+
+    sim_df["tags"] = tags_df.groupby("sim_uri").agg({"tag": list})
+
+    sim_metadata = sim_df.to_dict(orient="index")
+
+    for sim_uri in sim_metadata:
+        # Return begin and end date to their nested dict format.
+        sim_metadata[sim_uri]["simulated_dates"] = {
+            "first": sim_metadata[sim_uri]["first_simulated_date"],
+            "last": sim_metadata[sim_uri]["last_simulated_date"],
+        }
+        del sim_metadata[sim_uri]["first_simulated_date"]
+        del sim_metadata[sim_uri]["last_simulated_date"]
+
+        # Unset keys show up as nans.
+        # Do not put them in the resultant dict.
+        nan_keys = []
+        for key in sim_metadata[sim_uri]:
+            try:
+                if np.isnan(sim_metadata[sim_uri][key]):
+                    nan_keys.append(key)
+            except TypeError:
+                pass
+
+        for key in nan_keys:
+            del sim_metadata[sim_uri][key]
+
+    return sim_metadata
+
+
+def verify_compiled_sim_metadata(
+    archive_uri: str, compilation_resource: str | ResourcePath, num_nights: int = 10000
+) -> list[dict]:
+    """Verify that a compilation of sim archive metadata matches directly
+    read metadata.
+
+    Parameters
+    ----------
+    archive_uri : `str`
+        Archive from which to directly read metadata.
+    compilation_resource : `str` or `ResourcePath`
+        Resource for the metadata compilation.
+    num_nights : `int`, optional
+        Number of nights to check, by default 10000.
+
+    Returns
+    -------
+    differences : `list[dict]`
+        A list of dicts describing differences. If they match, it will return
+        an empty list.
+    """
+
+    direct_sim_metadata = read_archived_sim_metadata(archive_uri, num_nights=num_nights)
+
+    try:
+        # One old sim uses a couple of non-standard keywords, so update them.
+        simulated_dates = direct_sim_metadata["s3://rubin:rubin-scheduler-prenight/opsim/2023-12-15/1/"][
+            "simulated_dates"
+        ]
+        simulated_dates["first"] = simulated_dates["start"]
+        del simulated_dates["start"]
+        simulated_dates["last"] = simulated_dates["end"]
+        del simulated_dates["end"]
+    except KeyError:
+        # If the archive doesn't have this old sim, don't worry about it.
+        pass
+
+    compiled_sim_metadata = read_sim_metadata_from_hdf(compilation_resource)
+
+    # Test that everything in direct_sim_metadata has a corresponding matching
+    # entry in the compilation.
+    differences = []
+    for sim_uri in direct_sim_metadata:
+        for key in direct_sim_metadata[sim_uri]:
+            try:
+                if direct_sim_metadata[sim_uri][key] != compiled_sim_metadata[sim_uri][key]:
+                    differences.append(
+                        {
+                            "sim_uri": sim_uri,
+                            "key": key,
+                            "direct_value": direct_sim_metadata[sim_uri][key],
+                            "compiled_value": compiled_sim_metadata[sim_uri][key],
+                        }
+                    )
+            except KeyError:
+                differences.append(
+                    {
+                        "sim_uri": sim_uri,
+                        "key": key,
+                        "direct_value": direct_sim_metadata[sim_uri][key],
+                        "compiled_value": "MISSING",
+                    }
+                )
+
+    # Test that everything in the compilation has a corresponding matching
+    # entry in direct_sim_metadata.
+    for sim_uri in compiled_sim_metadata:
+        for key in compiled_sim_metadata[sim_uri]:
+            if sim_uri not in direct_sim_metadata:
+                differences.append(
+                    {
+                        "sim_uri": sim_uri,
+                        "direct_value": "MISSING",
+                    }
+                )
+            elif key not in direct_sim_metadata[sim_uri]:
+                differences.append(
+                    {
+                        "sim_uri": sim_uri,
+                        "key": key,
+                        "direct_value": "MISSING",
+                        "compiled_value": compiled_sim_metadata[sim_uri][key],
+                    }
+                )
+
+    return differences
+
+
+def drive_sim(
+    observatory,
+    scheduler,
+    archive_uri=None,
+    label=None,
+    tags=[],
+    script=None,
+    notebook=None,
+    opsim_metadata=None,
+    **kwargs,
+):
+    """Run a simulation and archive the results.
+
+    Parameters
+    ----------
+    observatory : `ModelObservatory`
+        The model for the observatory.
+    scheduler : `CoreScheduler`
+        The scheduler to use.
+    archive_uri : `str`, optional
+        The root URI of the archive resource into which the results should
+        be stored. Defaults to None.
+    label : `str`, optional
+        The label for the simulation in the archive. Defaults to None.
+    tags : `list` of `str`, optional
+        The tags for the simulation in the archive. Defaults to an
+        empty list.
+    script : `str`, optional
+        The filename of the script producing this simulation.
+        Defaults to None.
+    notebook : `str`, optional
+        The filename of the notebook producing the simulation.
+        Defaults to None.
+    opsim_metadata : `dict`, optional
+        Extra metadata to store in the archive.
+
+    Returns
+    -------
+    observatory : `ModelObservatory`
+        The model for the observatory.
+    scheduler : `CoreScheduler`
+        The scheduler used.
+    observations : `numpy.recarray`
+        The observations produced.
+    reward_df : `pandas.DataFrame`, optional
+        The table of rewards. Present if `record_rewards`
+        or `scheduler.keep_rewards` is True.
+    obs_rewards : `pandas.Series`, optional
+        The mapping of entries in reward_df to observations. Present if
+        `record_rewards` or `scheduler.keep_rewards` is True.
+    resource_path : `ResourcePath`, optional
+        The resource path to the archive of the simulation. Present if
+        `archive_uri` was set.
+
+    Notes
+    -----
+    Additional parameters not described above will be passed into
+    `sim_runner`.
+
+    If the `archive_uri` parameter is not supplied, `sim_runner` is run
+    directly, so that `drive_sim` can act as a drop-in replacement for
+    `sim_runner`.
+
+    In a jupyter notebook, the notebook can be saved for the notebook
+    parameter using `%notebook $notebook_fname` (where `notebook_fname`
+    is a variable holding the filename for the notebook) in the cell prior
+    to calling `drive_sim`.
+    """
+    if "record_rewards" in kwargs:
+        if kwargs["record_rewards"] and not scheduler.keep_rewards:
+            raise ValueError("To keep rewards, scheduler.keep_rewards must be True")
+    else:
+        kwargs["record_rewards"] = scheduler.keep_rewards
+
+    in_files = {}
+    if script is not None:
+        in_files["script"] = script
+
+    if notebook is not None:
+        in_files["notebook"] = notebook
+
+    with TemporaryDirectory() as local_data_dir:
+        # We want to store the state of the scheduler at the start of
+        # the sim, so we need to save it now before we run the simulation.
+        scheduler_path = Path(local_data_dir).joinpath("scheduler.pickle.xz")
+        with lzma.open(scheduler_path, "wb", format=lzma.FORMAT_XZ) as pio:
+            pickle.dump(scheduler, pio)
+        in_files["scheduler"] = scheduler_path.as_posix()
+
+        sim_results = sim_runner(observatory, scheduler, **kwargs)
+
+        observations = sim_results[2]
+        reward_df = sim_results[3] if scheduler.keep_rewards else None
+        obs_rewards = sim_results[4] if scheduler.keep_rewards else None
+
+        data_dir = make_sim_archive_dir(
+            observations,
+            reward_df=reward_df,
+            obs_rewards=obs_rewards,
+            in_files=in_files,
+            sim_runner_kwargs=kwargs,
+            tags=tags,
+            label=label,
+            capture_env=True,
+            opsim_metadata=opsim_metadata,
+        )
+
+        if archive_uri is not None:
+            resource_path = transfer_archive_dir(data_dir.name, archive_uri)
+        else:
+            resource_path = ResourcePath(data_dir.name, forceDirectory=True)
+
+    results = sim_results + (resource_path,)
+    return results
+
+
+def compile_sim_archive_metadata_cli(*args):
+    parser = argparse.ArgumentParser(description="Create a metadata compilation HDF5 file at a URI")
+    parser.add_argument(
+        "--compilation_uri",
+        type=str,
+        default=None,
+        help="The URI of the metadata archive compilation to write. "
+        + "Defaults to compiled_metadata_cache.h5 in the archive.",
+    )
+    parser.add_argument(
+        "--archive_base_uri",
+        type=str,
+        default="s3://rubin:rubin-scheduler-prenight/opsim/",
+        help="Base URI for the archive",
+    )
+    parser.add_argument(
+        "--append",
+        action="store_true",
+        help="Do not rebuild the whole compilation, "
+        + "but add new simulations with dates after the last current entry.",
+    )
+
+    arg_values = parser.parse_args() if len(args) == 0 else parser.parse_args(args)
+    archive_uri = arg_values.archive_base_uri
+    compilation_uri = arg_values.compilation_uri
+    append = arg_values.append
+    if compilation_uri is None:
+        compilation_resource = ResourcePath(archive_uri).join("compiled_metadata_cache.h5")
+    else:
+        compilation_resource = ResourcePath(compilation_uri)
+
+    compilation_resource = compile_sim_metadata(archive_uri, compilation_resource, append=append)
diff --git a/tests/sim_archive/test_make_snapshot.py b/tests/sim_archive/test_make_snapshot.py
new file mode 100644
index 000000000..d1ef6cf2e
--- /dev/null
+++ b/tests/sim_archive/test_make_snapshot.py
@@ -0,0 +1,24 @@
+import importlib.util
+import unittest
+
+from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler
+
+if importlib.util.find_spec("lsst"):
+    HAVE_TS = importlib.util.find_spec("lsst.ts")
+else:
+    HAVE_TS = False
+
+if HAVE_TS:
+    from rubin_sim.sim_archive import get_scheduler_instance_from_repo
+
+
+class TestMakeSnapshot(unittest.TestCase):
+    @unittest.skip("Skipping because test depends on external repo.")
+    @unittest.skipIf(not HAVE_TS, "No lsst.ts")
+    def test_get_scheduler_instance_photcal(self):
+        scheduler = get_scheduler_instance_from_repo(
+            config_repo="https://github.com/lsst-ts/ts_config_ocs.git",
+            config_script="Scheduler/feature_scheduler/auxtel/fbs_config_image_photocal_survey.py",
+            config_branch="main",
+        )
+        self.assertIsInstance(scheduler, CoreScheduler)
diff --git a/tests/sim_archive/test_prenight.py b/tests/sim_archive/test_prenight.py
new file mode 100644
index 000000000..66de61a12
--- /dev/null
+++ b/tests/sim_archive/test_prenight.py
@@ -0,0 +1,28 @@
+import importlib.util
+import unittest
+from tempfile import TemporaryDirectory
+
+try:
+    from lsst.resources import ResourcePath
+
+    HAVE_RESOURCES = True
+except ModuleNotFoundError:
+    HAVE_RESOURCES = False
+
+if HAVE_RESOURCES:
+    from rubin_sim.sim_archive import prenight_sim_cli
+
+# We need rubin_sim to get the baseline sim.
+# Tooling prefers checking that it exists using importlib rather
+# than importing it and not actually using it.
+HAVE_RUBIN_SIM = importlib.util.find_spec("rubin_sim")
+
+
+class TestPrenight(unittest.TestCase):
+    @unittest.skip("Too slow")
+    @unittest.skipIf(not HAVE_RESOURCES, "No lsst.resources")
+    @unittest.skipIf(not HAVE_RUBIN_SIM, "No rubin_sim, needed for rubin_sim.data.get_baseline")
+    def test_prenight(self):
+        with TemporaryDirectory() as test_archive_dir:
+            archive_uri = ResourcePath(test_archive_dir).geturl()  # type: ignore
+            prenight_sim_cli("--archive", archive_uri)
diff --git a/tests/sim_archive/test_sim_archive.py b/tests/sim_archive/test_sim_archive.py
new file mode 100644
index 000000000..0a1bfe8bb
--- /dev/null
+++ b/tests/sim_archive/test_sim_archive.py
@@ -0,0 +1,139 @@
+import importlib
+import lzma
+import pickle
+import unittest
+import urllib
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+from rubin_scheduler.scheduler import sim_runner
+from rubin_scheduler.scheduler.example import example_scheduler
+from rubin_scheduler.scheduler.model_observatory import ModelObservatory
+from rubin_scheduler.utils import SURVEY_START_MJD
+
+HAVE_LSST_RESOURCES = importlib.util.find_spec("lsst") and importlib.util.find_spec("lsst.resources")
+if HAVE_LSST_RESOURCES:
+    from lsst.resources import ResourcePath
+
+    from rubin_sim.sim_archive.sim_archive import (
+        check_opsim_archive_resource,
+        compile_sim_metadata,
+        make_sim_archive_cli,
+        make_sim_archive_dir,
+        read_archived_sim_metadata,
+        read_sim_metadata_from_hdf,
+        transfer_archive_dir,
+        verify_compiled_sim_metadata,
+    )
+
+
+class TestSimArchive(unittest.TestCase):
+    @unittest.skipIf(not HAVE_LSST_RESOURCES, "No lsst.resources")
+    def test_sim_archive(self):
+        # Begin by running a short simulation
+        sim_start_mjd = SURVEY_START_MJD
+        sim_duration = 1  # days
+        scheduler = example_scheduler(mjd_start=sim_start_mjd)
+        scheduler.keep_rewards = True
+        observatory = ModelObservatory(mjd_start=sim_start_mjd)
+
+        # Record the state of the scheduler at the start of the sim.
+        data_dir = TemporaryDirectory()
+        data_path = Path(data_dir.name)
+
+        scheduler_fname = data_path.joinpath("scheduler.pickle.xz")
+        with lzma.open(scheduler_fname, "wb", format=lzma.FORMAT_XZ) as pio:
+            pickle.dump(scheduler, pio)
+
+        files_to_archive = {"scheduler": scheduler_fname}
+
+        # Run the simulation
+        sim_runner_kwargs = {
+            "sim_start_mjd": sim_start_mjd,
+            "sim_duration": sim_duration,
+            "record_rewards": True,
+        }
+
+        observatory, scheduler, observations, reward_df, obs_rewards = sim_runner(
+            observatory, scheduler, **sim_runner_kwargs
+        )
+
+        # Make the scratch sim archive
+        make_sim_archive_dir(
+            observations,
+            reward_df=reward_df,
+            obs_rewards=obs_rewards,
+            in_files=files_to_archive,
+            sim_runner_kwargs=sim_runner_kwargs,
+            tags=["test"],
+            label="test",
+            data_path=data_path,
+        )
+
+        # Move the scratch sim archive to a test resource
+        test_resource_dir = TemporaryDirectory()
+        test_resource_uri = "file://" + test_resource_dir.name
+        sim_archive_uri = transfer_archive_dir(data_dir.name, test_resource_uri)
+
+        # Check the saved archive
+        archive_check = check_opsim_archive_resource(sim_archive_uri)
+        self.assertEqual(
+            archive_check.keys(),
+            set(
+                [
+                    "opsim.db",
+                    "rewards.h5",
+                    "scheduler.pickle.xz",
+                    "obs_stats.txt",
+                    "environment.txt",
+                    "pypi.json",
+                ]
+            ),
+        )
+        for value in archive_check.values():
+            self.assertTrue(value)
+
+        # Read back the metadata
+        archive_metadata = read_archived_sim_metadata(test_resource_uri)
+        base = sim_archive_uri.dirname().geturl().removeprefix(test_resource_uri).rstrip("/").lstrip("/")
+        expected_label = f"{base} test"
+        self.assertEqual(archive_metadata[sim_archive_uri.geturl()]["label"], expected_label)
+
+        # Cache the metadata
+        test_compiled_metadata_uri = test_resource_uri + "/compiled_metadata_cache.h5"
+
+        # Test reading from cached metadata
+        compile_sim_metadata(test_resource_uri, test_compiled_metadata_uri)
+        read_sim_metadata_from_hdf(test_compiled_metadata_uri)
+        read_archived_sim_metadata(test_resource_uri, compilation_resource=test_compiled_metadata_uri)
+        verify_compiled_sim_metadata(test_resource_uri, test_compiled_metadata_uri)
+
+    @unittest.skipIf(not HAVE_LSST_RESOURCES, "No lsst.resources")
+    @unittest.skipIf(importlib.util.find_spec("schedview") is None, "No schedview")
+    def test_cli(self):
+        test_resource_path = ResourcePath("resource://schedview/data/")
+        with test_resource_path.join("sample_opsim.db").as_local() as local_rp:
+            opsim = urllib.parse.urlparse(local_rp.geturl()).path
+
+        with test_resource_path.join("sample_rewards.h5").as_local() as local_rp:
+            rewards = urllib.parse.urlparse(local_rp.geturl()).path
+
+        with test_resource_path.join("sample_scheduler.pickle.xz").as_local() as local_rp:
+            scheduler = urllib.parse.urlparse(local_rp.geturl()).path
+
+        with TemporaryDirectory() as test_archive_dir:
+            test_archive_uri = f"file://{test_archive_dir}/"
+            make_sim_archive_cli(
+                "Test",
+                opsim,
+                "--rewards",
+                rewards,
+                "--scheduler",
+                scheduler,
+                "--archive_base_uri",
+                test_archive_uri,
+            )
+
+
+if __name__ == "__main__":
+    unittest.main()

From d1d49f790923ef6adabf3719648172e0d8607023 Mon Sep 17 00:00:00 2001
From: Eric Neilsen
Date: Thu, 27 Mar 2025 08:01:46 -0700
Subject: [PATCH 2/2] Reduce memory request

---
 batch/compile_prenight_metadata_cache.sh | 2 +-
 batch/run_prenight_sims.sh               | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/batch/compile_prenight_metadata_cache.sh b/batch/compile_prenight_metadata_cache.sh
index 7c0b265ea..b046bd0a5 100755
--- a/batch/compile_prenight_metadata_cache.sh
+++ b/batch/compile_prenight_metadata_cache.sh
@@ -7,7 +7,7 @@
 #SBATCH --nodes=1 # Number of nodes
 #SBATCH --ntasks=1 # Number of tasks run in parallel
 #SBATCH --cpus-per-task=1 # Number of CPUs per task
-#SBATCH --mem=16G # Requested memory
+#SBATCH --mem=4G # Requested memory
 #SBATCH --time=1:00:00 # Wall time (hh:mm:ss)
 
 echo "******** START of compile_prenight_metadata_cache.sh **********"
diff --git a/batch/run_prenight_sims.sh b/batch/run_prenight_sims.sh
index 68ad5462f..aeebfabaa 100755
--- a/batch/run_prenight_sims.sh
+++ b/batch/run_prenight_sims.sh
@@ -7,7 +7,7 @@
 #SBATCH --nodes=1 # Number of nodes
 #SBATCH --ntasks=1 # Number of tasks run in parallel
 #SBATCH --cpus-per-task=1 # Number of CPUs per task
-#SBATCH --mem=16G # Requested memory
+#SBATCH --mem=4G # Requested memory
 #SBATCH --time=1:00:00 # Wall time (hh:mm:ss)
 
 echo "******** START of run_prenight_sims.sh **********"
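
For reference, a minimal sketch of running and archiving a short simulation with the drive_sim wrapper added in the first patch, following the pattern in tests/sim_archive/test_sim_archive.py. The file:// archive URI, label, and tags below are illustrative placeholders, not values used by the batch scripts.

    # Illustrative sketch only: run a one-night simulation and archive it.
    from rubin_scheduler.scheduler.example import example_scheduler
    from rubin_scheduler.scheduler.model_observatory import ModelObservatory
    from rubin_scheduler.utils import SURVEY_START_MJD

    from rubin_sim.sim_archive.sim_archive import drive_sim

    # Set up a scheduler and model observatory, keeping rewards so they
    # are archived alongside the opsim database.
    scheduler = example_scheduler(mjd_start=SURVEY_START_MJD)
    scheduler.keep_rewards = True
    observatory = ModelObservatory(mjd_start=SURVEY_START_MJD)

    # Extra keyword arguments are passed through to sim_runner.
    results = drive_sim(
        observatory,
        scheduler,
        archive_uri="file:///tmp/sim_archive/",  # placeholder archive root
        label="one-night example",  # placeholder label
        tags=["example"],
        sim_start_mjd=SURVEY_START_MJD,
        sim_duration=1,  # days
    )
    resource_path = results[-1]  # ResourcePath of the archived simulation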