Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/policy.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import asyncio | import asyncio | ||||
import itertools | import itertools | ||||
from typing import Dict, List, no_type_check | from typing import Dict, List, no_type_check | ||||
import aiohttp | import aiohttp | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import CONTENT, DIRECTORY | from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID | ||||
from .data import MerkleNodeInfo | from .data import MerkleNodeInfo | ||||
from .exceptions import error_response | from .exceptions import error_response | ||||
# Maximum number of SWHIDs that can be requested by a single call to the | # Maximum number of SWHIDs that can be requested by a single call to the | ||||
# Web API endpoint /known/ | # Web API endpoint /known/ | ||||
QUERY_LIMIT = 1000 | QUERY_LIMIT = 1000 | ||||
async def swhids_discovery( | async def swhids_discovery( | ||||
swhids: List[str], session: aiohttp.ClientSession, api_url: str, | swhids: List[CoreSWHID], session: aiohttp.ClientSession, api_url: str, | ||||
) -> Dict[str, Dict[str, bool]]: | ) -> Dict[str, Dict[str, bool]]: | ||||
"""API Request to get information about the SoftWare Heritage persistent | """API Request to get information about the SoftWare Heritage persistent | ||||
IDentifiers (SWHIDs) given in input. | IDentifiers (SWHIDs) given in input. | ||||
Args: | Args: | ||||
swhids: a list of SWHIDS | swhids: a list of CoreSWHID instances | ||||
api_url: url for the API request | api_url: url for the API request | ||||
Returns: | Returns: | ||||
A dictionary with: | A dictionary with: | ||||
key: | key: | ||||
SWHID searched | string SWHID searched | ||||
value: | value: | ||||
value['known'] = True if the SWHID is found | value['known'] = True if the SWHID is found | ||||
value['known'] = False if the SWHID is not found | value['known'] = False if the SWHID is not found | ||||
""" | """ | ||||
endpoint = api_url + "known/" | endpoint = api_url + "known/" | ||||
requests = [] | requests = [] | ||||
def get_chunk(swhids):
    """Yield successive slices of *swhids*, each at most QUERY_LIMIT long.

    The Web API /known/ endpoint refuses requests with more than
    QUERY_LIMIT SWHIDs, so larger inputs are split into chunks.
    """
    for start in range(0, len(swhids), QUERY_LIMIT):
        yield swhids[start : start + QUERY_LIMIT]
async def make_request(swhids):
    """POST one chunk of SWHIDs to the /known/ endpoint.

    The SWHIDs are serialized to their string form before being sent;
    a non-200 response is turned into a scanner error via
    error_response. Returns the decoded JSON body on success.
    """
    payload = [str(swhid) for swhid in swhids]
    async with session.post(endpoint, json=payload) as resp:
        if resp.status != 200:
            error_response(resp.reason, resp.status, endpoint)
        return await resp.json()
if len(swhids) > QUERY_LIMIT: | if len(swhids) > QUERY_LIMIT: | ||||
for swhids_chunk in get_chunk(swhids): | for swhids_chunk in get_chunk(swhids): | ||||
Show All 36 Lines | class LazyBFS(Policy): | ||||
async def run( | async def run( | ||||
self, session: aiohttp.ClientSession, api_url: str, | self, session: aiohttp.ClientSession, api_url: str, | ||||
): | ): | ||||
queue = [] | queue = [] | ||||
queue.append(self.source_tree) | queue.append(self.source_tree) | ||||
while queue: | while queue: | ||||
swhids = [str(node.swhid()) for node in queue] | swhids = [node.swhid() for node in queue] | ||||
swhids_res = await swhids_discovery(swhids, session, api_url) | swhids_res = await swhids_discovery(swhids, session, api_url) | ||||
for node in queue.copy(): | for node in queue.copy(): | ||||
queue.remove(node) | queue.remove(node) | ||||
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][ | self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
if node.object_type == DIRECTORY: | if node.object_type == DIRECTORY: | ||||
if not self.data[node.swhid()]["known"]: | if not self.data[node.swhid()]["known"]: | ||||
Show All 21 Lines | ): | ||||
all_contents = list( | all_contents = list( | ||||
filter( | filter( | ||||
lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() | lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() | ||||
) | ) | ||||
) | ) | ||||
all_contents.reverse() # check deepest node first | all_contents.reverse() # check deepest node first | ||||
# query the backend to get all file contents status | # query the backend to get all file contents status | ||||
cnt_swhids = [str(node.swhid()) for node in all_contents] | cnt_swhids = [node.swhid() for node in all_contents] | ||||
cnt_status_res = await swhids_discovery(cnt_swhids, session, api_url) | cnt_status_res = await swhids_discovery(cnt_swhids, session, api_url) | ||||
# set all the file contents status | # set all the file contents status | ||||
for cnt in all_contents: | for cnt in all_contents: | ||||
self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] | self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] | ||||
# set all the upstream directories of unknown file contents to unknown | # set all the upstream directories of unknown file contents to unknown | ||||
if not self.data[cnt.swhid()]["known"]: | if not self.data[cnt.swhid()]["known"]: | ||||
parent = cnt.parents[0] | parent = cnt.parents[0] | ||||
while parent: | while parent: | ||||
Show All 9 Lines | ): | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
# check unset directories | # check unset directories | ||||
for dir_ in unset_dirs: | for dir_ in unset_dirs: | ||||
if self.data[dir_.swhid()]["known"] is None: | if self.data[dir_.swhid()]["known"] is None: | ||||
# update directory status | # update directory status | ||||
dir_status = await swhids_discovery( | dir_status = await swhids_discovery([dir_.swhid()], session, api_url) | ||||
[str(dir_.swhid())], session, api_url | |||||
) | |||||
dir_known = dir_status[str(dir_.swhid())]["known"] | dir_known = dir_status[str(dir_.swhid())]["known"] | ||||
self.data[dir_.swhid()]["known"] = dir_known | self.data[dir_.swhid()]["known"] = dir_known | ||||
if dir_known: | if dir_known: | ||||
sub_dirs = list( | sub_dirs = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == DIRECTORY | lambda n: n.object_type == DIRECTORY | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
dir_.iter_tree(), | dir_.iter_tree(), | ||||
Show All 21 Lines | ): | ||||
lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_dirs.reverse() # check deepest node first | unknown_dirs.reverse() # check deepest node first | ||||
for dir_ in unknown_dirs: | for dir_ in unknown_dirs: | ||||
if self.data[dir_.swhid()]["known"] is None: | if self.data[dir_.swhid()]["known"] is None: | ||||
dir_status = await swhids_discovery( | dir_status = await swhids_discovery([dir_.swhid()], session, api_url) | ||||
[str(dir_.swhid())], session, api_url | |||||
) | |||||
dir_known = dir_status[str(dir_.swhid())]["known"] | dir_known = dir_status[str(dir_.swhid())]["known"] | ||||
self.data[dir_.swhid()]["known"] = dir_known | self.data[dir_.swhid()]["known"] = dir_known | ||||
# set all the downstream file contents to known | # set all the downstream file contents to known | ||||
if dir_known: | if dir_known: | ||||
for cnt in self.get_contents(dir_): | for cnt in self.get_contents(dir_): | ||||
self.data[cnt.swhid()]["known"] = True | self.data[cnt.swhid()]["known"] = True | ||||
# otherwise set all the upstream directories to unknown | # otherwise set all the upstream directories to unknown | ||||
else: | else: | ||||
parent = dir_.parents[0] | parent = dir_.parents[0] | ||||
while parent: | while parent: | ||||
self.data[parent.swhid()]["known"] = False | self.data[parent.swhid()]["known"] = False | ||||
parent = parent.parents[0] if parent.parents else None | parent = parent.parents[0] if parent.parents else None | ||||
# get remaining directories that have no file contents | # get remaining directories that have no file contents | ||||
empty_dirs = list( | empty_dirs = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == DIRECTORY | lambda n: n.object_type == DIRECTORY | ||||
and not self.has_contents(n) | and not self.has_contents(n) | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
empty_dirs_swhids = [str(n.swhid()) for n in empty_dirs] | empty_dirs_swhids = [n.swhid() for n in empty_dirs] | ||||
empty_dir_status = await swhids_discovery(empty_dirs_swhids, session, api_url) | empty_dir_status = await swhids_discovery(empty_dirs_swhids, session, api_url) | ||||
# update status of directories that have no file contents | # update status of directories that have no file contents | ||||
for dir_ in empty_dirs: | for dir_ in empty_dirs: | ||||
self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
# check unknown file contents | # check unknown file contents | ||||
unknown_cnts = list( | unknown_cnts = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == CONTENT | lambda n: n.object_type == CONTENT | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_cnts_swhids = [str(n.swhid()) for n in unknown_cnts] | unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | ||||
unknown_cnts_status = await swhids_discovery( | unknown_cnts_status = await swhids_discovery( | ||||
unknown_cnts_swhids, session, api_url | unknown_cnts_swhids, session, api_url | ||||
) | ) | ||||
for cnt in unknown_cnts: | for cnt in unknown_cnts: | ||||
self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
Show All 16 Lines | class QueryAll(Policy): | ||||
"""Check the status of every node in the Merkle tree. | """Check the status of every node in the Merkle tree. | ||||
""" | """ | ||||
@no_type_check
async def run(
    self, session: aiohttp.ClientSession, api_url: str,
):
    """Query the archive once for every node of the source tree.

    Collects the SWHIDs of all nodes (directories and file contents),
    checks them with a single ``swhids_discovery`` call, and records
    each node's known/unknown status in ``self.data``.

    Args:
        session: aiohttp session used for the Web API requests.
        api_url: base url of the Web API endpoint.
    """
    # Materialize the iterator once: the nodes are walked a second time
    # below to store the per-node results. list(...) replaces the
    # redundant identity comprehension [node for node in ...].
    all_nodes = list(self.source_tree.iter_tree())
    all_swhids = [node.swhid() for node in all_nodes]
    swhids_res = await swhids_discovery(all_swhids, session, api_url)
    # The API response dictionary is keyed by the *string* form of the
    # SWHID, while self.data is keyed by the CoreSWHID object itself.
    for node in all_nodes:
        self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"]