Skip to content

Commit

Permalink
handle upload items in lists
Browse files Browse the repository at this point in the history
  • Loading branch information
denniswittich committed Feb 28, 2025
1 parent 026da82 commit 5e1c7d7
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
8 changes: 4 additions & 4 deletions learning_loop_node/detector/detector_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,13 @@ async def upload(sid, data: Dict) -> Optional[Dict]:

source = data.get('source', None)
creation_date = data.get('creation_date', None)

upload_priority = data.get('upload_priority', False)
self.log.debug('running upload via socketio. tags: %s, source: %s, creation_date: %s',
tags, source, creation_date)

loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None, self.outbox.save, data['image'], image_metadata, tags, source, creation_date)
await loop.run_in_executor(None, self.outbox.save, data['image'], image_metadata, tags, source, creation_date, upload_priority)
except Exception as e:
self.log.exception('could not upload via socketio')
return {'error': str(e)}
Expand Down Expand Up @@ -514,10 +514,10 @@ async def get_detections(self,
self.log.error('unknown autoupload value %s', autoupload)
return detections

async def upload_images(self, images: List[bytes], source: Optional[str], creation_date: Optional[str]):
async def upload_images(self, images: List[bytes], source: Optional[str], creation_date: Optional[str], upload_priority: bool = False):
loop = asyncio.get_event_loop()
for image in images:
await loop.run_in_executor(None, self.outbox.save, image, ImageMetadata(), ['picked_by_system'], source, creation_date)
await loop.run_in_executor(None, self.outbox.save, image, ImageMetadata(), ['picked_by_system'], source, creation_date, upload_priority)

def add_category_id_to_detections(self, model_info: ModelInformation, image_metadata: ImageMetadata):
def find_category_id_by_name(categories: List[Category], category_name: str):
Expand Down
60 changes: 46 additions & 14 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,27 @@ def __init__(self) -> None:
self.log.info('Outbox initialized with target_uri: %s', self.target_uri)

self.BATCH_SIZE = 20
self.MAX_UPLOAD_LENGTH = 1000 # only affects the `upload_folders` list
self.UPLOAD_INTERVAL_S = 5
self.UPLOAD_TIMEOUT_S = 30

self.shutdown_event: SyncEvent = Event()
self.upload_task: Optional[Task] = None

self.upload_counter = 0

self.priority_upload_folders: List[str] = []
self.upload_folders: List[str] = []

self.upload_folders = self.get_all_data_files() # make sure to upload all existing images (e.g. after a restart)

def save(self,
image: bytes,
image_metadata: Optional[ImageMetadata] = None,
tags: Optional[List[str]] = None,
source: Optional[str] = None,
creation_date: Optional[str] = None) -> None:
creation_date: Optional[str] = None,
upload_priority: bool = False) -> None:

if not self._is_valid_jpg(image):
self.log.error('Invalid jpg image')
Expand Down Expand Up @@ -89,6 +97,23 @@ def save(self,
else:
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
else:
self.upload_folders.insert(0, self.path + '/' + identifier)

# Cut off the upload list if it gets too long
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
items_to_drop = self.upload_folders[self.MAX_UPLOAD_LENGTH:]
self.log.info('Dropping %s images from upload list', len(items_to_drop))
try:
for item in items_to_drop:
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]
except Exception:
self.log.exception('Failed to cut upload list')

def _is_valid_isoformat(self, date: Optional[str]) -> bool:
if date is None:
return False
Expand All @@ -98,9 +123,12 @@ def _is_valid_isoformat(self, date: Optional[str]) -> bool:
except Exception:
return False

def get_data_files(self) -> List[str]:
def get_all_data_files(self) -> List[str]:
return glob(f'{self.path}/*')

def get_upload_folders(self) -> List[str]:
return self.priority_upload_folders + self.upload_folders

def ensure_continuous_upload(self) -> None:
self.log.debug('start_continuous_upload')
if self._upload_process_alive():
Expand All @@ -115,26 +143,31 @@ async def _continuous_upload(self) -> None:
assert self.shutdown_event is not None
while not self.shutdown_event.is_set():
await self.upload()
await asyncio.sleep(5)
await asyncio.sleep(self.UPLOAD_INTERVAL_S)
self.log.info('continuous upload ended')

async def upload(self) -> None:
items = self.get_data_files()
items = self.get_upload_folders()
if not items:
self.log.debug('No images found to upload')
return

self.log.info('Found %s images to upload', len(items))
for i in range(0, len(items), self.BATCH_SIZE):
batch_items = items[i:i+self.BATCH_SIZE]
if self.shutdown_event.is_set():
break
try:
await self._upload_batch(batch_items)
except Exception:
self.log.exception('Could not upload files')
# NOTE (for reviewer):
# I changed the behaviour from trying to clear the outbox in each upload cycle to uploading the first BS images in each 5-sec cycle
# This simplifies the code and has the advantage that newer images or manual uploads are uploaded earlier

batch_items = items[:self.BATCH_SIZE]
try:
await self._upload_batch(batch_items)
except Exception:
self.log.exception('Could not upload files')

async def _upload_batch(self, items: List[str]) -> None:
"""
Uploads a batch of images to the server.
:param items: List of folders to upload (each folder contains an image and a metadata file)
"""

# NOTE: keys are not relevant for the server, but using a fixed key like 'files'
# results in a post failure on the first run of the test in a docker environment (WTF)
Expand Down Expand Up @@ -176,8 +209,7 @@ async def _upload_batch(self, items: List[str]) -> None:
await self._upload_batch(items[:len(items)//2])
await self._upload_batch(items[len(items)//2:])
elif response.status == 429:
self.log.error('Too many requests: %s', response.content)
await asyncio.sleep(5)
self.log.warning('Too many requests: %s', response.content)
else:
self.log.error('Could not upload images: %s', response.content)

Expand Down
7 changes: 4 additions & 3 deletions learning_loop_node/detector/rest/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
async def upload_image(request: Request,
files: List[UploadFile] = File(...),
source: Optional[str] = Query(None, description='Source of the image'),
creation_date: Optional[str] = Query(None, description='Creation date of the image')):
creation_date: Optional[str] = Query(None, description='Creation date of the image'),
upload_priority: bool = Query(False, description='Upload the image with priority')):
"""
Upload an image or multiple images to the learning loop.
Expand All @@ -21,9 +22,9 @@ async def upload_image(request: Request,
Example Usage
curl -X POST -F '[email protected]' "http://localhost:/upload?source=test&creation_date=2024-01-01T00:00:00"
curl -X POST -F '[email protected]' "http://localhost:/upload?source=test&creation_date=2024-01-01T00:00:00&upload_priority=true"
"""
raw_files = [await file.read() for file in files]
node: DetectorNode = request.app
await node.upload_images(raw_files, source, creation_date)
await node.upload_images(raw_files, source, creation_date, upload_priority=upload_priority)
return 200, "OK"
4 changes: 2 additions & 2 deletions learning_loop_node/tests/detector/test_outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def test_outbox_upload_is_successful(test_outbox: Outbox):
async def test_invalid_jpg_is_not_saved(test_outbox: Outbox):
invalid_bytes = b'invalid jpg'
test_outbox.save(invalid_bytes)
assert len(test_outbox.get_data_files()) == 0
assert len(test_outbox.get_upload_folders()) == 0


# ------------------------------ Helper functions --------------------------------------
Expand Down Expand Up @@ -90,7 +90,7 @@ def get_test_image_binary():

async def wait_for_outbox_count(outbox: Outbox, count: int, timeout: int = 10) -> bool:
for _ in range(timeout):
if len(outbox.get_data_files()) == count:
if len(outbox.get_upload_folders()) == count:
return True
await asyncio.sleep(1)
return False

0 comments on commit 5e1c7d7

Please sign in to comment.