Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/policy.py
Show All 9 Lines | |||||
import aiohttp | import aiohttp | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import CONTENT, DIRECTORY | from swh.model.identifiers import CONTENT, DIRECTORY | ||||
from .data import MerkleNodeInfo | from .data import MerkleNodeInfo | ||||
from .exceptions import error_response | from .exceptions import error_response | ||||
zackUnsubmitted Not Done Inline Actions
zack: 1) add a docstring for this
2) we have this information in a number of different places now… | |||||
# Maximum number of SWHIDs that can be requested by a single call to the
# Web API endpoint /known/
QUERY_LIMIT = 1000


async def swhids_discovery(
    swhids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
    """API request to get information about the SoftWare Heritage persistent
    IDentifiers (SWHIDs) given in input.

    Args:
        swhids: a list of SWHIDs (as strings)
        session: the aiohttp client session used to perform the POST request
        api_url: base url for the API request; the "known/" endpoint is
            appended to it

    Returns:
        A dictionary with:

        key:
            SWHID searched
        value:
            value['known'] = True if the SWHID is found
            value['known'] = False if the SWHID is not found

    Raises:
        delegates to ``error_response`` on any non-200 HTTP status.
    """
    endpoint = api_url + "known/"

    def get_chunk(swhids):
        # The /known/ endpoint accepts at most QUERY_LIMIT SWHIDs per call,
        # so split the input into chunks of that size.
        for i in range(0, len(swhids), QUERY_LIMIT):
            yield swhids[i : i + QUERY_LIMIT]

    async def make_request(swhids):
        async with session.post(endpoint, json=swhids) as resp:
            if resp.status != 200:
                error_response(resp.reason, resp.status, endpoint)
            return await resp.json()

    if len(swhids) > QUERY_LIMIT:
        # Fire one concurrent request per chunk; building the task list here
        # (rather than unconditionally above) keeps the fast path allocation-free.
        requests = [
            asyncio.create_task(make_request(chunk)) for chunk in get_chunk(swhids)
        ]
        res = await asyncio.gather(*requests)
        # concatenate list of dictionaries
        return dict(itertools.chain.from_iterable(e.items() for e in res))
    else:
        return await make_request(swhids)
Show All 17 Lines | class Policy(metaclass=abc.ABCMeta): | ||||
async def run( | async def run( | ||||
self, session: aiohttp.ClientSession, api_url: str, | self, session: aiohttp.ClientSession, api_url: str, | ||||
): | ): | ||||
"""Scan a source code project""" | """Scan a source code project""" | ||||
raise NotImplementedError("Must implement run method") | raise NotImplementedError("Must implement run method") | ||||
class LazyBFS(Policy): | class LazyBFS(Policy): | ||||
"""Read nodes in the merkle tree using the BFS algorithm. | |||||
Lookup only directories that are unknown otherwise set all the downstream | |||||
contents to known. | |||||
""" | |||||
async def run( | async def run( | ||||
self, session: aiohttp.ClientSession, api_url: str, | self, session: aiohttp.ClientSession, api_url: str, | ||||
): | ): | ||||
queue = [] | queue = [] | ||||
queue.append(self.source_tree) | queue.append(self.source_tree) | ||||
while queue: | while queue: | ||||
swhids = [str(node.swhid()) for node in queue] | swhids = [str(node.swhid()) for node in queue] | ||||
Show All 10 Lines | ): | ||||
else: | else: | ||||
for sub_node in node.iter_tree(): | for sub_node in node.iter_tree(): | ||||
if sub_node == node: | if sub_node == node: | ||||
continue | continue | ||||
self.data[sub_node.swhid()]["known"] = True # type: ignore | self.data[sub_node.swhid()]["known"] = True # type: ignore | ||||
class FilePriority(Policy): | class FilePriority(Policy): | ||||
"""Check the Merkle tree querying all the file contents and set all the upstream | |||||
directories to unknown in the case a file content is unknown. | |||||
Finally check all the directories which status is still unknown and set all the | |||||
sub-directories of known directories to known. | |||||
""" | |||||
@no_type_check | @no_type_check | ||||
async def run( | async def run( | ||||
self, session: aiohttp.ClientSession, api_url: str, | self, session: aiohttp.ClientSession, api_url: str, | ||||
): | ): | ||||
# get all the files | # get all the files | ||||
all_contents = list( | all_contents = list( | ||||
filter( | filter( | ||||
lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() | lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | ): | ||||
dir_.iter_tree(), | dir_.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
for node in sub_dirs: | for node in sub_dirs: | ||||
self.data[node.swhid()]["known"] = True | self.data[node.swhid()]["known"] = True | ||||
class DirectoryPriority(Policy): | class DirectoryPriority(Policy): | ||||
"""Check the Merkle tree querying all the directories that have at least one file | |||||
content and set all the upstream directories to unknown in the case a directory | |||||
is unknown otherwise set all the downstream contents to known. | |||||
Finally check the status of empty directories and all the remaining file | |||||
contents. | |||||
""" | |||||
@no_type_check | @no_type_check | ||||
async def run( | async def run( | ||||
self, session: aiohttp.ClientSession, api_url: str, | self, session: aiohttp.ClientSession, api_url: str, | ||||
): | ): | ||||
# get all directory contents that have at least one file content | # get all directory contents that have at least one file content | ||||
unknown_dirs = list( | unknown_dirs = list( | ||||
filter( | filter( | ||||
lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | def has_contents(self, directory: Directory): | ||||
return True | return True | ||||
return False | return False | ||||
def get_contents(self, dir_: Directory): | def get_contents(self, dir_: Directory): | ||||
"""Get all the contents of a given directory""" | """Get all the contents of a given directory""" | ||||
for _, node in list(dir_.items()): | for _, node in list(dir_.items()): | ||||
if node.object_type == CONTENT: | if node.object_type == CONTENT: | ||||
yield node | yield node | ||||
Not Done Inline ActionsI noticed it only now, even though it's a minor issue that affect all policies. Can you please add a docstring to each Policy class that briefly describe what that policy does? zack: I noticed it only now, even though it's a minor issue that affect all policies.
Can you please… | |||||
class QueryAll(Policy):
    """Check the status of every node in the Merkle tree."""

    @no_type_check
    async def run(
        self, session: aiohttp.ClientSession, api_url: str,
    ):
        """Query the /known/ endpoint for every node (directories and file
        contents) of the source tree and record each node's "known" status
        in ``self.data``.
        """
        # Materialize the nodes once so we can walk them again after the
        # request without re-traversing the tree.
        all_nodes = list(self.source_tree.iter_tree())
        all_swhids = [str(node.swhid()) for node in all_nodes]
        swhids_res = await swhids_discovery(all_swhids, session, api_url)
        for node in all_nodes:
            self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"]