diff --git a/swh/scanner/client.py b/swh/scanner/client.py
index d814b72..0a07964 100644
--- a/swh/scanner/client.py
+++ b/swh/scanner/client.py
@@ -1,98 +1,98 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """
 Minimal async web client for the Software Heritage Web API.
 
 This module could be removed when `T2635` is implemented.
 """
 
 import asyncio
 import itertools
 from typing import Any, Dict, List, Optional
 
 import aiohttp
 
-from swh.model.identifiers import CoreSWHID
+from swh.model.swhids import CoreSWHID
 
 from .exceptions import error_response
 
 # Maximum number of SWHIDs that can be requested by a single call to the
 # Web API endpoint /known/
 QUERY_LIMIT = 1000
 
 KNOWN_EP = "known/"
 GRAPH_RANDOMWALK_EP = "graph/randomwalk/"
 
 
 class Client:
     """Manage requests to the Software Heritage Web API."""
 
     def __init__(self, api_url: str, session: aiohttp.ClientSession):
         self.api_url = api_url
         self.session = session
 
     async def get_origin(self, swhid: CoreSWHID) -> Optional[Any]:
         """Walk the compressed graph to discover the origin of a given SWHID."""
         endpoint = (
             f"{self.api_url}{GRAPH_RANDOMWALK_EP}{str(swhid)}/ori/?direction="
             f"backward&limit=-1&resolve_origins=true"
         )
         res = None
         async with self.session.get(endpoint) as resp:
             if resp.status == 200:
                 res = await resp.text()
                 res = res.rstrip()
                 return res
 
             if resp.status != 404:
                 error_response(resp.reason, resp.status, endpoint)
 
         return res
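For context, `get_origin` asks the graph's randomwalk endpoint for a backward walk from the given SWHID to an origin node, and returns the resolved origin URL (or None on a 404). A minimal call sketch, assuming a reachable Web API instance with the graph endpoints enabled; the URL here is only illustrative, and the SWHID is the one used by the tests below:

    import asyncio

    import aiohttp

    from swh.model.swhids import CoreSWHID
    from swh.scanner.client import Client

    async def main():
        async with aiohttp.ClientSession() as session:
            client = Client("https://archive.softwareheritage.org/api/1/", session)
            swhid = CoreSWHID.from_string(
                "swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140"
            )
            print(await client.get_origin(swhid))  # an origin URL, or None

    asyncio.run(main())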
     async def known(self, swhids: List[CoreSWHID]) -> Dict[str, Dict[str, bool]]:
         """API request to get information about the SoftWare Heritage persistent
         IDentifiers (SWHIDs) given in input.
 
         Args:
             swhids: a list of CoreSWHID instances
 
         Returns:
             A dictionary with:
 
             key: the string SWHID that was searched
             value:
                 value['known'] = True if the SWHID is found
                 value['known'] = False if the SWHID is not found
         """
         endpoint = self.api_url + KNOWN_EP
         requests = []
 
         def get_chunk(swhids):
             for i in range(0, len(swhids), QUERY_LIMIT):
                 yield swhids[i : i + QUERY_LIMIT]
 
         async def make_request(swhids):
             swhids = [str(swhid) for swhid in swhids]
             async with self.session.post(endpoint, json=swhids) as resp:
                 if resp.status != 200:
                     error_response(resp.reason, resp.status, endpoint)
 
                 return await resp.json()
 
         if len(swhids) > QUERY_LIMIT:
             for swhids_chunk in get_chunk(swhids):
                 requests.append(asyncio.create_task(make_request(swhids_chunk)))
             res = await asyncio.gather(*requests)
             # concatenate the list of per-chunk dictionaries into a single one
             return dict(itertools.chain.from_iterable(e.items() for e in res))
         else:
             return await make_request(swhids)
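The branching at the end of `known` means a single POST is made for small inputs, while larger inputs are split by `get_chunk` into requests of at most QUERY_LIMIT SWHIDs that run concurrently and whose responses are merged back into one dictionary. A standalone sketch of just the splitting arithmetic (the generated SWHIDs are fake, for illustration only):

    QUERY_LIMIT = 1000

    def get_chunk(swhids):
        # Yield successive slices of at most QUERY_LIMIT items.
        for i in range(0, len(swhids), QUERY_LIMIT):
            yield swhids[i : i + QUERY_LIMIT]

    # 2500 SWHIDs are split into three requests: 1000, 1000 and 500.
    swhids = ["swh:1:cnt:%040x" % i for i in range(2500)]
    assert [len(chunk) for chunk in get_chunk(swhids)] == [1000, 1000, 500]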
""" queue = [] queue.append(source_tree) while queue: for node in queue.copy(): queue.remove(node) node_ori = await client.get_origin(node.swhid()) if node_ori: data[node.swhid()]["origin"] = node_ori - if node.object_type == DIRECTORY: + if node.object_type == "directory": for sub_node in node.iter_tree(): data[sub_node.swhid()]["origin"] = node_ori # type: ignore else: - if node.object_type == DIRECTORY: + if node.object_type == "directory": children = [sub_node for sub_node in node.iter_tree()] children.remove(node) queue.extend(children) # type: ignore def get_directory_data( root_path: str, source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict = {}, ) -> Dict[Path, dict]: """Get content information for each directory inside source_tree. Returns: A dictionary with a directory path as key and the relative contents information as values. """ def _get_directory_data( source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict ): directories = list( filter( - lambda n: n.object_type == DIRECTORY, + lambda n: n.object_type == "directory", map(lambda n: n[1], source_tree.items()), ) ) for node in directories: directory_info = directory_content(node, nodes_data) rel_path = Path(node.data["path"].decode()).relative_to(Path(root_path)) directory_data[rel_path] = directory_info if has_dirs(node): _get_directory_data(node, nodes_data, directory_data) _get_directory_data(source_tree, nodes_data, directory_data) return directory_data def directory_content(node: Directory, nodes_data: MerkleNodeInfo) -> Tuple[int, int]: """Count known contents inside the given directory. Returns: A tuple with the total number of contents inside the directory and the number of known contents. """ known_cnt = 0 node_contents = list( - filter(lambda n: n.object_type == CONTENT, map(lambda n: n[1], node.items())) + filter(lambda n: n.object_type == "content", map(lambda n: n[1], node.items())) ) for sub_node in node_contents: if nodes_data[sub_node.swhid()]["known"]: known_cnt += 1 return (len(node_contents), known_cnt) def has_dirs(node: Directory) -> bool: """Check if the given directory has other directories inside.""" for _, sub_node in node.items(): if isinstance(sub_node, Directory): return True return False def get_content_from( node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeInfo ) -> Dict[bytes, dict]: """Get content information from the given directory node.""" # root in model.from_disk.Directory should be accessed with b"" directory = source_tree[node_path if node_path != source_tree.data["path"] else b""] node_contents = list( filter( - lambda n: n.object_type == CONTENT, map(lambda n: n[1], directory.items()) + lambda n: n.object_type == "content", map(lambda n: n[1], directory.items()) ) ) files_data = {} for node in node_contents: node_info = nodes_data[node.swhid()] node_info["swhid"] = str(node.swhid()) path_name = "path" if "path" in node.data.keys() else "data" files_data[node.data[path_name]] = node_info return files_data diff --git a/swh/scanner/db.py b/swh/scanner/db.py index 0845c63..10bae3f 100644 --- a/swh/scanner/db.py +++ b/swh/scanner/db.py @@ -1,84 +1,80 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This module is an interface to interact with the local database where the SWHIDs will be saved for the local API. 
diff --git a/swh/scanner/db.py b/swh/scanner/db.py
index 0845c63..10bae3f 100644
--- a/swh/scanner/db.py
+++ b/swh/scanner/db.py
@@ -1,84 +1,80 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """
 This module is an interface to interact with the local database
 where the SWHIDs will be saved for the local API.
 
 SWHIDs can be added directly from an input file.
 """
 
 from io import TextIOWrapper
 import logging
 from pathlib import Path
-import re
 import sqlite3
 from typing import Iterable
 
 from swh.core.utils import grouper
+from swh.model.swhids import SWHID_RE
 
 from .exceptions import DBError
 
-# XXX copied and simplified from swh.model.identifiers (WIP), replace this in favor of
-# swh.model.identifiers.SWHID_RE when it is landed there
-SWHID_RE = re.compile("^swh:1:(ori|snp|rel|rev|dir|cnt):[0-9a-f]{40}$")
-
 
 class Db:
     """Local database interface"""
 
     def __init__(self, db_file: Path):
         self.db_file: Path = db_file
         self.conn: sqlite3.Connection = sqlite3.connect(
             db_file, check_same_thread=False
         )
 
     def close(self):
         """Close the connection to the database."""
         self.conn.close()
 
     def create_table(self, cur: sqlite3.Cursor):
         """Create the table where the SWHIDs will be stored."""
         cur.execute("""CREATE TABLE IF NOT EXISTS swhids (swhid text PRIMARY KEY)""")
 
     def add(self, swhids: Iterable[str], chunk_size: int, cur: sqlite3.Cursor):
         """Insert the SWHIDs into the database."""
         for swhids_chunk in grouper(swhids, chunk_size):
             cur.executemany(
                 """INSERT INTO swhids VALUES (?)""",
                 [(swhid_chunk,) for swhid_chunk in swhids_chunk],
             )
 
     @staticmethod
     def iter_swhids(lines: Iterable[str]) -> Iterable[str]:
         lineno = 0
         for line in lines:
             lineno += 1
             swhid = line.rstrip()
             if SWHID_RE.match(swhid):
                 yield swhid
             else:
                 logging.error("ignoring invalid SWHID on line %d: %s", lineno, swhid)
 
     def create_from(
         self, input_file: TextIOWrapper, chunk_size: int, cur: sqlite3.Cursor
     ):
         """Create a new database with the SWHIDs present inside the input file."""
         try:
             self.create_table(cur)
             cur.execute("PRAGMA synchronous = OFF")
             cur.execute("PRAGMA journal_mode = OFF")
             self.add(self.iter_swhids(input_file), chunk_size, cur)
             cur.close()
             self.conn.commit()
         except sqlite3.Error as e:
             raise DBError(f"SQLite error: {e}")
 
     def known(self, swhid: str, cur: sqlite3.Cursor):
         """Check if a given SWHID is present or not inside the local database."""
         cur.execute("""SELECT 1 FROM swhids WHERE swhid=?""", (swhid,))
         res = cur.fetchone()
         return res is not None
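The local database is a single-column SQLite table of SWHID strings: `create_from` bulk-loads a newline-separated input file (logging and skipping lines that do not match SWHID_RE), and `known` is a point lookup on the primary key. A minimal usage sketch, assuming a `swhids.txt` file with one SWHID per line; the file names are illustrative:

    from pathlib import Path

    from swh.scanner.db import Db

    db = Db(Path("swhids.db"))
    with open("swhids.txt") as input_file:
        # create_from closes the cursor it is given, so pass a throwaway one
        db.create_from(input_file, chunk_size=10000, cur=db.conn.cursor())
    cur = db.conn.cursor()
    print(db.known("swh:1:cnt:133693b125bad2b4ac318535b84901ebb1f6b638", cur))
    db.close()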
diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py
index 44cf053..8716d4b 100644
--- a/swh/scanner/policy.py
+++ b/swh/scanner/policy.py
@@ -1,260 +1,260 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 from typing import no_type_check
 
 from swh.core.utils import grouper
 from swh.model.from_disk import Directory
-from swh.model.identifiers import CONTENT, DIRECTORY
 
 from .client import QUERY_LIMIT, Client
 from .data import MerkleNodeInfo
 
 
 def source_size(source_tree: Directory):
     """Return the size of a source tree as the number of nodes it contains."""
     return sum(1 for n in source_tree.iter_tree(dedup=False))
 
 
 class Policy(metaclass=abc.ABCMeta):
 
     data: MerkleNodeInfo
     """information about contents and directories of the merkle tree"""
 
     source_tree: Directory
     """representation of a source code project directory in the merkle tree"""
 
     def __init__(self, source_tree: Directory, data: MerkleNodeInfo):
         self.source_tree = source_tree
         self.data = data
 
     @abc.abstractmethod
     async def run(self, client: Client):
         """Scan a source code project"""
         raise NotImplementedError("Must implement run method")
 
 
 class LazyBFS(Policy):
     """Read nodes in the Merkle tree using the BFS algorithm.
 
     Look up only directories that are unknown; when a directory is known, set
     all its downstream contents to known.
     """
 
     async def run(self, client: Client):
         queue = []
         queue.append(self.source_tree)
 
         while queue:
             swhids = [node.swhid() for node in queue]
             swhids_res = await client.known(swhids)
             for node in queue.copy():
                 queue.remove(node)
                 self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][
                     "known"
                 ]
-                if node.object_type == DIRECTORY:
+                if node.object_type == "directory":
                     if not self.data[node.swhid()]["known"]:
                         children = [n[1] for n in list(node.items())]
                         queue.extend(children)
                     else:
                         for sub_node in node.iter_tree():
                             if sub_node == node:
                                 continue
                             self.data[sub_node.swhid()]["known"] = True  # type: ignore
""" @no_type_check async def run(self, client: Client): # get all the files all_contents = list( filter( - lambda node: node.object_type == CONTENT, self.source_tree.iter_tree() + lambda node: node.object_type == "content", self.source_tree.iter_tree() ) ) all_contents.reverse() # check deepest node first # query the backend to get all file contents status cnt_swhids = [node.swhid() for node in all_contents] cnt_status_res = await client.known(cnt_swhids) # set all the file contents status for cnt in all_contents: self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"] # set all the upstream directories of unknown file contents to unknown if not self.data[cnt.swhid()]["known"]: parent = cnt.parents[0] while parent: self.data[parent.swhid()]["known"] = False parent = parent.parents[0] if parent.parents else None # get all unset directories and check their status # (update children directories accordingly) unset_dirs = list( filter( - lambda node: node.object_type == DIRECTORY + lambda node: node.object_type == "directory" and self.data[node.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) # check unset directories for dir_ in unset_dirs: if self.data[dir_.swhid()]["known"] is None: # update directory status dir_status = await client.known([dir_.swhid()]) dir_known = dir_status[str(dir_.swhid())]["known"] self.data[dir_.swhid()]["known"] = dir_known if dir_known: sub_dirs = list( filter( - lambda n: n.object_type == DIRECTORY + lambda n: n.object_type == "directory" and self.data[n.swhid()]["known"] is None, dir_.iter_tree(), ) ) for node in sub_dirs: self.data[node.swhid()]["known"] = True class DirectoryPriority(Policy): """Check the Merkle tree querying all the directories that have at least one file content and set all the upstream directories to unknown in the case a directory is unknown otherwise set all the downstream contents to known. Finally check the status of empty directories and all the remaining file contents. 
""" @no_type_check async def run(self, client: Client): # get all directory contents that have at least one file content unknown_dirs = list( filter( - lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_), + lambda dir_: dir_.object_type == "directory" + and self.has_contents(dir_), self.source_tree.iter_tree(), ) ) unknown_dirs.reverse() # check deepest node first for dir_ in unknown_dirs: if self.data[dir_.swhid()]["known"] is None: dir_status = await client.known([dir_.swhid()]) dir_known = dir_status[str(dir_.swhid())]["known"] self.data[dir_.swhid()]["known"] = dir_known # set all the downstream file contents to known if dir_known: for cnt in self.get_contents(dir_): self.data[cnt.swhid()]["known"] = True # otherwise set all the upstream directories to unknown else: parent = dir_.parents[0] while parent: self.data[parent.swhid()]["known"] = False parent = parent.parents[0] if parent.parents else None # get remaining directories that have no file contents empty_dirs = list( filter( - lambda n: n.object_type == DIRECTORY + lambda n: n.object_type == "directory" and not self.has_contents(n) and self.data[n.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) empty_dirs_swhids = [n.swhid() for n in empty_dirs] empty_dir_status = await client.known(empty_dirs_swhids) # update status of directories that have no file contents for dir_ in empty_dirs: self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][ "known" ] # check unknown file contents unknown_cnts = list( filter( - lambda n: n.object_type == CONTENT + lambda n: n.object_type == "content" and self.data[n.swhid()]["known"] is None, self.source_tree.iter_tree(), ) ) unknown_cnts_swhids = [n.swhid() for n in unknown_cnts] unknown_cnts_status = await client.known(unknown_cnts_swhids) for cnt in unknown_cnts: self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][ "known" ] def has_contents(self, directory: Directory): """Check if the directory given in input has contents""" for entry in directory.entries: if entry["type"] == "file": return True return False def get_contents(self, dir_: Directory): """Get all the contents of a given directory""" for _, node in list(dir_.items()): - if node.object_type == CONTENT: + if node.object_type == "content": yield node class QueryAll(Policy): """Check the status of every node in the Merkle tree. 
""" @no_type_check async def run(self, client: Client): all_nodes = [node for node in self.source_tree.iter_tree()] all_swhids = [node.swhid() for node in all_nodes] swhids_res = await client.known(all_swhids) for node in all_nodes: self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] diff --git a/swh/scanner/tests/test_client.py b/swh/scanner/tests/test_client.py index 6a85eec..26c1625 100644 --- a/swh/scanner/tests/test_client.py +++ b/swh/scanner/tests/test_client.py @@ -1,58 +1,58 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import pytest -from swh.model.identifiers import CoreSWHID +from swh.model.swhids import CoreSWHID from swh.scanner.client import Client from swh.scanner.exceptions import APIError from .data import correct_known_api_response, correct_origin_api_response AIO_URL = "http://example.org/api/" KNOWN_URL = f"{AIO_URL}known/" ORIGIN_URL = f"{AIO_URL}graph/randomwalk/" def test_client_known_correct_api_request(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post( KNOWN_URL, status=200, content_type="application/json", body=json.dumps(correct_known_api_response), ) client = Client(AIO_URL, aiosession) actual_result = event_loop.run_until_complete(client.known([])) assert correct_known_api_response == actual_result def test_client_known_raise_apierror(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post(KNOWN_URL, content_type="application/json", status=413) client = Client(AIO_URL, aiosession) with pytest.raises(APIError): event_loop.run_until_complete(client.known([])) def test_client_get_origin_correct_api_request( mock_aioresponse, event_loop, aiosession ): origin_url = ( f"{ORIGIN_URL}swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140/ori" f"/?direction=backward&limit=-1&resolve_origins=true" ) mock_aioresponse.get( origin_url, status=200, body=correct_origin_api_response, ) client = Client(AIO_URL, aiosession) swhid = CoreSWHID.from_string("swh:1:dir:01fa282bb80be5907505d44b4692d3fa40fad140") actual_result = event_loop.run_until_complete(client.get_origin(swhid)) assert correct_origin_api_response == actual_result diff --git a/swh/scanner/tests/test_dashboard.py b/swh/scanner/tests/test_dashboard.py index 93c7663..380718a 100644 --- a/swh/scanner/tests/test_dashboard.py +++ b/swh/scanner/tests/test_dashboard.py @@ -1,46 +1,46 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dash_html_components as html -from swh.model.identifiers import CoreSWHID, ObjectType +from swh.model.swhids import CoreSWHID, ObjectType from swh.scanner.dashboard.dashboard import generate_table_body from swh.scanner.data import MerkleNodeInfo def test_generate_table_body(source_tree): chart_path = b"/bar/barfoo" dir_path = source_tree[b"/bar/barfoo"].data["path"].decode() nodes_data = MerkleNodeInfo() # CoreSWHID of 'another-quote.org' known_cnt_swhid = CoreSWHID( object_type=ObjectType.CONTENT, object_id=b"\x136\x93\xb1%\xba\xd2\xb4\xac1\x855\xb8I\x01\xeb\xb1\xf6\xb68", ) nodes_data[known_cnt_swhid] = {"known": True} generated_body = generate_table_body(chart_path, source_tree, nodes_data) expected_body = [ html.Tbody( [ html.Tr( [ 
html.Td("✔"), html.Td( html.A( children="another-quote.org", href=f"file://{dir_path}/another-quote.org", ) ), html.Td("swh:1:cnt:133693b125bad2b4ac318535b84901ebb1f6b638"), ] ), ] ) ] # workaround: dash_html_component.__eq__ checks for object identity only assert str(generated_body) == str(expected_body) diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py index 937408c..6898e43 100644 --- a/swh/scanner/tests/test_policy.py +++ b/swh/scanner/tests/test_policy.py @@ -1,148 +1,148 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import url_for import pytest -from swh.model.identifiers import CONTENT, CoreSWHID, ObjectType +from swh.model.swhids import CoreSWHID, ObjectType from swh.scanner.client import Client from swh.scanner.data import MerkleNodeInfo, init_merkle_node_info from swh.scanner.policy import ( DirectoryPriority, FilePriority, GreedyBFS, LazyBFS, source_size, ) def test_scanner_directory_priority_has_contents(source_tree): nodes_data = MerkleNodeInfo() policy = DirectoryPriority(source_tree, nodes_data) assert policy.has_contents(source_tree[b"/bar/barfoo"]) def get_backend_swhids_order(tmp_requests): with open(tmp_requests, "r") as f: backend_swhids_order = f.readlines() return [x.strip() for x in backend_swhids_order] def test_lazybfs_policy( live_server, aiosession, event_loop, source_tree_policy, tmp_requests ): open(tmp_requests, "w").close() api_url = url_for("index", _external=True) nodes_data = MerkleNodeInfo() init_merkle_node_info(source_tree_policy, nodes_data, {"known"}) policy = LazyBFS(source_tree_policy, nodes_data) client = Client(api_url, aiosession) event_loop.run_until_complete(policy.run(client)) backend_swhids_requests = get_backend_swhids_order(tmp_requests) assert ( backend_swhids_requests[0] == "swh:1:dir:fe8cd7076bef324eb8865f818ef08617879022ce" ) # the second request must contain 3 SWHIDs related to directories and one content dir_count, cnt_count = 0, 0 for swhid in backend_swhids_requests[1:5]: if CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY: dir_count += 1 else: cnt_count += 1 assert dir_count == 3 assert cnt_count == 1 # the last swhid must be a content related to the unknown directory # "sample-folder-policy/toexclude" assert ( backend_swhids_requests[5] == "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119" ) def test_directory_priority_policy( live_server, aiosession, event_loop, source_tree_policy, tmp_requests ): open(tmp_requests, "w").close() api_url = url_for("index", _external=True) nodes_data = MerkleNodeInfo() init_merkle_node_info(source_tree_policy, nodes_data, {"known"}) policy = DirectoryPriority(source_tree_policy, nodes_data) client = Client(api_url, aiosession) event_loop.run_until_complete(policy.run(client)) backend_swhids_requests = get_backend_swhids_order(tmp_requests) for swhid in backend_swhids_requests[0:4]: assert CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY for swhid in backend_swhids_requests[5:]: assert CoreSWHID.from_string(swhid).object_type == ObjectType.CONTENT def test_file_priority_policy( live_server, aiosession, event_loop, source_tree_policy, tmp_requests ): open(tmp_requests, "w").close() api_url = url_for("index", _external=True) nodes_data = MerkleNodeInfo() init_merkle_node_info(source_tree_policy, nodes_data, {"known"}) policy = 
diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py
index 937408c..6898e43 100644
--- a/swh/scanner/tests/test_policy.py
+++ b/swh/scanner/tests/test_policy.py
@@ -1,148 +1,148 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from flask import url_for
 import pytest
 
-from swh.model.identifiers import CONTENT, CoreSWHID, ObjectType
+from swh.model.swhids import CoreSWHID, ObjectType
 
 from swh.scanner.client import Client
 from swh.scanner.data import MerkleNodeInfo, init_merkle_node_info
 from swh.scanner.policy import (
     DirectoryPriority,
     FilePriority,
     GreedyBFS,
     LazyBFS,
     source_size,
 )
 
 
 def test_scanner_directory_priority_has_contents(source_tree):
     nodes_data = MerkleNodeInfo()
     policy = DirectoryPriority(source_tree, nodes_data)
     assert policy.has_contents(source_tree[b"/bar/barfoo"])
 
 
 def get_backend_swhids_order(tmp_requests):
     with open(tmp_requests, "r") as f:
         backend_swhids_order = f.readlines()
     return [x.strip() for x in backend_swhids_order]
 
 
 def test_lazybfs_policy(
     live_server, aiosession, event_loop, source_tree_policy, tmp_requests
 ):
     open(tmp_requests, "w").close()
     api_url = url_for("index", _external=True)
 
     nodes_data = MerkleNodeInfo()
     init_merkle_node_info(source_tree_policy, nodes_data, {"known"})
     policy = LazyBFS(source_tree_policy, nodes_data)
     client = Client(api_url, aiosession)
     event_loop.run_until_complete(policy.run(client))
 
     backend_swhids_requests = get_backend_swhids_order(tmp_requests)
 
     assert (
         backend_swhids_requests[0]
         == "swh:1:dir:fe8cd7076bef324eb8865f818ef08617879022ce"
     )
 
     # the second request must contain 3 SWHIDs related to directories and one content
     dir_count, cnt_count = 0, 0
     for swhid in backend_swhids_requests[1:5]:
         if CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY:
             dir_count += 1
         else:
             cnt_count += 1
 
     assert dir_count == 3
     assert cnt_count == 1
 
     # the last swhid must be a content related to the unknown directory
     # "sample-folder-policy/toexclude"
     assert (
         backend_swhids_requests[5]
         == "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119"
     )
 
 
 def test_directory_priority_policy(
     live_server, aiosession, event_loop, source_tree_policy, tmp_requests
 ):
     open(tmp_requests, "w").close()
     api_url = url_for("index", _external=True)
 
     nodes_data = MerkleNodeInfo()
     init_merkle_node_info(source_tree_policy, nodes_data, {"known"})
     policy = DirectoryPriority(source_tree_policy, nodes_data)
     client = Client(api_url, aiosession)
     event_loop.run_until_complete(policy.run(client))
 
     backend_swhids_requests = get_backend_swhids_order(tmp_requests)
 
     for swhid in backend_swhids_requests[0:4]:
         assert CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY
 
     for swhid in backend_swhids_requests[5:]:
         assert CoreSWHID.from_string(swhid).object_type == ObjectType.CONTENT
 
 
 def test_file_priority_policy(
     live_server, aiosession, event_loop, source_tree_policy, tmp_requests
 ):
     open(tmp_requests, "w").close()
     api_url = url_for("index", _external=True)
 
     nodes_data = MerkleNodeInfo()
     init_merkle_node_info(source_tree_policy, nodes_data, {"known"})
     policy = FilePriority(source_tree_policy, nodes_data)
     client = Client(api_url, aiosession)
     event_loop.run_until_complete(policy.run(client))
 
     backend_swhids_requests = get_backend_swhids_order(tmp_requests)
 
     for swhid in backend_swhids_requests[0:4]:
         assert CoreSWHID.from_string(swhid).object_type == ObjectType.CONTENT
 
     for swhid in backend_swhids_requests[5:]:
         assert CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY
 
 
 def test_greedy_bfs_policy(
     live_server, event_loop, aiosession, big_source_tree, tmp_requests
 ):
     open(tmp_requests, "w").close()
     api_url = url_for("index", _external=True)
 
     nodes_data = MerkleNodeInfo()
     init_merkle_node_info(big_source_tree, nodes_data, {"known"})
     policy = GreedyBFS(big_source_tree, nodes_data)
     client = Client(api_url, aiosession)
     event_loop.run_until_complete(policy.run(client))
 
     backend_swhids_requests = get_backend_swhids_order(tmp_requests)
 
     last_swhid = backend_swhids_requests[-1]
     assert CoreSWHID.from_string(last_swhid).object_type == ObjectType.CONTENT
 
 
 @pytest.mark.asyncio
 async def test_greedy_bfs_get_nodes_chunks(live_server, aiosession, big_source_tree):
     api_url = url_for("index", _external=True)
 
     nodes_data = MerkleNodeInfo()
     init_merkle_node_info(big_source_tree, nodes_data, {"known"})
     policy = GreedyBFS(big_source_tree, nodes_data)
     client = Client(api_url, aiosession)
     chunks = [
         n_chunk
         async for n_chunk in policy.get_nodes_chunks(
             client, source_size(big_source_tree)
         )
     ]
     assert len(chunks) == 2
-    assert chunks[1][-1].object_type == CONTENT
+    assert chunks[1][-1].object_type == "content"
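Taken together, the changes above are a mechanical migration: `swh.model.identifiers` becomes `swh.model.swhids`, and the removed `CONTENT`/`DIRECTORY` constants are replaced by the plain `"content"`/`"directory"` strings carried by `from_disk` node types. Code that still has to run against both module layouts could hedge the import; a sketch, assuming only the module path changed (which is what this series relies on):

    try:
        from swh.model.swhids import CoreSWHID  # newer swh.model releases
    except ImportError:
        from swh.model.identifiers import CoreSWHID  # older swh.model releases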