diff --git a/swh/scanner/benchmark_algos.py b/swh/scanner/benchmark_algos.py index dc1fd4c..e11a04b 100644 --- a/swh/scanner/benchmark_algos.py +++ b/swh/scanner/benchmark_algos.py @@ -1,391 +1,387 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import itertools import json import os from pathlib import Path import random from typing import Dict, Iterable, List, Optional import requests from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from swh.model.from_disk import Content, Directory, accept_all_directories from swh.model.identifiers import CONTENT, DIRECTORY, swhid from .exceptions import APIError from .model import Status, Tree from .scanner import directory_filter, extract_regex_objs session = requests.Session() retries_rule = Retry(total=5, backoff_factor=1) session.mount("http://", HTTPAdapter(max_retries=retries_rule)) def query_swhids( swhids: List[Tree], api_url: str, counter: Optional[collections.Counter] = None ) -> Dict[str, Dict[str, bool]]: """ Returns: A dictionary with: key(str): persistent identifier value(dict): value['known'] = True if pid is found value['known'] = False if pid is not found """ endpoint = api_url + "known/" chunk_size = 1000 if counter: counter["queries"] += len(swhids) def make_request(swhids): swhids = [swhid.swhid for swhid in swhids] req = session.post(endpoint, json=swhids) if req.status_code != 200: error_message = "%s with given values %s" % (req.text, str(swhids)) raise APIError(error_message) if counter: counter["api_calls"] += 1 resp = req.text return json.loads(resp) def get_chunk(swhids): for i in range(0, len(swhids), chunk_size): yield swhids[i : i + chunk_size] if len(swhids) > chunk_size: return dict( itertools.chain.from_iterable( make_request(swhids_chunk).items() for swhids_chunk in get_chunk(swhids) ) ) else: return make_request(swhids) def stopngo(source_tree: Tree, api_url: str, counter: collections.Counter): nodes = [] nodes.append(source_tree) while len(nodes) > 0: parsed_nodes = query_swhids(nodes, api_url, counter) for node in nodes.copy(): nodes.remove(node) node.known = parsed_nodes[node.swhid]["known"] node.status = Status.queried if node.otype == DIRECTORY: if not node.known: nodes.extend(list(node.children.values())) else: set_children_status(node, [CONTENT, DIRECTORY], True) def set_father_status(node, known): """ Recursively change father known and visited status of a given node """ parent = node.father if parent is None: return if parent.status != Status.unset: return parent.known = known set_father_status(parent, known) def set_children_status( node: Tree, node_types: Iterable[str], known: bool, status: Status = Status.unset ): """ Recursively change the status of the children of the provided node """ for child_node in node.iterate(): if child_node.otype in node_types and child_node.status == status: child_node.known = known def file_priority(source_tree: Tree, api_url: str, counter: collections.Counter): # get all the files all_contents = list( filter(lambda node: node.otype == CONTENT, source_tree.iterate_bfs()) ) all_contents.reverse() # we check nodes from the deepest # query the backend to get all file contents status parsed_contents = query_swhids(all_contents, api_url, counter) # set all the file contents status for cnt in all_contents: cnt.known = parsed_contents[cnt.swhid]["known"] cnt.status = Status.queried # set all the upstream directories of unknown file contents to unknown if not cnt.known: set_father_status(cnt, False) # get all unset directories and check their status # (update children directories accordingly) unset_dirs = list( filter( lambda node: node.otype == DIRECTORY and node.status == Status.unset, source_tree.iterate(), ) ) if source_tree.status == Status.unset: unset_dirs.append(source_tree) # check unset directories for dir_ in unset_dirs: if dir_.status == Status.unset: # update directory status dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"] dir_.status = Status.queried if dir_.known: set_children_status(dir_, [DIRECTORY], True) def directory_priority(source_tree: Tree, api_url: str, counter: collections.Counter): # get all directory contents that have at least one file content unset_dirs = list( filter( lambda dir_: dir_.otype == DIRECTORY and dir_.has_contents, source_tree.iterate_bfs(), ) ) unset_dirs.reverse() - # insert root if it has no contents - if source_tree.has_contents: - unset_dirs.append(source_tree) for dir_ in unset_dirs: # if the directory is known set all the downstream file contents to known if dir_.status == Status.unset: dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"] dir_.status = Status.queried if dir_.known: set_children_status(dir_, [CONTENT], True) else: set_father_status(dir_, False) # get remaining directories that have no file contents unset_dirs_no_cnts = list( filter( lambda node: node.otype == DIRECTORY and not node.has_contents, source_tree.iterate_bfs(), ) ) parsed_dirs_no_cnts = query_swhids(unset_dirs_no_cnts, api_url, counter) # update status of directories that have no file contents for dir_ in unset_dirs_no_cnts: dir_.known = parsed_dirs_no_cnts[dir_.swhid]["known"] dir_.status = Status.queried # check unknown file contents unset_files = list( filter( lambda node: node.otype == CONTENT and node.status == Status.unset, source_tree.iterate(), ) ) parsed_unset_files = query_swhids(unset_files, api_url, counter) for file_ in unset_files: file_.known = parsed_unset_files[file_.swhid]["known"] file_.status = Status.queried def random_( source_tree: Tree, api_url: str, counter: collections.Counter, seed: Optional[int] = None, ): if seed: random.seed(seed) # get all directory/file contents all_nodes = [node for node in source_tree.iterate()] + [source_tree] # shuffle contents random.shuffle(all_nodes) while len(all_nodes): node = all_nodes.pop() if node.status != Status.unset: continue node.known = query_swhids([node], api_url, counter)[node.swhid]["known"] node.status = Status.queried if node.otype == DIRECTORY and node.known: for child_node in node.iterate(): child_node.known = True elif node.otype == CONTENT and not node.known: set_father_status(node, False) def algo_min(source_tree: Tree, api_url: str): """ The minimal number of queries knowing the known/unknown status of every node """ def remove_parents(node, nodes): parent = node.father if parent is None or parent not in nodes: return else: nodes.remove(parent) remove_parents(parent, nodes) def remove_children(node, nodes): for child_node in node.iterate(): nodes.remove(child_node) - all_nodes = [node for node in source_tree.iterate()] - all_nodes.insert(0, source_tree) + all_nodes = [node for node in source_tree.iterate_bfs()] parsed_nodes = query_swhids(all_nodes, api_url) for node in all_nodes: node.known = parsed_nodes[node.swhid]["known"] all_nodes_copy = all_nodes.copy() for node in all_nodes: if node.otype == CONTENT and not node.known: all_nodes_copy.remove(node) remove_parents(node, all_nodes_copy) for node in all_nodes_copy: if node.otype == DIRECTORY and node.known: remove_children(node, all_nodes_copy) return len(all_nodes_copy) def get_swhids(paths: Iterable[Path], exclude_patterns): def swhid_of(path): if path.is_dir(): if exclude_patterns: def dir_filter(dirpath, *args): return directory_filter(dirpath, exclude_patterns) else: dir_filter = accept_all_directories obj = Directory.from_disk( path=bytes(path), dir_filter=dir_filter ).get_data() return swhid(DIRECTORY, obj) else: obj = Content.from_file(path=bytes(path)).get_data() return swhid(CONTENT, obj) for path in paths: yield str(path), swhid_of(path) def load_source(root, sre_patterns): """ Load the source code inside the Tree data structure """ def _scan(root_path, source_tree, sre_patterns): dirpath, dnames, fnames = next(os.walk(root_path, followlinks=False)) dirpath = Path(dirpath) if fnames: files = [dirpath.joinpath(fname) for fname in fnames] parsed_file_swhids = dict(get_swhids(files, sre_patterns)) for path, swhid_ in parsed_file_swhids.items(): source_tree.add_node(Path(path), swhid_) if dnames: dirs = [dirpath.joinpath(dname) for dname in dnames] parsed_dirs_swhids = dict(get_swhids(dirs, sre_patterns)) for path, swhid_ in parsed_dirs_swhids.items(): if not directory_filter(path, sre_patterns): continue source_tree.add_node(Path(path), swhid_) _scan(path, source_tree, sre_patterns) source_tree = Tree(root) root_swhid = dict(get_swhids([root], sre_patterns)) source_tree.swhid = root_swhid[str(root)] _scan(root, source_tree, sre_patterns) return source_tree def run( root: str, api_url: str, backend_name: str, exclude_patterns: Iterable[str], algo: str, origin: str, commit: str, seed: Optional[int] = None, ): sre_patterns = set() if exclude_patterns: sre_patterns = { reg_obj for reg_obj in extract_regex_objs(Path(root), exclude_patterns) } # temporary directory prefix repo_id = Path(root).parts[-1].split("_")[0] counter: collections.Counter = collections.Counter() counter["api_calls"] = 0 counter["queries"] = 0 source_tree = load_source(Path(root), sre_patterns) if algo == "random": if seed: random_(source_tree, api_url, counter, seed) else: random_(source_tree, api_url, counter) elif algo == "algo_min": min_queries = algo_min(source_tree, api_url) min_result = ( repo_id, origin, commit, backend_name, len(source_tree), algo, -1, min_queries, ) print(*min_result, sep=",") return elif algo == "stopngo": stopngo(source_tree, api_url, counter) elif algo == "file_priority": file_priority(source_tree, api_url, counter) elif algo == "directory_priority": directory_priority(source_tree, api_url, counter) else: raise Exception(f'Algorithm "{algo}" not found') result = ( repo_id, origin, commit, backend_name, len(source_tree), algo, counter["api_calls"], counter["queries"], ) print(*result, sep=",") diff --git a/swh/scanner/model.py b/swh/scanner/model.py index 51a498b..e4802f2 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,291 +1,294 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from enum import Enum import json from pathlib import Path import sys from typing import Any, Dict, Iterator, List, Tuple import ndjson from swh.model.identifiers import CONTENT, DIRECTORY from .exceptions import InvalidDirectoryPath, InvalidObjectType from .plot import generate_sunburst, offline_plot class Color(Enum): blue = "\033[94m" green = "\033[92m" yellow = "\033[93m" magenta = "\033[95m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value class Status(Enum): unset = 0 set = 1 queried = 2 class Tree: """Representation of a file system structure """ def __init__(self, path: Path, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False self.status = Status.unset self.children: Dict[Path, Tree] = {} def __len__(self): return sum(1 for node in self.iterate()) + 1 # the root node def add_node(self, path: Path, swhid: str) -> None: """Recursively add a new path. """ relative_path = path.relative_to(self.path) if relative_path == Path("."): self.swhid = swhid return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].add_node(path, swhid) def show(self, fmt) -> None: """Show tree in different formats""" if fmt == "json": print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) if fmt == "ndjson": print(ndjson.dumps(node.attributes for node in self.iterate())) elif fmt == "text": isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.print_children(isatty) elif fmt == "sunburst": root = self.path directories = self.get_directories_info(root) sunburst = generate_sunburst(directories, root) offline_plot(sunburst) def print_children(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.print_node(node, isatty, inc) if node.children: node.print_children(isatty, inc + 1) def print_node(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: if node.status == Status.unset: rel_path = colorize(rel_path, Color.magenta) elif node.status == Status.set and not node.known: rel_path = colorize(rel_path, Color.yellow) elif node.status == Status.set and node.known: rel_path = colorize(rel_path, Color.blue) elif node.status == Status.queried and not node.known: rel_path = colorize(rel_path, Color.red) elif node.status == Status.queried and node.known: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") @property def known(self): return self._known @known.setter def known(self, value: bool): self._known = value self.status = Status.set @property def attributes(self) -> Dict[str, Dict[str, Any]]: """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the SWHID as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def to_dict(self) -> Dict[str, Dict[str, Any]]: """ Recursively flatten the current tree nodes into a dictionary. For example, if you have the following structure: .. code-block:: none root { subdir: { file.txt } } The generated dictionary will be: .. code-block:: none { "root": { "swhid": "...", "known": True/False } "root/subdir": { "swhid": "...", "known": True/False } "root/subdir/file.txt": { "swhid": "...", "known": True/False } } """ return {k: v for node in self.iterate() for k, v in node.attributes.items()} def iterate(self) -> Iterator[Tree]: """ Recursively iterate through the children of the current node """ for _, child_node in self.children.items(): yield child_node if child_node.otype == DIRECTORY: yield from child_node.iterate() def iterate_bfs(self) -> Iterator[Tree]: """Get nodes in BFS order """ - nodes = set(node for node in self.children.values()) - for node in nodes: - yield node - for node in nodes: - if node.otype == DIRECTORY: - yield from node.iterate_bfs() + nodes = [] + nodes.append(self) + + while len(nodes) > 0: + for node in nodes.copy(): + yield node + nodes.remove(node) + if node.otype == DIRECTORY: + nodes.extend(list(node.children.values())) def get_files_from_dir(self, dir_path: Path) -> List: """ Retrieve files information about a specific directory path Returns: A list containing the files attributes present inside the directory given in input """ def get_files(node): files = [] for _, node in node.children.items(): if node.otype == CONTENT: files.append(node.attributes) return files if dir_path == self.path: return get_files(self) else: for node in self.iterate(): if node.path == dir_path: return get_files(node) raise InvalidDirectoryPath( "The directory provided doesn't match any stored directory" ) def _get_sub_dirs_info(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. """ for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node._get_sub_dirs_info(root, directories) def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self._get_sub_dirs_info(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a SWHID returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). """ contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't count contents of the object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False def has_contents(self) -> bool: for _, child_node in self.children.items(): if child_node.otype == CONTENT: return True return False