Skip to content

Commit

Permalink
fix upload mechanics and nasty ClientSession error
Browse files Browse the repository at this point in the history
  • Loading branch information
denniswittich committed Feb 28, 2025
1 parent 4104673 commit ca77fce
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 32 deletions.
49 changes: 34 additions & 15 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def __init__(self) -> None:

self.upload_folders = self.get_all_data_files() # make sure to upload all existing images (e.g. after a restart)

self._session = aiohttp.ClientSession()

def save(self,
image: bytes,
image_metadata: Optional[ImageMetadata] = None,
Expand Down Expand Up @@ -96,6 +98,7 @@ def save(self,
os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel
else:
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
return

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
Expand All @@ -106,13 +109,15 @@ def save(self,
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
items_to_drop = self.upload_folders[self.MAX_UPLOAD_LENGTH:]
self.log.info('Dropping %s images from upload list', len(items_to_drop))
try:
for item in items_to_drop:
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]
except Exception:
self.log.exception('Failed to cut upload list')
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]

for item in items_to_drop:
try:
if os.path.exists(item):
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)

def _is_valid_isoformat(self, date: Optional[str]) -> bool:
if date is None:
Expand Down Expand Up @@ -163,6 +168,17 @@ async def upload(self) -> None:
except Exception:
self.log.exception('Could not upload files')

def _clear_item(self, item: str) -> None:
if item in self.upload_folders:
self.upload_folders.remove(item)
if item in self.priority_upload_folders:
self.priority_upload_folders.remove(item)
try:
shutil.rmtree(item, ignore_errors=True)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)

async def _upload_batch(self, items: List[str]) -> None:
"""
Uploads a batch of images to the server.
Expand All @@ -174,13 +190,19 @@ async def _upload_batch(self, items: List[str]) -> None:

data: List[Tuple[str, Union[TextIOWrapper, BufferedReader]]] = []
for item in items:
if not os.path.exists(item):
self._clear_item(item)
continue
identifier = os.path.basename(item)
data.append(('files', open(f'{item}/image_{identifier}.json', 'r')))
data.append(('files', open(f'{item}/image_{identifier}.jpg', 'rb')))

try:
async with aiohttp.ClientSession() as session:
response = await session.post(self.target_uri, data=data, timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S))
response = await self._session.post(
self.target_uri,
data=data,
timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S)
)
except Exception:
self.log.exception('Could not upload images')
return
Expand All @@ -192,17 +214,14 @@ async def _upload_batch(self, items: List[str]) -> None:
if response.status == 200:
self.upload_counter += len(items)
for item in items:
try:
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)
self._clear_item(item)

self.log.info('Uploaded %s images successfully', len(items))

elif response.status == 422:
if len(items) == 1:
self.log.error('Broken content in image: %s\n Skipping.', items[0])
shutil.rmtree(items[0], ignore_errors=True)
self._clear_item(items[0])
return

self.log.exception('Broken content in batch. Splitting and retrying')
Expand Down
34 changes: 17 additions & 17 deletions learning_loop_node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, name: str, uuid: Optional[str] = None, node_type: str = 'node
self.data_exchanger = DataExchanger(None, self.loop_communicator)

self.startup_datetime = datetime.now()
self._session: Optional[aiohttp.ClientSession] = None
self._sio_client: Optional[AsyncClient] = None
self.status = NodeStatus(id=self.uuid, name=self.name)

Expand Down Expand Up @@ -125,8 +126,8 @@ async def _on_startup(self):
async def _on_shutdown(self):
self.log.info('received "shutdown" lifecycle-event')
await self.loop_communicator.shutdown()
if self._sio_client is not None:
await self._sio_client.disconnect()
if self._session:
await self._session.close()
self.log.info('successfully disconnected from loop.')
await self.on_shutdown()

Expand Down Expand Up @@ -184,17 +185,9 @@ async def _reconnect_socketio(self):
cookies = self.loop_communicator.get_cookies()
self.log.debug('HTTP Cookies: %s\n', cookies)

if self._sio_client is not None:
try:
await self.sio_client.disconnect()
self.log.info('disconnected from loop via sio')
# NOTE: without waiting for the disconnect event, we might disconnect the next connection too early
await asyncio.wait_for(self.DISCONNECTED_FROM_LOOP.wait(), timeout=5)
except asyncio.TimeoutError:
self.log.warning(
'Did not receive disconnect event from loop within 5 seconds.\nContinuing with new connection...')
except Exception as e:
self.log.warning('Could not disconnect from loop via sio: %s.\nIgnoring...', e)
if self._session:
await self._session.close()
self._session = None
self._sio_client = None

connector = None
Expand All @@ -205,12 +198,19 @@ async def _reconnect_socketio(self):
ssl_context.verify_mode = ssl.CERT_REQUIRED
connector = TCPConnector(ssl=ssl_context)

self._session = aiohttp.ClientSession(connector=connector)

if self.needs_login:
self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
cookies=cookies, connector=connector))
self._sio_client = AsyncClient(
request_timeout=20,
http_session=self._session,
cookies=cookies
)
else:
self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
connector=connector))
self._sio_client = AsyncClient(
request_timeout=20,
http_session=self._session
)

# pylint: disable=protected-access
self._sio_client._trigger_event = ensure_socket_response(self._sio_client._trigger_event)
Expand Down

0 comments on commit ca77fce

Please sign in to comment.