Skip to content

Commit

Permalink
perf: use deque for upload_folders
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasNeugebauer committed Mar 3, 2025
1 parent e1d43ca commit fef0f12
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import os
import shutil
from asyncio import Task
from collections import deque
from dataclasses import asdict
from datetime import datetime
from glob import glob
from io import BufferedReader, TextIOWrapper
from multiprocessing import Event
from multiprocessing.synchronize import Event as SyncEvent
from threading import Lock
from typing import List, Optional, Tuple, Union

import aiohttp
Expand Down Expand Up @@ -52,9 +54,13 @@ def __init__(self) -> None:
self.upload_counter = 0

self.priority_upload_folders: List[str] = []
self.upload_folders: List[str] = []
self.upload_folders: deque[str] = deque()
self.folders_lock = Lock()

self.upload_folders = self.get_all_data_files() # make sure to upload all existing images (e.g. after a restart)
# Load existing files into upload queue
with self.folders_lock:
for file in self.get_all_data_files():
self.upload_folders.append(file)

def save(self,
image: bytes,
Expand Down Expand Up @@ -98,24 +104,26 @@ def save(self,
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
return

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
else:
self.upload_folders.insert(0, self.path + '/' + identifier)
with self.folders_lock:
if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
else:
self.upload_folders.appendleft(self.path + '/' + identifier)

# Cut off the upload list if it gets too long
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
items_to_drop = self.upload_folders[self.MAX_UPLOAD_LENGTH:]
self.log.info('Dropping %s images from upload list', len(items_to_drop))
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]
self._trim_upload_queue()

for item in items_to_drop:
try:
if os.path.exists(item):
def _trim_upload_queue(self) -> None:
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
excess = len(self.upload_folders) - self.MAX_UPLOAD_LENGTH
self.log.info('Dropping %s images from upload list', excess)
for _ in range(excess):
if self.upload_folders:
try:
item = self.upload_folders.pop()
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)

def _is_valid_isoformat(self, date: Optional[str]) -> bool:
if date is None:
Expand All @@ -130,7 +138,8 @@ def get_all_data_files(self) -> List[str]:
return glob(f'{self.path}/*')

def get_upload_folders(self) -> List[str]:
return self.priority_upload_folders + self.upload_folders
with self.folders_lock:
return self.priority_upload_folders + list(self.upload_folders)

def ensure_continuous_upload(self) -> None:
self.log.debug('start_continuous_upload')
Expand Down

0 comments on commit fef0f12

Please sign in to comment.