Skip to content

Commit

Permalink
sleep on 429 (#45)
Browse files Browse the repository at this point in the history
Recently the loop often responds with 429 - "too many requests". This PR
enables automatic retries for REST calls after a small timeout.

---------

Co-authored-by: Niklas Neugebauer <[email protected]>
  • Loading branch information
denniswittich and NiklasNeugebauer authored Dec 3, 2024
1 parent b382199 commit 6a04d2a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
3 changes: 0 additions & 3 deletions learning_loop_node/data_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ async def download_images(self, image_uuids: List[str], image_folder: str, chunk

async def _download_one_image(self, path: str, image_id: str, image_folder: str) -> None:
response = await self.loop_communicator.get(path)
if response.status_code == 429:
await asyncio.sleep(1)
response = await self.loop_communicator.get(path)
if response.status_code != HTTPStatus.OK:
logging.error('bad status code %s for %s. Details: %s', response.status_code, path, response.text)
return
Expand Down
21 changes: 21 additions & 0 deletions learning_loop_node/loop_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@

logging.basicConfig(level=logging.INFO)

SLEEP_TIME_ON_429 = 5
MAX_RETRIES_ON_429 = 20


def retry_on_429(func: Callable[..., Awaitable]) -> Callable[..., Awaitable]:
"""Decorator that retries requests that receive a 429 status code."""
async def wrapper(*args, **kwargs) -> httpx.Response:
for _ in range(MAX_RETRIES_ON_429):
response = await func(*args, **kwargs)
if response.status_code != 429:
return response

await asyncio.sleep(SLEEP_TIME_ON_429)

return response
return wrapper


class LoopCommunicationException(Exception):
"""Raised when there's an unexpected answer from the learning loop."""
Expand Down Expand Up @@ -96,6 +113,7 @@ async def get(self, path: str, requires_login: bool = True, api_prefix: str = '/
return await self.retry_on_401(self._get, path, api_prefix)
return await self._get(path, api_prefix)

@retry_on_429
async def _get(self, path: str, api_prefix: str) -> httpx.Response:
return await self.async_client.get(api_prefix+path)

Expand All @@ -105,6 +123,7 @@ async def put(self, path: str, files: Optional[List[str]] = None, requires_login
return await self.retry_on_401(self._put, path, files, api_prefix, **kwargs)
return await self._put(path, files, api_prefix, **kwargs)

@retry_on_429
async def _put(self, path: str, files: Optional[List[str]], api_prefix: str, **kwargs) -> httpx.Response:
if files is None:
return await self.async_client.put(api_prefix+path, **kwargs)
Expand Down Expand Up @@ -133,6 +152,7 @@ async def post(self, path: str, requires_login: bool = True, api_prefix: str = '
return await self.retry_on_401(self._post, path, api_prefix, **kwargs)
return await self._post(path, api_prefix, **kwargs)

@retry_on_429
async def _post(self, path, api_prefix='/api', **kwargs) -> httpx.Response:
return await self.async_client.post(api_prefix+path, **kwargs)

Expand All @@ -142,5 +162,6 @@ async def delete(self, path: str, requires_login: bool = True, api_prefix: str =
return await self.retry_on_401(self._delete, path, api_prefix, **kwargs)
return await self._delete(path, api_prefix, **kwargs)

@retry_on_429
async def _delete(self, path, api_prefix, **kwargs) -> httpx.Response:
return await self.async_client.delete(api_prefix+path, **kwargs)

0 comments on commit 6a04d2a

Please sign in to comment.