Skip to content

Commit bf30bd5

Browse files
committed
retry using a decorator
1 parent a79c235 commit bf30bd5

File tree

1 file changed

+24
-18
lines changed

1 file changed

+24
-18
lines changed

learning_loop_node/loop_communication.py

+24-18
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,23 @@
1111
logging.basicConfig(level=logging.INFO)
1212

1313
SLEEP_TIME_ON_429 = 5
14+
MAX_RETRIES_ON_429 = 20
15+
16+
17+
def retry_on_429(func: Callable[..., Awaitable]) -> Callable[..., Awaitable]:
18+
"""Decorator that retries requests that receive a 429 status code."""
19+
async def wrapper(*args, **kwargs) -> httpx.Response:
20+
retries = 0
21+
while retries < MAX_RETRIES_ON_429:
22+
response = await func(*args, **kwargs)
23+
if response.status_code != 429:
24+
return response
25+
26+
retries += 1
27+
await asyncio.sleep(SLEEP_TIME_ON_429)
28+
29+
return response
30+
return wrapper
1431

1532

1633
class LoopCommunicationException(Exception):
@@ -98,19 +115,17 @@ async def get(self, path: str, requires_login: bool = True, api_prefix: str = '/
98115
return await self.retry_on_401(self._get, path, api_prefix)
99116
return await self._get(path, api_prefix)
100117

118+
@retry_on_429
101119
async def _get(self, path: str, api_prefix: str) -> httpx.Response:
102-
response = await self.async_client.get(api_prefix+path)
103-
while response.status_code == 429:
104-
await asyncio.sleep(SLEEP_TIME_ON_429)
105-
response = await self.async_client.get(api_prefix+path)
106-
return response
120+
return await self.async_client.get(api_prefix+path)
107121

108122
async def put(self, path: str, files: Optional[List[str]] = None, requires_login: bool = True, api_prefix: str = '/api', **kwargs) -> httpx.Response:
109123
if requires_login:
110124
await self.ensure_login()
111125
return await self.retry_on_401(self._put, path, files, api_prefix, **kwargs)
112126
return await self._put(path, files, api_prefix, **kwargs)
113127

128+
@retry_on_429
114129
async def _put(self, path: str, files: Optional[List[str]], api_prefix: str, **kwargs) -> httpx.Response:
115130
if files is None:
116131
return await self.async_client.put(api_prefix+path, **kwargs)
@@ -127,9 +142,6 @@ async def _put(self, path: str, files: Optional[List[str]], api_prefix: str, **k
127142
try:
128143
file_list = [('files', fh) for fh in file_handles] # Use file handles
129144
response = await self.async_client.put(api_prefix+path, files=file_list)
130-
while response.status_code == 429:
131-
await asyncio.sleep(SLEEP_TIME_ON_429)
132-
response = await self.async_client.put(api_prefix+path, files=file_list)
133145
finally:
134146
for fh in file_handles:
135147
fh.close() # Ensure all files are closed
@@ -142,22 +154,16 @@ async def post(self, path: str, requires_login: bool = True, api_prefix: str = '
142154
return await self.retry_on_401(self._post, path, api_prefix, **kwargs)
143155
return await self._post(path, api_prefix, **kwargs)
144156

157+
@retry_on_429
145158
async def _post(self, path, api_prefix='/api', **kwargs) -> httpx.Response:
146-
response = await self.async_client.post(api_prefix+path, **kwargs)
147-
while response.status_code == 429:
148-
await asyncio.sleep(SLEEP_TIME_ON_429)
149-
response = await self.async_client.post(api_prefix+path, **kwargs)
150-
return response
159+
return await self.async_client.post(api_prefix+path, **kwargs)
151160

152161
async def delete(self, path: str, requires_login: bool = True, api_prefix: str = '/api', **kwargs) -> httpx.Response:
153162
if requires_login:
154163
await self.ensure_login()
155164
return await self.retry_on_401(self._delete, path, api_prefix, **kwargs)
156165
return await self._delete(path, api_prefix, **kwargs)
157166

167+
@retry_on_429
158168
async def _delete(self, path, api_prefix, **kwargs) -> httpx.Response:
159-
response = await self.async_client.delete(api_prefix+path, **kwargs)
160-
while response.status_code == 429:
161-
await asyncio.sleep(SLEEP_TIME_ON_429)
162-
response = await self.async_client.delete(api_prefix+path, **kwargs)
163-
return response
169+
return await self.async_client.delete(api_prefix+path, **kwargs)

0 commit comments

Comments
 (0)