Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/data.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Dict, Tuple | from typing import Dict, Optional, Tuple | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID | from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID | ||||
from .client import Client | |||||
# Names of the per-node attributes that init_merkle_node_info() accepts.
SUPPORTED_INFO = {"known", "origin"}
class MerkleNodeInfo(dict):
    """Store additional information about Merkle DAG nodes, using SWHIDs as keys"""

    def __setitem__(self, key, value):
        """The keys must be valid Software Heritage Persistent Identifiers
        while values must be dict.

        Raises:
            ValidationError: if ``key`` is not a ``CoreSWHID`` or ``value`` is
                not a ``dict``.
        """
        # NOTE: this class overrides only __setitem__, so entries added through
        # the constructor, update() or setdefault() are not checked.
        if not isinstance(key, CoreSWHID):
            raise ValidationError("keys must be valid SWHID(s)")
        if not isinstance(value, dict):
            raise ValidationError(f"values must be dict, not {type(value)}")
        super().__setitem__(key, value)
def init_merkle_node_info(source_tree: Directory, data: MerkleNodeInfo, info: set):
    """Populate the MerkleNodeInfo with the SWHIDs of the given source tree and the
    attributes that will be stored.

    Every node yielded by ``source_tree.iter_tree()`` gets an entry in ``data``,
    keyed by its SWHID, holding one key per requested attribute set to ``None``.

    Args:
        source_tree: tree whose nodes are enumerated.
        data: mapping filled in place.
        info: names of the attributes to track; each must be in
            ``SUPPORTED_INFO``.

    Raises:
        ValueError: if ``info`` is empty or contains an unsupported name.
    """
    if not info:
        raise ValueError("Data initialization requires node attributes values.")

    # The whole request is validated before ``data`` is touched, so a rejected
    # request leaves ``data`` unmodified.
    nodes_info: Dict[str, Optional[str]] = {}
    for ainfo in info:
        if ainfo not in SUPPORTED_INFO:
            raise ValueError(f"Information {ainfo} is not supported.")
        nodes_info[ainfo] = None

    for node in source_tree.iter_tree():
        # Each node gets its own copy so per-node updates stay independent.
        data[node.swhid()] = nodes_info.copy()  # type: ignore
async def add_origin(source_tree: Directory, data: MerkleNodeInfo, client: Client):
    """Store origin information about software artifacts retrieved from the Software
    Heritage graph service.

    Nodes are processed in FIFO order starting from ``source_tree``. When
    ``client.get_origin`` returns a truthy value for a node, it is stored under
    the ``"origin"`` key of that node's entry in ``data``; for a directory it is
    also stored for every node yielded by its ``iter_tree()``. When nothing is
    returned for a directory, the nodes yielded by its ``iter_tree()`` (minus
    the directory itself) are scheduled for lookup.

    Args:
        source_tree: root node where the exploration starts.
        data: per-node information, updated in place; it must already hold an
            entry for every visited SWHID.
        client: object providing the ``get_origin`` coroutine.

    Raises:
        KeyError: if a visited node has no entry in ``data``.
    """
    # get_origin() results keyed by SWHID. A node can be scheduled more than
    # once: a directory without origin schedules everything its iter_tree()
    # yields, and a scheduled sub-directory may later do the same. Caching
    # avoids asking the service twice for the same identifier.
    # NOTE(review): this assumes get_origin() gives the same answer for the
    # same SWHID during one run - confirm.
    known_origins: dict = {}

    pending = [source_tree]
    while pending:
        # Handle the queue one batch at a time; nodes discovered meanwhile go
        # to the next batch. Same FIFO order as popping from the front, without
        # the linear cost of list.remove().
        batch, pending = pending, []
        for node in batch:
            swhid = node.swhid()
            if swhid not in known_origins:
                known_origins[swhid] = await client.get_origin(swhid)
            node_ori = known_origins[swhid]

            if node_ori:
                data[swhid]["origin"] = node_ori
                if node.object_type == DIRECTORY:
                    # The directory's origin is applied to its whole content.
                    for sub_node in node.iter_tree():
                        data[sub_node.swhid()]["origin"] = node_ori  # type: ignore
            elif node.object_type == DIRECTORY:
                # NOTE(review): iter_tree() appears to yield descendants
                # recursively, not only direct children - scheduling direct
                # children alone may be enough; confirm against swh.model.
                children = list(node.iter_tree())
                children.remove(node)
                pending.extend(children)  # type: ignore
def get_directory_data( | def get_directory_data( | ||||
root_path: str, | root_path: str, | ||||
source_tree: Directory, | source_tree: Directory, | ||||
nodes_data: MerkleNodeInfo, | nodes_data: MerkleNodeInfo, | ||||
directory_data: Dict = {}, | directory_data: Dict = {}, | ||||
) -> Dict[Path, dict]: | ) -> Dict[Path, dict]: | ||||
"""Get content information for each directory inside source_tree. | """Get content information for each directory inside source_tree. | ||||
▲ Show 20 Lines • Show All 70 Lines • Show Last 20 Lines |