
Commit

refactor/feat: make the outbox async (since we would now have race conditions)

typing: fix several type annotations arising from the changes
NiklasNeugebauer committed Mar 3, 2025
1 parent e7aeb9d commit e9754a5
Showing 6 changed files with 96 additions and 80 deletions.
6 changes: 2 additions & 4 deletions learning_loop_node/detector/detector_logic.py
@@ -2,8 +2,6 @@
from abc import abstractmethod
from typing import List, Optional

import numpy as np

from ..data_classes import ImageMetadata, ModelInformation
from ..globals import GLOBALS
from .exceptions import NodeNeedsRestartError
@@ -44,13 +42,13 @@ def load_model_info_and_init_model(self):
def init(self):
"""Called when a (new) model was loaded. Initialize the model. Model information available via `self.model_info`"""

def evaluate_with_all_info(self, image: np.ndarray, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument
def evaluate_with_all_info(self, image: bytes, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument
"""Called by the detector node when an image should be evaluated (REST or SocketIO).
Tags, source come from the caller and may be used in this function.
By default, this function simply calls `evaluate`"""
return self.evaluate(image)

@abstractmethod
def evaluate(self, image: np.ndarray) -> ImageMetadata:
def evaluate(self, image: bytes) -> ImageMetadata:
"""Evaluate the image and return the detections.
The object should return empty detections if it is not initialized"""
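The `evaluate` interface now receives the raw JPEG bytes instead of a pre-decoded `np.ndarray`. A minimal sketch of what a concrete subclass might look like after this change; the class name and decoding step are illustrative only, with the conversion via `cv2.imdecode` taken from the `get_detections` docstring further down:

```python
# Illustrative subclass only - not part of this commit. It shows how an
# implementation could decode the raw JPEG bytes that `evaluate` now receives.
import cv2
import numpy as np

from learning_loop_node.data_classes import ImageMetadata
from learning_loop_node.detector.detector_logic import DetectorLogic


class ExampleDetector(DetectorLogic):  # hypothetical subclass name
    def init(self) -> None:
        pass  # load weights based on self.model_info here

    def evaluate(self, image: bytes) -> ImageMetadata:
        # decode the JPEG bytes into a BGR array if the model needs pixel data
        frame = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
        if frame is None:
            # per the contract above, return empty detections when nothing can be evaluated
            # (assuming ImageMetadata's detection fields default to empty)
            return ImageMetadata()
        # ... run inference on `frame` and fill the returned ImageMetadata ...
        return ImageMetadata()
```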
25 changes: 11 additions & 14 deletions learning_loop_node/detector/detector_node.py
@@ -1,14 +1,12 @@
import asyncio
import contextlib
import math
import os
import shutil
import subprocess
import sys
from dataclasses import asdict
from datetime import datetime
from threading import Thread
from typing import Dict, List, Optional
from typing import Dict, List, Optional, cast

import numpy as np
import socketio
@@ -30,7 +28,7 @@
from ..data_exchanger import DataExchanger, DownloadError
from ..enums import OperationMode, VersionMode
from ..globals import GLOBALS
from ..helpers import environment_reader
from ..helpers import background_tasks, environment_reader, run
from ..node import Node
from .detector_logic import DetectorLogic
from .exceptions import NodeNeedsRestartError
@@ -227,7 +225,7 @@ def setup_sio_server(self) -> None:
async def detect(sid, data: Dict) -> Dict:
try:
det = await self.get_detections(
raw_image=np.frombuffer(data['image'], np.uint8),
raw_image=cast(bytes, np.frombuffer(data['image'], np.uint8)),
camera_id=data.get('camera-id', None) or data.get('mac', None),
tags=data.get('tags', []),
source=data.get('source', None),
@@ -480,7 +478,7 @@ def reload(self, reason: str):
self.log.error('could not reload app')

async def get_detections(self,
raw_image: np.ndarray,
raw_image: bytes,
camera_id: Optional[str],
tags: List[str],
source: Optional[str] = None,
@@ -492,20 +490,20 @@ async def get_detections(self,
It can be converted e.g. using cv2.imdecode(raw_image, cv2.IMREAD_COLOR)"""

await self.detection_lock.acquire()
loop = asyncio.get_event_loop()
detections = await loop.run_in_executor(None, self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date)
detections = await run.io_bound(self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date)
self.detection_lock.release()

fix_shape_detections(detections)
n_bo, n_cl = len(detections.box_detections), len(detections.classification_detections)
n_po, n_se = len(detections.point_detections), len(detections.segmentation_detections)
self.log.debug('Detected: %d boxes, %d points, %d segs, %d classes', n_bo, n_po, n_se, n_cl)

if autoupload is None or autoupload == 'filtered': # NOTE default is filtered
Thread(target=self.relevance_filter.may_upload_detections,
args=(detections, camera_id, raw_image, tags, source, creation_date)).start()
autoupload = autoupload or 'filtered'
if autoupload == 'filtered' and camera_id is not None:
background_tasks.create(self.relevance_filter.may_upload_detections(
detections, camera_id, raw_image, tags, source, creation_date))
elif autoupload == 'all':
Thread(target=self.outbox.save, args=(raw_image, detections, tags, source, creation_date)).start()
background_tasks.create(self.outbox.save(raw_image, detections, tags, source, creation_date))
elif autoupload == 'disabled':
pass
else:
@@ -531,9 +529,8 @@ async def upload_images(

tags.append('picked_by_system')

loop = asyncio.get_event_loop()
for image in images:
await loop.run_in_executor(None, self.outbox.save, image, image_metadata, tags, source, creation_date, upload_priority)
await self.outbox.save(image, image_metadata, tags, source, creation_date, upload_priority)

def add_category_id_to_detections(self, model_info: ModelInformation, image_metadata: ImageMetadata):
def find_category_id_by_name(categories: List[Category], category_name: str):
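The Thread-based fire-and-forget calls are replaced with `run.io_bound` for blocking work and `background_tasks.create` for scheduling coroutines, both imported from `..helpers`. Their implementations are not part of this diff; the sketch below only illustrates the usual shape of such helpers and is an assumption, not the actual code:

```python
# Sketch of generic helpers of this kind; the real ones live in
# learning_loop_node.helpers and may differ in details.
import asyncio
import functools
from typing import Any, Callable, Coroutine, Set, TypeVar

T = TypeVar('T')

_tasks: Set[asyncio.Task] = set()  # keep references so tasks are not garbage collected


async def io_bound(callback: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a blocking callable in the default thread pool without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(callback, *args, **kwargs))


def create(coroutine: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule a fire-and-forget coroutine; must be called from a running event loop."""
    task = asyncio.create_task(coroutine)
    _tasks.add(task)
    task.add_done_callback(_tasks.discard)
    return task
```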
18 changes: 9 additions & 9 deletions learning_loop_node/detector/inbox_filter/relevance_filter.py
@@ -11,14 +11,14 @@ def __init__(self, outbox: Outbox) -> None:
self.cam_histories: Dict[str, CamObservationHistory] = {}
self.outbox: Outbox = outbox

def may_upload_detections(self,
image_metadata: ImageMetadata,
cam_id: str,
raw_image: bytes,
tags: List[str],
source: Optional[str] = None,
creation_date: Optional[str] = None
) -> List[str]:
async def may_upload_detections(self,
image_metadata: ImageMetadata,
cam_id: str,
raw_image: bytes,
tags: List[str],
source: Optional[str] = None,
creation_date: Optional[str] = None
) -> List[str]:
for group in self.cam_histories.values():
group.forget_old_detections()

@@ -30,5 +30,5 @@ def may_upload_detections(self,
if len(causes) > 0:
tags = tags if tags is not None else []
tags.extend(causes)
self.outbox.save(raw_image, image_metadata, tags, source, creation_date)
await self.outbox.save(raw_image, image_metadata, tags, source, creation_date)
return causes
115 changes: 68 additions & 47 deletions learning_loop_node/detector/outbox.py
@@ -13,7 +13,7 @@
from multiprocessing import Event
from multiprocessing.synchronize import Event as SyncEvent
from threading import Lock
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple, TypeVar, Union

import aiohttp
import PIL
@@ -23,7 +23,9 @@
from ..data_classes import ImageMetadata
from ..enums import OutboxMode
from ..globals import GLOBALS
from ..helpers import environment_reader
from ..helpers import environment_reader, run

T = TypeVar('T')


class Outbox():
@@ -57,20 +59,18 @@ def __init__(self) -> None:
self.upload_folders: deque[str] = deque()
self.folders_lock = Lock()

# Load existing files into upload queue
with self.folders_lock:
for file in self.get_all_data_files():
self.upload_folders.append(file)

def save(self,
image: bytes,
image_metadata: Optional[ImageMetadata] = None,
tags: Optional[List[str]] = None,
source: Optional[str] = None,
creation_date: Optional[str] = None,
upload_priority: bool = False) -> None:

if not self._is_valid_jpg(image):
for file in self.get_all_data_files():
self.upload_folders.append(file)

async def save(self,
image: bytes,
image_metadata: Optional[ImageMetadata] = None,
tags: Optional[List[str]] = None,
source: Optional[str] = None,
creation_date: Optional[str] = None,
upload_priority: bool = False) -> None:

if not await run.io_bound(self._is_valid_jpg, image):
self.log.error('Invalid jpg image')
return

@@ -79,9 +79,30 @@ def save(self,
if not tags:
tags = []
identifier = datetime.now().isoformat(sep='_', timespec='microseconds')
if os.path.exists(self.path + '/' + identifier):
self.log.error('Directory with identifier %s already exists', identifier)

try:
await run.io_bound(self.save_files_to_disk, identifier, image, image_metadata, tags, source, creation_date)
except Exception as e:
self.log.error('Failed to save files for image %s: %s', identifier, e)
return

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
else:
self.upload_folders.appendleft(self.path + '/' + identifier)

await self._trim_upload_queue()

async def save_files_to_disk(self,
identifier: str,
image: bytes,
image_metadata: ImageMetadata,
tags: List[str],
source: Optional[str],
creation_date: Optional[str]) -> None:
if os.path.exists(self.path + '/' + identifier):
raise FileExistsError(f'Directory with identifier {identifier} already exists')

tmp = f'{GLOBALS.data_folder}/tmp/{identifier}'
image_metadata.tags = tags
if self._is_valid_isoformat(creation_date):
@@ -90,6 +111,7 @@
image_metadata.created = identifier

image_metadata.source = source or 'unknown'

os.makedirs(tmp, exist_ok=True)

with open(tmp + f'/image_{identifier}.json', 'w') as f:
@@ -98,32 +120,34 @@
with open(tmp + f'/image_{identifier}.jpg', 'wb') as f:
f.write(image)

if os.path.exists(tmp):
os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel
else:
if not os.path.exists(tmp):
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
return

with self.folders_lock:
if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
else:
self.upload_folders.appendleft(self.path + '/' + identifier)
raise FileNotFoundError(f'Could not rename {tmp} to {self.path + "/" + identifier}')
os.rename(tmp, self.path + '/' + identifier)

self._trim_upload_queue()

def _trim_upload_queue(self) -> None:
async def _trim_upload_queue(self) -> None:
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
excess = len(self.upload_folders) - self.MAX_UPLOAD_LENGTH
self.log.info('Dropping %s images from upload list', excess)

folders_to_delete = []
for _ in range(excess):
if self.upload_folders:
try:
item = self.upload_folders.pop()
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
folder = self.upload_folders.pop()
folders_to_delete.append(folder)
except Exception:
self.log.exception('Failed to delete %s', item)
self.log.exception('Failed to get item from upload_folders')

await run.io_bound(self.delete_folders, folders_to_delete)

def delete_folders(self, folders_to_delete: List[str]) -> None:
for folder in folders_to_delete:
try:
shutil.rmtree(folder)
self.log.debug('Deleted %s', folder)
except Exception:
self.log.exception('Failed to delete %s', folder)

def _is_valid_isoformat(self, date: Optional[str]) -> bool:
if date is None:
@@ -175,13 +199,13 @@ async def upload(self) -> None:
except Exception:
self.log.exception('Could not upload files')

def _clear_item(self, item: str) -> None:
if item in self.upload_folders:
self.upload_folders.remove(item)
if item in self.priority_upload_folders:
self.priority_upload_folders.remove(item)
async def _clear_item(self, item: str) -> None:
try:
shutil.rmtree(item, ignore_errors=True)
if item in self.upload_folders:
self.upload_folders.remove(item)
if item in self.priority_upload_folders:
self.priority_upload_folders.remove(item)
await run.io_bound(shutil.rmtree, item, ignore_errors=True)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)
@@ -192,13 +216,10 @@ async def _upload_batch(self, items: List[str]) -> None:
:param items: List of folders to upload (each folder contains an image and a metadata file)
"""

# NOTE: keys are not relevant for the server, but using a fixed key like 'files'
# results in a post failure on the first run of the test in a docker environment (WTF)

data: List[Tuple[str, Union[TextIOWrapper, BufferedReader]]] = []
for item in items:
if not os.path.exists(item):
self._clear_item(item)
await self._clear_item(item)
continue
identifier = os.path.basename(item)
data.append(('files', open(f'{item}/image_{identifier}.json', 'r')))
@@ -219,14 +240,14 @@ async def _upload_batch(self, items: List[str]) -> None:
self.upload_counter += len(items)
self.log.debug('Uploaded %s images', len(items))
for item in items:
self._clear_item(item)
await self._clear_item(item)
self.log.debug('Cleared %s images', len(items))
return

if response.status == 422:
if len(items) == 1:
self.log.error('Broken content in image: %s\n Skipping.', items[0])
self._clear_item(items[0])
await self._clear_item(items[0])
return

self.log.exception('Broken content in batch. Splitting and retrying')
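With `Outbox.save` now being a coroutine that pushes JPEG validation and disk I/O into the thread pool, callers await it from async code instead of spawning a `Thread`. A hedged usage sketch, assuming a configured node environment (data folder, loop credentials) and a valid JPEG on disk:

```python
# Usage sketch only: awaiting the now-async Outbox.save from an async context.
import asyncio

from learning_loop_node.detector.outbox import Outbox


async def main() -> None:
    outbox = Outbox()  # assumes GLOBALS.data_folder and the loop environment are set up
    with open('example.jpg', 'rb') as f:  # any valid JPEG
        image = f.read()
    # invalid JPEGs are rejected; valid ones are written to a tmp dir,
    # atomically renamed into the outbox, and queued for upload
    await outbox.save(image, tags=['demo'], upload_priority=True)


if __name__ == '__main__':
    asyncio.run(main())
```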
4 changes: 2 additions & 2 deletions learning_loop_node/detector/rest/detect.py
@@ -1,5 +1,5 @@
import logging
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, cast

import numpy as np
from fastapi import APIRouter, File, Header, Request, UploadFile
@@ -42,7 +42,7 @@ async def http_detect(

try:
app: 'DetectorNode' = request.app
detections = await app.get_detections(raw_image=np_image,
detections = await app.get_detections(raw_image=cast(bytes, np_image),
camera_id=camera_id or mac or None,
tags=tags.split(',') if tags else [],
source=source,
8 changes: 4 additions & 4 deletions learning_loop_node/tests/detector/test_outbox.py
@@ -29,7 +29,7 @@ async def test_outbox():
@pytest.mark.asyncio
async def test_set_outbox_mode(test_outbox: Outbox):
await test_outbox.set_mode('stopped')
test_outbox.save(get_test_image_binary())
await test_outbox.save(get_test_image_binary())
assert await wait_for_outbox_count(test_outbox, 1)
await asyncio.sleep(6)
assert await wait_for_outbox_count(test_outbox, 1), 'File was cleared even though outbox should be stopped'
@@ -41,9 +41,9 @@ async def test_set_outbox_mode(test_outbox: Outbox):

@pytest.mark.asyncio
async def test_outbox_upload_is_successful(test_outbox: Outbox):
test_outbox.save(get_test_image_binary())
await test_outbox.save(get_test_image_binary())
await asyncio.sleep(1)
test_outbox.save(get_test_image_binary())
await test_outbox.save(get_test_image_binary())
assert await wait_for_outbox_count(test_outbox, 2)
await test_outbox.upload()
assert await wait_for_outbox_count(test_outbox, 0)
@@ -53,7 +53,7 @@ async def test_outbox_upload_is_successful(test_outbox: Outbox):
@pytest.mark.asyncio
async def test_invalid_jpg_is_not_saved(test_outbox: Outbox):
invalid_bytes = b'invalid jpg'
test_outbox.save(invalid_bytes)
await test_outbox.save(invalid_bytes)
assert len(test_outbox.get_upload_folders()) == 0


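The tests now await `save` directly. `wait_for_outbox_count` is referenced but not shown in this diff; a plausible polling helper of that shape might look like the following (an assumption, the real test helper may differ):

```python
# Hypothetical reconstruction of the wait_for_outbox_count test helper.
import asyncio

from learning_loop_node.detector.outbox import Outbox


async def wait_for_outbox_count(outbox: Outbox, count: int, timeout: float = 10.0) -> bool:
    """Poll until the outbox holds exactly `count` upload folders or the timeout expires."""
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if len(outbox.get_upload_folders()) == count:
            return True
        await asyncio.sleep(0.5)
    return len(outbox.get_upload_folders()) == count
```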
