Skip to content

Commit

Permalink
Merge branch 'main' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
denniswittich committed Jun 27, 2024
2 parents 37bfd5d + 5c3e05e commit ed3d067
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 87 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ from learning_loop_node/learning_loop_node

Detector Nodes are normally deployed on edge devices like robots or machinery but can also run in the cloud to provide backend services for an app or similar. These nodes register themself at the Learning Loop. They provide REST and Socket.io APIs to run inference on images. The processed images can automatically be used for active learning: e.g. uncertain predictions will be send to the Learning Loop.

### Running Inference

Images can be send to the detector node via socketio or rest.
The later approach can be used via curl,

Expand All @@ -62,6 +64,26 @@ The detector also has a sio **upload endpoint** that can be used to upload image

The endpoint returns None if the upload was successful and an error message otherwise.

### Changing the outbox mode

If the autoupload is set to `all` or `filtered` (selected) images and the corresponding detections are saved on HDD (the outbox). A background thread will upload the images and detections to the Learning Loop. The outbox is located in the `outbox` folder in the root directory of the node. The outbox can be cleared by deleting the files in the folder.

The continuous upload can be stopped/started via a REST enpoint:

Example Usage:

- Enable upload: `curl -X PUT -d "continuous_upload" http://localhost/outbox_mode`
- Disable upload: `curl -X PUT -d "stopped" http://localhost/outbox_mode`

The current state can be queried via a GET request:
`curl http://localhost/outbox_mode`

### Explicit upload

The detector has a REST endpoint to upload images (and detections) to the Learning Loop. The endpoint takes a POST request with the image and optionally the detections. The image is expected to be in jpg format. The detections are expected to be a json dictionary. Example:

`curl -X POST -F '[email protected]' "http://localhost:/upload"`

## Trainer Node

Trainers fetch the images and anntoations from the Learning Loop to train new models.
Expand Down
13 changes: 9 additions & 4 deletions learning_loop_node/data_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .data_classes import Context
from .helpers.misc import create_resource_paths, create_task, is_valid_image
from .loop_communication import LoopCommunicator
from .trainer.exceptions import CriticalError


class DownloadError(Exception):
Expand Down Expand Up @@ -159,13 +160,17 @@ async def download_model(self, target_folder: str, context: Context, model_uuid:
logging.info(f'Downloaded model {model_uuid}({model_format}) to {target_folder}.')
return created_files

async def upload_model_get_uuid(self, context: Context, files: List[str], training_number: Optional[int], mformat: str) -> Optional[str]:
"""Used by the trainers. Function returns the new model uuid to use for detection."""
async def upload_model_get_uuid(self, context: Context, files: List[str], training_number: Optional[int], mformat: str) -> str:
"""Used by the trainers. Function returns the new model uuid to use for detection.
:return: The new model uuid.
:raise CriticalError: If the upload does not return status code 200.
"""
response = await self.loop_communicator.put(f'/{context.organization}/projects/{context.project}/trainings/{training_number}/models/latest/{mformat}/file', files=files)
if response.status_code != 200:
logging.error(f'Could not upload model for training {training_number}, format {mformat}: {response.text}')
response.raise_for_status()
return None
raise CriticalError(
f'Could not upload model for training {training_number}, format {mformat}: {response.text}')

uploaded_model = response.json()
logging.info(f'Uploaded model for training {training_number}, format {mformat}. Response is: {uploaded_model}')
Expand Down
6 changes: 4 additions & 2 deletions learning_loop_node/detector/detector_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .rest import backdoor_controls
from .rest import detect as rest_detect
from .rest import operation_mode as rest_mode
from .rest import outbox_mode as rest_outbox_mode
from .rest import upload as rest_upload
from .rest.operation_mode import OperationMode

Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(self, name: str, detector: DetectorLogic, uuid: Optional[str] = Non
self.include_router(rest_upload.router, prefix="")
self.include_router(rest_mode.router, tags=["operation_mode"])
self.include_router(rest_about.router, tags=["about"])
self.include_router(rest_outbox_mode.router, tags=["outbox_mode"])

if use_backdoor_controls:
self.include_router(backdoor_controls.router)
Expand Down Expand Up @@ -89,15 +91,15 @@ async def soft_reload(self) -> None:

async def on_startup(self) -> None:
try:
self.outbox.start_continuous_upload()
self.outbox.ensure_continuous_upload()
self.detector_logic.load_model()
except Exception:
self.log.exception("error during 'startup'")
self.operation_mode = OperationMode.Idle

async def on_shutdown(self) -> None:
try:
self.outbox.stop_continuous_upload()
self.outbox.ensure_continuous_upload_stopped()
for sid in self.connected_clients:
# pylint: disable=no-member
await self.sio.disconnect(sid) # type:ignore
Expand Down
138 changes: 103 additions & 35 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import time
from dataclasses import asdict
from datetime import datetime
from enum import Enum
from glob import glob
from io import BufferedReader, TextIOWrapper
from multiprocessing import Event
from multiprocessing.synchronize import Event as SyncEvent
from threading import Thread
Expand All @@ -19,13 +21,18 @@
from ..helpers import environment_reader


class Outbox():
class OutboxMode(Enum):
CONTINUOUS_UPLOAD = 'continuous_upload'
STOPPED = 'stopped'


class Outbox():
def __init__(self) -> None:
self.log = logging.getLogger()
self.path = f'{GLOBALS.data_folder}/outbox'
os.makedirs(self.path, exist_ok=True)

self.log = logging.getLogger()
host = environment_reader.host()
o = environment_reader.organization()
p = environment_reader.project()
Expand All @@ -34,9 +41,12 @@ def __init__(self) -> None:
base_url = f'http{"s" if "learning-loop.ai" in host else ""}://{host}/api'
base: str = base_url
self.target_uri = f'{base}/{o}/projects/{p}/images'
self.log.info(f'Outbox initialized with target_uri: {self.target_uri}')
self.log.info('Outbox initialized with target_uri: %s', self.target_uri)

self.BATCH_SIZE = 20
self.UPLOAD_TIMEOUT_S = 30

self.shutdown_event: Optional[SyncEvent] = None
self.shutdown_event: SyncEvent = Event()
self.upload_process: Optional[Thread] = None

def save(self, image: bytes, detections: Optional[Detections] = None, tags: Optional[List[str]] = None) -> None:
Expand All @@ -59,59 +69,117 @@ def save(self, image: bytes, detections: Optional[Detections] = None, tags: Opti
if os.path.exists(tmp):
os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel
else:
self.log.error(f'Could not rename {tmp} to {self.path}/{identifier}')
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)

def get_data_files(self):
return glob(f'{self.path}/*')

def start_continuous_upload(self):
self.shutdown_event = Event()
self.upload_process = Thread(target=self._continuous_upload)
def ensure_continuous_upload(self):
self.log.debug('start_continuous_upload')
if self._upload_process_alive():
self.log.debug('Upload thread already running')
return

self.shutdown_event.clear()
self.upload_process = Thread(target=self._continuous_upload, name='OutboxUpload')
self.upload_process.start()

def _continuous_upload(self):
self.log.info('start continuous upload')
self.log.info('continuous upload started')
assert self.shutdown_event is not None
while not self.shutdown_event.is_set():
self.upload()
time.sleep(1)
self.log.info('stop continuous upload')
time.sleep(5)
self.log.info('continuous upload ended')

def upload(self):
items = self.get_data_files()
if items:
self.log.info(f'Found {len(items)} images to upload')
for item in items:
if self.shutdown_event and self.shutdown_event.is_set():
break
try:
data = [('files', open(f'{item}/image.json', 'r')),
('files', open(f'{item}/image.jpg', 'rb'))]

response = requests.post(self.target_uri, files=data, timeout=30)
if response.status_code == 200:
shutil.rmtree(item)
self.log.info(f'uploaded {item} successfully')
elif response.status_code == 422:
self.log.error(f'Broken content in {item}: dropping this data')
shutil.rmtree(item)
else:
self.log.error(f'Could not upload {item}: {response.status_code}')
except Exception:
self.log.exception('could not upload files')

def stop_continuous_upload(self, timeout=5):
self.log.info('Found %s images to upload', len(items))
for i in range(0, len(items), self.BATCH_SIZE):
batch_items = items[i:i+self.BATCH_SIZE]
if self.shutdown_event.is_set():
break
try:
self._upload_batch(batch_items)
except Exception:
self.log.exception('Could not upload files')
else:
self.log.info('No images found to upload')

def _upload_batch(self, items: List[str]):
data: List[tuple[str, TextIOWrapper | BufferedReader]] = []
data = [('files', open(f'{item}/image.json', 'r')) for item in items]
data += [('files', open(f'{item}/image.jpg', 'rb')) for item in items]

response = requests.post(self.target_uri, files=data, timeout=self.UPLOAD_TIMEOUT_S)
if response.status_code == 200:
for item in items:
shutil.rmtree(item, ignore_errors=True)
self.log.info('Uploaded %s images successfully', len(items))
elif response.status_code == 422:
if len(items) == 1:
self.log.error('Broken content in image: %s\n Skipping.', items[0])
shutil.rmtree(items[0], ignore_errors=True)
return

self.log.exception('Broken content in batch. Splitting and retrying')
self._upload_batch(items[:len(items)//2])
self._upload_batch(items[len(items)//2:])
else:
self.log.error('Could not upload images: %s', response.content)

def ensure_continuous_upload_stopped(self) -> bool:
self.log.debug('Outbox: Ensuring continuous upload')
if not self._upload_process_alive():
self.log.debug('Upload thread already stopped')
return True
proc = self.upload_process
if not proc:
return
return True

try:
assert self.shutdown_event is not None
self.shutdown_event.set()
assert proc is not None
proc.join(timeout)
proc.join(self.UPLOAD_TIMEOUT_S + 1)
except Exception:
logging.exception('error while shutting down upload thread')
self.log.exception('Error while shutting down upload thread: ')

if proc.is_alive():
self.log.error('upload thread did not terminate')
self.log.error('Upload thread did not terminate')
return False

self.log.info('Upload thread terminated')
return True

def _upload_process_alive(self) -> bool:
return bool(self.upload_process and self.upload_process.is_alive())

def get_mode(self) -> OutboxMode:
''':return: current mode ('continuous_upload' or 'stopped')'''
if self.upload_process and self.upload_process.is_alive():
current_mode = OutboxMode.CONTINUOUS_UPLOAD
else:
current_mode = OutboxMode.STOPPED

self.log.debug('Outbox: Current mode is %s', current_mode)
return current_mode

def set_mode(self, mode: OutboxMode | str):
''':param mode: 'continuous_upload' or 'stopped'
:raises ValueError: if mode is not a valid OutboxMode
:raises TimeoutError: if the upload thread does not terminate within 31 seconds with mode='stopped'
'''
if isinstance(mode, str):
mode = OutboxMode(mode)

if mode == OutboxMode.CONTINUOUS_UPLOAD:
self.ensure_continuous_upload()
elif mode == OutboxMode.STOPPED:
try:
self.ensure_continuous_upload_stopped()
except TimeoutError as e:
raise TimeoutError(f'Upload thread did not terminate within {self.UPLOAD_TIMEOUT_S} seconds.') from e

self.log.debug('set outbox mode to %s', mode)
35 changes: 35 additions & 0 deletions learning_loop_node/detector/rest/outbox_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import PlainTextResponse

from ..outbox import Outbox

router = APIRouter()


@router.get("/outbox_mode")
async def get_outbox_mode(request: Request):
'''
Example Usage
curl http://localhost/outbox_mode
'''
outbox: Outbox = request.app.outbox
return PlainTextResponse(outbox.get_mode().value)


@router.put("/outbox_mode")
async def put_outbox_mode(request: Request):
'''
Example Usage
curl -X PUT -d "continuous_upload" http://localhost/outbox_mode
curl -X PUT -d "stopped" http://localhost/outbox_mode
'''
outbox: Outbox = request.app.outbox
content = str(await request.body(), 'utf-8')
try:
outbox.set_mode(content)
except TimeoutError as e:
raise HTTPException(202, 'Setting has not completed, yet: ' + str(e)) from e
except ValueError as e:
raise HTTPException(422, 'Could not set outbox mode: ' + str(e)) from e

return "OK"
8 changes: 6 additions & 2 deletions learning_loop_node/detector/rest/upload.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import List
from typing import TYPE_CHECKING, List

from fastapi import APIRouter, File, Request, UploadFile

if TYPE_CHECKING:
from ..detector_node import DetectorNode

router = APIRouter()


Expand All @@ -13,5 +16,6 @@ async def upload_image(request: Request, files: List[UploadFile] = File(...)):
curl -X POST -F '[email protected]' "http://localhost:/upload"
"""
raw_files = [await file.read() for file in files]
await request.app.upload_images(raw_files)
node: DetectorNode = request.app
await node.upload_images(raw_files)
return 200, "OK"
16 changes: 16 additions & 0 deletions learning_loop_node/detector/tests/test_client_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,19 @@ async def test_about_endpoint(test_detector_node: DetectorNode):
assert response_dict['state'] == 'online'
assert response_dict['target_model'] == '1.1'
assert any(c.name == 'purple point' for c in model_information.categories)


async def test_rest_outbox_mode(test_detector_node: DetectorNode):
await asyncio.sleep(3)

def check_switch_to_mode(mode: str):
response = requests.put(f'http://localhost:{GLOBALS.detector_port}/outbox_mode',
data=mode, timeout=30)
assert response.status_code == 200
response = requests.get(f'http://localhost:{GLOBALS.detector_port}/outbox_mode', timeout=30)
assert response.status_code == 200
assert response.content == mode.encode()

check_switch_to_mode('stopped')
check_switch_to_mode('continuous_upload')
check_switch_to_mode('stopped')
Loading

0 comments on commit ed3d067

Please sign in to comment.