Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/policy.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
from typing import no_type_check | import itertools | ||||
from typing import Iterable, List, no_type_check | |||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.from_disk import Directory | from swh.loader.core import discovery | ||||
from swh.model import from_disk | |||||
from swh.model.from_disk import model | |||||
from swh.model.model import Sha1Git | |||||
from .client import QUERY_LIMIT, Client | from .client import QUERY_LIMIT, Client | ||||
from .data import MerkleNodeInfo | from .data import MerkleNodeInfo | ||||
def source_size(source_tree: Directory): | def source_size(source_tree: from_disk.Directory): | ||||
"""return the size of a source tree as the number of nodes it contains""" | """return the size of a source tree as the number of nodes it contains""" | ||||
return sum(1 for n in source_tree.iter_tree(dedup=False)) | return sum(1 for n in source_tree.iter_tree(dedup=False)) | ||||
class Policy(metaclass=abc.ABCMeta): | class Policy(metaclass=abc.ABCMeta): | ||||
data: MerkleNodeInfo | data: MerkleNodeInfo | ||||
"""information about contents and directories of the merkle tree""" | """information about contents and directories of the merkle tree""" | ||||
source_tree: Directory | source_tree: from_disk.Directory | ||||
"""representation of a source code project directory in the merkle tree""" | """representation of a source code project directory in the merkle tree""" | ||||
def __init__(self, source_tree: Directory, data: MerkleNodeInfo): | def __init__(self, source_tree: from_disk.Directory, data: MerkleNodeInfo): | ||||
self.source_tree = source_tree | self.source_tree = source_tree | ||||
self.data = data | self.data = data | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
"""Scan a source code project""" | """Scan a source code project""" | ||||
raise NotImplementedError("Must implement run method") | raise NotImplementedError("Must implement run method") | ||||
▲ Show 20 Lines • Show All 189 Lines • ▼ Show 20 Lines | async def run(self, client: Client): | ||||
unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | ||||
unknown_cnts_status = await client.known(unknown_cnts_swhids) | unknown_cnts_status = await client.known(unknown_cnts_swhids) | ||||
for cnt in unknown_cnts: | for cnt in unknown_cnts: | ||||
self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
def has_contents(self, directory: Directory): | def has_contents(self, directory: from_disk.Directory): | ||||
"""Check if the directory given in input has contents""" | """Check if the directory given in input has contents""" | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
if entry["type"] == "file": | if entry["type"] == "file": | ||||
return True | return True | ||||
return False | return False | ||||
def get_contents(self, dir_: Directory): | def get_contents(self, dir_: from_disk.Directory): | ||||
"""Get all the contents of a given directory""" | """Get all the contents of a given directory""" | ||||
for _, node in list(dir_.items()): | for _, node in list(dir_.items()): | ||||
if node.object_type == "content": | if node.object_type == "content": | ||||
yield node | yield node | ||||
class WebAPIConnection(discovery.ArchiveDiscoveryInterface): | |||||
"""Use the web APIs to query the archive""" | |||||
def __init__( | |||||
self, | |||||
contents: List[model.Content], | |||||
skipped_contents: List[model.SkippedContent], | |||||
directories: List[model.Directory], | |||||
client: Client, | |||||
) -> None: | |||||
super().__init__(contents, skipped_contents, directories) | |||||
self.client = client | |||||
self.sha_to_swhid = {} | |||||
self.swhid_to_sha = {} | |||||
for content in contents: | |||||
swhid = str(content.swhid()) | |||||
ardumont: etc... below
(as far as i remember to `.swhid` does trigger computation, so might as well… | |||||
Done Inline ActionsAh, good point. I sometimes forget there isn't an optimizer going over my code, heh. Alphare: Ah, good point. I sometimes forget there isn't an optimizer going over my code, heh. | |||||
self.sha_to_swhid[content.sha1_git] = swhid | |||||
self.swhid_to_sha[swhid] = content.sha1_git | |||||
for directory in directories: | |||||
swhid = str(directory.swhid()) | |||||
self.sha_to_swhid[directory.id] = swhid | |||||
self.swhid_to_sha[swhid] = directory.id | |||||
async def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]: | |||||
"""List content missing from the archive by sha1""" | |||||
return await self._missing(contents) | |||||
async def skipped_content_missing( | |||||
self, skipped_contents: List[Sha1Git] | |||||
) -> Iterable[Sha1Git]: | |||||
"""List skipped content missing from the archive by sha1""" | |||||
# TODO what should we do about skipped contents? | |||||
return skipped_contents | |||||
async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: | |||||
"""List directories missing from the archive by sha1""" | |||||
return await self._missing(directories) | |||||
async def _missing(self, shas): | |||||
res = await self.client.known([self.sha_to_swhid[o] for o in shas]) | |||||
return [self.swhid_to_sha[k] for k, v in res.items() if not v["known"]] | |||||
class RandomDirSamplingPriority(Policy): | |||||
"""Check the Merkle tree querying random directories. Set all ancestors to | |||||
unknown for unknown directories, otherwise set all descendants to known. | |||||
Finally check all the remaining file contents. | |||||
""" | |||||
@no_type_check | |||||
async def run(self, client: Client): | |||||
contents, skipped_contents, directories = from_disk.iter_directory( | |||||
self.source_tree | |||||
) | |||||
get_unknowns = discovery.filter_known_objects( | |||||
WebAPIConnection(contents, skipped_contents, directories, client), | |||||
) | |||||
unknowns = set(itertools.chain(*await get_unknowns)) | |||||
for obj in itertools.chain(contents, skipped_contents, directories): | |||||
self.data[obj.swhid()]["known"] = obj not in unknowns | |||||
class QueryAll(Policy): | class QueryAll(Policy): | ||||
"""Check the status of every node in the Merkle tree.""" | """Check the status of every node in the Merkle tree.""" | ||||
@no_type_check | @no_type_check | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
all_nodes = [node for node in self.source_tree.iter_tree()] | all_nodes = [node for node in self.source_tree.iter_tree()] | ||||
all_swhids = [node.swhid() for node in all_nodes] | all_swhids = [node.swhid() for node in all_nodes] | ||||
swhids_res = await client.known(all_swhids) | swhids_res = await client.known(all_swhids) | ||||
for node in all_nodes: | for node in all_nodes: | ||||
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] | self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] |
etc... below
(as far as i remember to .swhid does trigger computation, so might as well reduce that)