Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/policy.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
from typing import no_type_check | from typing import no_type_check | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import CONTENT, DIRECTORY | |||||
from .client import QUERY_LIMIT, Client | from .client import QUERY_LIMIT, Client | ||||
from .data import MerkleNodeInfo | from .data import MerkleNodeInfo | ||||
def source_size(source_tree: Directory): | def source_size(source_tree: Directory): | ||||
"""return the size of a source tree as the number of nodes it contains | """return the size of a source tree as the number of nodes it contains | ||||
""" | """ | ||||
Show All 31 Lines | async def run(self, client: Client): | ||||
while queue: | while queue: | ||||
swhids = [node.swhid() for node in queue] | swhids = [node.swhid() for node in queue] | ||||
swhids_res = await client.known(swhids) | swhids_res = await client.known(swhids) | ||||
for node in queue.copy(): | for node in queue.copy(): | ||||
queue.remove(node) | queue.remove(node) | ||||
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][ | self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
if node.object_type == DIRECTORY: | if node.object_type == "directory": | ||||
if not self.data[node.swhid()]["known"]: | if not self.data[node.swhid()]["known"]: | ||||
children = [n[1] for n in list(node.items())] | children = [n[1] for n in list(node.items())] | ||||
queue.extend(children) | queue.extend(children) | ||||
else: | else: | ||||
for sub_node in node.iter_tree(): | for sub_node in node.iter_tree(): | ||||
if sub_node == node: | if sub_node == node: | ||||
continue | continue | ||||
self.data[sub_node.swhid()]["known"] = True # type: ignore | self.data[sub_node.swhid()]["known"] = True # type: ignore | ||||
class GreedyBFS(Policy): | class GreedyBFS(Policy): | ||||
"""Query graph nodes in chunks (to maximize the Web API rate limit use) and set the | """Query graph nodes in chunks (to maximize the Web API rate limit use) and set the | ||||
downstream contents of known directories to known. | downstream contents of known directories to known. | ||||
""" | """ | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
ssize = source_size(self.source_tree) | ssize = source_size(self.source_tree) | ||||
seen = [] | seen = [] | ||||
async for nodes_chunk in self.get_nodes_chunks(client, ssize): | async for nodes_chunk in self.get_nodes_chunks(client, ssize): | ||||
for node in nodes_chunk: | for node in nodes_chunk: | ||||
seen.append(node) | seen.append(node) | ||||
if len(seen) == ssize: | if len(seen) == ssize: | ||||
return | return | ||||
if node.object_type == DIRECTORY and self.data[node.swhid()]["known"]: | if node.object_type == "directory" and self.data[node.swhid()]["known"]: | ||||
sub_nodes = [n for n in node.iter_tree(dedup=False)] | sub_nodes = [n for n in node.iter_tree(dedup=False)] | ||||
sub_nodes.remove(node) # remove root node | sub_nodes.remove(node) # remove root node | ||||
for sub_node in sub_nodes: | for sub_node in sub_nodes: | ||||
seen.append(sub_node) | seen.append(sub_node) | ||||
self.data[sub_node.swhid()]["known"] = True | self.data[sub_node.swhid()]["known"] = True | ||||
@no_type_check | @no_type_check | ||||
async def get_nodes_chunks(self, client: Client, ssize: int): | async def get_nodes_chunks(self, client: Client, ssize: int): | ||||
Show All 19 Lines | """Check the Merkle tree querying all the file contents and set all the upstream | ||||
sub-directories of known directories to known. | sub-directories of known directories to known. | ||||
""" | """ | ||||
@no_type_check | @no_type_check | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
# get all the files | # get all the files | ||||
all_contents = list( | all_contents = list( | ||||
filter( | filter( | ||||
lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() | lambda node: node.object_type == "content", self.source_tree.iter_tree() | ||||
) | ) | ||||
) | ) | ||||
all_contents.reverse() # check deepest node first | all_contents.reverse() # check deepest node first | ||||
# query the backend to get all file contents status | # query the backend to get all file contents status | ||||
cnt_swhids = [node.swhid() for node in all_contents] | cnt_swhids = [node.swhid() for node in all_contents] | ||||
cnt_status_res = await client.known(cnt_swhids) | cnt_status_res = await client.known(cnt_swhids) | ||||
# set all the file contents status | # set all the file contents status | ||||
for cnt in all_contents: | for cnt in all_contents: | ||||
self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] | self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] | ||||
# set all the upstream directories of unknown file contents to unknown | # set all the upstream directories of unknown file contents to unknown | ||||
if not self.data[cnt.swhid()]["known"]: | if not self.data[cnt.swhid()]["known"]: | ||||
parent = cnt.parents[0] | parent = cnt.parents[0] | ||||
while parent: | while parent: | ||||
self.data[parent.swhid()]["known"] = False | self.data[parent.swhid()]["known"] = False | ||||
parent = parent.parents[0] if parent.parents else None | parent = parent.parents[0] if parent.parents else None | ||||
# get all unset directories and check their status | # get all unset directories and check their status | ||||
# (update children directories accordingly) | # (update children directories accordingly) | ||||
unset_dirs = list( | unset_dirs = list( | ||||
filter( | filter( | ||||
lambda node: node.object_type == DIRECTORY | lambda node: node.object_type == "directory" | ||||
and self.data[node.swhid()]["known"] is None, | and self.data[node.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
# check unset directories | # check unset directories | ||||
for dir_ in unset_dirs: | for dir_ in unset_dirs: | ||||
if self.data[dir_.swhid()]["known"] is None: | if self.data[dir_.swhid()]["known"] is None: | ||||
# update directory status | # update directory status | ||||
dir_status = await client.known([dir_.swhid()]) | dir_status = await client.known([dir_.swhid()]) | ||||
dir_known = dir_status[str(dir_.swhid())]["known"] | dir_known = dir_status[str(dir_.swhid())]["known"] | ||||
self.data[dir_.swhid()]["known"] = dir_known | self.data[dir_.swhid()]["known"] = dir_known | ||||
if dir_known: | if dir_known: | ||||
sub_dirs = list( | sub_dirs = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == DIRECTORY | lambda n: n.object_type == "directory" | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
dir_.iter_tree(), | dir_.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
for node in sub_dirs: | for node in sub_dirs: | ||||
self.data[node.swhid()]["known"] = True | self.data[node.swhid()]["known"] = True | ||||
class DirectoryPriority(Policy): | class DirectoryPriority(Policy): | ||||
"""Check the Merkle tree querying all the directories that have at least one file | """Check the Merkle tree querying all the directories that have at least one file | ||||
content and set all the upstream directories to unknown in the case a directory | content and set all the upstream directories to unknown in the case a directory | ||||
is unknown otherwise set all the downstream contents to known. | is unknown otherwise set all the downstream contents to known. | ||||
Finally check the status of empty directories and all the remaining file | Finally check the status of empty directories and all the remaining file | ||||
contents. | contents. | ||||
""" | """ | ||||
@no_type_check | @no_type_check | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
# get all directory contents that have at least one file content | # get all directory contents that have at least one file content | ||||
unknown_dirs = list( | unknown_dirs = list( | ||||
filter( | filter( | ||||
lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), | lambda dir_: dir_.object_type == "directory" | ||||
and self.has_contents(dir_), | |||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_dirs.reverse() # check deepest node first | unknown_dirs.reverse() # check deepest node first | ||||
for dir_ in unknown_dirs: | for dir_ in unknown_dirs: | ||||
if self.data[dir_.swhid()]["known"] is None: | if self.data[dir_.swhid()]["known"] is None: | ||||
dir_status = await client.known([dir_.swhid()]) | dir_status = await client.known([dir_.swhid()]) | ||||
dir_known = dir_status[str(dir_.swhid())]["known"] | dir_known = dir_status[str(dir_.swhid())]["known"] | ||||
self.data[dir_.swhid()]["known"] = dir_known | self.data[dir_.swhid()]["known"] = dir_known | ||||
# set all the downstream file contents to known | # set all the downstream file contents to known | ||||
if dir_known: | if dir_known: | ||||
for cnt in self.get_contents(dir_): | for cnt in self.get_contents(dir_): | ||||
self.data[cnt.swhid()]["known"] = True | self.data[cnt.swhid()]["known"] = True | ||||
# otherwise set all the upstream directories to unknown | # otherwise set all the upstream directories to unknown | ||||
else: | else: | ||||
parent = dir_.parents[0] | parent = dir_.parents[0] | ||||
while parent: | while parent: | ||||
self.data[parent.swhid()]["known"] = False | self.data[parent.swhid()]["known"] = False | ||||
parent = parent.parents[0] if parent.parents else None | parent = parent.parents[0] if parent.parents else None | ||||
# get remaining directories that have no file contents | # get remaining directories that have no file contents | ||||
empty_dirs = list( | empty_dirs = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == DIRECTORY | lambda n: n.object_type == "directory" | ||||
and not self.has_contents(n) | and not self.has_contents(n) | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
empty_dirs_swhids = [n.swhid() for n in empty_dirs] | empty_dirs_swhids = [n.swhid() for n in empty_dirs] | ||||
empty_dir_status = await client.known(empty_dirs_swhids) | empty_dir_status = await client.known(empty_dirs_swhids) | ||||
# update status of directories that have no file contents | # update status of directories that have no file contents | ||||
for dir_ in empty_dirs: | for dir_ in empty_dirs: | ||||
self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
# check unknown file contents | # check unknown file contents | ||||
unknown_cnts = list( | unknown_cnts = list( | ||||
filter( | filter( | ||||
lambda n: n.object_type == CONTENT | lambda n: n.object_type == "content" | ||||
and self.data[n.swhid()]["known"] is None, | and self.data[n.swhid()]["known"] is None, | ||||
self.source_tree.iter_tree(), | self.source_tree.iter_tree(), | ||||
) | ) | ||||
) | ) | ||||
unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] | ||||
unknown_cnts_status = await client.known(unknown_cnts_swhids) | unknown_cnts_status = await client.known(unknown_cnts_swhids) | ||||
for cnt in unknown_cnts: | for cnt in unknown_cnts: | ||||
self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ | ||||
"known" | "known" | ||||
] | ] | ||||
def has_contents(self, directory: Directory): | def has_contents(self, directory: Directory): | ||||
"""Check if the directory given in input has contents""" | """Check if the directory given in input has contents""" | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
if entry["type"] == "file": | if entry["type"] == "file": | ||||
return True | return True | ||||
return False | return False | ||||
def get_contents(self, dir_: Directory): | def get_contents(self, dir_: Directory): | ||||
"""Get all the contents of a given directory""" | """Get all the contents of a given directory""" | ||||
for _, node in list(dir_.items()): | for _, node in list(dir_.items()): | ||||
if node.object_type == CONTENT: | if node.object_type == "content": | ||||
yield node | yield node | ||||
class QueryAll(Policy): | class QueryAll(Policy): | ||||
"""Check the status of every node in the Merkle tree. | """Check the status of every node in the Merkle tree. | ||||
""" | """ | ||||
@no_type_check | @no_type_check | ||||
async def run(self, client: Client): | async def run(self, client: Client): | ||||
all_nodes = [node for node in self.source_tree.iter_tree()] | all_nodes = [node for node in self.source_tree.iter_tree()] | ||||
all_swhids = [node.swhid() for node in all_nodes] | all_swhids = [node.swhid() for node in all_nodes] | ||||
swhids_res = await client.known(all_swhids) | swhids_res = await client.known(all_swhids) | ||||
for node in all_nodes: | for node in all_nodes: | ||||
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] | self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] |