From 73221a0009fb86be163941003f2edbc7102a101e Mon Sep 17 00:00:00 2001 From: "Dr. Dennis Wittich" Date: Wed, 5 Mar 2025 11:22:09 +0100 Subject: [PATCH] Improved image upload (#58) Implementation of new upload strategy. Basic concept: Hold a list of items to upload which is cut off if it gets to long. In each update cycle the first BS items are uploaded. There is a priority queue with images that are not dropped and always uploaded fist. Fixes an error due to an unclosed clientsession where we forgot to await the response. BREAKING CHANGE: The abstract method `evaluate` which needs to be implemented by detectors now provides the raw jpg image bytes instead of a semi-converted np.array. This means if the detector uses cv2, reading the file can be done with `cv_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)`. Alternatively PIL could be used as well. --------- Co-authored-by: Niklas Neugebauer --- README.md | 69 ++++--- learning_loop_node/detector/detector_logic.py | 6 +- learning_loop_node/detector/detector_node.py | 74 +++++--- .../detector/inbox_filter/relevance_filter.py | 20 +- learning_loop_node/detector/outbox.py | 178 +++++++++++++----- learning_loop_node/detector/rest/detect.py | 6 +- learning_loop_node/detector/rest/upload.py | 7 +- .../helpers/background_tasks.py | 78 ++++++++ learning_loop_node/helpers/run.py | 21 +++ learning_loop_node/node.py | 15 +- .../tests/annotator/conftest.py | 13 +- .../tests/annotator/test_annotator_node.py | 12 +- .../test_unexpected_observations_count.py | 7 +- .../detector/test_client_communication.py | 24 +-- .../tests/detector/test_outbox.py | 23 +-- learning_loop_node/tests/general/conftest.py | 10 +- 16 files changed, 379 insertions(+), 184 deletions(-) create mode 100644 learning_loop_node/helpers/background_tasks.py create mode 100644 learning_loop_node/helpers/run.py diff --git a/README.md b/README.md index 01c727dd..4bb4872b 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,9 @@ The detector also has a sio **upload endpoint** that can be used to upload image - `image`: the image data in jpg format - `tags`: a list of strings. If not provided the tag is `picked_by_system` - `detections`: a dictionary representing the detections. UUIDs for the classes are automatically determined based on the category names. This field is optional. If not provided, no detections are uploaded. +- `source`: optional source identifier for the image +- `creation_date`: optional creation date for the image +- `upload_priority`: boolean flag to prioritize the upload (defaults to False) The endpoint returns None if the upload was successful and an error message otherwise. @@ -147,58 +150,52 @@ Upload a model with The model should now be available for the format 'format_a' `curl "https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_a"` -```` - -{ -"models": [ +```json { -"id": "3c20d807-f71c-40dc-a996-8a8968aa5431", -"version": "4.0", -"formats": [ -"format_a" -], -"created": "2021-06-01T06:28:21.289092", -"comment": "uploaded at 2021-06-01 06:28:21.288442", -... -} -] + "models": [ + { + "id": "3c20d807-f71c-40dc-a996-8a8968aa5431", + "version": "4.0", + "formats": [ + "format_a" + ], + "created": "2021-06-01T06:28:21.289092", + "comment": "uploaded at 2021-06-01 06:28:21.288442", + ... + } + ] } - ``` but not in the format_b `curl "https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_b"` -``` - +```json { -"models": [] + "models": [] } - ``` Connect the Node to the Learning Loop by simply starting the container. After a short time the converted model should be available as well. `curl https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_b` -``` - +```json { -"models": [ -{ -"id": "3c20d807-f71c-40dc-a996-8a8968aa5431", -"version": "4.0", -"formats": [ -"format_a", -"format_b", -], -"created": "2021-06-01T06:28:21.289092", -"comment": "uploaded at 2021-06-01 06:28:21.288442", -... + "models": [ + { + "id": "3c20d807-f71c-40dc-a996-8a8968aa5431", + "version": "4.0", + "formats": [ + "format_a", + "format_b", + ], + "created": "2021-06-01T06:28:21.289092", + "comment": "uploaded at 2021-06-01 06:28:21.288442", + ... + } + ] } -] -} - ``` ## About Models (the currency between Nodes) @@ -217,5 +214,3 @@ After a short time the converted model should be available as well. - Nodes add properties to `model.json`, which contains all the information which are needed by subsequent nodes. These are typically the properties: - `resolution`: resolution in which the model expects images (as `int`, since the resolution is mostly square - later, ` resolution_x`` resolution_y ` would also be conceivable or `resolutions` to give a list of possible resolutions) - `categories`: list of categories with name, id, (later also type), in the order in which they are used by the model -- this is neccessary to be robust about renamings -``` -```` diff --git a/learning_loop_node/detector/detector_logic.py b/learning_loop_node/detector/detector_logic.py index 5af2b83f..81d321ef 100644 --- a/learning_loop_node/detector/detector_logic.py +++ b/learning_loop_node/detector/detector_logic.py @@ -2,8 +2,6 @@ from abc import abstractmethod from typing import List, Optional -import numpy as np - from ..data_classes import ImageMetadata, ModelInformation from ..globals import GLOBALS from .exceptions import NodeNeedsRestartError @@ -44,13 +42,13 @@ def load_model_info_and_init_model(self): def init(self): """Called when a (new) model was loaded. Initialize the model. Model information available via `self.model_info`""" - def evaluate_with_all_info(self, image: np.ndarray, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument + def evaluate_with_all_info(self, image: bytes, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument """Called by the detector node when an image should be evaluated (REST or SocketIO). Tags, source come from the caller and may be used in this function. By default, this function simply calls `evaluate`""" return self.evaluate(image) @abstractmethod - def evaluate(self, image: np.ndarray) -> ImageMetadata: + def evaluate(self, image: bytes) -> ImageMetadata: """Evaluate the image and return the detections. The object should return empty detections if it is not initialized""" diff --git a/learning_loop_node/detector/detector_node.py b/learning_loop_node/detector/detector_node.py index e11bdc5f..202a231b 100644 --- a/learning_loop_node/detector/detector_node.py +++ b/learning_loop_node/detector/detector_node.py @@ -1,14 +1,12 @@ import asyncio import contextlib -import math import os import shutil import subprocess import sys from dataclasses import asdict from datetime import datetime -from threading import Thread -from typing import Dict, List, Optional +from typing import Dict, List, Optional, cast import numpy as np import socketio @@ -30,7 +28,7 @@ from ..data_exchanger import DataExchanger, DownloadError from ..enums import OperationMode, VersionMode from ..globals import GLOBALS -from ..helpers import environment_reader +from ..helpers import background_tasks, environment_reader, run from ..node import Node from .detector_logic import DetectorLogic from .exceptions import NodeNeedsRestartError @@ -227,7 +225,7 @@ def setup_sio_server(self) -> None: async def detect(sid, data: Dict) -> Dict: try: det = await self.get_detections( - raw_image=np.frombuffer(data['image'], np.uint8), + raw_image=data['image'], camera_id=data.get('camera-id', None) or data.get('mac', None), tags=data.get('tags', []), source=data.get('source', None), @@ -279,9 +277,10 @@ async def set_outbox_mode(sid, data: str) -> Dict: return {'error': str(e)} @self.sio.event - async def upload(sid, data: Dict) -> Optional[Dict]: - '''upload an image with detections''' + async def upload(sid, data: Dict) -> Dict: + """Upload an image with detections""" + self.log.debug('Processing upload via socketio.') detection_data = data.get('detections', {}) if detection_data and self.detector_logic.model_info is not None: try: @@ -293,22 +292,19 @@ async def upload(sid, data: Dict) -> Optional[Dict]: else: image_metadata = ImageMetadata() - tags = data.get('tags', []) - tags.append('picked_by_system') - - source = data.get('source', None) - creation_date = data.get('creation_date', None) - - self.log.debug('running upload via socketio. tags: %s, source: %s, creation_date: %s', - tags, source, creation_date) - - loop = asyncio.get_event_loop() try: - await loop.run_in_executor(None, self.outbox.save, data['image'], image_metadata, tags, source, creation_date) + await self.upload_images( + images=[data['image']], + image_metadata=image_metadata, + tags=data.get('tags', []), + source=data.get('source', None), + creation_date=data.get('creation_date', None), + upload_priority=data.get('upload_priority', False) + ) except Exception as e: self.log.exception('could not upload via socketio') return {'error': str(e)} - return None + return {'status': 'OK'} @self.sio.event def connect(sid, environ, auth) -> None: @@ -469,7 +465,7 @@ async def set_operation_mode(self, mode: OperationMode): self.log.warning('Operation mode set to %s, but sync failed: %s', mode, e) def reload(self, reason: str): - '''provide a cause for the reload''' + """provide a cause for the reload""" self.log.info('########## reloading app because %s', reason) if os.path.isfile('/app/app_code/restart/restart.py'): @@ -482,7 +478,7 @@ def reload(self, reason: str): self.log.error('could not reload app') async def get_detections(self, - raw_image: np.ndarray, + raw_image: bytes, camera_id: Optional[str], tags: List[str], source: Optional[str] = None, @@ -494,8 +490,7 @@ async def get_detections(self, It can be converted e.g. using cv2.imdecode(raw_image, cv2.IMREAD_COLOR)""" await self.detection_lock.acquire() - loop = asyncio.get_event_loop() - detections = await loop.run_in_executor(None, self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date) + detections = await run.io_bound(self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date) self.detection_lock.release() fix_shape_detections(detections) @@ -503,21 +498,40 @@ async def get_detections(self, n_po, n_se = len(detections.point_detections), len(detections.segmentation_detections) self.log.debug('Detected: %d boxes, %d points, %d segs, %d classes', n_bo, n_po, n_se, n_cl) - if autoupload is None or autoupload == 'filtered': # NOTE default is filtered - Thread(target=self.relevance_filter.may_upload_detections, - args=(detections, camera_id, raw_image, tags, source, creation_date)).start() + autoupload = autoupload or 'filtered' + if autoupload == 'filtered' and camera_id is not None: + background_tasks.create(self.relevance_filter.may_upload_detections( + detections, camera_id, raw_image, tags, source, creation_date + )) elif autoupload == 'all': - Thread(target=self.outbox.save, args=(raw_image, detections, tags, source, creation_date)).start() + background_tasks.create(self.outbox.save(raw_image, detections, tags, source, creation_date)) elif autoupload == 'disabled': pass else: self.log.error('unknown autoupload value %s', autoupload) return detections - async def upload_images(self, images: List[bytes], source: Optional[str], creation_date: Optional[str]): - loop = asyncio.get_event_loop() + async def upload_images( + self, *, + images: List[bytes], + image_metadata: Optional[ImageMetadata] = None, + tags: Optional[List[str]] = None, + source: Optional[str], + creation_date: Optional[str], + upload_priority: bool = False + ) -> None: + """Save images to the outbox using an asyncio executor. + Used by SIO and REST upload endpoints.""" + + if image_metadata is None: + image_metadata = ImageMetadata() + if tags is None: + tags = [] + + tags.append('picked_by_system') + for image in images: - await loop.run_in_executor(None, self.outbox.save, image, ImageMetadata(), ['picked_by_system'], source, creation_date) + await self.outbox.save(image, image_metadata, tags, source, creation_date, upload_priority) def add_category_id_to_detections(self, model_info: ModelInformation, image_metadata: ImageMetadata): def find_category_id_by_name(categories: List[Category], category_name: str): diff --git a/learning_loop_node/detector/inbox_filter/relevance_filter.py b/learning_loop_node/detector/inbox_filter/relevance_filter.py index e072cfa9..81469ac4 100644 --- a/learning_loop_node/detector/inbox_filter/relevance_filter.py +++ b/learning_loop_node/detector/inbox_filter/relevance_filter.py @@ -11,14 +11,16 @@ def __init__(self, outbox: Outbox) -> None: self.cam_histories: Dict[str, CamObservationHistory] = {} self.outbox: Outbox = outbox - def may_upload_detections(self, - image_metadata: ImageMetadata, - cam_id: str, - raw_image: bytes, - tags: List[str], - source: Optional[str] = None, - creation_date: Optional[str] = None - ) -> List[str]: + async def may_upload_detections(self, + image_metadata: ImageMetadata, + cam_id: str, + raw_image: bytes, + tags: List[str], + source: Optional[str] = None, + creation_date: Optional[str] = None) -> List[str]: + """Check if the detection should be uploaded to the outbox. + If so, upload it and return the list of causes for the upload. + """ for group in self.cam_histories.values(): group.forget_old_detections() @@ -30,5 +32,5 @@ def may_upload_detections(self, if len(causes) > 0: tags = tags if tags is not None else [] tags.extend(causes) - self.outbox.save(raw_image, image_metadata, tags, source, creation_date) + await self.outbox.save(raw_image, image_metadata, tags, source, creation_date) return causes diff --git a/learning_loop_node/detector/outbox.py b/learning_loop_node/detector/outbox.py index 920f59d8..9fe5699e 100644 --- a/learning_loop_node/detector/outbox.py +++ b/learning_loop_node/detector/outbox.py @@ -5,13 +5,15 @@ import os import shutil from asyncio import Task +from collections import deque from dataclasses import asdict from datetime import datetime from glob import glob from io import BufferedReader, TextIOWrapper from multiprocessing import Event from multiprocessing.synchronize import Event as SyncEvent -from typing import List, Optional, Tuple, Union +from threading import Lock +from typing import List, Optional, Tuple, TypeVar, Union import aiohttp import PIL @@ -21,14 +23,27 @@ from ..data_classes import ImageMetadata from ..enums import OutboxMode from ..globals import GLOBALS -from ..helpers import environment_reader +from ..helpers import environment_reader, run + +T = TypeVar('T') class Outbox(): + """ + Outbox is a class that handles the uploading of images to the learning loop. + It uploads images from an internal queue (lifo) in batches of 20 every 5 seconds. + It handles upload failures by splitting the upload into two smaller batches until the problematic image is identified - and removed. + Any image can be saved to the normal or the priority queue. + Images in the priority queue are uploaded first. + The total queue length is limited to 1000 images. + """ + def __init__(self) -> None: self.log = logging.getLogger() self.path = f'{GLOBALS.data_folder}/outbox' os.makedirs(self.path, exist_ok=True) + os.makedirs(f'{self.path}/priority', exist_ok=True) + os.makedirs(f'{self.path}/normal', exist_ok=True) self.log = logging.getLogger() host = environment_reader.host() @@ -42,6 +57,8 @@ def __init__(self) -> None: self.log.info('Outbox initialized with target_uri: %s', self.target_uri) self.BATCH_SIZE = 20 + self.MAX_UPLOAD_LENGTH = 1000 # only affects the `upload_folders` list + self.UPLOAD_INTERVAL_S = 5 self.UPLOAD_TIMEOUT_S = 30 self.shutdown_event: SyncEvent = Event() @@ -49,15 +66,24 @@ def __init__(self) -> None: self.upload_counter = 0 - def save(self, - image: bytes, - image_metadata: Optional[ImageMetadata] = None, - tags: Optional[List[str]] = None, - source: Optional[str] = None, - creation_date: Optional[str] = None - ) -> None: + self.priority_upload_folders: List[str] = [] + self.upload_folders: deque[str] = deque() + self.folders_lock = Lock() + + for file in glob(f'{self.path}/priority/*'): + self.priority_upload_folders.append(file) + for file in glob(f'{self.path}/normal/*'): + self.upload_folders.append(file) - if not self._is_valid_jpg(image): + async def save(self, + image: bytes, + image_metadata: Optional[ImageMetadata] = None, + tags: Optional[List[str]] = None, + source: Optional[str] = None, + creation_date: Optional[str] = None, + upload_priority: bool = False) -> None: + + if not await run.io_bound(self._is_valid_jpg, image): self.log.error('Invalid jpg image') return @@ -66,9 +92,33 @@ def save(self, if not tags: tags = [] identifier = datetime.now().isoformat(sep='_', timespec='microseconds') - if os.path.exists(self.path + '/' + identifier): - self.log.error('Directory with identifier %s already exists', identifier) + + try: + await run.io_bound(self._save_files_to_disk, identifier, image, image_metadata, tags, source, creation_date, upload_priority) + except Exception as e: + self.log.error('Failed to save files for image %s: %s', identifier, e) return + + if upload_priority: + self.priority_upload_folders.append(f'{self.path}/priority/{identifier}') + else: + self.upload_folders.appendleft(f'{self.path}/normal/{identifier}') + + await self._trim_upload_queue() + + def _save_files_to_disk(self, + identifier: str, + image: bytes, + image_metadata: ImageMetadata, + tags: List[str], + source: Optional[str], + creation_date: Optional[str], + upload_priority: bool) -> None: + subpath = 'priority' if upload_priority else 'normal' + full_path = f'{self.path}/{subpath}/{identifier}' + if os.path.exists(full_path): + raise FileExistsError(f'Directory with identifier {identifier} already exists') + tmp = f'{GLOBALS.data_folder}/tmp/{identifier}' image_metadata.tags = tags if self._is_valid_isoformat(creation_date): @@ -77,6 +127,7 @@ def save(self, image_metadata.created = identifier image_metadata.source = source or 'unknown' + os.makedirs(tmp, exist_ok=True) with open(tmp + f'/image_{identifier}.json', 'w') as f: @@ -85,10 +136,34 @@ def save(self, with open(tmp + f'/image_{identifier}.jpg', 'wb') as f: f.write(image) - if os.path.exists(tmp): - os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel - else: - self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier) + if not os.path.exists(tmp): + self.log.error('Could not rename %s to %s', tmp, full_path) + raise FileNotFoundError(f'Could not rename {tmp} to {full_path}') + os.rename(tmp, full_path) + + async def _trim_upload_queue(self) -> None: + if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH: + excess = len(self.upload_folders) - self.MAX_UPLOAD_LENGTH + self.log.info('Dropping %s images from upload list', excess) + + folders_to_delete = [] + for _ in range(excess): + if self.upload_folders: + try: + folder = self.upload_folders.pop() + folders_to_delete.append(folder) + except Exception: + self.log.exception('Failed to get item from upload_folders') + + await run.io_bound(self._delete_folders, folders_to_delete) + + def _delete_folders(self, folders_to_delete: List[str]) -> None: + for folder in folders_to_delete: + try: + shutil.rmtree(folder) + self.log.debug('Deleted %s', folder) + except Exception: + self.log.exception('Failed to delete %s', folder) def _is_valid_isoformat(self, date: Optional[str]) -> bool: if date is None: @@ -99,10 +174,11 @@ def _is_valid_isoformat(self, date: Optional[str]) -> bool: except Exception: return False - def get_data_files(self): - return glob(f'{self.path}/*') + def get_upload_folders(self) -> List[str]: + with self.folders_lock: + return self.priority_upload_folders + list(self.upload_folders) - def ensure_continuous_upload(self): + def ensure_continuous_upload(self) -> None: self.log.debug('start_continuous_upload') if self._upload_process_alive(): self.log.debug('Upload thread already running') @@ -111,44 +187,58 @@ def ensure_continuous_upload(self): self.shutdown_event.clear() self.upload_task = asyncio.create_task(self._continuous_upload()) - async def _continuous_upload(self): + async def _continuous_upload(self) -> None: self.log.info('continuous upload started') assert self.shutdown_event is not None while not self.shutdown_event.is_set(): await self.upload() - await asyncio.sleep(5) + await asyncio.sleep(self.UPLOAD_INTERVAL_S) self.log.info('continuous upload ended') - async def upload(self): - items = self.get_data_files() + async def upload(self) -> None: + items = self.get_upload_folders() if not items: self.log.debug('No images found to upload') return self.log.info('Found %s images to upload', len(items)) - for i in range(0, len(items), self.BATCH_SIZE): - batch_items = items[i:i+self.BATCH_SIZE] - if self.shutdown_event.is_set(): - break - try: - await self._upload_batch(batch_items) - except Exception: - self.log.exception('Could not upload files') - async def _upload_batch(self, items: List[str]): + batch_items = items[:self.BATCH_SIZE] + try: + await self._upload_batch(batch_items) + except Exception: + self.log.exception('Could not upload files') - # NOTE: keys are not relevant for the server, but using a fixed key like 'files' - # results in a post failure on the first run of the test in a docker environment (WTF) + async def _clear_item(self, item: str) -> None: + try: + if item in self.upload_folders: + self.upload_folders.remove(item) + if item in self.priority_upload_folders: + self.priority_upload_folders.remove(item) + await run.io_bound(shutil.rmtree, item, ignore_errors=True) + self.log.debug('Deleted %s', item) + except Exception: + self.log.exception('Failed to delete %s', item) + + async def _upload_batch(self, items: List[str]) -> None: + """ + Uploads a batch of images to the server. + :param items: List of folders to upload (each folder contains an image and a metadata file) + """ data: List[Tuple[str, Union[TextIOWrapper, BufferedReader]]] = [] for item in items: + if not os.path.exists(item): + await self._clear_item(item) + continue identifier = os.path.basename(item) data.append(('files', open(f'{item}/image_{identifier}.json', 'r'))) data.append(('files', open(f'{item}/image_{identifier}.jpg', 'rb'))) try: async with aiohttp.ClientSession() as session: - response = await session.post(self.target_uri, data=data, timeout=self.UPLOAD_TIMEOUT_S) + response = await session.post(self.target_uri, data=data, timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S)) + await response.read() except Exception: self.log.exception('Could not upload images') return @@ -159,23 +249,23 @@ async def _upload_batch(self, items: List[str]): if response.status == 200: self.upload_counter += len(items) + self.log.debug('Uploaded %s images', len(items)) for item in items: - try: - shutil.rmtree(item) - self.log.debug('Deleted %s', item) - except Exception: - self.log.exception('Failed to delete %s', item) - self.log.info('Uploaded %s images successfully', len(items)) - - elif response.status == 422: + await self._clear_item(item) + self.log.debug('Cleared %s images', len(items)) + return + + if response.status == 422: if len(items) == 1: self.log.error('Broken content in image: %s\n Skipping.', items[0]) - shutil.rmtree(items[0], ignore_errors=True) + await self._clear_item(items[0]) return self.log.exception('Broken content in batch. Splitting and retrying') await self._upload_batch(items[:len(items)//2]) await self._upload_batch(items[len(items)//2:]) + elif response.status == 429: + self.log.warning('Too many requests: %s', response.content) else: self.log.error('Could not upload images: %s', response.content) diff --git a/learning_loop_node/detector/rest/detect.py b/learning_loop_node/detector/rest/detect.py index 103832d7..d2fcb392 100644 --- a/learning_loop_node/detector/rest/detect.py +++ b/learning_loop_node/detector/rest/detect.py @@ -1,7 +1,6 @@ import logging from typing import TYPE_CHECKING, Optional -import numpy as np from fastapi import APIRouter, File, Header, Request, UploadFile from ...data_classes.image_metadata import ImageMetadata @@ -35,14 +34,15 @@ async def http_detect( """ try: - np_image = np.fromfile(file.file, np.uint8) + # Read file directly to bytes instead of using numpy + file_bytes = file.file.read() except Exception as exc: logging.exception('Error during reading of image %s.', file.filename) raise Exception(f'Uploaded file {file.filename} is no image file.') from exc try: app: 'DetectorNode' = request.app - detections = await app.get_detections(raw_image=np_image, + detections = await app.get_detections(raw_image=file_bytes, camera_id=camera_id or mac or None, tags=tags.split(',') if tags else [], source=source, diff --git a/learning_loop_node/detector/rest/upload.py b/learning_loop_node/detector/rest/upload.py index 85ae231d..0de6fdce 100644 --- a/learning_loop_node/detector/rest/upload.py +++ b/learning_loop_node/detector/rest/upload.py @@ -12,7 +12,8 @@ async def upload_image(request: Request, files: List[UploadFile] = File(...), source: Optional[str] = Query(None, description='Source of the image'), - creation_date: Optional[str] = Query(None, description='Creation date of the image')): + creation_date: Optional[str] = Query(None, description='Creation date of the image'), + upload_priority: bool = Query(False, description='Upload the image with priority')): """ Upload an image or multiple images to the learning loop. @@ -21,9 +22,9 @@ async def upload_image(request: Request, Example Usage - curl -X POST -F 'files=@test.jpg' "http://localhost:/upload?source=test&creation_date=2024-01-01T00:00:00" + curl -X POST -F 'files=@test.jpg' "http://localhost:/upload?source=test&creation_date=2024-01-01T00:00:00&upload_priority=true" """ raw_files = [await file.read() for file in files] node: DetectorNode = request.app - await node.upload_images(raw_files, source, creation_date) + await node.upload_images(images=raw_files, source=source, creation_date=creation_date, upload_priority=upload_priority) return 200, "OK" diff --git a/learning_loop_node/helpers/background_tasks.py b/learning_loop_node/helpers/background_tasks.py new file mode 100644 index 00000000..3682c891 --- /dev/null +++ b/learning_loop_node/helpers/background_tasks.py @@ -0,0 +1,78 @@ +# Copy of Nicegui background_tasks.py +# MIT License + +# Copyright (c) 2021 Zauberzeug GmbH + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""inspired from https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/""" +from __future__ import annotations + +import asyncio +import logging +from typing import Awaitable, Dict, Set + +running_tasks: Set[asyncio.Task] = set() +lazy_tasks_running: Dict[str, asyncio.Task] = {} +lazy_tasks_waiting: Dict[str, Awaitable] = {} + + +def create(coroutine: Awaitable, *, name: str = 'unnamed task') -> asyncio.Task: + """Wraps a loop.create_task call and ensures there is an exception handler added to the task. + + If the task raises an exception, it is logged and handled by the global exception handlers. + Also a reference to the task is kept until it is done, so that the task is not garbage collected mid-execution. + See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task. + """ + loop = asyncio.get_event_loop() + coroutine = coroutine if asyncio.iscoroutine(coroutine) else asyncio.wait_for(coroutine, None) + task: asyncio.Task = loop.create_task(coroutine, name=name) + task.add_done_callback(_handle_task_result) + running_tasks.add(task) + task.add_done_callback(running_tasks.discard) + return task + + +def create_lazy(coroutine: Awaitable, *, name: str) -> None: + """Wraps a create call and ensures a second task with the same name is delayed until the first one is done. + + If a third task with the same name is created while the first one is still running, the second one is discarded. + """ + if name in lazy_tasks_running: + if name in lazy_tasks_waiting: + asyncio.Task(lazy_tasks_waiting[name]).cancel() + lazy_tasks_waiting[name] = coroutine + return + + def finalize(name: str) -> None: + lazy_tasks_running.pop(name) + if name in lazy_tasks_waiting: + create_lazy(lazy_tasks_waiting.pop(name), name=name) + task = create(coroutine, name=name) + lazy_tasks_running[name] = task + task.add_done_callback(lambda _: finalize(name)) + + +def _handle_task_result(task: asyncio.Task) -> None: + try: + task.result() + except asyncio.CancelledError: + pass + except Exception: + logging.exception('Background task %s raised an exception', task.get_name()) diff --git a/learning_loop_node/helpers/run.py b/learning_loop_node/helpers/run.py new file mode 100644 index 00000000..b41021d4 --- /dev/null +++ b/learning_loop_node/helpers/run.py @@ -0,0 +1,21 @@ +import asyncio +import sys +from typing import Any, Callable, TypeVar + +T = TypeVar('T') + +if sys.version_info >= (3, 10): + from typing import ParamSpec + P = ParamSpec('P') + + async def io_bound(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T: + """Run a blocking function in a thread pool executor. + This is useful for disk I/O operations that would block the event loop.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) +else: + async def io_bound(func: Callable[..., T], *args: Any, **kwargs: Any) -> T: + """Run a blocking function in a thread pool executor. + This is useful for disk I/O operations that would block the event loop.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) diff --git a/learning_loop_node/node.py b/learning_loop_node/node.py index c7dbbb8a..c2ed24f0 100644 --- a/learning_loop_node/node.py +++ b/learning_loop_node/node.py @@ -76,6 +76,8 @@ def __init__(self, name: str, uuid: Optional[str] = None, node_type: str = 'node self.previous_state: Optional[str] = None self.repeat_loop_cycle_sec = 5 + self._client_session: Optional[aiohttp.ClientSession] = None + def log_status_on_change(self, current_state_str: str, full_status: Any): if self.previous_state != current_state_str: self.previous_state = current_state_str @@ -127,6 +129,8 @@ async def _on_shutdown(self): await self.loop_communicator.shutdown() if self._sio_client is not None: await self._sio_client.disconnect() + if self._client_session is not None: + await self._client_session.close() self.log.info('successfully disconnected from loop.') await self.on_shutdown() @@ -205,12 +209,15 @@ async def _reconnect_socketio(self): ssl_context.verify_mode = ssl.CERT_REQUIRED connector = TCPConnector(ssl=ssl_context) + if self._client_session is not None: + await self._client_session.close() + if self.needs_login: - self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession( - cookies=cookies, connector=connector)) + self._client_session = aiohttp.ClientSession(cookies=cookies, connector=connector) else: - self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession( - connector=connector)) + self._client_session = aiohttp.ClientSession(connector=connector) + + self._sio_client = AsyncClient(request_timeout=20, http_session=self._client_session) # pylint: disable=protected-access self._sio_client._trigger_event = ensure_socket_response(self._sio_client._trigger_event) diff --git a/learning_loop_node/tests/annotator/conftest.py b/learning_loop_node/tests/annotator/conftest.py index 104f9f16..121e3d53 100644 --- a/learning_loop_node/tests/annotator/conftest.py +++ b/learning_loop_node/tests/annotator/conftest.py @@ -3,18 +3,23 @@ import os import shutil +# ====================================== REDUNDANT FIXTURES IN ALL CONFTESTS ! ====================================== +import sys + import pytest from ...globals import GLOBALS from ...loop_communication import LoopCommunicator -# ====================================== REDUNDANT FIXTURES IN ALL CONFTESTS ! ====================================== - @pytest.fixture() async def setup_test_project(): # pylint: disable=redefined-outer-name loop_communicator = LoopCommunicator() - await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_annotator?keep_images=true") + try: + await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_annotator?keep_images=true", timeout=10) + except Exception: + logging.warning("Failed to delete project pytest_nodelib_annotator") + sys.exit(1) await asyncio.sleep(1) project_conf = { 'project_name': 'pytest_nodelib_annotator', 'inbox': 0, 'annotate': 0, 'review': 0, 'complete': 3, 'image_style': 'beautiful', @@ -22,7 +27,7 @@ async def setup_test_project(): # pylint: disable=redefined-outer-name 'trainings': 1, 'box_detections': 3, 'box_annotations': 0} assert (await loop_communicator.post("/zauberzeug/projects/generator", json=project_conf)).status_code == 200 yield - await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_annotator?keep_images=true") + await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_annotator?keep_images=true", timeout=10) await loop_communicator.shutdown() diff --git a/learning_loop_node/tests/annotator/test_annotator_node.py b/learning_loop_node/tests/annotator/test_annotator_node.py index c6d7ba03..566df65b 100644 --- a/learning_loop_node/tests/annotator/test_annotator_node.py +++ b/learning_loop_node/tests/annotator/test_annotator_node.py @@ -7,7 +7,14 @@ from ...annotation.annotator_logic import AnnotatorLogic from ...annotation.annotator_node import AnnotatorNode -from ...data_classes import AnnotationData, Category, Context, Point, ToolOutput, UserInput +from ...data_classes import ( + AnnotationData, + Category, + Context, + Point, + ToolOutput, + UserInput, +) from ...enums import AnnotationEventType, CategoryType @@ -37,7 +44,8 @@ def default_user_input() -> UserInput: @pytest.mark.asyncio -async def test_image_download(setup_test_project): # pylint: disable=unused-argument +@pytest.mark.usefixtures('setup_test_project') +async def test_image_download(): image_folder = '/tmp/learning_loop_lib_data/zauberzeug/pytest_nodelib_annotator/images' assert os.path.exists(image_folder) is False or len(os.listdir(image_folder)) == 0 diff --git a/learning_loop_node/tests/detector/inbox_filter/test_unexpected_observations_count.py b/learning_loop_node/tests/detector/inbox_filter/test_unexpected_observations_count.py index 83209c93..a100d019 100644 --- a/learning_loop_node/tests/detector/inbox_filter/test_unexpected_observations_count.py +++ b/learning_loop_node/tests/detector/inbox_filter/test_unexpected_observations_count.py @@ -24,10 +24,11 @@ ['uncertain', 'unexpected_observations_count']), (ImageMetadata(box_detections=[h_conf_box_det], point_detections=[l_conf_point_det]), ['uncertain'])]) -def test_unexpected_observations_count(detections: ImageMetadata, reason: List[str]): +@pytest.mark.asyncio +async def test_unexpected_observations_count(detections: ImageMetadata, reason: List[str]): os.environ['LOOP_ORGANIZATION'] = 'zauberzeug' os.environ['LOOP_PROJECT'] = 'demo' outbox = Outbox() - r_filter = RelevanceFilter(outbox) - assert r_filter.may_upload_detections(detections, raw_image=b'', cam_id='0:0:0:0', tags=[]) == reason + relevance_filter = RelevanceFilter(outbox) + assert await relevance_filter.may_upload_detections(detections, raw_image=b'', cam_id='0:0:0:0', tags=[]) == reason diff --git a/learning_loop_node/tests/detector/test_client_communication.py b/learning_loop_node/tests/detector/test_client_communication.py index 6327543b..21b2b235 100644 --- a/learning_loop_node/tests/detector/test_client_communication.py +++ b/learning_loop_node/tests/detector/test_client_communication.py @@ -84,7 +84,7 @@ async def test_sio_upload(test_detector_node: DetectorNode, sio_client): with open(test_image_path, 'rb') as f: image_bytes = f.read() result = await sio_client.call('upload', {'image': image_bytes}) - assert result is None + assert result.get('status') == 'OK' assert len(get_outbox_files(test_detector_node.outbox)) == 2, 'There should be one image and one .json file.' @@ -175,25 +175,3 @@ def check_switch_to_mode(mode: str): check_switch_to_mode('stopped') check_switch_to_mode('continuous_upload') check_switch_to_mode('stopped') - - -async def test_api_responsive_during_large_upload(test_detector_node: DetectorNode): - assert len(get_outbox_files(test_detector_node.outbox)) == 0 - - with open(test_image_path, 'rb') as f: - image_bytes = f.read() - - for _ in range(200): - test_detector_node.outbox.save(image_bytes) - - outbox_size_early = len(get_outbox_files(test_detector_node.outbox)) - await asyncio.sleep(5) # NOTE: we wait 5 seconds because the continuous upload is running every 5 seconds - - # check if api is still responsive - response = requests.get(f'http://localhost:{GLOBALS.detector_port}/outbox_mode', timeout=2) - assert response.status_code == 200, response.content - - await asyncio.sleep(5) - outbox_size_late = len(get_outbox_files(test_detector_node.outbox)) - assert outbox_size_late > 0, 'The outbox should not be fully cleared, maybe the node was too fast.' - assert outbox_size_early > outbox_size_late, 'The outbox should have been partially emptied.' diff --git a/learning_loop_node/tests/detector/test_outbox.py b/learning_loop_node/tests/detector/test_outbox.py index faca86f4..626304df 100644 --- a/learning_loop_node/tests/detector/test_outbox.py +++ b/learning_loop_node/tests/detector/test_outbox.py @@ -6,8 +6,6 @@ import pytest from PIL import Image -from ...data_classes import ImageMetadata -from ...detector.detector_node import DetectorNode from ...detector.outbox import Outbox from ...globals import GLOBALS @@ -26,31 +24,24 @@ async def test_outbox(): shutil.rmtree(test_outbox.path, ignore_errors=True) -@pytest.mark.asyncio -async def test_files_are_automatically_uploaded_by_node(test_detector_node: DetectorNode): - test_detector_node.outbox.save(get_test_image_binary(), ImageMetadata()) - assert await wait_for_outbox_count(test_detector_node.outbox, 1) - assert await wait_for_outbox_count(test_detector_node.outbox, 0) - - @pytest.mark.asyncio async def test_set_outbox_mode(test_outbox: Outbox): await test_outbox.set_mode('stopped') - test_outbox.save(get_test_image_binary()) + await test_outbox.save(get_test_image_binary()) assert await wait_for_outbox_count(test_outbox, 1) await asyncio.sleep(6) assert await wait_for_outbox_count(test_outbox, 1), 'File was cleared even though outbox should be stopped' await test_outbox.set_mode('continuous_upload') - assert await wait_for_outbox_count(test_outbox, 0), 'File was not cleared even though outbox should be in continuous_upload' + assert await wait_for_outbox_count(test_outbox, 0, timeout=15), 'File was not cleared even though outbox should be in continuous_upload' assert test_outbox.upload_counter == 1 @pytest.mark.asyncio async def test_outbox_upload_is_successful(test_outbox: Outbox): - test_outbox.save(get_test_image_binary()) + await test_outbox.save(get_test_image_binary()) await asyncio.sleep(1) - test_outbox.save(get_test_image_binary()) + await test_outbox.save(get_test_image_binary()) assert await wait_for_outbox_count(test_outbox, 2) await test_outbox.upload() assert await wait_for_outbox_count(test_outbox, 0) @@ -60,8 +51,8 @@ async def test_outbox_upload_is_successful(test_outbox: Outbox): @pytest.mark.asyncio async def test_invalid_jpg_is_not_saved(test_outbox: Outbox): invalid_bytes = b'invalid jpg' - test_outbox.save(invalid_bytes) - assert len(test_outbox.get_data_files()) == 0 + await test_outbox.save(invalid_bytes) + assert len(test_outbox.get_upload_folders()) == 0 # ------------------------------ Helper functions -------------------------------------- @@ -90,7 +81,7 @@ def get_test_image_binary(): async def wait_for_outbox_count(outbox: Outbox, count: int, timeout: int = 10) -> bool: for _ in range(timeout): - if len(outbox.get_data_files()) == count: + if len(outbox.get_upload_folders()) == count: return True await asyncio.sleep(1) return False diff --git a/learning_loop_node/tests/general/conftest.py b/learning_loop_node/tests/general/conftest.py index 38c564a5..8cc34d1b 100644 --- a/learning_loop_node/tests/general/conftest.py +++ b/learning_loop_node/tests/general/conftest.py @@ -2,6 +2,7 @@ import logging import os import shutil +import sys import pytest @@ -15,7 +16,12 @@ async def create_project_for_module(): loop_communicator = LoopCommunicator() - await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_general?keep_images=true") + try: + await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_general", timeout=10) + except Exception: + logging.warning("Failed to delete project pytest_nodelib_general") + sys.exit(1) + await asyncio.sleep(1) project_configuration = { 'project_name': 'pytest_nodelib_general', 'inbox': 0, 'annotate': 0, 'review': 0, 'complete': 3, 'image_style': 'beautiful', @@ -23,7 +29,7 @@ async def create_project_for_module(): 'trainings': 1, 'box_detections': 3, 'box_annotations': 0} assert (await loop_communicator.post("/zauberzeug/projects/generator", json=project_configuration)).status_code == 200 yield - await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_general?keep_images=true") + await loop_communicator.delete("/zauberzeug/projects/pytest_nodelib_general", timeout=10) await loop_communicator.shutdown()