Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-48944 DREAM should send files to LFA #12

Merged
merged 6 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/version-history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
Version History
###############

v0.5.1
======

* Implemented the getNewDataProducts command.

v0.5.0
======

Expand Down
12 changes: 11 additions & 1 deletion python/lsst/ts/dream/csc/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
$schema: http://json-schema.org/draft-07/schema#
$id: https://github.com/lsst-ts/ts_dream/blob/master/python/lsst/ts/dream/csc/config_schema.py
# title must end with one or more spaces followed by the schema version, which must begin with "v"
title: DREAM v3
title: DREAM v4
description: Schema for DREAM configuration files
type: object
properties:
Expand Down Expand Up @@ -67,13 +67,23 @@
minimum: 0
exclusiveMaximum: 100
default: 25
s3instance:
description: >-
Large File Annex S3 instance, for example "cp", "tuc" or "ls".
type: string
pattern: "^[a-z0-9][.a-z0-9]*[a-z0-9]$"
data_product_path:
description: Local filesystem path for fallback storage of data products
type: string
required:
- host
- port
- connection_timeout
- read_timeout
- poll_interval
- ess_index
- s3instance
- data_product_path
additionalProperties: false
"""
)
197 changes: 181 additions & 16 deletions python/lsst/ts/dream/csc/dream_csc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@

import asyncio
import enum
import io
import logging
import pathlib
from types import SimpleNamespace
from typing import Any, Union

import httpx
from lsst.ts import salobj, utils

from . import CONFIG_SCHEMA, __version__
from .mock import MockDream
from .model import DreamModel
from .model import DataProduct, DreamModel

SAL_TIMEOUT = 10.0
CSC_RESET_SLEEP_TIME = 180.0
Expand All @@ -41,20 +44,21 @@ class ErrorCode(enum.IntEnum):
"""CSC error codes."""

TCPIP_CONNECT_ERROR = 1
UPLOAD_DATA_PRODUCT_FAILED = 2


class DreamCsc(salobj.ConfigurableCsc):
"""Commandable SAL Component for the DREAM.

Parameters
----------
config_dir: `string`
config_dir : `string`
The configuration directory
initial_state: `salobj.State`
initial_state : `salobj.State`
The initial state of the CSC
simulation_mode: `int`
simulation_mode : `int`
Simulation mode (1) or not (0)
mock_port: `int`, optional
mock_port : `int`, optional
The port that the mock DREAM will listen on. Ignored when
simulation_mode == 0.
"""
Expand All @@ -78,6 +82,7 @@ def __init__(
# Remote CSC for the weather data.
self.ess_remote: salobj.Remote | None = None
self.weather_and_status_loop_task = utils.make_done_future()
self.data_product_loop_task = utils.make_done_future()
self.weather_ok_flag: bool | None = None

super().__init__(
Expand All @@ -94,6 +99,11 @@ def __init__(

self.model: DreamModel | None = None

# LFA related configuration:
self.s3bucket: salobj.AsyncS3Bucket | None = None # Set by `connect`.
self.s3bucket_name: str | None = None # Set by `configure`.
self.s3instance: str | None = None # Set by `connect`.

async def connect(self) -> None:
"""Determine if running in local or remote mode and dispatch to the
corresponding connect coroutine.
Expand All @@ -115,13 +125,20 @@ async def connect(self) -> None:
if self.connected:
raise RuntimeError("Already connected")

# Set up the S3 bucket.
if self.s3bucket is None:
domock = self.simulation_mode != 0
self.s3bucket = salobj.AsyncS3Bucket(
name=self.s3bucket_name, domock=domock, create=domock
)

host: str = self.config.host
port: int = self.config.port

try:
if self.simulation_mode == 1:
if self.mock_port is None:
self.mock = MockDream(host="127.0.0.1", port=0)
self.mock = MockDream(host="127.0.0.1", port=0, log=self.log)
await self.mock.start_task
port = self.mock.port
self.log.info(f"Mock started on port {port}")
Expand All @@ -139,9 +156,10 @@ async def connect(self) -> None:
await self.fault(code=ErrorCode.TCPIP_CONNECT_ERROR, report=err_msg)
return

self.weather_and_status_loop_task = asyncio.ensure_future(
self.weather_and_status_loop_task = asyncio.create_task(
self.weather_and_status_loop()
)
self.data_product_loop_task = asyncio.create_task(self.data_product_loop())

async def end_enable(self, id_data: salobj.BaseDdsDataType) -> None:
"""End do_enable; called after state changes but before command
Expand All @@ -151,7 +169,7 @@ async def end_enable(self, id_data: salobj.BaseDdsDataType) -> None:

Parameters
----------
id_data: `CommandIdData`
id_data : `CommandIdData`
Command ID and data
"""
if not self.connected:
Expand All @@ -167,7 +185,7 @@ async def begin_disable(self, id_data: salobj.BaseDdsDataType) -> None:

Parameters
----------
id_data: `CommandIdData`
id_data : `CommandIdData`
Command ID and data
"""
await self.cmd_disable.ack_in_progress(id_data, timeout=SAL_TIMEOUT)
Expand All @@ -192,11 +210,12 @@ async def disconnect(self) -> None:
self.mock = None

self.weather_and_status_loop_task.cancel()
try:
await self.weather_and_status_loop_task
except asyncio.CancelledError:
# This is expected.
pass
self.data_product_loop_task.cancel()
await asyncio.gather(
self.weather_and_status_loop_task,
self.data_product_loop_task,
return_exceptions=True,
)

if self.ess_remote is not None:
await self.ess_remote.close()
Expand All @@ -206,6 +225,9 @@ async def disconnect(self) -> None:

async def configure(self, config: SimpleNamespace) -> None:
    """Accept the validated configuration and derive the LFA bucket name.

    Parameters
    ----------
    config : `types.SimpleNamespace`
        Configuration validated against the CSC config schema;
        ``config.s3instance`` selects the Large File Annex S3 instance.
    """
    self.config = config
    bucket_name = salobj.AsyncS3Bucket.make_bucket_name(
        s3instance=config.s3instance,
    )
    self.s3bucket_name = bucket_name

@property
def connected(self) -> bool:
Expand All @@ -224,6 +246,35 @@ async def do_resume(self, data: salobj.BaseMsgType) -> None:
if not self.connected:
await self.connect()

async def data_product_loop(self) -> None:
    """Periodically check DREAM for new data products.

    Send the getDataProducts command to DREAM. Based on the
    response, collect the images from DREAM's HTTP server
    and send them to LFA. If an upload fails, send the CSC to
    FAULT and stop the loop.
    """
    if not self.config:
        raise RuntimeError("Not yet configured")

    # Keep polling for as long as the model exists and stays connected.
    while self.model is not None and self.model.connected:
        self.log.debug("Checking for new data products...")

        # No await occurs between the loop condition and this call,
        # so self.model is still valid here.
        for data_product in await self.model.get_new_data_products():
            self.log.info(f"New data product: {data_product.filename}")
            try:
                await self.upload_data_product(data_product)
            except Exception as e:
                # An upload failure is fatal: report it and go to FAULT.
                self.log.exception("Upload data product failed")
                err_msg = f"Failed to upload DREAM data product: {e!r}"
                await self.fault(
                    code=ErrorCode.UPLOAD_DATA_PRODUCT_FAILED,
                    report=err_msg,
                )
                return

        await asyncio.sleep(self.config.poll_interval)

async def weather_and_status_loop(self) -> None:
"""Periodically check DREAM status and weather station and send a flag.

Expand Down Expand Up @@ -335,7 +386,7 @@ async def send_telemetry(self, status_data: dict[str, Any]) -> None:

Parameters
----------
status_data : dict[str, Any]
status_data : `dict[str, Any]`
The status structure returned by DREAM's getStatus
command.
"""
Expand Down Expand Up @@ -395,7 +446,7 @@ async def send_events(self, status_data: dict[str, Any]) -> None:

Parameters
----------
status_data : dict[str, Any]
status_data : `dict[str, Any]`
The status structure returned by DREAM's getStatus
command.

Expand Down Expand Up @@ -479,6 +530,120 @@ async def send_events(self, status_data: dict[str, Any]) -> None:
except KeyError:
self.log.exception("Status had unexpected format!")

async def upload_data_product(self, data_product: DataProduct) -> None:
    """Retrieve a data file from DREAM and upload it to the LFA.

    Given a DataProduct structure transmitted from DREAM, pull
    the file from the specified URL on the DREAM's HTTP server,
    and then copy it to the LFA. If writing to the LFA fails,
    write the file to the local filesystem in the configured
    data product path directory (``self.config.data_product_path``)
    instead.

    Parameters
    ----------
    data_product : `DataProduct`
        Information about the new file that is available. This
        structure is sent by DREAM in response to the
        getNewDataProducts command.

    Raises
    ------
    RuntimeError
        If the CSC has not been configured or the S3 bucket has
        not been set up.
    """
    if not self.config:
        raise RuntimeError("Not yet configured")

    if not self.s3bucket:
        raise RuntimeError("S3 bucket not configured")

    # Set up an LFA bucket key. The "other" component encodes the TAI
    # start time, server, kind, optional product type and the first/last
    # sequence numbers so keys sort chronologically and are unique.
    product_type = "" if data_product.type is None else f"_{data_product.type}"
    other = (
        f"{data_product.start.tai.isot}_{data_product.server}_"
        f"{data_product.kind}{product_type}_"
        f"{data_product.seq[0]:06d}_{data_product.seq[-1]:06d}"
    )
    key = self.s3bucket.make_key(
        salname=self.salinfo.name,
        salindexname=None,
        generator="dream",
        date=data_product.start,
        other=other,
        suffix=pathlib.Path(data_product.filename).suffix,
    )

    # Download the object with HTTP.
    # NOTE(review): assumes DREAM serves files on config port + 1 —
    # confirm against the DREAM interface documentation.
    dream_url = (
        f"http://{self.config.host}:{self.config.port+1}/{data_product.filename}"
    )
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", dream_url) as response:
            response.raise_for_status()

            try:
                # First attempt: Save to S3. save_to_s3 reads the full
                # body via response.aread(), which buffers it.
                await self.save_to_s3(response, key)
                return  # Success!
            except Exception:
                # Fall back to local disk. Presumably the body buffered
                # by aread() can be replayed by aiter_bytes() — verify
                # against httpx streaming semantics.
                self.log.exception(
                    f"Could not upload {key} to S3; trying to save to local disk."
                )
                await self.save_to_local_disk(response, key)

async def save_to_s3(self, response: httpx.Response, key: str) -> None:
    """Upload a file obtained from an HTTP stream to the LFA S3 bucket.

    On success, emit the ``largeFileObjectAvailable`` event with the
    public URL of the uploaded object.

    Parameters
    ----------
    response : `httpx.Response`
        HTTP response object for the file to save.
    key : `str`
        S3 style filename key to save to.

    Raises
    ------
    RuntimeError
        If the S3 bucket has not been set up.
    """
    if not self.s3bucket:
        raise RuntimeError("S3 bucket not configured")

    # Read the whole body and hand it to the bucket as a file object.
    payload = await response.aread()
    with io.BytesIO(payload) as buffer:
        await self.s3bucket.upload(fileobj=buffer, key=key)

    endpoint_url = self.s3bucket.service_resource.meta.client.meta.endpoint_url
    url = f"{endpoint_url}/{self.s3bucket.name}/{key}"
    await self.evt_largeFileObjectAvailable.set_write(
        url=url,
        generator="dream",
    )
    self.log.info(f"Successfully uploaded {key} to S3.")

async def save_to_local_disk(self, response: httpx.Response, key: str) -> None:
    """Write the file from an HTTP stream to the local fallback directory.

    The file is stored under ``data_product_path/<bucket name>/<key>``,
    mirroring the S3 bucket layout on the local filesystem.

    Parameters
    ----------
    response : `httpx.Response`
        HTTP response object for the file to save.
    key : `str`
        S3 style filename key to save to.

    Raises
    ------
    RuntimeError
        If the CSC has not been configured or the S3 bucket has
        not been set up.
    """
    if not self.config:
        raise RuntimeError("Not yet configured")
    if not self.s3bucket:
        raise RuntimeError("S3 bucket not configured")

    root = pathlib.Path(self.config.data_product_path)
    filepath = root / self.s3bucket.name / key
    dirpath = filepath.parent
    if not dirpath.exists():
        self.log.info(f"Creating directory {str(dirpath)}")
        dirpath.mkdir(parents=True, exist_ok=True)

    # Stream the body to disk chunk by chunk.
    with open(filepath, "wb") as out_file:
        async for chunk in response.aiter_bytes():
            out_file.write(chunk)

    self.log.info(f"Saved {key} to local disk at {filepath}")


def run_dream() -> None:
    """Command-line entry point: run the DREAM CSC until it quits."""
    asyncio.run(DreamCsc.amain(index=None))
Loading