From e81876a33d853704b68880941442eb7ac9ed9287 Mon Sep 17 00:00:00 2001 From: "Dr. Dennis Wittich" Date: Fri, 28 Feb 2025 16:30:35 +0100 Subject: [PATCH] Revert "fix upload mechanics and nasty ClientSession error" This reverts commit ca77fce5f29460026cc8a3302b14588227067504. --- learning_loop_node/detector/outbox.py | 49 ++++++++------------------- learning_loop_node/node.py | 34 +++++++++---------- 2 files changed, 32 insertions(+), 51 deletions(-) diff --git a/learning_loop_node/detector/outbox.py b/learning_loop_node/detector/outbox.py index 0126479..abe78c8 100644 --- a/learning_loop_node/detector/outbox.py +++ b/learning_loop_node/detector/outbox.py @@ -56,8 +56,6 @@ def __init__(self) -> None: self.upload_folders = self.get_all_data_files() # make sure to upload all existing images (e.g. after a restart) - self._session = aiohttp.ClientSession() - def save(self, image: bytes, image_metadata: Optional[ImageMetadata] = None, @@ -98,7 +96,6 @@ def save(self, os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel else: self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier) - return if upload_priority: self.priority_upload_folders.append(self.path + '/' + identifier) @@ -109,15 +106,13 @@ def save(self, if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH: items_to_drop = self.upload_folders[self.MAX_UPLOAD_LENGTH:] self.log.info('Dropping %s images from upload list', len(items_to_drop)) - self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH] - - for item in items_to_drop: - try: - if os.path.exists(item): - shutil.rmtree(item) - self.log.debug('Deleted %s', item) - except Exception: - self.log.exception('Failed to delete %s', item) + try: + for item in items_to_drop: + shutil.rmtree(item) + self.log.debug('Deleted %s', item) + self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH] + except Exception: + self.log.exception('Failed to cut upload list') def _is_valid_isoformat(self, date: Optional[str]) -> bool: if date is None: @@ -168,17 +163,6 @@ async def upload(self) -> None: except Exception: self.log.exception('Could not upload files') - def _clear_item(self, item: str) -> None: - if item in self.upload_folders: - self.upload_folders.remove(item) - if item in self.priority_upload_folders: - self.priority_upload_folders.remove(item) - try: - shutil.rmtree(item, ignore_errors=True) - self.log.debug('Deleted %s', item) - except Exception: - self.log.exception('Failed to delete %s', item) - async def _upload_batch(self, items: List[str]) -> None: """ Uploads a batch of images to the server. @@ -190,19 +174,13 @@ async def _upload_batch(self, items: List[str]) -> None: data: List[Tuple[str, Union[TextIOWrapper, BufferedReader]]] = [] for item in items: - if not os.path.exists(item): - self._clear_item(item) - continue identifier = os.path.basename(item) data.append(('files', open(f'{item}/image_{identifier}.json', 'r'))) data.append(('files', open(f'{item}/image_{identifier}.jpg', 'rb'))) try: - response = await self._session.post( - self.target_uri, - data=data, - timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S) - ) + async with aiohttp.ClientSession() as session: + response = await session.post(self.target_uri, data=data, timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S)) except Exception: self.log.exception('Could not upload images') return @@ -214,14 +192,17 @@ async def _upload_batch(self, items: List[str]) -> None: if response.status == 200: self.upload_counter += len(items) for item in items: - self._clear_item(item) - + try: + shutil.rmtree(item) + self.log.debug('Deleted %s', item) + except Exception: + self.log.exception('Failed to delete %s', item) self.log.info('Uploaded %s images successfully', len(items)) elif response.status == 422: if len(items) == 1: self.log.error('Broken content in image: %s\n Skipping.', items[0]) - self._clear_item(items[0]) + shutil.rmtree(items[0], ignore_errors=True) return self.log.exception('Broken content in batch. Splitting and retrying') diff --git a/learning_loop_node/node.py b/learning_loop_node/node.py index 6352c7d..c7dbbb8 100644 --- a/learning_loop_node/node.py +++ b/learning_loop_node/node.py @@ -55,7 +55,6 @@ def __init__(self, name: str, uuid: Optional[str] = None, node_type: str = 'node self.data_exchanger = DataExchanger(None, self.loop_communicator) self.startup_datetime = datetime.now() - self._session: Optional[aiohttp.ClientSession] = None self._sio_client: Optional[AsyncClient] = None self.status = NodeStatus(id=self.uuid, name=self.name) @@ -126,8 +125,8 @@ async def _on_startup(self): async def _on_shutdown(self): self.log.info('received "shutdown" lifecycle-event') await self.loop_communicator.shutdown() - if self._session: - await self._session.close() + if self._sio_client is not None: + await self._sio_client.disconnect() self.log.info('successfully disconnected from loop.') await self.on_shutdown() @@ -185,9 +184,17 @@ async def _reconnect_socketio(self): cookies = self.loop_communicator.get_cookies() self.log.debug('HTTP Cookies: %s\n', cookies) - if self._session: - await self._session.close() - self._session = None + if self._sio_client is not None: + try: + await self.sio_client.disconnect() + self.log.info('disconnected from loop via sio') + # NOTE: without waiting for the disconnect event, we might disconnect the next connection too early + await asyncio.wait_for(self.DISCONNECTED_FROM_LOOP.wait(), timeout=5) + except asyncio.TimeoutError: + self.log.warning( + 'Did not receive disconnect event from loop within 5 seconds.\nContinuing with new connection...') + except Exception as e: + self.log.warning('Could not disconnect from loop via sio: %s.\nIgnoring...', e) self._sio_client = None connector = None @@ -198,19 +205,12 @@ async def _reconnect_socketio(self): ssl_context.verify_mode = ssl.CERT_REQUIRED connector = TCPConnector(ssl=ssl_context) - self._session = aiohttp.ClientSession(connector=connector) - if self.needs_login: - self._sio_client = AsyncClient( - request_timeout=20, - http_session=self._session, - cookies=cookies - ) + self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession( + cookies=cookies, connector=connector)) else: - self._sio_client = AsyncClient( - request_timeout=20, - http_session=self._session - ) + self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession( + connector=connector)) # pylint: disable=protected-access self._sio_client._trigger_event = ensure_socket_response(self._sio_client._trigger_event)