diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 836dae7..94d4006 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,105 +1,109 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import annotations
 import sys
+import json
 from pathlib import PosixPath
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 from enum import Enum
 
 from swh.model.identifiers import (
     DIRECTORY, CONTENT
 )
 
 
 class Color(Enum):
     blue = '\033[94m'
     green = '\033[92m'
     red = '\033[91m'
     end = '\033[0m'
 
 
-def colorize(text: str, color: Color):
+def colorize(text: str, color: Color) -> str:
+    """Prefix text with the color code and append the reset code."""
     return color.value + text + Color.end.value
 
 
 class Tree:
     """Representation of a file system structure
     """
-    def __init__(self, path: PosixPath, father: Tree = None):
+    def __init__(self, path: PosixPath, father: Optional[Tree] = None):
         self.father = father
         self.path = path
         self.otype = DIRECTORY if path.is_dir() else CONTENT
         self.pid = ''
         self.children: Dict[PosixPath, Tree] = {}
 
-    def addNode(self, path: PosixPath, pid: str = None) -> None:
+    def addNode(self, path: PosixPath, pid: Optional[str] = None) -> None:
         """Recursively add a new node path
         """
         relative_path = path.relative_to(self.path)
 
         if relative_path == PosixPath('.'):
             if pid is not None:
                 self.pid = pid
             return
 
         new_path = self.path.joinpath(relative_path.parts[0])
         if new_path not in self.children:
             self.children[new_path] = Tree(new_path, self)
 
         self.children[new_path].addNode(path, pid)
 
     def show(self, format) -> None:
         """Print all the tree"""
         if format == 'json':
-            print(self.getJsonTree())
+            print(json.dumps(self.getTree(), indent=4, sort_keys=True))
         elif format == 'text':
             isatty = sys.stdout.isatty()
 
             print(colorize(str(self.path), Color.blue) if isatty
                   else str(self.path))
             self.printChildren(isatty)
 
     def printChildren(self, isatty: bool, inc: int = 0) -> None:
-        for path, node in self.children.items():
+        """Print every descendant, one level of indentation per depth."""
+        for node in self.children.values():
             self.printNode(node, isatty, inc)
             if node.children:
                 node.printChildren(isatty, inc+1)
 
     def printNode(self, node: Any, isatty: bool, inc: int) -> None:
+        """Print a node path relative to this tree, colored when isatty."""
         rel_path = str(node.path.relative_to(self.path))
         begin = '│ ' * inc
         end = '/' if node.otype == DIRECTORY else ''
 
         if isatty:
             if not node.pid:
                 rel_path = colorize(rel_path, Color.red)
             elif node.otype == DIRECTORY:
                 rel_path = colorize(rel_path, Color.blue)
             elif node.otype == CONTENT:
                 rel_path = colorize(rel_path, Color.green)
 
         print(f'{begin}{rel_path}{end}')
 
-    def getJsonTree(self):
+    def getTree(self) -> Dict[str, Any]:
         """Walk through the tree to discover content or directory that have
         a persistent identifier. If a persistent identifier is found it saves
         the path with the relative PID.
 
         Returns:
             child_tree: the tree with the content/directory found
 
         """
         child_tree = {}
-        for path, child_node in self.children.items():
+        for child_node in self.children.values():
             rel_path = str(child_node.path.relative_to(self.path))
             if child_node.pid:
                 child_tree[rel_path] = child_node.pid
             else:
-                next_tree = child_node.getJsonTree()
+                next_tree = child_node.getTree()
                 if next_tree:
                     child_tree[rel_path] = next_tree
 
         return child_tree
diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py
index 5e4eee2..ebb3817 100644
--- a/swh/scanner/tests/test_model.py
+++ b/swh/scanner/tests/test_model.py
@@ -1,67 +1,67 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.scanner.model import Tree
 
 
 @pytest.fixture(scope='function')
 def example_tree(temp_folder):
-    """Fixture that generate a Tree with the root present in the
+    """Fixture that generates a Tree with the root present in the
     session fixture "temp_folder".
     """
     example_tree = Tree(temp_folder['root'])
     assert example_tree.path == temp_folder['root']
     return example_tree
 
 
 def test_tree_add_node(example_tree, temp_folder):
     avail_paths = temp_folder['paths'].keys()
 
     for path, pid in temp_folder['paths'].items():
         example_tree.addNode(path, pid)
 
     for path, node in example_tree.children.items():
         assert path in avail_paths
         if node.children:
-            for subpath, subnode in node.children.items():
+            for subpath in node.children:
                 assert subpath in avail_paths
 
 
 def test_get_json_tree_all_not_present(example_tree, temp_folder):
 
-    for path, pid in temp_folder['paths'].items():
+    for path in temp_folder['paths']:
         example_tree.addNode(path)
 
-    json_tree = example_tree.getJsonTree()
+    json_tree = example_tree.getTree()
 
     assert len(json_tree) == 0
 
 
 def test_get_json_tree_all_present(example_tree, temp_folder):
 
     for path, pid in temp_folder['paths'].items():
         example_tree.addNode(path, pid)
 
-    tree_dict = example_tree.getJsonTree()
+    tree_dict = example_tree.getTree()
 
     assert len(tree_dict) == 3
-    # since subdir have a pid, it can't have a children path
-    assert tree_dict['subdir0'] is not dict
+    # since subdir0 has a pid, it is stored as a pid and not as a sub-tree
+    assert not isinstance(tree_dict['subdir0'], dict)
 
 
 def test_get_json_tree_only_one_present(example_tree, temp_folder):
     filesample_path = temp_folder['filesample']
 
     for path, pid in temp_folder['paths'].items():
         if path == filesample_path:
             example_tree.addNode(path, pid)
         else:
             example_tree.addNode(path)
 
-    tree_dict = example_tree.getJsonTree()
+    tree_dict = example_tree.getTree()
     assert len(tree_dict) == 1
     assert tree_dict['subdir0']['filesample.txt']
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index 6165aec..e6200c9 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,79 +1,79 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 import json
 from pathlib import PosixPath
 
 from .data import correct_api_response
 
 from swh.scanner.scanner import pids_discovery, get_subpaths, run
 from swh.scanner.model import Tree
 from swh.scanner.exceptions import APIError
 
 aio_url = 'http://example.org/api/known/'
 
 
 def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
     mock_aioresponse.post(aio_url, status=200, content_type='application/json',
                           body=json.dumps(correct_api_response))
 
     actual_result = event_loop.run_until_complete(
         pids_discovery([], aiosession, 'http://example.org/api/'))
 
     assert correct_api_response == actual_result
 
 
 def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
     mock_aioresponse.post(aio_url, content_type='application/json',
                           status=413)
 
     with pytest.raises(APIError):
         event_loop.run_until_complete(
             pids_discovery([], aiosession, 'http://example.org/api/'))
 
 
 def test_scanner_raise_apierror_input_size_limit(
         event_loop, aiosession, live_server):
 
     api_url = live_server.url() + '/'
     request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
-               for i in range(901)]  # /known/ is limited at 900
+               for _ in range(901)]  # /known/ is limited at 900
 
     with pytest.raises(APIError):
         event_loop.run_until_complete(
             pids_discovery(request, aiosession, api_url))
 
 
 def test_scanner_get_subpaths(temp_folder, tmp_path):
     paths = temp_folder['paths'].keys()
     pids = temp_folder['paths'].values()
 
     for subpath, pid in get_subpaths(tmp_path):
         assert subpath in paths
         assert pid in pids
 
 
 @pytest.mark.options(debug=False)
 def test_app(app):
     assert not app.debug
 
 
 def test_scanner_result(live_server, event_loop, test_folder):
     api_url = live_server.url() + '/'
 
     result_path = test_folder.joinpath(PosixPath('sample-folder-result.json'))
     with open(result_path, 'r') as json_file:
-        expected_result = json.loads(json_file.read())
+        expected_result = json.load(json_file)
 
     sample_folder = test_folder.joinpath(PosixPath('sample-folder'))
 
     source_tree = Tree(sample_folder)
     event_loop.run_until_complete(
         run(sample_folder, api_url, source_tree))
 
-    actual_result = source_tree.getJsonTree()
+    actual_result = source_tree.getTree()
 
     assert actual_result == expected_result