D8541.id30801.diff (D8541: Make HTTP requests more resilient)
diff --git a/swh/scanner/client.py b/swh/scanner/client.py
--- a/swh/scanner/client.py
+++ b/swh/scanner/client.py
@@ -12,7 +12,8 @@
import asyncio
import itertools
-from typing import Any, Dict, List, Optional
+import time
+from typing import Any, Dict, List, Optional, Tuple
import aiohttp
@@ -23,17 +24,40 @@
# Maximum number of SWHIDs that can be requested by a single call to the
# Web API endpoint /known/
QUERY_LIMIT = 1000
+MAX_RETRY = 10
KNOWN_EP = "known/"
GRAPH_RANDOMWALK_EP = "graph/randomwalk/"
+def _get_chunk(swhids):
+ """slice a list of `swhids` into smaller list of size QUERY_LIMIT"""
+ for i in range(0, len(swhids), QUERY_LIMIT):
+ yield swhids[i : i + QUERY_LIMIT]
+
+
+def _parse_limit_header(response) -> Tuple[Optional[int], Optional[int], Optional[int]]:
+ """parse the X-RateLimit Headers if any"""
+ limit = response.headers.get("X-RateLimit-Limit")
+ if limit is not None:
+ limit = int(limit)
+ remaining = response.headers.get("X-RateLimit-Remaining")
+ if remaining is not None:
+ remaining = int(remaining)
+ reset = response.headers.get("X-RateLimit-Reset")
+ if reset is not None:
+ reset = int(reset)
+ return (limit, remaining, reset)
+
+
class Client:
"""Manage requests to the Software Heritage Web API."""
def __init__(self, api_url: str, session: aiohttp.ClientSession):
+ self._sleep = 0
self.api_url = api_url
self.session = session
+ self._known_endpoint = self.api_url + KNOWN_EP
async def get_origin(self, swhid: CoreSWHID) -> Optional[Any]:
"""Walk the compressed graph to discover the origin of a given swhid"""
@@ -70,27 +94,93 @@
value['known'] = False if the SWHID is not found
"""
- endpoint = self.api_url + KNOWN_EP
requests = []
- def get_chunk(swhids):
- for i in range(0, len(swhids), QUERY_LIMIT):
- yield swhids[i : i + QUERY_LIMIT]
-
- async def make_request(swhids):
- swhids = [str(swhid) for swhid in swhids]
- async with self.session.post(endpoint, json=swhids) as resp:
- if resp.status != 200:
- error_response(resp.reason, resp.status, endpoint)
-
- return await resp.json()
+ swh_ids = [str(swhid) for swhid in swhids]
- if len(swhids) > QUERY_LIMIT:
- for swhids_chunk in get_chunk(swhids):
- requests.append(asyncio.create_task(make_request(swhids_chunk)))
+ if len(swhids) <= QUERY_LIMIT:
+ return await self._make_request(swh_ids)
+ else:
+ for swhids_chunk in _get_chunk(swh_ids):
+ task = asyncio.create_task(self._make_request(swhids_chunk))
+ requests.append(task)
res = await asyncio.gather(*requests)
# concatenate list of dictionaries
return dict(itertools.chain.from_iterable(e.items() for e in res))
+
+ def _mark_success(self, limit=None, remaining=None, reset=None):
+ """call when a request is successfully made, this will adjust the rate
+
+ The extra argument can be used to transmit the X-RateLimit information
+ from the server. This will be used to adjust the request rate"""
+ self._sleep = 0
+ factor = 0
+ if limit is not None and remaining is not None and reset is not None:
+ current = time.time()
+ time_windows = reset - current
+ if time_windows <= 0:
+ return
+ used_up = remaining / limit
+ if remaining <= 0:
+                # no more credit, we can only sit back and wait.
+ #
+ # XXX we should warn the user. This can get very long.
+ self._sleep = time_windows
+ factor = -1
+ elif 0.99 < used_up:
+                # let us not limit the first flight of requests.
+ factor = 0
+ else:
+                # the more credit we consume, the harder the rate limiting:
+                # the lower the remaining ratio, the more we brake our own rate
+                #
+                # (the factor ranges from about 1 to 1000)
+ factor = (0.01 + used_up) ** -1.5
+ if factor >= 0:
+ self._sleep = (1 + (time_windows / remaining)) * factor
+
+ def _mark_failure(self, limit=None, remaining=None, reset=None):
+ """call when a request failed, this will reduce the request rate.
+
+ The extra argument can be used to transmit the X-RateLimit information
+ from the server. This will be used to adjust the request rate"""
+ if remaining is not None and reset is not None and remaining <= 0:
+ current = time.time()
+ wait_for = reset - current
+ wait_for *= 1.1 # Add some margin to please the rate limiting code
+ if wait_for > 0 and wait_for >= self._sleep:
+ self._sleep = wait_for
+ return
+ if self._sleep <= 0:
+ self._sleep = 1
else:
- return await make_request(swhids)
+ self._sleep *= 2
+
+ async def _make_request(self, swhids):
+ endpoint = self._known_endpoint
+
+ data = None
+
+ retry = MAX_RETRY
+
+ while data is None:
+            # slow the pace of requests if needed
+ if self._sleep > 0:
+ time.sleep(self._sleep)
+ async with self.session.post(endpoint, json=swhids) as resp:
+ rate_limit = _parse_limit_header(resp)
+ if resp.status == 200:
+ try:
+ # inform of success before the await
+ self._mark_success(*rate_limit)
+ data = await resp.json()
+ except aiohttp.client_exceptions.ClientConnectionError:
+ raise
+ else:
+ break
+ self._mark_failure(*rate_limit)
+ retry -= 1
+ if retry <= 0 or resp.status == 413: # 413: Payload Too Large
+ error_response(resp.reason, resp.status, endpoint)
+ return data
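
Worked example. The pacing introduced in _mark_success can be traced by hand from the formulas above. The following standalone sketch re-implements that computation for illustration only; the header values passed in at the end are made up, not taken from a real Software Heritage response.

import time


def pacing_delay(limit, remaining, reset, now=None):
    """Illustrative re-implementation of the _mark_success sleep computation."""
    now = time.time() if now is None else now
    time_window = reset - now
    if time_window <= 0:
        return 0.0
    if remaining <= 0:
        # out of credit: wait until the rate-limit window resets
        return time_window
    remaining_ratio = remaining / limit  # named `used_up` in the diff
    if remaining_ratio > 0.99:
        # first flight of requests: no throttling yet
        return 0.0
    # grows from ~1 (credit almost full) to ~1000 (credit almost exhausted)
    factor = (0.01 + remaining_ratio) ** -1.5
    return (1 + time_window / remaining) * factor


# e.g. 1200 requests allowed, 600 left, window resets in 30 minutes
now = time.time()
print(pacing_delay(limit=1200, remaining=600, reset=now + 1800, now=now))

With these numbers the sketch yields a delay of roughly 11 seconds before the next request, and the delay keeps growing as the remaining credit shrinks.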
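The failure path is a plain exponential backoff (1 s, 2 s, 4 s, ...), except when the headers show that the credit is exhausted, in which case the delay jumps to the time left until the reset, with a 10% margin. A small sketch of that behaviour, mirroring _mark_failure outside the class:

import time


def next_backoff(current_sleep, limit=None, remaining=None, reset=None):
    """Illustrative mirror of the _mark_failure sleep adjustment."""
    if remaining is not None and reset is not None and remaining <= 0:
        # add some margin, as the diff does, to please the rate-limiting code
        wait_for = (reset - time.time()) * 1.1
        if wait_for > 0 and wait_for >= current_sleep:
            return wait_for
        return current_sleep
    # no usable rate-limit information: double the delay, starting at 1 second
    return 1 if current_sleep <= 0 else current_sleep * 2


sleep = 0
for _ in range(4):
    sleep = next_backoff(sleep)
print(sleep)  # 0 -> 1 -> 2 -> 4 -> 8 seconds after four consecutive failures

Since _mark_success resets _sleep to 0, a single successful request clears the accumulated backoff.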
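From the caller's side nothing changes: known() keeps its signature, and the chunking, pacing and retries stay internal to the client. A minimal usage sketch, assuming the public Software Heritage API endpoint and a placeholder SWHID; the import path follows the file being patched, and real callers typically pass swh.model CoreSWHID objects, though the method stringifies its input anyway:

import asyncio

import aiohttp

from swh.scanner.client import Client

API_URL = "https://archive.softwareheritage.org/api/1/"  # assumed endpoint


async def main():
    # placeholder SWHID, for illustration only
    swhids = ["swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"]
    async with aiohttp.ClientSession() as session:
        client = Client(API_URL, session)
        # with more than QUERY_LIMIT (1000) SWHIDs, the query is split into
        # chunks and the per-chunk requests are gathered concurrently
        result = await client.known(swhids)
    for swhid, info in result.items():
        print(swhid, info["known"])


asyncio.run(main())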