diff --git a/benchmark.py b/benchmark.py new file mode 100755 index 0000000..919e961 --- /dev/null +++ b/benchmark.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import logging +import os +from pathlib import Path +import shutil +import subprocess +import sys +from typing import Set + +import click + +SEED_OPTIONS = ["-s 10", "-s 20", "-s 30"] + + +def get_scenario_cmd(algo, kb_url, kb_label, origin_info, extracted_repo_path): + return [ + "swh", + "scanner", + "benchmark", + "--algo", + algo, + "--api-url", + kb_url, + "--backend-name", + kb_label, + "--origin", + origin_info["origin"], + "--commit", + origin_info["commit"], + "--exclude", + str(extracted_repo_path) + "/.git", + str(extracted_repo_path), + ] + + +def run_experiments( + repo_path: str, temp_path: str, kb_state_file: str, algos: Set[str] +): + """This function create a process for each experiment; one experiment is composed + by: the repository we want to scan, the algorithms we need to test and different + known-backends mapped in a "kb-state" file (given in input) + """ + dirpath, dnames, _ = next(os.walk(temp_path)) + extracted_repo_path = Path(dirpath).joinpath(dnames[0]) + + # get all the backends identifier and api URLs + backends = {} + with open(kb_state_file, "r") as kb_state_f: + for kb in kb_state_f.readlines(): + if kb.startswith("#"): + continue + elems = kb.split(" ") + backends[elems[0]] = elems[1] + + # get repository origin info from the "base_directory" + info_path = repo_path[:-7] + "info.json" + with open(info_path, "r") as json_file: + origin_info = json.load(json_file) + + scenario_cmds = [] + + for algo in algos: + for kb_label, kb_url in backends.items(): + if algo == "random": + for seed_opt in SEED_OPTIONS: + random_cmd = get_scenario_cmd( + algo, kb_url, kb_label, origin_info, str(extracted_repo_path) + ) + scenario_cmds.append(random_cmd + [seed_opt]) + else: + scenario_cmds.append( + get_scenario_cmd( + algo, kb_url, kb_label, origin_info, str(extracted_repo_path) + ) + ) + + processes = [ + subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) + for cmd in scenario_cmds + ] + + for proc in processes: + proc.wait() + + shutil.rmtree(extracted_repo_path) + + +@click.command( + help="""Run multiple benchmark from an input repository. The repository + will be unpacked in the provided temporary path and tested with + the input algorithms.""" +) +@click.argument("repo_path", type=click.Path(exists=True), required=True) +@click.argument("temp_path", type=click.Path(exists=True), required=True) +@click.argument("kb_state", type=click.Path(exists=True), required=True) +@click.option( + "-a", + "--algo", + "algos", + multiple=True, + required=True, + type=click.Choice( + ["stopngo", "file_priority", "directory_priority", "random", "algo_min"], + case_sensitive=False, + ), + metavar="ALGORITHM_NAME", + help="The algorithm name for the benchmark.", +) +def main(repo_path, temp_path, kb_state, algos): + logging.basicConfig( + filename="experiments.log", + format="%(asctime)s %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) + + try: + subprocess.run( + ["tar", "xvf", repo_path, "-C", temp_path], + check=True, + stdout=subprocess.DEVNULL, + stderr=sys.stderr, + ) + run_experiments(repo_path, temp_path, kb_state, set(algos)) + except Exception as e: + logging.exception(e) + except IOError as ioerror: + logging.exception(ioerror) + + +if __name__ == "__main__": + main() diff --git a/run_backend.sh b/run_backend.sh new file mode 100755 index 0000000..7b1fa37 --- /dev/null +++ b/run_backend.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +# This script simply runs multiple scanner backend using the backend information provided from stdin. + +while IFS= read -r line; +do + kb_info=($line) + if [[ ${kb_info[0]} = "#" ]] + then + continue + else + gunicorn "-b" ${kb_info[1]:7:14} 'swh.scanner.backend:create_app("'${kb_info[2]}'")' & + fi + +done diff --git a/run_benchmark.sh b/run_benchmark.sh new file mode 100755 index 0000000..e5f06f9 --- /dev/null +++ b/run_benchmark.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash + +# This script run the benchmark for the repositories taken from stdin +# You should provide: +# - temporary directory (where the repositories will be unpacked) +# - backend mapping file containing: the backend name, the backend api +# URL and the sqlite db file. +# - the algorithms to be executed +# +# USAGE EXAMPLE: +# find /repositories/dir -name '*.tar.zst' | ./run_benchmark.sh /temporary/dir kb_state.txt stopngo file_priority + + +temp_dir=$1 +kb_state=$2 + +if [ ! -d "$temp_dir" ]; then + echo "You should provide a valid temporary directory path" + exit 1 +fi + +if [ "$kb_state" == '' ]; then + echo "You should provide the file with mapped knowledge bases" + exit 1 +fi + +for i in "${@:3}"; do + algos="$algos -a $i" +done + +while IFS= read -r repo; +do + ./benchmark.py $repo $temp_dir $kb_state $algos +done diff --git a/swh/scanner/backend.py b/swh/scanner/backend.py index 6caa2c3..56290a9 100644 --- a/swh/scanner/backend.py +++ b/swh/scanner/backend.py @@ -1,41 +1,49 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from pathlib import Path + from flask import Flask, request from .db import Db from .exceptions import LargePayloadExc LIMIT = 1000 -def create_app(db: Db): +def create_app(db_file: str): """Backend for swh-scanner, implementing the /known endpoint of the Software Heritage Web API""" app = Flask(__name__) + db = Db(Path(db_file)) @app.route("/api/1/known/", methods=["POST"]) def known(): swhids = request.get_json() if len(swhids) > LIMIT: raise LargePayloadExc( f"The maximum number of SWHIDs this endpoint can receive is {LIMIT}" ) cur = db.conn.cursor() res = {swhid: {"known": db.known(swhid, cur)} for swhid in swhids} cur.close() return res return app -def run(host: str, port: int, db: Db): +def run(host: str, port: int, db_file: str): """Serve the local database """ - app = create_app(db) - app.run(host, port, debug=True) + # from .db import Db + + # db = Db(db_file) + # app = create_app(db) + # app.run(host, port, debug=False) + # db.close() + pass diff --git a/swh/scanner/benchmark_algos.py b/swh/scanner/benchmark_algos.py new file mode 100644 index 0000000..7a51b15 --- /dev/null +++ b/swh/scanner/benchmark_algos.py @@ -0,0 +1,383 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import collections +import itertools +import json +import os +from pathlib import Path +import random +from typing import Dict, Iterable, List, Optional + +import requests + +from swh.model.from_disk import Content, Directory, accept_all_directories +from swh.model.identifiers import CONTENT, DIRECTORY, swhid + +from .exceptions import APIError +from .model import Status, Tree +from .scanner import directory_filter, extract_regex_objs + + +def query_swhids( + swhids: List[Tree], api_url: str, counter: Optional[collections.Counter] = None +) -> Dict[str, Dict[str, bool]]: + """ + Returns: + A dictionary with: + key(str): persistent identifier + value(dict): + value['known'] = True if pid is found + value['known'] = False if pid is not found + """ + endpoint = api_url + "known/" + chunk_size = 1000 + + if counter: + counter["queries"] += len(swhids) + + def make_request(swhids): + swhids = [swhid.swhid for swhid in swhids] + req = requests.post(endpoint, json=swhids) + if req.status_code != 200: + error_message = "%s with given values %s" % (req.text, str(swhids)) + raise APIError(error_message) + if counter: + counter["api_calls"] += 1 + resp = req.text + return json.loads(resp) + + def get_chunk(swhids): + for i in range(0, len(swhids), chunk_size): + yield swhids[i : i + chunk_size] + + if len(swhids) > chunk_size: + return dict( + itertools.chain.from_iterable( + make_request(swhids_chunk).items() for swhids_chunk in get_chunk(swhids) + ) + ) + else: + return make_request(swhids) + + +def stopngo(source_tree: Tree, api_url: str, counter: collections.Counter): + def set_children_known(node): + for child_node in node.iterate(): + child_node.known = True + + nodes = [] + nodes.append(source_tree) + + while len(nodes) > 0: + parsed_nodes = query_swhids(nodes, api_url, counter) + for node in nodes.copy(): + nodes.remove(node) + node.known = parsed_nodes[node.swhid]["known"] + node.status = Status.queried + if node.otype == DIRECTORY: + if not node.known: + nodes.extend(list(node.children.values())) + else: + set_children_known(node) + + +def set_father_status(node, known): + """ + Recursively change father known and visited status of a given node + """ + parent = node.father + + if parent is None: + return + if parent.status != Status.unset: + return + + parent.known = known + set_father_status(parent, known) + + +def set_children_status(node, node_type, known, status: Status = Status.unset): + """ + Recursively change father known and visited status of a given node + """ + for child_node in node.iterate(): + if child_node.otype == node_type and child_node.status == status: + child_node.known = known + + +def file_priority(source_tree: Tree, api_url: str, counter: collections.Counter): + # get all the files + all_contents = list( + filter(lambda node: node.otype == CONTENT, source_tree.iterate_bfs()) + ) + all_contents.reverse() # we check nodes from the deepest + + # query the backend to get all file contents status + parsed_contents = query_swhids(all_contents, api_url, counter) + # set all the file contents status + for cnt in all_contents: + cnt.known = parsed_contents[cnt.swhid]["known"] + cnt.status = Status.queried + # set all the upstream directories of unknown file contents to unknown + if not cnt.known: + set_father_status(cnt, False) + + # get all unset directories and check their status + # (update children directories accordingly) + unset_dirs = list( + filter( + lambda node: node.otype == DIRECTORY and node.status == Status.unset, + source_tree.iterate(), + ) + ) + + if source_tree.status == Status.unset: + unset_dirs.append(source_tree) + + # check unset directories + for dir_ in unset_dirs: + if dir_.status == Status.unset: + # update directory status + dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"] + dir_.status = Status.queried + set_children_status(dir_, DIRECTORY, dir_.known) + + +def directory_priority(source_tree: Tree, api_url: str, counter: collections.Counter): + # get all directory contents that have at least one file content + unset_dirs = list( + filter( + lambda dir_: dir_.otype == DIRECTORY and dir_.has_contents, + source_tree.iterate_bfs(), + ) + ) + unset_dirs.reverse() + # insert root if it has no contents + if source_tree.has_contents: + unset_dirs.append(source_tree) + + for dir_ in unset_dirs: + # if the directory is known set all the downstream file contents to known + if dir_.status == Status.unset: + dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"] + dir_.status = Status.queried + if dir_.known: + set_children_status(dir_, CONTENT, True) + else: + set_father_status(dir_, False) + + # get remaining directories that have no file contents + unset_dirs_no_cnts = list( + filter( + lambda node: node.otype == DIRECTORY and not node.has_contents, + source_tree.iterate_bfs(), + ) + ) + parsed_dirs_no_cnts = query_swhids(unset_dirs_no_cnts, api_url, counter) + + # update status of directories that have no file contents + for dir_ in unset_dirs_no_cnts: + dir_.known = parsed_dirs_no_cnts[dir_.swhid]["known"] + dir_.status = Status.queried + + # check unknown file contents + unset_files = list( + filter( + lambda node: node.otype == CONTENT and node.status == Status.unset, + source_tree.iterate(), + ) + ) + parsed_unset_files = query_swhids(unset_files, api_url, counter) + + for file_ in unset_files: + file_.known = parsed_unset_files[file_.swhid]["known"] + file_.status = Status.queried + + +def random_( + source_tree: Tree, + api_url: str, + counter: collections.Counter, + seed: Optional[int] = None, +): + + if seed: + random.seed(seed) + # get all directory/file contents + all_nodes = [node for node in source_tree.iterate()] + [source_tree] + # shuffle contents + random.shuffle(all_nodes) + + while len(all_nodes): + node = all_nodes.pop() + + if node.status != Status.unset: + continue + + node.known = query_swhids([node], api_url, counter)[node.swhid]["known"] + node.status = Status.queried + if node.otype == DIRECTORY and node.known: + for child_node in node.iterate(): + child_node.known = True + elif node.otype == CONTENT and not node.known: + set_father_status(node, False) + + +def algo_min(source_tree: Tree, api_url: str): + """ + The minimal number of queries knowing the known/unknown status of every node + """ + + def remove_parents(node, nodes): + parent = node.father + if parent is None or parent not in nodes: + return + else: + nodes.remove(parent) + remove_parents(parent, nodes) + + def remove_children(node, nodes): + for child_node in node.iterate(): + nodes.remove(child_node) + + all_nodes = [node for node in source_tree.iterate()] + all_nodes.insert(0, source_tree) + + parsed_nodes = query_swhids(all_nodes, api_url) + for node in all_nodes: + node.known = parsed_nodes[node.swhid]["known"] + + all_nodes_copy = all_nodes.copy() + + for node in all_nodes: + if node.otype == CONTENT and not node.known: + remove_parents(node, all_nodes_copy) + + for node in all_nodes_copy: + if node.otype == DIRECTORY and node.known: + remove_children(node, all_nodes_copy) + + return len(all_nodes_copy) + + +def get_swhids(paths: Iterable[Path], exclude_patterns): + def swhid_of(path): + if path.is_dir(): + if exclude_patterns: + + def dir_filter(dirpath, *args): + return directory_filter(dirpath, exclude_patterns) + + else: + dir_filter = accept_all_directories + + obj = Directory.from_disk( + path=bytes(path), dir_filter=dir_filter + ).get_data() + + return swhid(DIRECTORY, obj) + else: + obj = Content.from_file(path=bytes(path)).get_data() + return swhid(CONTENT, obj) + + for path in paths: + yield str(path), swhid_of(path) + + +def load_source(root, sre_patterns): + """ + Load the source code inside the Tree data structure + """ + + def _scan(root_path, source_tree, sre_patterns): + dirpath, dnames, fnames = next(os.walk(root_path)) + dirpath = Path(dirpath) + + if fnames: + files = [dirpath.joinpath(fname) for fname in fnames] + parsed_file_swhids = dict(get_swhids(files, sre_patterns)) + + for path, swhid_ in parsed_file_swhids.items(): + source_tree.add_node(Path(path), swhid_) + + if dnames: + dirs = [dirpath.joinpath(dname) for dname in dnames] + parsed_dirs_swhids = dict(get_swhids(dirs, sre_patterns)) + + for path, swhid_ in parsed_dirs_swhids.items(): + if not directory_filter(path, sre_patterns): + continue + source_tree.add_node(Path(path), swhid_) + _scan(path, source_tree, sre_patterns) + + source_tree = Tree(root) + root_swhid = dict(get_swhids([root], sre_patterns)) + source_tree.swhid = root_swhid[str(root)] + _scan(root, source_tree, sre_patterns) + return source_tree + + +def run( + root: str, + api_url: str, + backend_name: str, + exclude_patterns: Iterable[str], + algo: str, + origin: str, + commit: str, + seed: Optional[int] = None, +): + sre_patterns = set() + if exclude_patterns: + sre_patterns = { + reg_obj for reg_obj in extract_regex_objs(Path(root), exclude_patterns) + } + + repo_id = Path(root).parts[-1] + counter: collections.Counter = collections.Counter() + counter["api_calls"] = 0 + counter["queries"] = 0 + source_tree = load_source(Path(root), sre_patterns) + + if algo == "random": + if seed: + random_(source_tree, api_url, counter, seed) + else: + random_(source_tree, api_url, counter) + elif algo == "algo_min": + min_queries = algo_min(source_tree, api_url) + min_result = ( + repo_id, + origin, + commit, + backend_name, + len(source_tree), + algo, + min_queries, + ) + print(min_result) + return + elif algo == "stopngo": + stopngo(source_tree, api_url, counter) + elif algo == "file_priority": + file_priority(source_tree, api_url, counter) + elif algo == "directory_priority": + directory_priority(source_tree, api_url, counter) + else: + raise Exception(f'Algorithm "{algo}" not found') + + result = ( + repo_id, + origin, + commit, + backend_name, + len(source_tree), + algo, + counter["api_calls"], + counter["queries"], + ) + + print(result) diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py index 9d7cb35..b97aa18 100644 --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -1,242 +1,315 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os import sys from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from .exceptions import DBError # Config for the "serve" option BACKEND_DEFAULT_PORT = 5011 # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_CONFIG: Dict[str, Any] = { "web-api": { "url": "https://archive.softwareheritage.org/api/1/", "auth-token": None, } } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" SCANNER_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" def setup_config(ctx, api_url): config = ctx.obj["config"] if api_url: if not api_url.endswith("/"): api_url += "/" config["web-api"]["url"] = api_url return config @swh_cli_group.group( name="scanner", context_settings=CONTEXT_SETTINGS, help=SCANNER_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) @click.pass_context def scanner(ctx, config_file: Optional[str]): env_config_path = os.environ.get(CONFIG_ENVVAR) # read_raw_config do not fail if file does not exist, so check it beforehand # while enforcing loading priority if config_file: if not config.config_exists(config_file): raise click.BadParameter( f"File '{config_file}' cannot be opened.", param_hint="--config-file" ) elif env_config_path: if not config.config_exists(env_config_path): raise click.BadParameter( f"File '{env_config_path}' cannot be opened.", param_hint=CONFIG_ENVVAR ) config_file = env_config_path elif config.config_exists(DEFAULT_CONFIG_PATH): config_file = DEFAULT_CONFIG_PATH conf = DEFAULT_CONFIG if config_file is not None: conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf @scanner.command(name="scan") @click.argument("root_path", required=True, type=click.Path(exists=True)) @click.option( "-u", "--api-url", default=None, metavar="API_URL", show_default=True, help="URL for the api request", ) @click.option( "--exclude", "-x", "patterns", metavar="PATTERN", multiple=True, help="Exclude directories using glob patterns \ (e.g., '*.git' to exclude all .git directories)", ) @click.option( "-f", "--output-format", "out_fmt", default="text", show_default=True, type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False), help="The output format", ) @click.option( "-i", "--interactive", is_flag=True, help="Show the result in a dashboard" ) @click.pass_context def scan(ctx, root_path, api_url, patterns, out_fmt, interactive): """Scan a source code project to discover files and directories already present in the archive""" import swh.scanner.scanner as scanner config = setup_config(ctx, api_url) scanner.scan(config, root_path, patterns, out_fmt, interactive) @scanner.group("db", help="Manage local knowledge base for swh-scanner") @click.pass_context def db(ctx): pass @db.command("import") @click.option( "-i", "--input", "input_file", metavar="INPUT_FILE", required=True, type=click.File("r"), help="A file containing SWHIDs", ) @click.option( "-o", "--output", "output_file_db", metavar="OUTPUT_DB_FILE", required=True, show_default=True, help="The name of the generated sqlite database", ) @click.option( "-s", "--chunk-size", "chunk_size", default="10000", metavar="SIZE", show_default=True, type=int, help="The chunk size ", ) @click.pass_context def import_(ctx, chunk_size, input_file, output_file_db): """Create SQLite database of known SWHIDs from a textual list of SWHIDs """ from .db import Db db = Db(output_file_db) cur = db.conn.cursor() try: db.create_from(input_file, chunk_size, cur) db.close() except DBError: print("Failed to create database") os.remove(output_file_db) sys.exit(1) @db.command("serve") @click.option( "-h", "--host", metavar="HOST", default="127.0.0.1", show_default=True, help="The host of the API server", ) @click.option( "-p", "--port", metavar="PORT", default=f"{BACKEND_DEFAULT_PORT}", show_default=True, help="The port of the API server", ) @click.option( "-f", "--db-file", "db_file", metavar="DB_FILE", default="SWHID_DB.sqlite", show_default=True, type=click.Path(exists=True), help="An sqlite database file (it can be generated with: 'swh scanner db import')", ) @click.pass_context def serve(ctx, host, port, db_file): """Start an API service using the sqlite database generated with the "db import" option.""" import swh.scanner.backend as backend from .db import Db db = Db(db_file) backend.run(host, port, db) db.close() +@scanner.command() +@click.argument("root_path", required=True, type=click.Path(exists=True)) +@click.option( + "-u", + "--api-url", + default=None, + metavar="API_URL", + show_default=True, + required=True, + help="URL for the api request.", +) +@click.option( + "-n", + "--backend-name", + default=None, + metavar="BACKEND_NAME", + show_default=True, + required=True, + help="The backend name.", +) +@click.option( + "--origin", + "-o", + "origin_url", + metavar="ORIGIN_URL", + required=True, + help="Repository origin url.", +) +@click.option( + "--commit", "-c", metavar="COMMIT", required=True, help="Commit identifier.", +) +@click.option( + "--exclude", + "-x", + "patterns", + metavar="PATTERN", + multiple=True, + show_default=True, + help="Exclude directories using glob patterns \ + (e.g., '*.git' to exclude all .git directories).", +) +@click.option( + "--algo", "-a", metavar="ALGO NAME", required=True, help="Algorithm name.", +) +@click.option( + "--seed", "-s", metavar="SEED", type=int, help="Seed for the random algorithm" +) +@click.pass_context +def benchmark( + ctx, root_path, api_url, backend_name, origin_url, commit, patterns, algo, seed +): + from importlib import reload + import logging + + from swh.scanner.benchmark_algos import run + + # reload logging module avoid conflict with benchmark.py logging + reload(logging) + logging.basicConfig( + filename="experiments.log", + format="%(asctime)s %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) + + try: + run(root_path, api_url, backend_name, patterns, algo, origin_url, commit, seed) + except Exception as e: + logging.exception( + f'Repository: "{root_path}" using "{algo}" ' + f'algorithm on "{api_url}" FAILED: {e}' + ) + + def main(): return scanner(auto_envvar_prefix="SWH_SCANNER") if __name__ == "__main__": main() diff --git a/swh/scanner/model.py b/swh/scanner/model.py index ee87cc7..51a498b 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,251 +1,291 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from enum import Enum import json from pathlib import Path import sys from typing import Any, Dict, Iterator, List, Tuple import ndjson from swh.model.identifiers import CONTENT, DIRECTORY from .exceptions import InvalidDirectoryPath, InvalidObjectType from .plot import generate_sunburst, offline_plot class Color(Enum): blue = "\033[94m" green = "\033[92m" + yellow = "\033[93m" + magenta = "\033[95m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value +class Status(Enum): + unset = 0 + set = 1 + queried = 2 + + class Tree: """Representation of a file system structure """ def __init__(self, path: Path, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False + self.status = Status.unset self.children: Dict[Path, Tree] = {} - def add_node(self, path: Path, swhid: str, known: bool) -> None: + def __len__(self): + return sum(1 for node in self.iterate()) + 1 # the root node + + def add_node(self, path: Path, swhid: str) -> None: """Recursively add a new path. """ relative_path = path.relative_to(self.path) if relative_path == Path("."): self.swhid = swhid - self.known = known return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) - self.children[new_path].add_node(path, swhid, known) + self.children[new_path].add_node(path, swhid) def show(self, fmt) -> None: """Show tree in different formats""" if fmt == "json": print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) if fmt == "ndjson": print(ndjson.dumps(node.attributes for node in self.iterate())) elif fmt == "text": isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.print_children(isatty) elif fmt == "sunburst": root = self.path directories = self.get_directories_info(root) sunburst = generate_sunburst(directories, root) offline_plot(sunburst) def print_children(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.print_node(node, isatty, inc) if node.children: node.print_children(isatty, inc + 1) def print_node(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: - if not node.known: - rel_path = colorize(rel_path, Color.red) - elif node.otype == DIRECTORY: + if node.status == Status.unset: + rel_path = colorize(rel_path, Color.magenta) + elif node.status == Status.set and not node.known: + rel_path = colorize(rel_path, Color.yellow) + elif node.status == Status.set and node.known: rel_path = colorize(rel_path, Color.blue) - elif node.otype == CONTENT: + elif node.status == Status.queried and not node.known: + rel_path = colorize(rel_path, Color.red) + elif node.status == Status.queried and node.known: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") + @property + def known(self): + return self._known + + @known.setter + def known(self, value: bool): + self._known = value + self.status = Status.set + @property def attributes(self) -> Dict[str, Dict[str, Any]]: """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the SWHID as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def to_dict(self) -> Dict[str, Dict[str, Any]]: """ Recursively flatten the current tree nodes into a dictionary. For example, if you have the following structure: .. code-block:: none root { subdir: { file.txt } } The generated dictionary will be: .. code-block:: none { "root": { "swhid": "...", "known": True/False } "root/subdir": { "swhid": "...", "known": True/False } "root/subdir/file.txt": { "swhid": "...", "known": True/False } } """ return {k: v for node in self.iterate() for k, v in node.attributes.items()} def iterate(self) -> Iterator[Tree]: """ Recursively iterate through the children of the current node """ for _, child_node in self.children.items(): yield child_node if child_node.otype == DIRECTORY: yield from child_node.iterate() + def iterate_bfs(self) -> Iterator[Tree]: + """Get nodes in BFS order + """ + nodes = set(node for node in self.children.values()) + for node in nodes: + yield node + for node in nodes: + if node.otype == DIRECTORY: + yield from node.iterate_bfs() + def get_files_from_dir(self, dir_path: Path) -> List: """ Retrieve files information about a specific directory path Returns: A list containing the files attributes present inside the directory given in input """ def get_files(node): files = [] for _, node in node.children.items(): if node.otype == CONTENT: files.append(node.attributes) return files if dir_path == self.path: return get_files(self) else: for node in self.iterate(): if node.path == dir_path: return get_files(node) raise InvalidDirectoryPath( "The directory provided doesn't match any stored directory" ) def _get_sub_dirs_info(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. """ for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node._get_sub_dirs_info(root, directories) def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self._get_sub_dirs_info(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a SWHID returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). """ contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't count contents of the object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False + + def has_contents(self) -> bool: + for _, child_node in self.children.items(): + if child_node.otype == CONTENT: + return True + return False