DM-48944 DREAM should send files to LFA #12
Conversation
This looks good but I have some comments I hope you will consider before I can approve the PR.
```
Large File Annex S3 instance, for example "cp", "tuc" or "ls".
type: string
pattern: "^[a-z0-9][.a-z0-9]*[a-z0-9]$"
url_root:
```
would this be any different than the hostname above? I imagine the data service will be running on the same machine that runs the DREAM controller no?
Definitely, the hostname will match the `host` configuration item. What I'm not sure about is (1) whether the HTTP server will be http or https, and (2) which port the HTTP server would be expected to run on.
Right, but I am pretty sure those are all things we control and can specify. For example, we can say that the port will be `port + 1` (increment the communication port by one) and that it is not going to be https.
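The convention proposed above can be sketched as a small helper. This is a hedged illustration of the idea only; `make_url_root` and the host/port names are assumptions, not the PR's actual configuration code.

```python
# Sketch of the proposed convention: derive the data service URL root
# from the configured host and communication port. Names are illustrative.
def make_url_root(host: str, port: int) -> str:
    # Plain http (not https), on the communication port incremented by one.
    return f"http://{host}:{port + 1}"

url = make_url_root("dream-controller", 5000)
```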
```python
@@ -142,6 +158,7 @@ async def connect(self) -> None:
        self.weather_and_status_loop_task = asyncio.ensure_future(
            self.weather_and_status_loop()
        )
        self.data_product_loop_task = asyncio.ensure_future(self.data_product_loop())
```
The recommended way (also here) to schedule background tasks is to use `create_task` instead of `ensure_future`. I noticed that there are other uses of `ensure_future` here; you might want to update those as well.
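The recommendation above can be sketched in a minimal, runnable form. The loop body is a stand-in, not the CSC's actual `data_product_loop`:

```python
import asyncio

# Minimal sketch of the create_task recommendation; the coroutine body
# here is illustrative, not the PR's actual data product loop.
async def data_product_loop() -> str:
    await asyncio.sleep(0)
    return "done"

async def connect() -> str:
    # asyncio.create_task (Python >= 3.7) is preferred over
    # asyncio.ensure_future for scheduling a coroutine as a task.
    task = asyncio.create_task(data_product_loop())
    return await task

result = asyncio.run(connect())
```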
```python
try:
    await self.weather_and_status_loop_task
    await self.data_product_loop_task
```
Note that since `self.weather_and_status_loop_task` was also cancelled, you will never reach this `await` here. You might want to do a `gather` here and, since you are basically expecting them to be cancelled, you could use `return_exceptions=True` and ignore the results. Something like this:

```python
await asyncio.gather(
    self.weather_and_status_loop_task,
    self.data_product_loop_task,
    return_exceptions=True,
)
```

You can even remove the try/except in this case.
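This pattern can be demonstrated end to end with two stand-in background tasks (the loop body is illustrative, not the CSC's code): cancelling both and awaiting them via `gather` with `return_exceptions=True` yields the `CancelledError` instances as results instead of re-raising them.

```python
import asyncio

# Sketch: cancel both background tasks, then await them together.
# With return_exceptions=True the CancelledErrors come back as results,
# so no try/except is needed around the await.
async def loop_forever() -> None:
    while True:
        await asyncio.sleep(3600)

async def main() -> list:
    task1 = asyncio.create_task(loop_forever())
    task2 = asyncio.create_task(loop_forever())
    await asyncio.sleep(0)  # let both tasks start running
    task1.cancel()
    task2.cancel()
    return await asyncio.gather(task1, task2, return_exceptions=True)

results = asyncio.run(main())
```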
```
Parameters
----------
data_product: DataProduct
```
Please reformat this to fit the numpy docs style, e.g.:

```
data_product : `DataProduct`
```
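In context, the requested numpydoc formatting looks like the sketch below; the function is a stand-in (the real method is async and does more), but the `Parameters` section shows the exact style:

```python
# Sketch of the numpydoc-style parameter section requested above,
# applied to a stand-in function.
def upload_data_product(data_product) -> None:
    """Upload a data product to the LFA.

    Parameters
    ----------
    data_product : `DataProduct`
        The data product to upload.
    """
```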
```python
try:
    await self.upload_data_product(data_product)
except Exception:
    self.log.exception("Upload data product failed")
```
So, if this raises an exception, you are logging it and just ignoring it. I think we might have to take a more proactive measure here, like sending the CSC to Fault.
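The escalation suggested here could look like the following sketch. `fault()` is a stand-in for the CSC's go-to-Fault transition (e.g. salobj's `BaseCsc.fault`), and all names are illustrative rather than the PR's exact code:

```python
# Hedged sketch: escalate a failed upload instead of only logging it.
# fault() and the other names are stand-ins, not the PR's actual code.
faults: list[str] = []

def fault(report: str) -> None:
    faults.append(report)  # stand-in for sending the CSC to the FAULT state

def upload_data_product(data_product: str) -> None:
    raise RuntimeError("upload failed")  # simulate a failing upload

def handle_data_product(data_product: str) -> None:
    try:
        upload_data_product(data_product)
    except Exception as e:
        # Escalate rather than log-and-ignore.
        fault(f"Upload of {data_product} failed: {e}")

handle_data_product("dp-001")
```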
```python
async with httpx.AsyncClient() as client:
    async with client.stream("GET", dream_url) as response:
        if response.status_code != 200:
            self.log.error(
```
should this be an exception instead?
```python
# Second attempt: Fresh request and save locally
async with client.stream("GET", dream_url) as response:
```
Do you really need to retrieve the data again? You already retrieve it above, so why not fall back to saving it to local disk right after the failure above? You could do something like:

```python
try:
    # First attempt: Save to S3
    await self.save_to_s3(response, key)
    return  # Success!
except Exception:
    self.log.exception(
        f"Could not upload {key} to S3; trying to save to local disk."
    )
await self.save_to_local_disk(response, key)
```

Also, I am a bit wary of all these potential errors being ignored (after being logged). If you are afraid some transient errors might happen and want to make it resilient to that, it is probably fine. However, ignoring all errors like this is probably too extreme. I would rather have the CSC go to Fault.
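The try-S3-first, fall-back-to-disk pattern can be demonstrated with stand-ins (`save_to_s3` and `save_to_local_disk` here are hypothetical, and the S3 failure is simulated); the point is that the data is fetched once and only the destination changes:

```python
import logging

# Hedged sketch of the single-fetch fallback pattern; all functions
# are stand-ins, not the PR's actual code.
log = logging.getLogger("dream")
local_store: dict[str, bytes] = {}

def save_to_s3(data: bytes, key: str) -> None:
    raise ConnectionError("S3 unavailable")  # simulate a transient failure

def save_to_local_disk(data: bytes, key: str) -> None:
    local_store[key] = data  # stand-in for writing to a configured directory

def upload_data_product(data: bytes, key: str) -> str:
    try:
        save_to_s3(data, key)
        return "s3"  # success: no second download needed
    except Exception:
        log.exception("Could not upload %s to S3; saving to local disk.", key)
    save_to_local_disk(data, key)
    return "disk"

destination = upload_data_product(b"payload", "dream/file.fits")
```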
```python
        )
        self.log.info(f"Successfully uploaded {key} to S3.")
    except Exception:
        self.log.exception(f"Failed to upload {key} to S3.")
```
I have the impression you are logging the same exceptions multiple times here. You might want to remove the try/except here and let the exception propagate and be handled at another layer.
```python
    self.log.info(f"Saved {key} to local disk at {filepath}")
except Exception:
    self.log.exception("Could not save the file to local disk.")
    raise
```
Same as above. Consider just removing the try/except clause here and handling the exception at another layer.
```python
generator="dream",
date=data_product.start,
other=other,
suffix=".fits",
```
Is this right? Is the data always going to be FITS? I have the impression that in the mock below you are handling txt files. This should probably be extracted from the `data_product`, no?
I contacted Sjoerd to ask about this but I haven't heard back from him yet.
Changed this line to `suffix=pathlib.Path(data_product.filename).suffix`, so we should now be good in any case.
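The effect of that change can be shown directly with `pathlib` (the filenames below are examples, not real DREAM products): the suffix now comes from the product's own filename, so FITS and txt files are both handled.

```python
import pathlib

# The suffix is derived from the data product's filename rather than
# hard-coded; example filenames only.
fits_suffix = pathlib.Path("dream_20250101.fits").suffix
txt_suffix = pathlib.Path("dream_log.txt").suffix
```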
d900120 to 820dcf8
```python
if not self.s3bucket:
    raise RuntimeError("S3 bucket not configured")

filepath = pathlib.Path("/tmp") / self.s3bucket.name / key
```
Are you sure you want to write this data to `/tmp`? Data here can be wiped out by the OS at any time. I think you probably want to write it somewhere else, maybe even to a configurable directory. This would allow us to, for instance, mount an NFS drive to be used as a backup. If you really want to write to temporary storage, you might want to look into Python's tempfile module, though I am pretty sure that is not really what we want to do here.
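A configurable fallback directory could look like the sketch below. `local_storage_dir` is a hypothetical configuration item, and `tempfile.TemporaryDirectory` is used here only to keep the example self-cleaning, not as the suggested storage location:

```python
import pathlib
import tempfile

# Sketch of a configurable fallback directory instead of a hard-coded
# /tmp; local_storage_dir is a hypothetical config item.
def fallback_path(local_storage_dir: str, bucket: str, key: str) -> pathlib.Path:
    return pathlib.Path(local_storage_dir) / bucket / key

# TemporaryDirectory keeps this example self-cleaning; a real CSC would
# use the configured (possibly NFS-mounted) directory instead.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = fallback_path(tmp_dir, "rubinobs-lfa", "dream/file.fits")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(b"payload")
    round_trip_ok = path.read_bytes() == b"payload"
```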
Changed to a configurable directory.
```python
async for chunk in response.aiter_bytes():
    tmp_file.write(chunk)

await self.evt_largeFileObjectAvailable.set_write(
```
So, this event is reserved for when we write things to the S3 bucket, so in this case don't publish it.
Looks good! Thanks for the updates!
This patch adds functionality to the DREAM CSC to use the getNewDataProducts command from DREAM. For each new data product, it sets up a key on the LFA and uploads the file. If upload to the LFA fails, it saves the file in the `/tmp` directory.

Note that this creates significant extra requirements for disk storage for the DREAM CSC. Given how rapidly DREAM generates data, it may not be worth the effort to save data on the local disk as a fallback; it may be better to just accept lost data whenever the LFA is unavailable.