Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 2e0aca2..5287376 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,179 +1,245 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import sys
import json
from pathlib import PosixPath
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, Iterable
from enum import Enum
from .plot import sunburst
from .exceptions import InvalidObjectType
from swh.model.identifiers import DIRECTORY, CONTENT
class Color(Enum):
    """ANSI escape sequences used to color the text output."""

    # sequences placed in front of the text by ``colorize``
    blue = "\033[94m"
    green = "\033[92m"
    red = "\033[91m"
    # sequence ``colorize`` appends after the text
    end = "\033[0m"
def colorize(text: str, color: Color):
    """Return ``text`` preceded by ``color`` and followed by ``Color.end``."""
    return "".join((color.value, text, Color.end.value))
class Tree:
"""Representation of a file system structure
"""
def __init__(self, path: PosixPath, father: Tree = None):
    """Create the node representing ``path``.

    Args:
        path: location represented by this node
        father: parent node (None when no parent is given)
    """
    self.path = path
    self.father = father
    # the object type is decided once, by asking the file system
    if path.is_dir():
        self.otype = DIRECTORY
    else:
        self.otype = CONTENT
    # a new node has no identifier, is not known and has no children;
    # addNode() fills these in
    self.swhid = ""
    self.known = False
    self.children: Dict[PosixPath, Tree] = {}
def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
    """Recursively add a new path, creating the missing intermediate nodes.

    Args:
        path: path to add; it has to be this node's path or lie below it
        swhid: identifier stored on the node matching ``path``
        known: known/unknown status stored on the node matching ``path``
    """
    remainder = path.relative_to(self.path)
    if remainder == PosixPath("."):
        # ``path`` designates this very node: record its data and stop
        self.swhid, self.known = swhid, known
        return

    # descend into (or create) the direct child leading towards ``path``
    child_path = self.path.joinpath(remainder.parts[0])
    child = self.children.get(child_path)
    if child is None:
        child = self.children[child_path] = Tree(child_path, self)
    child.addNode(path, swhid, known)
def show(self, format) -> None:
    """Show tree in different formats

    Args:
        format: "json", "text" or "sunburst"; any other value is ignored
    """
    if format == "json":
        serialized = json.dumps(self.getTree(), indent=4, sort_keys=True)
        print(serialized)
        return

    if format == "text":
        # colors are only emitted when writing to a terminal
        isatty = sys.stdout.isatty()
        header = str(self.path)
        if isatty:
            header = colorize(header, Color.blue)
        print(header)
        self.printChildren(isatty)
        return

    if format == "sunburst":
        root = self.path
        sunburst(self.getDirectoriesInfo(root), root)
def printChildren(self, isatty: bool, inc: int = 1) -> None:
    """Print every descendant of this node, depth first.

    Args:
        isatty: forwarded unchanged to printNode
        inc: depth handed to printNode for the direct children; each
            nested level is printed with ``inc + 1``
    """
    for child in self.children.values():
        self.printNode(child, isatty, inc)
        if not child.children:
            continue
        child.printChildren(isatty, inc + 1)
def printNode(self, node: Any, isatty: bool, inc: int) -> None:
    """Print one line for ``node``, with its path relative to this node.

    Args:
        node: node to print; its path has to lie below ``self.path``
        isatty: when true the path is colored: red if the node is not
            known, otherwise blue for a directory and green for a content
        inc: number of times the "│ " prefix is repeated
    """
    label = str(node.path.relative_to(self.path))
    is_directory = node.otype == DIRECTORY
    if isatty:
        if not node.known:
            shade = Color.red
        elif is_directory:
            shade = Color.blue
        elif node.otype == CONTENT:
            shade = Color.green
        else:
            shade = None
        if shade is not None:
            label = colorize(label, shade)
    prefix = "│ " * inc
    suffix = "/" if is_directory else ""
    print(f"{prefix}{label}{suffix}")
@property
def attributes(self):
    """
    Get the attributes of the current node, keyed by its path.

    Returns:
        a dictionary with a single entry: ``str(self.path)`` as key and, as
        value, a dictionary holding the Software Heritage persistent
        identifier ("swhid") and the known/unknown status ("known").

    """
    node_info = {"swhid": self.swhid, "known": self.known}
    return {str(self.path): node_info}
+
def toDict(self, dict_nodes=None) -> Dict[str, Dict[str, Any]]:
    """
    Recursively groups the descendants of the current node inside a flat
    dictionary. The current node itself is not part of the result, and the
    keys are the string form of each node's path.

    For example, if you have the following structure:

    .. code-block:: none

        root {
            subdir: {
                file.txt
            }
        }

    The dictionary generated from ``root`` will be:

    .. code-block:: none

        {
            "root/subdir": {
                "swhid": "...",
                "known": True/False
            }
            "root/subdir/file.txt": {
                "swhid": "...",
                "known": True/False
            }
        }

    Args:
        dict_nodes: optional dictionary to fill in place; a new dictionary
            is created when it is omitted

    Returns:
        the filled dictionary (``dict_nodes`` itself when it was given)

    """
    if dict_nodes is None:
        # A ``{}`` default is evaluated once, at definition time: every call
        # (and every Tree) would then share and keep filling the same dict.
        dict_nodes = {}
    for node_dict in self.iterate():
        dict_nodes.update(node_dict)
    return dict_nodes
+
def iterate(self) -> Iterable[Dict[str, Dict]]:
    """
    Recursively iterate through the descendants of the current node

    Yields:
        for each descendant, the single-entry dictionary returned by its
        ``attributes`` property (path, known/unknown status and Software
        Heritage persistent identifier); a directory is yielded before the
        nodes it contains

    """
    for descendant in self.children.values():
        yield descendant.attributes
        if descendant.otype != DIRECTORY:
            continue
        yield from descendant.iterate()
+
def getTree(self):
    """Build a nested dictionary of the identifiers found below this node.

    A child with a non-empty ``swhid`` is stored under its relative path
    with that identifier. Any other child is replaced by the result of its
    own getTree() and left out when that result is empty.

    Returns:
        child_tree: the tree with the content/directory found
    """
    child_tree = {}
    for child in self.children.values():
        rel_path = str(child.path.relative_to(self.path))
        entry = child.swhid or child.getTree()
        if entry:
            child_tree[rel_path] = entry
    return child_tree
def __getSubDirsInfo(self, root, directories):
    """Fills the directories given in input with the contents information
    of every directory found below this node, only if they have contents.

    Args:
        root: path the keys of ``directories`` are made relative to
        directories: dictionary updated in place
    """
    for path, child_node in self.children.items():
        if child_node.otype != DIRECTORY:
            continue
        rel_path = path.relative_to(root)
        contents_info = child_node.count_contents()
        # the first element of the tuple is the number of contents in the
        # directory: zero means there is nothing to report for it
        if contents_info[0] != 0:
            directories[rel_path] = contents_info
        if child_node.has_dirs():
            child_node.__getSubDirsInfo(root, directories)
def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
    """Get information about all directories under the given root.

    Args:
        root: key used for this node's own entry; the other keys are
            paths relative to it

    Returns:
        A dictionary with a directory path as key and the relative
        contents information (the result of count_contents) as values.
    """
    info = {}
    info[root] = self.count_contents()
    # the sub-directories add their own entries to the same dictionary
    self.__getSubDirsInfo(root, info)
    return info
def count_contents(self) -> Tuple[int, int]:
    """Count how many contents are present inside a directory.

    A directory flagged as known is reported as (1, 1). Otherwise only the
    direct children of type content are counted.

    Returns:
        A tuple with the total number of the contents and the number
        of contents known.

    Raises:
        InvalidObjectType: if this node is not a directory
    """
    if self.otype != DIRECTORY:
        raise InvalidObjectType(
            "Can't calculate contents of the object type: %s" % self.otype
        )

    if self.known:
        # to identify a directory with all files/directories present
        return (1, 1)

    direct_contents = [
        child for child in self.children.values() if child.otype == CONTENT
    ]
    discovered = sum(1 for child in direct_contents if child.known)
    return (len(direct_contents), discovered)
def has_dirs(self) -> bool:
    """Checks if at least one direct child of the node is a directory"""
    return any(child.otype == DIRECTORY for child in self.children.values())
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
index 9759c4c..e1ff2bc 100644
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -1,182 +1,180 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import itertools
import asyncio
import aiohttp
from typing import List, Dict, Tuple, Iterator, Union, Set, Any
from pathlib import PosixPath
from .exceptions import error_response
from .model import Tree
from swh.model.from_disk import Directory, Content, accept_all_directories
from swh.model.identifiers import (
persistent_identifier,
parse_persistent_identifier,
DIRECTORY,
CONTENT,
)
async def pids_discovery(
    pids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
    """API Request to get information about the persistent identifiers given in
    input.

    Lists longer than 1000 identifiers are split in chunks of that size; one
    task is created per chunk and the responses are merged.

    Args:
        pids: a list of persistent identifier
        session: client session used to POST the requests
        api_url: url for the API request

    Returns:
        A dictionary with:
        key: persistent identifier searched
        value:
            value['known'] = True if the pid is found
            value['known'] = False if the pid is not found
    """
    endpoint = api_url + "known/"
    chunk_size = 1000

    async def make_request(pids):
        async with session.post(endpoint, json=pids) as resp:
            if resp.status != 200:
                # NOTE(review): error_response presumably raises; confirm in
                # .exceptions
                error_response(resp.reason, resp.status, endpoint)
            return await resp.json()

    if len(pids) <= chunk_size:
        return await make_request(pids)

    tasks = [
        asyncio.create_task(make_request(pids[start : start + chunk_size]))
        for start in range(0, len(pids), chunk_size)
    ]
    # merge the per-chunk dictionaries into a single one
    merged = {}
    for response in await asyncio.gather(*tasks):
        merged.update(response)
    return merged
def directory_filter(path_name: Union[str, bytes], exclude_patterns: Set[Any]) -> bool:
    """It checks if the path_name is matching with the patterns given in input.

    It is also used as a `dir_filter` function when generating the directory
    object from `swh.model.from_disk`

    Args:
        path_name: path to check; bytes are decoded first
        exclude_patterns: objects whose ``match`` method is applied to the
            string form of the path

    Returns:
        False if the directory has to be ignored, True otherwise
    """
    if isinstance(path_name, bytes):
        path_name = path_name.decode()
    candidate = str(PosixPath(path_name))
    return not any(pattern.match(candidate) for pattern in exclude_patterns)
def get_subpaths(
    path: PosixPath, exclude_patterns: Set[Any]
) -> Iterator[Tuple[PosixPath, str]]:
    """Find the persistent identifier of the directories and files located
    directly under a given path.

    Args:
        path: the root path
        exclude_patterns: when not empty, directory identifiers are computed
            with directory_filter as ``dir_filter``

    Yields:
        pairs of: path, the relative persistent identifier
    """
    # the filter only depends on exclude_patterns, so it is chosen once
    if exclude_patterns:

        def dir_filter(dirpath, *args):
            return directory_filter(dirpath, exclude_patterns)

    else:
        dir_filter = accept_all_directories

    def pid_of(path):
        raw_path = bytes(path)
        if not path.is_dir():
            obj = Content.from_file(path=raw_path).get_data()
            return persistent_identifier(CONTENT, obj)
        obj = Directory.from_disk(path=raw_path, dir_filter=dir_filter).get_data()
        return persistent_identifier(DIRECTORY, obj)

    # only the first item of os.walk() is used: the direct children of path.
    # NOTE: if os.walk() yields nothing, next() raises StopIteration, which
    # a generator turns into RuntimeError.
    dirpath, dnames, fnames = next(os.walk(path))
    parent = PosixPath(dirpath)
    for node in itertools.chain(dnames, fnames):
        sub_path = parent.joinpath(node)
        yield (sub_path, pid_of(sub_path))
async def parse_path(
    path: PosixPath,
    session: aiohttp.ClientSession,
    api_url: str,
    exclude_patterns: Set[Any],
) -> Iterator[Tuple[str, str, bool]]:
    """Check if the sub paths of the given path are present in the
    archive or not.

    Args:
        path: the source path
        session: client session handed to pids_discovery
        api_url: url for the API request
        exclude_patterns: forwarded to get_subpaths

    Returns:
        a lazy iterator of tuples with: a subpath of the given path,
        the pid of the subpath and the result of the api call
    """
    parsed_paths = dict(get_subpaths(path, exclude_patterns))
    parsed_pids = await pids_discovery(list(parsed_paths.values()), session, api_url)
    return (
        (subpath, pid, parsed_pids[pid]["known"])
        for subpath, pid in parsed_paths.items()
    )
async def run(
    root: PosixPath, api_url: str, source_tree: Tree, exclude_patterns: Set[Any]
) -> None:
    """Start scanning from the given root.

    It fills the source tree with the path discovered.

    Args:
        root: the root path to scan
        api_url: url for the API request
        source_tree: tree receiving every scanned path through addNode
        exclude_patterns: directories matching one of them are neither
            added to the tree nor scanned
    """

    async def _scan(root, session, api_url, source_tree, exclude_patterns):
        entries = await parse_path(root, session, api_url, exclude_patterns)
        for path, pid, known in entries:
            obj_type = parse_persistent_identifier(pid).object_type
            if obj_type == CONTENT:
                source_tree.addNode(path, pid, known)
                continue
            if obj_type != DIRECTORY or not directory_filter(path, exclude_patterns):
                continue
            source_tree.addNode(path, pid, known)
            if not known:
                # only a directory that is not known is scanned further
                await _scan(path, session, api_url, source_tree, exclude_patterns)

    async with aiohttp.ClientSession() as session:
        await _scan(root, session, api_url, source_tree, exclude_patterns)
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
index fafcc6c..1b1493c 100644
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -1,136 +1,136 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import asyncio
import aiohttp
import os
from pathlib import PosixPath
from aioresponses import aioresponses # type: ignore
from swh.model.cli import pid_of_file, pid_of_dir
from swh.scanner.model import Tree
from .flask_api import create_app
@pytest.fixture
def mock_aioresponse():
    """Fixture that yields the object of an active ``aioresponses`` context."""
    with aioresponses() as mocked:
        yield mocked
@pytest.fixture
def event_loop():
    """Fixture that generate an asyncio event loop.

    The new loop is set as the current one and closed after the test.
    """
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    yield new_loop
    new_loop.close()
@pytest.fixture
async def aiosession():
    """Fixture that generate an aiohttp Client Session.

    The session is detached once the test is over.
    """
    client_session = aiohttp.ClientSession()
    yield client_session
    client_session.detach()
@pytest.fixture(scope="session")
def temp_folder(tmp_path_factory):
    """Fixture that generates a temporary folder with the following
    structure:

    .. code-block:: python

        root = {
            subdir: {
                subsubdir
                filesample.txt
                filesample2.txt
            }
            subdir2
            subfile.txt
        }

    Returns:
        a dictionary with the root path ("root"), the mapping from every
        created path to its identifier ("paths") and the paths of
        "filesample", "filesample2", "subsubdir" and "subdir".
    """
    root = tmp_path_factory.getbasetemp()
    # NOTE(review): the tests address this directory as "subdir0", so mktemp()
    # presumably appends a number to the requested name
    subdir = tmp_path_factory.mktemp("subdir")
    subsubdir = subdir.joinpath("subsubdir")
    subsubdir.mkdir()
    subdir2 = tmp_path_factory.mktemp("subdir2")
    subfile = root / "subfile.txt"
    subfile.touch()
    filesample = subdir / "filesample.txt"
    filesample.touch()
    filesample2 = subdir / "filesample2.txt"
    filesample2.touch()

    # identifier of every created path, computed with the swh.model helpers
    avail_path = {
        subdir: pid_of_dir(bytes(subdir)),
        subsubdir: pid_of_dir(bytes(subsubdir)),
        subdir2: pid_of_dir(bytes(subdir2)),
        subfile: pid_of_file(bytes(subfile)),
        filesample: pid_of_file(bytes(filesample)),
        filesample2: pid_of_file(bytes(filesample2)),
    }

    return {
        "root": root,
        "paths": avail_path,
        "filesample": filesample,
        "filesample2": filesample2,
        "subsubdir": subsubdir,
        "subdir": subdir,
    }
@pytest.fixture(scope="function")
def example_tree(temp_folder):
    """Fixture that generate a Tree with the root present in the
    session fixture "temp_folder".
    """
    root = temp_folder["root"]
    tree = Tree(root)
    assert tree.path == temp_folder["root"]
    return tree
@pytest.fixture(scope="function")
def example_dirs(example_tree, temp_folder):
    """
    Fixture that fill the fixture example_tree with the values contained in
    the fixture temp_folder and returns the directories information of the
    filled example_tree.

    Only filesample, filesample2 and subsubdir are added as known.
    """
    root = temp_folder["root"]
    known_paths = [
        temp_folder["filesample"],
        temp_folder["filesample2"],
        temp_folder["subsubdir"],
    ]
    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, path in known_paths)

    return example_tree.getDirectoriesInfo(root)
@pytest.fixture
def test_folder():
    """Location of the "data" folder, next to this file"""
    here = PosixPath(os.path.abspath(__file__))
    data_folder = here.parent.joinpath("data")
    assert data_folder.exists()
    return data_folder
@pytest.fixture(scope="session")
def app():
    """Flask backend API (used by live_server)."""
    return create_app()
diff --git a/swh/scanner/tests/data.py b/swh/scanner/tests/data.py
index 8cd289b..1235b94 100644
--- a/swh/scanner/tests/data.py
+++ b/swh/scanner/tests/data.py
@@ -1,18 +1,21 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# canned body used as the mocked answer of the "known" endpoint
correct_api_response = {
    "swh:1:dir:17d207da3804cc60a77cba58e76c3b2f767cb112": {"known": False},
    "swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140": {"known": True},
    "swh:1:dir:4b825dc642cb6eb9a060e54bf8d69288fbee4904": {"known": True},
}

# identifier of the toexclude/ directory of /data/sample-folder
to_exclude_swhid = "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326"

# present swhids inside /data/sample-folder
present_swhids = [
    "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a",  # quotes.md
    "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb",  # some-binary
    "swh:1:dir:9619a28687b2462efbb5be816bc1185b95753d93",  # barfoo2/
    to_exclude_swhid,  # toexclude/
]
diff --git a/swh/scanner/tests/data/sample-folder-result-no-toexclude.json b/swh/scanner/tests/data/sample-folder-result-no-toexclude.json
deleted file mode 100644
index d16a6f7..0000000
--- a/swh/scanner/tests/data/sample-folder-result-no-toexclude.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "foo": {
- "quotes.md": "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
- },
- "bar": {
- "barfoo2": "swh:1:dir:9619a28687b2462efbb5be816bc1185b95753d93"
- },
- "link-to-foo": {
- "quotes.md": "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
- },
- "some-binary": "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb"
-}
diff --git a/swh/scanner/tests/data/sample-folder-result.json b/swh/scanner/tests/data/sample-folder-result.json
deleted file mode 100644
index 78350c0..0000000
--- a/swh/scanner/tests/data/sample-folder-result.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "foo": {
- "quotes.md": "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
- },
- "bar": {
- "barfoo2": "swh:1:dir:9619a28687b2462efbb5be816bc1185b95753d93"
- },
- "link-to-foo": {
- "quotes.md": "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
- },
- "toexclude": "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326",
- "some-binary": "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb"
-}
diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
index 7896d82..7fb0b1c 100644
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -1,32 +1,32 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import Flask, request
-from .data import present_pids
+from .data import present_swhids
from swh.web.common.exc import LargePayloadExc
def create_app():
    """Build the Flask application used as API backend by the tests.

    It exposes a single POST route, "/known/", answering for each received
    identifier whether it belongs to ``present_swhids``.
    """
    app = Flask(__name__)

    @app.route("/known/", methods=["POST"])
    def known():
        swhids = request.get_json()

        if len(swhids) > 900:
            raise LargePayloadExc(
                "The maximum number of PIDs this endpoint can receive is 900"
            )

        return {swhid: {"known": swhid in present_swhids} for swhid in swhids}

    return app
diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py
index 5e2757c..730d3bf 100644
--- a/swh/scanner/tests/test_model.py
+++ b/swh/scanner/tests/test_model.py
@@ -1,71 +1,78 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
def test_tree_add_node(example_tree, temp_folder):
    """Every node of the two first levels comes from the added paths."""
    added = temp_folder["paths"]
    avail_paths = added.keys()

    for path, pid in added.items():
        example_tree.addNode(path, pid, False)

    for path, node in example_tree.children.items():
        assert path in avail_paths
        for subpath in node.children:
            assert subpath in avail_paths
def test_to_json_no_one_present(example_tree, temp_folder):
    """toDict() lists the six added paths, all of them not known."""
    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, False)

    result = example_tree.toDict()

    assert len(result) == 6

    for node_info in result.values():
        assert node_info["known"] is False
def test_get_json_tree_all_present(example_tree, temp_folder):
    """toDict() lists the six added paths, all of them known."""
    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, True)

    result = example_tree.toDict()

    assert len(result) == 6

    for node_info in result.values():
        assert node_info["known"] is True
def test_get_json_tree_only_one_present(example_tree, temp_folder):
    """Only filesample.txt, added as known, is reported as known."""
    root = temp_folder["root"]
    filesample_path = temp_folder["filesample"]

    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, path == filesample_path)

    result = example_tree.toDict()

    assert len(result) == 6

    known_key = str(root) + "/subdir0/filesample.txt"
    for path, node_attr in result.items():
        if path == known_key:
            assert node_attr["known"] is True
        else:
            assert node_attr["known"] is False
def test_get_directories_info(example_tree, temp_folder):
    """subdir reports two contents, both known; subsubdir is not listed."""
    root_path = temp_folder["root"]
    known_files = (temp_folder["filesample"], temp_folder["filesample2"])
    subdir_path = temp_folder["subdir"].relative_to(root_path)
    subsubdir_path = temp_folder["subsubdir"].relative_to(root_path)

    for path, pid in temp_folder["paths"].items():
        if path in known_files:
            example_tree.addNode(path, pid, True)
        else:
            example_tree.addNode(path, pid, False)

    directories = example_tree.getDirectoriesInfo(example_tree.path)

    assert subsubdir_path not in directories
    assert directories[subdir_path] == (2, 2)
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index f5c6b70..b9bc544 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,112 +1,106 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import json
from pathlib import PosixPath
-from .data import correct_api_response
+from .data import correct_api_response, present_swhids, to_exclude_swhid
from swh.scanner.scanner import pids_discovery, get_subpaths, run
from swh.scanner.model import Tree
from swh.scanner.cli import extract_regex_objs
from swh.scanner.exceptions import APIError
# URL registered on the aioresponses mock: the API base url passed to
# pids_discovery in these tests, followed by "known/"
aio_url = "http://example.org/api/known/"
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """pids_discovery returns the JSON body of a 200 response."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    discovery = pids_discovery([], aiosession, "http://example.org/api/")
    actual_result = event_loop.run_until_complete(discovery)

    assert correct_api_response == actual_result
def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """A 413 response makes pids_discovery raise APIError."""
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        discovery = pids_discovery([], aiosession, "http://example.org/api/")
        event_loop.run_until_complete(discovery)
def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """Sending 901 identifiers to the live server raises APIError."""
    api_url = live_server.url() + "/"
    # /known/ is limited at 900
    request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"] * 901

    with pytest.raises(APIError):
        event_loop.run_until_complete(pids_discovery(request, aiosession, api_url))
def test_scanner_get_subpaths(temp_folder):
    """get_subpaths yields the two real directories located under the root."""
    root = temp_folder["root"]

    # also check if it's a symlink since pytest tmp_dir fixture create
    # also a symlink to each directory inside the tmp_dir path
    actual_result = [
        (subpath, pid)
        for subpath, pid in get_subpaths(root, tuple())
        if subpath.is_dir() and not subpath.is_symlink()
    ]

    assert len(actual_result) == 2
@pytest.mark.options(debug=False)
def test_app(app):
    """The application is not in debug mode under the ``options`` marker."""
    debug_enabled = app.debug
    assert not debug_enabled
def test_scanner_result(live_server, event_loop, test_folder):
    """After a scan, a node is known exactly when its swhid is in present_swhids."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(run(sample_folder, api_url, source_tree, set()))

    for node_dict in source_tree.iterate():
        # each yielded dictionary holds a single entry
        node_info = next(iter(node_dict.values()))
        expected = node_info["swhid"] in present_swhids
        assert node_info["known"] is expected
def test_scanner_result_with_exclude_patterns(live_server, event_loop, test_folder):
    """No node of the scanned tree carries the swhid of the excluded directory."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))

    patterns = (str(sample_folder) + "/toexclude",)
    exclude_pattern = set(extract_regex_objs(sample_folder, patterns))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(
        run(sample_folder, api_url, source_tree, exclude_pattern)
    )

    for node_dict in source_tree.iterate():
        # each yielded dictionary holds a single entry
        node_info = next(iter(node_dict.values()))
        assert node_info["swhid"] != to_exclude_swhid

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 8:45 AM (6 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3275254

Event Timeline