diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py index 27db913..71d4c24 100644 --- a/swh/scanner/dashboard/dashboard.py +++ b/swh/scanner/dashboard/dashboard.py @@ -1,101 +1,101 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from pathlib import PosixPath +from pathlib import Path from ..model import Tree import plotly.graph_objects as go import dash import dash_core_components as dcc import dash_html_components as html import dash_bootstrap_components as dbc from dash.dependencies import Input, Output -def generate_table_body(dir_path: PosixPath, source: Tree): +def generate_table_body(dir_path: Path, source: Tree): """ Generate the data_table from the path taken from the chart. For each file builds the html table rows showing the known status, a local link to the file and the relative SoftWare Heritage persistent IDentifier (SWHID). """ data = [] for file_info in source.getFilesFromDir(dir_path): for file_path, attr in file_info.items(): - file_path = PosixPath(file_path) + file_path = Path(file_path) file_name = file_path.parts[len(file_path.parts) - 1] data.append( html.Tr( [ html.Td("✔" if attr["known"] else ""), html.Td( html.A(file_name, href="file://" + str(file_path.resolve())) ), html.Td(attr["swhid"]), ] ) ) return [html.Tbody(data)] def run_app(graph_obj: go, source: Tree): app = dash.Dash(__name__) fig = go.Figure().add_trace(graph_obj) fig.update_layout(height=800,) table_header = [ html.Thead(html.Tr([html.Th("KNOWN"), html.Th("FILE NAME"), html.Th("SWHID")])) ] app.layout = html.Div( [ html.Div( [ html.Div( [dcc.Graph(id="sunburst_chart", figure=fig),], className="col", ), html.Div( [ html.H3(id="directory_title"), dbc.Table( id="files_table", hover=True, responsive=True, striped=True, ), ], className="col", ), ], className="row", ), ] ) @app.callback( [Output("files_table", "children"), Output("directory_title", "children")], [Input("sunburst_chart", "clickData")], ) def update_files_table(click_data): """ Callback that takes the input (directory path) from the chart and update the `files_table` children with the relative files. """ if click_data is not None: raw_path = click_data["points"][0]["label"] full_path = ( source.path.joinpath(raw_path) if raw_path != str(source.path) - else PosixPath(raw_path) + else Path(raw_path) ) return table_header + generate_table_body(full_path, source), str(full_path) else: return "", "" app.run_server(debug=True, use_reloader=True) diff --git a/swh/scanner/model.py b/swh/scanner/model.py index 108e9e2..997e65b 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,265 +1,265 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import sys import json -from pathlib import PosixPath +from pathlib import Path from typing import Any, Dict, Tuple, Iterable, List from enum import Enum import ndjson from .plot import generate_sunburst, offline_plot from .exceptions import InvalidObjectType, InvalidDirectoryPath from swh.model.identifiers import DIRECTORY, CONTENT class Color(Enum): blue = "\033[94m" green = "\033[92m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value class Tree: """Representation of a file system structure """ - def __init__(self, path: PosixPath, father: Tree = None): + def __init__(self, path: Path, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False - self.children: Dict[PosixPath, Tree] = {} + self.children: Dict[Path, Tree] = {} - def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: + def addNode(self, path: Path, swhid: str, known: bool) -> None: """Recursively add a new path. """ relative_path = path.relative_to(self.path) - if relative_path == PosixPath("."): + if relative_path == Path("."): self.swhid = swhid self.known = known return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].addNode(path, swhid, known) def show(self, format) -> None: """Show tree in different formats""" if format == "json": print(json.dumps(self.toDict(), indent=4, sort_keys=True)) if format == "ndjson": print(ndjson.dumps(dict_path for dict_path in self.__iterNodesAttr())) elif format == "text": isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.printChildren(isatty) elif format == "sunburst": root = self.path directories = self.getDirectoriesInfo(root) sunburst = generate_sunburst(directories, root) offline_plot(sunburst) def printChildren(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.printNode(node, isatty, inc) if node.children: node.printChildren(isatty, inc + 1) def printNode(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: if not node.known: rel_path = colorize(rel_path, Color.red) elif node.otype == DIRECTORY: rel_path = colorize(rel_path, Color.blue) elif node.otype == CONTENT: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") @property def attributes(self): """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the SWHID as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def toDict(self, dict_nodes={}) -> Dict[str, Dict[str, Dict]]: """ Recursively groups the current child nodes inside a dictionary. For example, if you have the following structure: .. code-block:: none root { subdir: { file.txt } } The generated dictionary will be: .. code-block:: none { "root": { "swhid": "...", "known": True/False } "root/subdir": { "swhid": "...", "known": True/False } "root/subdir/file.txt": { "swhid": "...", "known": True/False } } """ for node_dict in self.__iterNodesAttr(): dict_nodes.update(node_dict) return dict_nodes def iterate(self) -> Iterable[Tree]: """ Recursively iterate through the children of the current node """ for _, child_node in self.children.items(): yield child_node if child_node.otype == DIRECTORY: yield from child_node.iterate() def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]: """ Recursively iterate through the children of the current node returning an iterable of the children nodes attributes Yields: a dictionary containing a path with its known/unknown status and the SWHID """ for child_node in self.iterate(): yield child_node.attributes if child_node.otype == DIRECTORY: yield from child_node.__iterNodesAttr() - def getFilesFromDir(self, dir_path: PosixPath) -> List: + def getFilesFromDir(self, dir_path: Path) -> List: """ Retrieve files information about a specific directory path Returns: A list containing the files attributes present inside the directory given in input """ def getFiles(node): files = [] for _, node in node.children.items(): if node.otype == CONTENT: files.append(node.attributes) return files if dir_path == self.path: return getFiles(self) else: for node in self.iterate(): if node.path == dir_path: return getFiles(node) raise InvalidDirectoryPath( "The directory provided doesn't match any stored directory" ) def __getSubDirsInfo(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. """ for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node.__getSubDirsInfo(root, directories) - def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]: + def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self.__getSubDirsInfo(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a SWHID returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). """ contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't calculate contents of the " "object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py index a1ccf6b..961d677 100644 --- a/swh/scanner/plot.py +++ b/swh/scanner/plot.py @@ -1,281 +1,278 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ The purpose of this module is to display and to interact with the result of the scanner contained in the model. The `sunburst` function generates a navigable sunburst chart from the directories information retrieved from the model. The chart displays for each directory the total number of files and the percentage of file known. The size of the directory is defined by the total number of contents whereas the color gradient is generated relying on the percentage of contents known. """ from typing import List, Dict, Tuple -from pathlib import PosixPath +from pathlib import Path from plotly.offline import offline import plotly.graph_objects as go import pandas as pd # type: ignore import numpy as np # type: ignore def build_hierarchical_df( dirs_dataframe: pd.DataFrame, levels: List[str], metrics_columns: List[str], root_name: str, ) -> pd.DataFrame: """ Build a hierarchy of levels for Sunburst or Treemap charts. For each directory the new dataframe will have the following information: id: the directory name parent: the parent directory of id contents: the total number of contents of the directory id and the relative subdirectories known: the percentage of contents known relative to computed 'contents' Example: Given the following dataframe: .. code-block:: none lev0 lev1 contents known '' '' 20 2 //root kernel kernel/subdirker 5 0 telnet telnet/subdirtel 10 4 The output hierarchical dataframe will be like the following: .. code-block:: none id parent contents known 20 10.00 kernel/subdirker kernel 5 0.00 telnet/subdirtel telnet 10 40.00 total 20 10.00 kernel total 5 0.00 telnet total 10 40.00 total 35 17.14 To create the hierarchical dataframe we need to iterate through the dataframe given in input relying on the number of levels. Based on the previous example we have to do two iterations: iteration 1 The generated dataframe 'df_tree' will be: .. code-block:: none id parent contents known 20 10.0 kernel/subdirker kernel 5 0.0 telnet/subdirtel telnet 10 40.0 iteration 2 The generated dataframe 'df_tree' will be: .. code-block:: none id parent contents known total 20 10.0 kernel total 5 0.0 telnet total 10 40.0 Note that since we have reached the last level, the parent given to the directory id is the directory root. The 'total' row il computed by adding the number of contents of the dataframe given in input and the average of the contents known on the total number of contents. """ def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series: """This function compute the percentage of known contents and generate the new known column with the percentage values. It also assures that if there is no contents inside a directory the percentage is zero """ known_values = [] for idx, content_val in enumerate(contents): if content_val == 0: known_values.append(0) else: percentage = known[idx] / contents[idx] * 100 known_values.append(percentage) return pd.Series(np.array(known_values)) complete_df = pd.DataFrame(columns=["id", "parent", "contents", "known"]) # revert the level order to start from the deepest levels = [level for level in reversed(levels)] contents_col = metrics_columns[0] known_col = metrics_columns[1] df_tree_list = [] for i, level in enumerate(levels): df_tree = pd.DataFrame(columns=["id", "parent", "contents", "known"]) dfg = dirs_dataframe.groupby(levels[i:]).sum() dfg = dfg.reset_index() df_tree["id"] = dfg[level].copy() if i < len(levels) - 1: # copy the parent directories (one level above) df_tree["parent"] = dfg[levels[i + 1]].copy() else: # last level reached df_tree["parent"] = root_name # copy the contents column df_tree["contents"] = dfg[contents_col] # compute the percentage relative to the contents df_tree["known"] = compute_known_percentage(dfg[contents_col], dfg[known_col]) df_tree_list.append(df_tree) complete_df = complete_df.append(df_tree_list, ignore_index=True) # create the main parent total_contents = dirs_dataframe[contents_col].sum() total_known = dirs_dataframe[known_col].sum() total_avg = total_known / total_contents * 100 total = pd.Series( dict(id=root_name, parent="", contents=total_contents, known=total_avg) ) complete_df = complete_df.append(total, ignore_index=True) return complete_df -def compute_max_depth(dirs_path: List[PosixPath], root: PosixPath) -> int: +def compute_max_depth(dirs_path: List[Path], root: Path) -> int: """Compute the maximum depth level of the given directory paths. Example: for `var/log/kernel/` the depth level is 3 """ max_depth = 0 for dir_path in dirs_path: if dir_path == root: continue dir_depth = len(dir_path.parts) if dir_depth > max_depth: max_depth = dir_depth return max_depth def generate_df_from_dirs( - dirs: Dict[PosixPath, Tuple[int, int]], - columns: List[str], - root: PosixPath, - max_depth: int, + dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int, ) -> pd.DataFrame: """Generate a dataframe from the directories given in input. Example: given the following directories as input .. code-block:: python dirs = { '/var/log/': (23, 2), '/var/log/kernel': (5, 0), '/var/log/telnet': (10, 3) } The generated dataframe will be: .. code-block:: none lev0 lev1 lev2 contents known 'var' 'var/log' '' 23 2 'var' 'var/log' 'var/log/kernel' 5 0 'var' 'var/log' 'var/log/telnet' 10 3 """ - def get_parents(path: PosixPath): + def get_parents(path: Path): parts = path.parts[1:] if path.parts[0] == "/" else path.parts for i in range(1, len(parts) + 1): yield "/".join(parts[0:i]) def get_dirs_array(): for dir_path, contents_info in dirs.items(): empty_lvl = max_depth - len(dir_path.parts) if dir_path == root: # ignore the root but store contents information yield [""] * (max_depth) + list(contents_info) else: yield list(get_parents(dir_path)) + [""] * empty_lvl + list( contents_info ) df = pd.DataFrame( np.array([dir_array for dir_array in get_dirs_array()]), columns=columns ) df["contents"] = pd.to_numeric(df["contents"]) df["known"] = pd.to_numeric(df["known"]) return df def generate_sunburst( - directories: Dict[PosixPath, Tuple[int, int]], root: PosixPath + directories: Dict[Path, Tuple[int, int]], root: Path ) -> go.Sunburst: """Generate a sunburst chart from the directories given in input. """ max_depth = compute_max_depth(list(directories.keys()), root) metrics_columns = ["contents", "known"] levels_columns = ["lev" + str(i) for i in range(max_depth)] df_columns = levels_columns + metrics_columns dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth) hierarchical_df = build_hierarchical_df( dirs_df, levels_columns, metrics_columns, str(root) ) sunburst = go.Sunburst( labels=hierarchical_df["id"], parents=hierarchical_df["parent"], values=hierarchical_df["contents"], branchvalues="total", marker=dict( colors=hierarchical_df["known"], colorscale="matter", cmid=50, showscale=True, ), hovertemplate="""%{label}
Files: %{value}
Known: %{color:.2f}%""", name="", ) return sunburst def offline_plot(graph_object: go): """Plot a graph object to an html file """ fig = go.Figure() fig.add_trace(graph_object) offline.plot(fig, filename="chart.html") diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py index 8e0a805..ea89524 100644 --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -1,251 +1,250 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import fnmatch import glob import itertools import os -from pathlib import PosixPath +from pathlib import Path import re from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any import aiohttp from swh.model.from_disk import Directory, Content, accept_all_directories from swh.model.identifiers import ( swhid, parse_swhid, DIRECTORY, CONTENT, ) from .exceptions import InvalidDirectoryPath, error_response from .model import Tree from .plot import generate_sunburst from .dashboard.dashboard import run_app async def swhids_discovery( swhids: List[str], session: aiohttp.ClientSession, api_url: str, ) -> Dict[str, Dict[str, bool]]: """API Request to get information about the SoftWare Heritage persistent IDentifiers (SWHIDs) given in input. Args: swhids: a list of SWHIDS api_url: url for the API request Returns: A dictionary with: key: SWHID searched value: value['known'] = True if the SWHID is found value['known'] = False if the SWHID is not found """ endpoint = api_url + "known/" chunk_size = 1000 requests = [] def get_chunk(swhids): for i in range(0, len(swhids), chunk_size): yield swhids[i : i + chunk_size] async def make_request(swhids): async with session.post(endpoint, json=swhids) as resp: if resp.status != 200: error_response(resp.reason, resp.status, endpoint) return await resp.json() if len(swhids) > chunk_size: for swhids_chunk in get_chunk(swhids): requests.append(asyncio.create_task(make_request(swhids_chunk))) res = await asyncio.gather(*requests) # concatenate list of dictionaries return dict(itertools.chain.from_iterable(e.items() for e in res)) else: return await make_request(swhids) def directory_filter( path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]] ) -> bool: """It checks if the path_name is matching with the patterns given in input. It is also used as a `dir_filter` function when generating the directory object from `swh.model.from_disk` Returns: False if the directory has to be ignored, True otherwise """ - path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name) + path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) for sre_pattern in exclude_patterns: if sre_pattern.match(str(path)): return False return True def get_subpaths( - path: PosixPath, exclude_patterns: Iterable[Pattern[str]] -) -> Iterator[Tuple[PosixPath, str]]: + path: Path, exclude_patterns: Iterable[Pattern[str]] +) -> Iterator[Tuple[Path, str]]: """Find the SoftWare Heritage persistent IDentifier (SWHID) of the directories and files under a given path. Args: path: the root path Yields: pairs of: path, the relative SWHID """ def swhid_of(path): if path.is_dir(): if exclude_patterns: def dir_filter(dirpath, *args): return directory_filter(dirpath, exclude_patterns) else: dir_filter = accept_all_directories obj = Directory.from_disk( path=bytes(path), dir_filter=dir_filter ).get_data() return swhid(DIRECTORY, obj) else: obj = Content.from_file(path=bytes(path)).get_data() return swhid(CONTENT, obj) dirpath, dnames, fnames = next(os.walk(path)) for node in itertools.chain(dnames, fnames): - sub_path = PosixPath(dirpath).joinpath(node) + sub_path = Path(dirpath).joinpath(node) yield (sub_path, swhid_of(sub_path)) async def parse_path( - path: PosixPath, + path: Path, session: aiohttp.ClientSession, api_url: str, exclude_patterns: Iterable[Pattern[str]], ) -> Iterator[Tuple[str, str, bool]]: """Check if the sub paths of the given path are present in the archive or not. Args: path: the source path api_url: url for the API request Returns: a map containing tuples with: a subpath of the given path, the SWHID of the subpath and the result of the api call """ parsed_paths = dict(get_subpaths(path, exclude_patterns)) parsed_swhids = await swhids_discovery( list(parsed_paths.values()), session, api_url ) def unpack(tup): subpath, swhid = tup return (subpath, swhid, parsed_swhids[swhid]["known"]) return map(unpack, parsed_paths.items()) async def run( config: Dict[str, Any], root: str, source_tree: Tree, exclude_patterns: Iterable[Pattern[str]], ) -> None: """Start scanning from the given root. It fills the source tree with the path discovered. Args: root: the root path to scan api_url: url for the API request """ api_url = config["web-api"]["url"] async def _scan(root, session, api_url, source_tree, exclude_patterns): for path, obj_swhid, known in await parse_path( root, session, api_url, exclude_patterns ): obj_type = parse_swhid(obj_swhid).object_type if obj_type == CONTENT: source_tree.addNode(path, obj_swhid, known) elif obj_type == DIRECTORY and directory_filter(path, exclude_patterns): source_tree.addNode(path, obj_swhid, known) if not known: await _scan(path, session, api_url, source_tree, exclude_patterns) if config["web-api"]["auth-token"]: headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} else: headers = {} async with aiohttp.ClientSession(headers=headers) as session: await _scan(root, session, api_url, source_tree, exclude_patterns) def extract_regex_objs( - root_path: PosixPath, patterns: Iterable[str] + root_path: Path, patterns: Iterable[str] ) -> Iterator[Pattern[str]]: """Generates a regex object for each pattern given in input and checks if the path is a subdirectory or relative to the root path. Yields: an SRE_Pattern object """ for pattern in patterns: for path in glob.glob(pattern): - dirpath = PosixPath(path) + dirpath = Path(path) if root_path not in dirpath.parents: error_msg = ( f'The path "{dirpath}" is not a subdirectory or relative ' f'to the root directory path: "{root_path}"' ) raise InvalidDirectoryPath(error_msg) regex = fnmatch.translate((pattern)) yield re.compile(regex) def scan( config: Dict[str, Any], root_path: str, exclude_patterns: Iterable[str], out_fmt: str, interactive: bool, ): """Scan a source code project to discover files and directories already present in the archive""" sre_patterns = set() if exclude_patterns: sre_patterns = { - reg_obj - for reg_obj in extract_regex_objs(PosixPath(root_path), exclude_patterns) + reg_obj for reg_obj in extract_regex_objs(Path(root_path), exclude_patterns) } - source_tree = Tree(PosixPath(root_path)) + source_tree = Tree(Path(root_path)) loop = asyncio.get_event_loop() loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) if interactive: - root = PosixPath(root_path) + root = Path(root_path) directories = source_tree.getDirectoriesInfo(root) figure = generate_sunburst(directories, root) run_app(figure, source_tree) else: source_tree.show(out_fmt) diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py index 58b4c5a..b2f8e1c 100644 --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -1,140 +1,140 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import asyncio import aiohttp import os import shutil -from pathlib import PosixPath +from pathlib import Path from aioresponses import aioresponses # type: ignore from swh.model.cli import swhid_of_file, swhid_of_dir from swh.scanner.model import Tree from .flask_api import create_app @pytest.fixture def mock_aioresponse(): with aioresponses() as m: yield m @pytest.fixture def event_loop(): """Fixture that generate an asyncio event loop.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) yield loop loop.close() @pytest.fixture async def aiosession(): """Fixture that generate an aiohttp Client Session.""" session = aiohttp.ClientSession() yield session session.detach() @pytest.fixture(scope="session") def temp_folder(tmp_path_factory): """Fixture that generates a temporary folder with the following structure: .. code-block:: python root = { subdir: { subsubdir filesample.txt filesample2.txt } subdir2 subfile.txt } """ root = tmp_path_factory.getbasetemp() subdir = tmp_path_factory.mktemp("subdir") subsubdir = subdir.joinpath("subsubdir") subsubdir.mkdir() subdir2 = tmp_path_factory.mktemp("subdir2") subfile = root / "subfile.txt" subfile.touch() filesample = subdir / "filesample.txt" filesample.touch() filesample2 = subdir / "filesample2.txt" filesample2.touch() avail_path = { subdir: swhid_of_dir(bytes(subdir)), subsubdir: swhid_of_dir(bytes(subsubdir)), subdir2: swhid_of_dir(bytes(subdir2)), subfile: swhid_of_file(bytes(subfile)), filesample: swhid_of_file(bytes(filesample)), filesample2: swhid_of_file(bytes(filesample2)), } return { "root": root, "paths": avail_path, "filesample": filesample, "filesample2": filesample2, "subsubdir": subsubdir, "subdir": subdir, } @pytest.fixture(scope="function") def example_tree(temp_folder): """Fixture that generate a Tree with the root present in the session fixture "temp_folder". """ example_tree = Tree(temp_folder["root"]) assert example_tree.path == temp_folder["root"] return example_tree @pytest.fixture(scope="function") def example_dirs(example_tree, temp_folder): """ Fixture that fill the fixture example_tree with the values contained in the fixture temp_folder and returns the directories information of the filled example_tree. """ root = temp_folder["root"] filesample_path = temp_folder["filesample"] filesample2_path = temp_folder["filesample2"] subsubdir_path = temp_folder["subsubdir"] known_paths = [filesample_path, filesample2_path, subsubdir_path] for path, swhid in temp_folder["paths"].items(): if path in known_paths: example_tree.addNode(path, swhid, True) else: example_tree.addNode(path, swhid, False) return example_tree.getDirectoriesInfo(root) @pytest.fixture def test_sample_folder(datadir, tmp_path): """Location of the "data" folder """ - archive_path = PosixPath(os.path.join(datadir, "sample-folder.tgz")) + archive_path = Path(os.path.join(datadir, "sample-folder.tgz")) assert archive_path.exists() shutil.unpack_archive(archive_path, extract_dir=tmp_path) - test_sample_folder = PosixPath(os.path.join(tmp_path, "sample-folder")) + test_sample_folder = Path(os.path.join(tmp_path, "sample-folder")) assert test_sample_folder.exists() return test_sample_folder @pytest.fixture(scope="session") def app(): """Flask backend API (used by live_server).""" app = create_app() return app