Improved image upload (#58)
Implementation of a new upload strategy.
Basic concept: hold a list of items to upload, which is cut off if it gets too long.
In each update cycle the first BS items are uploaded.
There is a priority queue with images that are never dropped and are always uploaded first.
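
A minimal sketch of that queueing idea (names such as `UploadQueue`, `MAX_ITEMS`, and `BATCH_SIZE` are illustrative, not the actual identifiers used in the outbox):

```python
from collections import deque
from typing import Deque, List

MAX_ITEMS = 100   # hypothetical cap for the normal queue
BATCH_SIZE = 10   # the "BS" items uploaded per update cycle


class UploadQueue:
    def __init__(self) -> None:
        self.priority: Deque[bytes] = deque()  # never dropped, always uploaded first
        self.normal: Deque[bytes] = deque()    # cut off if it gets too long

    def add(self, item: bytes, priority: bool = False) -> None:
        if priority:
            self.priority.append(item)
            return
        self.normal.append(item)
        while len(self.normal) > MAX_ITEMS:
            self.normal.popleft()  # drop oldest items (which end is dropped is an assumption)

    def next_batch(self) -> List[bytes]:
        batch: List[bytes] = []
        while len(batch) < BATCH_SIZE and (self.priority or self.normal):
            queue = self.priority if self.priority else self.normal
            batch.append(queue.popleft())
        return batch
```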

Fixes an error caused by an unclosed ClientSession where we forgot to await
the response.

BREAKING CHANGE:
The abstract method `evaluate`, which needs to be implemented by
detectors, now receives the raw jpg image bytes instead of a
semi-converted np.array. This means that if the detector uses cv2, the image
can be decoded with `cv_image = cv2.imdecode(np.frombuffer(image,
np.uint8), cv2.IMREAD_COLOR)`. Alternatively, PIL can be used as well.
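
As a hedged illustration of the new contract, a cv2-based detector might look roughly like this (`MyDetector` is hypothetical; the import paths follow the file layout shown in this diff, and the real `DetectorLogic` may require further methods to be implemented):

```python
import cv2
import numpy as np

from learning_loop_node.data_classes import ImageMetadata
from learning_loop_node.detector.detector_logic import DetectorLogic


class MyDetector(DetectorLogic):  # hypothetical example detector
    def evaluate(self, image: bytes) -> ImageMetadata:
        # `image` now arrives as raw jpg bytes; decode before running inference
        cv_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
        if cv_image is None:
            return ImageMetadata()  # return empty detections if decoding fails
        # ... run the model on cv_image and fill box/point/segmentation detections ...
        return ImageMetadata()
```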

---------

Co-authored-by: Niklas Neugebauer <[email protected]>
denniswittich and NiklasNeugebauer authored Mar 5, 2025
1 parent 47552d1 commit 73221a0
Showing 16 changed files with 379 additions and 184 deletions.
69 changes: 32 additions & 37 deletions README.md
@@ -65,6 +65,9 @@ The detector also has a sio **upload endpoint** that can be used to upload images
- `image`: the image data in jpg format
- `tags`: a list of strings. If not provided the tag is `picked_by_system`
- `detections`: a dictionary representing the detections. UUIDs for the classes are automatically determined based on the category names. This field is optional. If not provided, no detections are uploaded.
- `source`: optional source identifier for the image
- `creation_date`: optional creation date for the image
- `upload_priority`: boolean flag to prioritize the upload (defaults to False)

The endpoint returns None if the upload was successful and an error message otherwise.
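
For illustration, a client call to this sio endpoint might look like the following sketch (the detector address and the use of python-socketio's `AsyncClient` are assumptions about the deployment, not part of this commit):

```python
import asyncio

import socketio


async def main() -> None:
    sio = socketio.AsyncClient()
    await sio.connect('http://localhost:8004')  # hypothetical detector node address
    with open('example.jpg', 'rb') as f:
        image = f.read()
    response = await sio.call('upload', {
        'image': image,
        'tags': ['manual_test'],
        'source': 'readme-example',               # optional
        'creation_date': '2025-03-05T12:00:00',   # optional
        'upload_priority': True,                  # put the image into the priority queue
    })
    print(response)
    await sio.disconnect()


asyncio.run(main())
```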

@@ -147,58 +150,52 @@ Upload a model with
The model should now be available for the format 'format_a'
`curl "https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_a"`

```json
{
  "models": [
    {
      "id": "3c20d807-f71c-40dc-a996-8a8968aa5431",
      "version": "4.0",
      "formats": [
        "format_a"
      ],
      "created": "2021-06-01T06:28:21.289092",
      "comment": "uploaded at 2021-06-01 06:28:21.288442",
      ...
    }
  ]
}
```

but not in the format 'format_b'
`curl "https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_b"`

```json
{
  "models": []
}
```

Connect the Node to the Learning Loop by simply starting the container.
After a short time the converted model should be available as well.
`curl https://learning-loop.ai/api/zauberzeug/projects/demo/models?format=format_b`

```json
{
  "models": [
    {
      "id": "3c20d807-f71c-40dc-a996-8a8968aa5431",
      "version": "4.0",
      "formats": [
        "format_a",
        "format_b"
      ],
      "created": "2021-06-01T06:28:21.289092",
      "comment": "uploaded at 2021-06-01 06:28:21.288442",
      ...
    }
  ]
}
```

## About Models (the currency between Nodes)
@@ -217,5 +214,3 @@ After a short time the converted model should be available as well.
- Nodes add properties to `model.json`, which contains all the information needed by subsequent nodes (illustrated below). These are typically the properties:
  - `resolution`: resolution in which the model expects images (as `int`, since the resolution is mostly square - later, `resolution_x`/`resolution_y` would also be conceivable, or `resolutions` to give a list of possible resolutions)
  - `categories`: list of categories with name, id (later also type), in the order in which they are used by the model -- this is necessary to be robust against renamings
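
An illustrative `model.json` with these properties might look like this (all values are made up):

```json
{
  "resolution": 800,
  "categories": [
    { "name": "person", "id": "ce8b7c5c-0000-0000-0000-000000000001" },
    { "name": "dog", "id": "ce8b7c5c-0000-0000-0000-000000000002" }
  ]
}
```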
6 changes: 2 additions & 4 deletions learning_loop_node/detector/detector_logic.py
@@ -2,8 +2,6 @@
from abc import abstractmethod
from typing import List, Optional

import numpy as np

from ..data_classes import ImageMetadata, ModelInformation
from ..globals import GLOBALS
from .exceptions import NodeNeedsRestartError
@@ -44,13 +42,13 @@ def load_model_info_and_init_model(self):
def init(self):
"""Called when a (new) model was loaded. Initialize the model. Model information available via `self.model_info`"""

def evaluate_with_all_info(self, image: np.ndarray, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument
def evaluate_with_all_info(self, image: bytes, tags: List[str], source: Optional[str] = None, creation_date: Optional[str] = None) -> ImageMetadata: # pylint: disable=unused-argument
"""Called by the detector node when an image should be evaluated (REST or SocketIO).
Tags, source come from the caller and may be used in this function.
By default, this function simply calls `evaluate`"""
return self.evaluate(image)

@abstractmethod
def evaluate(self, image: np.ndarray) -> ImageMetadata:
def evaluate(self, image: bytes) -> ImageMetadata:
"""Evaluate the image and return the detections.
The object should return empty detections if it is not initialized"""
74 changes: 44 additions & 30 deletions learning_loop_node/detector/detector_node.py
@@ -1,14 +1,12 @@
import asyncio
import contextlib
import math
import os
import shutil
import subprocess
import sys
from dataclasses import asdict
from datetime import datetime
from threading import Thread
from typing import Dict, List, Optional
from typing import Dict, List, Optional, cast

import numpy as np
import socketio
@@ -30,7 +28,7 @@
from ..data_exchanger import DataExchanger, DownloadError
from ..enums import OperationMode, VersionMode
from ..globals import GLOBALS
from ..helpers import environment_reader
from ..helpers import background_tasks, environment_reader, run
from ..node import Node
from .detector_logic import DetectorLogic
from .exceptions import NodeNeedsRestartError
@@ -227,7 +225,7 @@ def setup_sio_server(self) -> None:
async def detect(sid, data: Dict) -> Dict:
try:
det = await self.get_detections(
raw_image=np.frombuffer(data['image'], np.uint8),
raw_image=data['image'],
camera_id=data.get('camera-id', None) or data.get('mac', None),
tags=data.get('tags', []),
source=data.get('source', None),
@@ -279,9 +277,10 @@ async def set_outbox_mode(sid, data: str) -> Dict:
return {'error': str(e)}

@self.sio.event
async def upload(sid, data: Dict) -> Optional[Dict]:
'''upload an image with detections'''
async def upload(sid, data: Dict) -> Dict:
"""Upload an image with detections"""

self.log.debug('Processing upload via socketio.')
detection_data = data.get('detections', {})
if detection_data and self.detector_logic.model_info is not None:
try:
@@ -293,22 +292,19 @@ async def upload(sid, data: Dict) -> Optional[Dict]:
else:
image_metadata = ImageMetadata()

tags = data.get('tags', [])
tags.append('picked_by_system')

source = data.get('source', None)
creation_date = data.get('creation_date', None)

self.log.debug('running upload via socketio. tags: %s, source: %s, creation_date: %s',
tags, source, creation_date)

loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None, self.outbox.save, data['image'], image_metadata, tags, source, creation_date)
await self.upload_images(
images=[data['image']],
image_metadata=image_metadata,
tags=data.get('tags', []),
source=data.get('source', None),
creation_date=data.get('creation_date', None),
upload_priority=data.get('upload_priority', False)
)
except Exception as e:
self.log.exception('could not upload via socketio')
return {'error': str(e)}
return None
return {'status': 'OK'}

@self.sio.event
def connect(sid, environ, auth) -> None:
@@ -469,7 +465,7 @@ async def set_operation_mode(self, mode: OperationMode):
self.log.warning('Operation mode set to %s, but sync failed: %s', mode, e)

def reload(self, reason: str):
'''provide a cause for the reload'''
"""provide a cause for the reload"""

self.log.info('########## reloading app because %s', reason)
if os.path.isfile('/app/app_code/restart/restart.py'):
@@ -482,7 +478,7 @@ def reload(self, reason: str):
self.log.error('could not reload app')

async def get_detections(self,
raw_image: np.ndarray,
raw_image: bytes,
camera_id: Optional[str],
tags: List[str],
source: Optional[str] = None,
@@ -494,30 +490,48 @@ async def get_detections(self,
It can be converted e.g. using cv2.imdecode(raw_image, cv2.IMREAD_COLOR)"""

await self.detection_lock.acquire()
loop = asyncio.get_event_loop()
detections = await loop.run_in_executor(None, self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date)
detections = await run.io_bound(self.detector_logic.evaluate_with_all_info, raw_image, tags, source, creation_date)
self.detection_lock.release()

fix_shape_detections(detections)
n_bo, n_cl = len(detections.box_detections), len(detections.classification_detections)
n_po, n_se = len(detections.point_detections), len(detections.segmentation_detections)
self.log.debug('Detected: %d boxes, %d points, %d segs, %d classes', n_bo, n_po, n_se, n_cl)

if autoupload is None or autoupload == 'filtered': # NOTE default is filtered
Thread(target=self.relevance_filter.may_upload_detections,
args=(detections, camera_id, raw_image, tags, source, creation_date)).start()
autoupload = autoupload or 'filtered'
if autoupload == 'filtered' and camera_id is not None:
background_tasks.create(self.relevance_filter.may_upload_detections(
detections, camera_id, raw_image, tags, source, creation_date
))
elif autoupload == 'all':
Thread(target=self.outbox.save, args=(raw_image, detections, tags, source, creation_date)).start()
background_tasks.create(self.outbox.save(raw_image, detections, tags, source, creation_date))
elif autoupload == 'disabled':
pass
else:
self.log.error('unknown autoupload value %s', autoupload)
return detections

async def upload_images(self, images: List[bytes], source: Optional[str], creation_date: Optional[str]):
loop = asyncio.get_event_loop()
async def upload_images(
self, *,
images: List[bytes],
image_metadata: Optional[ImageMetadata] = None,
tags: Optional[List[str]] = None,
source: Optional[str],
creation_date: Optional[str],
upload_priority: bool = False
) -> None:
"""Save images to the outbox using an asyncio executor.
Used by SIO and REST upload endpoints."""

if image_metadata is None:
image_metadata = ImageMetadata()
if tags is None:
tags = []

tags.append('picked_by_system')

for image in images:
await loop.run_in_executor(None, self.outbox.save, image, ImageMetadata(), ['picked_by_system'], source, creation_date)
await self.outbox.save(image, image_metadata, tags, source, creation_date, upload_priority)

def add_category_id_to_detections(self, model_info: ModelInformation, image_metadata: ImageMetadata):
def find_category_id_by_name(categories: List[Category], category_name: str):
20 changes: 11 additions & 9 deletions learning_loop_node/detector/inbox_filter/relevance_filter.py
@@ -11,14 +11,16 @@ def __init__(self, outbox: Outbox) -> None:
self.cam_histories: Dict[str, CamObservationHistory] = {}
self.outbox: Outbox = outbox

def may_upload_detections(self,
image_metadata: ImageMetadata,
cam_id: str,
raw_image: bytes,
tags: List[str],
source: Optional[str] = None,
creation_date: Optional[str] = None
) -> List[str]:
async def may_upload_detections(self,
image_metadata: ImageMetadata,
cam_id: str,
raw_image: bytes,
tags: List[str],
source: Optional[str] = None,
creation_date: Optional[str] = None) -> List[str]:
"""Check if the detection should be uploaded to the outbox.
If so, upload it and return the list of causes for the upload.
"""
for group in self.cam_histories.values():
group.forget_old_detections()

@@ -30,5 +32,5 @@ def may_upload_detections(self,
if len(causes) > 0:
tags = tags if tags is not None else []
tags.extend(causes)
self.outbox.save(raw_image, image_metadata, tags, source, creation_date)
await self.outbox.save(raw_image, image_metadata, tags, source, creation_date)
return causes