Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/policy.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import asyncio | from typing import no_type_check | ||||
import itertools | |||||
from typing import Dict, List, no_type_check | |||||
import aiohttp | |||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID | from swh.model.identifiers import CONTENT, DIRECTORY | ||||
from .client import QUERY_LIMIT, Client | |||||
from .data import MerkleNodeInfo | from .data import MerkleNodeInfo | ||||
from .exceptions import error_response | |||||
# Maximum number of SWHIDs that can be requested by a single call to the | |||||
# Web API endpoint /known/ | |||||
QUERY_LIMIT = 1000 | |||||
async def swhids_discovery( | |||||
swhids: List[CoreSWHID], session: aiohttp.ClientSession, api_url: str, | |||||
) -> Dict[str, Dict[str, bool]]: | |||||
"""API Request to get information about the SoftWare Heritage persistent | |||||
IDentifiers (SWHIDs) given in input. | |||||
Args: | |||||
swhids: a list of CoreSWHID instances | |||||
api_url: url for the API request | |||||
Returns: | |||||
A dictionary with: | |||||
key: | |||||
string SWHID searched | |||||
value: | |||||
value['known'] = True if the SWHID is found | |||||
value['known'] = False if the SWHID is not found | |||||
""" | |||||
endpoint = api_url + "known/" | |||||
requests = [] | |||||
def get_chunk(swhids): | |||||
for i in range(0, len(swhids), QUERY_LIMIT): | |||||
yield swhids[i : i + QUERY_LIMIT] | |||||
async def make_request(swhids): | |||||
swhids = [str(swhid) for swhid in swhids] | |||||
async with session.post(endpoint, json=swhids) as resp: | |||||
if resp.status != 200: | |||||
error_response(resp.reason, resp.status, endpoint) | |||||
return await resp.json() | |||||
if len(swhids) > QUERY_LIMIT: | |||||
for swhids_chunk in get_chunk(swhids): | |||||
requests.append(asyncio.create_task(make_request(swhids_chunk))) | |||||
res = await asyncio.gather(*requests) | |||||
# concatenate list of dictionaries | |||||
return dict(itertools.chain.from_iterable(e.items() for e in res)) | |||||
else: | |||||
return await make_request(swhids) | |||||
def source_size(source_tree: "Directory"):
    """Return the number of nodes a source tree contains.

    Every node is counted, duplicates included (``dedup=False``), so the
    result matches the number of entries the scanner will process.
    """
    nodes = list(source_tree.iter_tree(dedup=False))
    return len(nodes)
class Policy(metaclass=abc.ABCMeta):
    """Abstract base class of the scanner traversal policies.

    A policy decides in which order the nodes of the source tree are
    queried against the archive and how each answer propagates through
    the tree.
    """

    # status information for the contents and directories of the merkle tree
    data: "MerkleNodeInfo"

    # merkle-tree representation of the scanned source code project
    source_tree: "Directory"

    def __init__(self, source_tree: "Directory", data: "MerkleNodeInfo"):
        self.data = data
        self.source_tree = source_tree

    @abc.abstractmethod
    async def run(self, client: "Client"):
        """Scan a source code project"""
        raise NotImplementedError("Must implement run method")
class LazyBFS(Policy):
    """Read nodes in the merkle tree using the BFS algorithm.

    Lookup only directories that are unknown otherwise set all the downstream
    contents to known.
    """

    async def run(self, client: Client):
        """Breadth-first scan of the source tree, one backend query per level.

        A directory reported as known has its whole subtree marked known
        without further queries; an unknown directory gets its children
        enqueued for the next level.
        """
        # nodes of the BFS level currently being resolved; starts at the root
        queue = [self.source_tree]

        while queue:
            swhids = [node.swhid() for node in queue]
            swhids_res = await client.known(swhids)
            # swap in a fresh list for the next level instead of calling
            # list.remove() per node, which was O(n) per removal
            # (O(n^2) per level overall); the visible behavior is unchanged
            level, queue = queue, []
            for node in level:
                self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][
                    "known"
                ]
                if node.object_type != DIRECTORY:
                    continue
                if not self.data[node.swhid()]["known"]:
                    # unknown directory: its entries must be checked next
                    queue.extend(entry for _, entry in node.items())
                else:
                    # known directory: everything below it is known as well
                    for sub_node in node.iter_tree():
                        if sub_node == node:
                            continue
                        self.data[sub_node.swhid()]["known"] = True  # type: ignore
class GreedyBFS(Policy):
    """Query graph nodes in chunks (to maximize the Web API rate limit use) and set the
    downstream contents of known directories to known.
    """

    async def run(self, client: Client):
        """Consume the chunked node stream and stop as soon as every node of
        the source tree has been accounted for."""
        total_nodes = source_size(self.source_tree)
        visited = []

        async for chunk in self.get_nodes_chunks(client, total_nodes):
            for node in chunk:
                visited.append(node)
                if len(visited) == total_nodes:
                    return
                is_known_dir = (
                    node.object_type == DIRECTORY and self.data[node.swhid()]["known"]
                )
                if not is_known_dir:
                    continue
                # a known directory implies its whole subtree is known too
                subtree = list(node.iter_tree(dedup=False))
                subtree.remove(node)  # drop the subtree root itself
                for descendant in subtree:
                    visited.append(descendant)
                    self.data[descendant.swhid()]["known"] = True

    @no_type_check
    async def get_nodes_chunks(self, client: Client, ssize: int):
        """Yield successive chunks of at most QUERY_LIMIT nodes, querying the
        backend once per chunk and recording each node's "known" status.

        When the project holds fewer than QUERY_LIMIT nodes a single chunk
        covers everything.
        """
        node_iter = self.source_tree.iter_tree(dedup=False)
        for raw_chunk in grouper(node_iter, QUERY_LIMIT):
            chunk = list(raw_chunk)
            statuses = await client.known([node.swhid() for node in chunk])
            for node in chunk:
                swhid = node.swhid()
                self.data[swhid]["known"] = statuses[str(swhid)]["known"]
            yield chunk
class FilePriority(Policy):
    """Check the Merkle tree querying all the file contents and set all the upstream
    directories to unknown in the case a file content is unknown.
    Finally check all the directories which status is still unknown and set all the
    sub-directories of known directories to known.
    """

    @no_type_check
    async def run(self, client: Client):
        """Resolve file contents first (one batched query), propagate unknown
        status upward, then settle the remaining directories one by one."""
        # collect every file content, deepest nodes first
        contents = [
            node
            for node in self.source_tree.iter_tree()
            if node.object_type == CONTENT
        ]
        contents.reverse()

        # a single batched backend query covers all the file contents
        cnt_statuses = await client.known([node.swhid() for node in contents])
        for content in contents:
            known = cnt_statuses[str(content.swhid())]["known"]
            self.data[content.swhid()]["known"] = known
            if known:
                continue
            # an unknown content makes every ancestor directory unknown
            ancestor = content.parents[0]
            while ancestor:
                self.data[ancestor.swhid()]["known"] = False
                ancestor = ancestor.parents[0] if ancestor.parents else None

        # directories whose status could not be deduced from their contents
        pending_dirs = [
            node
            for node in self.source_tree.iter_tree()
            if node.object_type == DIRECTORY
            and self.data[node.swhid()]["known"] is None
        ]

        for dir_ in pending_dirs:
            # may have been settled meanwhile by a previously known ancestor
            if self.data[dir_.swhid()]["known"] is not None:
                continue
            resp = await client.known([dir_.swhid()])
            dir_known = resp[str(dir_.swhid())]["known"]
            self.data[dir_.swhid()]["known"] = dir_known
            if not dir_known:
                continue
            # a known directory settles all still-undecided sub-directories
            for sub_dir in dir_.iter_tree():
                if (
                    sub_dir.object_type == DIRECTORY
                    and self.data[sub_dir.swhid()]["known"] is None
                ):
                    self.data[sub_dir.swhid()]["known"] = True
class DirectoryPriority(Policy): | class DirectoryPriority(Policy): | ||||
"""Check the Merkle tree querying all the directories that have at least one file | """Check the Merkle tree querying all the directories that have at least one file | ||||
content and set all the upstream directories to unknown in the case a directory | content and set all the upstream directories to unknown in the case a directory | ||||
is unknown otherwise set all the downstream contents to known. | is unknown otherwise set all the downstream contents to known. | ||||
Finally check the status of empty directories and all the remaining file | Finally check the status of empty directories and all the remaining file | ||||
contents. | contents. | ||||
""" | """ | ||||
@no_type_check | @no_type_check | ||||
async def run( | async def run(self, client: Client): | ||||
self, session: aiohttp.ClientSession, api_url: str, | |||||
): | |||||
# get all directory contents that have at least one file content | # get all directory contents that have at least one file content | ||||
unknown_dirs = list( | unknown_dirs = list( | ||||
filter( | filter( | ||||
lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_dirs.reverse() # check deepest node first | unknown_dirs.reverse() # check deepest node first | ||||
for dir_ in unknown_dirs: | for dir_ in unknown_dirs: | ||||
if self.data[dir_.swhid()]["known"] is None: | if self.data[dir_.swhid()]["known"] is None: | ||||
dir_status = await swhids_discovery([dir_.swhid()], session, api_url) | dir_status = await client.known([dir_.swhid()]) | ||||
dir_known = dir_status[str(dir_.swhid())]["known"] | dir_known = dir_status[str(dir_.swhid())]["known"] | ||||
self.data[dir_.swhid()]["known"] = dir_known | self.data[dir_.swhid()]["known"] = dir_known | ||||
# set all the downstream file contents to known | # set all the downstream file contents to known | ||||
if dir_known: | if dir_known: | ||||
for cnt in self.get_contents(dir_): | for cnt in self.get_contents(dir_): | ||||
self.data[cnt.swhid()]["known"] = True | self.data[cnt.swhid()]["known"] = True | ||||
# otherwise set all the upstream directories to unknown | # otherwise set all the upstream directories to unknown | ||||
else: | else: | ||||
parent = dir_.parents[0] | parent = dir_.parents[0] | ||||
while parent: | while parent: | ||||
self.data[parent.swhid()]["known"] = False | self.data[parent.swhid()]["known"] = False | ||||
parent = parent.parents[0] if parent.parents else None | parent = parent.parents[0] if parent.parents else None | ||||
# get remaining directories that have no file contents | # get remaining directories that have no file contents | ||||
empty_dirs = list( | empty_dirs = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == DIRECTORY | lambda n: n.object_type == DIRECTORY | ||||
and not self.has_contents(n) | and not self.has_contents(n) | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
empty_dirs_swhids = [n.swhid() for n in empty_dirs] | empty_dirs_swhids = [n.swhid() for n in empty_dirs] | ||||
empty_dir_status = await swhids_discovery(empty_dirs_swhids, session, api_url) | empty_dir_status = await client.known(empty_dirs_swhids) | ||||
# update status of directories that have no file contents | # update status of directories that have no file contents | ||||
for dir_ in empty_dirs: | for dir_ in empty_dirs: | ||||
self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
# check unknown file contents | # check unknown file contents | ||||
unknown_cnts = list( | unknown_cnts = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == CONTENT | lambda n: n.object_type == CONTENT | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | ||||
unknown_cnts_status = await swhids_discovery( | unknown_cnts_status = await client.known(unknown_cnts_swhids) | ||||
unknown_cnts_swhids, session, api_url | |||||
) | |||||
for cnt in unknown_cnts: | for cnt in unknown_cnts: | ||||
self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
def has_contents(self, directory: Directory): | def has_contents(self, directory: Directory): | ||||
"""Check if the directory given in input has contents""" | """Check if the directory given in input has contents""" | ||||
Show All 9 Lines | def get_contents(self, dir_: Directory): | ||||
yield node | yield node | ||||
class QueryAll(Policy):
    """Check the status of every node in the Merkle tree."""

    @no_type_check
    async def run(self, client: Client):
        """Send every node of the tree to the backend in one call and record
        the answer for each of them."""
        nodes = list(self.source_tree.iter_tree())
        statuses = await client.known([node.swhid() for node in nodes])
        for node in nodes:
            swhid = node.swhid()
            self.data[swhid]["known"] = statuses[str(swhid)]["known"]