Page MenuHomeSoftware Heritage

D8541.id30801.diff
No OneTemporary

D8541.id30801.diff

diff --git a/swh/scanner/client.py b/swh/scanner/client.py
--- a/swh/scanner/client.py
+++ b/swh/scanner/client.py
@@ -12,7 +12,8 @@
import asyncio
import itertools
-from typing import Any, Dict, List, Optional
+import time
+from typing import Any, Dict, List, Optional, Tuple
import aiohttp
@@ -23,17 +24,40 @@
# Maximum number of SWHIDs that can be requested by a single call to the
# Web API endpoint /known/
QUERY_LIMIT = 1000
+MAX_RETRY = 10
KNOWN_EP = "known/"
GRAPH_RANDOMWALK_EP = "graph/randomwalk/"
+def _get_chunk(swhids):
+ """Slice a list of `swhids` into smaller lists of at most QUERY_LIMIT items"""
+ for i in range(0, len(swhids), QUERY_LIMIT):
+ yield swhids[i : i + QUERY_LIMIT]
+
+
+def _parse_limit_header(response) -> Tuple[Optional[int], Optional[int], Optional[int]]:
+ """parse the X-RateLimit Headers if any"""
+ limit = response.headers.get("X-RateLimit-Limit")
+ if limit is not None:
+ limit = int(limit)
+ remaining = response.headers.get("X-RateLimit-Remaining")
+ if remaining is not None:
+ remaining = int(remaining)
+ reset = response.headers.get("X-RateLimit-Reset")
+ if reset is not None:
+ reset = int(reset)
+ return (limit, remaining, reset)
+
+
class Client:
"""Manage requests to the Software Heritage Web API."""
def __init__(self, api_url: str, session: aiohttp.ClientSession):
+ self._sleep = 0
self.api_url = api_url
self.session = session
+ self._known_endpoint = self.api_url + KNOWN_EP
async def get_origin(self, swhid: CoreSWHID) -> Optional[Any]:
"""Walk the compressed graph to discover the origin of a given swhid"""
@@ -70,27 +94,93 @@
value['known'] = False if the SWHID is not found
"""
- endpoint = self.api_url + KNOWN_EP
requests = []
- def get_chunk(swhids):
- for i in range(0, len(swhids), QUERY_LIMIT):
- yield swhids[i : i + QUERY_LIMIT]
-
- async def make_request(swhids):
- swhids = [str(swhid) for swhid in swhids]
- async with self.session.post(endpoint, json=swhids) as resp:
- if resp.status != 200:
- error_response(resp.reason, resp.status, endpoint)
-
- return await resp.json()
+ swh_ids = [str(swhid) for swhid in swhids]
- if len(swhids) > QUERY_LIMIT:
- for swhids_chunk in get_chunk(swhids):
- requests.append(asyncio.create_task(make_request(swhids_chunk)))
+ if len(swhids) <= QUERY_LIMIT:
+ return await self._make_request(swh_ids)
+ else:
+ for swhids_chunk in _get_chunk(swh_ids):
+ task = asyncio.create_task(self._make_request(swhids_chunk))
+ requests.append(task)
res = await asyncio.gather(*requests)
# concatenate list of dictionaries
return dict(itertools.chain.from_iterable(e.items() for e in res))
+
+ def _mark_success(self, limit=None, remaining=None, reset=None):
+ """Call when a request succeeds; this will adjust the request rate.
+
+ The extra arguments can be used to transmit the X-RateLimit information
+ from the server. This will be used to adjust the request rate."""
+ self._sleep = 0
+ factor = 0
+ if limit is not None and remaining is not None and reset is not None:
+ current = time.time()
+ time_windows = reset - current
+ if time_windows <= 0:
+ return
+ used_up = remaining / limit
+ if remaining <= 0:
+ # no more credit; we have to sit back and wait.
+ #
+ # XXX we should warn the user. This can get very long.
+ self._sleep = time_windows
+ factor = -1
+ elif 0.99 < used_up:
+ # let us not limit the first flight of requests.
+ factor = 0
+ else:
+ # the more of our credit we have consumed, the harder we brake:
+ # throttle our request rate more as the remaining fraction gets lower.
+ # NOTE(review): `used_up` is misleadingly named — it holds the
+ # *remaining* fraction. (The factor ranges from 1 to 1000.)
+ factor = (0.01 + used_up) ** -1.5
+ if factor >= 0:
+ self._sleep = (1 + (time_windows / remaining)) * factor
+
+ def _mark_failure(self, limit=None, remaining=None, reset=None):
+ """Call when a request fails; this will reduce the request rate.
+
+ The extra arguments can be used to transmit the X-RateLimit information
+ from the server. This will be used to adjust the request rate."""
+ if remaining is not None and reset is not None and remaining <= 0:
+ current = time.time()
+ wait_for = reset - current
+ wait_for *= 1.1 # Add some margin to please the rate limiting code
+ if wait_for > 0 and wait_for >= self._sleep:
+ self._sleep = wait_for
+ return
+ if self._sleep <= 0:
+ self._sleep = 1
else:
- return await make_request(swhids)
+ self._sleep *= 2
+
+ async def _make_request(self, swhids):
+ endpoint = self._known_endpoint
+
+ data = None
+
+ retry = MAX_RETRY
+
+ while data is None:
+ # slow the pace of requests if needed. NOTE(review): time.sleep blocks the event loop inside this coroutine — consider `await asyncio.sleep(...)` instead.
+ if self._sleep > 0:
+ time.sleep(self._sleep)
+ async with self.session.post(endpoint, json=swhids) as resp:
+ rate_limit = _parse_limit_header(resp)
+ if resp.status == 200:
+ try:
+ # inform of success before the await
+ self._mark_success(*rate_limit)
+ data = await resp.json()
+ except aiohttp.client_exceptions.ClientConnectionError:
+ raise
+ else:
+ break
+ self._mark_failure(*rate_limit)
+ retry -= 1
+ if retry <= 0 or resp.status == 413: # 413: Payload Too Large
+ error_response(resp.reason, resp.status, endpoint)
+ return data

File Metadata

Mime Type
text/plain
Expires
Dec 19 2024, 4:01 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3234241

Event Timeline