Skip to content

Commit

Permalink
feature: use two subfolders to reconstruct priority and normal after …
Browse files Browse the repository at this point in the history
…restart
  • Loading branch information
denniswittich committed Mar 4, 2025
1 parent df37fd9 commit ec981e9
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ def __init__(self) -> None:
self.upload_folders: deque[str] = deque()
self.folders_lock = Lock()

for file in glob(f'{self.path}/*'):
for file in glob(f'{self.path}/priority/*'):
self.priority_upload_folders.append(file)
for file in glob(f'{self.path}/normal/*'):
self.upload_folders.append(file)

async def save(self,
Expand All @@ -90,15 +92,15 @@ async def save(self,
identifier = datetime.now().isoformat(sep='_', timespec='microseconds')

try:
await run.io_bound(self._save_files_to_disk, identifier, image, image_metadata, tags, source, creation_date)
await run.io_bound(self._save_files_to_disk, identifier, image, image_metadata, tags, source, creation_date, upload_priority)
except Exception as e:
self.log.error('Failed to save files for image %s: %s', identifier, e)
return

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
self.priority_upload_folders.append(f'{self.path}/priority/{identifier}')
else:
self.upload_folders.appendleft(self.path + '/' + identifier)
self.upload_folders.appendleft(f'{self.path}/normal/{identifier}')

await self._trim_upload_queue()

Expand All @@ -108,8 +110,11 @@ def _save_files_to_disk(self,
image_metadata: ImageMetadata,
tags: List[str],
source: Optional[str],
creation_date: Optional[str]) -> None:
if os.path.exists(self.path + '/' + identifier):
creation_date: Optional[str],
upload_priority: bool) -> None:
subpath = 'priority' if upload_priority else 'normal'
full_path = f'{self.path}/{subpath}/{identifier}'
if os.path.exists(full_path):
raise FileExistsError(f'Directory with identifier {identifier} already exists')

tmp = f'{GLOBALS.data_folder}/tmp/{identifier}'
Expand All @@ -130,9 +135,9 @@ def _save_files_to_disk(self,
f.write(image)

if not os.path.exists(tmp):
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
raise FileNotFoundError(f'Could not rename {tmp} to {self.path + "/" + identifier}')
os.rename(tmp, self.path + '/' + identifier)
self.log.error('Could not rename %s to %s', tmp, full_path)
raise FileNotFoundError(f'Could not rename {tmp} to {full_path}')
os.rename(tmp, full_path)

async def _trim_upload_queue(self) -> None:
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
Expand Down

0 comments on commit ec981e9

Please sign in to comment.