Skip to content

Commit

Permalink
Revert "fix upload mechanics and nasty ClientSession error"
Browse files Browse the repository at this point in the history
This reverts commit ca77fce.
  • Loading branch information
denniswittich committed Feb 28, 2025
1 parent ca77fce commit e81876a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 51 deletions.
49 changes: 15 additions & 34 deletions learning_loop_node/detector/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ def __init__(self) -> None:

self.upload_folders = self.get_all_data_files() # make sure to upload all existing images (e.g. after a restart)

self._session = aiohttp.ClientSession()

def save(self,
image: bytes,
image_metadata: Optional[ImageMetadata] = None,
Expand Down Expand Up @@ -98,7 +96,6 @@ def save(self,
os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel
else:
self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
return

if upload_priority:
self.priority_upload_folders.append(self.path + '/' + identifier)
Expand All @@ -109,15 +106,13 @@ def save(self,
if len(self.upload_folders) > self.MAX_UPLOAD_LENGTH:
items_to_drop = self.upload_folders[self.MAX_UPLOAD_LENGTH:]
self.log.info('Dropping %s images from upload list', len(items_to_drop))
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]

for item in items_to_drop:
try:
if os.path.exists(item):
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)
try:
for item in items_to_drop:
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
self.upload_folders = self.upload_folders[:self.MAX_UPLOAD_LENGTH]
except Exception:
self.log.exception('Failed to cut upload list')

def _is_valid_isoformat(self, date: Optional[str]) -> bool:
if date is None:
Expand Down Expand Up @@ -168,17 +163,6 @@ async def upload(self) -> None:
except Exception:
self.log.exception('Could not upload files')

def _clear_item(self, item: str) -> None:
if item in self.upload_folders:
self.upload_folders.remove(item)
if item in self.priority_upload_folders:
self.priority_upload_folders.remove(item)
try:
shutil.rmtree(item, ignore_errors=True)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)

async def _upload_batch(self, items: List[str]) -> None:
"""
Uploads a batch of images to the server.
Expand All @@ -190,19 +174,13 @@ async def _upload_batch(self, items: List[str]) -> None:

data: List[Tuple[str, Union[TextIOWrapper, BufferedReader]]] = []
for item in items:
if not os.path.exists(item):
self._clear_item(item)
continue
identifier = os.path.basename(item)
data.append(('files', open(f'{item}/image_{identifier}.json', 'r')))
data.append(('files', open(f'{item}/image_{identifier}.jpg', 'rb')))

try:
response = await self._session.post(
self.target_uri,
data=data,
timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S)
)
async with aiohttp.ClientSession() as session:
response = await session.post(self.target_uri, data=data, timeout=aiohttp.ClientTimeout(total=self.UPLOAD_TIMEOUT_S))
except Exception:
self.log.exception('Could not upload images')
return
Expand All @@ -214,14 +192,17 @@ async def _upload_batch(self, items: List[str]) -> None:
if response.status == 200:
self.upload_counter += len(items)
for item in items:
self._clear_item(item)

try:
shutil.rmtree(item)
self.log.debug('Deleted %s', item)
except Exception:
self.log.exception('Failed to delete %s', item)
self.log.info('Uploaded %s images successfully', len(items))

elif response.status == 422:
if len(items) == 1:
self.log.error('Broken content in image: %s\n Skipping.', items[0])
self._clear_item(items[0])
shutil.rmtree(items[0], ignore_errors=True)
return

self.log.exception('Broken content in batch. Splitting and retrying')
Expand Down
34 changes: 17 additions & 17 deletions learning_loop_node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(self, name: str, uuid: Optional[str] = None, node_type: str = 'node
self.data_exchanger = DataExchanger(None, self.loop_communicator)

self.startup_datetime = datetime.now()
self._session: Optional[aiohttp.ClientSession] = None
self._sio_client: Optional[AsyncClient] = None
self.status = NodeStatus(id=self.uuid, name=self.name)

Expand Down Expand Up @@ -126,8 +125,8 @@ async def _on_startup(self):
async def _on_shutdown(self):
self.log.info('received "shutdown" lifecycle-event')
await self.loop_communicator.shutdown()
if self._session:
await self._session.close()
if self._sio_client is not None:
await self._sio_client.disconnect()
self.log.info('successfully disconnected from loop.')
await self.on_shutdown()

Expand Down Expand Up @@ -185,9 +184,17 @@ async def _reconnect_socketio(self):
cookies = self.loop_communicator.get_cookies()
self.log.debug('HTTP Cookies: %s\n', cookies)

if self._session:
await self._session.close()
self._session = None
if self._sio_client is not None:
try:
await self.sio_client.disconnect()
self.log.info('disconnected from loop via sio')
# NOTE: without waiting for the disconnect event, we might disconnect the next connection too early
await asyncio.wait_for(self.DISCONNECTED_FROM_LOOP.wait(), timeout=5)
except asyncio.TimeoutError:
self.log.warning(
'Did not receive disconnect event from loop within 5 seconds.\nContinuing with new connection...')
except Exception as e:
self.log.warning('Could not disconnect from loop via sio: %s.\nIgnoring...', e)
self._sio_client = None

connector = None
Expand All @@ -198,19 +205,12 @@ async def _reconnect_socketio(self):
ssl_context.verify_mode = ssl.CERT_REQUIRED
connector = TCPConnector(ssl=ssl_context)

self._session = aiohttp.ClientSession(connector=connector)

if self.needs_login:
self._sio_client = AsyncClient(
request_timeout=20,
http_session=self._session,
cookies=cookies
)
self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
cookies=cookies, connector=connector))
else:
self._sio_client = AsyncClient(
request_timeout=20,
http_session=self._session
)
self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
connector=connector))

# pylint: disable=protected-access
self._sio_client._trigger_event = ensure_socket_response(self._sio_client._trigger_event)
Expand Down

0 comments on commit e81876a

Please sign in to comment.