diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
index 5f00d48..f4c0374 100644
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -1,285 +1,287 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from typing import Any, Dict, Optional

import click
from importlib_metadata import version
import yaml

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group

from .exceptions import DBError

# Config for the "serve" option
BACKEND_DEFAULT_PORT = 5011

# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILE"
DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")

DEFAULT_CONFIG: Dict[str, Any] = {
    "web-api": {
        "url": "https://archive.softwareheritage.org/api/1/",
        "auth-token": None,
    }
}

CONFIG_FILE_HELP = f"""Configuration file:

\b
The CLI option or the environment variable will fail if invalid.
The CLI option is checked first.
Then, the environment variable {CONFIG_ENVVAR} is checked.
Then, if the default path cannot be loaded, a set of default values is used.
Default config path is {DEFAULT_CONFIG_PATH}.
Default config values are:

\b
{yaml.dump(DEFAULT_CONFIG)}"""

SCANNER_HELP = f"""Software Heritage Scanner tools.

{CONFIG_FILE_HELP}"""


def setup_config(ctx, api_url):
    config = ctx.obj["config"]
    if api_url:
        if not api_url.endswith("/"):
            api_url += "/"
        config["web-api"]["url"] = api_url

    return config


@swh_cli_group.group(
-    name="scanner", context_settings=CONTEXT_SETTINGS, help=SCANNER_HELP,
+    name="scanner",
+    context_settings=CONTEXT_SETTINGS,
+    help=SCANNER_HELP,
)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=False, dir_okay=False, path_type=str),
    help="""YAML configuration file""",
)
@click.version_option(
-    version=version("swh.scanner"), prog_name="swh.scanner",
+    version=version("swh.scanner"),
+    prog_name="swh.scanner",
)
@click.pass_context
def scanner(ctx, config_file: Optional[str]):
    env_config_path = os.environ.get(CONFIG_ENVVAR)

    # read_raw_config does not fail if the file does not exist, so check it
    # beforehand while enforcing loading priority
    if config_file:
        if not config.config_exists(config_file):
            raise click.BadParameter(
                f"File '{config_file}' cannot be opened.", param_hint="--config-file"
            )
    elif env_config_path:
        if not config.config_exists(env_config_path):
            raise click.BadParameter(
                f"File '{env_config_path}' cannot be opened.", param_hint=CONFIG_ENVVAR
            )
        config_file = env_config_path
    elif config.config_exists(DEFAULT_CONFIG_PATH):
        config_file = DEFAULT_CONFIG_PATH

    conf = DEFAULT_CONFIG
    if config_file is not None:
        conf = config.read_raw_config(config.config_basepath(config_file))
        conf = config.merge_configs(DEFAULT_CONFIG, conf)

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf


@scanner.command(name="scan")
@click.argument("root_path", required=True, type=click.Path(exists=True))
@click.option(
    "-u",
    "--api-url",
    default=None,
    metavar="API_URL",
    show_default=True,
    help="URL for the API request",
)
@click.option(
    "--exclude",
    "-x",
    "patterns",
    metavar="PATTERN",
    multiple=True,
    help="Exclude directories using glob patterns \
        (e.g., ``*.git`` to exclude all .git directories)",
)
@click.option(
    "-f",
    "--output-format",
    "out_fmt",
    default="text",
    show_default=True,
    type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False),
    help="The output format",
)
@click.option(
    "-i", "--interactive", is_flag=True, help="Show the result in a dashboard"
)
@click.option(
    "-p",
    "--policy",
    default="auto",
    show_default=True,
    type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]),
    help="The scan policy.",
)
@click.option(
    "-e",
    "--extra-info",
    "extra_info",
    multiple=True,
    type=click.Choice(["origin"]),
    help="Add selected additional information about known software artifacts.",
)
@click.pass_context
def scan(ctx, root_path, api_url, patterns, out_fmt, interactive, policy, extra_info):
    """Scan a source code project to discover files and directories already
    present in the archive.

    The source code project can be checked using different policies that can be set
    using the -p/--policy option:\n

    \b
    auto: select the best policy based on the source code; for codebases with
    fewer than 1000 file/dir contents all the nodes will be queried.

    bfs: scan the source code in BFS order, checking unknown directories only.

    \b
    greedybfs: same as "bfs" policy, but lookup the status of source code
    artifacts in chunks, in order to minimize the number of Web API
    round-trips with the archive.

    \b
    filepriority: scan all the source code file contents, checking only unset
    directories. (useful if the codebase contains a lot of source files)

    dirpriority: scan all the source code directories and check only unknown
    directory contents.

    Other information about software artifacts can be requested with the
    -e/--extra-info option:\n

    \b
    origin: search the origin URL of each source code file/directory using the
    in-memory
-    compressed graph.
-"""
+    compressed graph."""
    import swh.scanner.scanner as scanner

    config = setup_config(ctx, api_url)
    extra_info = set(extra_info)
    scanner.scan(config, root_path, patterns, out_fmt, interactive, policy, extra_info)


@scanner.group("db", help="Manage local knowledge base for swh-scanner")
@click.pass_context
def db(ctx):
    pass


@db.command("import")
@click.option(
    "-i",
    "--input",
    "input_file",
    metavar="INPUT_FILE",
    required=True,
    type=click.File("r"),
    help="A file containing SWHIDs",
)
@click.option(
    "-o",
    "--output",
    "output_file_db",
    metavar="OUTPUT_DB_FILE",
    required=True,
    show_default=True,
    help="The name of the generated sqlite database",
)
@click.option(
    "-s",
    "--chunk-size",
    "chunk_size",
    default="10000",
    metavar="SIZE",
    show_default=True,
    type=int,
    help="The chunk size",
)
@click.pass_context
def import_(ctx, chunk_size, input_file, output_file_db):
    """Create a SQLite database of known SWHIDs from a textual list of SWHIDs"""
    from .db import Db

    db = Db(output_file_db)
    cur = db.conn.cursor()
    try:
        db.create_from(input_file, chunk_size, cur)
        db.close()
    except DBError as e:
        ctx.fail("Failed to import SWHIDs into database: {0}".format(e))


@db.command("serve")
@click.option(
    "-h",
    "--host",
    metavar="HOST",
    default="127.0.0.1",
    show_default=True,
    help="The host of the API server",
)
@click.option(
    "-p",
    "--port",
    metavar="PORT",
    default=f"{BACKEND_DEFAULT_PORT}",
    show_default=True,
    help="The port of the API server",
)
@click.option(
    "-f",
    "--db-file",
    "db_file",
    metavar="DB_FILE",
    default="SWHID_DB.sqlite",
    show_default=True,
    type=click.Path(exists=True),
    help="An SQLite database file (it can be generated with: 'swh scanner db import')",
)
@click.pass_context
def serve(ctx, host, port, db_file):
    """Start an API service using the sqlite database generated with the
    "db import" option."""
    import swh.scanner.backend as backend

    from .db import Db

    db = Db(db_file)
    backend.run(host, port, db)
    db.close()


def main():
    return scanner(auto_envvar_prefix="SWH_SCANNER")


if __name__ == "__main__":
    main()
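Reviewer note: the config-loading priority enforced in `scanner()` (CLI option, then `SWH_CONFIG_FILE`, then the default path, then built-in defaults) can be exercised in isolation. A minimal sketch, assuming only the `swh.core.config.config_exists` helper; the function name and the default path shown are illustrative, not part of this patch:

```python
import os

from swh.core import config

CONFIG_ENVVAR = "SWH_CONFIG_FILE"
DEFAULT_CONFIG_PATH = "/home/user/.config/swh/global.yml"  # hypothetical path


def resolve_config_file(cli_path=None):
    """Mirror the precedence of the `scanner` group: CLI option first,
    then the SWH_CONFIG_FILE environment variable, then the default path."""
    env_path = os.environ.get(CONFIG_ENVVAR)
    if cli_path:
        if not config.config_exists(cli_path):
            raise ValueError(f"File '{cli_path}' cannot be opened.")
        return cli_path
    if env_path:
        if not config.config_exists(env_path):
            raise ValueError(f"File '{env_path}' cannot be opened.")
        return env_path
    if config.config_exists(DEFAULT_CONFIG_PATH):
        return DEFAULT_CONFIG_PATH
    return None  # caller falls back to DEFAULT_CONFIG
```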
diff --git a/swh/scanner/client.py b/swh/scanner/client.py
index 0a07964..eac0e94 100644
--- a/swh/scanner/client.py
+++ b/swh/scanner/client.py
@@ -1,98 +1,96 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Minimal async web client for the Software Heritage Web API.

This module could be removed when `T2635` is implemented.
"""

import asyncio
import itertools
from typing import Any, Dict, List, Optional

import aiohttp

from swh.model.swhids import CoreSWHID

from .exceptions import error_response

# Maximum number of SWHIDs that can be requested by a single call to the
# Web API endpoint /known/
QUERY_LIMIT = 1000

KNOWN_EP = "known/"
GRAPH_RANDOMWALK_EP = "graph/randomwalk/"


class Client:
-    """Manage requests to the Software Heritage Web API.
-    """
+    """Manage requests to the Software Heritage Web API."""

    def __init__(self, api_url: str, session: aiohttp.ClientSession):
        self.api_url = api_url
        self.session = session

    async def get_origin(self, swhid: CoreSWHID) -> Optional[Any]:
-        """Walk the compressed graph to discover the origin of a given swhid
-        """
+        """Walk the compressed graph to discover the origin of a given swhid"""
        endpoint = (
            f"{self.api_url}{GRAPH_RANDOMWALK_EP}{str(swhid)}/ori/?direction="
            f"backward&limit=-1&resolve_origins=true"
        )
        res = None
        async with self.session.get(endpoint) as resp:
            if resp.status == 200:
                res = await resp.text()
                res = res.rstrip()
                return res
            if resp.status != 404:
                error_response(resp.reason, resp.status, endpoint)

        return res

    async def known(self, swhids: List[CoreSWHID]) -> Dict[str, Dict[str, bool]]:
        """API request to get information about the SoftWare Heritage persistent
        IDentifiers (SWHIDs) given in input.

        Args:
            swhids: a list of CoreSWHID instances

        Returns:
            A dictionary with:

            key:
                string SWHID searched
            value:
                value['known'] = True if the SWHID is found
                value['known'] = False if the SWHID is not found
        """
        endpoint = self.api_url + KNOWN_EP
        requests = []

        def get_chunk(swhids):
            for i in range(0, len(swhids), QUERY_LIMIT):
                yield swhids[i : i + QUERY_LIMIT]

        async def make_request(swhids):
            swhids = [str(swhid) for swhid in swhids]
            async with self.session.post(endpoint, json=swhids) as resp:
                if resp.status != 200:
                    error_response(resp.reason, resp.status, endpoint)

                return await resp.json()

        if len(swhids) > QUERY_LIMIT:
            for swhids_chunk in get_chunk(swhids):
                requests.append(asyncio.create_task(make_request(swhids_chunk)))
            res = await asyncio.gather(*requests)
            # concatenate the list of dictionaries
            return dict(itertools.chain.from_iterable(e.items() for e in res))
        else:
            return await make_request(swhids)
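Reviewer note: `known()` splits any SWHID list larger than `QUERY_LIMIT` into `QUERY_LIMIT`-sized slices and issues the POSTs concurrently, then merges the per-chunk dictionaries. A stdlib-only sketch of that pattern, independent of aiohttp (`make_request` stands in for one POST to `/known/`; the names here are illustrative):

```python
import asyncio

QUERY_LIMIT = 1000


def get_chunks(items, limit=QUERY_LIMIT):
    # Same slicing as Client.known: [0:limit], [limit:2*limit], ...
    for i in range(0, len(items), limit):
        yield items[i : i + limit]


async def query_all(items, make_request):
    """make_request is a coroutine taking one chunk and returning a dict."""
    if len(items) <= QUERY_LIMIT:
        return await make_request(items)
    tasks = [asyncio.create_task(make_request(c)) for c in get_chunks(items)]
    results = await asyncio.gather(*tasks)
    merged = {}
    for res in results:
        merged.update(res)  # concatenate the per-chunk dictionaries
    return merged
```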
""" if click_data is not None: full_path = click_data["points"][0]["label"] return ( table_header + generate_table_body(full_path.encode(), source_tree, nodes_data), full_path, ) else: return "", "" app.run_server(debug=True, use_reloader=True) diff --git a/swh/scanner/data.py b/swh/scanner/data.py index a0368db..cc4e205 100644 --- a/swh/scanner/data.py +++ b/swh/scanner/data.py @@ -1,150 +1,150 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from pathlib import Path from typing import Dict, Optional, Tuple from swh.model.exceptions import ValidationError from swh.model.from_disk import Directory from swh.model.swhids import CoreSWHID from .client import Client SUPPORTED_INFO = {"known", "origin"} class MerkleNodeInfo(dict): """Store additional information about Merkle DAG nodes, using SWHIDs as keys""" def __setitem__(self, key, value): """The keys must be valid valid Software Heritage Persistent Identifiers while values must be dict. """ if not isinstance(key, CoreSWHID): raise ValidationError("keys must be valid SWHID(s)") if not isinstance(value, dict): raise ValidationError(f"values must be dict, not {type(value)}") super(MerkleNodeInfo, self).__setitem__(key, value) def init_merkle_node_info(source_tree: Directory, data: MerkleNodeInfo, info: set): """Populate the MerkleNodeInfo with the SWHIDs of the given source tree and the - attributes that will be stored. + attributes that will be stored. """ if not info: raise Exception("Data initialization requires node attributes values.") nodes_info: Dict[str, Optional[str]] = {} for ainfo in info: if ainfo in SUPPORTED_INFO: nodes_info[ainfo] = None else: raise Exception(f"Information {ainfo} is not supported.") for node in source_tree.iter_tree(): data[node.swhid()] = nodes_info.copy() # type: ignore async def add_origin(source_tree: Directory, data: MerkleNodeInfo, client: Client): """Store origin information about software artifacts retrieved from the Software - Heritage graph service. + Heritage graph service. """ queue = [] queue.append(source_tree) while queue: for node in queue.copy(): queue.remove(node) node_ori = await client.get_origin(node.swhid()) if node_ori: data[node.swhid()]["origin"] = node_ori if node.object_type == "directory": for sub_node in node.iter_tree(): data[sub_node.swhid()]["origin"] = node_ori # type: ignore else: if node.object_type == "directory": children = [sub_node for sub_node in node.iter_tree()] children.remove(node) queue.extend(children) # type: ignore def get_directory_data( root_path: str, source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict = {}, ) -> Dict[Path, dict]: """Get content information for each directory inside source_tree. Returns: A dictionary with a directory path as key and the relative contents information as values. 
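Reviewer note: `run_app` expects a plotly trace plus the scanned tree and its node data. A hedged wiring sketch, assuming a scannable directory at the hypothetical path `/tmp/project` and using `generate_sunburst` from `swh.scanner.plot` (diffed further below); the `(23, 2)` tuple stands for (total contents, known contents) as produced by `get_directory_data`:

```python
from pathlib import Path

from swh.model.cli import model_of_dir
from swh.scanner.dashboard.dashboard import run_app
from swh.scanner.data import MerkleNodeInfo
from swh.scanner.plot import generate_sunburst

root = Path("/tmp/project")  # hypothetical scanned project
source_tree = model_of_dir(str(root).encode())
nodes_data = MerkleNodeInfo()
for node in source_tree.iter_tree():
    # the dashboard table reads both the "known" flag and the SWHID string
    nodes_data[node.swhid()] = {"known": True, "swhid": str(node.swhid())}

sunburst = generate_sunburst({Path("src"): (23, 2)}, root)  # illustrative data
run_app(sunburst, source_tree, nodes_data)  # blocks, serving the dashboard
```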
""" def _get_directory_data( source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict ): directories = list( filter( lambda n: n.object_type == "directory", map(lambda n: n[1], source_tree.items()), ) ) for node in directories: directory_info = directory_content(node, nodes_data) rel_path = Path(node.data["path"].decode()).relative_to(Path(root_path)) directory_data[rel_path] = directory_info if has_dirs(node): _get_directory_data(node, nodes_data, directory_data) _get_directory_data(source_tree, nodes_data, directory_data) return directory_data def directory_content(node: Directory, nodes_data: MerkleNodeInfo) -> Tuple[int, int]: """Count known contents inside the given directory. Returns: A tuple with the total number of contents inside the directory and the number of known contents. """ known_cnt = 0 node_contents = list( filter(lambda n: n.object_type == "content", map(lambda n: n[1], node.items())) ) for sub_node in node_contents: if nodes_data[sub_node.swhid()]["known"]: known_cnt += 1 return (len(node_contents), known_cnt) def has_dirs(node: Directory) -> bool: """Check if the given directory has other directories inside.""" for _, sub_node in node.items(): if isinstance(sub_node, Directory): return True return False def get_content_from( node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeInfo ) -> Dict[bytes, dict]: """Get content information from the given directory node.""" # root in model.from_disk.Directory should be accessed with b"" directory = source_tree[node_path if node_path != source_tree.data["path"] else b""] node_contents = list( filter( lambda n: n.object_type == "content", map(lambda n: n[1], directory.items()) ) ) files_data = {} for node in node_contents: node_info = nodes_data[node.swhid()] node_info["swhid"] = str(node.swhid()) path_name = "path" if "path" in node.data.keys() else "data" files_data[node.data[path_name]] = node_info return files_data diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py index ab7b4a9..67a6582 100644 --- a/swh/scanner/plot.py +++ b/swh/scanner/plot.py @@ -1,268 +1,270 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ The purpose of this module is to display and to interact with the result of the scanner contained in the model. The `sunburst` function generates a navigable sunburst chart from the directories information retrieved from the model. The chart displays for each directory the total number of files and the percentage of file known. The size of the directory is defined by the total number of contents whereas the color gradient is generated relying on the percentage of contents known. """ from pathlib import Path from typing import Dict, List, Tuple import numpy as np import pandas as pd import plotly.graph_objects as go from plotly.offline import offline def build_hierarchical_df( dirs_dataframe: pd.DataFrame, levels: List[str], metrics_columns: List[str], root_name: str, ) -> pd.DataFrame: """ Build a hierarchy of levels for Sunburst or Treemap charts. For each directory the new dataframe will have the following information: id: the directory name parent: the parent directory of id contents: the total number of contents of the directory id and the relative subdirectories known: the percentage of contents known relative to computed 'contents' Example: Given the following dataframe: .. 
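Reviewer note: the `MerkleNodeInfo` contract (CoreSWHID keys, dict values) is easy to verify by hand; a short sketch, using the sample directory SWHID that also appears in the tests below:

```python
from swh.model.exceptions import ValidationError
from swh.model.swhids import CoreSWHID

from swh.scanner.data import MerkleNodeInfo

info = MerkleNodeInfo()
swhid = CoreSWHID.from_string(
    "swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140"  # sample id from the tests
)
info[swhid] = {"known": None, "origin": None}  # ok: CoreSWHID key, dict value

try:
    info["not-a-swhid"] = {"known": True}  # rejected: key is a plain string
except ValidationError as e:
    print(e)  # "keys must be valid SWHID(s)"
```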
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
index ab7b4a9..67a6582 100644
--- a/swh/scanner/plot.py
+++ b/swh/scanner/plot.py
@@ -1,268 +1,270 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
The purpose of this module is to display and to interact with the result of the
scanner contained in the model.

The `sunburst` function generates a navigable sunburst chart from the
directories information retrieved from the model. The chart displays for
each directory the total number of files and the percentage of known files.
The size of the directory is defined by the total number of contents whereas
the color gradient is generated relying on the percentage of known contents.
"""

from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.offline import offline


def build_hierarchical_df(
    dirs_dataframe: pd.DataFrame,
    levels: List[str],
    metrics_columns: List[str],
    root_name: str,
) -> pd.DataFrame:
    """
    Build a hierarchy of levels for Sunburst or Treemap charts.

    For each directory the new dataframe will have the following information:

    id: the directory name
    parent: the parent directory of id
    contents: the total number of contents of the directory id and the
    relative subdirectories
    known: the percentage of contents known relative to computed 'contents'

    Example:
        Given the following dataframe:

        .. code-block:: none

            lev0     lev1                contents  known
             ''       ''                 20        2     // root
            kernel   kernel/subdirker    5         0
            telnet   telnet/subdirtel    10        4

        The output hierarchical dataframe will be like the following:

        .. code-block:: none

            id                parent   contents  known
                                       20        10.00
            kernel/subdirker  kernel   5         0.00
            telnet/subdirtel  telnet   10        40.00
                              total    20        10.00
            kernel            total    5         0.00
            telnet            total    10        40.00
            total                      35        17.14

        To create the hierarchical dataframe we need to iterate through the
        dataframe given in input relying on the number of levels.

        Based on the previous example we have to do two iterations:

        iteration 1
            The generated dataframe 'df_tree' will be:

            .. code-block:: none

                id                parent   contents  known
                                           20        10.0
                kernel/subdirker  kernel   5         0.0
                telnet/subdirtel  telnet   10        40.0

        iteration 2
            The generated dataframe 'df_tree' will be:

            .. code-block:: none

                id       parent   contents  known
                         total    20        10.0
                kernel   total    5         0.0
                telnet   total    10        40.0

        Note that since we have reached the last level, the parent given to
        the directory id is the directory root.

        The 'total' row is computed by adding the number of contents of the
        dataframe given in input and the average of the contents known on the
        total number of contents.
    """

    def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series:
        """This function computes the percentage of known contents and generates
        the new known column with the percentage values.

        It also ensures that if there are no contents inside a directory the
        percentage is zero.
        """
        known_values = []
        for idx, content_val in enumerate(contents):
            if content_val == 0:
                known_values.append(0)
            else:
                percentage = known[idx] / contents[idx] * 100
                known_values.append(percentage)

        return pd.Series(np.array(known_values))

    complete_df = pd.DataFrame(columns=["id", "parent", "contents", "known"])
    # revert the level order to start from the deepest
    levels = [level for level in reversed(levels)]
    contents_col = metrics_columns[0]
    known_col = metrics_columns[1]

    df_tree_list = []
    for i, level in enumerate(levels):
        df_tree = pd.DataFrame(columns=["id", "parent", "contents", "known"])

        dfg = dirs_dataframe.groupby(levels[i:]).sum()
        dfg = dfg.reset_index()

        df_tree["id"] = dfg[level].copy()
        if i < len(levels) - 1:
            # copy the parent directories (one level above)
            df_tree["parent"] = dfg[levels[i + 1]].copy()
        else:
            # last level reached
            df_tree["parent"] = root_name

        # copy the contents column
        df_tree["contents"] = dfg[contents_col]
        # compute the percentage relative to the contents
        df_tree["known"] = compute_known_percentage(dfg[contents_col], dfg[known_col])

        df_tree_list.append(df_tree)

    complete_df = complete_df.append(df_tree_list, ignore_index=True)

    # create the main parent
    total_contents = dirs_dataframe[contents_col].sum()
    total_known = dirs_dataframe[known_col].sum()
    total_avg = total_known / total_contents * 100

    total = pd.Series(
        dict(id=root_name, parent="", contents=total_contents, known=total_avg)
    )

    complete_df = complete_df.append(total, ignore_index=True)

    return complete_df


def compute_max_depth(dirs_path: List[Path]) -> int:
    """Compute the maximum depth level of the given directory paths.

    Example: for `var/log/kernel/` the depth level is 3
    """
    max_depth = 0
    for dir_path in dirs_path:
        dir_depth = len(
            dir_path.parts[1:] if dir_path.parts[0] == "/" else dir_path.parts
        )
        if dir_depth > max_depth:
            max_depth = dir_depth

    return max_depth


def generate_df_from_dirs(
-    dirs: Dict[Path, Tuple[int, int]], columns: List[str], max_depth: int,
+    dirs: Dict[Path, Tuple[int, int]],
+    columns: List[str],
+    max_depth: int,
) -> pd.DataFrame:
    """Generate a dataframe from the directories given in input.

    Example:
        given the following directories as input

        .. code-block:: python

            dirs = {
                '/var/log/': (23, 2),
                '/var/log/kernel': (5, 0),
                '/var/log/telnet': (10, 3)
            }

        The generated dataframe will be:

        .. code-block:: none

            lev0   lev1       lev2              contents  known
            'var'  'var/log'  ''                23        2
            'var'  'var/log'  'var/log/kernel'  5         0
            'var'  'var/log'  'var/log/telnet'  10        3
    """

    def get_parents(path: Path):
        parts = path.parts[1:] if path.parts[0] == "/" else path.parts

        for i in range(1, len(parts) + 1):
            yield "/".join(parts[0:i])

    def get_dirs_array():
        for dir_path, contents_info in dirs.items():
            empty_lvl = max_depth - len(dir_path.parts)

            yield list(get_parents(dir_path)) + [""] * empty_lvl + list(contents_info)

    df = pd.DataFrame(
        np.array([dir_array for dir_array in get_dirs_array()]), columns=columns
    )

    df["contents"] = pd.to_numeric(df["contents"])
    df["known"] = pd.to_numeric(df["known"])

    return df


def generate_sunburst(
    directories: Dict[Path, Tuple[int, int]], root: Path
) -> go.Sunburst:
    """Generate a sunburst chart from the directories given in input."""
    max_depth = compute_max_depth(list(directories.keys()))
    metrics_columns = ["contents", "known"]
    levels_columns = ["lev" + str(i) for i in range(max_depth)]

    df_columns = levels_columns + metrics_columns
    dirs_df = generate_df_from_dirs(directories, df_columns, max_depth)

    hierarchical_df = build_hierarchical_df(
        dirs_df, levels_columns, metrics_columns, str(root)
    )

    sunburst = go.Sunburst(
        labels=hierarchical_df["id"],
        parents=hierarchical_df["parent"],
        values=hierarchical_df["contents"],
        branchvalues="total",
        marker=dict(
            colors=hierarchical_df["known"],
            colorscale="matter",
            cmid=50,
            showscale=True,
        ),
        hovertemplate="""%{label}
            <br>Files: %{value}
            <br>Known: %{color:.2f}%""",
        name="",
    )

    return sunburst


def offline_plot(graph_object: go):
    """Plot a graph object to an html file"""
    fig = go.Figure()
    fig.add_trace(graph_object)
    offline.plot(fig, filename="chart.html")
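Reviewer note: the plot helpers compose end to end. A minimal sketch mirroring the docstring example above (values are `(contents, known)` tuples; `chart.html` is the output filename hard-coded in `offline_plot`, and the pandas/plotly versions this module already targets are assumed):

```python
from pathlib import Path

from swh.scanner.plot import generate_sunburst, offline_plot

directories = {
    Path("var/log"): (23, 2),
    Path("var/log/kernel"): (5, 0),
    Path("var/log/telnet"): (10, 3),
}

# max depth is 3 here, so levels lev0..lev2 are derived automatically
sunburst = generate_sunburst(directories, root=Path("var"))
offline_plot(sunburst)  # writes chart.html in the current directory
```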
""" nodes = self.source_tree.iter_tree(dedup=False) for nodes_chunk in grouper(nodes, QUERY_LIMIT): nodes_chunk = [n for n in nodes_chunk] swhids = [node.swhid() for node in nodes_chunk] swhids_res = await client.known(swhids) for node in nodes_chunk: swhid = node.swhid() self.data[swhid]["known"] = swhids_res[str(swhid)]["known"] yield nodes_chunk class FilePriority(Policy): """Check the Merkle tree querying all the file contents and set all the upstream - directories to unknown in the case a file content is unknown. - Finally check all the directories which status is still unknown and set all the - sub-directories of known directories to known. + directories to unknown in the case a file content is unknown. + Finally check all the directories which status is still unknown and set all the + sub-directories of known directories to known. """ @no_type_check async def run(self, client: Client): # get all the files all_contents = list( filter( lambda node: node.object_type == "content", self.source_tree.iter_tree() ) ) all_contents.reverse() # check deepest node first # query the backend to get all file contents status cnt_swhids = [node.swhid() for node in all_contents] cnt_status_res = await client.known(cnt_swhids) # set all the file contents status for cnt in all_contents: self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] # set all the upstream directories of unknown file contents to unknown if not self.data[cnt.swhid()]["known"]: parent = cnt.parents[0] while parent: self.data[parent.swhid()]["known"] = False parent = parent.parents[0] if parent.parents else None # get all unset directories and check their status # (update children directories accordingly) unset_dirs = list( filter( lambda node: node.object_type == "directory" and self.data[node.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) # check unset directories for dir_ in unset_dirs: if self.data[dir_.swhid()]["known"] is None: # update directory status dir_status = await client.known([dir_.swhid()]) dir_known = dir_status[str(dir_.swhid())]["known"] self.data[dir_.swhid()]["known"] = dir_known if dir_known: sub_dirs = list( filter( lambda n: n.object_type == "directory" and self.data[n.swhid()]["known"] is None, dir_.iter_tree(), ) ) for node in sub_dirs: self.data[node.swhid()]["known"] = True class DirectoryPriority(Policy): """Check the Merkle tree querying all the directories that have at least one file - content and set all the upstream directories to unknown in the case a directory - is unknown otherwise set all the downstream contents to known. - Finally check the status of empty directories and all the remaining file - contents. + content and set all the upstream directories to unknown in the case a directory + is unknown otherwise set all the downstream contents to known. + Finally check the status of empty directories and all the remaining file + contents. 
""" @no_type_check async def run(self, client: Client): # get all directory contents that have at least one file content unknown_dirs = list( filter( lambda dir_: dir_.object_type == "directory" and self.has_contents(dir_), self.source_tree.iter_tree(), ) ) unknown_dirs.reverse() # check deepest node first for dir_ in unknown_dirs: if self.data[dir_.swhid()]["known"] is None: dir_status = await client.known([dir_.swhid()]) dir_known = dir_status[str(dir_.swhid())]["known"] self.data[dir_.swhid()]["known"] = dir_known # set all the downstream file contents to known if dir_known: for cnt in self.get_contents(dir_): self.data[cnt.swhid()]["known"] = True # otherwise set all the upstream directories to unknown else: parent = dir_.parents[0] while parent: self.data[parent.swhid()]["known"] = False parent = parent.parents[0] if parent.parents else None # get remaining directories that have no file contents empty_dirs = list( filter( lambda n: n.object_type == "directory" and not self.has_contents(n) and self.data[n.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) empty_dirs_swhids = [n.swhid() for n in empty_dirs] empty_dir_status = await client.known(empty_dirs_swhids) # update status of directories that have no file contents for dir_ in empty_dirs: self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ "known" ] # check unknown file contents unknown_cnts = list( filter( lambda n: n.object_type == "content" and self.data[n.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] unknown_cnts_status = await client.known(unknown_cnts_swhids) for cnt in unknown_cnts: self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ "known" ] def has_contents(self, directory: Directory): """Check if the directory given in input has contents""" for entry in directory.entries: if entry["type"] == "file": return True return False def get_contents(self, dir_: Directory): """Get all the contents of a given directory""" for _, node in list(dir_.items()): if node.object_type == "content": yield node class QueryAll(Policy): - """Check the status of every node in the Merkle tree. 
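Reviewer note: every policy exposes the same `run(client)` coroutine, so driving one by hand is uniform across traversal strategies. A hedged sketch using `LazyBFS` (the project path and API URL are placeholders):

```python
import asyncio

import aiohttp

from swh.model.cli import model_of_dir
from swh.scanner.client import Client
from swh.scanner.data import MerkleNodeInfo, init_merkle_node_info
from swh.scanner.policy import LazyBFS


async def check(root: bytes, api_url: str) -> MerkleNodeInfo:
    source_tree = model_of_dir(root)
    nodes_data = MerkleNodeInfo()
    init_merkle_node_info(source_tree, nodes_data, {"known"})
    async with aiohttp.ClientSession() as session:
        # any Policy subclass would work here; LazyBFS is the "bfs" policy
        await LazyBFS(source_tree, nodes_data).run(Client(api_url, session))
    return nodes_data


# asyncio.run(check(b"/tmp/project", "https://archive.softwareheritage.org/api/1/"))
```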
- """ + """Check the status of every node in the Merkle tree.""" @no_type_check async def run(self, client: Client): all_nodes = [node for node in self.source_tree.iter_tree()] all_swhids = [node.swhid() for node in all_nodes] swhids_res = await client.known(all_swhids) for node in all_nodes: self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py index 8200a4f..7dcab7c 100644 --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -1,101 +1,100 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio from typing import Any, Dict, Iterable import aiohttp from swh.model.cli import model_of_dir from swh.model.from_disk import Directory from .client import Client from .data import MerkleNodeInfo, add_origin, init_merkle_node_info from .output import Output from .policy import ( QUERY_LIMIT, DirectoryPriority, FilePriority, GreedyBFS, LazyBFS, QueryAll, source_size, ) async def run( config: Dict[str, Any], policy, source_tree: Directory, nodes_data: MerkleNodeInfo, extra_info: set, ) -> None: - """Scan a given source code according to the policy given in input. - """ + """Scan a given source code according to the policy given in input.""" api_url = config["web-api"]["url"] if config["web-api"]["auth-token"]: headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} else: headers = {} async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: client = Client(api_url, session) for info in extra_info: if info == "known": await policy.run(client) elif info == "origin": await add_origin(source_tree, nodes_data, client) else: raise Exception(f"The information '{info}' cannot be retrieved") def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str): if policy == "auto": return ( QueryAll(source_tree, nodes_data) if source_size(source_tree) <= QUERY_LIMIT else LazyBFS(source_tree, nodes_data) ) elif policy == "bfs": return LazyBFS(source_tree, nodes_data) elif policy == "greedybfs": return GreedyBFS(source_tree, nodes_data) elif policy == "filepriority": return FilePriority(source_tree, nodes_data) elif policy == "dirpriority": return DirectoryPriority(source_tree, nodes_data) else: raise Exception(f"policy '{policy}' not found") def scan( config: Dict[str, Any], root_path: str, exclude_patterns: Iterable[str], out_fmt: str, interactive: bool, policy: str, extra_info: set, ): """Scan a source code project to discover files and directories already present in the archive""" converted_patterns = [pattern.encode() for pattern in exclude_patterns] source_tree = model_of_dir(root_path.encode(), converted_patterns) nodes_data = MerkleNodeInfo() extra_info.add("known") init_merkle_node_info(source_tree, nodes_data, extra_info) policy = get_policy_obj(source_tree, nodes_data, policy) loop = asyncio.get_event_loop() loop.run_until_complete(run(config, policy, source_tree, nodes_data, extra_info)) out = Output(root_path, nodes_data, source_tree) if interactive: out.show("interactive") else: out.show(out_fmt) diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py index c8b2355..524a943 100644 --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -1,155 +1,155 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See 
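Reviewer note: `scan()` is the high-level entry point the CLI calls; a minimal direct invocation, assuming the default unauthenticated Web API config and a hypothetical project path:

```python
from swh.scanner.scanner import scan

config = {
    "web-api": {
        "url": "https://archive.softwareheritage.org/api/1/",
        "auth-token": None,
    }
}

# Scan /tmp/project, skipping .git directories, and print the textual report.
# scan() adds "known" to extra_info itself before initializing node data.
scan(
    config,
    "/tmp/project",
    ["*.git"],
    "text",
    interactive=False,
    policy="auto",
    extra_info=set(),
)
```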
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
index c8b2355..524a943 100644
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -1,155 +1,155 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import asyncio
import os
from pathlib import Path
import shutil
import sys

import aiohttp
from aioresponses import aioresponses
import pytest

from swh.model.cli import model_of_dir
from swh.scanner.data import MerkleNodeInfo
from swh.scanner.policy import QUERY_LIMIT

from .data import present_swhids
from .flask_api import create_app


@pytest.fixture
def mock_aioresponse():
    with aioresponses() as m:
        yield m


@pytest.fixture
def event_loop():
    """Fixture that generates an asyncio event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    yield loop
    loop.close()


@pytest.fixture
async def aiosession():
    """Fixture that generates an aiohttp Client Session."""
    session = aiohttp.ClientSession()
    yield session
    session.detach()


@pytest.fixture(scope="function")
def test_sample_folder(datadir, tmp_path):
    """Location of the "data" folder"""
    archive_path = Path(os.path.join(datadir, "sample-folder.tgz"))
    assert archive_path.exists()
    shutil.unpack_archive(archive_path, extract_dir=tmp_path)
    test_sample_folder = Path(os.path.join(tmp_path, "sample-folder"))
    assert test_sample_folder.exists()
    return test_sample_folder


@pytest.fixture(scope="function")
def test_sample_folder_policy(datadir, tmp_path):
    """Location of the sample source code project to test the scanner policies"""
    archive_path = Path(os.path.join(datadir, "sample-folder-policy.tgz"))
    assert archive_path.exists()
    shutil.unpack_archive(archive_path, extract_dir=tmp_path)
    test_sample_folder = Path(os.path.join(tmp_path, "sample-folder-policy"))
    assert test_sample_folder.exists()
    return test_sample_folder


@pytest.fixture(scope="function")
def source_tree(test_sample_folder):
    """Generate a model.from_disk.Directory object from the test sample folder
    """
    return model_of_dir(str(test_sample_folder).encode())


@pytest.fixture(scope="function")
def big_source_tree(tmp_path):
    """Generate a model.from_disk.Directory from a "big" temporary directory
-       (more than 1000 nodes)
+    (more than 1000 nodes)
    """
    # workaround to avoid a RecursionError that could be generated while creating
    # a large number of directories
    sys.setrecursionlimit(1100)
    dir_ = tmp_path / "big-directory"
    sub_dirs = dir_
    for i in range(0, QUERY_LIMIT + 1):
        sub_dirs = sub_dirs / "dir"
    sub_dirs.mkdir(parents=True, exist_ok=True)
    file_ = sub_dirs / "file.org"
    file_.touch()

    dir_obj = model_of_dir(str(dir_).encode())
    return dir_obj


@pytest.fixture(scope="function")
def source_tree_policy(test_sample_folder_policy):
    """Generate a model.from_disk.Directory object from the test sample folder
    """
    return model_of_dir(str(test_sample_folder_policy).encode())


@pytest.fixture(scope="function")
def source_tree_dirs(source_tree):
    """Returns a list of all directories contained inside the test sample folder
    """
    root = source_tree.data["path"]
    return list(
        map(
            lambda n: Path(n.data["path"].decode()).relative_to(Path(root.decode())),
            filter(
                lambda n: n.object_type == "directory"
                and not n.data["path"] == source_tree.data["path"],
                source_tree.iter_tree(dedup=False),
            ),
        )
    )


@pytest.fixture(scope="function")
def nodes_data(source_tree):
    """mock known status of file/dirs in test_sample_folder"""
    nodes_data = MerkleNodeInfo()
    for node in source_tree.iter_tree():
        nodes_data[node.swhid()] = {"known": True}
    return nodes_data


@pytest.fixture
def test_swhids_sample(tmp_path):
    """Create and return the opened "swhids_sample" file, filled with the
    SWHIDs listed in data.py's present_swhids
    """
    test_swhids_sample = Path(os.path.join(tmp_path, "swhids_sample.txt"))

    with open(test_swhids_sample, "w") as f:
        f.write("\n".join(swhid for swhid in present_swhids))

    assert test_swhids_sample.exists()
    return open(test_swhids_sample, "r")


@pytest.fixture(scope="session")
def tmp_requests(tmpdir_factory):
    requests_file = tmpdir_factory.mktemp("data").join("requests.json")
    return requests_file


@pytest.fixture(scope="session")
def app(tmp_requests):
    """Flask backend API (used by live_server)."""
    app = create_app(tmp_requests)
    return app
diff --git a/swh/scanner/tests/test_client.py b/swh/scanner/tests/test_client.py
index 26c1625..64f6d87 100644
--- a/swh/scanner/tests/test_client.py
+++ b/swh/scanner/tests/test_client.py
@@ -1,58 +1,60 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json

import pytest

from swh.model.swhids import CoreSWHID
from swh.scanner.client import Client
from swh.scanner.exceptions import APIError

from .data import correct_known_api_response, correct_origin_api_response

AIO_URL = "http://example.org/api/"
KNOWN_URL = f"{AIO_URL}known/"
ORIGIN_URL = f"{AIO_URL}graph/randomwalk/"


def test_client_known_correct_api_request(mock_aioresponse, event_loop, aiosession):
    mock_aioresponse.post(
        KNOWN_URL,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_known_api_response),
    )

    client = Client(AIO_URL, aiosession)
    actual_result = event_loop.run_until_complete(client.known([]))

    assert correct_known_api_response == actual_result


def test_client_known_raise_apierror(mock_aioresponse, event_loop, aiosession):
    mock_aioresponse.post(KNOWN_URL, content_type="application/json", status=413)

    client = Client(AIO_URL, aiosession)
    with pytest.raises(APIError):
        event_loop.run_until_complete(client.known([]))


def test_client_get_origin_correct_api_request(
    mock_aioresponse, event_loop, aiosession
):
    origin_url = (
        f"{ORIGIN_URL}swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140/ori"
        f"/?direction=backward&limit=-1&resolve_origins=true"
    )
    mock_aioresponse.get(
-        origin_url, status=200, body=correct_origin_api_response,
+        origin_url,
+        status=200,
+        body=correct_origin_api_response,
    )

    client = Client(AIO_URL, aiosession)
    swhid = CoreSWHID.from_string("swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140")
    actual_result = event_loop.run_until_complete(client.get_origin(swhid))

    assert correct_origin_api_response == actual_result